#!/bin/bash

# Entry point for the Kafka (KRaft mode) container image.
#
# Commands (first argument):
#   start        — configure and launch the Kafka broker/controller
#   watch-topic  — tail a topic with the console consumer
#   create-topic — create a single topic
#   list-topics  — list existing topics
#   anything else is exec'd verbatim.
#
# Key environment variables: KAFKA_HOME, KAFKA_DATA, NODE_ID (or legacy
# BROKER_ID), CLUSTER_ID, NODE_ROLE, HOST_NAME, ADVERTISED_HOST_NAME,
# PORT/ADVERTISED_PORT/CONTROLLER_PORT, HEAP_OPTS, LOG_LEVEL, CREATE_TOPICS,
# JMXHOST/JMXPORT, and any KAFKA_* property override.

# Exit immediately if a *pipeline* returns a non-zero status. (Add -x for command tracing)
set -e

# Resolve and export KAFKA_BROKER, preferring (in order):
#   1. an explicit KAFKA_BROKER value,
#   2. Docker-link environment variables (e.g. BROKER_PORT_9092_TCP),
#   3. the 0.0.0.0:9092 fallback.
get_broker_endpoint() {
  if [[ -z "$KAFKA_BROKER" ]]; then
    # Look for any environment variables set by Docker container linking. For example, if the container
    # running Kafka were named 'broker' in this container, then Docker should have created several envs,
    # such as 'BROKER_PORT_9092_TCP'. If so, then use that to automatically connect to the linked broker.
    export KAFKA_BROKER=$(env | grep '.*PORT_9092_TCP=' | sed -e 's|.*tcp://||' | uniq | paste -sd ,)
  fi
  if [[ -z "$KAFKA_BROKER" ]]; then
    export KAFKA_BROKER=0.0.0.0:9092
  fi
  echo "Using KAFKA_BROKER=$KAFKA_BROKER"
}

# Node id: NODE_ID wins, then the deprecated BROKER_ID, then default 1.
if [[ -z "$NODE_ID" ]]; then
  if [[ -z "$BROKER_ID" ]]; then
    NODE_ID=1
    echo "WARNING: Using default NODE_ID=1, which is valid only for non-clustered installations."
  else
    NODE_ID="$BROKER_ID"
    echo "WARNING: Using NODE_ID=$BROKER_ID, as specified via BROKER_ID variable. Please update your configuration to use the NODE_ID variable instead."
  fi
fi

# KRaft mode
if [[ -z "$CLUSTER_ID" ]]; then
  CLUSTER_ID='ldlg5yhkQeWtnUrZrC6edg'
  echo 'CLUSTER_ID not set, using default'
fi
if [[ -z "$NODE_ROLE" ]]; then
  NODE_ROLE='combined'
fi

# Pick the properties file that matches the node role.
case "$NODE_ROLE" in
  'combined')   CONFIG_FILE=config/server.properties;;
  'broker')     CONFIG_FILE=config/broker.properties;;
  'controller') CONFIG_FILE=config/controller.properties;;
  *)            CONFIG_FILE=config/server.properties;;
esac

echo "Starting in KRaft mode, using CLUSTER_ID=$CLUSTER_ID, NODE_ID=$NODE_ID and NODE_ROLE=$NODE_ROLE."
echo "Using configuration $CONFIG_FILE."

# Allow overriding the JVM heap settings baked into kafka-server-start.sh.
if [[ -n "$HEAP_OPTS" ]]; then
  sed -r -i "s/^(export KAFKA_HEAP_OPTS)=\"(.*)\"/\1=\"${HEAP_OPTS}\"/g" "$KAFKA_HOME/bin/kafka-server-start.sh"
  unset HEAP_OPTS
fi

export KAFKA_NODE_ID=$NODE_ID
export KAFKA_BROKER_ID=$NODE_ID
export KAFKA_LOG_DIRS="${KAFKA_DATA}/$KAFKA_NODE_ID"
mkdir -p "$KAFKA_LOG_DIRS"
unset NODE_ID

