news 2026/2/2 12:46:34

高性能消息传输系统Aeron:低延迟UDP与IPC通信框架

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
高性能消息传输系统Aeron:低延迟UDP与IPC通信框架

项目标题与描述

Aeron - 高性能消息传输系统

Aeron是一个高效的可靠UDP单播、UDP多播和IPC消息传输系统。Java、C和C++客户端在此代码库中提供,.NET客户端也可用。所有客户端都可以跨机器交换消息,或通过IPC在同一机器上非常高效地交换消息。

消息流可以通过Archive模块记录到持久存储中,以供后期或实时重播。Aeron Cluster提供了基于Raft共识算法的复制状态机形式的容错服务支持。

性能是关键焦点。Aeron的设计目标是成为吞吐量最高、延迟最低且最可预测的消息系统。Aeron与简单二进制编码(SBE)集成,以实现最佳的消息编码和解码性能。Aeron创建中使用的许多数据结构已被提取到Agrona项目中。

有关使用详情、协议规范、FAQ等,请查看Wiki。

功能特性

核心功能

  • 高效可靠的消息传输:支持UDP单播、UDP多播和IPC通信
  • 多语言客户端支持:提供Java、C、C++和.NET客户端实现
  • 消息流记录与重播:通过Archive模块实现消息流的持久化存储和重播
  • 容错集群支持:基于Raft共识算法提供故障容错服务
  • 高性能设计:专注于最高吞吐量和最低延迟的消息传输
  • 可扩展架构:支持消息流的实时处理和历史回放

独特优势

  • 性能优化:专门为低延迟和高吞吐量场景设计
  • 灵活的传输方式:支持跨机器和同机器通信
  • 完善的生态系统:包含Archive记录、Cluster集群等扩展模块
  • 集成SBE编码:实现最优的消息编码和解码性能

安装指南

Java客户端安装

Aeron的Java版本可以通过Maven Central获取最新版本:

<dependency><groupId>io.aeron</groupId><artifactId>aeron-all</artifactId><version>最新版本</version></dependency>

C客户端构建

从源代码构建C客户端:

# 克隆仓库gitclone https://github.com/aeron-io/aeron.gitcdaeron# 构建项目./gradlew

系统要求

  • Java版本:需要Java 8或更高版本
  • 构建工具:Gradle构建系统
  • 操作系统:支持Linux、macOS和Windows

平台注意事项

  • Linux:需要安装必要的开发工具和头文件
  • macOS:可能需要安装Xcode命令行工具
  • Windows:需要Visual Studio构建工具

使用说明

基础使用示例

Java客户端连接Archive

// 创建Archive上下文并连接aeron_archive_context_t*ctx;aeron_archive_t*archive=NULL;aeron_archive_context_init(&ctx);aeron_archive_connect(&archive,ctx);aeron_archive_context_close(ctx);// 使用完成后关闭连接aeron_archive_close(archive);

异步连接示例

aeron_archive_context_t*ctx;aeron_archive_async_connect_t*async;aeron_archive_t*archive=NULL;aeron_archive_context_init(&ctx);aeron_archive_async_connect(&async,ctx);aeron_archive_context_close(ctx);// 轮询直到连接完成while(NULL==archive){idle();aeron_archive_async_connect_poll(&archive,async);}

记录消息流

int64_t subscription_id;// 开始记录特定通道和流IDaeron_archive_start_recording(&subscription_id,archive,"aeron:udp?endpoint=localhost:3333",1234,AERON_ARCHIVE_SOURCE_LOCATION_LOCAL,false);// ... 执行操作 ...// 停止记录aeron_archive_stop_recording_subscription(archive,subscription_id);

典型使用场景

消息流重播

// 初始化重放参数aeron_archive_replay_params_t replay_params;aeron_archive_replay_params_init(&replay_params);replay_params.position=0;// 从录制位置0开始重放replay_params.length=stop_position;// 重放长度replay_params.file_io_max_length=4096;// 开始重放int64_t replay_session_id;aeron_archive_start_replay(&replay_session_id,archive,recording_id,// 要重放的录制IDmy_channel,// 重放发布通道my_stream_id,// 重放流ID&replay_params);// ... 处理重放数据 ...// 停止重放aeron_archive_stop_replay(archive,replay_session_id);

列出录制内容

