news 2026/6/23 19:55:39

Spring Kafka 动态消费实现案例

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Spring Kafka 动态消费实现案例

动态消费的使用场景

首先,什么是动态消费? 简单来讲,就是Spring Kafka提供了安全地在运行时调整消费状态的实现,可以随时调整消费者的消费状态。 比如暂时停止正在消费消息的消费者的消费,等到合适的时机再重新从中断的地方开始消费。 再比如关闭消费者线程不再消费,在执行某些操作(如修改消费者参数)之后,再重新创建消费者并开始消费。

下面详细说说。

  • 开关降级

当一个新功能发布之后,还没有正式使用,消费线程就没有创建的必要,此时@KafkaListener注解参数autoStartup = "false"就可以实现。 如果某个功能需要降级,也可以销毁消费线程,不再注册这个消费者。

  • 延迟消费

开始消费前需要进行自检、数据准备等操作。 有时候消费消息,需要提前检查某些配置或者数据是否已经存在,或者当服务启动之后,需要异步完成一些资源的初始化,才能正常消费。此时可以等待完成准备之后,再执行start()方法创建消费线程。

  • 限流背压

在高峰期由消费者端控制,防止系统崩溃,使用pause() 暂停拉取消息、resume()恢复拉取消息。

  • 参数调整

不停机修改消费配置(如 groupId、topic),使用stop() / start() ,重新创建消费者会触发消费者重平衡,会重新建立与 Kafka 的连接。

SpringKafka消费的实现机制

在实现Spring Kafka动态调整消费之前,需要先了解Spring Kafka的消费者是如何创建的

@KafkaListener 用于声明一个方法为Kafka消息监听器。 Spring Kafka 会在应用启动时自动完成以下工作:

  1. 扫描并解析 @KafkaListener 注解
  2. 创建对应的监听端点(KafkaListenerEndpoint)
  3. 将端点注册到 KafkaListenerEndpointRegistry
  4. 使用 KafkaListenerContainerFactory 创建监听容器
  5. 启动监听容器,开始消费消息

详细描述如下:

Java @KafkaListener ↓ 【后处理器PostProcessor】 Spring在启动时会加载一个后处理器:[KafkaListenerAnnotationBeanPostProcessor] 负责扫描所有Spring Bean,查找带有 @KafkaListener 的方法。 ↓ 【监听端点ListenerEndpoint】 创建监听端点:[MethodKafkaListenerEndpoint] 负责定义要监听的主题;消费组;监听方法;消息反序列化类型;容器工厂名称等。 ↓ 【全局注册表EndpointRegistry】 Processor将Endpoint注册到全局注册表:[KafkaListenerEndpointRegistry] 保存所有监听端点;管理监听容器的生命周期(启动、停止、暂停、恢复);提供运行时访问接口。 ↓ 【容器工厂ContainerFactory】 创建监听容器工厂:[KafkaListenerContainerFactory] 注册表使用工厂创建实际的监听容器 ↓ 【监听容器ListenerContainer】 拉取消息并调用监听方法:[ConcurrentMessageListenerContainer] 根据配置(autoStartup=true)自动启动所有监听容器 ↓ KafkaConsumer

通过上面的描述我们知道,KafkaListenerEndpointRegistry注册表管理者所有容器,MessageListenerContainer 接口的实现类保存着所有消费者容器,我们可以从消费者容器中获取到我们的消费者。

那么MessageListenerContainer提供了哪些方法呢? 主要是下面4个:

  • pause()

暂停拉取消息,不释放资源;

  • resume()

恢复拉取消息,不释放资源;

  • stop()

停止容器(可重新启动),不释放资源;

  • start()

启动容器,不释放资源;

这4个方法可以分为两组,分别是

  • pause() / resume()
  • stop() / start()

如何进行选择?下面详细讲解一下它们的差异。

pause() / resume():这两个方法是对消费者的轻量级控制,暂停或恢复消息拉取,不影响容器生命周期。适用场景是临时暂停消费(例如下游系统压力过大);或实现“背压”机制;或动态限流或批量处理控制。

pause():暂停拉取消息,但保持与 Kafka Broker 的连接,不会提交偏移量,也不会关闭线程;resume():恢复拉取消息,继续消费。

pause()之后,容器仍在运行状态(isRunning() == true,但isContainerPaused() == true);消费线程仍存在;不会触发消费者重平衡;恢复后从上次暂停的偏移继续消费。

stop() / start():重级控制,停止或启动整个监听容器,会释放或重建资源。适合场景:应用启动时延迟启动消费;动态启停某个监听器;部署或维护期间临时关闭消费;切换消费配置(如 groupId、topic)。

stop():停止容器运行;关闭所有 KafkaConsumer;释放线程池和资源;容器状态变为 isRunning() == false。

start():重新创建 KafkaConsumer;重新订阅主题;启动消费线程;容器状态变为 isRunning() == true。

Consumer动态消费实现案例

消费者Consumer注解:

