
Connecting to Kafka 0.8.1.1 from Java

I recently needed to pull data from a Kafka 0.8.1.1 cluster, and most of the client libraries I tried could not connect.

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.8.1.1</version>
</dependency>

This is the dependency that finally worked. For some versions, the jms and jmx transitive dependency jars cannot be pulled down automatically; download them by hand from a repository and install them into your local Maven repository.
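As a sketch of that manual step, something like the following installs a hand-downloaded jar into the local repository (the file path and coordinates below are placeholders for whichever jms/jmx artifact fails to resolve, not the exact ones from my build):

mvn install:install-file \
    -Dfile=/path/to/jmxri-1.2.1.jar \
    -DgroupId=com.sun.jmx \
    -DartifactId=jmxri \
    -Dversion=1.2.1 \
    -Dpackaging=jar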
Consumer code

import java.io.File;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import cn.hutool.core.io.FileUtil;
import cn.hutool.core.util.StrUtil;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import lombok.extern.slf4j.Slf4j;

@Component
@Slf4j
public class KafkaConsumer {
    private final String zookeeper;

    public KafkaConsumer(@Value("${kafka-client.consumer.zookeeper}") String zookeeper) {
        this.zookeeper = zookeeper;
    }

    public void consume(String topic, String groupId) {
        if (StrUtil.isEmpty(groupId)) {
            groupId = StrUtil.format("{}_consumer_group", topic);
        }
        log.info("create kafka consumer zookeeper: {}", zookeeper);
        log.info("gather topic: {}", topic);
        List<String> rv = new ArrayList<>();
        // The 0.8 high-level consumer connects through ZooKeeper, not the brokers
        Properties props = new Properties();
        props.put("zookeeper.connect", this.zookeeper);
        props.put("group.id", groupId);
//        props.put("zookeeper.connection.timeout.ms", "4000");
//        props.put("zookeeper.sync.time.ms", "5000");
        props.put("auto.offset.reset", "smallest"); // start from the earliest available offset
        ConsumerConfig consumerConfig = new ConsumerConfig(props);
        ConsumerConnector consumer = Consumer.createJavaConsumerConnector(consumerConfig);
        // Subscribe to the topic with a single stream (one consumer thread)
        Map<String, Integer> topicCountMap = new HashMap<>();
        topicCountMap.put(topic, 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        // Read and process messages; the stream iterator blocks until data arrives
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
        String path = StrUtil.format("{}_{}.txt", topic, groupId);
        for (final KafkaStream<byte[], byte[]> stream : streams) {
            for (MessageAndMetadata<byte[], byte[]> messageAndMetadata : stream) {
                String message = new String(messageAndMetadata.message(), StandardCharsets.UTF_8);
                rv.add(message);
                // StrUtil.sub is index-safe for messages shorter than 30 characters
                log.info("{} Received: {}", messageAndMetadata.topic(), StrUtil.sub(message, 0, 30));
                // Rewrite the collected messages after each one, because this
                // loop only ends when the consumer is shut down
                log.info("writing to file {}", path);
                FileUtil.writeLines(rv, new File(path), StandardCharsets.UTF_8);
            }
        }
    }

    public static void main(String[] args) {
        String topic = "test";               // topic to consume
        String zookeeper = "127.0.0.1:2181"; // ZooKeeper address
        String groupId = "test";             // consumer group ID
        KafkaConsumer consumer = new KafkaConsumer(zookeeper);
        consumer.consume(topic, groupId);
    }
}
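One thing this example never does is shut the consumer down: the stream iterator blocks forever, so the ZooKeeper session and consumer registration stay open until the process dies. A minimal sketch of a cleaner exit, assuming the ConsumerConnector is hoisted somewhere a shutdown hook can reach it (the helper class below is made up for illustration):

import kafka.javaapi.consumer.ConsumerConnector;

public class ConsumerShutdown {
    // Hypothetical helper: run ConsumerConnector.shutdown() on JVM exit so the
    // connector commits offsets (with auto-commit enabled) and closes its
    // ZooKeeper session instead of leaving a stale consumer registration.
    public static void registerShutdownHook(ConsumerConnector consumer) {
        Runtime.getRuntime().addShutdownHook(
                new Thread(consumer::shutdown, "kafka-consumer-shutdown"));
    }
}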

Producer code

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.springframework.beans.factory.annotation.Value;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class KafkaProducer {
    private final String brokerList;

    private Producer<String, String> producer;

    public KafkaProducer(@Value("${kafka-client.producer.brokerList}") String brokers) {
        this.brokerList = brokers;
        log.info("Created KafkaProducer with brokerList: {}", brokerList);
        Properties props = new Properties();
        props.put("metadata.broker.list", brokerList); // Kafka broker addresses
        props.put("serializer.class", "kafka.serializer.StringEncoder"); // message serializer
        props.put("partitioner.class", "kafka.producer.DefaultPartitioner"); // default partitioner
        props.put("request.required.acks", "1"); // ack level: wait for the leader
        ProducerConfig config = new ProducerConfig(props);
        this.producer = new Producer<>(config);
    }

    public void send(String topic, List<String> msgList) {
        // Wrap each payload in a KeyedMessage; without a key, the producer
        // chooses the partition itself
        List<KeyedMessage<String, String>> keyedMessageList = new ArrayList<>();
        for (String msg : msgList) {
            keyedMessageList.add(new KeyedMessage<>(topic, msg));
        }
        log.info("send kafka messages: {}", keyedMessageList);
        // The javaapi producer accepts a java.util.List directly;
        // no conversion to a Scala List is needed
        producer.send(keyedMessageList);
    }

    public void close() {
        producer.close();
        producer = null;
    }

    public static void main(String[] args) {
        KafkaProducer kafkaProducer = new KafkaProducer("127.0.0.1:9092");
        for (int i = 0; i <= 11; i++) {
            kafkaProducer.send("alarm1", List.of("sadiashdihasihdasdas" + i));
        }
        kafkaProducer.close();
    }
}
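Note that the two-argument KeyedMessage constructor used above sends with a null key, so the DefaultPartitioner configured earlier never actually sees a key. A small sketch of a keyed send, in case per-key ordering matters (the topic and key values are placeholders):

import kafka.producer.KeyedMessage;

public class KeyedSendExample {
    public static void main(String[] args) {
        // topic, key, message: with a non-null key, DefaultPartitioner routes by
        // hash(key) % numPartitions, so one key always lands on one partition
        KeyedMessage<String, String> msg =
                new KeyedMessage<>("alarm1", "device-42", "hello with key");
        // send it through the same javaapi producer as in KafkaProducer above:
        // producer.send(java.util.Collections.singletonList(msg));
    }
}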
