在分布式消息队列领域,Kafka 以其高吞吐、高可用、低延迟的特性占据着核心地位。对于 Java 开发者而言,熟练掌握 Kafka 生产者(Producer)与消费者(Consumer)API 是实现业务解耦、流量削峰、日志收集等场景的基础。本文将从核心概念出发,结合完整的 Java 代码示例,详细解析 API 用法及关键参数配置,帮你快速上手并规避常见问题。
一、前置知识:Kafka 核心概念快速回顾
在进入 API 细节前,先明确几个核心术语,避免后续理解混淆:
Topic(主题):消息的分类容器,生产者向主题发送消息,消费者从主题订阅消息,是 Kafka 消息路由的核心。
Partition(分区):主题的物理分片,一个主题可包含多个分区,分区内消息有序,分区间消息无序,通过分区实现负载均衡和水平扩展。
Replica(副本):每个分区的备份,包含主副本(Leader)和从副本(Follower),Leader 处理读写请求,Follower 同步数据,保证高可用。
Producer(生产者):向 Kafka 主题发送消息的客户端。
Consumer(消费者):从 Kafka 主题订阅并消费消息的客户端,消费者可组成消费者组(Consumer Group),实现消息的负载消费和广播消费。
本文基于 Kafka 3.x 版本,依赖的 Maven 坐标如下(需根据实际版本调整):
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.5.1</version></dependency>二、Kafka 生产者 API 详解
Kafka 生产者的核心工作流程:配置参数 → 创建生产者实例 → 构建消息 → 发送消息 → 关闭生产者。其中消息发送分为同步和异步两种方式,参数配置直接影响生产性能和可靠性。
2.1 核心 API 类
ProducerConfig:包含所有生产者配置参数的常量类,用于简化配置编写。KafkaProducer<K, V>:生产者核心类,封装了与 Kafka 集群的通信逻辑,线程安全,建议单例复用。ProducerRecord<K, V>:消息载体,包含主题、分区、键、值等信息,是生产者发送的基本单元。Callback:异步发送消息时的回调接口,用于处理发送结果(成功/失败)。
2.2 完整 Java 代码示例
以下示例包含同步发送、异步发送两种方式,并添加了异常处理和资源关闭逻辑:
importorg.apache.kafka.clients.producer.*;importorg.apache.kafka.common.serialization.StringSerializer;importjava.util.Properties;importjava.util.concurrent.ExecutionException;publicclassKafkaProducerDemo{// Kafka 集群地址(多个地址用逗号分隔)privatestaticfinalStringBOOTSTRAP_SERVERS="localhost:9092,localhost:9093,localhost:9094";// 目标主题(需提前创建)privatestaticfinalStringTOPIC_NAME="test_topic";publicstaticvoidmain(String[]args){// 1. 配置生产者参数Propertiesprops=newProperties();// 核心参数:Kafka 集群地址props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,BOOTSTRAP_SERVERS);// 序列化器:键的序列化方式(需与生产者泛型 K 一致)props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());// 序列化器:值的序列化方式(需与生产者泛型 V 一致)props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());// 可选参数:消息确认机制(0-无需确认,1-主副本确认,all-所有同步副本确认)props.put(ProducerConfig.ACKS_CONFIG,"all");// 可选参数:重试次数(解决网络抖动等临时故障)props.put(ProducerConfig.RETRIES_CONFIG,3);// 可选参数:批量发送大小(达到 16KB 时批量发送)props.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);// 可选参数:批量发送延迟(100ms 内若未达批量大小则强制发送)props.put(ProducerConfig.LINGER_MS_CONFIG,100);// 2. 创建生产者实例(泛型 K 为消息键类型,V 为消息值类型)KafkaProducer<String,String>producer=newKafkaProducer<>(props);try{// 3. 发送消息(同步 + 异步两种方式)for(inti=0;i<10;i++){// 构建消息:参数依次为主题、消息键、消息值ProducerRecord<String,String>record=newProducerRecord<>(TOPIC_NAME,"key_"+i,"value_"+i+": Hello Kafka!");// 方式一:同步发送(调用 get() 阻塞等待结果)if(i%2==0){try{RecordMetadatametadata=producer.send(record).get();System.out.printf("同步发送成功 - 主题:%s,分区:%d,偏移量:%d%n",metadata.topic(),metadata.partition(),metadata.offset());}catch(InterruptedException|ExecutionExceptione){System.err.printf("同步发送失败 - 消息索引:%d,原因:%s%n",i,e.getMessage());}}// 方式二:异步发送(通过 Callback 处理结果,非阻塞)else{producer.send(record,newCallback(){@OverridepublicvoidonCompletion(RecordMetadatametadata,Exceptionexception){if(exception==null){System.out.printf("异步发送成功 - 主题:%s,分区:%d,偏移量:%d%n",metadata.topic(),metadata.partition(),metadata.offset());}else{System.err.printf("异步发送失败 - 消息索引:%d,原因:%s%n",i,exception.getMessage());}}});}}}finally{// 4. 关闭生产者(必须执行,释放资源,避免消息丢失)producer.close();}}}2.3 核心参数配置解析
生产者参数直接影响可靠性、吞吐量、延迟三个核心指标,需根据业务场景权衡配置:
| 参数名(ProducerConfig 常量) | 默认值 | 核心作用 | 推荐配置 |
|---|---|---|---|
| BOOTSTRAP_SERVERS_CONFIG | 无 | 指定 Kafka 集群地址,用于建立初始连接 | 填写所有 Broker 地址(逗号分隔),提高可用性 |
| KEY_SERIALIZER_CLASS_CONFIG | 无 | 消息键的序列化器,将 Java 对象转为字节数组 | String 用 StringSerializer,自定义对象用 KafkaAvroSerializer 等 |
| VALUE_SERIALIZER_CLASS_CONFIG | 无 | 消息值的序列化器,同键序列化器要求 | 与键序列化器逻辑一致,需与消费者反序列化器匹配 |
| ACKS_CONFIG | “1” | 消息确认机制,决定生产者何时认为消息发送成功 | 高可靠场景用 “all”,高吞吐场景用 “1” 或 “0” |
| RETRIES_CONFIG | Integer.MAX_VALUE | 消息发送失败后的重试次数,解决临时故障 | 3-5 次(过多重试可能导致消息乱序) |
| BATCH_SIZE_CONFIG | 16384(16KB) | 批量发送的消息大小阈值,达到阈值则发送 | 高吞吐场景可增大至 32KB 或 64KB |
| LINGER_MS_CONFIG | 0(立即发送) | 批量发送的延迟时间,即使未达阈值也会发送 | 10-100ms(平衡吞吐量与延迟) |
| BUFFER_MEMORY_CONFIG | 33554432(32MB) | 生产者缓冲区大小,用于暂存待发送消息 | 根据并发量调整,避免缓冲区溢出 |
| MAX_BLOCK_MS_CONFIG | 60000(60s) | 发送消息时的最大阻塞时间,避免无限等待 | 10-30s(根据业务超时需求调整) |
注意:消息乱序问题——当启用重试(RETRIES_CONFIG>0)且未配置MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION为 1 时,可能出现消息乱序(前一条消息重试时,后一条消息已发送成功)。若业务要求严格有序,需将该参数设为 1,代价是吞吐量下降。
三、Kafka 消费者 API 详解
Kafka 消费者的核心工作流程:配置参数 → 创建消费者实例 → 订阅主题 → 拉取消息 → 提交偏移量 → 关闭消费者。消费者的核心是“拉取消息”模式,而非推送,且偏移量管理直接影响消息是否重复消费或丢失。
3.1 核心 API 类
ConsumerConfig:消费者配置参数常量类,与生产者对应。KafkaConsumer<K, V>:消费者核心类,非线程安全,需避免多线程共享。ConsumerRecord<K, V>:消费者拉取到的消息载体,包含消息的完整信息。ConsumerRebalanceListener:消费者组重平衡时的监听接口,用于处理偏移量提交等逻辑。
3.2 完整 Java 代码示例
以下示例包含自动提交偏移量和手动提交偏移量两种方式,手动提交更能保证消息消费的可靠性:
importorg.apache.kafka.clients.consumer.*;importorg.apache.kafka.common.TopicPartition;importorg.apache.kafka.common.serialization.StringDeserializer;importjava.time.Duration;importjava.util.Collections;importjava.util.List;importjava.util.Properties;importjava.util.Map;publicclassKafkaConsumerDemo{// Kafka 集群地址privatestaticfinalStringBOOTSTRAP_SERVERS="localhost:9092,localhost:9093,localhost:9094";// 订阅的主题privatestaticfinalStringTOPIC_NAME="test_topic";// 消费者组 ID(必须指定,同一组内消费者负载消费,不同组独立消费)privatestaticfinalStringGROUP_ID="test_consumer_group";publicstaticvoidmain(String[]args){// 1. 配置消费者参数Propertiesprops=newProperties();// 核心参数:Kafka 集群地址props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,BOOTSTRAP_SERVERS);// 核心参数:消费者组 IDprops.put(ConsumerConfig.GROUP_ID_CONFIG,GROUP_ID);// 反序列化器:键的反序列化方式(与生产者序列化器匹配)props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());// 反序列化器:值的反序列化方式(与生产者序列化器匹配)props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());// 偏移量提交方式:false-手动提交,true-自动提交props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);// 自动提交偏移量的间隔时间(仅当自动提交开启时生效)props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);// 消费位置重置策略:无偏移量或偏移量无效时的处理方式props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");// 拉取消息的超时时间props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG,500);// 单次拉取的最大消息数props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,10);// 2. 创建消费者实例KafkaConsumer<String,String>consumer=newKafkaConsumer<>(props);try{// 3. 订阅主题(可订阅单个或多个主题,这里订阅单个主题)consumer.subscribe(Collections.singletonList(TOPIC_NAME),newConsumerRebalanceListener(){// 重平衡开始前触发(用于提交当前分区的偏移量,避免消息丢失)@OverridepublicvoidonPartitionsRevoked(Collection<TopicPartition>partitions){if(!partitions.isEmpty()){consumer.commitSync(partitions);System.out.printf("重平衡前提交偏移量 - 分区数:%d%n",partitions.size());}}// 重平衡完成后触发(用于初始化新分配分区的消费位置)@OverridepublicvoidonPartitionsAssigned(Collection<TopicPartition>partitions){System.out.printf("重平衡后分配分区 - 分区数:%d,分区列表:%s%n",partitions.size(),partitions);}});// 4. 循环拉取消息(消费者核心逻辑,持续运行)while(true){// 拉取消息:参数为超时时间,若无消息则阻塞至超时ConsumerRecords<String,String>records=consumer.poll(Duration.ofMillis(1000));// 处理拉取到的消息for(ConsumerRecord<String,String>record:records){System.out.printf("消费消息 - 主题:%s,分区:%d,偏移量:%d,键:%s,值:%s%n",record.topic(),record.partition(),record.offset(),record.key(),record.value());// 模拟业务处理(实际场景中替换为真实业务逻辑)processMessage(record.value());}// 5. 手动提交偏移量(同步提交,确保消息处理完成后再提交)if(!records.isEmpty()){try{// 同步提交:阻塞等待提交结果,失败则抛出异常consumer.commitSync();System.out.println("偏移量提交成功");}catch(CommitFailedExceptione){System.err.println("偏移量提交失败:"+e.getMessage());// 处理提交失败逻辑(如重试、记录日志等)}// 异步提交(非阻塞,适合高吞吐场景,需通过回调处理失败)// consumer.commitAsync((offsets, exception) -> {// if (exception != null) {// System.err.println("异步提交失败:" + exception.getMessage());// }// });}}}finally{// 6. 关闭消费者(释放资源,关闭前会提交已拉取但未提交的偏移量)consumer.close();}}// 模拟业务处理方法privatestaticvoidprocessMessage(Stringmessage){// 实际业务逻辑:如写入数据库、调用接口等if(message.contains("error")){thrownewRuntimeException("业务处理失败:"+message);}}}3.3 核心参数配置解析
消费者参数核心关注偏移量管理、消费速度、重平衡等问题,以下是关键配置:
| 参数名(ConsumerConfig 常量) | 默认值 | 核心作用 | 推荐配置 |
|---|---|---|---|
| GROUP_ID_CONFIG | 无 | 消费者组唯一标识,Kafka 基于组 ID 进行分区分配和偏移量管理 | 业务相关命名,如 “order_consumer_group” |
| ENABLE_AUTO_COMMIT_CONFIG | true | 是否自动提交偏移量,自动提交可能导致消息未处理完成就提交 | 高可靠场景设为 false(手动提交),简单场景可设为 true |
| AUTO_OFFSET_RESET_CONFIG | “latest” | 无有效偏移量时的策略:earliest(从开头消费)、latest(从最新消费) | 初始化消费设为 earliest,增量消费设为 latest |
| MAX_POLL_RECORDS_CONFIG | 500 | 单次 poll() 方法拉取的最大消息数 | 根据业务处理速度调整,避免单次拉取过多导致超时 |
| SESSION_TIMEOUT_MS_CONFIG | 10000(10s) | 消费者组会话超时时间,超过时间未心跳则被认为下线,触发重平衡 | 10-30s,需大于 HEARTBEAT_INTERVAL_MS_CONFIG |
| HEARTBEAT_INTERVAL_MS_CONFIG | 3000(3s) | 消费者向 Coordinator 发送心跳的间隔时间,用于证明在线 | 设为 SESSION_TIMEOUT_MS_CONFIG 的 1/3 左右,如 3s |
| MAX_POLL_INTERVAL_MS_CONFIG | 300000(5min) | 两次 poll() 之间的最大间隔时间,超过则被认为能力不足,触发重平衡 | 根据业务处理耗时调整,如 1min,避免过长导致重平衡 |
偏移量提交核心原则:先处理消息,后提交偏移量。手动提交分为同步(commitSync)和异步(commitAsync),同步提交阻塞等待结果,可靠性高;异步提交非阻塞,吞吐量高,但需通过回调处理失败情况。
四、常见问题与避坑指南
4.1 生产者消息丢失
原因及解决方案:
未关闭生产者:生产者缓冲区消息未发送,需调用
close()方法。确认机制配置过低:将
ACKS_CONFIG设为 “all”,确保所有同步副本接收。未处理发送异常:同步发送需捕获
ExecutionException,异步发送需在 Callback 中处理异常。
4.2 消费者消息重复消费
原因及解决方案:
自动提交偏移量:消息未处理完成就提交偏移量,改为手动提交。
消费者下线导致重平衡:重平衡前通过
ConsumerRebalanceListener提交偏移量。业务处理失败未回滚偏移量:处理消息异常时,不提交偏移量,重试或死信队列处理。
4.3 消费者重平衡频繁
原因及解决方案:
会话超时过短:调大
SESSION_TIMEOUT_MS_CONFIG,确保业务处理在超时内完成。poll 间隔过长:调小
MAX_POLL_INTERVAL_MS_CONFIG,或减少单次 poll 消息数。消费者组内成员不稳定:检查消费者实例是否频繁上下线,优化部署稳定性。
五、总结
Kafka 生产者与消费者 API 是 Java 开发者与 Kafka 交互的核心工具,掌握其用法需重点关注三个维度:流程规范性(如生产者关闭、消费者循环 poll)、参数合理性(根据业务权衡可靠性与吞吐量)、异常处理完整性(避免消息丢失或重复)。
实际开发中,建议结合具体业务场景(如金融场景优先保证可靠性,日志场景优先保证吞吐量)调整参数配置,并通过监控工具(如 Kafka Eagle、Prometheus)跟踪生产者/消费者的运行状态,及时发现并解决问题。