背景痛点:轮询式采集的“三高”困境
去年双十一前夜,我们的智能客服系统突然“罢工”:CPU 飙到 95%,接口 P99 延迟从 300 ms 涨到 3 s,客服电话排队飙升到 2 k+。根因很简单——轮询。
- 传统 REST 轮询,每 200 ms 拉一次,峰值 5 k 坐席同时抢接口,QPS 瞬间被打满。
- 每次轮询带回整段音频,平均 120 KB,网卡被打成“麻花”,CPU 花在 JSON 解包上就占 40%。
- 更惨的是,Nginx 默认短连接,TIME_WAIT 直接吃光端口池,Linux 开始丢 SYN 包,用户听到的是“喂?喂?”然后挂电话。
一句话:高 CPU、高带宽、高延迟,三高俱全,业务只能“躺平”。
技术选型:三条路线量化对比
我们拉了三天原型,把 WebSocket、gRPC Streaming、MQ 拉到同一起跑线,压测环境 16 C32 G,同机房千兆网,指标如下:
| 方案 | 峰值 QPS | CPU 峰值 | 内存占用 | 网络 RTT 99线 | 备注 |
|---|---|---|---|---|---|
| WebSocket 长连接 | 4.2 w | 62% | 1.8 GB | 18 ms | 需自己实现 ACK 背压 |
| gRPC 流式 | 5.1 w | 55% | 2.1 GB | 15 ms | HTTP/2 多路复用,零拷贝 |
| MQ 异步 | 3.7 w | 48% | 1.5 GB | 28 ms | 吞吐受限于 Broker 磁盘 IO |
结论:gRPC 在“高并发 + 低延迟”维度最均衡;WebSocket 胜在浏览器端直接落地;MQ 适合“可接受秒级延迟”的离线场景。我们客服要求 200 ms 内返回,于是拍板:gRPC + 流式传输做主干,WebSocket 做兜底,MQ 做离线补录。
核心实现:Python asyncio 秒级拼装
1. 异步采集服务骨架
# grpc_server.py import asyncio import grpc from concurrent import futures import audio_pb2, audio_pb2_grpc class AudioServicer(audio_pb2_grpc.AudioStreamerServicer): async def StreamAudio(self, request_iterator, context): async for chunk in request_iterator: # 只把元数据放内存,音频块直接落盘 await asyncio.to_thread(save_chunk, chunk.data, chunk.seq) yield audio_pb2.Ack(seq=chunk.seq, ts=time_ns()) async def serve(): server = grpc.aio.server(futures.ThreadPoolExecutor(max_workers=200)) audio_pb2_grpc.add_AudioStreamerServicer_to_server(AudioServicer(), server) server.add_insecure_port('[::]:50051') await server.start() await server.wait_for_termination() if __name__ == '__main': asyncio.run(serve())亮点:
grpc.aio官方异步 stub,天然搭配asyncio。- 线程池只负责磁盘 IO,CPU 密集逻辑(如编码)交给线程池外的
ProcessPoolExecutor,避免 GIL 互掐。
2. 音频分块与元数据分离
- 块文件:
{call_id}/{seq}.opus,单块 20 ms,平均 2 KB。 - 元数据:JSON 存 Redis,结构
{ "call_id": "UUID", "seq": 123, "ts": 168000123456789, "bucket": "oss-bucket", "path": "audio/2024/05/22/xxx.opus" }分离后,索引写 Redis 平均 0.3 ms,回调查询走内存,不碰磁盘。
3. 分布式锁防重复
# redis_lock.py import redis.asyncio as redis import uuid async def acquire_lock(r: redis.Redis, key: str, ttl: int = 10) -> str: val = uuid.uuid4().hex ok = await r.set(key, val, nx=True, ex=ttl) return val if ok else None async def release_lock(r: redis.Redis, key: str, val: str): lua = """ if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("del", KEYS[1]) else return 0 end """ await r.eval(lua, 1, key, val)- 采用 Redlock 单实例版,10 s 过期,防止进程挂掉死锁。
- 同一
call_id只会有一个实例写块,其余直接回 ACK,不落地,节省 30% 磁盘 IO。
性能测试:数据说话
压测脚本用k6 + grpc-js,跑在 8 台 4 C8 G 压测机,分别模拟 100 / 1 k / 5 k 路并发通话,每路 50 包 / s,持续 5 min。
| 并发路数 | 优化前吞吐(包/s) | 优化后吞吐(包/s) | 提升倍数 |
|---|---|---|---|
| 100 | 4,800 | 14,500 | 3.0× |
| 1 k | 30,000 | 125,000 | 4.2× |
| 5 k | 71,000 | 290,000 | 4.1× |
内存占用监控用 Prometheus +prometheus_client,关键片段:
# metrics.py from prometheus_client import Gauge, start_http_server import psutil MEM_GAUGE = Gauge('svc_mem_rss_bytes', 'RSS memory') async def report_mem(): while True: MEM_GAUGE.set(psutil.Process().memory_info().rss) await asyncio.sleep(15) if __name__ == '__main__': start_http_server(8000) asyncio.run(report_mem())5 k 并发下,RSS 从 3.2 GB 降到 1.9 GB,主要得益于“零拷贝”传输与对象复用。
避坑指南:三次踩坑三次填平
音频编解码 CPU 尖峰
现象:opus 解码在 5 k 路时单核飙 100 %。
解法:把解码后置到离线流水线,采集端只存原始压缩流;需要实时 ASR 的,用 Cgo 调用 libopus 的 NEON 汇编版,CPU 降 60%。网络抖动导致缺包
现象:客户端重传后,seq 跳号,ASR 拼接报错。
解法:gRPC 层加 200 ms 滑动窗口,服务端收到跳号就发 Nack,客户端缓存重发;窗口内补齐率 > 99.5%。分布式时钟同步陷阱
现象:两台机器 NTP 偏差 300 ms,导致同一次通话的块时间戳乱序,质检系统误判“丢 300 ms 录音”。
解法:- 时间戳统一用
time_ns()本地单调时钟,业务层只取相对差值; - 对外展示再换算成 NTP 校准后的墙上时钟;
- 杜绝“混合使用”本地与远程时钟。
- 时间戳统一用
结论与开放问题
把轮询换成 gRPC 流式,叠加 asyncio、分块存储、分布式锁三板斧,我们在不增加机器的情况下把吞吐提升 4 倍,CPU 反而降了 30%。但新架构也带来一个新难题:流式传输的实时性越高,数据在链路上“飞”得越快,一旦下游 ASR 或存储慢 1 s,背压就会反压到客户端,用户可能感知到延迟。此时若为了强一致性把 ACK 改成同步落盘再返回,实时性又被打回原型。
如何平衡实时性与数据一致性?是继续用“客户端缓存 + 重试”的柔性事务,还是引入基于 Paxos 的强一致写?期待大家一起探讨。