【Kafka】Kafka如何开启sasl认证?
要配置Kafka的SASL_PLAINTEXT(基于SASL/PLAIN机制的非加密认证),需分别完成Broker端(Kafka集群)和客户端(生产者/消费者)的配置,以下是详细、可操作的步骤(基于Kafka 3.6+版本,适用于生产/测试环境):
一、核心概念说明
SASL_PLAINTEXT是SASL(简单认证与安全层)的一种实现,通过用户名+密码进行身份验证,不加密传输数据(仅适用于内网或信任网络)。
- Broker端:需启用SASL认证,配置监听端口和JAAS(Java认证授权服务)文件。
- 客户端:需指定SASL协议、机制(PLAIN)和凭证,与Broker建立认证连接。
二、Broker端配置(Kafka集群)
1. 准备JAAS配置文件(Broker认证规则)
创建kafka_server_jaas.conf
文件(路径:$KAFKA_HOME/config/
),定义Broker间通信和客户端连接的认证规则:
# Broker间通信的认证规则(使用PLAIN机制)
KafkaServer {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="admin" # Broker间通信的用户名password="admin-secret" # Broker间通信的密码user_admin="admin-secret" # 客户端用户"admin"的密码user_alice="alice-secret"; # 客户端用户"alice"的密码
};
- 说明:
user_*
定义了客户端的用户名和密码(如user_admin="admin-secret"
表示客户端用admin
用户名和admin-secret
密码连接)。
2. 修改Broker配置文件(server.properties
)
编辑$KAFKA_HOME/config/server.properties
,添加以下配置:
# 1. 监听器配置(指定SASL_PLAINTEXT端口)
listeners=SASL_PLAINTEXT://:9092 # 监听9092端口,使用SASL_PLAINTEXT协议# 2. 广播地址(客户端连接的地址,需修改为Broker的实际IP)
advertised.listeners=SASL_PLAINTEXT://192.168.1.100:9092 # 替换为Broker的IP# 3. Broker间通信协议(使用SASL_PLAINTEXT)
security.inter.broker.protocol=SASL_PLAINTEXT# 4. 启用的SASL机制(仅PLAIN)
sasl.enabled.mechanisms=PLAIN# 5. Broker间通信的SASL机制(仅PLAIN)
sasl.mechanism.inter.broker.protocol=PLAIN# 6. JAAS配置文件路径(指向步骤1的文件)
sasl.jaas.config=file:/path/to/kafka_server_jaas.conf # 替换为实际路径
- 关键说明:
advertised.listeners
:客户端连接的地址,需确保客户端能访问该IP和端口(如内网IP)。sasl.jaas.config
:指定JAAS文件的绝对路径(如file:/opt/kafka/config/kafka_server_jaas.conf
)。
3. 启动Broker(加载JAAS配置)
修改Broker的启动脚本($KAFKA_HOME/bin/kafka-server-start.sh
),添加JAAS配置的环境变量:
# 在脚本开头添加(确保在启动命令前)
export KAFKA_OPTS="-Djava.security.auth.login.config=/path/to/kafka_server_jaas.conf"
启动Broker:
./bin/kafka-server-start.sh -daemon config/server.properties
三、客户端配置(生产者/消费者)
客户端(生产者/消费者)需指定SASL_PLAINTEXT协议、PLAIN机制和凭证,与Broker建立连接。以下是Java客户端的示例:
1. 生产者配置(Java)
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;public class ProducerExample {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "192.168.1.100:9092"); // Broker的IP和端口props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// SASL_PLAINTEXT配置props.put("security.protocol", "SASL_PLAINTEXT"); // 使用SASL_PLAINTEXT协议props.put("sasl.mechanism", "PLAIN"); // 使用PLAIN机制props.put("sasl.jaas.config", // JAAS配置(用户名和密码)"org.apache.kafka.common.security.plain.PlainLoginModule required " +"username=\"admin\" " +"password=\"admin-secret\";");KafkaProducer<String, String> producer = new KafkaProducer<>(props);producer.send(new ProducerRecord<>("test-topic", "key", "value"));producer.close();}
}
2. 消费者配置(Java)
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Collections;
import java.util.Properties;public class ConsumerExample {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "192.168.1.100:9092"); // Broker的IP和端口props.put("group.id", "test-group"); // 消费者组IDprops.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");// SASL_PLAINTEXT配置(与生产者一致)props.put("security.protocol", "SASL_PLAINTEXT");props.put("sasl.mechanism", "PLAIN");props.put("sasl.jaas.config", // JAAS配置(用户名和密码)"org.apache.kafka.common.security.plain.PlainLoginModule required " +"username=\"admin\" " +"password=\"admin-secret\";");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Collections.singletonList("test-topic"));while (true) {ConsumerRecords<String, String> records = consumer.poll(java.time.Duration.ofMillis(100));records.forEach(record -> System.out.printf("Received message: %s%n", record.value()));}}
}
四、验证配置是否生效
- 发送消息:运行生产者程序,向
test-topic
发送消息。 - 消费消息:运行消费者程序,查看是否能收到生产者的消息。
- 检查日志:
- Broker日志(
$KAFKA_HOME/logs/server.log
):若有SASL authentication successful
条目,说明认证成功。 - 客户端日志:若无
Authentication failed
错误,说明连接正常。
- Broker日志(
五、注意事项
- 安全性警告:SASL_PLAINTEXT不加密传输数据,禁止在生产环境的外网中使用(建议使用SASL_SSL,即SASL+SSL加密)。
- 版本兼容性:确保Kafka Broker和客户端的版本一致(如均为3.6+),避免因版本差异导致的认证失败。
- JAAS文件权限:确保
kafka_server_jaas.conf
文件的权限正确(如仅Broker用户可读),避免凭证泄露。 - 多用户管理:若需多个客户端用户,可在
kafka_server_jaas.conf
中添加user_*
条目(如user_bob="bob-secret"
),客户端使用对应的用户名和密码连接。
六、常见问题排查
- 认证失败:检查客户端的
username
和password
是否与Broker的kafka_server_jaas.conf
中的一致。 - 连接超时:检查
advertised.listeners
的IP和端口是否正确,客户端是否能访问该地址。 - JAAS配置未加载:检查Broker启动脚本是否添加了
KAFKA_OPTS="-Djava.security.auth.login.config=..."
。
通过以上步骤,即可完成Kafka的SASL_PLAINTEXT配置,实现基于用户名+密码的身份验证。如需更高的安全性,建议升级到SASL_SSL(参考Kafka官方文档)。