当前位置: 首页 > news >正文

k8s 配置 Kafka SASL_SSL双重认证

说明

kafka提供了多种安全认证机制,主要分为SASLSSL两大类。

SASL: 是一种身份验证机制,用于在客户端和服务器之间进行身份验证的过程,其中SASL/PLAIN是基于账号密码的认证方式。
SSL: 是一种加密协议,用于在网络通信中提供数据的保密性和完整性。它使用公钥和私钥来建立安全的连接,并对传输的数据进行加密和解密,以防止未经授权的访问和篡改。
Kafka 中启用 SASL_SSL 安全协议时,SASL 用于客户端和服务器之间的身份验证,SSL 则用于加密和保护数据的传输。不仅提供身份验证,还提供加密和数据保护的功能。


要在 Kubernetes 中为 Kafka 配置 SASL_SSL,你需要先完成以下两大步骤:


第一部分:生成 SSL 证书用于 SASL_SSL(JKS 格式)

Kafka 使用 Java 的 Keystore/Truststore(.jks 文件)作为证书格式。

步骤 1:创建 CA 根证书

openssl req -new -x509 -keyout ca-key -out ca-cert -days 3650 -passout pass:123456 -subj "/CN=Kafka-CA"

在这里插入图片描述


步骤 2:为 Kafka 生成 keystore

keytool -genkeypair -keystore kafka.keystore.jks  -validity 365 -storepass 123456 -keypass 123456 -dname "CN=kafka"   -alias kafka  -keyalg RSA

在这里插入图片描述

步骤 3:创建证书签名请求(CSR)

keytool -keystore kafka.keystore.jks -alias kafka  -certreq -file kafka.csr -storepass 123456

在这里插入图片描述


步骤 4:用 CA 签名 Kafka 证书

openssl x509 -req -CA ca-cert -CAkey ca-key -in kafka.csr -out kafka-signed.crt -days 365 -CAcreateserial -passin pass:123456

在这里插入图片描述


步骤 5:将 CA 证书导入 Kafka keystore

keytool -keystore kafka.keystore.jks -alias CARoot -import -file ca-cert -storepass 123456 -noprompt

在这里插入图片描述


步骤 6:导入已签名 Kafka 证书

keytool -keystore kafka.keystore.jks -alias kafka -import -file kafka-signed.crt -storepass 123456

在这里插入图片描述


步骤 7:创建 truststore(客户端用)

keytool -keystore kafka.truststore.jks -alias CARoot -import -file ca-cert -storepass 123456 -noprompt

在这里插入图片描述


✅ 生成完成的文件:

  • kafka.keystore.jks:服务端使用(含私钥)
  • kafka.truststore.jks:客户端和服务端都使用(信任CA)
  • 密码:统一用 123456(你可以自定义)
    在这里插入图片描述

✅ 第二部分:Kafka 中配置 SASL_SSL

将上述文件配置到 Kafka 中(假设你在 Kubernetes 中运行):


🗂 配置 1:Kafka 环境变量(在 StatefulSet / Deployment 中)

