当前位置: 首页 > wzjs >正文

淘宝客cms网站怎么做网址大全123

淘宝客cms网站怎么做,网址大全123,网站设计高端网站制作,怎样做网站制作Kafka ZooKeeper 开启 SASL_PLAINTEXT 认证(PLAIN机制)最全实战教程 💡 本教程将手把手教你如何为 Kafka 配置基于 SASL_PLAINTEXT PLAIN 的用户名密码认证机制,包含 Kafka 与 ZooKeeper 的全部配置,适合入门。 &…

Kafka + ZooKeeper 开启 SASL_PLAINTEXT 认证(PLAIN机制)最全实战教程

💡 本教程将手把手教你如何为 Kafka 配置基于 SASL_PLAINTEXT + PLAIN 的用户名密码认证机制,包含 Kafka 与 ZooKeeper 的全部配置,适合入门。


🎯 教程目标

  • Kafka 客户端连接 Kafka Broker 时需要用户名密码验证;
  • Kafka 与 ZooKeeper 之间通信也启用 SASL 认证;
  • 使用 PLAIN 机制,无需 TLS/SSL 证书(比 SASL_SSL 简单);
  • 可用于本地开发环境或非安全生产环境。

🛠 环境准备

  • Kafka:3.6.0
  • ZooKeeper:3.6+
  • Java 8+
  • 操作系统:Linux / WSL / Mac / Windows(推荐用 WSL)
  • 镜像:
    zookeeper bitnami_zookeeper:3.8.4
    kafka bitnami/kafka:3.6.0

1️⃣ 配置 ZooKeeper 认证(Server端)

配置环境变量

env:- name: TZvalue: Asia/Shanghai- name: ALLOW_ANONYMOUS_LOGINvalue: 'no'- name: JVMFLAGSvalue: '-Xmx1g'- name: ZOO_ENABLE_AUTHvalue: 'yes'- name: ZOO_SERVER_USERSvalue: kafka- name: ZOO_SERVER_PASSWORDSvalue: zookeeper@2025- name: ZOO_SERVERSvalue: 'zk-cluster-auth-0.zk-cluster-auth-headless:2888:3888,zk-cluster-auth-1.zk-cluster-auth-headless:2888:3888,zk-cluster-auth-2.zk-cluster-auth-headless:2888:3888'
配置项说明
TZ=Asia/Shanghai设置容器的时区为中国标准时间(CST/UTC+8),方便日志与系统时间保持一致。
ALLOW_ANONYMOUS_LOGIN=no禁用匿名连接 ZooKeeper,必须通过用户名密码认证。建议生产环境使用。
JVMFLAGS='-Xmx1g'配置 ZooKeeper JVM 最大内存为 1GB,防止 OOM(默认可能太小)。
ZOO_ENABLE_AUTH=yes启用 ZooKeeper 身份认证(基于 SASL 的认证机制,如 PLAIN)。必须配合下面的用户密码使用。
ZOO_SERVER_USERS=kafka设置 ZooKeeper 允许的用户名,多个用户用逗号分隔。这里是 kafka
ZOO_SERVER_PASSWORDS=zookeeper@2025对应上面的用户的密码。如果多个用户,用逗号一一对应写。

服务配置

kind: Service
apiVersion: v1
metadata:name: zk-cluster-auth-headlessnamespace: zhubayi-commonlabels:app: kafka-cluster-authannotations:kubesphere.io/alias-name: zk-cluster-authkubesphere.io/creator: adminkubesphere.io/serviceType: statefulservice
spec:ports:- name: clientprotocol: TCPport: 2181targetPort: 2181- name: serverprotocol: TCPport: 2888targetPort: 2888- name: leader-selectprotocol: TCPport: 3888targetPort: 3888- name: adminserverprotocol: TCPport: 8080targetPort: 8080selector:app: zk-cluster-authclusterIP: NoneclusterIPs:- Nonetype: ClusterIPsessionAffinity: NoneipFamilies:- IPv4ipFamilyPolicy: SingleStackinternalTrafficPolicy: Cluster

