当前位置: 首页 > wzjs >正文

台州网站建设选浙江华企如何修改网站ico

台州网站建设选浙江华企,如何修改网站ico,汕头网站优化电话,如何用自己的电脑建网站Java操作kafka客户端 文章目录 Java操作kafka客户端3.Java操作kafka客户端1.引入依赖2. Kafka服务配置3、生产者(Producer)实现1. 基础配置与发送消息2. 关键配置说明 4.消费者(Consumer)实现1. 基础配置与消费消息2. 关键配置说明…

Java操作kafka客户端

文章目录

  • Java操作kafka客户端
    • 3.Java操作kafka客户端
      • 1.引入依赖
      • 2. Kafka服务配置
      • 3、生产者(Producer)实现
        • 1. 基础配置与发送消息
        • 2. 关键配置说明
      • 4.消费者(Consumer)实现
        • 1. 基础配置与消费消息
        • 2. 关键配置说明
      • 3.auto.offset.reset参数可选值及行为
        • 1.代码示例与行为验证
          • 1. 配置为 `earliest`
          • 2. 配置为 `latest`
          • 3. 配置为 `none`
        • 2.关键注意事项
          • 1. Offset 提交机制的影响
          • 2. 消费者组隔离性
          • 3. 命令行验证 Offset
        • 3、生产环境最佳实践
        • 4、常见问题解答
          • Q:配置了 `latest`,为什么还能消费到旧消息?
          • Q:如何让消费者组永久保留 Offset?
      • 5.主题管理示例(AdminClient)
      • 6.最佳实践与注意事项
      • 7.关于flush和close方法的说明

来源参考的deepseek,如有侵权联系立删

3.Java操作kafka客户端

Java API提供以下核心接口:

  • Producer API:发送消息。
  • Consumer API:订阅消息。
  • Streams API:流式处理。
  • Admin API:管理Topic和集群。

1.引入依赖

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.4.0</version> 
</dependency>

2. Kafka服务配置

确保已启动Zookeeper和Kafka服务,默认端口分别为21819092

3、生产者(Producer)实现

1. 基础配置与发送消息

无需提前创建topic

