当前位置: 首页 > wzjs >正文

娄底北京网站建设建设网站时

娄底北京网站建设,建设网站时,如何黑掉jsp做的网站,wordpress禁止自动更新Java操作kafka客户端 文章目录 Java操作kafka客户端3.Java操作kafka客户端1.引入依赖2. Kafka服务配置3、生产者(Producer)实现1. 基础配置与发送消息2. 关键配置说明 4.消费者(Consumer)实现1. 基础配置与消费消息2. 关键配置说明…

Java操作kafka客户端

文章目录

  • Java操作kafka客户端
    • 3.Java操作kafka客户端
      • 1.引入依赖
      • 2. Kafka服务配置
      • 3、生产者(Producer)实现
        • 1. 基础配置与发送消息
        • 2. 关键配置说明
      • 4.消费者(Consumer)实现
        • 1. 基础配置与消费消息
        • 2. 关键配置说明
      • 3.auto.offset.reset参数可选值及行为
        • 1.代码示例与行为验证
          • 1. 配置为 `earliest`
          • 2. 配置为 `latest`
          • 3. 配置为 `none`
        • 2.关键注意事项
          • 1. Offset 提交机制的影响
          • 2. 消费者组隔离性
          • 3. 命令行验证 Offset
        • 3、生产环境最佳实践
        • 4、常见问题解答
          • Q:配置了 `latest`,为什么还能消费到旧消息?
          • Q:如何让消费者组永久保留 Offset?
      • 5.主题管理示例(AdminClient)
      • 6.最佳实践与注意事项
      • 7.关于flush和close方法的说明

来源参考的deepseek,如有侵权联系立删

3.Java操作kafka客户端

Java API提供以下核心接口:

  • Producer API:发送消息。
  • Consumer API:订阅消息。
  • Streams API:流式处理。
  • Admin API:管理Topic和集群。

1.引入依赖

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.4.0</version> 
</dependency>

2. Kafka服务配置

确保已启动Zookeeper和Kafka服务,默认端口分别为21819092

3、生产者(Producer)实现

1. 基础配置与发送消息

无需提前创建topic

