kafak
一、普通使用
生产者
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class KafkaProducerExample {
public static void main(String[] args) {
// 1. 配置生产者
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // Kafka集群地址
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "all"); // 确保消息持久化[6](@ref)
props.put("retries", 3); // 重试次数
// 2. 创建生产者实例
Producer<String, String> producer = new KafkaProducer<>(props);
// 3. 发送消息(异步回调)
ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "key1", "Hello Kafka!");
producer.send(record, (metadata, e) -> {
if (e == null) {
System.out.printf("✅ 消息发送成功: topic=%s, partition=%d, offset=%d%n",
metadata.topic(), metadata.partition(), metadata.offset());
} else {
e.printStackTrace();
}
});
// 4. 关闭连接
producer.close();
}
}
消费者
import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class KafkaConsumerExample {
public static void main(String[] args) {
// 1. 配置消费者
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group"); // 消费者组ID[6](@ref)
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "earliest"); // 从最早消息开始消费
// 2. 创建消费者实例
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("test-topic")); // 订阅主题
// 3. 持续拉取消息
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("📩 收到消息: key=%s, value=%s (partition=%d, offset=%d)%n",
record.key(), record.value(), record.partition(), record.offset());
}
}
} finally {
consumer.close(); // 确保资源释放
}
}
}
2、SpringBoot环境下使用
生产者的配置与使用
@Configuration
public class KafkaConfig {
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props));
}
}
@Service
public class MessageService {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String topic, String message) {
kafkaTemplate.send(topic, message); // 发送消息[4](@ref)
}
}
消费者的配置与使用
@Configuration
@EnableKafka
public class ConsumerConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "spring-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
@Service
public class MessageListener {
@KafkaListener(topics = "test-topic", groupId = "spring-group")
public void listen(String message) {
System.out.println("🔔 Spring消费消息: " + message); // 自动监听主题[4,6](@ref)
}
}