if [[ -z "$ADVERTISED_PORT" ]]; then
  ADVERTISED_PORT=9092
fi
if [[ -z "$HOST_NAME" ]]; then
  # Guess this container's routable address from the first broadcast interface.
  HOST_NAME=$(ip addr | grep 'BROADCAST' -A2 | tail -n1 | awk '{print $2}' | cut -f1 -d'/')
fi

# Default the host/port chain: *_HOST_NAME / *_PORT feed the KAFKA_* variants.
: "${PORT:=9092}"
: "${ADVERTISED_PORT:=9092}"
: "${CONTROLLER_PORT:=9093}"
: "${ADVERTISED_PORT:=${PORT}}"
: "${ADVERTISED_HOST_NAME:=${HOST_NAME}}"
: "${KAFKA_ADVERTISED_PORT:=${ADVERTISED_PORT}}"
: "${KAFKA_ADVERTISED_HOST_NAME:=${ADVERTISED_HOST_NAME}}"
: "${KAFKA_PORT:=${PORT}}"
: "${KAFKA_HOST_NAME:=${HOST_NAME}}"

# Build listener lists per node role; combined/unknown roles expose both
# the PLAINTEXT and CONTROLLER listeners.
case "$NODE_ROLE" in
  'combined')   : "${KAFKA_LISTENERS:=PLAINTEXT://$KAFKA_HOST_NAME:$KAFKA_PORT,CONTROLLER://$KAFKA_HOST_NAME:$CONTROLLER_PORT}";;
  'broker')     : "${KAFKA_LISTENERS:=PLAINTEXT://$KAFKA_HOST_NAME:$KAFKA_PORT}";;
  'controller') : "${KAFKA_LISTENERS:=PLAINTEXT://$KAFKA_HOST_NAME:$CONTROLLER_PORT}";;
  *)            : "${KAFKA_LISTENERS:=PLAINTEXT://$KAFKA_HOST_NAME:$KAFKA_PORT,CONTROLLER://$KAFKA_HOST_NAME:$CONTROLLER_PORT}";;
esac
: "${KAFKA_ADVERTISED_LISTENERS:=PLAINTEXT://$KAFKA_ADVERTISED_HOST_NAME:$KAFKA_ADVERTISED_PORT}"
export KAFKA_LISTENERS KAFKA_ADVERTISED_LISTENERS
unset HOST_NAME ADVERTISED_HOST_NAME KAFKA_HOST_NAME KAFKA_ADVERTISED_HOST_NAME PORT ADVERTISED_PORT KAFKA_PORT KAFKA_ADVERTISED_PORT CONTROLLER_PORT NODE_ROLE
echo "Using KAFKA_LISTENERS=$KAFKA_LISTENERS and KAFKA_ADVERTISED_LISTENERS=$KAFKA_ADVERTISED_LISTENERS"

#
# Set up the JMX options
#
: "${JMXAUTH:=false}"
: "${JMXSSL:=false}"
if [[ -n "$JMXPORT" && -n "$JMXHOST" ]]; then
  echo "Enabling JMX on ${JMXHOST}:${JMXPORT}"
  export KAFKA_JMX_OPTS="-Djava.rmi.server.hostname=${JMXHOST} -Dcom.sun.management.jmxremote.rmi.port=${JMXPORT} -Dcom.sun.management.jmxremote.port=${JMXPORT} -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=${JMXAUTH} -Dcom.sun.management.jmxremote.ssl=${JMXSSL} "
fi

# Choose the right `cp` argument, `--update=none` is not available on RHEL
release=$(cut -d ' ' -f1 /etc/redhat-release)
if [[ "$release" == "Fedora" ]]; then
  cp_arg="-r --update=none"
else
  cp_arg="-rn"
