news 2026/3/1 5:18:54

消息队列设计:从同步到异步的性能突破

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
消息队列设计:从同步到异步的性能突破

前言

2024年初,我们的订单系统经常出现"超时"问题。用户下单后,系统需要同时调用库存服务、支付服务、通知服务,任何一个服务慢都会导致整个请求超时。

我们决定引入消息队列,将同步调用改为异步处理。这个改造带来了显著的性能提升。


一、问题:同步调用的瓶颈

原始的订单流程是这样的:

python

@app.route('/api/orders', methods=['POST']) def create_order(): # 1. 创建订单 order = Order.create(request.json) # 2. 同步调用库存服务 inventory_response = requests.post( 'http://inventory-service/deduct', json={'product_id': order.product_id, 'quantity': order.quantity} ) if inventory_response.status_code != 200: return {"error": "库存不足"}, 400 # 3. 同步调用支付服务 payment_response = requests.post( 'http://payment-service/pay', json={'order_id': order.id, 'amount': order.amount} ) if payment_response.status_code != 200: return {"error": "支付失败"}, 400 # 4. 同步调用通知服务 notify_response = requests.post( 'http://notify-service/send', json={'order_id': order.id, 'type': 'order_created'} ) return {"order_id": order.id}, 201

问题

  • 任何一个服务慢都会导致整个请求慢;
  • 任何一个服务故障都会导致订单创建失败;
  • 耦合度太高,难以扩展。

性能数据

  • 库存服务:200ms
  • 支付服务:300ms
  • 通知服务:150ms
  • 总耗时:200 + 300 + 150 =650ms

二、解决方案:引入RabbitMQ

我们选择RabbitMQ作为消息队列。改造后的流程:

2.1 发布订单创建事件

python

import pika import json def create_order(): # 1. 创建订单 order = Order.create(request.json) # 2. 发布事件到消息队列 connection = pika.BlockingConnection(pika.ConnectionParameters('rabbitmq')) channel = connection.channel() # 声明交换机和队列 channel.exchange_declare(exchange='orders', exchange_type='topic') # 发布消息 message = { 'order_id': order.id, 'product_id': order.product_id, 'quantity': order.quantity, 'amount': order.amount } channel.basic_publish( exchange='orders', routing_key='order.created', body=json.dumps(message) ) connection.close() # 立即返回响应 return {"order_id": order.id}, 201

耗时:仅需10ms(发布到队列)

2.2 消费者:库存服务

python

def inventory_consumer(): connection = pika.BlockingConnection(pika.ConnectionParameters('rabbitmq')) channel = connection.channel() channel.exchange_declare(exchange='orders', exchange_type='topic') result = channel.queue_declare(queue='inventory_queue', durable=True) queue_name = result.method.queue # 绑定队列到交换机 channel.queue_bind( exchange='orders', queue=queue_name, routing_key='order.created' ) def callback(ch, method, properties, body): message = json.loads(body) try: # 扣减库存 deduct_inventory( message['product_id'], message['quantity'] ) # 确认消息 ch.basic_ack(delivery_tag=method.delivery_tag) except Exception as e: # 拒绝消息,重新入队 ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True) channel.basic_consume( queue=queue_name, on_message_callback=callback ) print('库存服务已启动,等待消息...') channel.start_consuming() if __name__ == '__main__': inventory_consumer()

2.3 消费者:支付服务

python

def payment_consumer(): connection = pika.BlockingConnection(pika.ConnectionParameters('rabbitmq')) channel = connection.channel() channel.exchange_declare(exchange='orders', exchange_type='topic') result = channel.queue_declare(queue='payment_queue', durable=True) queue_name = result.method.queue channel.queue_bind( exchange='orders', queue=queue_name, routing_key='order.created' ) def callback(ch, method, properties, body): message = json.loads(body) try: # 处理支付 process_payment( message['order_id'], message['amount'] ) ch.basic_ack(delivery_tag=method.delivery_tag) except Exception as e: ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True) channel.basic_consume( queue=queue_name, on_message_callback=callback ) print('支付服务已启动,等待消息...') channel.start_consuming()

2.4 消费者:通知服务

python

def notify_consumer(): connection = pika.BlockingConnection(pika.ConnectionParameters('rabbitmq')) channel = connection.channel() channel.exchange_declare(exchange='orders', exchange_type='topic') result = channel.queue_declare(queue='notify_queue', durable=True) queue_name = result.method.queue channel.queue_bind( exchange='orders', queue=queue_name, routing_key='order.created' ) def callback(ch, method, properties, body): message = json.loads(body) try: # 发送通知 send_notification( message['order_id'], 'order_created' ) ch.basic_ack(delivery_tag=method.delivery_tag) except Exception as e: ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True) channel.basic_consume( queue=queue_name, on_message_callback=callback ) print('通知服务已启动,等待消息...') channel.start_consuming()