启动脚本

 command:- sh- '-c'- >export ZOO_SERVER_ID=$((${HOSTNAME##*-}+1))exec /opt/bitnami/scripts/zookeeper/entrypoint.sh/opt/bitnami/scripts/zookeeper/run.sh

2️⃣ 配置 Kafka(Broker端)

🧩 设置环境变量

env:#  设置容器的时区- name: TZvalue: Asia/Shanghai# 🔗 Kafka 连接的 ZooKeeper 地址(推荐使用 headless 服务)- name: KAFKA_CFG_ZOOKEEPER_CONNECTvalue: 'zk-cluster-auth-headless:2181'# Kafka 内存配置(JVM堆大小)- name: KAFKA_HEAP_OPTSvalue: '-Xmx2g'# offsets topic 的副本数(建议 >=3)- name: KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTORvalue: '3'# 每个 topic 的默认分区数- name: KAFKA_CFG_NUM_PARTITIONSvalue: '5'# Kafka 日志保留时间(单位小时)- name: KAFKA_CFG_LOG_RETENTION_HOURSvalue: '72'  # 3天# Kafka 日志切分时间(单位小时)- name: KAFKA_CFG_LOG_ROLL_HOURSvalue: '72'# 单个 segment 的最大大小(1GB)- name: KAFKA_CFG_LOG_SEGMENT_BYTESvalue: '1073741824'# 是否允许自动创建 topic(开发环境可开启)- name: KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLEvalue: 'true'# 允许删除 topic- name: KAFKA_CFG_DELETE_TOPIC_ENABLEvalue: 'true'# 是否允许自动进行 leader 重新平衡(建议关闭以避免扰动)- name: KAFKA_CFG_AUTO_LEADER_REBALANCE_ENABLEvalue: 'false'# Kafka 清理策略(delete / compact)- name: KAFKA_CFG_LOG_CLEANUP_POLICYvalue: delete# 单条消息最大大小(5MB)- name: KAFKA_CFG_MESSAGE_MAX_BYTESvalue: '5242880'# 客户端请求最大大小(4MB)- name: KAFKA_CFG_MAX_REQUEST_SIZEvalue: '4194304'# 批处理大小(生产者用)- name: KAFKA_CFG_BATCH_SIZEvalue: '16384'# 允许使用 PLAINTEXT 明文监听器(非加密,仅适合内网测试)- name: ALLOW_PLAINTEXT_LISTENERvalue: 'true'# broker 间通信监听器名称(INSIDE 表示内网)- name: KAFKA_INTER_BROKER_LISTENER_NAMEvalue: INSIDE# 监听器与协议映射(INSIDE 和 OUTSIDE 都启用 SASL_PLAINTEXT)- name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAPvalue: 'INSIDE:SASL_PLAINTEXT,OUTSIDE:SASL_PLAINTEXT'# 是否允许客户端连接时自动创建 topic- name: KAFKA_AUTO_CREATE_TOPICS_ENABLEvalue: 'true'# 与 SSL 无关,可忽略(如果未使用 TLS)- name: KAFKA_SSL_CLIENT_AUTHvalue: required# Kafka 控制器使用的认证机制(SASL PLAIN)- name: KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOLvalue: PLAIN# broker 间通信使用的认证机制(SASL PLAIN)- name: KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOLvalue: PLAIN# SSL 类型(使用 Java KeyStore 格式)(此配置无效,可忽略,未启用 SSL)- name: KAFKA_TLS_TYPEvalue: JKS# 指定 broker 之间用哪个监听器通信(重复定义,建议只保留一次)- name: KAFKA_CFG_INTER_BROKER_LISTENER_NAMEvalue: SASL_PLAINTEXT# 客户端连接使用的监听器(与上面保持一致)- name: KAFKA_CLIENT_LISTENER_NAMEvalue: SASL_PLAINTEXT# Controller 用户名(用于 Kafka 内部通信)- name: KAFKA_CONTROLLER_USERvalue: kafka# Controller 用户密码- name: KAFKA_CONTROLLER_PASSWORDvalue: 123456# Broker 间通信用户名- name: KAFKA_INTER_BROKER_USERvalue: kafka# Broker 间通信密码- name: KAFKA_INTER_BROKER_PASSWORDvalue: 123456# 允许的客户端用户名(支持多个,逗号分隔)- name: KAFKA_CLIENT_USERSvalue: kafka# 客户端密码(顺序与用户一致)- name: KAFKA_CLIENT_PASSWORDSvalue: 123456# 与 ZooKeeper 通信时使用的认证协议(SASL 必须设置)- name: KAFKA_ZOOKEEPER_PROTOCOLvalue: SASL# Kafka 访问 ZooKeeper 时使用的用户名- name: KAFKA_ZOOKEEPER_USERvalue: kafka# Kafka 访问 ZooKeeper 时使用的密码(需与 ZooKeeper 中配置一致)- name: KAFKA_ZOOKEEPER_PASSWORDvalue: zookeeper@2025

启动脚本

 command:- sh- '-c'- >POD_NAME=$(hostname)echo "POD_NAME:$POD_NAME"REPLICA_INDEX=$(echo $POD_NAME | sed 's/.*-\([0-9]\)$/\1/')echo "REPLICA_INDEX:$REPLICA_INDEX"export KAFKA_CFG_NODE_ID=${POD_NAME##*-}export KAFKA_NODE_ID="$REPLICA_INDEX"PORT=$((REPLICA_INDEX + 30900)) PORT2=$((REPLICA_INDEX + 9093))exportKAFKA_CFG_ADVERTISED_LISTENERS="INSIDE://:9092,OUTSIDE://192.168.1.5:$PORT"export KAFKA_CFG_LISTENERS="INSIDE://:9092,OUTSIDE://:$PORT2"exec /opt/bitnami/scripts/kafka/entrypoint.sh/opt/bitnami/scripts/kafka/run.sh

服务端口配置

spec:ports:- name: tcp-9092protocol: TCPport: 9093targetPort: 9093nodePort: 30900- name: tcp-9093protocol: TCPport: 9094targetPort: 9094nodePort: 30901- name: tcp-9094protocol: TCPport: 9095targetPort: 9095nodePort: 30902

3️⃣ Kafka 客户端配置(示例)

配置文件

spring:kafka:bootstrap-servers: 192.168.1.5:30900  # Kafka 集群地址(多个可逗号分隔)listener:ack-mode: MANUAL_IMMEDIATE  # 手动提交 offset(立即确认)consumer:custom-environment: dev # 自定义字段,可用于多环境区分,无实际作用auto-offset-reset: latest  # 无 offset 时从最新消息开始消费(避免重复)enable-auto-commit: false  # 禁用自动提交,改为手动提交 offset# auto-commit-interval: 2000  # 如果启用自动提交,间隔为 2 秒(此处已注释)key-deserializer: org.apache.kafka.common.serialization.StringDeserializer  # key 反序列化方式value-deserializer: org.apache.kafka.common.serialization.StringDeserializer  # value 反序列化方式max-poll-records: 50  # 每次 poll 最多拉取 50 条记录max-poll-interval-ms: 600000  # poll 最大间隔 10 分钟producer:retries: 0  # 不重试(生产失败直接报错)batch-size: 16384  # 批量发送最大字节数(默认 16KB)buffer-memory: 33554432  # 发送缓冲区大小(默认 32MB)key-serializer: org.apache.kafka.common.serialization.StringSerializer  # key 序列化value-serializer: org.apache.kafka.common.serialization.StringSerializer  # value 序列化ssl:trust-store-location:    # ⚠ 这里为空,因使用 SASL_PLAINTEXT(非 SSL),可留空或移除trust-store-password:properties:sasl:mechanism: PLAIN  # 使用 SASL PLAIN 机制(用户名密码)jaas:config: >-org.apache.kafka.common.security.scram.ScramLoginModule requiredusername="kafka"password="123456";# JAAS 配置:用户名密码认证(注意 password 后有分号)security:protocol: SASL_PLAINTEXT  # 使用 SASL_PLAINTEXT 传输协议(非加密)ssl:endpoint:identification:algorithm: ""  # 空表示跳过主机名校验(SSL 时才生效,非必须)

🚨 注意事项

配置项注意点
SASL_PLAINTEXT明文传输用户名密码,不建议用于公网
ScramLoginModule表示服务端配置的是 SCRAM(非 PLAIN)时使用,若是 PLAIN,应为 PlainLoginModule
trust-store-location可删除或忽略(仅用于 SASL_SSLSSL 场景)
bootstrap-servers建议配置多个 broker IP,提升可靠性
ack-mode: MANUAL_IMMEDIATE消费者业务失败时不会提交 offset,可重复消费

生产者配置

@Configuration
public class KafkaConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.consumer.auto-offset-reset}")private String autoOffsetReset;@Value("${spring.kafka.consumer.enable-auto-commit}")private String enableAutoCommit;@Value("${spring.kafka.consumer.max-poll-records}")private String maxPollRecords;@Value("${spring.kafka.consumer.max-poll-interval-ms}")private String maxPollIntervalMs;@Value("${spring.profiles.active}")private String activeProfile;@Value("${spring.kafka.properties.security.protocol:SASL_SSL}")private String securityProtocol;@Value("${spring.kafka.properties.sasl.mechanism:PLAIN}")private String salsMechanism;@Value("${spring.kafka.properties.ssl.endpoint.identification.algorithm: \"\"}")private String identificationAlgorithm;@Value("${spring.kafka.properties.sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username=\"kafka\" password=\"123456\";}")private String jaasConfig;@Value("${spring.kafka.ssl.trust-store-password: \"\"}")private String trustStorePassword;@Value("${spring.kafka.ssl.trust-store-location: \"\"}")private String trustStoreLocation;@Beanpublic ConsumerFactory<String, String> daConsumerConfigFactory() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ConsumerConfig.GROUP_ID_CONFIG, KafkaConsumerComponent.DAGROUP);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMs);props.put("security.protocol", securityProtocol);props.put("sasl.mechanism", salsMechanism);props.put("ssl.endpoint.identification.algorithm", identificationAlgorithm);props.put("consumer.ssl.endpoint.identification.algorithm", identificationAlgorithm);props.put("sasl.jaas.config", jaasConfig);return new DefaultKafkaConsumerFactory<>(props);}@Beanpublic ConcurrentKafkaListenerContainerFactory<String, String> daConsumerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(daConsumerConfigFactory());factory.setBatchListener(true); // 启用批量消费factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);return factory;}
}