import org.apache.kafka.clients.producer.*;
import java.util.Properties;public class KafkaProducerDemo {public static void main(String[] args) {// 1. 配置生产者参数Properties props = new Properties();// Broker地址props.put("bootstrap.servers", "127.0.0.1:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// 消息确认机制props.put("acks", "all");// 重试次数props.put("retries", 3);// 2. 创建生产者实例try (Producer<String, String> producer = new KafkaProducer<>(props)) {// 3. 构造消息并发送for (int i = 0; i < 10; i++) {ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", // 主题名称"key-" + i,   // 消息键"value-" + i  // 消息值);// 异步发送(可改用get()同步等待)producer.send(record, (metadata, exception) -> {if (exception == null) {System.out.printf("消息发送成功:topic=%s, partition=%d, offset=%d%n",metadata.topic(), metadata.partition(), metadata.offset());} else {exception.printStackTrace();}});}producer.flush(); // 确保所有消息发送完成}}
}

在这里插入图片描述

2. 关键配置说明
参数说明
bootstrap.serversBroker地址列表,多个用逗号分隔
key.serializer键的序列化类(如StringSerializer)
value.serializer值的序列化类
acks消息持久化确认机制(0/1/all
retries发送失败后的重试次数
batch.size批量发送的消息大小(字节)

4.消费者(Consumer)实现

1. 基础配置与消费消息
import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;public class KafkaConsumerDemo {public static void main(String[] args) {// 1. 配置消费者参数Properties props = new Properties();props.put("bootstrap.servers", "127.0.0.1:9092");props.put("group.id", "test-group"); // 消费者组IDprops.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("auto.offset.reset", "earliest"); // 从最早消息开始消费props.put("enable.auto.commit", "false");   // 关闭自动提交偏移量// 2. 创建消费者实例try (Consumer<String, String> consumer = new KafkaConsumer<>(props)) {consumer.subscribe(Collections.singletonList("test-topic")); // 订阅主题while (true) {// 3. 轮询消息(超时时间100ms)ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {System.out.printf("收到消息:topic=%s, partition=%d, offset=%d, key=%s, value=%s%n",record.topic(), record.partition(), record.offset(), record.key(), record.value());}// 4. 手动提交偏移量(同步提交)consumer.commitSync();}}}
}

在这里插入图片描述
在这里插入图片描述

可收到实时消费的消息,但队列中消息并没有移除,

  • 消息保留规则由 Broker 配置控制,与消费者无关。
  • 消费者 Offset 仅标记消费进度,不会删除消息。
  • 通过 kafka-consumer-groups.sh 工具监控消费状态。
  • 生产环境中,合理设置 log.retention.hourslog.retention.bytes
2. 关键配置说明
参数说明
group.id消费者组ID,相同组内共享分区
auto.offset.reset无偏移量时的策略(earliest/latest
enable.auto.commit是否自动提交偏移量(建议false手动控制)
max.poll.records单次poll最大消息数

3.auto.offset.reset参数可选值及行为

作用典型场景
earliest从分区的最早消息开始消费(从头消费)需要处理 Topic 中所有历史消息
latest从分区的最新消息开始消费(仅消费新消息)实时处理最新数据,忽略历史消息
none抛出异常(NoOffsetForPartitionException需要严格确保 Offset 有效性
1.代码示例与行为验证
1. 配置为 earliest
props.put("auto.offset.reset", "earliest");

参数生效的触发条件

场景auto.offset.reset 是否生效消费起始位置
消费者组首次启动(无 Offset)根据参数值(earliest/latest
Offset 已提交且有效(未过期)从已提交 Offset 继续消费
Offset 已过期(消息被删除)根据参数值重新定位

行为

  • 如果消费者组首次启动,会从 Topic 每个分区的第一条消息开始消费。
  • 如果 Offset 过期(例如消息被删除),会从现存的最早消息开始消费。

适用场景

  • 数据回放(重放全部历史数据)
  • 测试环境需要消费完整数据集
2. 配置为 latest
props.put("auto.offset.reset", "latest");

行为

  • 如果消费者组首次启动,只消费启动后新写入的消息。
  • 如果 Offset 过期,会从当前最新消息开始消费。

适用场景

  • 生产环境实时处理(避免处理历史积压数据)
  • 日志收集系统(只需最新日志)
3. 配置为 none
props.put("auto.offset.reset", "none");

行为

  • 如果 Offset 无效,直接抛出 NoOffsetForPartitionException
  • 需手动处理异常或确保 Offset 始终有效。

适用场景

  • 高可靠性系统(需严格监控 Offset 有效性)
2.关键注意事项
1. Offset 提交机制的影响
  • 如果启用了自动提交 (enable.auto.commit=true),消费者会定期提交 Offset。
    重复消费风险:若消息处理失败但 Offset 已提交,会导致消息丢失。
  • 推荐做法
  props.put("enable.auto.commit", "false"); // 关闭自动提交// 处理完消息后手动提交 Offsetconsumer.commitSync();
2. 消费者组隔离性
  • 不同group.id的 Offset 互相独立。例如:
    • 消费者组 A(group.id=group1)配置为 latest → 只消费新消息。
    • 消费者组 B(group.id=group2)配置为 earliest → 可以消费全部消息。
3. 命令行验证 Offset

通过 Kafka 工具查看消费者组的 Offset:

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \--describe --group your-group-id

输出示例:

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
test-topic      0          5000            10000           5000
  • LAG:未消费的消息数量。若 LAG 持续增长,说明消费速度跟不上生产速度。

3、生产环境最佳实践
  1. 明确业务需求
    • 需要重放数据 → earliest
    • 仅处理实时数据 → latest
  2. 监控 Offset 提交
    • 使用 kafka-consumer-groups.sh 定期检查 LAG。
    • 集成监控系统(如 Prometheus + Grafana)。
  3. 防御性代码
   try {while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));// 处理消息consumer.commitSync(); // 同步提交}} catch (NoOffsetForPartitionException e) {// 处理 Offset 无效的极端情况logger.error("Offset 无效,需人工介入!", e);}

4、常见问题解答
Q:配置了 latest,为什么还能消费到旧消息?
  • 可能原因
    消费者组之前已提交过 Offset,且当前 Offset 指向旧消息位置。
  • 解决
    重置消费者组 Offset:
  bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \--group your-group-id --reset-offsets --to-latest --execute --topic test-topic
Q:如何让消费者组永久保留 Offset?
  • Kafka 默认行为
    Offset 存储在内部 Topic __consumer_offsets 中,默认保留时间为 7 天。
  • 修改保留策略
  # 修改 Offset 保留时间(单位:毫秒)bin/kafka-configs.sh --bootstrap-server localhost:9092 \--entity-type topics --entity-name __consumer_offsets \--alter --add-config retention.ms=604800000

5.主题管理示例(AdminClient)

import org.apache.kafka.clients.admin.*;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;public class KafkaAdminDemo {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "127.0.0.1:9092");try (AdminClient admin = AdminClient.create(props)) {// 创建主题(3分区,1副本)NewTopic newTopic = new NewTopic("test-topic2", 3, (short) 1);CreateTopicsResult result = admin.createTopics(Collections.singleton(newTopic));result.all().get(); // 阻塞等待创建完成System.out.println("主题创建成功");} catch (InterruptedException | ExecutionException e) {e.printStackTrace();}}
}

在这里插入图片描述

6.最佳实践与注意事项

  1. 生产者优化
    • 启用压缩(compression.type=snappy)减少网络开销。
    • 合理设置batch.sizelinger.ms提高吞吐量。
  2. 消费者可靠性
    • 使用手动提交偏移量,避免消息丢失或重复消费。
    • 处理CommitFailedException,防止因处理超时导致提交失败。
  3. 序列化选择
    • 默认支持String、ByteArray等序列化器。
    • 复杂对象推荐使用JSON(Jackson)或Avro。
  4. 消费者组管理
    • 通过kafka-consumer-groups.sh工具监控消费进度。
    • 避免频繁重平衡(Rebalance),调整session.timeout.ms参数。

7.关于flush和close方法的说明

  • flush():强制发送缓冲区中所有未发送的消息(同步等待发送完成)
  • close():释放生产者占用的所有资源(包括线程、网络连接、内存等)

若未调用close()可能导致:

  • 线程泄漏:生产者后台的Sender线程未终止
  • 连接泄漏:与Broker的TCP连接未关闭
  • 内存泄漏:未释放消息缓冲区内存

可通过jstackVisualVM工具检查线程状态验证。

关键区别说明

方法作用是否必须调用是否自动包含对方功能
flush()清空发送缓冲区,确保所有消息被发送可选(按需调用)❌ 不释放资源
close()关闭生产者并释放资源必须调用✅ 内部会自动调用flush()

正确写法(推荐):

try (Producer<String, String> producer = new KafkaProducer<>(props)) {producer.send(record);producer.flush(); // 显式清空缓冲区(可选)
} // 自动调用close(),包含flush()

错误写法(资源泄漏):

Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(record);
producer.flush(); 
// 忘记调用close() → 线程/连接未释放!

最佳实践建议

1.优先使用try-with-resources(Java 7+特性):

   try (Producer<String, String> producer = new KafkaProducer<>(props)) {// 发送消息...} // 自动调用close()

这是最安全的写法,无需手动调用flush()close()

2.需要立即发送时

   producer.send(record);producer.flush(); // 强制立即发送(如实时系统关键消息)// ...其他操作...producer.close(); // 仍需显式关闭

3.不要依赖finalize()
Kafka客户端的finalize()方法已废弃,不能保证资源释放。

4.KafkaProducer.close()源码:

public void close() {close(Duration.ofMillis(Long.MAX_VALUE)); // 默认无限等待
}public void close(Duration timeout) {// ...flush();    // 内部自动调用flush()client.close(); // 释放网络资源metrics.close(); // 关闭监控指标// ...
}

文章转载自:

http://HlOffUaF.Lfdzr.cn
http://1vKpqWUQ.Lfdzr.cn
http://EfwHQ4j9.Lfdzr.cn
http://Jt6ykRiv.Lfdzr.cn
http://uxNQLFo7.Lfdzr.cn
http://UNv2IXbw.Lfdzr.cn
http://jzoOBNdE.Lfdzr.cn
http://u0WOujDX.Lfdzr.cn
http://nfFfQ9qH.Lfdzr.cn
http://kfhfgTD9.Lfdzr.cn
http://XUvcx4Ev.Lfdzr.cn
http://mOktzGvT.Lfdzr.cn
http://1FuZPWRp.Lfdzr.cn
http://oi8qsF2R.Lfdzr.cn
http://CDMpJNEP.Lfdzr.cn
http://11UiHhsv.Lfdzr.cn
http://HSOvyisa.Lfdzr.cn
http://7kzVuG5f.Lfdzr.cn
http://b86cPtmT.Lfdzr.cn
http://6w0tvIjA.Lfdzr.cn
http://xXVx2DyO.Lfdzr.cn
http://bDtkaLkS.Lfdzr.cn
http://z2mZjLg3.Lfdzr.cn
http://CC6CRdvl.Lfdzr.cn
http://SEBJ3SyP.Lfdzr.cn
http://7PdPSLzW.Lfdzr.cn
http://pD71rz7u.Lfdzr.cn
http://qs8rzqaG.Lfdzr.cn
http://37JVWe8I.Lfdzr.cn
http://yI0DlO08.Lfdzr.cn
http://www.dtcms.com/wzjs/733523.html

相关文章:

  • php网站收录做购物网站那个好
  • a5站长网春节网页设计主题
  • 钓鱼网站免费空间房产信息查询官网
  • 企业网站设计的方案加盟网站建设
  • 点击量高的网站沙洋网站开发
  • 做网站自适应框架wordpress 链接 拼音
  • 使用element做的网站汕头门户网站
  • 佛山外贸建站莱芜网站建设sikesoft
  • 扁平化网站布局如何做网站推广方案
  • 对其网站建设进行了考察调研空间设计师
  • 龙腾盛世网站建设广告公司企业简介怎么写
  • 南通城乡建设局网站招聘客厅设计
  • 深圳鼎诚网站建设哪个网站做高仿衣服
  • python做网站性能wordpress怎么搜索网站
  • 网站泛解析wordpress 缓存用什么
  • 人才网站怎么做学校网站群管理系统建设项目
  • 微网站页面网络运维工程师考试
  • 西安自助建站系统网站建设html代码
  • 网站建设合同 下载怎么制作公众号视频
  • 南京移动网站建设wordpress auth_key
  • 湖北网站建设网址wordpress什么编辑器好用
  • 惠州企业网站建设选哪家域名查询ip
  • 聊城开发区建设局网站好看的网站色彩搭配
  • 南京的电商网站设计摄影网站源代码
  • 网站 手机 微信 app外包软件公司在哪里去接项目
  • 山东省建设备案网站审批表会用框架做网站能找到工作吗
  • 北京网站优化推广分析免费咨询法律援助电话号码
  • 网站建设可视化工具美食网站的设计与实现
  • 建设家具网站的目的及功能定位色无极网站正在建设中
  • php网站建设原码网站还建设 域名可以备案吗