env:# --- 控制器配置(KRaft 模式) ---- name: KAFKA_CFG_NODE_IDvalue: "0"  # 当前节点的唯一 ID(用于 controller quorum)- name: KAFKA_CFG_CONTROLLER_LISTENER_NAMESvalue: CONTROLLER  # 控制器监听使用的 listener 名称,需与 KAFKA_CFG_LISTENERS 中一致- name: KAFKA_CFG_CONTROLLER_QUORUM_VOTER_CLIENT_QUOTA_WINDOW_NUMvalue: "10"  # 控制器选举客户端配额窗口数量(流控相关)- name: KAFKA_CFG_CONTROLLER_QUORUM_VOTER_CLIENT_QUOTA_WINDOW_SIZE_SECONDSvalue: "1"  # 每个窗口的时长,单位秒- name: KAFKA_CFG_CONTROLLER_QUORUM_VOTER_REQUEST_TIMEOUT_MSvalue: "5000"  # 控制器之间选举通信的请求超时时间(毫秒)- name: KAFKA_CFG_CONTROLLER_QUORUM_VOTERSvalue: "0@kafka:9093"  # controller 选举配置:nodeId@host:port# --- Kafka 常规配置 ---- name: KAFKA_AUTO_CREATE_TOPICS_ENABLEvalue: "true"  # 启用自动创建 topic(生产建议关闭)- name: KAFKA_ENABLE_KRAFTvalue: "YES"  # 启用 KRaft 模式(即不使用 ZooKeeper)# --- 节点角色定义 ---- name: KAFKA_CFG_PROCESS_ROLESvalue: "broker,controller"  # 当前节点同时担任 broker 和 controller 角色# --- Listener 与协议映射 ---- name: KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAPvalue: "CONTROLLER:SASL_PLAINTEXT,PLAINTEXT:SASL_SSL"  # 每个 listener 使用的安全协议# CONTROLLER 使用 SASL_PLAINTEXT,用于控制器通信# PLAINTEXT 实际绑定 SASL_SSL,用于 broker/client 通信(命名不影响协议)- name: KAFKA_INTER_BROKER_LISTENER_NAMEvalue: PLAINTEXT  # Kafka Broker 间通信使用的 listener 名称(上方定义)- name: KAFKA_CFG_LISTENERSvalue: PLAINTEXT://:9092,CONTROLLER://:9093   # Broker 和 Controller 的监听端口及协议标识- name: KAFKA_CFG_ADVERTISED_LISTENERSvalue: PLAINTEXT://192.168.1.5:9092   # Kafka 向客户端暴露的访问地址  # --- SSL 配置 ---- name: KAFKA_SSL_KEYSTORE_LOCATIONvalue: /bitnami/kafka/config/certs/kafka.keystore.jks  # 服务端密钥 + 证书文件路径- name: KAFKA_SSL_KEYSTORE_PASSWORDvalue: 123456  # keystore 文件访问密码- name: KAFKA_SSL_KEY_PASSWORDvalue: 123456   # keystore 内私钥使用的密码- name: KAFKA_SSL_TRUSTSTORE_LOCATIONvalue: /bitnami/kafka/config/certs/kafka.truststore.jks  # CA 证书文件路径(信任的客户端)- name: KAFKA_SSL_TRUSTSTORE_PASSWORDvalue: 123456  # truststore 文件访问密码- name: KAFKA_SSL_CLIENT_AUTHvalue: required  # 启用客户端证书校验(双向认证)# --- SASL 配置(PLAIN 机制)---- name: KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOLvalue: PLAIN  # 控制器间通信使用 PLAIN SASL 机制- name: KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOLvalue: PLAIN  # Broker 间通信使用 PLAIN SASL 机制# --- TLS 类型与证书密码 ---- name: KAFKA_TLS_TYPEvalue: JKS  # TLS 密钥文件类型(Java KeyStore)- name: KAFKA_CERTIFICATE_PASSWORDvalue: 123456# 证书统一使用的访问密码(用于 SSL 参数)# --- 监听器角色映射 ---- name: KAFKA_CFG_INTER_BROKER_LISTENER_NAMEvalue: SASL_SSL  # Broker 间通信使用的安全 listener 名称- name: KAFKA_CLIENT_LISTENER_NAMEvalue: SASL_SSL  # 客户端连接 Kafka 使用的 listener 名称# --- SASL 用户配置 ---- name: KAFKA_CONTROLLER_USERvalue: kafka  # 控制器之间通信使用的用户名- name: KAFKA_CONTROLLER_PASSWORDvalue: kafka123  # 控制器之间通信使用的密码- name: KAFKA_INTER_BROKER_USERvalue: kafka  # Broker 间通信使用的用户名- name: KAFKA_INTER_BROKER_PASSWORDvalue: kafka123  # Broker 间通信使用的密码- name: KAFKA_CLIENT_USERSvalue: kafka  # 允许连接的客户端用户名(多个用逗号分隔)- name: KAFKA_CLIENT_PASSWORDSvalue: kafka123  # 客户端对应密码,顺序与用户名保持一致