// 定义录制描述符消费者回调voidrecording_descriptor_consumer(aeron_archive_recording_descriptor_t*descriptor,void*clientd){printf("Recording ID: %ld\n",descriptor->recording_id);printf("Start Position: %ld\n",descriptor->start_position);printf("Stop Position: %ld\n",descriptor->stop_position);}// 列出录制内容int32_t count;void*my_data;aeron_archive_list_recordings(&count,// 找到的描述符数量archive,initial_recording_id,// 起始录制IDmax_record_count,// 最大列出数量recording_descriptor_consumer,my_data);

核心代码

Archive组件日志记录器

以下代码展示了Aeron中Archive组件的日志记录器实现:

/* * Copyright 2014-2025 Real Logic Limited. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * https://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */packageio.aeron.agent;importio.aeron.AeronCounters;importio.aeron.version.Versioned;importnet.bytebuddy.agent.builder.AgentBuilder;importorg.agrona.MutableDirectBuffer;importorg.agrona.collections.Object2ObjectHashMap;importjava.util.EnumSet;importjava.util.Map;importstaticio.aeron.agent.ArchiveEventCode.*;importstaticio.aeron.agent.ConfigOption.DISABLED_ARCHIVE_EVENT_CODES;importstaticio.aeron.agent.ConfigOption.ENABLED_ARCHIVE_EVENT_CODES;importstaticio.aeron.agent.EventConfiguration.parseEventCodes;importstaticnet.bytebuddy.asm.Advice.to;importstaticnet.bytebuddy.matcher.ElementMatchers.nameEndsWith;importstaticnet.bytebuddy.matcher.ElementMatchers.named;/** * Archive模块日志事件组件记录器实现。 * 用于拦截和记录Archive模块中的事件,包括连接、关闭会话、开始/停止录制等操作。 */@VersionedpublicclassArchiveComponentLoggerimplementsComponentLogger{// 启用的事件集合staticfinalEnumSet<ArchiveEventCode>ENABLED_EVENTS=EnumSet.noneOf(ArchiveEventCode.class);// 特殊事件映射(如"all"表示所有事件)privatestaticfinalObject2ObjectHashMap<String,EnumSet<ArchiveEventCode>>SPECIAL_EVENTS=newObject2ObjectHashMap<>();static{SPECIAL_EVENTS.put("all",EnumSet.allOf(ArchiveEventCode.class));}/** * 获取此记录器的类型代码 * @return Archive事件类型代码 */publicinttypeCode(){returnEventCodeType.ARCHIVE.getTypeCode();}/** * 解码缓冲区中的消息 * @param buffer 包含消息的缓冲区 * @param offset 缓冲区中消息的偏移量 * @param eventCodeId 要解码的事件ID * @param builder 用于渲染消息的StringBuilder */publicvoiddecode(finalMutableDirectBufferbuffer,finalintoffset,finalinteventCodeId,finalStringBuilderbuilder){// 解码逻辑实现}}

Archive事件枚举

以下代码定义了Archive模块中可以启用日志记录的事件类型:

/* * Copyright 2014-2025 Real Logic Limited. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * https://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */packageio.aeron.agent;importio.aeron.archive.codecs.*;importorg.agrona.MutableDirectBuffer;importjava.util.Arrays;importjava.util.function.ToIntFunction;importstaticio.aeron.agent.ArchiveEventDissector.*;/** * Archive模块中可启用日志记录的事件枚举。 * 定义了各种Archive操作对应的事件类型和处理器。 */publicenumArchiveEventCodeimplementsEventCode{/** * connect命令的Archive日志事件 */CMD_IN_CONNECT(1,ConnectRequestDecoder.TEMPLATE_ID,ArchiveEventDissector::dissectControlRequest),/** * close-session命令的Archive日志事件 */CMD_IN_CLOSE_SESSION(2,CloseSessionRequestDecoder.TEMPLATE_ID,ArchiveEventDissector::dissectControlRequest),/** * start-recording命令的Archive日志事件 */CMD_IN_START_RECORDING(3,StartRecordingRequestDecoder.TEMPLATE_ID,ArchiveEventDissector::dissectControlRequest),/** * stop-recording命令的Archive日志事件 */CMD_IN_STOP_RECORDING(4,StopRecordingRequestDecoder.TEMPLATE_ID,ArchiveEventDissector::dissectControlRequest),/** * replay命令的Archive日志事件 */CMD_IN_REPLAY(5,ReplayRequestDecoder.TEMPLATE_ID,ArchiveEventDissector::dissectControlRequest),/** * stop-replay命令的Archive日志事件 */CMD_IN_STOP_REPLAY(6,StopReplayRequestDecoder.TEMPLATE_ID,ArchiveEventDissector::dissectControlRequest);// 构造函数和其他事件定义}

动态日志记录代理

以下代码展示了Aeron的动态日志记录代理实现,支持运行时附加/分离日志记录功能:

/* * Copyright 2014-2025 Real Logic Limited. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * https://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */packageio.aeron.agent;importnet.bytebuddy.agent.ByteBuddyAgent;importorg.agrona.PropertyAction;importorg.agrona.Strings;importorg.agrona.SystemUtil;importjava.io.File;importjava.nio.file.Paths;importjava.util.Map;importstaticio.aeron.agent.ConfigOption.*;importstaticjava.lang.System.out;/** * 动态附加/分离日志记录代理到运行中的进程。 * 允许在不重启JVM的情况下启用或禁用Aeron组件的日志记录功能。 */publicclassDynamicLoggingAgent{/** * 将日志记录代理JAR附加到运行中的进程 * @param args 程序参数:[代理JAR路径] [Java进程ID] [命令] [属性文件...] */publicstaticvoidmain(finalString[]args){if(args.length<3){printHelp();System.exit(-1);}// 验证代理JAR文件finalFileagentJar=Paths.get(args[0]).toAbsolutePath().toFile();if(!agentJar.exists()){thrownewIllegalArgumentException(agentJar+" does not exist!");}// 验证进程IDfinalStringprocessId=args[1];if(Strings.isEmpty(processId)){thrownewIllegalArgumentException("no PID provided!");}finalStringcommand=args[2];switch(command){caseSTART_COMMAND:// 启动日志记录:加载属性文件并附加代理for(inti=3;i<args.length;i++){SystemUtil.loadPropertiesFile(PropertyAction.PRESERVE,args[i]);}finalMap<String,String>configOptions=fromSystemProperties();finalStringagentArgs=buildAgentArgs(configOptions);attachAgent(START_COMMAND,agentJar,processId,agentArgs);out.println("Logging started.");break;caseSTOP_COMMAND:// 停止日志记录:分离代理attachAgent(STOP_COMMAND,agentJar,processId,command);out.println("Logging stopped.");break;default:thrownewIllegalArgumentException("invalid command: "+command);}}privatestaticvoidprintHelp(){out.println("Usage: <agent-jar> <java-process-id> <command> [property files...]");out.println(" <agent-jar> - fully qualified path to the agent jar");out.println(" <java-process-id> - PID of the Java process to attach an agent to");out.println(" <command> - either '"+START_COMMAND+"' or '"+STOP_COMMAND+"'");out.println(" [property files...] - an optional list of property files to configure logging options");}}

事件配置管理

以下代码展示了Aeron事件记录系统的配置管理实现:

/* * Copyright 2014-2025 Real Logic Limited. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * https://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */packageio.aeron.agent;importorg.agrona.Strings;importorg.agrona.concurrent.UnsafeBuffer;importorg.agrona.concurrent.ringbuffer.ManyToOneRingBuffer;importjava.util.EnumSet;importjava.util.Map;importjava.util.function.Function;importjava.util.function.IntFunction;importstaticjava.lang.System.err;importstaticjava.lang.System.lineSeparator;importstaticorg.agrona.BitUtil.CACHE_LINE_LENGTH;importstaticorg.agrona.BufferUtil.allocateDirectAligned;importstaticorg.agrona.SystemUtil.getSizeAsInt;importstaticorg.agrona.concurrent.ringbuffer.RingBufferDescriptor.TRAILER_LENGTH;/** * 事件记录器和事件读取器之间的通用配置元素。 * 管理事件缓冲区、最大事件长度和事件解析等功能。 */publicfinalclassEventConfiguration{// 事件缓冲区长度系统属性名publicstaticfinalStringBUFFER_LENGTH_PROP_NAME="aeron.event.buffer.length";// 事件缓冲区默认长度(字节)publicstaticfinalintBUFFER_LENGTH_DEFAULT=8*1024*1024;// 事件最大长度(字节)publicstaticfinalintMAX_EVENT_LENGTH=4096-lineSeparator().length();// 事件读取器循环迭代限制publicstaticfinalintEVENT_READER_FRAME_LIMIT=20;// 用于日志记录的环形缓冲区publicstaticfinalManyToOneRingBufferEVENT_RING_BUFFER;static{// 初始化事件环形缓冲区EVENT_RING_BUFFER=newManyToOneRingBuffer(newUnsafeBuffer(allocateDirectAligned(getSizeAsInt(BUFFER_LENGTH_PROP_NAME,BUFFER_LENGTH_DEFAULT)+TRAILER_LENGTH,CACHE_LINE_LENGTH)));}privateEventConfiguration(){}/** * 解析代理配置 * @param eventCodeType 事件枚举类型 * @param eventCodes 用户传递的事件代码 * @param specialEvents 特殊事件(如"admin"和"all") * @param eventCodeById 通过ID解析事件的函数 * @param eventCodeByName 通过名称解析事件的函数 * @return 启用的事件集合 */publicstatic<EextendsEnum<E>>EnumSet<E>parseEventCodes(finalClass<E>eventCodeType,finalStringeventCodes,finalMap<String,EnumSet<E>>specialEvents,finalIntFunction<E>eventCodeById,finalFunction<String,E>eventCodeByName){if(Strings.isEmpty(eventCodes)){returnEnumSet.noneOf(eventCodeType);}finalEnumSet<E>eventCodeSet=EnumSet.noneOf(eventCodeType);finalString[]codeIds=eventCodes.split(",");// 解析每个事件代码for(finalStringcodeId:codeIds){finalEnumSet<E>specialCodes=specialEvents.get(codeId);if(null!=specialCodes){eventCodeSet.addAll(specialCodes);}else{Ecode=null;try{code=eventCodeByName.apply(codeId);}catch(finalIllegalArgumentExceptionignore){}if(null==code){try{code=eventCodeById.apply(Integer.parseInt(codeId));}catch(finalIllegalArgumentExceptionignore){}}if(null!=code){eventCodeSet.add(code);}else{err.println("unknown event code: "+codeId);}}}returneventCodeSet;}}

lV0AxNWpfPhw8EGNhxs7t6U5RChPpSUqB73ATmFtjvw=
更多精彩内容 请关注我的个人公众号 公众号(办公AI智能小助手)
对网络安全、黑客技术感兴趣的朋友可以关注我的安全公众号(网络安全技术点滴分享)

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/2/2 4:51:25

九款AI论文工具解密评测:揭开学术写作的智能黑科技!

各位学术探索者&#xff0c;我是专注论文写作科普的教育测评博主。随着AI技术全面渗透学术领域&#xff0c;后台关于“AI写论文工具如何选择”的疑问越来越多。今天&#xff0c;我将带大家深度解析九款主流AI写作工具&#xff0c;看看它们究竟如何改变我们的学术写作方式。宏智…

作者头像 李华
网站建设 2026/1/31 20:16:04

C#通用进度条怎么设计?关键问题与实现思路

对于需要长时间运行的程序&#xff0c;一个清晰、可靠的进度指示器至关重要。它不仅是用户耐心的桥梁&#xff0c;更是程序健壮性的体现。一个“通用”的进度条组件&#xff0c;其价值在于封装核心逻辑&#xff0c;提供一致的接口&#xff0c;让开发者能快速集成到命令行、图形…

作者头像 李华
网站建设 2026/1/28 6:16:30

python大学生家教管理系统_django Flask vue pycharm项目

目录已开发项目效果实现截图关于博主开发技术路线相关技术介绍核心代码参考示例结论源码lw获取/同行可拿货,招校园代理 &#xff1a;文章底部获取博主联系方式&#xff01;已开发项目效果实现截图 同行可拿货,招校园代理 ,本人源头供货商 python大学生家教管理系统_django Fl…

作者头像 李华
网站建设 2026/1/24 13:00:10

python超市进销存统计系统echart_django Flask vue pycharm项目

目录已开发项目效果实现截图关于博主开发技术路线相关技术介绍核心代码参考示例结论源码lw获取/同行可拿货,招校园代理 &#xff1a;文章底部获取博主联系方式&#xff01;已开发项目效果实现截图 同行可拿货,招校园代理 ,本人源头供货商 python超市进销存统计系统echart_dja…

作者头像 李华