当前位置: 首页 > news >正文

【Kafka】Kafka如何开启sasl认证?

要配置Kafka的SASL_PLAINTEXT(基于SASL/PLAIN机制的非加密认证),需分别完成Broker端(Kafka集群)和客户端(生产者/消费者)的配置,以下是详细、可操作的步骤(基于Kafka 3.6+版本,适用于生产/测试环境):

一、核心概念说明

SASL_PLAINTEXT是SASL(简单认证与安全层)的一种实现,通过用户名+密码进行身份验证,不加密传输数据(仅适用于内网或信任网络)。

  • Broker端:需启用SASL认证,配置监听端口和JAAS(Java认证授权服务)文件。
  • 客户端:需指定SASL协议、机制(PLAIN)和凭证,与Broker建立认证连接。

二、Broker端配置(Kafka集群)

1. 准备JAAS配置文件(Broker认证规则)

创建kafka_server_jaas.conf文件(路径:$KAFKA_HOME/config/),定义Broker间通信客户端连接的认证规则:

# Broker间通信的认证规则(使用PLAIN机制)
KafkaServer {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="admin"                # Broker间通信的用户名password="admin-secret"         # Broker间通信的密码user_admin="admin-secret"       # 客户端用户"admin"的密码user_alice="alice-secret";       # 客户端用户"alice"的密码
};
  • 说明user_*定义了客户端的用户名和密码(如user_admin="admin-secret"表示客户端用admin用户名和admin-secret密码连接)。
2. 修改Broker配置文件(server.properties

编辑$KAFKA_HOME/config/server.properties,添加以下配置:

# 1. 监听器配置(指定SASL_PLAINTEXT端口)
listeners=SASL_PLAINTEXT://:9092  # 监听9092端口,使用SASL_PLAINTEXT协议# 2. 广播地址(客户端连接的地址,需修改为Broker的实际IP)
advertised.listeners=SASL_PLAINTEXT://192.168.1.100:9092  # 替换为Broker的IP# 3. Broker间通信协议(使用SASL_PLAINTEXT)
security.inter.broker.protocol=SASL_PLAINTEXT# 4. 启用的SASL机制(仅PLAIN)
sasl.enabled.mechanisms=PLAIN# 5. Broker间通信的SASL机制(仅PLAIN)
sasl.mechanism.inter.broker.protocol=PLAIN# 6. JAAS配置文件路径(指向步骤1的文件)
sasl.jaas.config=file:/path/to/kafka_server_jaas.conf  # 替换为实际路径
  • 关键说明
    • advertised.listeners:客户端连接的地址,需确保客户端能访问该IP和端口(如内网IP)。
    • sasl.jaas.config:指定JAAS文件的绝对路径(如file:/opt/kafka/config/kafka_server_jaas.conf)。
3. 启动Broker(加载JAAS配置)

修改Broker的启动脚本($KAFKA_HOME/bin/kafka-server-start.sh),添加JAAS配置的环境变量:

# 在脚本开头添加(确保在启动命令前)
export KAFKA_OPTS="-Djava.security.auth.login.config=/path/to/kafka_server_jaas.conf"

启动Broker:

./bin/kafka-server-start.sh -daemon config/server.properties

三、客户端配置(生产者/消费者)

客户端(生产者/消费者)需指定SASL_PLAINTEXT协议、PLAIN机制和凭证,与Broker建立连接。以下是Java客户端的示例:

1. 生产者配置(Java)
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;public class ProducerExample {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "192.168.1.100:9092");  // Broker的IP和端口props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// SASL_PLAINTEXT配置props.put("security.protocol", "SASL_PLAINTEXT");  // 使用SASL_PLAINTEXT协议props.put("sasl.mechanism", "PLAIN");              // 使用PLAIN机制props.put("sasl.jaas.config",  // JAAS配置(用户名和密码)"org.apache.kafka.common.security.plain.PlainLoginModule required " +"username=\"admin\" " +"password=\"admin-secret\";");KafkaProducer<String, String> producer = new KafkaProducer<>(props);producer.send(new ProducerRecord<>("test-topic", "key", "value"));producer.close();}
}
2. 消费者配置(Java)
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Collections;
import java.util.Properties;public class ConsumerExample {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "192.168.1.100:9092");  // Broker的IP和端口props.put("group.id", "test-group");                   // 消费者组IDprops.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");// SASL_PLAINTEXT配置(与生产者一致)props.put("security.protocol", "SASL_PLAINTEXT");props.put("sasl.mechanism", "PLAIN");props.put("sasl.jaas.config",  // JAAS配置(用户名和密码)"org.apache.kafka.common.security.plain.PlainLoginModule required " +"username=\"admin\" " +"password=\"admin-secret\";");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Collections.singletonList("test-topic"));while (true) {ConsumerRecords<String, String> records = consumer.poll(java.time.Duration.ofMillis(100));records.forEach(record -> System.out.printf("Received message: %s%n", record.value()));}}
}

