news 2026/2/4 12:58:12

解决MQ消息丢失问题的5种方案

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
解决MQ消息丢失问题的5种方案

前言

今天我们来聊聊一个让很多开发者头疼的话题——MQ消息丢失问题。

有些小伙伴在工作中,一提到消息队列就觉得很简单,但真正遇到线上消息丢失时,排查起来却让人抓狂。

其实,我在实际工作中,也遇到过MQ消息丢失的情况。

今天这篇文章,专门跟大家一起聊聊这个话题,希望对你会有所帮助。

一、消息丢失的三大环节

在深入解决方案之前,我们先搞清楚消息在哪几个环节可能丢失:

1. 生产者发送阶段

网络抖动导致发送失败

生产者宕机未发送

Broker处理失败未返回确认

2. Broker存储阶段

内存消息未持久化,重启丢失

磁盘故障导致数据丢失

集群切换时消息丢失

3. 消费者处理阶段

自动确认模式下处理异常

消费者宕机处理中断

手动确认但忘记确认

理解了问题根源,接下来我们看5种实用的解决方案。

二、方案一:生产者确认机制

核心原理

生产者发送消息后等待Broker确认,确保消息成功到达。

这是防止消息丢失的第一道防线。

关键实现

// RabbitMQ生产者确认配置

@Bean

public RabbitTemplate rabbitTemplate() {

RabbitTemplate template = new RabbitTemplate(connectionFactory);

template.setConfirmCallback((correlationData, ack, cause) -> {

if (ack) {

// 消息成功到达Broker

messageStatusService.markConfirmed(correlationData.getId());

} else {

// 发送失败,触发重试

retryService.scheduleRetry(correlationData.getId());

}

});

return template;

}

// 可靠发送方法

public void sendReliable(String exchange, String routingKey, Object message) {

String messageId = generateId();

// 先落库保存发送状态

messageStatusService.saveSendingStatus(messageId, message);

// 发送持久化消息

rabbitTemplate.convertAndSend(exchange, routingKey, message, msg -> {

msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);

msg.getMessageProperties().setMessageId(messageId);

return msg;

}, new CorrelationData(messageId));

}

适用场景

对消息可靠性要求高的业务

金融交易、订单处理等关键业务

需要精确知道消息发送结果的场景

三、方案二:消息持久化机制

核心原理

将消息保存到磁盘,确保Broker重启后消息不丢失。

这是防止Broker端消息丢失的关键。

关键实现

// 持久化队列配置

@Bean

public Queue orderQueue() {

return QueueBuilder.durable("order.queue") // 队列持久化

.deadLetterExchange("order.dlx") // 死信交换机

.build();

}

// 发送持久化消息

public void sendPersistentMessage(Object message) {

rabbitTemplate.convertAndSend("order.exchange", "order.create", message, msg -> {

msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); // 消息持久化

return msg;

});

}

// Kafka持久化配置

@Bean

public ProducerFactory<String, Object> producerFactory() {

Map<String, Object> props = new HashMap<>();

props.put(ProducerConfig.ACKS_CONFIG, "all"); // 所有副本确认

props.put(ProducerConfig.RETRIES_CONFIG, 3); // 重试次数

props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // 幂等性

return new DefaultKafkaProducerFactory<>(props);

}

优缺点

优点:

有效防止Broker重启导致的消息丢失

配置简单,效果明显

缺点:

磁盘IO影响性能

需要足够的磁盘空间

四、方案三:消费者确认机制

核心原理

消费者处理完消息后手动向Broker发送确认,Broker收到确认后才删除消息。

这是保证消息不丢失的最后一道防线。

关键实现

// 手动确认消费者

@RabbitListener(queues = "order.queue")

public void handleMessage(Order order, Message message, Channel channel) {

long deliveryTag = message.getMessageProperties().getDeliveryTag();

try {

// 业务处理

orderService.processOrder(order);

// 手动确认

channel.basicAck(deliveryTag, false);

log.info("消息处理完成: {}", order.getOrderId());

} catch (Exception e) {

log.error("消息处理失败: {}", order.getOrderId(), e);

// 处理失败,重新入队

channel.basicNack(deliveryTag, false, true);

}

}

// 消费者容器配置

@Bean

public SimpleRabbitListenerContainerFactory containerFactory() {

SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();

factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 手动确认

factory.setPrefetchCount(10); // 预取数量

factory.setConcurrentConsumers(3); // 并发消费者

return factory;

}

注意事项

确保业务处理完成后再确认

合理设置预取数量,避免内存溢出

处理异常时要正确使用NACK

五、方案四:事务消息机制

核心原理

通过事务保证本地业务操作和消息发送的原子性,要么都成功,要么都失败。

关键实现

// 本地事务表方案

@Transactional