🚨 常见问题排查

错误可能原因
No principal未正确配置 JAAS 文件或 KAFKA_OPTS 未设置
SaslAuthenticationException用户名密码不匹配
Connection refusedlistener 或端口未正确绑定
Cluster ID 不匹配ZooKeeper 路径配置错误或 meta.properties 被复用

✅ 总结

项目配置方式
ZooKeeper 启用鉴权zookeeper_jaas.conf + zoo.cfg
Kafka 启用鉴权kafka_server_jaas.conf + server.properties
客户端连接 Kafkakafka_client_jaas.conf

📎 参考资料

  • 官方文档:https://kafka.apache.org/documentation/
  • Kafka 安全机制:https://kafka.apache.org/documentation/#security_sasl

http://www.dtcms.com/wzjs/52604.html

相关文章:

  • 微网站建设报价方案模板网页加速器
  • 设计师分享网站在线域名ip查询
  • css个人简介网站怎么做新手如何学seo
  • 可以自己做logo的网站抖音搜索排名
  • 广告推广群seo标题优化分析范文
  • 湖北什么网站建设值得推荐资源
  • 网站keyword如何排序刺激广告
  • 网站一直维护意味着什么整合营销传播名词解释
  • 西安高端网站建设招聘seo专员
  • 区块链开发与应用专业搜狗网站seo
  • 网站后台模板关联自己做的网站seo诊断报告
  • wordpress竖排主题深圳博惠seo
  • 做网站用哪个电脑电商网站
  • 江门企业模板建站免费个人网站注册
  • 企业网站程序制作小型培训机构管理系统
  • 外贸仿牌网站建设网络营销到底是干嘛的
  • 绍兴柯桥建设局网站免费seo培训
  • 手机制作视频的软件app免费属于seo网站优化
  • html做动态网站步骤与代码app注册推广平台
  • html5移动web开发什么是seo什么是sem
  • 信访门户网站建设的社会效益世界球队最新排名榜
  • 西安专业做网站建设在线生成个人网站
  • 三合一网站建设口碑好中国十大公关公司排名
  • 湖南建设教育网站公司网站优化
  • 网站排版教程网站seo基础优化
  • 做网站ps的图片2021年网络热点舆论
  • 个人电商网站建设范例收录查询工具
  • 汉服网站怎么做精准营销系统
  • 返利商城网站怎么做网站优化怎么做
  • 南宁公司网站模板建站惠州关键词排名优化