四、验证配置是否生效

  1. 发送消息:运行生产者程序,向test-topic发送消息。
  2. 消费消息:运行消费者程序,查看是否能收到生产者的消息。
  3. 检查日志
    • Broker日志($KAFKA_HOME/logs/server.log):若有SASL authentication successful条目,说明认证成功。
    • 客户端日志:若无Authentication failed错误,说明连接正常。

五、注意事项

  1. 安全性警告:SASL_PLAINTEXT不加密传输数据,禁止在生产环境的外网中使用(建议使用SASL_SSL,即SASL+SSL加密)。
  2. 版本兼容性:确保Kafka Broker和客户端的版本一致(如均为3.6+),避免因版本差异导致的认证失败。
  3. JAAS文件权限:确保kafka_server_jaas.conf文件的权限正确(如仅Broker用户可读),避免凭证泄露。
  4. 多用户管理:若需多个客户端用户,可在kafka_server_jaas.conf中添加user_*条目(如user_bob="bob-secret"),客户端使用对应的用户名和密码连接。

六、常见问题排查

  1. 认证失败:检查客户端的usernamepassword是否与Broker的kafka_server_jaas.conf中的一致。
  2. 连接超时:检查advertised.listeners的IP和端口是否正确,客户端是否能访问该地址。
  3. JAAS配置未加载:检查Broker启动脚本是否添加了KAFKA_OPTS="-Djava.security.auth.login.config=..."

通过以上步骤,即可完成Kafka的SASL_PLAINTEXT配置,实现基于用户名+密码的身份验证。如需更高的安全性,建议升级到SASL_SSL(参考Kafka官方文档)。

http://www.dtcms.com/a/388977.html

相关文章:

  • 国产化Excel开发组件Spire.XLS教程:C# 轻松将 DataSet 导出到 Excel
  • NLP情绪因子解构鲍威尔“风险管理降息”信号,黄金价格在3707高位触发量化抛售潮
  • 【Python办公】Excel多Sheet拆分工具
  • Unity_程序集_.asmdef_引用命名域失败
  • FPGA采集AD7606转SRIO传输,基于Serial Rapidlo Gen2,提供6套工程源码和技术支持
  • Cloudcompare实现在模型上进行点云(下)采样
  • 【Linux】聊聊文件那些事:从空文件占空间到系统调用怎么玩
  • 基于代码层对运动台性能提升实战
  • openfeigin配置相关
  • 网络传输协议解析及SSE补充
  • 视觉SLAM第12讲:建图
  • 2025编程技术学习网站大全:从入门到精通的免费资源指南
  • 刷题日记0918
  • emacs 如何显示断点和运行的行标
  • 【c++】继承(2)
  • 大模型提示词Prompt工程:万能公式-完整指南
  • Flask RESTful API 教程:从零实现 Python CRUD 后端服务
  • 百年奢品家电ASKO亮相IFA2025|以至臻品质绘就生活新境
  • jvm排查full gc或者humongous obj思路?如何调优?
  • 实现.NetCore集成Serilog,写入日志文件,并按日期拆分文件夹
  • [新启航]航空发动机燃烧室喷嘴孔深光学 3D 轮廓测量 - 激光频率梳 3D 轮廓技术
  • iOS 上架 App 流程全解析 苹果应用发布步骤、App Store 审核流程、ipa 文件上传与 uni-app 打包实战经验
  • 22.6 单卡A100驯服30亿参数模型!DeepSpeed ZeRO-3实战显存优化指南
  • jvm垃圾搜集器
  • 小红书开放平台笔记详情接口实战:内容解析与数据挖掘全方案
  • App 上架平台全解析,iOS 应用发布流程、苹果 App Store 审核步骤
  • BeeWorks:私有化部署即时通讯,铸就企业数字安全基石
  • (数据分析方向)Flask 动漫数据可视化分析系统(Echarts + 番剧管理・大数据)(源码)✅
  • 2025 最新版 Node.js 下载安装及环境配置教程
  • 分布式流处理与消息传递——Kafka ISR(In-Sync Replicas)算法深度解析