当前位置: 首页 > wzjs >正文

保定市建设局安监网站大连网站建设 青鸟传媒

保定市建设局安监网站,大连网站建设 青鸟传媒,在哪网站可以做农信社模拟试卷,seo搜索引擎优化工程师招聘在消费 Kafka 消息时,手动确认(acknowledge)消息的消费,可以通过使用 KafkaConsumer 类中的 commitSync() 或 commitAsync() 方法来实现。这些方法将提交当前偏移量,确保在消费者崩溃时不会重新消费已处理的消息。 以…

在消费 Kafka 消息时,手动确认(acknowledge)消息的消费,可以通过使用 KafkaConsumer 类中的 commitSync()commitAsync() 方法来实现。这些方法将提交当前偏移量,确保在消费者崩溃时不会重新消费已处理的消息。

以下是一个简单的手动 ack 的示例代码:

1. 配置 KafkaConsumer 和手动确认消费

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.util.Collections;
import java.util.Properties;public class KafkaManualAckConsumer {public static void main(String[] args) {// 配置消费者的基本属性Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // Kafka 服务器地址properties.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group"); // 消费者组IDproperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // 消息key反序列化properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // 消息value反序列化properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // 关闭自动提交,启用手动提交// 创建 KafkaConsumerKafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);// 订阅主题consumer.subscribe(Collections.singletonList("my-topic"));try {while (true) {// 拉取消息var records = consumer.poll(1000); // 拉取数据,等待最多1000ms// 处理每一条消息records.forEach(record -> {System.out.println("Consumed message: " + record.value());// 处理完消息后手动提交偏移量// commitSync: 确保消息成功提交consumer.commitSync();});}} catch (Exception e) {e.printStackTrace();} finally {// 关闭消费者consumer.close();}}
}

2. 代码解析

  1. 配置消费者:
    • ENABLE_AUTO_COMMIT_CONFIG 设置为 false,禁用自动提交偏移量。这样就可以在处理完每条消息后手动提交。
  2. 消息消费与手动 ack:
    • poll(1000) 方法拉取最多 1000 毫秒内的消息。
    • commitSync() 方法用于同步提交当前的偏移量,即消费到的消息的位移,这样可以确保 Kafka 消费者确认该消息已处理。
  3. 异常处理:
    • 异常捕获块 catch 用于处理消费过程中可能出现的任何错误,确保程序不会崩溃。
  4. 关闭消费者:
    • finally 块中调用 consumer.close() 来关闭消费者连接。

3. 使用 commitAsync 提高性能(可选)

如果对性能要求更高,可以考虑使用 commitAsync() 方法,它不会阻塞当前线程,提交操作将在后台异步完成:

consumer.commitAsync((offsets, exception) -> {if (exception != null) {System.out.println("Error committing offset: " + exception.getMessage());} else {System.out.println("Successfully committed offsets: " + offsets);}
});

这样你可以不阻塞线程,提高消费性能,尤其是在高吞吐量的环境中。
如果你有其他的需求或者想更细致地控制消费的细节,随时告诉我!


文章转载自:

http://00000000.qytby.cn
http://00000000.qytby.cn
http://00000000.qytby.cn
http://00000000.qytby.cn
http://00000000.qytby.cn
http://00000000.qytby.cn
http://00000000.qytby.cn
http://00000000.qytby.cn
http://00000000.qytby.cn
http://00000000.qytby.cn
http://00000000.qytby.cn
http://00000000.qytby.cn
http://00000000.qytby.cn
http://00000000.qytby.cn
http://00000000.qytby.cn
http://00000000.qytby.cn
http://00000000.qytby.cn
http://00000000.qytby.cn
http://00000000.qytby.cn
http://00000000.qytby.cn
http://00000000.qytby.cn
http://00000000.qytby.cn
http://00000000.qytby.cn
http://00000000.qytby.cn
http://00000000.qytby.cn
http://00000000.qytby.cn
http://00000000.qytby.cn
http://00000000.qytby.cn
http://00000000.qytby.cn
http://00000000.qytby.cn
http://www.dtcms.com/wzjs/621364.html

相关文章:

  • 公司网站开发联系方式mg电子游戏网站开发
  • php网站出现乱码战鼓的h5网站如何做
  • 网站建设的杂志昆山网站建设机构
  • 网站建设后台管理在网站里继费
  • 深圳网站开发网站和平区网站制作
  • 做赚钱的网站有哪些搭建网站干什么
  • 我的企业网站怎么seo龙岗网站建设代理商
  • 区块链交易网站开发东营建设信息网网
  • 惠州市惠城区建设局网站纺织行业网站怎么做吸引人
  • 网站的开发环境设计如何做网站定位
  • 网站建设 呢咕云惠州建设企业网站
  • 网站开发技术入股协议24小时通过网站备案
  • 鞍山外国网站制作wordpress 统计代码添加
  • 都江堰城乡建设局网站百度贴吧官网app下载
  • 想找可以在家做的手工活去什么网站乐清房产在线网
  • 绿色手机网站模板专门做隐形眼镜的网站
  • 做网站管理员开会怎么演讲wordpress后台发布文章发不
  • 网站子页面怎么做网站恶意镜像 301
  • 黄陂区建设局网站建网站找哪个公司
  • 网站做可以退款吗网站建设需要注意哪些关键细节
  • 如何在建设教育协会网站注册考试黄山风景区
  • 网站建设的文档什么网站可以做相册视频
  • 网站的seo方案怎么做南皮网站建设价格
  • pytson做网站安全吗网站开发人员工资水平
  • 0基础自学做网站上海对外贸易公司
  • 深圳网站建设方案书郑州网站seo排名
  • 山西网站开发公司电话门户网站搭建软件
  • 视频门户网站建设服务器网站标题前的小图标怎么做
  • 做网站公司费用平面设计和网站运营
  • 专业苏州网站建设公司关于美食的网页模板