当前位置: 首页 > news >正文

002 Java操作kafka客户端

Java操作kafka客户端

文章目录

  • Java操作kafka客户端
    • 3.Java操作kafka客户端
      • 1.引入依赖
      • 2. Kafka服务配置
      • 3、生产者(Producer)实现
        • 1. 基础配置与发送消息
        • 2. 关键配置说明
      • 4.消费者(Consumer)实现
        • 1. 基础配置与消费消息
        • 2. 关键配置说明
      • 3.auto.offset.reset参数可选值及行为
        • 1.代码示例与行为验证
          • 1. 配置为 `earliest`
          • 2. 配置为 `latest`
          • 3. 配置为 `none`
        • 2.关键注意事项
          • 1. Offset 提交机制的影响
          • 2. 消费者组隔离性
          • 3. 命令行验证 Offset
        • 3、生产环境最佳实践
        • 4、常见问题解答
          • Q:配置了 `latest`,为什么还能消费到旧消息?
          • Q:如何让消费者组永久保留 Offset?
      • 5.主题管理示例(AdminClient)
      • 6.最佳实践与注意事项
      • 7.关于flush和close方法的说明

来源参考的deepseek,如有侵权联系立删

3.Java操作kafka客户端

Java API提供以下核心接口:

  • Producer API:发送消息。
  • Consumer API:订阅消息。
  • Streams API:流式处理。
  • Admin API:管理Topic和集群。

1.引入依赖

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.4.0</version> 
</dependency>

2. Kafka服务配置

确保已启动Zookeeper和Kafka服务,默认端口分别为21819092

3、生产者(Producer)实现

1. 基础配置与发送消息

无需提前创建topic

import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class KafkaProducerDemo {
    public static void main(String[] args) {
        // 1. 配置生产者参数
        Properties props = new Properties();
        // Broker地址
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // 消息确认机制
        props.put("acks", "all");
        // 重试次数
        props.put("retries", 3);

        // 2. 创建生产者实例
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            // 3. 构造消息并发送
            for (int i = 0; i < 10; i++) {
                ProducerRecord<String, String> record = new ProducerRecord<>(
                        "test-topic", // 主题名称
                        "key-" + i,   // 消息键
                        "value-" + i  // 消息值
                );
                // 异步发送(可改用get()同步等待)
                producer.send(record, (metadata, exception) -> {
                    if (exception == null) {
                        System.out.printf("消息发送成功:topic=%s, partition=%d, offset=%d%n",
                                metadata.topic(), metadata.partition(), metadata.offset());
                    } else {
                        exception.printStackTrace();
                    }
                });
            }
            producer.flush(); // 确保所有消息发送完成
        }
    }
}

在这里插入图片描述

2. 关键配置说明
参数说明
bootstrap.serversBroker地址列表,多个用逗号分隔
key.serializer键的序列化类(如StringSerializer)
value.serializer值的序列化类
acks消息持久化确认机制(0/1/all
retries发送失败后的重试次数
batch.size批量发送的消息大小(字节)

4.消费者(Consumer)实现

1. 基础配置与消费消息
import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerDemo {
    public static void main(String[] args) {
        // 1. 配置消费者参数
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("group.id", "test-group"); // 消费者组ID
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest"); // 从最早消息开始消费
        props.put("enable.auto.commit", "false");   // 关闭自动提交偏移量

        // 2. 创建消费者实例
        try (Consumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test-topic")); // 订阅主题

            while (true) {
                // 3. 轮询消息(超时时间100ms)
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("收到消息:topic=%s, partition=%d, offset=%d, key=%s, value=%s%n",
                            record.topic(), record.partition(), record.offset(), record.key(), record.value());
                }
                // 4. 手动提交偏移量(同步提交)
                consumer.commitSync();
            }
        }
    }
}

在这里插入图片描述
在这里插入图片描述

可收到实时消费的消息,但队列中消息并没有移除,

  • 消息保留规则由 Broker 配置控制,与消费者无关。
  • 消费者 Offset 仅标记消费进度,不会删除消息。
  • 通过 kafka-consumer-groups.sh 工具监控消费状态。
  • 生产环境中,合理设置 log.retention.hourslog.retention.bytes
