高频行情事件队列
一、原问题分析
1.1 原有模数分配算法问题
算法公式:
index=(next_index_+1)%handler_ptrs_.size()问题分析:
- 算法错误:每次分配都先+1再取模,导致实际分配的起始索引偏移了1
- 轮转偏移:如果
next_index_初始为0,且size=4,那么分配序列为:第一次:(0+1)%4=1→ 处理器1第二次:(1+1)%4=2→ 处理器2第三次:(2+1)%4=3→ 处理器3第四次:(3+1)%4=0→ 处理器0第五次:(0+1)%4=1→ 处理器1 - 正确公式:
index = next_index_ % handler_ptrs_.size()
1.2 根本性问题
问题:
- 静态哈希映射:股票ID到处理器的映射是固定不变的
- 无法动态均衡:即使某些处理器空闲,也无法分担其他处理器的负载
- 单线程瓶颈:每个处理器只能单线程运行,无法利用多核
- 热点股票问题:热门股票消息量大,其固定分配的处理器必然过载
二、未来方向
2.1 目标
- 多线程可驱动同一队列:允许多个CPU核心同时处理同一股票的消息
- 保持时序线性:同一股票的多个消息必须按接收顺序串行处理
- 动态负载均衡:队列能自动将负载分配到空闲处理器
- 最小化同步开销:避免传统锁带来的性能损耗
2.2 示意图
三、StrandQueue(串行队列)
3.1 状态机与执行权
3.2 原子操作与内存屏障
原子变量:
classStrandQueue{// 执行线程标记:0=空闲,非0=当前执行线程IDstd::atomic<uint64_t>executing_thread{0};// 待处理消息计数std::atomic<uint32_t>pending_messages{0};// 写锁(用于消息入队/出队)std::atomic_flag write_lock=ATOMIC_FLAG_INIT;// 内部消息队列(线程不安全)std::list<MessagePtr>message_list;}执行权获取:
booltry_acquire_execution(uint64_tthread_id){uint64_texpected=0;// CAS操作:只有当executing_thread==0时,才将其设置为thread_idreturnexecuting_thread.compare_exchange_strong(expected,thread_id,std::memory_order_acquire,// 成功时的内存序std::memory_order_relaxed// 失败时的内存序);}四、公平调度
4.1 调度结构
公平调度: ┌─────────────────────────────────────────────┐ │ 第一级:主队列管理器 │ │ ┌─────────────┐ ┌─────────────┐ │ │ │ 股票A队列 │ │ 股票B队列 │...│ │ │ pending=3│ │ pending=0│ │ │ └─────────────┘ └─────────────┘ │ │ │ │ │ │ ▼ ▼ │ │ ┌────────────────────────────────────┐ │ │ │ 调度链表(仅pending>0) │ │ │ │[股票A]→[股票C]→[股票E]→...│ │ │ └────────────────────────────────────┘ │ └─────────────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────┐ │ 第二级:多线程并行驱动 │ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │ │ 线程1│ │ 线程2│ │ 线程3│ │ │ │ 绑定核0│ │ 绑定核1│ │ 绑定核2│ │ │ └──────────┘ └──────────┘ └──────────┘ │ │ │ │ │ │ │ └───────────┼───────────┘ │ │ ▼ │ │ 竞争执行StrandQueue │ └─────────────────────────────────────────────┘4.2 调度链表
惰性调度:
- 只有
pending_messages > 0的队列才会进入调度链表 - 当队列的
pending_messages从0变为1时,自动加入调度链表 - 当队列的
pending_messages从1变为0时,自动从调度链表移除 - 调度链表操作由主队列自旋锁保护,但操作频率很低
调度链表:
classScheduleList{// 双链表节点,便于快速插入/删除structNode{StrandQueue*queue;Node*prev;Node*next;};// 头尾指针,支持FIFO调度Node*head;Node*tail;// 自旋锁保护链表操作SpinLock lock;};