连接 kafka0.8.1.1 java
最近需要连接一个 Kafka 0.8.1.1 版本的集群来消费数据,试了很多客户端库都不兼容,最终确定使用下面这个依赖:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.1.1</version>
</dependency>
最后使用的就是这个依赖。注意它传递依赖的 jms 和 jmx 相关 jar(javax.jms:jms、com.sun.jdmk:jmxtools、com.sun.jmx:jmxri)在部分镜像仓库中拉取不到,
需要到 Maven 中央仓库(或可用的镜像)手动下载对应的 jar,再安装到本地 Maven 仓库中。
消费者代码
@Component
@Slf4j
public class KafkaConsumer {

    // Zookeeper connect string (host:port[,host:port...]) used by the 0.8
    // high-level consumer. FIX: removed the field-level @Value — Spring cannot
    // field-inject a final field, and the constructor below already injects it.
    private final String zookeeper;

    public KafkaConsumer(@Value("${kafka-client.consumer.zookeeper}") String zookeeper) {
        this.zookeeper = zookeeper;
    }

    /**
     * Consumes the given topic with the 0.8 high-level (Zookeeper-based) consumer
     * and appends every received message to a local file named
     * "{topic}_{groupId}.txt". Blocks forever on the stream iterator.
     *
     * @param topic   topic to consume
     * @param groupId consumer group id; when empty, defaults to "{topic}_consumer_group"
     */
    public void consume(String topic, String groupId) {
        if (StrUtil.isEmpty(groupId)) {
            groupId = StrUtil.format("{}_consumer_group", topic);
        }
        log.info("create kafka consumer zookeeper: {}", zookeeper);
        log.info("gather topic:{}", topic);
        List<String> rv = new ArrayList<>();
        // Build the high-level consumer configuration.
        Properties props = new Properties();
        props.put("zookeeper.connect", this.zookeeper);
        props.put("group.id", groupId);
        // Start from the earliest available offset when the group has no
        // committed offset ("smallest" is the 0.8 equivalent of "earliest").
        props.put("auto.offset.reset", "smallest");
        ConsumerConfig consumerConfig = new ConsumerConfig(props);
        ConsumerConnector consumer = Consumer.createJavaConsumerConnector(consumerConfig);
        // One stream (consumer thread) for this topic.
        Map<String, Integer> topicCountMap = new HashMap<>();
        topicCountMap.put(topic, 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
        // Hoisted out of the loop: the target path never changes per message.
        var path = StrUtil.format("{}_{}.txt", topic, groupId);
        for (final KafkaStream<byte[], byte[]> stream : streams) {
            // The stream iterator blocks until the next message arrives.
            for (MessageAndMetadata<byte[], byte[]> messageAndMetadata : stream) {
                log.info("received message from topic {}", messageAndMetadata.topic());
                // FIX: decode with an explicit charset (was platform-default),
                // and add to rv BEFORE writing, so the persisted file includes
                // the current message (original wrote first, lagging by one).
                String message = new String(messageAndMetadata.message(), StandardCharsets.UTF_8);
                rv.add(message);
                log.info("写入文件 {}", path);
                FileUtil.writeLines(rv, new File(path), StandardCharsets.UTF_8);
                // FIX: guard the preview length — substring(0, 30) threw
                // StringIndexOutOfBoundsException for messages shorter than 30 chars.
                System.out.println(topic + " Received: " + message.substring(0, Math.min(30, message.length())));
            }
        }
    }

    public static void main(String[] args) {
        String topic = "test";                // topic to consume
        String zookeeper = "127.0.0.1:2181";  // Zookeeper address
        String groupId = "test";              // consumer group id
        KafkaConsumer consumer = new KafkaConsumer(zookeeper);
        consumer.consume(topic, groupId);
    }
}
生产者
// FIX: added @Slf4j — the class references `log` but had no logger declared,
// which would not compile (the consumer class in this file uses the same annotation).
@Slf4j
public class KafkaProducer {

    // Comma-separated broker list, e.g. "host1:9092,host2:9092".
    @Value("${kafka-client.producer.brokerList}")
    private String brokerList;

    private Producer<String, String> producer;

    /**
     * Creates a 0.8 producer connected to the given broker list.
     *
     * @param brokers comma-separated "host:port" broker addresses
     */
    public KafkaProducer(@Value("${kafka-client.producer.brokerList}") String brokers) {
        this.brokerList = brokers;
        log.info("Created KafkaProducer with brokerList: {}", brokerList);
        Properties props = new Properties();
        props.put("metadata.broker.list", brokerList);                        // Kafka broker addresses
        props.put("serializer.class", "kafka.serializer.StringEncoder");      // message serialization
        props.put("partitioner.class", "kafka.producer.DefaultPartitioner"); // default partitioner
        props.put("request.required.acks", "1");                              // wait for leader ack only
        ProducerConfig config = new ProducerConfig(props);
        this.producer = new Producer<>(config);
    }

    /**
     * Sends each string in {@code msgList} as an unkeyed message to {@code topic}.
     *
     * @param topic   destination topic
     * @param msgList message payloads to send, in order
     */
    public void producer(String topic, java.util.List<String> msgList) {
        List<KeyedMessage<String, String>> keyedMessageList = new ArrayList<>(msgList.size());
        for (String msg : msgList) {
            keyedMessageList.add(new KeyedMessage<>(topic, msg));
        }
        log.info("send kafka messages: {}", keyedMessageList);
        // FIX: kafka.javaapi.producer.Producer#send accepts a java.util.List
        // directly; the previous scala List.fromArray(array) round-trip was an
        // unchecked conversion and unnecessary.
        producer.send(keyedMessageList);
    }

    /** Closes the underlying producer; safe to call more than once. */
    public void close() {
        if (producer != null) {
            producer.close();
            producer = null;
        }
    }

    public static void main(String[] args) {
        KafkaProducer kafkaProducer = new KafkaProducer("127.0.0.1:9092");
        try {
            for (int i = 0; i <= 11; i++) {
                kafkaProducer.producer("alarm1", java.util.List.of("sadiashdihasihdasdas" + i));
            }
        } finally {
            // FIX: release network resources — the original main never closed the producer.
            kafkaProducer.close();
        }
    }
}