当前位置: 首页 > news >正文

springboot 整合spring-kafka客户端:SASL_SSL+PLAINTEXT方式

有关kafka的部署可以参考前面的文章:Kafka KRaft + SSL _SASL/PLAIN 部署文档

先确定最新版kafka_2.13-4.0.0的配:server.properties

# 节点角色配置
process.roles=broker,controller
node.id=1# 监听器配置
listeners=PLAINTEXT://:9092,CONTROLLER://:9093,SSL://:9094,SASL_SSL://:9095
advertised.listeners=PLAINTEXT://localhost:9092,SSL://localhost:9094,SASL_SSL://localhost:9095
controller.listener.names=CONTROLLER
inter.broker.listener.name=SSL
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_SSL:SASL_SSL# 控制器配置
controller.quorum.voters=1@localhost:9093# 日志配置
log.dirs=D:/kafka_2.13-4.0.0/logs# 网络配置
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600# 日志保留策略
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000# 分区配置
num.partitions=1
default.replication.factor=1
offsets.topic.replication.factor=1# 元数据配置
metadata.log.dir=D:/kafka-logs
metadata.log.segment.bytes=1073741824
metadata.max.retention.bytes=-1
metadata.max.retention.ms=604800000# 安全配置
# security.inter.broker.protocol=SSL
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer
allow.everyone.if.no.acl.found=true# SSL 配置 D:\kafka_2.13-4.0.0\config\ssl\kafka.server.keystore.jks
ssl.keystore.location=D:/kafka_2.13-4.0.0/config/ssl/kafka.server.keystore.jks
ssl.keystore.password=kafka123
ssl.key.password=kafka123
ssl.truststore.location=D:/kafka_2.13-4.0.0/config/ssl/kafka.server.truststore.jks
ssl.truststore.password=kafka123
ssl.client.auth=required
ssl.enabled.protocols=TLSv1.2,TLSv1.3
ssl.endpoint.identification.algorithm=HTTPS
ssl.secure.random.implementation=SHA1PRNG

以及:jaas.conf

确保kafka正常启动

可以使用以下命令查看kafka中的topic,向某个topic发送消息,查看kafka中的消息:

kafka-topics.bat --bootstrap-server localhost:9092 --list
kafka-console-producer.bat --bootstrap-server localhost:9092 --topic demo-topic
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic demo-topic --from-beginning

然后引入spring-kafka相关依赖包:

        <!-- Spring Kafka Starter --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><!-- Apache Kafka Clients (可选) --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.0.0</version> <!-- 使用你需要的版本 --></dependency>

接着在项目配置文件中加入如下配置:

# Kafka configuration PLAINTEXT
#spring.kafka.bootstrap-servers=localhost:9092
#
#spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
#spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#
#spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
#spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
#spring.kafka.consumer.auto-offset-reset=earliest
#spring.kafka.consumer.group-id=my-group# Kafka configuration SASL_SSL
spring.kafka.bootstrap-servers=localhost:9095spring.kafka.properties.security.protocol=SASL_SSL
spring.kafka.properties.sasl.mechanism=PLAIN
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret";spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializerspring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.group-id=my-group

重要的一步,还需要将kafa证书导入到java 的信任库

首先cd 到kafka目录:D:\kafka_2.13-4.0.0\config\ssl
使用命令查看truststore.jks 文件中的证书别名:

 keytool -list -keystore kafka.server.truststore.jks -storepass kafka123

在从truststore.jks文件中导出证书:

keytool -exportcert -alias kafka -keystore kafka.server.truststore.jks -storepass kafka123 -file kafka.crt

最后使用命令将证书导入信任库:这一步 需要以管理员的身份运行cmd

keytool -importcert -alias kafka-server -file kafka.crt -cacerts -storepass changeit

到这里所有的配置就完成了

下面我们在springboot项目中分别加入一个生产者和消费者测试发送和消费消息:

@Service
public class KafkaProducerService {private final KafkaTemplate<String, String> kafkaTemplate;public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {this.kafkaTemplate = kafkaTemplate;}public void sendMessage(String topic, String message) {kafkaTemplate.send(topic, message);System.out.println("Sent message: " + message);}
}
@Service
public class KafkaConsumerService {@KafkaListener(topics = "demo-topic", groupId = "my-group")public void listen(String message) {System.out.println("Received message: " + message);}
}

通过请求发送:

@RestController
public class KafkaController {@AutowiredKafkaProducerService kafkaProducerService;@GetMapping("/kafka")public Map<String, String> kafka(@RequestParam String message) {kafkaProducerService.sendMessage("demo-topic", message != null ? message : "Hello from Spring Boot Kafka!");return Map.of("message", "Sent");}
}

发送消息:

成功接收消费到消息:

http://www.dtcms.com/a/284724.html

相关文章:

  • mongodb 入门级别操作
  • Unity VR多人手术模拟恢复2:客户端移动同步问题分析与解决方案
  • jeecgbootvue3使用封装组件注意事项
  • 学习 Flutter (四):玩安卓项目实战 - 中
  • 【WPF】WPF 自定义控件之依赖属性
  • Matlab2025a软件安装|详细安装步骤➕安装文件|附下载文件
  • Mask2Former,分割新范式
  • Kafka 控制器(Controller)详解:架构、原理与实战
  • Python23 —— 标准库(time库)
  • c++列表初始化
  • Dijkstra 算法求解多种操作
  • Stone3D教程:免编码制作在线家居生活用品展示应用
  • 【初始Java】
  • mysql中where字段的类型转换
  • (转)Kubernetes基础介绍
  • SQL增查
  • Windows下odbc配置连接SQL Server
  • .Net将控制台的输出信息存入到日志文件按分钟生成日志文件
  • 【JavaEE进阶】使用云服务器搭建Linux环境
  • Java网络通信:UDP和TCP
  • 关于CDH以及HUE的介绍
  • vue-seo优化
  • Android构建流程与Transform任务
  • 题解:P13311 [GCJ 2012 Qualification] Speaking in Tongues
  • java面向对象-多态
  • 【前端】Power BI自动化指南:从API接入到Web嵌入
  • 旅游管理实训基地建设:筑牢文旅人才培养的实践基石
  • LeetCode热题100—— 238. 除自身以外数组的乘积
  • Pygame创建窗口教程 - 从入门到实践 | Python游戏开发指南
  • 小白学Python,网络爬虫篇(1)——requests库