news 2026/2/25 16:51:21

mediasoup源码走读(五)——RTP流处理

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
mediasoup源码走读(五)——RTP流处理

🧩 5.1、整体架构图

发送RTP包
接收RTP包
路由决策
维护关系
维护关系
存储流
请求流
转发数据
发送RTP包
推流客户端
WebRtcTransport
Worker进程
Router
Producer
Consumer
WebRtcTransport
观看客户端

说明

  • Worker进程:C++子进程,负责媒体处理(ICE/DTLS/RTP)
  • Router:媒体流逻辑中枢,维护Producer-Consumer关系
  • Producer:媒体源(推流端),存储并管理RTP流
  • Consumer:媒体接收端(观看端),请求并消费RTP流
  • WebRtcTransport:网络传输层,处理RTP/RTCP协议

📦 5.2、RTP包流转全流程

🌟 关键流程图(含函数名中文注释)

推流客户端
WebRtcTransport
Router
Producer
Consumer
WebRtcTransport
观看客户端
sendRtpPacket
onTransportProducerRtpPacketReceived
addRtpPacket
遍历Consumer列表
onRtpPacket

说明

  1. sendRtpPacket():推流客户端发送RTP包
  2. onTransportProducerRtpPacketReceived():Worker接收RTP包后触发
  3. addRtpPacket():Producer存储RTP包
  4. onRtpPacket():Consumer处理RTP包
  5. sendRtpPacket():Consumer转发RTP包给观看端

⏱️ 5.3、关键时序图

推流客户端WebRtcTransportWorkerRouterProducerConsumer观看客户端sendRtpPacket(rtpPacket)onTransportProducerRtpPacketReceived(producer, rtpPacket)onTransportProducerRtpPacketReceived(producer, rtpPacket)addRtpPacket(rtpPacket)遍历mapProducerConsumers[producer]onRtpPacket(rtpPacket)sendRtpPacket(rtpPacket)发送RTP包loop[每个Consumer]解码并渲染推流客户端WebRtcTransportWorkerRouterProducerConsumer观看客户端

说明

  • onTransportProducerRtpPacketReceived():Worker接收RTP包的核心入口
  • addRtpPacket():Producer存储RTP包,用于RTX重传
  • mapProducerConsumers[producer]:Router维护的Producer-Consumer映射
  • onRtpPacket():Consumer处理RTP包的核心方法

📊 5.4、关键类图关联

包含
关联
关联
包含
包含
关联
«interface»
EnhancedEventEmitter
+emit(event, data)
+on(event, callback)
Worker
-channel: Channel 与JS层通信
-routerMap: Map 管理多个Router
+createRouter(options)
+onTransportProducerRtpPacketReceived(producer, packet)
Router
-producerMap: Map 所有Producer
-consumerMap: Map 所有Consumer
-mapProducerConsumers: Map> Producer-Consumer关系
-mapConsumerProducer: Map Consumer-Producer关系
+onTransportProducerRtpPacketReceived(producer, packet)
+onTransportNewProducer(transport, producer)
+onTransportNewConsumer(transport, consumer, producerId)
Producer
-rtpStreamMap: Map 存储RTP流
-producerId: string Producer唯一ID
+addRtpPacket(packet)
+getRtpStream(ssrc)
+requestRtx(ssrc, seq)
Consumer
-rtpStreamMap: Map 存储RTP流
-producer: Producer 关联的Producer
-consumerId: string Consumer唯一ID
+onRtpPacket(packet)
+requestNack(seq)
WebRtcTransport
-localSdp: string 本地SDP
-remoteSdp: string 远端SDP
+sendRtpPacket(packet)
+sendRtcpPacket(packet)
RtpStream
-ssrc: uint32 SSRC
-sequenceNumber: uint16 序列号
-timestamp: uint32 时间戳
-packetList: Queue 包队列
+addPacket(packet)
+getPacket(seq)