2. 关键配置说明
参数说明
group.id消费者组ID,相同组内共享分区
auto.offset.reset无偏移量时的策略(earliest/latest
enable.auto.commit是否自动提交偏移量(建议false手动控制)
max.poll.records单次poll最大消息数

3.auto.offset.reset参数可选值及行为

作用典型场景
earliest从分区的最早消息开始消费(从头消费)需要处理 Topic 中所有历史消息
latest从分区的最新消息开始消费(仅消费新消息)实时处理最新数据,忽略历史消息
none抛出异常(NoOffsetForPartitionException需要严格确保 Offset 有效性
1.代码示例与行为验证
1. 配置为 earliest
props.put("auto.offset.reset", "earliest");

参数生效的触发条件

场景auto.offset.reset 是否生效消费起始位置
消费者组首次启动(无 Offset)根据参数值(earliest/latest
Offset 已提交且有效(未过期)从已提交 Offset 继续消费
Offset 已过期(消息被删除)根据参数值重新定位

行为

  • 如果消费者组首次启动,会从 Topic 每个分区的第一条消息开始消费。
  • 如果 Offset 过期(例如消息被删除),会从现存的最早消息开始消费。

适用场景

  • 数据回放(重放全部历史数据)
  • 测试环境需要消费完整数据集
2. 配置为 latest
props.put("auto.offset.reset", "latest");

行为

  • 如果消费者组首次启动,只消费启动后新写入的消息。
  • 如果 Offset 过期,会从当前最新消息开始消费。

适用场景

  • 生产环境实时处理(避免处理历史积压数据)
  • 日志收集系统(只需最新日志)
3. 配置为 none
props.put("auto.offset.reset", "none");

行为

  • 如果 Offset 无效,直接抛出 NoOffsetForPartitionException
  • 需手动处理异常或确保 Offset 始终有效。

适用场景

  • 高可靠性系统(需严格监控 Offset 有效性)
2.关键注意事项
1. Offset 提交机制的影响
  • 如果启用了自动提交 (enable.auto.commit=true),消费者会定期提交 Offset。
    重复消费风险:若消息处理失败但 Offset 已提交,会导致消息丢失。
  • 推荐做法
  props.put("enable.auto.commit", "false"); // 关闭自动提交
  // 处理完消息后手动提交 Offset
  consumer.commitSync();
2. 消费者组隔离性
  • 不同group.id的 Offset 互相独立。例如:
    • 消费者组 A(group.id=group1)配置为 latest → 只消费新消息。
    • 消费者组 B(group.id=group2)配置为 earliest → 可以消费全部消息。
3. 命令行验证 Offset

通过 Kafka 工具查看消费者组的 Offset:

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --describe --group your-group-id

输出示例:

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
test-topic      0          5000            10000           5000
  • LAG:未消费的消息数量。若 LAG 持续增长,说明消费速度跟不上生产速度。

3、生产环境最佳实践
  1. 明确业务需求
    • 需要重放数据 → earliest
    • 仅处理实时数据 → latest
  2. 监控 Offset 提交
    • 使用 kafka-consumer-groups.sh 定期检查 LAG。
    • 集成监控系统(如 Prometheus + Grafana)。
  3. 防御性代码
   try {
       while (true) {
           ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
           // 处理消息
           consumer.commitSync(); // 同步提交
       }
   } catch (NoOffsetForPartitionException e) {
       // 处理 Offset 无效的极端情况
       logger.error("Offset 无效,需人工介入!", e);
   }

4、常见问题解答
Q:配置了 latest,为什么还能消费到旧消息?
  • 可能原因
    消费者组之前已提交过 Offset,且当前 Offset 指向旧消息位置。
  • 解决
    重置消费者组 Offset:
  bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
    --group your-group-id --reset-offsets --to-latest --execute --topic test-topic
Q:如何让消费者组永久保留 Offset?
  • Kafka 默认行为
    Offset 存储在内部 Topic __consumer_offsets 中,默认保留时间为 7 天。
  • 修改保留策略
  # 修改 Offset 保留时间(单位:毫秒)
  bin/kafka-configs.sh --bootstrap-server localhost:9092 \
    --entity-type topics --entity-name __consumer_offsets \
    --alter --add-config retention.ms=604800000

5.主题管理示例(AdminClient)

import org.apache.kafka.clients.admin.*;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class KafkaAdminDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // 创建主题(3分区,1副本)
            NewTopic newTopic = new NewTopic("test-topic2", 3, (short) 1);
            CreateTopicsResult result = admin.createTopics(Collections.singleton(newTopic));
            result.all().get(); // 阻塞等待创建完成
            System.out.println("主题创建成功");
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
}

在这里插入图片描述

6.最佳实践与注意事项

  1. 生产者优化
    • 启用压缩(compression.type=snappy)减少网络开销。
    • 合理设置batch.sizelinger.ms提高吞吐量。
  2. 消费者可靠性
    • 使用手动提交偏移量,避免消息丢失或重复消费。
    • 处理CommitFailedException,防止因处理超时导致提交失败。
  3. 序列化选择
    • 默认支持String、ByteArray等序列化器。
    • 复杂对象推荐使用JSON(Jackson)或Avro。
  4. 消费者组管理
    • 通过kafka-consumer-groups.sh工具监控消费进度。
    • 避免频繁重平衡(Rebalance),调整session.timeout.ms参数。

7.关于flush和close方法的说明

  • flush():强制发送缓冲区中所有未发送的消息(同步等待发送完成)
  • close():释放生产者占用的所有资源(包括线程、网络连接、内存等)

若未调用close()可能导致:

  • 线程泄漏:生产者后台的Sender线程未终止
  • 连接泄漏:与Broker的TCP连接未关闭
  • 内存泄漏:未释放消息缓冲区内存

可通过jstackVisualVM工具检查线程状态验证。

关键区别说明

方法作用是否必须调用是否自动包含对方功能
flush()清空发送缓冲区,确保所有消息被发送可选(按需调用)❌ 不释放资源
close()关闭生产者并释放资源必须调用✅ 内部会自动调用flush()

正确写法(推荐):

try (Producer<String, String> producer = new KafkaProducer<>(props)) {
    producer.send(record);
    producer.flush(); // 显式清空缓冲区(可选)
} // 自动调用close(),包含flush()

错误写法(资源泄漏):

Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(record);
producer.flush(); 
// 忘记调用close() → 线程/连接未释放!

最佳实践建议

1.优先使用try-with-resources(Java 7+特性):

   try (Producer<String, String> producer = new KafkaProducer<>(props)) {
       // 发送消息...
   } // 自动调用close()

这是最安全的写法,无需手动调用flush()close()

2.需要立即发送时

   producer.send(record);
   producer.flush(); // 强制立即发送(如实时系统关键消息)
   // ...其他操作...
   producer.close(); // 仍需显式关闭

3.不要依赖finalize()
Kafka客户端的finalize()方法已废弃,不能保证资源释放。

4.KafkaProducer.close()源码:

public void close() {
    close(Duration.ofMillis(Long.MAX_VALUE)); // 默认无限等待
}

public void close(Duration timeout) {
    // ...
    flush();    // 内部自动调用flush()
    client.close(); // 释放网络资源
    metrics.close(); // 关闭监控指标
    // ...
}

相关文章:

  • Spring Security实战:如何实现OAuth2.0认证与授权?
  • Spring Boot 实战:构建 RESTful API 服务
  • Vue 项目中配置代理的必要性与实现指南
  • ChatGPT入驻Safari,AI搜索时代加速到来
  • 打包rocketmq-dashboard报错问题记录
  • 游戏引擎学习第125天
  • 免费使用SCI润色神器QuillBot
  • LabVIEW同步数据采集功能
  • 力扣1:两数之和
  • mac设置 pip 的镜像
  • Windows逆向工程入门之LOOP与REP指令的深度解析
  • Ubuntu 20.04环境下安装cuda、cuDNN和pytorch
  • 计算机毕设-基于springboot的仁和机构的体检预约系统的设计与实现(附源码+lw+ppt+开题报告)
  • 安科瑞AM5SE-IS防孤岛保护装置:新能源领域的“安全卫士“-安科瑞 耿笠
  • 记一次线上Tomcat服务内存溢出的问题处理
  • vue3使用iframe全屏展示pdf效果
  • 基于 Spring Boot +VUE的 “机动车号牌管理系统” 系统的设计与实现
  • JavaScript 系列之:Ajax、Promise、Axios
  • <网络> 网络基础3
  • 建筑三维设计软件如何实现弯道超车?
  • 某博主遭勒索后自杀系自导自演,成都警方立案调查
  • 跨越时空的“精神返乡”,叶灵凤藏书票捐赠上海文学馆
  • 多地再发网约车从业及投资风险提示:避免盲目花费大笔资金“购车”入行
  • 央行设立服务消费与养老再贷款,额度5000亿元
  • 中国象棋协会坚决支持司法机关依法打击涉象棋行业的违法行为
  • 股价两天涨超30%,中航成飞:不存在应披露而未披露的重大事项