当前位置: 首页 > news >正文

kafka consumer 手动 ack

在消费 Kafka 消息时,手动确认(acknowledge)消息的消费,可以通过使用 KafkaConsumer 类中的 commitSync()commitAsync() 方法来实现。这些方法将提交当前偏移量,确保在消费者崩溃时不会重新消费已处理的消息。

以下是一个简单的手动 ack 的示例代码:

1. 配置 KafkaConsumer 和手动确认消费

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Collections;
import java.util.Properties;

public class KafkaManualAckConsumer {
    public static void main(String[] args) {
        // 配置消费者的基本属性
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // Kafka 服务器地址
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group"); // 消费者组ID
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // 消息key反序列化
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // 消息value反序列化
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // 关闭自动提交,启用手动提交

        // 创建 KafkaConsumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        // 订阅主题
        consumer.subscribe(Collections.singletonList("my-topic"));

        try {
            while (true) {
                // 拉取消息
                var records = consumer.poll(1000); // 拉取数据,等待最多1000ms

                // 处理每一条消息
                records.forEach(record -> {
                    System.out.println("Consumed message: " + record.value());

                    // 处理完消息后手动提交偏移量
                    // commitSync: 确保消息成功提交
                    consumer.commitSync();
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 关闭消费者
            consumer.close();
        }
    }
}

2. 代码解析

  1. 配置消费者:
    • ENABLE_AUTO_COMMIT_CONFIG 设置为 false,禁用自动提交偏移量。这样就可以在处理完每条消息后手动提交。
  2. 消息消费与手动 ack:
    • poll(1000) 方法拉取最多 1000 毫秒内的消息。
    • commitSync() 方法用于同步提交当前的偏移量,即消费到的消息的位移,这样可以确保 Kafka 消费者确认该消息已处理。
  3. 异常处理:
    • 异常捕获块 catch 用于处理消费过程中可能出现的任何错误,确保程序不会崩溃。
  4. 关闭消费者:
    • finally 块中调用 consumer.close() 来关闭消费者连接。

3. 使用 commitAsync 提高性能(可选)

如果对性能要求更高,可以考虑使用 commitAsync() 方法,它不会阻塞当前线程,提交操作将在后台异步完成:

consumer.commitAsync((offsets, exception) -> {
    if (exception != null) {
        System.out.println("Error committing offset: " + exception.getMessage());
    } else {
        System.out.println("Successfully committed offsets: " + offsets);
    }
});

这样你可以不阻塞线程,提高消费性能,尤其是在高吞吐量的环境中。
如果你有其他的需求或者想更细致地控制消费的细节,随时告诉我!

相关文章:

  • 详解直方图均衡化
  • Java最新面试题(全网最全、最细、附答案)
  • mysql 全方位安装教程
  • jvm内存区域、调优参数,堆区栈区分别存什么
  • Buildroot学习笔记
  • doris:Hudi Catalog
  • Windows逆向工程入门之MASM字符处理机制
  • 11天 -- Redis 中跳表的实现原理是什么?Redis 的 hash 是什么?Redis Zset 的实现原理是什么?
  • Linux小程序-进度条
  • 《基于鸿蒙系统的类目标签AI功能开发实践》
  • 《Ollama官网可以下载使用的50个AI模型及介绍》:此文为AI自动生成
  • 机器学习:线性回归,梯度下降,多元线性回归
  • 工程化与框架系列(13)--虚拟DOM实现
  • Springboot中SLF4J详解
  • Winbox5怎样设置上网
  • SpringMVC(2)传递JSON、 从url中获取参数、上传文件、cookie 、session
  • 【图文详解】什么是微服务?什么是SpringCloud?
  • Python 实现定时查询数据库并发送消息的完整流程
  • Eureka Server 数据同步原理深度解析
  • Go红队开发—编解码工具
  • 一图读懂制作网站/新闻头条
  • 贵州网站备案局/国外免费网站服务器
  • 淘宝属于什么类型的网站/石家庄网站建设seo公司
  • wordPress主题模板站/关键词优化报价查询
  • 怎么可以上传自己做的网站/必应搜索引擎网址
  • 免费做app网站/seo引擎优化专员