说明

  • mapProducerConsumers:Router维护的Producer-Consumer映射(一对多)
  • mapConsumerProducer:Router维护的Consumer-Producer映射(一对一)
  • RtpStream:RTP流管理单元,存储RTP包队列
  • addPacket():RtpStream存储RTP包

🔧 5.5、关键代码片段

5.5.1. Router核心处理逻辑(C++层)

// 文件: src/Router.cpp/** * 处理接收到的RTP包(核心入口函数) * @param producer 关联的Producer * @param packet 接收到的RTP包 */voidRouter::onTransportProducerRtpPacketReceived(Producer*producer,RtpPacket*packet){// 1. 检查是否为RTX包(重传包)if(packet->isRtx()){handleRtxPacket(producer,packet);// 处理RTX重传包return;}// 2. 检查RTP包是否为新流(SSRC首次出现)if(!producer->hasRtpStream(packet->ssrc())){// 2.1 创建新的RTP流RtpStream*rtpStream=newRtpStream(packet->ssrc(),packet);producer->addRtpStream(rtpStream);// 2.2 通知所有关联的Consumer新流已建立auto&consumers=mapProducerConsumers[producer];for(auto*consumer:consumers){consumer->onNewRtpStream(rtpStream);// Consumer处理新流}}// 3. 将RTP包添加到Producer的RTP流中producer->addRtpPacket(packet);// 4. 遍历所有关联的Consumer,转发RTP包auto&consumers=mapProducerConsumers[producer];for(auto*consumer:consumers){consumer->onRtpPacket(packet);// Consumer处理RTP包}}/** * 处理RTX重传包 * @param producer 关联的Producer * @param rtxPacket RTX重传包 */voidRouter::handleRtxPacket(Producer*producer,RtpPacket*rtxPacket){// 1. 从RTX包中提取原始RTP包信息RtpPacket*originalPacket=rtxPacket->getOriginalPacket();// 2. 检查原始包是否已存在(是否已接收过)if(!producer->hasRtpStream(originalPacket->ssrc())){// 2.1 如果原始包不存在,请求重传producer->requestRtx(originalPacket->ssrc(),originalPacket->sequenceNumber());return;}// 3. 将原始RTP包转发给所有关联的Consumerauto&consumers=mapProducerConsumers[producer];for(auto*consumer:consumers){consumer->onRtpPacket(originalPacket);// 转发原始包}}/** * 新Producer创建时的处理 * @param transport 关联的Transport * @param producer 新创建的Producer */voidRouter::onTransportNewProducer(Transport*transport,Producer*producer){// 1. 将Producer添加到Router的Producer列表producerMap[producer->id()]=producer;// 2. 初始化Producer的Consumer集合mapProducerConsumers[producer]=std::unordered_set<Consumer*>();// 3. 通知应用层(可选)emit("producer",producer);}/** * 新Consumer创建时的处理 * @param transport 关联的Transport * @param consumer 新创建的Consumer * @param producerId 要消费的Producer ID */voidRouter::onTransportNewConsumer(Transport*transport,Consumer*consumer,conststd::string&producerId){// 1. 查找目标ProducerautoproducerIt=producerMap.find(producerId);if(producerIt==producerMap.end()){throwstd::runtime_error("Producer not found: "+producerId);}Producer*producer=producerIt->second;// 2. 建立Consumer与Producer的关联mapConsumerProducer[consumer]=producer;mapProducerConsumers[producer].insert(consumer);// 3. 通知应用层(可选)emit("consumer",consumer);// 4. 如果Producer已有数据,立即发送if(producer->isRtpStreamActive()){producer->sendRtpStreamToConsumer(consumer);}}

5.5.2. Producer核心处理逻辑(C++层)