🗂 配置 2:Kubernetes 中挂载证书和 JAAS 文件

volumeMounts:- name: kafka-secretsmountPath: /bitnami/kafka/config/certsvolumes:- name: kafka-secretssecret:secretName: kafka-cert-secret- name: jaas-configconfigMap:name: kafka-jaas-config

你需要将 .jks 文件和密码打包为 Secret:

kubectl create secret generic kafka-cert-secret  -n 命名空间 --from-file=kafka.keystore.jks  --from-file=kafka.truststore.jks

✅ Kafka 客户端配置(示例)

yaml 配置文件

spring:kafka:bootstrap-servers: 192.168.1.5:9092 # Kafka Broker 的地址和端口listener:ack-mode: MANUAL_IMMEDIATE # 消费者手动提交消息确认(ack),立即提交(MANUAL_IMMEDIATE),适合需要精确控制 offset 提交时机的业务场景。consumer:custom-environment: dev  # 自定义字段,可用于 profile 配置(非 Spring Kafka 标准字段)auto-offset-reset: latest # 从最新消息开始消费(若无提交 offset)enable-auto-commit: false # 关闭自动提交,需手动调用 ack.acknowledge()#      auto-commit-interval: 2000key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializermax-poll-records: 50 # 每次 poll 最多拉取 50 条记录max-poll-interval-ms: 600000 # 最大处理时间 10 分钟,超时视为挂掉producer:retries: 0 # 不重试(可调高)batch-size: 16384 # 每批 16KB,达到此大小或 linger.ms 超时才发送buffer-memory: 33554432 # 缓冲区大小# String 类型键值序列化key-serializer: org.apache.kafka.common.serialization.StringSerializer #value-serializer: org.apache.kafka.common.serialization.StringSerializerssl:# Kafka 客户端验证服务端证书是否可信(客户端信任的 CA 证书放在 truststore 中)# .jks 文件必须放在 resources/ 目录下并打包到 classpathtrust-store-location: classpath:kafka.truststore.jkstrust-store-password: 123456properties:sasl:mechanism: PLAIN # 使用 SASL/PLAIN 机制进行身份验证jaas:# 此处填写 SASL登录时分配的用户名密码(注意password结尾;)# 此处用户名 kafka 和密码 kafka123 必须与服务端 Kafka 设置的 KAFKA_CLIENT_USERS 一致config: org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="kafka123"; security:protocol: SASL_SSL # 通信使用 SASL 认证 + SSL 加密ssl:endpoint:identification:algorithm: "" # 关闭主机名验证,否则会因 SAN 缺失导致 SSL 握手失败(Java 默认开启)# ssl.endpoint.identification.algorithm=# producer.ssl.endpoint.identification.algorithm=# consumer.ssl.endpoint.identification.algorithm= 

生产者

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class KafkaProducerSaslSslExample {public static void main(String[] args) {Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "your-kafka-broker:9092");props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 配置SASL认证方式为SASL_SSLprops.put(ProducerConfig.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");props.put(ProducerConfig.SASL_MECHANISM, "PLAIN"); // 或者其他支持的SASL机制// 配置Kerberos认证所需的相关参数props.put(ProducerConfig.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<your-username>\" password=\"<your-password>\";");Producer<String, String> producer = new KafkaProducer<>(props);// 生产者使用示例producer.send(new ProducerRecord<>("your-topic", "message-key", "message-value"), (metadata, exception) -> {if (exception == null) {System.out.println("消息发送成功");} else {exception.printStackTrace();}});producer.close();}
}

注意
在这个示例中,我们配置了 Kafka 生产者所需的基本参数,并通过ProducerConfig.SECURITY_PROTOCOL_CONFIG 指定了安全协议为 SASL_PLAINTEXT。然后,我们设置了 SASL_MECHANISM_CONFIGPLAIN 并提供了 JAAS 配置 (SASL_JAAS_CONFIG),其中包含了用于连接到 Kafka 集群的用户名和密码。

