当前位置: 首页 > news >正文

kafka4使用记录

  1. 解决 main ERROR Reconfiguration failed: No configuration found for ‘2b193f2d’ at ‘null’ in ‘null’
    windows:
set KAFKA_LOG4J_OPTS=-Dlog4j.configurationFile=file:/D:/02-software/kafka_2.13-4.1.0/config/tools-log4j2.yaml
  2. 生成随机数
.\bin\windows\kafka-storage.bat random-uuid
  3. 启动参考
    https://debezium.io/documentation/reference/3.3/tutorial.html#starting-kafka
#!/bin/bash

# Exit immediately if a *pipeline* returns a non-zero status. (Add -x for command tracing)
set -e

# Resolve the broker endpoint into KAFKA_BROKER and report it.
# Priority: an explicit KAFKA_BROKER env var, then Docker-link style
# *_PORT_9092_TCP variables, and finally the 0.0.0.0:9092 fallback.
get_broker_endpoint() {
  if [[ -z "$KAFKA_BROKER" ]]; then
    # Look for any environment variables set by Docker container linking. For example,
    # if the container running Kafka were named 'broker' in this container, then Docker
    # should have created several envs, such as 'BROKER_PORT_9092_TCP'. If so, then use
    # that to automatically connect to the linked broker.
    KAFKA_BROKER=$(env | grep '.*PORT_9092_TCP=' | sed -e 's|.*tcp://||' | uniq | paste -sd ,)
    export KAFKA_BROKER
  fi
  if [[ -z "$KAFKA_BROKER" ]]; then
    export KAFKA_BROKER=0.0.0.0:9092
  fi
  echo "Using KAFKA_BROKER=$KAFKA_BROKER"
}

# Derive NODE_ID: keep an explicit NODE_ID, fall back to the legacy BROKER_ID,
# and default to 1 (valid for single-node installs only) when neither is set.
if [[ -z "$NODE_ID" ]]; then
  if [[ -z "$BROKER_ID" ]]; then
    NODE_ID=1
    echo "WARNING: Using default NODE_ID=1, which is valid only for non-clustered installations."
  else
    NODE_ID="$BROKER_ID"
    echo "WARNING: Using NODE_ID=$BROKER_ID, as specified via BROKER_ID variable. Please update your configuration to use the NODE_ID variable instead."
  fi
fi
# KRaft mode
# Fall back to a fixed development CLUSTER_ID when the caller did not supply one.
if [[ -z "$CLUSTER_ID" ]]; then
  echo 'CLUSTER_ID not set, using default'
  CLUSTER_ID='ldlg5yhkQeWtnUrZrC6edg'
fi
# Pick the KRaft properties file matching the node role (default: combined
# broker+controller). Unknown roles fall back to the combined config.
if [[ -z "$NODE_ROLE" ]]; then
  NODE_ROLE='combined'
fi
case "$NODE_ROLE" in
  'combined')   CONFIG_FILE=config/server.properties ;;
  'broker')     CONFIG_FILE=config/broker.properties ;;
  'controller') CONFIG_FILE=config/controller.properties ;;
  *)            CONFIG_FILE=config/server.properties ;;
esac
echo "Starting in KRaft mode, using CLUSTER_ID=$CLUSTER_ID, NODE_ID=$NODE_ID and NODE_ROLE=$NODE_ROLE."
echo "Using configuration $CONFIG_FILE."
# If HEAP_OPTS is provided, patch the heap settings directly into the launcher
# script, then drop the variable so it is not exported to the broker process.
# (Paths are quoted: unquoted $KAFKA_HOME would break on paths with spaces.)
if [[ -n "$HEAP_OPTS" ]]; then
  sed -r -i "s/^(export KAFKA_HEAP_OPTS)=\"(.*)\"/\1=\"${HEAP_OPTS}\"/g" "$KAFKA_HOME/bin/kafka-server-start.sh"
  unset HEAP_OPTS
fi
# Expose the node id under both the new (KAFKA_NODE_ID) and legacy
# (KAFKA_BROKER_ID) names, and give each node its own log directory.
export KAFKA_NODE_ID=$NODE_ID
export KAFKA_BROKER_ID=$NODE_ID
export KAFKA_LOG_DIRS="${KAFKA_DATA}/$KAFKA_NODE_ID"
mkdir -p "$KAFKA_LOG_DIRS"
unset NODE_ID
# Default the client port, then auto-detect the host address from the first
# broadcast-capable interface when HOST_NAME was not provided.
if [[ -z "$ADVERTISED_PORT" ]]; then
  ADVERTISED_PORT=9092