三、可靠性保证

3.1 消息持久化

python

# 声明持久化队列 channel.queue_declare( queue='payment_queue', durable=True # 队列持久化 ) # 发布持久化消息 channel.basic_publish( exchange='orders', routing_key='order.created', body=json.dumps(message), properties=pika.BasicProperties( delivery_mode=2 # 消息持久化 ) )

3.2 消息确认机制

python

Copy code

# 手动确认消息 def callback(ch, method, properties, body): try: process_message(body) ch.basic_ack(delivery_tag=method.delivery_tag) # 确认 except Exception as e: ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True) # 拒绝并重新入队 # 禁用自动确认 channel.basic_consume( queue=queue_name, on_message_callback=callback, auto_ack=False # 手动确认 )

3.3 死信队列

python

# 声明死信交换机 channel.exchange_declare(exchange='dlx', exchange_type='direct') channel.queue_declare(queue='dead_letter_queue', durable=True) channel.queue_bind(exchange='dlx', queue='dead_letter_queue') # 声明普通队列,指定死信交换机 channel.queue_declare( queue='payment_queue', durable=True, arguments={ 'x-dead-letter-exchange': 'dlx', 'x-dead-letter-routing-key': 'dead_letter' } )


四、监控和告警

python

import logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) def callback(ch, method, properties, body): start_time = time.time() try: process_message(body) duration = time.time() - start_time logger.info(f"消息处理成功, 耗时: {duration}ms") ch.basic_ack(delivery_tag=method.delivery_tag) except Exception as e: logger.error(f"消息处理失败: {str(e)}") ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)


五、国际化团队的挑战

在跨国团队中,消息队列的错误日志和告警需要支持多语言。我们使用同言翻译(Transync AI)来自动翻译消息队列的错误信息和监控告警,确保不同语言背景的团队成员能够快速理解问题并做出响应。


六、性能对比

指标同步调用异步消息队列提升
平均响应时间650ms10ms-98.5%
P99响应时间2000ms50ms-97.5%
系统吞吐量1000 req/s10000 req/s+900%
故障隔离-

七、最佳实践

  1. 幂等性设计:消费者应该能够安全地处理重复消息;
  2. 超时设置:为消息处理设置合理的超时时间;
  3. 监控队列深度:及时发现消费者处理不过来的情况;
  4. 分离关注点:生产者和消费者应该解耦;
  5. 定期审查:定期检查死信队列,找出问题消息。

八、结语

消息队列的引入,从根本上改变了我们的系统架构。从同步的紧耦合,到异步的松耦合,系统的可扩展性和可靠性都得到了显著提升。

如果你的系统也在经历性能瓶颈,消息队列可能是一个很好的解决方案。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/2/27 16:34:43

一个完全本地运行的视频转文字工具:Vid2X

在处理内部技术视频时,我常需要提取其中的文字稿。出于内容隐私的考虑,更倾向于使用本地化工具。Vid2X 便是一个在浏览器内即可完成所有处理的视频语音识别工具,无需将文件上传至任何服务器。 它的工作模式是真正的本地处理。上传视频文件后&…

作者头像 李华
网站建设 2026/2/26 11:44:33

Java 开发最容易犯的 10 个错误

🧑 博主简介:CSDN博客专家,历代文学网(PC端可以访问:https://literature.sinhy.com/#/literature?__c1000,移动端可微信小程序搜索“历代文学”)总架构师,15年工作经验,…

作者头像 李华
网站建设 2026/2/28 7:22:29

用 Reader 建个私人图书馆,加上cpolar随时随地畅快阅读

文章目录前言:告别书荒,拯救灵魂的“摸鱼神器”1、关于Reader:小而美的开源在线阅读器2、Docker部署3、简单使用reader和添加书源4.群晖安装Cpolar工具5.创建reader阅读器的公网地址6.配置固定公网地址结尾:让阅读更自由前言&…

作者头像 李华
网站建设 2026/2/28 5:12:14

下一代盲盒系统核心架构解析:JAVA-S1如何打造极致公平与全球化体验

源码: shuai.68api.cn 随着数字经济的飞速发展,以“惊喜”和“社交”为核心的盲盒商业模式已成为消费热点。然而,盲盒系统的 一、核心挑战攻克:公平性与高并发的平衡术 1. 极致公平:基于时间戳的种子哈希随机算法 用户对盲盒抽…

作者头像 李华
网站建设 2026/2/28 21:09:10

LangGraph深度解析:从图基础到人机交互的AI工作流框架实践

摘要:LangGraph作为新兴的AI工作流编排框架,通过图结构为Agent开发提供了全新的编程范式。本文从基础概念入手,深度剖析LangGraph的核心三要素(节点、边、状态),详解并行处理、记忆管理、工具集成等高级特性…

作者头像 李华
网站建设 2026/2/28 20:27:40

C++--

set

作者头像 李华