当前位置: 首页 > news >正文

kafka数据拉取和发送

文章目录

  • 一、原生 KafkaConsumer
    • 1、pom文件引入kafka
    • 2、拉取数据
    • 3、发送数据
  • 二、在spring boot中使用@KafkaListener
    • 1、添加依赖
    • 2、application.yml
    • 3、消息拉取:consumer
    • 4、自定义ListenerContainerFactory
    • 5、消息发送:producer
    • 6、kafka通过clientId鉴权时的鉴权失败问题

一、原生 KafkaConsumer

1、pom文件引入kafka

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.12</artifactId>
</dependency>

2、拉取数据

简单说只要以下几个步骤:
1、获取kafka地址,并设置Properties
2、获取consumer:KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
3、订阅topic:consumer.subscribe(topic);
4、拉取数据:consumer.poll()
5、遍历数据
6、示例:

package com.yogi.test.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.InitializingBean;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringSerializer;

@Component
public class TestMsgConsumer implements InitializingBean {
   

    @Value("${test.kafka.address:127.0.0.1:9092}")
    private String kafkaAddress;
    @Value("${test.kafka.msg.topic:topic_test_1,topic_test_2}")
    private String msgTopic;
    @Value("${test.consumer.name:yogima}")
    private String consumerGroupId;

    /**
     * 消费开关: true-消费,false-暂停消费
     * 在服务正常停止时用于停止继续消费数据,将缓存中的数据发送完即可
     */
    private Boolean consumeSwitch = true;

    public void consumerMessage(List<String> topic, String groupId) {
   
        LOGGER.info("consumer topic list1:{}",topic.toString());
        Properties props = new Properties();
        /**
         * 指定一组host:port对,用于创建与Kafka broker服务器的Socket连接,可以指定多组,使用逗号分隔,对于多broker集群,只需配置
         * 部分broker地址即可,consumer启动后可以通过这些机器找到完整的broker列表
         */
        LOGGER.info("test.kafka.address:{}",kafkaAddress);
        props.put("bootstrap.servers", kafkaAddress);
        /**
         * 指定group名字,能唯一标识一个consumer group,如果不显示指定group.id会抛出InvalidGroupIdException异常,通常为group.id
         * 设置一个有业务意义的名字即可
         */
        props.put("group.id", groupId);
        /**
         * 自动提交位移
         */
        props.put("enable.auto.commit", Boolean.TRUE);
        /**
         * 位移提交超时时间
         */
        props.put("auto.commit.interval.ms", "1000");
        /**
         * 从最早的消息开始消费
         * 1,earliest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
         * 2,latest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
         */
        props.put("auto.offset.reset", "latest");
        /**
         * 指定消费解序列化操作。consumer从broker端获取的任何消息都是字节数组的格式,因此需要指定解序列化操作才能还原为原本对象,
         * Kafka对绝大部分初始类型提供了解序列化器,consumer支持自定义解序列化器org.apache.kafka.common.serialization.Deserializer
         * org.apache.kafka.common.serialization.ByteArrayDeserializer
         * StringDeserializer
         */
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        /**
         * 对消息体进行解序列化,与key解序列化类似
         */
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //一次从kafka中poll出来的数据条数,max.poll.records条数据需要在在session.timeout.ms这个时间内处理完
        props.put("max.poll.records", "500");
        //fetch.message.max.bytes (默认 1MB) – 消费者能读取的最大消息。这个值应该大于或等于message.max.bytes。
        props.put("fetch.message.max.bytes", "300000000");

        KafkaConsumer<String, String> consumer;

        try{
   
            /**
             * 通过Properties实例对象构建KafkaConsumer对象,可同时指定key、value序列化器
             */
            LOGGER.info("start set consumer,props:{}",props.toString());
            consumer = new KafkaConsumer<>(props);
            LOGGER.info("set consumer finished");
            /**
             * 订阅consumer group需要消费的topic列表
             */
            LOGGER.info("consumer topic list:{}",topic.toString());
            consumer.subscribe(topic);
        }catch (Exception e){
   
            LOGGER.info("consumer subscribe failed,msg:{},cause:{},e:{}",e.getMessage(),e.getCause(),e);
            return;
        }

        /**
         * 并行从订阅topic获取多个分区消息,为此新版本consumer的poll方法使用类似Linux的 selec I/O机制,
         * 所有相关的事件都发生在一个事件循环中,这样consuner端只使用一个线程就能完成所有类型I/o操作
         */
        try {
   
            while (true) {
   
                if (!consumeSwitch) {
   
                    try {
   
                        Thread.sleep(30000);
                    } catch (InterruptedException e) {
   
                        LOGGER.error("err msg:" + e.getMessage());
                    }
                }
                /**
                 * 指定超时时间,通常情况下consumer拿到了足够多的可用数据,会立即从该方法返回,但若当前没有足够多数据
                 * consumer会处于阻塞状态,但当到达设定的超时时间,则无论数据是否足够都为立即返回
                 */
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1L));
                /**
                 * poll调用返回ConsumerRecord类分装的Kafka消息,之后会根据自己业务实现信息处理,对于consumer而言poll方法
                 * 返回即认为consumer成功消费了消息
                 */
                for (ConsumerRecord<String, String> record : records) {
   
                    LOGGER.debug("offset = {}, key = {}, value = {}"

相关文章:

  • MYSQL之相关子查询
  • Jmeter接口并发测试
  • 图像处理、数据挖掘、数据呈现
  • JavaScript数组
  • 结构型模式 - 适配器模式 (Adapter Pattern)
  • 图像处理案例06 OCR应用
  • C++基础入门——Vetor与函数
  • 华为机试牛客刷题之HJ11 数字颠倒
  • KylinSP3 | 防火墙和麒麟安全增强设置KySec
  • 【电机控制器】ESP32-C3语言模型——通义千问
  • 数据库 安装initializing database不通过
  • 货车一键启动无钥匙进入手机远程启动的正确使用方法
  • 【SpringBoot】——分组校验、自定义注解、登入验证(集成redis)、属性配置方式、多环境开发系统学习知识
  • python zipfile
  • 在spring项目中,引入mybatis
  • PCB设计常用布局布线方法
  • 错误 MSB3073 命令“setlocal“
  • Vue中环境配置的若干问题解决
  • 蓝桥杯刷题-dp-线性dp(守望者的逃离,摆花,线段)
  • HTML解析 → DOM树 CSS解析 → CSSOM → 合并 → 渲染树 → 布局 → 绘制 → 合成 → 屏幕显示
  • 公司网站平台/网站怎么快速收录
  • b2b电子商务网站调研报告文字/seo优化
  • 2018 政府网站建设/站长工具seo综合查询网
  • c 做网站怎么显示歌词/深圳全网推互联科技有限公司
  • 网站做ppt模板/什么叫做关键词
  • wordpress能输数学公式吗/安徽seo