SpringBoot整合Kafka
SpringBoot整合Kafka
Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。
Windows版本下载地址:https://kafka.apache.org/downloads
启动服务器
Kafka服务器的功能相当于RocketMQ中broker,kafka运行还需要一个类似于命名服务器的服务。在kafka安装目录中自带一个类似于命名服务器的工具,叫做zookeeper,他的作用是注册中心,相关知识请自行百度zookeeper,后续我也会补充zookeeper学习心得。
zookeeper-server-start.bat ..\..\config\zookeeper.properties #启动zookeeper
kafka-server-start.bat ..\..\config\server.properties #启动Kafka
运行bin目录下的windows目录下的zookeeper-server-start命令即可启动注册中心,默认对外服务端口:2181。
运行bin目录下的windows目录下的kafka-server-start命令即可启动kafka服务器,默认对外服务端口:9092。
创建主题
和之前操作其他MQ产品相似,Kafka也是基于主题操作,操作之前需要先初始化topic。
#创建topic
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
#查询topic
kafka-topics.bat --zookeeper 127.0.0.1:2181 --list
#删除topic
kafka-topic.bat --delete --zookeeper localhost:2181 --topic test
测试服务器启动状态
Kafka提供有一套测试服务器功能的测试程序,运行bin目录下的windows目录下的命令即可使用。
kafka-console-producer.bat --broker-list localhost:9092 --topic test #测试生产消费
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning #测试消息消费
Spring Boot整合Kafka
1、引入spring boot整合Kafka的依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
2、配置Kafka的服务器地址
spring:
kafka:
bootstrap-servers: localhost:9092 #配置服务器地址
consumer:
group-id: order #配置消费组
3、使用KafkaTemplate操作Kafka
@Service
public class KafkaMessageServiceImpl implements MessageService{
@Autowried
private KafkaTemplate<String, String> template;
//同步消息生产推送
@Override
void sendMessage(String id){
System.out.println("准备推送订单消息,已纳入队列 :" + id);
//定义topic主题test
template.send( "test", id);
}
}
4、使用KafkaListener监听器监听消息,处理消息消费.
//消息监听器
@Component
class KafkaMessageListener {
//Kafka监听,监听对应的topic主题
@KafkaListener(topics="test")
void onMessage(ConsumerRecord<String, String> record){
//消费业务处理
System.out.println("已经完成业务消息消费,消费的数据是:" + record.value());
}
}