k8s 配置 Kafka SASL_SSL双重认证
说明
kafka
提供了多种安全认证机制,主要分为SASL
和SSL
两大类。
SASL: 是一种身份验证机制,用于在客户端和服务器之间进行身份验证的过程,其中SASL/PLAIN是基于账号密码的认证方式。
SSL: 是一种加密协议,用于在网络通信中提供数据的保密性和完整性。它使用公钥和私钥来建立安全的连接,并对传输的数据进行加密和解密,以防止未经授权的访问和篡改。
在 Kafka
中启用 SASL_SSL
安全协议时,SASL
用于客户端和服务器之间的身份验证,SSL
则用于加密和保护数据的传输。不仅提供身份验证,还提供加密和数据保护的功能。
要在 Kubernetes 中为 Kafka 配置 SASL_SSL,你需要先完成以下两大步骤:
第一部分:生成 SSL 证书用于 SASL_SSL(JKS 格式)
Kafka 使用 Java 的 Keystore/Truststore(.jks
文件)作为证书格式。
步骤 1:创建 CA 根证书
openssl req -new -x509 -keyout ca-key -out ca-cert -days 3650 -passout pass:123456 -subj "/CN=Kafka-CA"
步骤 2:为 Kafka 生成 keystore
keytool -genkeypair -keystore kafka.keystore.jks -validity 365 -storepass 123456 -keypass 123456 -dname "CN=kafka" -alias kafka -keyalg RSA
步骤 3:创建证书签名请求(CSR)
keytool -keystore kafka.keystore.jks -alias kafka -certreq -file kafka.csr -storepass 123456
步骤 4:用 CA 签名 Kafka 证书
openssl x509 -req -CA ca-cert -CAkey ca-key -in kafka.csr -out kafka-signed.crt -days 365 -CAcreateserial -passin pass:123456
步骤 5:将 CA 证书导入 Kafka keystore
keytool -keystore kafka.keystore.jks -alias CARoot -import -file ca-cert -storepass 123456 -noprompt
步骤 6:导入已签名 Kafka 证书
keytool -keystore kafka.keystore.jks -alias kafka -import -file kafka-signed.crt -storepass 123456
步骤 7:创建 truststore(客户端用)
keytool -keystore kafka.truststore.jks -alias CARoot -import -file ca-cert -storepass 123456 -noprompt
✅ 生成完成的文件:
kafka.keystore.jks
:服务端使用(含私钥)kafka.truststore.jks
:客户端和服务端都使用(信任CA)- 密码:统一用
123456
(你可以自定义)
✅ 第二部分:Kafka 中配置 SASL_SSL
将上述文件配置到 Kafka 中(假设你在 Kubernetes 中运行):
🗂 配置 1:Kafka 环境变量(在 StatefulSet / Deployment 中)
env:# --- 控制器配置(KRaft 模式) ---- name: KAFKA_CFG_NODE_IDvalue: "0" # 当前节点的唯一 ID(用于 controller quorum)- name: KAFKA_CFG_CONTROLLER_LISTENER_NAMESvalue: CONTROLLER # 控制器监听使用的 listener 名称,需与 KAFKA_CFG_LISTENERS 中一致- name: KAFKA_CFG_CONTROLLER_QUORUM_VOTER_CLIENT_QUOTA_WINDOW_NUMvalue: "10" # 控制器选举客户端配额窗口数量(流控相关)- name: KAFKA_CFG_CONTROLLER_QUORUM_VOTER_CLIENT_QUOTA_WINDOW_SIZE_SECONDSvalue: "1" # 每个窗口的时长,单位秒- name: KAFKA_CFG_CONTROLLER_QUORUM_VOTER_REQUEST_TIMEOUT_MSvalue: "5000" # 控制器之间选举通信的请求超时时间(毫秒)- name: KAFKA_CFG_CONTROLLER_QUORUM_VOTERSvalue: "0@kafka:9093" # controller 选举配置:nodeId@host:port# --- Kafka 常规配置 ---- name: KAFKA_AUTO_CREATE_TOPICS_ENABLEvalue: "true" # 启用自动创建 topic(生产建议关闭)- name: KAFKA_ENABLE_KRAFTvalue: "YES" # 启用 KRaft 模式(即不使用 ZooKeeper)# --- 节点角色定义 ---- name: KAFKA_CFG_PROCESS_ROLESvalue: "broker,controller" # 当前节点同时担任 broker 和 controller 角色# --- Listener 与协议映射 ---- name: KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAPvalue: "CONTROLLER:SASL_PLAINTEXT,PLAINTEXT:SASL_SSL" # 每个 listener 使用的安全协议# CONTROLLER 使用 SASL_PLAINTEXT,用于控制器通信# PLAINTEXT 实际绑定 SASL_SSL,用于 broker/client 通信(命名不影响协议)- name: KAFKA_INTER_BROKER_LISTENER_NAMEvalue: PLAINTEXT # Kafka Broker 间通信使用的 listener 名称(上方定义)- name: KAFKA_CFG_LISTENERSvalue: PLAINTEXT://:9092,CONTROLLER://:9093 # Broker 和 Controller 的监听端口及协议标识- name: KAFKA_CFG_ADVERTISED_LISTENERSvalue: PLAINTEXT://192.168.1.5:9092 # Kafka 向客户端暴露的访问地址 # --- SSL 配置 ---- name: KAFKA_SSL_KEYSTORE_LOCATIONvalue: /bitnami/kafka/config/certs/kafka.keystore.jks # 服务端密钥 + 证书文件路径- name: KAFKA_SSL_KEYSTORE_PASSWORDvalue: 123456 # keystore 文件访问密码- name: KAFKA_SSL_KEY_PASSWORDvalue: 123456 # keystore 内私钥使用的密码- name: KAFKA_SSL_TRUSTSTORE_LOCATIONvalue: /bitnami/kafka/config/certs/kafka.truststore.jks # CA 证书文件路径(信任的客户端)- name: KAFKA_SSL_TRUSTSTORE_PASSWORDvalue: 123456 # truststore 文件访问密码- name: KAFKA_SSL_CLIENT_AUTHvalue: required # 启用客户端证书校验(双向认证)# --- SASL 配置(PLAIN 机制)---- name: KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOLvalue: PLAIN # 控制器间通信使用 PLAIN SASL 机制- name: KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOLvalue: PLAIN # Broker 间通信使用 PLAIN SASL 机制# --- TLS 类型与证书密码 ---- name: KAFKA_TLS_TYPEvalue: JKS # TLS 密钥文件类型(Java KeyStore)- name: KAFKA_CERTIFICATE_PASSWORDvalue: 123456# 证书统一使用的访问密码(用于 SSL 参数)# --- 监听器角色映射 ---- name: KAFKA_CFG_INTER_BROKER_LISTENER_NAMEvalue: SASL_SSL # Broker 间通信使用的安全 listener 名称- name: KAFKA_CLIENT_LISTENER_NAMEvalue: SASL_SSL # 客户端连接 Kafka 使用的 listener 名称# --- SASL 用户配置 ---- name: KAFKA_CONTROLLER_USERvalue: kafka # 控制器之间通信使用的用户名- name: KAFKA_CONTROLLER_PASSWORDvalue: kafka123 # 控制器之间通信使用的密码- name: KAFKA_INTER_BROKER_USERvalue: kafka # Broker 间通信使用的用户名- name: KAFKA_INTER_BROKER_PASSWORDvalue: kafka123 # Broker 间通信使用的密码- name: KAFKA_CLIENT_USERSvalue: kafka # 允许连接的客户端用户名(多个用逗号分隔)- name: KAFKA_CLIENT_PASSWORDSvalue: kafka123 # 客户端对应密码,顺序与用户名保持一致
🗂 配置 2:Kubernetes 中挂载证书和 JAAS 文件
volumeMounts:- name: kafka-secretsmountPath: /bitnami/kafka/config/certsvolumes:- name: kafka-secretssecret:secretName: kafka-cert-secret- name: jaas-configconfigMap:name: kafka-jaas-config
你需要将 .jks
文件和密码打包为 Secret:
kubectl create secret generic kafka-cert-secret -n 命名空间 --from-file=kafka.keystore.jks --from-file=kafka.truststore.jks
✅ Kafka 客户端配置(示例)
yaml 配置文件
spring:kafka:bootstrap-servers: 192.168.1.5:9092 # Kafka Broker 的地址和端口listener:ack-mode: MANUAL_IMMEDIATE # 消费者手动提交消息确认(ack),立即提交(MANUAL_IMMEDIATE),适合需要精确控制 offset 提交时机的业务场景。consumer:custom-environment: dev # 自定义字段,可用于 profile 配置(非 Spring Kafka 标准字段)auto-offset-reset: latest # 从最新消息开始消费(若无提交 offset)enable-auto-commit: false # 关闭自动提交,需手动调用 ack.acknowledge()# auto-commit-interval: 2000key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializermax-poll-records: 50 # 每次 poll 最多拉取 50 条记录max-poll-interval-ms: 600000 # 最大处理时间 10 分钟,超时视为挂掉producer:retries: 0 # 不重试(可调高)batch-size: 16384 # 每批 16KB,达到此大小或 linger.ms 超时才发送buffer-memory: 33554432 # 缓冲区大小# String 类型键值序列化key-serializer: org.apache.kafka.common.serialization.StringSerializer #value-serializer: org.apache.kafka.common.serialization.StringSerializerssl:# Kafka 客户端验证服务端证书是否可信(客户端信任的 CA 证书放在 truststore 中)# .jks 文件必须放在 resources/ 目录下并打包到 classpathtrust-store-location: classpath:kafka.truststore.jkstrust-store-password: 123456properties:sasl:mechanism: PLAIN # 使用 SASL/PLAIN 机制进行身份验证jaas:# 此处填写 SASL登录时分配的用户名密码(注意password结尾;)# 此处用户名 kafka 和密码 kafka123 必须与服务端 Kafka 设置的 KAFKA_CLIENT_USERS 一致config: org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="kafka123"; security:protocol: SASL_SSL # 通信使用 SASL 认证 + SSL 加密ssl:endpoint:identification:algorithm: "" # 关闭主机名验证,否则会因 SAN 缺失导致 SSL 握手失败(Java 默认开启)# ssl.endpoint.identification.algorithm=# producer.ssl.endpoint.identification.algorithm=# consumer.ssl.endpoint.identification.algorithm=
生产者
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class KafkaProducerSaslSslExample {public static void main(String[] args) {Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "your-kafka-broker:9092");props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 配置SASL认证方式为SASL_SSLprops.put(ProducerConfig.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");props.put(ProducerConfig.SASL_MECHANISM, "PLAIN"); // 或者其他支持的SASL机制// 配置Kerberos认证所需的相关参数props.put(ProducerConfig.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<your-username>\" password=\"<your-password>\";");Producer<String, String> producer = new KafkaProducer<>(props);// 生产者使用示例producer.send(new ProducerRecord<>("your-topic", "message-key", "message-value"), (metadata, exception) -> {if (exception == null) {System.out.println("消息发送成功");} else {exception.printStackTrace();}});producer.close();}
}
注意
在这个示例中,我们配置了 Kafka 生产者所需的基本参数,并通过ProducerConfig.SECURITY_PROTOCOL_CONFIG
指定了安全协议为 SASL_PLAINTEXT
。然后,我们设置了 SASL_MECHANISM_CONFIG
为 PLAIN
并提供了 JAAS
配置 (SASL_JAAS_CONFIG
),其中包含了用于连接到 Kafka 集群的用户名和密码。
请确保将 <your-username>, <your-password>
, kafka-broker1:9092, kafka-broker2:9092
, 和 your-topic
替换为你的实际用户名、密码、Kafka 代理地址、主题名称。
你的Kafka集群已经配置了SSL和SASL认证,并且相关的安全设置是正确的
消费者
@Configuration
public class KafkaConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.consumer.auto-offset-reset}")private String autoOffsetReset;@Value("${spring.kafka.consumer.enable-auto-commit}")private String enableAutoCommit;@Value("${spring.kafka.consumer.max-poll-records}")private String maxPollRecords;@Value("${spring.kafka.consumer.max-poll-interval-ms}")private String maxPollIntervalMs;@Beanpublic ConsumerFactory<String, String> custConsumerConfigFactory() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMs);props.put("security.protocol", "SASL_SSL");props.put("sasl.mechanism", "PLAIN");props.put("ssl.endpoint.identification.algorithm", "");props.put("consumer.ssl.endpoint.identification.algorithm", "");props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"kafka\" password=\"kafka123\";");// SSL配置props.put("ssl.truststore.location", "D:\\code\\ideaprojects\\zhubay-test\\src\\main\\resources\\kafka.truststore.jks");props.put("ssl.truststore.password", "123456");return new DefaultKafkaConsumerFactory<>(props);}@Beanpublic ConcurrentKafkaListenerContainerFactory<String, String> daConsumerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(custConsumerConfigFactory());factory.setBatchListener(true); // 启用批量消费factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);return factory;}
}
在这个示例中,我们配置了KafkaConsumer
以使用SASL_SSL
协议进行通信,并且指定了SASL
的PLAIN
认证机制。我们还需要指定SSL
的信任库和密钥库的位置以及它们的密码。sasl.jaas.config
属性中应该包含有效的JAAS
配置,它定义了用于认证的用户名和密码。