《RabbitMQ四板斧,订单超时取消就该这么干!》
我是小坏,今儿个咱不扯那么多花里胡哨的,直接捞干的讲。昨天说了缓存,今儿聊聊消息队列。RabbitMQ这玩意儿,说简单也简单,说难也难。关键是你得知道啥时候用,咋用。
零基础全栈开发Java微服务版本实战-后端-前端-运维-实战企业级三个实战项目
资源获取:关注公众号: 小坏说Java ,获取本文所有示例代码、配置模板及导出工具。
一、别急着上技术,先想清楚场景
零基础全栈开发Java微服务版本实战-后端-前端-运维-实战企业级三个实战项目
资源获取:关注公众号: 小坏说Java ,获取本文所有示例代码、配置模板及导出工具。
前两天有个老弟问我:“哥,订单30分钟不支付自动取消,咋实现?”
我问:“你现在咋弄的?”
他说:“定时任务,每分钟扫一遍表。”
我问:“多少订单?”
他说:“一天十几万吧。”
我说:“那你数据库不得被你扫秃噜皮了?”
定时任务扫表的毛病:
- 每分钟全表扫描,数据库压力大
- 取消时间不精准(可能晚1分钟)
- 订单多了根本扫不动
换成消息队列,咋弄:
- 用户下单 → 发个30分钟后到期的消息
- RabbitMQ到点告诉你:“该取消了!”
- 直接处理,不用扫表
二、RabbitMQ快速上手
零基础全栈开发Java微服务版本实战-后端-前端-运维-实战企业级三个实战项目
资源获取:关注公众号: 小坏说Java ,获取本文所有示例代码、配置模板及导出工具。
2.1 先整明白四个概念
- 生产者:发消息的(比如下单服务)
- 消费者:收消息的(比如取消订单服务)
- 交换机:邮局,决定把信往哪儿送
- 队列:邮箱,消息在这儿等着
2.2 5分钟跑起来
第一步:加依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>第二步:写配置
spring:rabbitmq:host:localhostport:5672username:guestpassword:guest# 重点:开启消息确认publisher-confirms:truepublisher-returns:true第三步:发消息
@RestControllerpublicclassOrderController{@AutowiredprivateRabbitTemplaterabbitTemplate;@PostMapping("/order")publicStringcreateOrder(@RequestBodyOrderorder){// 1. 保存订单到数据库orderService.save(order);// 2. 发个延迟消息,30分钟后取消rabbitTemplate.convertAndSend("order.exchange",// 交换机"order.cancel",// 路由键order.getId(),// 订单IDmessage->{// 设置30分钟后过期message.getMessageProperties().setExpiration("1800000");// 30分钟=1800000毫秒returnmessage;});return"下单成功,请30分钟内支付";}}三、四种交换机模式,别用错了
3.1 Direct(直连)模式
特点:一把钥匙开一把锁
场景:订单取消(指定队列)
@ConfigurationpublicclassDirectConfig{// 创建队列@BeanpublicQueuecancelQueue(){returnnewQueue("order.cancel.queue");}// 创建交换机@BeanpublicDirectExchangeorderExchange(){returnnewDirectExchange("order.exchange");}// 绑定:把队列绑到交换机上@BeanpublicBindingcancelBinding(){returnBindingBuilder.bind(cancelQueue()).to(orderExchange()).with("order.cancel");// 路由键}}3.2 Fanout(广播)模式
特点:大喇叭广播,谁都能听见
场景:订单创建成功,通知所有相关服务
@ConfigurationpublicclassFanoutConfig{// 三个队列:发短信、发邮件、更新统计@BeanpublicQueuesmsQueue(){returnnewQueue("sms.queue");}@BeanpublicQueueemailQueue(){returnnewQueue("email.queue");}@BeanpublicQueuestatsQueue(){returnnewQueue("stats.queue");}// 广播交换机@BeanpublicFanoutExchangefanoutExchange(){returnnewFanoutExchange("order.success.exchange");}// 都绑到同一个交换机@BeanpublicBindingsmsBinding(){/* ... */}@BeanpublicBindingemailBinding(){/* ... */}@BeanpublicBindingstatsBinding(){/* ... */}}3.3 Topic(主题)模式
特点:按话题订阅
场景:商品更新,不同服务关心不同商品
// 路由键规则:order.手机.创建// order.电脑.创建// order.手机.取消@ConfigurationpublicclassTopicConfig{@BeanpublicTopicExchangetopicExchange(){returnnewTopicExchange("order.topic.exchange");}// 监听所有手机订单@BeanpublicQueuephoneQueue(){returnnewQueue("phone.queue");}@BeanpublicBindingphoneBinding(){returnBindingBuilder.bind(phoneQueue()).to(topicExchange()).with("order.手机.*");// * 匹配一个词}// 监听所有创建订单@BeanpublicQueuecreateQueue(){returnnewQueue("create.queue");}@BeanpublicBindingcreateBinding(){returnBindingBuilder.bind(createQueue()).to(topicExchange()).with("order.*.创建");// # 匹配多个词}}3.4 Headers模式(用的少)
特点:根据消息头匹配
场景:特殊需求,一般用不到
四、死信队列:订单取消的核心
4.1 啥是死信?
消息变成“死信”的三种情况:
- 消息被拒绝(并且不重新入队)
- 消息过期
- 队列满了
4.2 订单取消实战
零基础全栈开发Java微服务版本实战-后端-前端-运维-实战企业级三个实战项目
资源获取:关注公众号: 小坏说Java ,获取本文所有示例代码、配置模板及导出工具。
@ConfigurationpublicclassDeadLetterConfig{// 1. 创建死信交换机(就是个普通交换机)@BeanpublicDirectExchangedeadLetterExchange(){returnnewDirectExchange("dead.letter.exchange");}// 2. 创建死信队列@BeanpublicQueuedeadLetterQueue(){returnnewQueue("dead.letter.queue");}// 3. 绑定死信队列@BeanpublicBindingdeadLetterBinding(){returnBindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with("dead.letter");}// 4. 创建订单队列,并指定死信交换机@BeanpublicQueueorderQueue(){Map<String,Object>args=newHashMap<>();args.put("x-dead-letter-exchange","dead.letter.exchange");// 死信交换机args.put("x-dead-letter-routing-key","dead.letter");// 死信路由键args.put("x-message-ttl",1800000);// 30分钟过期returnnewQueue("order.queue",true,false,false,args);}// 5. 监听死信队列(处理过期订单)@Component@Slf4jpublicclassOrderCancelConsumer{@RabbitListener(queues="dead.letter.queue")publicvoidcancelOrder(StringorderId){log.info("订单 {} 30分钟未支付,开始取消",orderId);// 1. 查询订单状态Orderorder=orderService.findById(orderId);if(order==null||!"待支付".equals(order.getStatus())){log.warn("订单 {} 状态异常,无需取消",orderId);return;}// 2. 取消订单order.setStatus("已取消");order.setCancelTime(newDate());order.setCancelReason("超时未支付");orderService.update(order);// 3. 释放库存inventoryService.releaseStock(order.getProductId(),order.getQuantity());// 4. 通知用户notifyService.sendCancelNotice(order.getUserId(),orderId);log.info("订单 {} 取消完成",orderId);}}}五、消息可靠性:别把订单弄丢了
5.1 生产者确认(发出去要知道成没成)
spring:rabbitmq:# 开启确认回调publisher-confirms:truepublisher-returns:true@Component@Slf4jpublicclassRabbitConfirmCallbackimplementsRabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback{@AutowiredprivateRabbitTemplaterabbitTemplate;@PostConstructpublicvoidinit(){rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setReturnCallback(this);rabbitTemplate.setMandatory(true);// 消息无法路由时返回给生产者}// 消息是否成功到达交换机@Overridepublicvoidconfirm(CorrelationDatacorrelationData,booleanack,Stringcause){if(ack){log.info("消息到达交换机,消息ID:{}",correlationData.getId());}else{log.error("消息未到达交换机,原因:{},消息ID:{}",cause,correlationData.getId());// 这里可以重发或者记录到数据库resendMessage(correlationData);}}// 消息是否成功到达队列(路由失败时触发)@OverridepublicvoidreturnedMessage(Messagemessage,intreplyCode,StringreplyText,Stringexchange,StringroutingKey){log.error("消息路由失败,交换机:{},路由键:{},消息:{}",exchange,routingKey,newString(message.getBody()));// 处理路由失败的消息handleRoutingFailure(message,exchange,routingKey);}}5.2 消费者确认(处理完要告诉RabbitMQ)
@Component@Slf4jpublicclassOrderConsumer{// 手动确认模式@RabbitListener(queues="order.queue",ackMode="MANUAL")publicvoidhandleOrder(Messagemessage,Channelchannel,@Header(AmqpHeaders.DELIVERY_TAG)longdeliveryTag){try{StringorderId=newString(message.getBody());// 处理订单booleansuccess=processOrder(orderId);if(success){// 处理成功,确认消息channel.basicAck(deliveryTag,false);log.info("订单处理成功:{}",orderId);}else{// 处理失败,拒绝消息(不重新入队)channel.basicNack(deliveryTag,false,false);log.error("订单处理失败:{}",orderId);// 记录到数据库,人工处理saveFailedMessage(orderId);}}catch(Exceptione){log.error("处理订单异常",e);try{// 发生异常,拒绝消息(重新入队)channel.basicNack(deliveryTag,false,true);}catch(IOExceptionex){log.error("拒绝消息失败",ex);}}}// 自动确认(小心使用)@RabbitListener(queues="sms.queue",ackMode="AUTO")publicvoidhandleSms(Stringphone){// 这里如果抛异常,消息会重新入队// 如果是发短信,可能造成重复发送smsService.send(phone);}}六、实战:电商订单完整流程
6.1 下单流程
@Service@Transactional@Slf4jpublicclassOrderService{@AutowiredprivateRabbitTemplaterabbitTemplate;publicOrderResultcreateOrder(OrderRequestrequest){// 1. 扣减库存(预占库存)booleanstockSuccess=inventoryService.lockStock(request.getProductId(),request.getQuantity());if(!stockSuccess){returnOrderResult.fail("库存不足");}// 2. 创建订单Orderorder=newOrder();order.setStatus("待支付");order.setCreateTime(newDate());// ... 其他字段order=orderRepository.save(order);try{// 3. 发送延迟消息(30分钟取消)rabbitTemplate.convertAndSend("order.exchange","order.create",order.getId(),message->{message.getMessageProperties().setExpiration("1800000");// 30分钟// 设置消息ID,用于追踪message.getMessageProperties().setCorrelationId(order.getId());returnmessage;});// 4. 发送创建成功通知(广播)rabbitTemplate.convertAndSend("order.success.exchange","",order.getId()// 广播不需要路由键);returnOrderResult.success(order);}catch(Exceptione){// 消息发送失败,回滚订单log.error("订单创建消息发送失败,回滚订单",e);thrownewRuntimeException("订单创建失败",e);}}}6.2 支付成功处理
@Service@Slf4jpublicclassPaymentService{@AutowiredprivateRabbitTemplaterabbitTemplate;@TransactionalpublicvoidhandlePaymentSuccess(StringorderId){// 1. 更新订单状态Orderorder=orderRepository.findById(orderId).orElseThrow();order.setStatus("已支付");order.setPayTime(newDate());orderRepository.save(order);// 2. 扣减真实库存(预占转实际)inventoryService.deductStock(order.getProductId(),order.getQuantity());// 3. 取消之前的延迟消息(如果还没被消费)// 注意:RabbitMQ不支持直接取消延迟消息// 需要在消费者端做幂等性处理// 4. 发送支付成功通知rabbitTemplate.convertAndSend("order.exchange","order.pay.success",orderId);log.info("订单 {} 支付成功处理完成",orderId);}}七、常见问题与解决方案
问题1:消息重复消费
场景:网络问题导致消费者确认失败,消息重新入队
解决:消费者做幂等处理
@ServicepublicclassOrderCancelService{@AutowiredprivateRedisTemplate<String,String>redisTemplate;@RabbitListener(queues="dead.letter.queue")publicvoidcancelOrder(StringorderId){// 使用Redis做幂等性控制Stringkey="order:cancel:"+orderId;// 使用setnx,只有第一次能设置成功Booleansuccess=redisTemplate.opsForValue().setIfAbsent(key,"processing",5,TimeUnit.MINUTES);if(success!=null&&success){// 第一次处理doCancelOrder(orderId);}else{// 已经处理过或在处理中log.warn("订单 {} 取消请求重复,忽略",orderId);}}}问题2:消息顺序问题
场景:先支付成功,后取消订单
解决:版本号或状态机
publicclassOrder{@VersionprivateIntegerversion;// 乐观锁版本号publicbooleancanCancel(){// 只有待支付状态才能取消return"待支付".equals(this.status);}publicbooleancanPay(){// 只有待支付状态才能支付return"待支付".equals(this.status);}}问题3:队列积压
场景:消费者挂了,消息堆积
解决:监控+告警+扩容
# 监控RabbitMQmanagement:endpoints:web:exposure:include:health,metricsmetrics:export:prometheus:enabled:true八、性能优化建议
8.1 连接池配置
spring:rabbitmq:# 连接池配置cache:channel:size:25# 通道缓存大小connection:mode:channel# 连接模式size:5# 连接池大小8.2 批量确认
channel.basicAck(deliveryTag,true);// 批量确认8.3 预取数量
@BeanpublicSimpleRabbitListenerContainerFactoryrabbitListenerContainerFactory(ConnectionFactoryconnectionFactory){SimpleRabbitListenerContainerFactoryfactory=newSimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setPrefetchCount(10);// 每次预取10条returnfactory;}九、今日要点总结
- 延迟消息用死信队列:订单取消、优惠券过期
- 广播用Fanout:通知多个服务
- 精准路由用Direct:指定到具体队列
- 灵活订阅用Topic:按规则订阅
- 消息一定要可靠:确认机制+幂等处理
- 监控不能少:队列长度、消费速度
十、思考题
场景:双十一大促,订单量暴涨100倍
- 消息队列如何扩容?
- 消费者挂了怎么办?
- 如何保证消息不丢失?
- 如何快速处理积压消息?
评论区聊聊你的方案,明儿咱们讲搜索。
明天预告:《SpringBoot+ES:打造毫秒级搜索》
今日福利:关注后回复“RabbitMQ”,获取完整订单系统源码。
公众号运营小贴士:
零基础全栈开发Java微服务版本实战-后端-前端-运维-实战企业级三个实战项目
资源获取:关注公众号: 小坏说Java ,获取本文所有示例代码、配置模板及导出工具。
🏷️标签:#RabbitMQ #消息队列 #订单系统
💡互动:
- 你用过消息队列解决过啥问题?
- 投票:你们公司用RabbitMQ还是Kafka?
- 留言提问,明天文章解答
🎁福利:
- 留言区抽3人送《RabbitMQ实战》
- 转发截图送配置文件模板
👥进群:
扫码加技术群,获取完整源码
零基础全栈开发Java微服务版本实战-后端-前端-运维-实战企业级三个实战项目
资源获取:关注公众号: 小坏说Java ,获取本文所有示例代码、配置模板及导出工具。