🧩 5.1、整体架构图
说明:
- Worker进程:C++子进程,负责媒体处理(ICE/DTLS/RTP)
- Router:媒体流逻辑中枢,维护Producer-Consumer关系
- Producer:媒体源(推流端),存储并管理RTP流
- Consumer:媒体接收端(观看端),请求并消费RTP流
- WebRtcTransport:网络传输层,处理RTP/RTCP协议
📦 5.2、RTP包流转全流程
🌟 关键流程图(含函数名中文注释)
说明:
sendRtpPacket():推流客户端发送RTP包onTransportProducerRtpPacketReceived():Worker接收RTP包后触发addRtpPacket():Producer存储RTP包onRtpPacket():Consumer处理RTP包sendRtpPacket():Consumer转发RTP包给观看端
⏱️ 5.3、关键时序图
说明:
onTransportProducerRtpPacketReceived():Worker接收RTP包的核心入口addRtpPacket():Producer存储RTP包,用于RTX重传mapProducerConsumers[producer]:Router维护的Producer-Consumer映射onRtpPacket():Consumer处理RTP包的核心方法
📊 5.4、关键类图关联
说明:
mapProducerConsumers:Router维护的Producer-Consumer映射(一对多)mapConsumerProducer:Router维护的Consumer-Producer映射(一对一)RtpStream:RTP流管理单元,存储RTP包队列addPacket():RtpStream存储RTP包
🔧 5.5、关键代码片段
5.5.1. Router核心处理逻辑(C++层)
// 文件: src/Router.cpp/** * 处理接收到的RTP包(核心入口函数) * @param producer 关联的Producer * @param packet 接收到的RTP包 */voidRouter::onTransportProducerRtpPacketReceived(Producer*producer,RtpPacket*packet){// 1. 检查是否为RTX包(重传包)if(packet->isRtx()){handleRtxPacket(producer,packet);// 处理RTX重传包return;}// 2. 检查RTP包是否为新流(SSRC首次出现)if(!producer->hasRtpStream(packet->ssrc())){// 2.1 创建新的RTP流RtpStream*rtpStream=newRtpStream(packet->ssrc(),packet);producer->addRtpStream(rtpStream);// 2.2 通知所有关联的Consumer新流已建立auto&consumers=mapProducerConsumers[producer];for(auto*consumer:consumers){consumer->onNewRtpStream(rtpStream);// Consumer处理新流}}// 3. 将RTP包添加到Producer的RTP流中producer->addRtpPacket(packet);// 4. 遍历所有关联的Consumer,转发RTP包auto&consumers=mapProducerConsumers[producer];for(auto*consumer:consumers){consumer->onRtpPacket(packet);// Consumer处理RTP包}}/** * 处理RTX重传包 * @param producer 关联的Producer * @param rtxPacket RTX重传包 */voidRouter::handleRtxPacket(Producer*producer,RtpPacket*rtxPacket){// 1. 从RTX包中提取原始RTP包信息RtpPacket*originalPacket=rtxPacket->getOriginalPacket();// 2. 检查原始包是否已存在(是否已接收过)if(!producer->hasRtpStream(originalPacket->ssrc())){// 2.1 如果原始包不存在,请求重传producer->requestRtx(originalPacket->ssrc(),originalPacket->sequenceNumber());return;}// 3. 将原始RTP包转发给所有关联的Consumerauto&consumers=mapProducerConsumers[producer];for(auto*consumer:consumers){consumer->onRtpPacket(originalPacket);// 转发原始包}}/** * 新Producer创建时的处理 * @param transport 关联的Transport * @param producer 新创建的Producer */voidRouter::onTransportNewProducer(Transport*transport,Producer*producer){// 1. 将Producer添加到Router的Producer列表producerMap[producer->id()]=producer;// 2. 初始化Producer的Consumer集合mapProducerConsumers[producer]=std::unordered_set<Consumer*>();// 3. 通知应用层(可选)emit("producer",producer);}/** * 新Consumer创建时的处理 * @param transport 关联的Transport * @param consumer 新创建的Consumer * @param producerId 要消费的Producer ID */voidRouter::onTransportNewConsumer(Transport*transport,Consumer*consumer,conststd::string&producerId){// 1. 查找目标ProducerautoproducerIt=producerMap.find(producerId);if(producerIt==producerMap.end()){throwstd::runtime_error("Producer not found: "+producerId);}Producer*producer=producerIt->second;// 2. 建立Consumer与Producer的关联mapConsumerProducer[consumer]=producer;mapProducerConsumers[producer].insert(consumer);// 3. 通知应用层(可选)emit("consumer",consumer);// 4. 如果Producer已有数据,立即发送if(producer->isRtpStreamActive()){producer->sendRtpStreamToConsumer(consumer);}}5.5.2. Producer核心处理逻辑(C++层)
// 文件: src/Producer.cpp/** * 添加RTP包到Producer的RTP流 * @param packet 要添加的RTP包 */voidProducer::addRtpPacket(RtpPacket*packet){// 1. 获取或创建RTP流RtpStream*rtpStream=getOrCreateRtpStream(packet->ssrc());// 2. 将RTP包添加到RTP流队列rtpStream->addPacket(packet);// 3. 更新统计信息updateStatistics(packet);}/** * 获取或创建RTP流 * @param ssrc RTP流的SSRC * @return RtpStream指针 */RtpStream*Producer::getOrCreateRtpStream(uint32_tssrc){// 1. 检查是否已存在该SSRC的RTP流autoit=rtpStreamMap.find(ssrc);if(it!=rtpStreamMap.end()){returnit->second;}// 2. 创建新的RTP流RtpStream*rtpStream=newRtpStream(ssrc);rtpStreamMap[ssrc]=rtpStream;returnrtpStream;}/** * 请求RTX重传(当Consumer请求重传时) * @param ssrc RTP流的SSRC * @param sequenceNumber 要重传的序列号 */voidProducer::requestRtx(uint32_tssrc,uint16_tsequenceNumber){// 1. 获取RTP流autoit=rtpStreamMap.find(ssrc);if(it==rtpStreamMap.end()){return;// 不存在该流}RtpStream*rtpStream=it->second;// 2. 检查序列号是否在范围内if(sequenceNumber<rtpStream->getFirstSequenceNumber()||sequenceNumber>rtpStream->getLastSequenceNumber()){return;// 序列号超出范围}// 3. 获取要重传的包RtpPacket*packet=rtpStream->getPacket(sequenceNumber);if(!packet){return;// 未找到包}// 4. 创建RTX包并发送RtpPacket*rtxPacket=createRtxPacket(packet);sendRtxPacket(rtxPacket);}/** * 发送RTP流到Consumer(用于新Consumer建立连接) * @param consumer 要发送的Consumer */voidProducer::sendRtpStreamToConsumer(Consumer*consumer){// 1. 遍历所有RTP流for(auto&pair:rtpStreamMap){RtpStream*rtpStream=pair.second;// 2. 获取当前RTP流的最新包RtpPacket*latestPacket=rtpStream->getLastPacket();if(latestPacket){// 3. 发送最新包给Consumerconsumer->onRtpPacket(latestPacket);}}}5.5.3. Consumer核心处理逻辑(C++层)
// 文件: src/Consumer.cpp/** * 处理接收到的RTP包 * @param packet 接收到的RTP包 */voidConsumer::onRtpPacket(RtpPacket*packet){// 1. 更新接收统计信息updateStatistics(packet);// 2. 检查是否需要NACK(丢包重传请求)if(isPacketLost(packet->sequenceNumber)){requestNack(packet->sequenceNumber());// 请求重传}// 3. 将RTP包转发给WebRtcTransporttransport->sendRtpPacket(packet);}/** * 检查序列号是否丢失 * @param sequenceNumber 要检查的序列号 * @return 是否丢失 */boolConsumer::isPacketLost(uint16_tsequenceNumber){// 1. 检查是否是第一个包if(firstSequenceNumber==-1){firstSequenceNumber=sequenceNumber;returnfalse;}// 2. 检查序列号是否连续if(sequenceNumber==nextExpectedSequenceNumber){nextExpectedSequenceNumber=(sequenceNumber+1)%65536;returnfalse;}// 3. 如果序列号不是下一个,说明有丢包returntrue;}/** * 请求NACK重传(发送NACK请求给Producer) * @param sequenceNumber 丢失的序列号 */voidConsumer::requestNack(uint16_tsequenceNumber){// 1. 创建NACK包RtcpPacket*nackPacket=createNackPacket(sequenceNumber);// 2. 发送NACK包到Producertransport->sendRtcpPacket(nackPacket);}/** * 处理新RTP流(当Producer有新流时) * @param rtpStream 新的RTP流 */voidConsumer::onNewRtpStream(RtpStream*rtpStream){// 1. 将RTP流添加到Consumer的RTP流集合rtpStreamMap[rtpStream->ssrc()]=rtpStream;// 2. 通知应用层emit("newRtpStream",rtpStream);}5.5.4. RTP流管理核心逻辑(C++层)
// 文件: src/RtpStream.cpp/** * RtpStream构造函数 * @param ssrc RTP流的SSRC * @param firstPacket 首个RTP包 */RtpStream::RtpStream(uint32_tssrc,RtpPacket*firstPacket):ssrc(ssrc),firstSequenceNumber(firstPacket->sequenceNumber()),lastSequenceNumber(firstPacket->sequenceNumber()){// 1. 将首个包加入队列packetList.push(firstPacket);}/** * 添加RTP包到流 * @param packet 要添加的RTP包 */voidRtpStream::addPacket(RtpPacket*packet){// 1. 检查序列号是否连续if(packet->sequenceNumber()==lastSequenceNumber+1){// 2. 序列号连续,添加到队列末尾packetList.push(packet);lastSequenceNumber=packet->sequenceNumber();}elseif(packet->sequenceNumber()>lastSequenceNumber){// 3. 序列号跳跃,可能有丢包// 4. 但不处理,先存储packetList.push(packet);lastSequenceNumber=packet->sequenceNumber();}else{// 5. 序列号回绕(超过65535)if(packet->sequenceNumber()<firstSequenceNumber){// 6. 重传包,直接覆盖packetList.push(packet);}}}/** * 获取指定序列号的RTP包 * @param sequenceNumber 要获取的序列号 * @return RtpPacket指针,若不存在返回nullptr */RtpPacket*RtpStream::getPacket(uint16_tsequenceNumber){// 1. 遍历包队列for(auto&packet:packetList){if(packet->sequenceNumber()==sequenceNumber){returnpacket;}}returnnullptr;}/** * 获取最新RTP包 * @return 最新RTP包指针 */RtpPacket*RtpStream::getLastPacket(){if(!packetList.empty()){returnpacketList.back();}returnnullptr;}🌟 5.6、关键机制解析
5.6.1. RTX(重传)机制
工作流程:
- Consumer检测到丢包(序列号不连续)
- Consumer发送NACK请求给Router
- Router将NACK转发给Producer
- Producer生成RTX包(包含原始包信息)
- Router将RTX包转发给Consumer
5.6.2. NACK(丢包重传请求)机制
关键点:
- NACK是Consumer向Producer请求重传的机制
- Router作为中介转发NACK请求
- Producer生成RTX包(重传包)并发送
5.6.3. 流量控制机制
// 在Consumer::onRtpPacket()中voidConsumer::onRtpPacket(RtpPacket*packet){// 1. 更新接收统计updateStatistics(packet);// 2. 检查是否超过带宽限制if(isBandwidthExceeded()){// 3. 请求降级(如降低码率)requestBandwidthReduction();return;}// 4. 正常转发transport->sendRtpPacket(packet);}机制说明:
- Consumer实时监控接收带宽
- 超过阈值时请求Producer降级
- 通过RTCP反馈机制实现
📊 5.7、关键数据结构关系图
关系说明:
mapProducerConsumers:Router维护Producer-Consumer映射(一对多)mapConsumerProducer:Router维护Consumer-Producer映射(一对一)rtpStreamMap:Producer/Consumer存储RTP流packetList:RtpStream存储RTP包队列
💡 5.8、这样设计的优势
选择性转发:Router仅转发Consumer需要的RTP包,避免全量复制
- 4人视频会议:4个Producer → 4×3=12个RTP包 → 实际转发12个包(不是48个)
RTX重传优化:只重传丢失的包,而非整个流
- 丢包率10% → 仅重传10%的包,而非100%
内存高效:RtpStream仅存储最近的RTP包(滑动窗口)
- 通常只存储100个包(约2秒视频数据)
事件驱动:通过
EnhancedEventEmitter实现解耦- Producer、Consumer、Router之间通过事件通信
🌟 5.9、总结:RTP流处理全生命周期
完整生命周期:
- 推流客户端发送RTP包 → WebRtcTransport
- Router接收RTP包,存储到Producer
- Router遍历Consumer列表,转发RTP包
- Consumer处理RTP包,检测丢包
- Consumer请求NACK重传 → Router转发
- Producer生成RTX包 → Router转发给Consumer
- 观看客户端接收RTP包并渲染