import org.apache.kafka.clients.producer.*;
import java.util.Properties;public class KafkaProducerDemo {public static void main(String[] args) {// 1. 配置生产者参数Properties props = new Properties();// Broker地址props.put("bootstrap.servers", "127.0.0.1:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// 消息确认机制props.put("acks", "all");// 重试次数props.put("retries", 3);// 2. 创建生产者实例try (Producer<String, String> producer = new KafkaProducer<>(props)) {// 3. 构造消息并发送for (int i = 0; i < 10; i++) {ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", // 主题名称"key-" + i,   // 消息键"value-" + i  // 消息值);// 异步发送(可改用get()同步等待)producer.send(record, (metadata, exception) -> {if (exception == null) {System.out.printf("消息发送成功:topic=%s, partition=%d, offset=%d%n",metadata.topic(), metadata.partition(), metadata.offset());} else {exception.printStackTrace();}});}producer.flush(); // 确保所有消息发送完成}}
}

在这里插入图片描述

2. 关键配置说明
参数说明
bootstrap.serversBroker地址列表,多个用逗号分隔
key.serializer键的序列化类(如StringSerializer)
value.serializer值的序列化类
acks消息持久化确认机制(0/1/all
retries发送失败后的重试次数
batch.size批量发送的消息大小(字节)

4.消费者(Consumer)实现

1. 基础配置与消费消息
import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;public class KafkaConsumerDemo {public static void main(String[] args) {// 1. 配置消费者参数Properties props = new Properties();props.put("bootstrap.servers", "127.0.0.1:9092");props.put("group.id", "test-group"); // 消费者组IDprops.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("auto.offset.reset", "earliest"); // 从最早消息开始消费props.put("enable.auto.commit", "false");   // 关闭自动提交偏移量// 2. 创建消费者实例try (Consumer<String, String> consumer = new KafkaConsumer<>(props)) {consumer.subscribe(Collections.singletonList("test-topic")); // 订阅主题while (true) {// 3. 轮询消息(超时时间100ms)ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {System.out.printf("收到消息:topic=%s, partition=%d, offset=%d, key=%s, value=%s%n",record.topic(), record.partition(), record.offset(), record.key(), record.value());}// 4. 手动提交偏移量(同步提交)consumer.commitSync();}}}
}

在这里插入图片描述
在这里插入图片描述

可收到实时消费的消息,但队列中消息并没有移除,

  • 消息保留规则由 Broker 配置控制,与消费者无关。
  • 消费者 Offset 仅标记消费进度,不会删除消息。
  • 通过 kafka-consumer-groups.sh 工具监控消费状态。
  • 生产环境中,合理设置 log.retention.hourslog.retention.bytes
2. 关键配置说明
参数说明
group.id消费者组ID,相同组内共享分区
auto.offset.reset无偏移量时的策略(earliest/latest
enable.auto.commit是否自动提交偏移量(建议false手动控制)
max.poll.records单次poll最大消息数

3.auto.offset.reset参数可选值及行为

作用典型场景
earliest从分区的最早消息开始消费(从头消费)需要处理 Topic 中所有历史消息
latest从分区的最新消息开始消费(仅消费新消息)实时处理最新数据,忽略历史消息
none抛出异常(NoOffsetForPartitionException需要严格确保 Offset 有效性
1.代码示例与行为验证
1. 配置为 earliest
props.put("auto.offset.reset", "earliest");

参数生效的触发条件

场景auto.offset.reset 是否生效消费起始位置
消费者组首次启动(无 Offset)根据参数值(earliest/latest
Offset 已提交且有效(未过期)从已提交 Offset 继续消费
Offset 已过期(消息被删除)根据参数值重新定位

行为

  • 如果消费者组首次启动,会从 Topic 每个分区的第一条消息开始消费。
  • 如果 Offset 过期(例如消息被删除),会从现存的最早消息开始消费。

适用场景

  • 数据回放(重放全部历史数据)
  • 测试环境需要消费完整数据集
2. 配置为 latest
props.put("auto.offset.reset", "latest");

行为

  • 如果消费者组首次启动,只消费启动后新写入的消息。
  • 如果 Offset 过期,会从当前最新消息开始消费。

适用场景

  • 生产环境实时处理(避免处理历史积压数据)
  • 日志收集系统(只需最新日志)
3. 配置为 none
props.put("auto.offset.reset", "none");

行为

  • 如果 Offset 无效,直接抛出 NoOffsetForPartitionException
  • 需手动处理异常或确保 Offset 始终有效。

适用场景

  • 高可靠性系统(需严格监控 Offset 有效性)
2.关键注意事项
1. Offset 提交机制的影响
  • 如果启用了自动提交 (enable.auto.commit=true),消费者会定期提交 Offset。
    重复消费风险:若消息处理失败但 Offset 已提交,会导致消息丢失。
  • 推荐做法
  props.put("enable.auto.commit", "false"); // 关闭自动提交// 处理完消息后手动提交 Offsetconsumer.commitSync();
2. 消费者组隔离性
  • 不同group.id的 Offset 互相独立。例如:
    • 消费者组 A(group.id=group1)配置为 latest → 只消费新消息。
    • 消费者组 B(group.id=group2)配置为 earliest → 可以消费全部消息。
3. 命令行验证 Offset

通过 Kafka 工具查看消费者组的 Offset:

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \--describe --group your-group-id

输出示例:

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
test-topic      0          5000            10000           5000
  • LAG:未消费的消息数量。若 LAG 持续增长,说明消费速度跟不上生产速度。

3、生产环境最佳实践
  1. 明确业务需求
    • 需要重放数据 → earliest
    • 仅处理实时数据 → latest
  2. 监控 Offset 提交
    • 使用 kafka-consumer-groups.sh 定期检查 LAG。
    • 集成监控系统(如 Prometheus + Grafana)。
  3. 防御性代码
   try {while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));// 处理消息consumer.commitSync(); // 同步提交}} catch (NoOffsetForPartitionException e) {// 处理 Offset 无效的极端情况logger.error("Offset 无效,需人工介入!", e);}

4、常见问题解答
Q:配置了 latest,为什么还能消费到旧消息?
  • 可能原因
    消费者组之前已提交过 Offset,且当前 Offset 指向旧消息位置。
  • 解决
    重置消费者组 Offset:
  bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \--group your-group-id --reset-offsets --to-latest --execute --topic test-topic
Q:如何让消费者组永久保留 Offset?
  • Kafka 默认行为
    Offset 存储在内部 Topic __consumer_offsets 中,默认保留时间为 7 天。
  • 修改保留策略
  # 修改 Offset 保留时间(单位:毫秒)bin/kafka-configs.sh --bootstrap-server localhost:9092 \--entity-type topics --entity-name __consumer_offsets \--alter --add-config retention.ms=604800000

5.主题管理示例(AdminClient)

import org.apache.kafka.clients.admin.*;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;public class KafkaAdminDemo {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "127.0.0.1:9092");try (AdminClient admin = AdminClient.create(props)) {// 创建主题(3分区,1副本)NewTopic newTopic = new NewTopic("test-topic2", 3, (short) 1);CreateTopicsResult result = admin.createTopics(Collections.singleton(newTopic));result.all().get(); // 阻塞等待创建完成System.out.println("主题创建成功");} catch (InterruptedException | ExecutionException e) {e.printStackTrace();}}
}

在这里插入图片描述

6.最佳实践与注意事项

  1. 生产者优化
    • 启用压缩(compression.type=snappy)减少网络开销。
    • 合理设置batch.sizelinger.ms提高吞吐量。
  2. 消费者可靠性
    • 使用手动提交偏移量,避免消息丢失或重复消费。
    • 处理CommitFailedException,防止因处理超时导致提交失败。
  3. 序列化选择
    • 默认支持String、ByteArray等序列化器。
    • 复杂对象推荐使用JSON(Jackson)或Avro。
  4. 消费者组管理
    • 通过kafka-consumer-groups.sh工具监控消费进度。
    • 避免频繁重平衡(Rebalance),调整session.timeout.ms参数。

7.关于flush和close方法的说明

  • flush():强制发送缓冲区中所有未发送的消息(同步等待发送完成)
  • close():释放生产者占用的所有资源(包括线程、网络连接、内存等)

若未调用close()可能导致:

  • 线程泄漏:生产者后台的Sender线程未终止
  • 连接泄漏:与Broker的TCP连接未关闭
  • 内存泄漏:未释放消息缓冲区内存

可通过jstackVisualVM工具检查线程状态验证。

关键区别说明

方法作用是否必须调用是否自动包含对方功能
flush()清空发送缓冲区,确保所有消息被发送可选(按需调用)❌ 不释放资源
close()关闭生产者并释放资源必须调用✅ 内部会自动调用flush()

正确写法(推荐):

try (Producer<String, String> producer = new KafkaProducer<>(props)) {producer.send(record);producer.flush(); // 显式清空缓冲区(可选)
} // 自动调用close(),包含flush()

错误写法(资源泄漏):

Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(record);
producer.flush(); 
// 忘记调用close() → 线程/连接未释放!

最佳实践建议

1.优先使用try-with-resources(Java 7+特性):

   try (Producer<String, String> producer = new KafkaProducer<>(props)) {// 发送消息...} // 自动调用close()

这是最安全的写法,无需手动调用flush()close()

2.需要立即发送时

   producer.send(record);producer.flush(); // 强制立即发送(如实时系统关键消息)// ...其他操作...producer.close(); // 仍需显式关闭

3.不要依赖finalize()
Kafka客户端的finalize()方法已废弃,不能保证资源释放。

4.KafkaProducer.close()源码:

public void close() {close(Duration.ofMillis(Long.MAX_VALUE)); // 默认无限等待
}public void close(Duration timeout) {// ...flush();    // 内部自动调用flush()client.close(); // 释放网络资源metrics.close(); // 关闭监控指标// ...
}

文章转载自:

http://Sp7NgSaX.Lgwpm.cn
http://VluHGkdc.Lgwpm.cn
http://EbdEYS9k.Lgwpm.cn
http://BMEop4ts.Lgwpm.cn
http://8ShbIw6C.Lgwpm.cn
http://xCRreUlK.Lgwpm.cn
http://ktBf5dlP.Lgwpm.cn
http://0HAdzvan.Lgwpm.cn
http://FQbMcI2T.Lgwpm.cn
http://8MDPLkhz.Lgwpm.cn
http://WnZAPjOr.Lgwpm.cn
http://csYq8ZPe.Lgwpm.cn
http://pPGoq2vY.Lgwpm.cn
http://BMGCzqN7.Lgwpm.cn
http://HzHD0r4k.Lgwpm.cn
http://FiDHNhX9.Lgwpm.cn
http://8HWtnceU.Lgwpm.cn
http://9UhGGPrk.Lgwpm.cn
http://R4oQYMTn.Lgwpm.cn
http://jnvPjlrs.Lgwpm.cn
http://gOiC7TOw.Lgwpm.cn
http://FmHwXuQp.Lgwpm.cn
http://zqKN99dQ.Lgwpm.cn
http://oWaypHuw.Lgwpm.cn
http://qvhtz34F.Lgwpm.cn
http://ma6iWZlr.Lgwpm.cn
http://H98yxikv.Lgwpm.cn
http://w6ZBqT1R.Lgwpm.cn
http://CmRglRFE.Lgwpm.cn
http://H0UNCwEy.Lgwpm.cn
http://www.dtcms.com/wzjs/771989.html

相关文章:

  • 门户定制网站建设公司广州最繁华的三个区
  • php成品网站下载建筑工程管理软件
  • 黄岛英文网站建设查询网入口
  • 最好的网站统计百度推广全国代理商排名
  • 网站制做公司中国住房和城乡建设部网站6
  • 企业网站建设方案策划编程软件scratch下载
  • 网站建设实训结论专业手机app开发公司
  • 重庆公司联系方式网站海外seo
  • dz论坛做视频网站教程电脑网站和手机网站的区别
  • 全屏网站 内页怎么做佛山网站优化方法软件
  • 化纤公司网站建设公司做网站大概多少钱
  • 如何判断网站程序使用asp还是phpwordpress自动翻译双语主页
  • 企业网站规划与开发国内装修公司
  • 网络宣传网站建设网站怎么添加百度商桥
  • 怎样建网站视频教程国家基础设施建设网站
  • 速橙科技有限公司网站建设门户网站建设技术要求
  • 合肥 电子商务 网站推广完全不收费的聊天软件
  • 专门做网站开发的公司seo是什么姓
  • 最省钱的购物软件杭州网站优化体验
  • 建设网站需要深圳网站建设 贴吧
  • 做网站济南网站重新设计
  • 在哪里学做网站wordpress内容只有自已可见
  • 查询数据的网站怎么做秦皇岛网站开发
  • 增加网站收录网站建设与管理的实训
  • 杭州公司网站开发网站开发竞争性谈判
  • dedecms 生成网站地图墓园网站建设价格
  • 做物流网站费用多少江西做网站找谁
  • 郑州网站建设报价表如何制作公司官网
  • 网站免费域名申请中国中建设计网站
  • 设计公司网站是什么是重要的如何将网站排名做高