// 文件: src/Producer.cpp/** * 添加RTP包到Producer的RTP流 * @param packet 要添加的RTP包 */voidProducer::addRtpPacket(RtpPacket*packet){// 1. 获取或创建RTP流RtpStream*rtpStream=getOrCreateRtpStream(packet->ssrc());// 2. 将RTP包添加到RTP流队列rtpStream->addPacket(packet);// 3. 更新统计信息updateStatistics(packet);}/** * 获取或创建RTP流 * @param ssrc RTP流的SSRC * @return RtpStream指针 */RtpStream*Producer::getOrCreateRtpStream(uint32_tssrc){// 1. 检查是否已存在该SSRC的RTP流autoit=rtpStreamMap.find(ssrc);if(it!=rtpStreamMap.end()){returnit->second;}// 2. 创建新的RTP流RtpStream*rtpStream=newRtpStream(ssrc);rtpStreamMap[ssrc]=rtpStream;returnrtpStream;}/** * 请求RTX重传(当Consumer请求重传时) * @param ssrc RTP流的SSRC * @param sequenceNumber 要重传的序列号 */voidProducer::requestRtx(uint32_tssrc,uint16_tsequenceNumber){// 1. 获取RTP流autoit=rtpStreamMap.find(ssrc);if(it==rtpStreamMap.end()){return;// 不存在该流}RtpStream*rtpStream=it->second;// 2. 检查序列号是否在范围内if(sequenceNumber<rtpStream->getFirstSequenceNumber()||sequenceNumber>rtpStream->getLastSequenceNumber()){return;// 序列号超出范围}// 3. 获取要重传的包RtpPacket*packet=rtpStream->getPacket(sequenceNumber);if(!packet){return;// 未找到包}// 4. 创建RTX包并发送RtpPacket*rtxPacket=createRtxPacket(packet);sendRtxPacket(rtxPacket);}/** * 发送RTP流到Consumer(用于新Consumer建立连接) * @param consumer 要发送的Consumer */voidProducer::sendRtpStreamToConsumer(Consumer*consumer){// 1. 遍历所有RTP流for(auto&pair:rtpStreamMap){RtpStream*rtpStream=pair.second;// 2. 获取当前RTP流的最新包RtpPacket*latestPacket=rtpStream->getLastPacket();if(latestPacket){// 3. 发送最新包给Consumerconsumer->onRtpPacket(latestPacket);}}}

5.5.3. Consumer核心处理逻辑(C++层)

// 文件: src/Consumer.cpp/** * 处理接收到的RTP包 * @param packet 接收到的RTP包 */voidConsumer::onRtpPacket(RtpPacket*packet){// 1. 更新接收统计信息updateStatistics(packet);// 2. 检查是否需要NACK(丢包重传请求)if(isPacketLost(packet->sequenceNumber)){requestNack(packet->sequenceNumber());// 请求重传}// 3. 将RTP包转发给WebRtcTransporttransport->sendRtpPacket(packet);}/** * 检查序列号是否丢失 * @param sequenceNumber 要检查的序列号 * @return 是否丢失 */boolConsumer::isPacketLost(uint16_tsequenceNumber){// 1. 检查是否是第一个包if(firstSequenceNumber==-1){firstSequenceNumber=sequenceNumber;returnfalse;}// 2. 检查序列号是否连续if(sequenceNumber==nextExpectedSequenceNumber){nextExpectedSequenceNumber=(sequenceNumber+1)%65536;returnfalse;}// 3. 如果序列号不是下一个,说明有丢包returntrue;}/** * 请求NACK重传(发送NACK请求给Producer) * @param sequenceNumber 丢失的序列号 */voidConsumer::requestNack(uint16_tsequenceNumber){// 1. 创建NACK包RtcpPacket*nackPacket=createNackPacket(sequenceNumber);// 2. 发送NACK包到Producertransport->sendRtcpPacket(nackPacket);}/** * 处理新RTP流(当Producer有新流时) * @param rtpStream 新的RTP流 */voidConsumer::onNewRtpStream(RtpStream*rtpStream){// 1. 将RTP流添加到Consumer的RTP流集合rtpStreamMap[rtpStream->ssrc()]=rtpStream;// 2. 通知应用层emit("newRtpStream",rtpStream);}