fi
if [[ -z "$HOST_NAME" ]]; then
  HOST_NAME=$(ip addr | grep 'BROADCAST' -A2 | tail -n1 | awk '{print $2}' | cut -f1 -d'/')
fi

# Cascade the defaults: PORT/HOST_NAME feed the ADVERTISED_* values, which in
# turn feed the KAFKA_* values actually used to build the listener strings.
: "${PORT:=9092}"
: "${ADVERTISED_PORT:=9092}"
: "${CONTROLLER_PORT:=9093}"
: "${ADVERTISED_PORT:=${PORT}}"
: "${ADVERTISED_HOST_NAME:=${HOST_NAME}}"
: "${KAFKA_ADVERTISED_PORT:=${ADVERTISED_PORT}}"
: "${KAFKA_ADVERTISED_HOST_NAME:=${ADVERTISED_HOST_NAME}}"
: "${KAFKA_PORT:=${PORT}}"
: "${KAFKA_HOST_NAME:=${HOST_NAME}}"

# Build the listener list for the role: brokers expose the client port,
# controllers the controller port, combined nodes expose both.
case "$NODE_ROLE" in
  'combined')   : "${KAFKA_LISTENERS:=PLAINTEXT://$KAFKA_HOST_NAME:$KAFKA_PORT,CONTROLLER://$KAFKA_HOST_NAME:$CONTROLLER_PORT}" ;;
  'broker')     : "${KAFKA_LISTENERS:=PLAINTEXT://$KAFKA_HOST_NAME:$KAFKA_PORT}" ;;
  'controller') : "${KAFKA_LISTENERS:=PLAINTEXT://$KAFKA_HOST_NAME:$CONTROLLER_PORT}" ;;
  *)            : "${KAFKA_LISTENERS:=PLAINTEXT://$KAFKA_HOST_NAME:$KAFKA_PORT,CONTROLLER://$KAFKA_HOST_NAME:$CONTROLLER_PORT}" ;;
esac
: "${KAFKA_ADVERTISED_LISTENERS:=PLAINTEXT://$KAFKA_ADVERTISED_HOST_NAME:$KAFKA_ADVERTISED_PORT}"
export KAFKA_LISTENERS KAFKA_ADVERTISED_LISTENERS

# Only the composed listener strings survive; drop all intermediate variables.
unset HOST_NAME ADVERTISED_HOST_NAME KAFKA_HOST_NAME KAFKA_ADVERTISED_HOST_NAME PORT ADVERTISED_PORT KAFKA_PORT KAFKA_ADVERTISED_PORT CONTROLLER_PORT NODE_ROLE
echo "Using KAFKA_LISTENERS=$KAFKA_LISTENERS and KAFKA_ADVERTISED_LISTENERS=$KAFKA_ADVERTISED_LISTENERS"
#
# Set up the JMX options
#
: "${JMXAUTH:="false"}"
: "${JMXSSL:="false"}"
# JMX is enabled only when BOTH a host and a port are supplied; auth and SSL
# default to disabled unless overridden above.
if [[ -n "$JMXPORT" && -n "$JMXHOST" ]]; then
  echo "Enabling JMX on ${JMXHOST}:${JMXPORT}"
  export KAFKA_JMX_OPTS="-Djava.rmi.server.hostname=${JMXHOST} -Dcom.sun.management.jmxremote.rmi.port=${JMXPORT} -Dcom.sun.management.jmxremote.port=${JMXPORT} -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=${JMXAUTH} -Dcom.sun.management.jmxremote.ssl=${JMXSSL} "
fi
# Choose the right `cp` argument; `--update=none` is not available on RHEL.
# Read the distro name directly (no useless cat); suppress the error and treat
# a missing /etc/redhat-release as "not Fedora" instead of letting the old
# unquoted `[ $release = ... ]` comparison blow up on an empty value.
release=$(cut -d ' ' -f1 /etc/redhat-release 2>/dev/null || true)
if [ "$release" = "Fedora" ]; then
  cp_arg="-r --update=none"
else
  cp_arg="-rn"
