当前位置: 首页 > news >正文

单机Kafka配置ssl并在springboot使用

目录

  • SSL证书
    • 生成根证书
    • 生成服务端和客户端证书
      • 生成keystore.jks和truststore.jks
      • 辅助脚本
      • 单独生成truststore.jks
  • 环境配置
    • hosts文件
    • kafka server.properties配置ssl
  • 启动kafka
  • kafka基础操作
  • springboot集成
    • 准备工作
    • 需要配置的文件
    • 开始消费

SSL证书

证书主要包含两大类,一个是根证书,用于签发和认证证书。其他证书可以用同一个根证书签发,也可以用不同的根证书签发各自的证书,使用同一个的话比较方便管理,这样所有节点的trust可以公用,即只需要生成一次,其他节点复制就可以。
整个证书生成过程大概如下图:
在这里插入图片描述
最终用于认证的是keystore.jks和truststore.jks,两个证书的作用分别是:
keystore.jks:证明自己的身份,自己的keystore.jks是由别人truststore.jks包含的ca-cert.pem签发就可以证明
truststore.jks:认证别人是否可信,看到别人的keystore.jks里有自己truststore.jks包含的ca-cert.pem就认为可信
这里要注意的是,如果要信任别人,就要在truststore中导入别人的根证书,这里是因为用的同一个根证书签发,所以导入的根证书一样,否则应该交叉导入,也就是客户端导入服务端的,服务端导入客户端的。

生成根证书

生成根证书包含流程图的第一步,这时会生成根证书和他的私钥,命令脚本如下:

#!/bin/bashset -e# === 配置部分 需要根据自己的实际情况进行调整===
#根证书
CA_CERT="ca-cert.pem"
#根证书私钥
CA_KEY="ca-key.pem"
#有效期
VALIDITY=365
#subj
SUBJ="/CN=KafkaCA"
# 检查目录
if [ ! -d "/usr/ca/ssl" ]; thenmkdir -p /usr/ca/sslchmod 700 /usr/ca/ssl
ficd /usr/ca/ssl
# 检查文件是否已存在
if [ -f "$CA_CERT" ] || [ -f "$CA_KEY" ]; thenecho "错误: CA证书或私钥已存在,请先删除或备份现有文件"exit 1
fi#正式生成证书,以下内容可以不用调整
echo "=== 步骤 1: 生成自签名 CA ==="
openssl req -new -x509 \-keyout $CA_KEY \-out $CA_CERT \-days $VALIDITY \-nodes \-subj $SUBJ# 设置文件权限
chmod 600 "$CA_KEY"

有效命令其实就是最后一句,其他的是因为放在脚本中所以进行一些通用配置,方便复用

生成服务端和客户端证书

这个阶段包含流程图的2-7,在都用同一个CA的情况下,步骤7只需要执行一次,然后复制到所有需要用到的计算机中就可以。

生成keystore.jks和truststore.jks

#!/bin/bashset -e# === 配置部分 需要根据自己的实际情况进行调整===
ALIAS="kafka"
KEYSTORE="keystore.jks"
TRUSTSTORE="truststore.jks"
CSR="sign.csr"
SIGN="signed.crt"
STOREPASS="123456"
KEYPASS="654321"
DNAME="CN=localhost, OU=IT, O=Kafka, L=City, S=State, C=CN"
VALIDITY=365
#上一步生成的根证书
CA_CERT="/usr/ca/ssl/ca-cert.pem"
CA_KEY="/usr/ca/ssl/ca-key.pem"
# 检查目录
if [ ! -d "/usr/ca/ssl" ]; thenecho "错误: 目录 /usr/ca/ssl 不存在"exit 1
ficd /usr/ca/ssl#正式生成各个证书,以下内容可以不用调整
echo "=== 步骤 1: 生成Keystore证书和私钥 ==="
keytool -genkeypair \-alias $ALIAS \-keyalg RSA \-keysize 2048 \-validity $VALIDITY \-keystore $KEYSTORE \-storepass $STOREPASS \-keypass $KEYPASS \-dname "$DNAME"echo "=== 步骤 2: 生成证书签名请求 (CSR) ==="
keytool -keystore $KEYSTORE \-alias $ALIAS \-certreq \-file $CSR \-storepass $STOREPASSecho "=== 步骤 3: 使用 CA 签名证书 ==="
openssl x509 -req \-CA $CA_CERT \-CAkey $CA_KEY \-in $CSR \-out $SIGN \-days $VALIDITY \-CAcreateserialecho "=== 步骤 4: 将 CA 根证书导入Keystore ==="
keytool -keystore $KEYSTORE \-alias CARoot \-import -file $CA_CERT \-storepass $STOREPASS -nopromptecho "=== 步骤 5: 将签名证书导入 Keystore ==="
keytool -keystore $KEYSTORE \-alias $ALIAS \-import -file $SIGN \-storepass $STOREPASS -noprompt#使用同一个CA证书,在多个计算机使用时,下面这步可以只执行一次,每次新生成也不影响
echo "=== 步骤 6: 创建Truststore(导入 CA 根证书) ==="
keytool -keystore $TRUSTSTORE \-alias CARoot \-import -file $CA_CERT \-storepass $STOREPASS -noprompt