请确保将 <your-username>, <your-password>, kafka-broker1:9092, kafka-broker2:9092, 和 your-topic 替换为你的实际用户名、密码、Kafka 代理地址、主题名称。
你的Kafka集群已经配置了SSL和SASL认证,并且相关的安全设置是正确的

消费者

@Configuration
public class KafkaConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.consumer.auto-offset-reset}")private String autoOffsetReset;@Value("${spring.kafka.consumer.enable-auto-commit}")private String enableAutoCommit;@Value("${spring.kafka.consumer.max-poll-records}")private String maxPollRecords;@Value("${spring.kafka.consumer.max-poll-interval-ms}")private String maxPollIntervalMs;@Beanpublic ConsumerFactory<String, String> custConsumerConfigFactory() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMs);props.put("security.protocol", "SASL_SSL");props.put("sasl.mechanism", "PLAIN");props.put("ssl.endpoint.identification.algorithm", "");props.put("consumer.ssl.endpoint.identification.algorithm", "");props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"kafka\" password=\"kafka123\";");// SSL配置props.put("ssl.truststore.location", "D:\\code\\ideaprojects\\zhubay-test\\src\\main\\resources\\kafka.truststore.jks");props.put("ssl.truststore.password", "123456");return new DefaultKafkaConsumerFactory<>(props);}@Beanpublic ConcurrentKafkaListenerContainerFactory<String, String> daConsumerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(custConsumerConfigFactory());factory.setBatchListener(true); // 启用批量消费factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);return factory;}
}

在这个示例中,我们配置了KafkaConsumer以使用SASL_SSL协议进行通信,并且指定了SASLPLAIN认证机制。我们还需要指定SSL的信任库和密钥库的位置以及它们的密码。sasl.jaas.config 属性中应该包含有效的JAAS配置,它定义了用于认证的用户名和密码。

相关文章:

  • 在tensorflow源码环境里,编译出独立的jni.so,避免依赖libtensorflowlite.so,从而实现apk体积最小化
  • Oracle 11g post PSU Oct18 设置ssl连接(使用wallets)
  • linux crontab定时执行python找不到module问题解决
  • 实现图片自动压缩算法,canvas压缩图片方法
  • Fiddler抓包教程->HTTP和HTTPS基础知识
  • 《算法笔记》11.4小节——动态规划专题->最长公共子序列(LCS) 问题 A: 最长公共子序列
  • [Web服务器对决] Nginx vs. Apache vs. LiteSpeed:2025年性能、功能与适用场景深度对比
  • 双指针法高效解决「移除元素」问题
  • 机器学习10-随机森林
  • [SpringBoot]Spring MVC(5.0)----留言板
  • 算法与数据结构:质数、互质判定和裴蜀定理
  • React 常见的陷阱之(如异步访问事件对象)
  • AI驱动发展——高能受邀参加华为2025广东新质生产力创新峰会
  • 榕壹云上门家政系统:基于Spring Boot+MySQL+UniApp的全能解决方案
  • uniapp如何设置uni.request可变请求ip地址
  • 高等数学笔记——向量代数与空间解析几何1
  • [概率论基本概念1]什么是经验分布
  • 蓝桥杯2114 李白打酒加强版
  • 塔式服务器都有哪些重要功能?
  • 大型商业综合体AI智能保洁管理系统:开启智能保洁新时代
  • 美发布“金穹”导弹防御系统发展规划
  • B站一季度净亏损收窄99%:游戏营收大增76%,AI类广告收入增近4倍
  • 英欧再“牵手”,友好“靠美国”
  • 一座与人才共成长的理想之城,浙江嘉兴为何如此吸引人?
  • 陈龙带你观察上海生物多样性,纪录片《我的城市邻居》明播出
  • 2025吉林市马拉松开跑,用赛道绘制“博物馆之城”动感地图