fi
# Copy config files if not provided in volume.
# $cp_arg is intentionally unquoted: it may hold several flags to word-split.
cp $cp_arg "$KAFKA_HOME/config.orig/"* "$KAFKA_HOME/config"
# Process the argument to this container: a known subcommand is handled below;
# anything else is exec'd verbatim after the case statement.
case $1 in
  start)
    #
    # Configure the log files ...
    #
    if [[ -z "$LOG_LEVEL" ]]; then
      LOG_LEVEL="INFO"
    fi
    sed -i -r -e "s|=INFO, stdout|=$LOG_LEVEL, stdout|g" $KAFKA_HOME/config/log4j.properties
    sed -i -r -e "s|^(log4j.appender.stdout.threshold)=.*|\1=${LOG_LEVEL}|g" $KAFKA_HOME/config/log4j.properties
    export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$KAFKA_HOME/config/log4j.properties"
    unset LOG_LEVEL

    # Add missing EOF at the end of the config file
    echo "" >> $KAFKA_HOME/$CONFIG_FILE

    #
    # Process all environment variables that start with 'KAFKA_'
    # (but not 'KAFKA_HOME' or 'KAFKA_VERSION'):
    #
    for VAR in $(env); do
      env_var=$(echo "$VAR" | sed "s/=.*//")
      if [[ $env_var =~ ^KAFKA_ && $env_var != "KAFKA_VERSION" && $env_var != "KAFKA_HOME" && $env_var != "KAFKA_LOG4J_OPTS" && $env_var != "KAFKA_JMX_OPTS" ]]; then
        # KAFKA_SOME_SETTING -> some.setting
        prop_name=$(echo "$VAR" | sed -r "s/^KAFKA_(.*)=.*/\1/g" | tr '[:upper:]' '[:lower:]' | tr _ .)
        if grep -Eq "(^|^#)$prop_name=" $KAFKA_HOME/$CONFIG_FILE; then
          # note that no config names or values may contain an '@' char
          sed -r -i "s%(^|^#)($prop_name)=(.*)%\2=${!env_var}%g" $KAFKA_HOME/$CONFIG_FILE
        else
          # echo "Adding property $prop_name=${!env_var}"
          echo "$prop_name=${!env_var}" >> $KAFKA_HOME/$CONFIG_FILE
        fi
      fi
    done

    if [[ -n $CREATE_TOPICS ]]; then
      echo "Creating topics: $CREATE_TOPICS"
      # Start a subshell in the background that waits for the Kafka broker to open
      # socket on port 9092 and then creates the topics when the broker is running
      # and able to receive connections ...
      (
        echo "STARTUP: Waiting for Kafka broker to open socket on port 9092 ..."
        while ss -n | awk '$5 ~ /:9092$/ {exit 1}'; do sleep 1; done
        echo "START: Found running Kafka broker on port 9092, so creating topics ..."
        IFS=','
        for topicToCreate in $CREATE_TOPICS; do
          # remove leading and trailing whitespace ...
          topicToCreate="$(echo ${topicToCreate} | xargs )"
          # topic entries look like name:partitions:replicas[:cleanupPolicy]
          IFS=':' read -a topicConfig <<< "$topicToCreate"
          config=
          if [ -n "${topicConfig[3]}" ]; then
            config="--config=cleanup.policy=${topicConfig[3]}"
          fi
          get_broker_endpoint
          echo "STARTUP: Creating topic ${topicConfig[0]} with ${topicConfig[1]} partitions and ${topicConfig[2]} replicas with cleanup policy ${topicConfig[3]}..."
          $KAFKA_HOME/bin/kafka-topics.sh --create --bootstrap-server $KAFKA_BROKER --replication-factor ${topicConfig[2]} --partitions ${topicConfig[1]} --topic "${topicConfig[0]}" ${config}
        done
      )&
    fi

    # Format the storage directory on first start (KRaft requires it).
    if [[ ! -z "$CLUSTER_ID" && ! -f "$KAFKA_LOG_DIRS/meta.properties" ]]; then
      echo "No meta.properties found in $KAFKA_LOG_DIRS; going to format the directory"
      $KAFKA_HOME/bin/kafka-storage.sh format --standalone -t $CLUSTER_ID -c $KAFKA_HOME/$CONFIG_FILE
    fi
    exec $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/$CONFIG_FILE
    ;;
  watch-topic)
    shift
    FROM_BEGINNING=""
    FETCH_MIN_BYTES=1
    PRINT_KEY="false"
    while getopts :akm: option; do
      case ${option} in
        a)
          FROM_BEGINNING="--from-beginning"
          ;;
        k)
          PRINT_KEY="true"
          ;;
        m)
          FETCH_MIN_BYTES=$OPTARG
          ;;
        h|\?)
          echo "Usage:   watch-topic [-a] [-k] [-m minBytes] topicname"
          echo ""
          echo "where"
          echo ""
          echo "    -a              Consume all messages from the beginning of the topic."
          echo "                    By default, this starts consuming messages starting at the"
          echo "                    time this utility connects."
          echo "    -k              Display the key with the value. By default, the key will"
          echo "                    not be displayed."
          echo "    -m minBytes     Fetch messages only when doing so will consume at least"
          echo "                    the specified number of bytes. Defaults to '1'."
          echo "    topicname       The required name of the topic to watch."
          exit 1
          ;;
      esac
    done
    shift $((OPTIND -1))
    if [[ -z $1 ]]; then
      echo "ERROR: A topic name must be specified"
      exit 1
    fi
    TOPICNAME=$1
    shift
    get_broker_endpoint
    echo "Contents of topic $TOPICNAME:"
    exec $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server $KAFKA_BROKER --property print.key=$PRINT_KEY --property fetch.min.bytes=$FETCH_MIN_BYTES --topic "$TOPICNAME" $FROM_BEGINNING $@
    ;;
  create-topic)
    shift
    PARTITION=1
    REPLICAS=1
    CLEANUP_POLICY=delete
    while getopts :p:r:c: option; do
      case ${option} in
        p)
          PARTITION=$OPTARG
          ;;
        r)
          REPLICAS=$OPTARG
          ;;
        c)
          CLEANUP_POLICY=$OPTARG
          ;;
        h|\?)
          echo "Usage:   create-topic [-p numPartitions] [-r numReplicas] [-c cleanupPolicy] topicname"
          echo ""
          echo "where"
          echo ""
          echo "    -p numPartitions   Create the topic with the specified number of partitions."
          echo "                       By default, the topic is created with only one partition."
          echo "    -r numReplicas     Create the topic with the specified number of replicas."
          echo "                       By default, the topic is created with only one replica."
          echo "                       The number of replicas may not be larger than the number"
          echo "                       of brokers."
          echo "    -c cleanupPolicy   Create the topic with the specified cleanup policy."
          echo "                       By default, the topic is created with delete cleanup policy."
          echo "    topicname          The required name of the new topic."
          exit 1
          ;;
      esac
    done
    get_broker_endpoint
    shift $((OPTIND -1))
    if [[ -z $1 ]]; then
      echo "ERROR: A topic name must be specified"
      exit 1
    fi
    TOPICNAME=$1
    echo "Creating new topic $TOPICNAME with $PARTITION partition(s), $REPLICAS replica(s) and cleanup policy set to $CLEANUP_POLICY..."
    exec $KAFKA_HOME/bin/kafka-topics.sh --create --bootstrap-server $KAFKA_BROKER --replication-factor $REPLICAS --partitions $PARTITION --topic "$TOPICNAME" --config=cleanup.policy=$CLEANUP_POLICY
    ;;
  list-topics)
    echo "Listing topics..."
    get_broker_endpoint
    exec $KAFKA_HOME/bin/kafka-topics.sh --list --bootstrap-server $KAFKA_BROKER
    ;;