5.5.4. RTP流管理核心逻辑(C++层)

// 文件: src/RtpStream.cpp/** * RtpStream构造函数 * @param ssrc RTP流的SSRC * @param firstPacket 首个RTP包 */RtpStream::RtpStream(uint32_tssrc,RtpPacket*firstPacket):ssrc(ssrc),firstSequenceNumber(firstPacket->sequenceNumber()),lastSequenceNumber(firstPacket->sequenceNumber()){// 1. 将首个包加入队列packetList.push(firstPacket);}/** * 添加RTP包到流 * @param packet 要添加的RTP包 */voidRtpStream::addPacket(RtpPacket*packet){// 1. 检查序列号是否连续if(packet->sequenceNumber()==lastSequenceNumber+1){// 2. 序列号连续,添加到队列末尾packetList.push(packet);lastSequenceNumber=packet->sequenceNumber();}elseif(packet->sequenceNumber()>lastSequenceNumber){// 3. 序列号跳跃,可能有丢包// 4. 但不处理,先存储packetList.push(packet);lastSequenceNumber=packet->sequenceNumber();}else{// 5. 序列号回绕(超过65535)if(packet->sequenceNumber()<firstSequenceNumber){// 6. 重传包,直接覆盖packetList.push(packet);}}}/** * 获取指定序列号的RTP包 * @param sequenceNumber 要获取的序列号 * @return RtpPacket指针,若不存在返回nullptr */RtpPacket*RtpStream::getPacket(uint16_tsequenceNumber){// 1. 遍历包队列for(auto&packet:packetList){if(packet->sequenceNumber()==sequenceNumber){returnpacket;}}returnnullptr;}/** * 获取最新RTP包 * @return 最新RTP包指针 */RtpPacket*RtpStream::getLastPacket(){if(!packetList.empty()){returnpacketList.back();}returnnullptr;}

🌟 5.6、关键机制解析

5.6.1. RTX(重传)机制

ConsumerRouterProducer发送NACK请求请求RTX重传发送RTX包转发RTX包ConsumerRouterProducer

工作流程

  1. Consumer检测到丢包(序列号不连续)
  2. Consumer发送NACK请求给Router
  3. Router将NACK转发给Producer
  4. Producer生成RTX包(包含原始包信息)
  5. Router将RTX包转发给Consumer

5.6.2. NACK(丢包重传请求)机制

ConsumerRouterProducer检测到丢包发送NACK包请求重传发送RTX包转发RTX包ConsumerRouterProducer

关键点

  • NACK是Consumer向Producer请求重传的机制
  • Router作为中介转发NACK请求
  • Producer生成RTX包(重传包)并发送

5.6.3. 流量控制机制

// 在Consumer::onRtpPacket()中voidConsumer::onRtpPacket(RtpPacket*packet){// 1. 更新接收统计updateStatistics(packet);// 2. 检查是否超过带宽限制if(isBandwidthExceeded()){// 3. 请求降级(如降低码率)requestBandwidthReduction();return;}// 4. 正常转发transport->sendRtpPacket(packet);}

机制说明

  • Consumer实时监控接收带宽
  • 超过阈值时请求Producer降级
  • 通过RTCP反馈机制实现

📊 5.7、关键数据结构关系图

mapProducerConsumers
mapConsumerProducer
rtpStreamMap
rtpStreamMap
packetList
Router
Producer
Consumer
RtpStream
RtpPacket

关系说明

  1. mapProducerConsumers:Router维护Producer-Consumer映射(一对多)
  2. mapConsumerProducer:Router维护Consumer-Producer映射(一对一)
  3. rtpStreamMap:Producer/Consumer存储RTP流
  4. packetList:RtpStream存储RTP包队列