辅助脚本

如果正在生成服务端证书,需要把相关证书配置到server.properties可以在上面脚本中增加一下内容:

#顺便生成后续Kafka要配置的内容,直接复制到server.properties文件
cat <<EOF############ SSL 配置 - server.properties 中添加 ############listeners=SSL://:9092
#下面的localhost需要改成ip,否则只有自己能连上
advertised.listeners=SSL://localhost:9092
security.inter.broker.protocol=SSL
ssl.endpoint.identification.algorithm=
ssl.keystore.location=$(pwd)/$KEYSTORE
ssl.keystore.password=$STOREPASS
ssl.key.password=$KEYPASS
ssl.truststore.location=$(pwd)/$TRUSTSTORE
ssl.truststore.password=$STOREPASS
#这里配置成双向认证
ssl.client.auth=required
# 不验证客户端证书
#ssl.client.auth=none  #############################################################EOF

如果是为客户端生成证书,可以增加一下内容:

#如果是客户端就增加使用以下脚本生成的文件去执行Kafka相关命令
echo "=== 创建 Kafka 客户端配置文件 client.properties ==="
cat <<EOF > client.properties
security.protocol=SSL
ssl.truststore.location=$(pwd)/$TRUSTSTORE
ssl.truststore.password=$STOREPASS
ssl.endpoint.identification.algorithm=
group.id=test-group
#如果单向认证就不用添加下面三个配置
ssl.keystore.location=$(pwd)/$KEYSTORE
ssl.keystore.password=$STOREPASS
ssl.key.password=$KEYPASS
EOF

单独生成truststore.jks

#!/bin/bashset -e
# === 配置部分 需要根据自己的实际情况进行调整===
TRUSTSTORE="truststore.jks"
STOREPASS="123456"
#信任的根证书
CA_CERT="/usr/ca/ssl/ca-cert.pem"
echo "=== 步骤 6: 创建Truststore(导入 CA 根证书) ==="
keytool -keystore $TRUSTSTORE \-alias CARoot \-import -file $CA_CERT \-storepass $STOREPASS -noprompt

环境配置

hosts文件

文件位置:
Windows hosts:C:\Windows\System32\drivers\etc\hosts
Linux hosts:/etc/hosts
添加内容:公网IP kafka-single
如果是本机使用也可以直接用内弯IP

kafka server.properties配置ssl

把生成jks那步输出的内容增加到server.properties中就可以,如果不是第一次配置,就只增加自己需要配置的内容即可。或者没有增加辅助脚本的话,直接把下面内容中keystore和truststore的位置手动替换一下就行:

#下面两项原来如果已经配置过就不要重复
listeners=SSL://:9092
#下面的localhost需要改成ip,否则只有自己能连上
advertised.listeners=SSL://localhost:9092security.inter.broker.protocol=SSL
ssl.endpoint.identification.algorithm=
ssl.keystore.location=这里替换成keystore的路径
ssl.keystore.password=keystore的密码
ssl.key.password=key的密码
ssl.truststore.location=这里替换成truststore的路径
ssl.truststore.password=truststore的密码#这里配置成双向认证
ssl.client.auth=required
# 不验证客户端证书
#ssl.client.auth=none  

启动kafka

cd到kafka安装路径下可以直接执行下面命令,或者使用绝对路径
/bin/zookeeper-server-start.sh -daemon /config/zookeeper.properties
/bin/kafka-server-start.sh -daemon /config/server.properties

kafka基础操作

client.properties的路径要换成自己的
创建topic

 bin/kafka-topics.sh   --create --bootstrap-server kafka:9092 --replication-factor 1 --partitions 1 --topic test-topic  --command-config /usr/local/kafka/ssl/client.properties

查看topic

 bin/kafka-topics.sh --list --bootstrap-server kafka:9092  --command-config /usr/ca/ssl-server/ssl-client/client-ssl.properties