esac

# Otherwise just run the specified command
exec "$@"
http://www.dtcms.com/a/469154.html

相关文章:

  • 2100AI智能生活
  • 网站开发交流群做网站线上线下价格混乱
  • AI:让驾驶体验个性化!
  • 由Nacos允许配置访问代理启发的Node前端部署路径转发探究
  • vue - JS 判断客户端是苹果 iOS 还是安卓 Android(封装好的方法直接调用)二种解决方案
  • 路由器如何判断数据转发目标
  • BEM命名规范
  • 12V-24V转3.2V-10V600mA恒流驱动芯片WT7018
  • 远程MCP的调用和阿里云生态的知识库和工作流的使用
  • 前端与后端开发之间的不同
  • 做企业免费网站鄂尔多斯北京网站建设
  • 网站建设优化服务好么锦州做网站
  • 在线Excel新突破:SpreadJS如何完美驾驭中国式复杂报表
  • Excel如何排序?【图文详解】Excel表格排序?Excel自动排序?
  • 【Python办公】csv转Excel(可指定行数)
  • 个人网站用备案吗深圳办公室装修公司哪家好
  • Scala面试题及详细答案100道(71-80)-- 与Java的交互
  • 基于 PyQt5 实现刀具类型选择界面的设计与交互逻辑
  • 常用库函数
  • QUIC协议相比其他传输层协议(TCP,STCP,UDP)的优势
  • 【PC+安卓】塞尔达传说:王国之泪|v1.4.2整合版|官方中文|解压可玩 内附switch模拟器
  • 【自然语言处理】实现跨层跨句的上下文语义理解的解决办法
  • 保利威点播插件功能概览:一体化视频学习与内容管理能力
  • 第六节_PySide6基本窗口控件_单行文本框(QLineEdit)
  • wordpress如何应用sslseo关键字优化软件
  • flutter项目打包macOS桌面程序dmg
  • 【MCAL】AUTOSAR架构下TC3xx芯片I2C模块详解
  • Windows10部署yolov8
  • Git|GitHub SSH 连接配置与验证全流程(通用方法)
  • K230基础-录放音频