当前位置: 首页 > news >正文

kafak


一、普通使用

生产者
import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class KafkaProducerExample {
public static void main(String[] args) {
// 1. 配置生产者
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // Kafka集群地址
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "all"); // 确保消息持久化[6](@ref)
props.put("retries", 3); // 重试次数

        // 2. 创建生产者实例
Producer<String, String> producer = new KafkaProducer<>(props);

        // 3. 发送消息(异步回调)
ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "key1", "Hello Kafka!");
producer.send(record, (metadata, e) -> {
if (e == null) {
System.out.printf("✅ 消息发送成功: topic=%s, partition=%d, offset=%d%n",
metadata.topic(), metadata.partition(), metadata.offset());
} else {
e.printStackTrace();
}
});

        // 4. 关闭连接
producer.close();
}
}

消费者
import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerExample {
public static void main(String[] args) {
// 1. 配置消费者
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group"); // 消费者组ID[6](@ref)
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "earliest"); // 从最早消息开始消费

        // 2. 创建消费者实例
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("test-topic")); // 订阅主题

        // 3. 持续拉取消息
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("📩 收到消息: key=%s, value=%s (partition=%d, offset=%d)%n",
record.key(), record.value(), record.partition(), record.offset());
}
}
} finally {
consumer.close(); // 确保资源释放
}
}
}


2、SpringBoot环境下使用

生产者的配置与使用
@Configuration
public class KafkaConfig {
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props));
}
}

@Service
public class MessageService {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String topic, String message) {
kafkaTemplate.send(topic, message); // 发送消息[4](@ref)
}
}


消费者的配置与使用

@Configuration
@EnableKafka
public class ConsumerConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "spring-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}

    @Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}

@Service
public class MessageListener {
@KafkaListener(topics = "test-topic", groupId = "spring-group")
public void listen(String message) {
System.out.println("🔔 Spring消费消息: " + message); // 自动监听主题[4,6](@ref)
}
}

http://www.dtcms.com/a/321627.html

相关文章:

  • 经常问的14000
  • HTML5 Web Workers 深度剖析:助力网页性能飞速提升
  • imx6ull-驱动开发篇14——原子操作
  • FFmpeg 视频旋转信息处理:3.4 vs 7.0.2
  • 开发避坑指南(22):Vue3响应式编程中this绑定机制与解决方案
  • C++ 部署LSTM(.onnx)
  • 大模型中的核心参数temperature 您知道是什么东东吗?
  • KEIL 环境下 printf 导致程序无法执行的解决方案
  • GPT5评测对比与使用
  • 2025年城市建设与智慧交通国际会议(ICUCIT 2025)
  • OpenAI重磅开源回归!GPT-OSS-120B/20B登陆星辰MaaS
  • 【长度最小的子数组】
  • C++ 红黑树实现详解:理论+代码+图解
  • 主流多模态大模型使用总结
  • GPT-5测评:AI新纪元的开启还是炒作?
  • 【SpringBoot】01 基础入门-SpringBoot2:从核心技术到响应式编程
  • Jenkins自动化构建部署Java、Web前后端项目
  • 使用Python将中文语音翻译成英语音频
  • 达梦DISQL执行SQL和SQL脚本
  • 医疗数据中台架构实战:Java实现高可用、低耦合的数据治理方案
  • 30人大型视频会议设备清单
  • 零基础小白如何使用QGIS制作研究区地形区位图教程
  • 参数服务器 server and client
  • 一文可视化分析2025年6月计算机视觉顶刊IJCV前沿热点
  • 满足高性能AI服务器的企业SSD有哪些?三星PM1743与Solidigm PS1010
  • Ⅹ—6.计算机二级综合题27---30套
  • 研发流程管理经验分享
  • 部署ELK8.18对日志进行收集、展示
  • 1Panel Agent 证书绕过实现远程命令执行漏洞复现(CVE-2025-54424)
  • 【Spring Boot 快速入门】八、登录认证