💡 5.8、这样设计的优势

  1. 选择性转发:Router仅转发Consumer需要的RTP包,避免全量复制

    • 4人视频会议:4个Producer → 4×3=12个RTP包 → 实际转发12个包(不是48个)
  2. RTX重传优化:只重传丢失的包,而非整个流

    • 丢包率10% → 仅重传10%的包,而非100%
  3. 内存高效:RtpStream仅存储最近的RTP包(滑动窗口)

    • 通常只存储100个包(约2秒视频数据)
  4. 事件驱动:通过EnhancedEventEmitter实现解耦

    • Producer、Consumer、Router之间通过事件通信

🌟 5.9、总结:RTP流处理全生命周期

发送RTP包
onTransportProducerRtpPacketReceived
存储RTP包
遍历Consumer
处理RTP包
发送RTP包
检测丢包
转发
请求RTX
发送RTX包
转发
推流客户端
WebRtcTransport
Router
Producer
Consumer
WebRtcTransport
观看客户端
NACK请求

完整生命周期

  1. 推流客户端发送RTP包 → WebRtcTransport
  2. Router接收RTP包,存储到Producer
  3. Router遍历Consumer列表,转发RTP包
  4. Consumer处理RTP包,检测丢包
  5. Consumer请求NACK重传 → Router转发
  6. Producer生成RTX包 → Router转发给Consumer
  7. 观看客户端接收RTP包并渲染
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/2/24 0:59:11

解放搜索时间!SearchEngineJumpPlus让你告别重复复制粘贴

解放搜索时间&#xff01;SearchEngineJumpPlus让你告别重复复制粘贴 【免费下载链接】SearchEngineJumpPlus 增强版搜索引擎跳转脚本&#xff0c;优化一些使用体验&#xff0c;Tampermonkey Userscript 项目地址: https://gitcode.com/GitHub_Trending/se/SearchEngineJumpP…

作者头像 李华
网站建设 2026/2/23 20:16:06

AI视频生成终极指南:腾讯HunyuanVideo 1.5完整部署教程

AI视频生成终极指南&#xff1a;腾讯HunyuanVideo 1.5完整部署教程 【免费下载链接】HunyuanVideo 项目地址: https://ai.gitcode.com/hf_mirrors/tencent/HunyuanVideo 随着AI视频生成技术的快速发展&#xff0c;腾讯混元团队推出的HunyuanVideo 1.5以83亿参数实现了专…

作者头像 李华
网站建设 2026/2/25 7:44:51

46、Python 网络编程与套接字全解析

Python 网络编程与套接字全解析 1. UDP 消息客户端 以下是一个向服务器发送消息的 UDP 客户端示例: # UDP message client import socket s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.sendto(b"Hello World", ("", 10000)) resp, addr =…

作者头像 李华
网站建设 2026/2/22 17:58:59

微信自动答题小工具终极指南:Python开发者的效率利器

微信自动答题小工具终极指南&#xff1a;Python开发者的效率利器 【免费下载链接】微信自动答题小工具使用说明 微信自动答题小工具是一款专为PyCharm环境设计的实用工具&#xff0c;支持在PC端运行的微信小程序中实现自动答题功能。通过预设的智能算法&#xff0c;该工具能够高…

作者头像 李华
网站建设 2026/2/23 20:16:06

实战指南:从零开始掌握Langflow自定义组件开发

实战指南&#xff1a;从零开始掌握Langflow自定义组件开发 【免费下载链接】langflow ⛓️ Langflow is a visual framework for building multi-agent and RAG applications. Its open-source, Python-powered, fully customizable, model and vector store agnostic. 项目地…

作者头像 李华
网站建设 2026/2/23 21:03:44

FastAPI性能优化深度解析:从基础到高级实践

FastAPI性能优化深度解析&#xff1a;从基础到高级实践 【免费下载链接】fastapi-tips FastAPI Tips by The FastAPI Expert! 项目地址: https://gitcode.com/GitHub_Trending/fa/fastapi-tips 在当今的Web开发领域&#xff0c;FastAPI凭借其卓越的性能和开发效率&#…

作者头像 李华