当前位置: 首页 > news >正文

Kafka消息服务之Java工具类

注:此内容是本人在另一个技术平台发布的历史文章,转载发布到CSDN;

Apache Kafka是一个开源分布式事件流平台,也是当前系统开发中流行的高性能消息队列服务,数千家公司使用它来实现高性能数据管道、流分析、数据集成和关键任务应用程序。
Kafka 可以很好地替代更传统的消息代理。消息代理的使用原因多种多样(将处理与数据生产者分离开来、缓冲未处理的消息等)。与大多数消息系统相比,Kafka 具有更好的吞吐量、内置分区、复制和容错能力,这使其成为大规模消息处理应用程序的良好解决方案。

Java工具类

此基于kafka客户端的工具类,提供基础的消息发送与监听功能。

pom.xml

       <!-- 集成kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>2.2.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.2.2</version>
        </dependency>

KafkaUtils.java

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.Future;

/**
 * @Description kafka工具类,提供消息发送与监听
 */
public class KafkaUtils {

    /**
     * 获取实始化KafkaStreamServer对象
     * @return
     */
    public static KafkaStreamServer bulidServer(){
        return new KafkaStreamServer();
    }

    /**
     * 获取实始化KafkaStreamClient对象
     * @return
     */
    public static KafkaStreamClient bulidClient(){
        return new KafkaStreamClient();
    }

    public static class KafkaStreamServer{
        KafkaProducer<String, String> kafkaProducer = null;

        private KafkaStreamServer(){}

        /**
         * 创建配置属性
         * @param host
         * @param port
         * @return
         */
        public KafkaStreamServer createKafkaStreamServer(String host, int port){
            String bootstrapServers = String.format("%s:%d", host, port);
            if (kafkaProducer != null){
                return this;
            }
            Properties properties = new Properties();
            //kafka地址,多个地址用逗号分割
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            kafkaProducer = new KafkaProducer<>(properties);
            return this;
        }

        /**
         * 向kafka服务发送生产者消息
         * @param topic
         * @param msg
         * @return
         */
        public Future<RecordMetadata> sendMsg(String topic, String msg){
            ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, msg);
            Future<RecordMetadata> future = kafkaProducer.send(record);
            System.out.println("消息发送成功:" + msg);
            return future;
        }

        /**
         * 关闭kafka连接
         */
        public void close(){
            if (kafkaProducer != null){
                kafkaProducer.flush();
                kafkaProducer.close();
                kafkaProducer = null;
            }
        }
    }

    public static class KafkaStreamClient {
        KafkaConsumer<String, String> kafkaConsumer = null;
        private KafkaStreamClient(){}

        /**
         * 配置属性,创建消费者
         * @param host
         * @param port
         * @return
         */
        public KafkaStreamClient createKafkaStreamClient(String host, int port, String groupId){
            String bootstrapServers = String.format("%s:%d", host, port);
            if (kafkaConsumer != null){
                return this;
            }
            Properties properties = new Properties();
            //kafka地址,多个地址用逗号分割
            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,  bootstrapServers);
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            kafkaConsumer = new KafkaConsumer<String, String>(properties);
            return this;
        }

        /**
         * 客户端消费者拉取消息,并通过回调HeaderInterface实现类传递消息
         * @param topic
         * @param headerInterface
         */
        public void pollMsg(String topic, HeaderInterface headerInterface) {
            kafkaConsumer.subscribe(Collections.singletonList(topic));
            while (true) {
                ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    try{
                        headerInterface.execute(record);
                    }catch(Exception e){
                        e.printStackTrace();
                    }
                }
            }
        }

        /**
         * 关闭kafka连接
         */
        public void close(){
            if (kafkaConsumer != null){
                kafkaConsumer.close();
                kafkaConsumer = null;
            }
        }
    }


    @FunctionalInterface
    interface HeaderInterface{
        void execute(ConsumerRecord<String, String> record);
    }

    /**
     * 测试示例
     * @param args
     * @throws InterruptedException
     */
    public static void main(String[] args) throws InterruptedException {
        //生产者发送消息
//        KafkaStreamServer kafkaStreamServer =  KafkaUtils.bulidServer().createKafkaStreamServer("127.0.0.1", 9092);
//        int i=0;
//        while (i<10) {
//            String msg = "Hello," + new Random().nextInt(100);
//            kafkaStreamServer.sendMsg("test", msg);
//            i++;
//            Thread.sleep(100);
//        }
//        kafkaStreamServer.close();
//        System.out.println("发送结束");

        System.out.println("接收消息");
        KafkaStreamClient kafkaStreamClient =  KafkaUtils.bulidClient().createKafkaStreamClient("127.0.0.1", 9092, "consumer-45");
        kafkaStreamClient.pollMsg("test", new HeaderInterface() {
            @Override
            public void execute(ConsumerRecord<String, String> record) {
                System.out.println(String.format("topic:%s,offset:%d,消息:%s", record.topic(), record.offset(), record.value()));
            }
        });

    }
}

相关文章:

  • 在局域网中连接Grafana数据源
  • 什么是Grok-3?技术特点,场景,潜在问题与挑战
  • 数据结构(第八章 排序算法)
  • 成人床垫更新关于 SOR/2016-183 和《纺织品贴标和广告法规》的合规
  • nginx ngx_http_module(10) 指令详解
  • YOLOv11-ultralytics-8.3.67部分代码阅读笔记-dataset.py
  • Windows编程:在 VS2019 里面,显示行号
  • petalinux高版本设置自动登录和开机自启动配置
  • 敏捷项目管理:适应快速变化的项目环境
  • 武汉小米 Java 岗位一二面校招面经
  • Bazel 教程
  • MyBatis 中 SqlMapConfig 配置文件详解
  • HTML/CSS中交集选择器
  • 前七章综合练习
  • 集合 数据结构 泛型
  • Element UI常用组件
  • 知识库-登陆接口
  • 《论语别裁》第01章 学而(03) 四书五经的假面目
  • 萌新学 Python 之 if 语句的三目运算符
  • SFT数据指令评估-2.基于困惑度的5种指标(微调白盒模型)
  • 中科飞测将投资超10亿元,在上海张江成立第二总部
  • 多家外资看好中国市场!野村建议“战术超配”,花旗上调恒指目标价
  • 智能手表眼镜等存泄密隐患,国安部提醒:严禁在涉密场所使用
  • 宁德时代港股募资预计最高至50亿美元:90%将投向匈牙利项目
  • 来伊份深夜回应“粽子中吃出疑似创可贴”:拿到实物后会查明原因
  • 听企业聊感受,《外企聊营商》5月13日起推出