public void createOrder(Order order) {

// 1. 保存订单到数据库

orderRepository.save(order);

// 2. 保存消息到本地消息表

LocalMessage localMessage = new LocalMessage();

localMessage.setBusinessId(order.getOrderId());

localMessage.setContent(JSON.toJSONString(order));

localMessage.setStatus(MessageStatus.PENDING);

localMessageRepository.save(localMessage);

// 3. 事务提交,本地业务和消息存储保持一致性

}

// 定时任务扫描并发送消息

@Scheduled(fixedDelay = 5000)

public void sendPendingMessages() {

List<LocalMessage> pendingMessages = localMessageRepository.findByStatus(MessageStatus.PENDING);

for (LocalMessage message : pendingMessages) {

try {

// 发送消息到MQ

rabbitTemplate.convertAndSend("order.exchange", "order.create", message.getContent());

// 更新消息状态为已发送

message.setStatus(MessageStatus.SENT);

localMessageRepository.save(message);

} catch (Exception e) {

log.error("发送消息失败: {}", message.getId(), e);

}

}

}

// RocketMQ事务消息

public void sendTransactionMessage(Order order) {

TransactionMQProducer producer = new TransactionMQProducer("order_producer");

// 发送事务消息

Message msg = new Message("order_topic", "create",

JSON.toJSONBytes(order));

TransactionSendResult result = producer.sendMessageInTransaction(msg, null);

if (result.getLocalTransactionState() == LocalTransactionState.COMMIT_MESSAGE) {

log.info("事务消息提交成功");

}

}

适用场景

需要严格保证业务和消息一致性的场景

分布式事务场景

金融、电商等对数据一致性要求高的业务

六、方案五:消息重试与死信队列

核心原理

通过重试机制处理临时故障,通过死信队列处理最终无法消费的消息。

关键实现

// 重试队列配置

@Bean

public Queue orderQueue() {

return QueueBuilder.durable("order.queue")

.withArgument("x-dead-letter-exchange", "order.dlx") // 死信交换机

.withArgument("x-dead-letter-routing-key", "order.dead")

.withArgument("x-message-ttl", 60000) // 60秒后进入死信

.build();

}

// 死信队列配置

@Bean

public Queue orderDeadLetterQueue() {

return QueueBuilder.durable("order.dead.queue").build();

}

// 消费者重试逻辑

@RabbitListener(queues = "order.queue")

public void handleMessageWithRetry(Order order, Message message, Channel channel) {

long deliveryTag = message.getMessageProperties().getDeliveryTag();

try {

orderService.processOrder(order);

channel.basicAck(deliveryTag, false);

} catch (TemporaryException e) {

// 临时异常,重新入队重试

channel.basicNack(deliveryTag, false, true);

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/2/4 11:50:49

芜湖,千兆网络下载速率只有10MB秒,过的什么苦日子

第一坑&#xff1a;百度网盘的“灵魂限速”果然&#xff0c;下载链接指向了那个让人又爱又恨的百度网盘。非会员的下载速度&#xff1f;稳定在100KB/秒左右&#xff0c;好家伙&#xff0c;算下来得下一整天……我是那种坐以待毙的人吗&#xff1f;当然不&#xff01;我默默打开…

作者头像 李华
网站建设 2026/1/25 22:02:03

AI一周大事盘点(2025年12月14日~2025年12月20日)

【摘要】2025年12月第三周&#xff0c;全球AI领域呈现出三大核心趋势&#xff1a;首先&#xff0c;模型技术层面&#xff0c;以谷歌Gemini 3 Flash为代表的高性价比轻量级模型实现关键突破&#xff0c;为智能体&#xff08;Agent&#xff09;大规模应用奠定基础&#xff0c;同时…

作者头像 李华
网站建设 2026/2/4 10:08:15

K3s + Sysbox:让容器拥有“虚拟机的灵魂”

Containerd 与 Runc 的关系首先&#xff0c;让我们简要了解一下 containerd 是如何与 runc 协作的。containerd 是一个常驻的守护进程&#xff0c;主要负责以下任务&#xff1a;镜像管理&#xff1a;从镜像仓库拉取并存储镜像。容器管理&#xff1a;管理容器生命周期&#xff0…

作者头像 李华
网站建设 2026/2/5 2:23:54

8 个降AI率工具推荐,继续教育学生必备

8 个降AI率工具推荐&#xff0c;继续教育学生必备 AI降重工具&#xff0c;让论文更自然更安心 随着人工智能技术的不断进步&#xff0c;越来越多的学生和研究人员在撰写论文时会借助AI工具进行辅助。然而&#xff0c;AI生成的内容往往存在明显的痕迹&#xff0c;容易被查重系统…

作者头像 李华
网站建设 2026/2/4 4:57:18

从开发一个AI美女聊天群组开始

ramework。很多开发者可能会有疑问&#xff1a;为什么微软要推出这么多框架&#xff1f;它们之间有什么区别&#xff1f;本文将通过一个实际的AI美女聊天群组项目&#xff0c;带你深入理解Microsoft Agent Framework&#xff0c;掌握多智能体开发的核心概念。本文的示例代码已开…

作者头像 李华