生成消息

 bin/kafka-console-producer.sh  --bootstrap-server  kafka:9092 --topic test-topic --producer.config /usr/local/kafka/ssl/client.properties

springboot集成

准备工作

1、生成客户端keystore.jks和truststore.jks
这时候spring程序作为客户端,所以需要为他生成一个keystore.jks和truststore.jks,然后放到项目或别的位置,配置到项目中用来认证。如果只是暂时测试一下是否能连通,也可以讨巧,直接用服务端的同一套keystore.jks和truststore.jks,但是这样的操作不能用到正式中。

2、修改项目所在计算机的hosts文件

需要配置的文件

pom:引入kafka依赖,有说需要版本对应的,我直接没有指定版本也是可以的

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>

yml:增加Kafka配置项,当然也可以放到代码里

spring:kafka:#kafka代理地址bootstrap-servers: kafka:9092ssl:protocol: SSL###服务端证书配置的时候设置的密码#broke对client的认证,ssl.client.auth=required时需要key的配置key-store-location: classpath:/certs/keystore.jkskey-store-password: 123456key-password: 654321#client对broke的认证trust-store-password: 123456trust-store-location: classpath:/certs/truststore.jkskey-store-type: JKS#不验证主机名  properties:ssl:endpoint:identification:algorithm: ''security:protocol: SSL#认证的配置就到这里了,下面的配置可以根据自己的习惯配置#消息发送失败重试次数producer:retries: 0# 指定消息key和消息体的编解码方式key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:group-id: consumer-dev-groupauto-offset-reset: earliestenable-auto-commit: falsemax-poll-records: 30# 指定消息key和消息体的编解码方式key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerlistener:ack-mode: manualtype: batchconcurrency: 1#kafka监听的topic和group
report:kafka:#接收kafka消息的topic和groupproducerTopic: test-topicreportGroup: test-group

开始消费

如果需要生成可以找别的教程,我直接通过命令进行生产的,只是写了一个消费(后面有时间可以补上)


import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;import java.util.List;@Slf4j
@Component
public class Consumer {@KafkaListener(topics = "${report.kafka.producerTopic}", groupId = "${report.kafka.reportGroup}")public void reportConsumer(List<ConsumerRecord<String, String>> consumerRecords, Acknowledgment ack) {log.info("---------- 从Kafka上接收消息 -----");for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {log.info("offset是" + consumerRecord.offset() + "," + consumerRecord.partition());String value = consumerRecord.value();System.out.println("接到的内容:"+value);//具体的业务处理逻辑可以写在后面}// 手动批量ackack.acknowledge();log.info("kafka提交成功");}}

相关文章:

  • Spring MVC-面试题(33)
  • 「二叉搜索树·手撕暴走篇」:用C++《一路向北》狂写指针のの死亡轮盘!
  • C++成员对象和封闭类
  • 在ubuntu 24安装 postgresql 17 (源码安装)
  • 【Python数据库全栈指南】从SQL到ORM深度实践
  • 深入浅出IIC协议 - 从总线原理到FPGA实战开发 -- 第五篇:多主仲裁与错误恢复
  • 【编程语言】【Python】一篇文章搭建python知识体系
  • 数据保护与通讯安全
  • 基于 STC89C52 的养殖场智能温控系统设计与实现
  • 基于S7-1200 PLC与MM440变频器的速冻库制冷控制系统设计与实现
  • 算法打卡第五天
  • CMake指令:option()
  • vue-table-print 一个强大的Vue 3表格打印工具,支持ElementPlus、Ant Design Vue等主流UI组件库。
  • Windows逆向工程提升之IMAGE_IMPORT_DESCRIPTOR
  • 【Django DRF】一篇文章总结Django DRF框架
  • LabVIEW与SQLServer2019换计算机重新安装数据库
  • LET 2025盛大开幕!数智工厂×智慧物流×机器人,一展get创新科技
  • ConceptAttention:Diffusion Transformers learn highly interpretable features
  • 【LaTex】基础语法入门
  • Maven打包SpringBoot项目,因包含SpringBootTest单元测试和Java预览版特性导致打包失败
  • 免费制作自己的网站长/外贸平台排行榜前十名
  • 深圳网站建设黄浦网络 骗子/谷歌广告上海有限公司
  • php网站开发专业/如何快速搭建网站
  • 重庆渝中区企业网站建设哪家好/关键词挖掘ppt
  • asp技术网站开发案例/seo网站关键词优化哪家好
  • 网站建设商标保护/企业宣传视频