fi
# Copy config files if not provided in volume.
# NOTE: $cp_arg is intentionally unquoted — it holds one or two separate options.
cp $cp_arg "$KAFKA_HOME"/config.orig/* "$KAFKA_HOME/config"

# Process the argument to this container ...
case $1 in
  start)
    #
    # Configure the log files ...
    #
    if [[ -z "$LOG_LEVEL" ]]; then
      LOG_LEVEL="INFO"
    fi
    sed -i -r -e "s|=INFO, stdout|=$LOG_LEVEL, stdout|g" "$KAFKA_HOME/config/log4j.properties"
    sed -i -r -e "s|^(log4j.appender.stdout.threshold)=.*|\1=${LOG_LEVEL}|g" "$KAFKA_HOME/config/log4j.properties"
    export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$KAFKA_HOME/config/log4j.properties"
    unset LOG_LEVEL

    # Add missing EOF at the end of the config file
    echo "" >> "$KAFKA_HOME/$CONFIG_FILE"

    #
    # Process all environment variables that start with 'KAFKA_' (but not 'KAFKA_HOME' or 'KAFKA_VERSION'):
    #
    for VAR in $(env); do
      env_var=$(echo "$VAR" | sed "s/=.*//")
      if [[ $env_var =~ ^KAFKA_ && $env_var != "KAFKA_VERSION" && $env_var != "KAFKA_HOME" && $env_var != "KAFKA_LOG4J_OPTS" && $env_var != "KAFKA_JMX_OPTS" ]]; then
        # KAFKA_FOO_BAR -> foo.bar
        prop_name=$(echo "$VAR" | sed -r "s/^KAFKA_(.*)=.*/\1/g" | tr '[:upper:]' '[:lower:]' | tr _ .)
        if grep -Eq "(^|^#)$prop_name=" "$KAFKA_HOME/$CONFIG_FILE"; then
          # note that no config names or values may contain an '@' char
          sed -r -i "s%(^|^#)($prop_name)=(.*)%\2=${!env_var}%g" "$KAFKA_HOME/$CONFIG_FILE"
        else
          # echo "Adding property $prop_name=${!env_var}"
          echo "$prop_name=${!env_var}" >> "$KAFKA_HOME/$CONFIG_FILE"
        fi
      fi
    done

    if [[ -n $CREATE_TOPICS ]]; then
      echo "Creating topics: $CREATE_TOPICS"
      # Start a subshell in the background that waits for the Kafka broker to open socket on port 9092 and
      # then creates the topics when the broker is running and able to receive connections ...
      (
        echo "STARTUP: Waiting for Kafka broker to open socket on port 9092 ..."
        # awk exits non-zero once a socket bound to :9092 appears, ending the loop.
        while ss -n | awk '$5 ~ /:9092$/ {exit 1}'; do sleep 1; done
        echo "START: Found running Kafka broker on port 9092, so creating topics ..."
        # CREATE_TOPICS is a comma-separated list of name:partitions:replicas[:cleanupPolicy] specs.
        IFS=','
        for topicToCreate in $CREATE_TOPICS; do
          # remove leading and trailing whitespace ...
          topicToCreate="$(echo ${topicToCreate} | xargs)"
          IFS=':' read -r -a topicConfig <<< "$topicToCreate"
          config=
          if [ -n "${topicConfig[3]}" ]; then
            config="--config=cleanup.policy=${topicConfig[3]}"
          fi
          get_broker_endpoint
          echo "STARTUP: Creating topic ${topicConfig[0]} with ${topicConfig[1]} partitions and ${topicConfig[2]} replicas with cleanup policy ${topicConfig[3]}..."
          # $config stays unquoted so an empty value contributes no argument.
          "$KAFKA_HOME/bin/kafka-topics.sh" --create --bootstrap-server "$KAFKA_BROKER" --replication-factor "${topicConfig[2]}" --partitions "${topicConfig[1]}" --topic "${topicConfig[0]}" ${config}
        done
      ) &
    fi

    # Format the storage directory on first start (KRaft requires it).
    if [[ -n "$CLUSTER_ID" && ! -f "$KAFKA_LOG_DIRS/meta.properties" ]]; then
      echo "No meta.properties found in $KAFKA_LOG_DIRS; going to format the directory"
      "$KAFKA_HOME/bin/kafka-storage.sh" format --standalone -t "$CLUSTER_ID" -c "$KAFKA_HOME/$CONFIG_FILE"
    fi
    exec "$KAFKA_HOME/bin/kafka-server-start.sh" "$KAFKA_HOME/$CONFIG_FILE"
    ;;
  watch-topic)
    shift
    FROM_BEGINNING=""
    FETCH_MIN_BYTES=1
    PRINT_KEY="false"
    while getopts :akm: option; do
      case ${option} in
        a)
          FROM_BEGINNING="--from-beginning"
          ;;
        k)
          PRINT_KEY="true"
          ;;
        m)
          FETCH_MIN_BYTES=$OPTARG
          ;;
        h|\?)
          echo "Usage: watch-topic [-a] [-k] [-m minBytes] topicname"
          echo ""
          echo "where"
          echo ""
          echo " -a Consume all messages from the beginning of the topic."
          echo " By default, this starts consuming messages starting at the"
          echo " time this utility connects."
          echo " -k Display the key with the value. By default, the key will"
          echo " not be displayed."
          echo " -m minBytes Fetch messages only when doing so will consume at least"
          echo " the specified number of bytes. Defaults to '1'."
          echo " topicname The required name of the topic to watch."
          exit 1
          ;;
      esac
    done
    shift $((OPTIND - 1))
    if [[ -z $1 ]]; then
      echo "ERROR: A topic name must be specified"
      exit 1
    fi
    TOPICNAME=$1
    shift
    get_broker_endpoint
    echo "Contents of topic $TOPICNAME:"
    # $FROM_BEGINNING stays unquoted so an empty value contributes no argument.
    exec "$KAFKA_HOME/bin/kafka-console-consumer.sh" --bootstrap-server "$KAFKA_BROKER" --property print.key=$PRINT_KEY --property fetch.min.bytes=$FETCH_MIN_BYTES --topic "$TOPICNAME" $FROM_BEGINNING "$@"
    ;;
  create-topic)
    shift
    PARTITION=1
    REPLICAS=1
    CLEANUP_POLICY=delete
    while getopts :p:r:c: option; do
      case ${option} in
        p)
          PARTITION=$OPTARG
          ;;
        r)
          REPLICAS=$OPTARG
          ;;
        c)
          CLEANUP_POLICY=$OPTARG
          ;;
        h|\?)
          echo "Usage: create-topic [-p numPartitions] [-r numReplicas] [-c cleanupPolicy] topicname"
          echo ""
          echo "where"
          echo ""
          echo " -p numPartitions Create the topic with the specified number of partitions."
          echo " By default, the topic is created with only one partition."
          echo " -r numReplicas Create the topic with the specified number of replicas."
          echo " By default, the topic is created with only one replica."
          echo " The number of replicas may not be larger than the number"
          echo " of brokers."
          echo " -c cleanupPolicy Create the topic with the specified cleanup policy."
          echo " By default, the topic is created with delete cleanup policy."
          echo " topicname The required name of the new topic."
          exit 1
          ;;
      esac
    done
    get_broker_endpoint
    shift $((OPTIND - 1))
    if [[ -z $1 ]]; then
      echo "ERROR: A topic name must be specified"
      exit 1
    fi
    TOPICNAME=$1
    echo "Creating new topic $TOPICNAME with $PARTITION partition(s), $REPLICAS replica(s) and cleanup policy set to $CLEANUP_POLICY..."
    exec "$KAFKA_HOME/bin/kafka-topics.sh" --create --bootstrap-server "$KAFKA_BROKER" --replication-factor "$REPLICAS" --partitions "$PARTITION" --topic "$TOPICNAME" --config=cleanup.policy="$CLEANUP_POLICY"
    ;;
  list-topics)
    echo "Listing topics..."
    get_broker_endpoint
    exec "$KAFKA_HOME/bin/kafka-topics.sh" --list --bootstrap-server "$KAFKA_BROKER"
    ;;
esac

# Otherwise just run the specified command
exec "$@"