
Kafka Getting Started (Part 2)

Accessing Kafka from a Java Client

Add the Maven dependency:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.4.1</version>
</dependency>

Producer code:

package com.tuling.kafka.kafkaDemo;

import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class MsgProducer {

    private final static String TOPIC_NAME = "my-replicated-topic";

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.65.60:9092,192.168.65.60:9093,192.168.65.60:9094");
        // props.put(ProducerConfig.ACKS_CONFIG, "1");
        // props.put(ProducerConfig.RETRIES_CONFIG, 3);
        // props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300);
        // props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        // props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        // props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        Producer<String, String> producer = new KafkaProducer<>(props);

        int msgNum = 5;
        final CountDownLatch countDownLatch = new CountDownLatch(msgNum);
        for (int i = 1; i <= msgNum; i++) {
            Order order = new Order(i, 100 + i, 1, 1000.00);

            // Send to an explicitly specified partition:
            // ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC_NAME, 0,
            //         order.getOrderId().toString(), JSON.toJSONString(order));

            // No partition specified: the target partition is computed as hash(key) % partitionNum
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC_NAME,
                    order.getOrderId().toString(), JSON.toJSONString(order));

            // Synchronous send: block until the broker acknowledges the message
            // RecordMetadata metadata = producer.send(producerRecord).get();
            // System.out.println("sync send result: topic-" + metadata.topic() + "|partition-"
            //         + metadata.partition() + "|offset-" + metadata.offset());

            // Asynchronous send with a callback
            producer.send(producerRecord, new Callback() {
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null) {
                        System.err.println("send failed:");
                        exception.printStackTrace();
                    }
                    if (metadata != null) {
                        System.out.println("async send result: topic-" + metadata.topic()
                                + "|partition-" + metadata.partition() + "|offset-" + metadata.offset());
                    }
                    countDownLatch.countDown();
                }
            });

            // TODO: award loyalty points
        }

        countDownLatch.await(5, TimeUnit.SECONDS);
        producer.close();
    }
}
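
The producer above references an Order class that the original does not show. A minimal sketch that matches the constructor and the getOrderId() call above; the remaining field names (productId, productNum, orderAmount) are assumptions for illustration:

package com.tuling.kafka.kafkaDemo;

// Minimal sketch of the Order POJO used by the producer above. The constructor
// signature and getOrderId() come from the producer code; the other field names
// are assumptions.
public class Order {

    private Integer orderId;
    private Integer productId;
    private Integer productNum;
    private Double orderAmount;

    public Order(Integer orderId, Integer productId, Integer productNum, Double orderAmount) {
        this.orderId = orderId;
        this.productId = productId;
        this.productNum = productNum;
        this.orderAmount = orderAmount;
    }

    public Integer getOrderId() {
        return orderId;
    }

    // fastjson serializes via getters, so the remaining fields need getters too
    public Integer getProductId() { return productId; }
    public Integer getProductNum() { return productNum; }
    public Double getOrderAmount() { return orderAmount; }
}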

Consumer code:

package com.tuling.kafka.kafkaDemo;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class MsgConsumer {

    private final static String TOPIC_NAME = "my-replicated-topic";
    private final static String CONSUMER_GROUP_NAME = "testGroup";

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.65.60:9092,192.168.65.60:9093,192.168.65.60:9094");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME);
        // Auto-commit offsets every second
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        // props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        // props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(TOPIC_NAME));

        // Consume from a specific partition only
        // consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));

        // Replay from the beginning of a partition
        /*
        consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
        consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
        */

        // Consume from a specific offset
        /*
        consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
        consumer.seek(new TopicPartition(TOPIC_NAME, 0), 10);
        */

        // Consume from a specific point in time
        /*
        List<PartitionInfo> topicPartitions = consumer.partitionsFor(TOPIC_NAME);
        // start from one hour ago
        long fetchDataTime = new Date().getTime() - 1000 * 60 * 60;
        Map<TopicPartition, Long> map = new HashMap<>();
        for (PartitionInfo par : topicPartitions) {
            map.put(new TopicPartition(TOPIC_NAME, par.partition()), fetchDataTime);
        }
        Map<TopicPartition, OffsetAndTimestamp> parMap = consumer.offsetsForTimes(map);
        for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : parMap.entrySet()) {
            TopicPartition key = entry.getKey();
            OffsetAndTimestamp value = entry.getValue();
            if (key == null || value == null) continue;
            Long offset = value.offset();
            System.out.println("partition-" + key.partition() + "|offset-" + offset);
            // seek to the offset that matches the timestamp
            consumer.assign(Arrays.asList(key));
            consumer.seek(key, offset);
        }
        */

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("received: partition = %d, offset = %d, key = %s, value = %s%n",
                        record.partition(), record.offset(), record.key(), record.value());
            }

            /*
            if (records.count() > 0) {
                // Manual synchronous commit: the current thread blocks until the offsets
                // are committed. Usually preferred, since there is typically no further
                // logic to run after the commit anyway.
                consumer.commitSync();

                // Manual asynchronous commit: does not block, processing can continue.
                consumer.commitAsync(new OffsetCommitCallback() {
                    @Override
                    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                        if (exception != null) {
                            System.err.println("Commit failed for " + offsets);
                            System.err.println("Commit failed exception: " + exception);
                        }
                    }
                });
            }
            */
        }
    }
}
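
Before moving on to Spring Boot: the producer comment above describes the default key-to-partition mapping as hash(key) % partitionNum. In kafka-clients the hash is murmur2. A small illustrative sketch using the utilities that ship with the client (the class name PartitionForKeyDemo is made up for this example):

package com.tuling.kafka.kafkaDemo;

import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.utils.Utils;

// Illustrative only: reproduces the keyed-record partition choice of the default
// partitioner, murmur2(keyBytes) % numPartitions, using kafka-clients' own utilities.
public class PartitionForKeyDemo {

    public static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    public static void main(String[] args) {
        // e.g. which partition of a 3-partition topic the key "1" lands on
        System.out.println(partitionFor("1", 3));
    }
}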

Integrating Kafka with Spring Boot

Add the Spring Boot Kafka dependency:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

application.yml configuration:

server:
  port: 8080

spring:
  kafka:
    bootstrap-servers: 192.168.65.60:9092,192.168.65.60:9093,192.168.65.60:9094
    producer:
      retries: 3              # retry transient send failures up to 3 times
      batch-size: 16384
      buffer-memory: 33554432
      acks: 1                 # wait for the leader ack only, not for replica sync
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: default-group
      enable-auto-commit: false
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      # commit the offset as soon as the listener calls ack.acknowledge()
      ack-mode: manual_immediate
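
The listener.ack-mode: manual_immediate setting pairs with enable-auto-commit: false: offsets are committed only when the listener calls ack.acknowledge(), and immediately rather than at the end of the poll batch. For reference, the same container setting could also be expressed in Java config, roughly as below (a sketch assuming spring-kafka 2.x; the kafkaListenerContainerFactory bean overrides the one Spring Boot would otherwise auto-configure, so the yml listener settings no longer apply):

package com.kafka;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;

// Sketch: Java-config equivalent of "listener.ack-mode: manual_immediate".
@Configuration
public class KafkaListenerConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // commit the offset immediately when the listener calls ack.acknowledge()
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
}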

Producer code:

package com.kafka;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class KafkaController {

    private final static String TOPIC_NAME = "my-replicated-topic";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @RequestMapping("/send")
    public void send() {
        // send to partition 0 of the topic with key "key"
        kafkaTemplate.send(TOPIC_NAME, 0, "key", "this is a msg");
    }
}
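
kafkaTemplate.send() is asynchronous, so the controller above fires and forgets. A sketch of inspecting the send result, assuming spring-kafka 2.x where send() returns a ListenableFuture (newer 3.x versions return a CompletableFuture instead); the class and endpoint names are made up for illustration:

package com.kafka;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

// Sketch: same send as above, but reacting to the outcome via a callback.
@RestController
public class KafkaCallbackController {

    private final static String TOPIC_NAME = "my-replicated-topic";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @RequestMapping("/sendWithCallback")
    public void sendWithCallback() {
        ListenableFuture<SendResult<String, String>> future =
                kafkaTemplate.send(TOPIC_NAME, 0, "key", "this is a msg");
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
                System.out.println("sent: topic-" + result.getRecordMetadata().topic()
                        + "|partition-" + result.getRecordMetadata().partition()
                        + "|offset-" + result.getRecordMetadata().offset());
            }

            @Override
            public void onFailure(Throwable ex) {
                System.err.println("send failed: " + ex.getMessage());
            }
        });
    }
}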

Consumer code:

package com.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
public class MyConsumer {

    /**
     * A richer listener declaration, for reference:
     *
     * @KafkaListener(groupId = "testGroup", topicPartitions = {
     *         @TopicPartition(topic = "topic1", partitions = {"0", "1"}),
     *         @TopicPartition(topic = "topic2", partitions = "0",
     *                 partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
     * }, concurrency = "6")
     *
     * concurrency is the number of consumers within the same group, i.e. the degree of
     * concurrent consumption; it must be less than or equal to the total partition count.
     *
     * @param record
     */
    @KafkaListener(topics = "my-replicated-topic", groupId = "zhugeGroup")
    public void listenZhugeGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
        String value = record.value();
        System.out.println(value);
        System.out.println(record);
        // manually commit the offset
        ack.acknowledge();
    }

    /*
    // A second listener on the same topic with a different consumer group
    @KafkaListener(topics = "my-replicated-topic", groupId = "tulingGroup")
    public void listenTulingGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
        String value = record.value();
        System.out.println(value);
        System.out.println(record);
        ack.acknowledge();
    }
    */
}