ini@KafkaListener( topics = "${spring.kafka.multiple.consumer.mybiz.topics}", concurrency = "${spring.kafka.multiple.consumer.mybiz.concurrency}", containerFactory = "mybizConsumer", id = "mybizListenerId", // 容器id autoStartup = "false" // 是否自动注册,是"true",否"false",默认是 )

服务启动之后,由于 autoStartup = "false" ,不会注册消费者。

可以通过下面的管理器调整消费状态:

typescript @Service public class KafkaListenerAdjuster { @Autowired private KafkaListenerEndpointRegistry registry; // 暂停消费 public void pause(String listenerId) { MessageListenerContainer container = registry.getListenerContainer(listenerId); if (container != null && container.isRunning()) { container.pause(); } } // 恢复消费 public void resume(String listenerId) { MessageListenerContainer container = registry.getListenerContainer(listenerId); if (container != null) { container.resume(); } } // 停止消费 public void stop(String listenerId) { MessageListenerContainer container = registry.getListenerContainer(listenerId); if (container != null) { container.stop(); } } // 启动消费 public void start(String listenerId) { MessageListenerContainer container = registry.getListenerContainer(listenerId); if (container != null && !container.isRunning()) { container.start(); } } // 动态调整并发数 public void changeConcurrency(String listenerId, int concurrency) { ConcurrentMessageListenerContainer<?, ?> container = (ConcurrentMessageListenerContainer<?, ?>) registry.getListenerContainer(listenerId); if (container != null) { container.setConcurrency(concurrency); } } }

总结

以上的实现在实际项目开发和生产环境,可以和分布式配置管理框架或注册中心(Spring Cloud Config、Nacos、ZooKeeper、Apollo、Consul等)结合,通过事件如SpringEvent等进行动态调整。在分布式系统或容器中,需要确保每台机器的消费状态都得到了调整。

除了动态调整消费状态之外,还可以 Spring Kafka 源码的基础之上,实现其他能力的动态调整,如增加自定义过滤器,可以实现消息的过滤消费,或者在消息中增加灰度标识等。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/6/23 21:30:26

Wan2.2-T2V-A14B模型推理性能调优实战技巧分享

Wan2.2-T2V-A14B模型推理性能调优实战技巧分享 在AI生成内容&#xff08;AIGC&#xff09;浪潮席卷影视、广告和短视频行业的今天&#xff0c;文本到视频&#xff08;Text-to-Video, T2V&#xff09;技术正从实验室原型快速走向商业化落地。相比图像生成&#xff0c;T2V不仅要处…

作者头像 李华
网站建设 2026/6/23 21:31:29

GraniStudio零代码平台调试算子方式有多少种?分别都是如何调试?

GraniStudio零代码平台,分为主动执行和被动执行,执行模式分为 DuBug模式和Release 模式。 可在快捷菜单栏设置调试模式,默认&#xff1a;Debug模式 主动执行 流程配置窗口 主动执行开关打开,将设置为主动执行。 被动执行 流程配置窗口 主动执行开关关闭,将设置为被动执行。 …

作者头像 李华
网站建设 2026/6/23 8:29:32

小米14C刷国际版步骤

​​​​​​https://xiaomirom.com/rom/redmi-14r-5g-poco-m7-5g-14c-5g-flame-india-fastboot-recovery-rom/ 小米14C的价格比较合适。可以买了 6125G的套餐差不多518元。再刷个全球版的ROM。 方便调用和测试。 下载ROM。 https://xiaomirom.com/download-xiaomi-flash-t…

作者头像 李华
网站建设 2026/6/23 4:28:08

智谱开源天团登陆 AtomGit,4 大模型覆盖多模态全场景!

智谱 AI 4 款多模态核心模型在 AtomGit 平台集中开源&#xff01;基于 Open-AutoGLM 、GLM-4.6V、GLM-ASR-Nano-2512、GLM-TTS 组成的模型矩阵&#xff0c;构建起 “手机操作 视觉理解 语音识别 文本转语音”的全链路多模态 AI 生态。这次开源不仅打破 “AI 只停留在聊天框”…

作者头像 李华
网站建设 2026/6/23 17:47:16

开源视频生成技术再突破:Wan2.1-FLF2V-14B模型实现720P高清流畅过渡

在人工智能生成内容&#xff08;AIGC&#xff09;领域&#xff0c;视频生成技术正经历着前所未有的快速发展。其中&#xff0c;首尾帧驱动的视频生成技术因其高效性和易用性&#xff0c;逐渐成为内容创作领域的新宠。近日&#xff0c;Wan团队正式发布了旗下最新力作——Wan2.1-…

作者头像 李华
网站建设 2026/6/23 10:35:37

教学辅助微信小程序设计毕业设计(源码+lw+部署文档+讲解等)

博主介绍&#xff1a;✌ 专注于VUE,小程序&#xff0c;安卓&#xff0c;Java,python,物联网专业&#xff0c;有18年开发经验&#xff0c;长年从事毕业指导&#xff0c;项目实战✌选取一个适合的毕业设计题目很重要。✌关注✌私信我✌具体的问题&#xff0c;我会尽力帮助你。一、…

作者头像 李华