
Installing Kafka and Kafka-UI with Docker-Compose on Linux (KRaft Mode)

I. Background

  While testing log collection and processing with the Apache ShenYu gateway, I chose the Kafka plugin. This post records how I stood up a local Kafka environment with docker-compose and used the Kafka-UI management tool to inspect and debug the messages.


II. Environment

  • Operating system: CentOS 7.9
  • Docker version: 20.10.2
  • Docker-Compose version: v2.38.2
  • Kafka version: 3.9.1 (KRaft mode)
  • Kafka-UI image: provectuslabs/kafka-ui:latest

III. Installing and Starting Kafka and Kafka-UI

1. Prepare the .env environment variable file

  The .env file defines Kafka's CLUSTER_ID and the META_FILE path. CLUSTER_ID can be generated with the uuidgen command or set to any value you like; the META_FILE path must stay consistent with log.dirs in server.properties.

CLUSTER_ID=yourid
META_FILE=/tmp/kraft-combined-logs/meta.properties

2. Write docker-compose.yml

Below is the complete deployment configuration for Kafka and Kafka-UI. Remember to replace yourip with your server's real IP address.

services:
  kafka:
    image: apache/kafka:3.9.1
    container_name: kafka
    ports:
      - "9099:9099"
    volumes:
      - ./kafka/server.properties:/opt/kafka/config/kraft/server.properties
      - ./kafka/data:/kafka/data
      - ./kafka/kraft-logs:/tmp/kraft-combined-logs
    command: >
      bash -c "META_FILE=${META_FILE};
      if [ ! -f $META_FILE ]; then
      echo '[Kafka Init] Formatting with fixed cluster.id: ${CLUSTER_ID}';
      /opt/kafka/bin/kafka-storage.sh format -t ${CLUSTER_ID} -c /opt/kafka/config/kraft/server.properties;
      else
      echo '[Kafka Start] meta.properties exists, skipping format...';
      fi &&
      /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/kraft/server.properties"

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name: kafka-ui
    ports:
      - 9094:8080
    environment:
      KAFKA_CLUSTERS_0_NAME: local-kafka
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: yourip:9099

3. Kafka configuration file (server.properties)

  The server.properties below is the complete configuration I pulled out of the container; adjust the port and IP as needed (I changed the port to 9099). The key settings to change are listeners and advertised.listeners:

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

#
# This configuration file is intended for use in KRaft mode, where
# Apache ZooKeeper is not present.
#

############################# Server Basics #############################

# The role of this server. Setting this puts us in KRaft mode
process.roles=broker,controller

# The node id associated with this instance's roles
node.id=1

# The connect string for the controller quorum
controller.quorum.voters=1@kafka:9093

############################# Socket Server Settings #############################

# The address the socket server listens on.
# Combined nodes (i.e. those with `process.roles=broker,controller`) must list the controller listener here at a minimum.
# If the broker listener is not defined, the default listener will use a host name that is equal to the value of
# java.net.InetAddress.getCanonicalHostName(), with PLAINTEXT listener name, and port 9092.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://0.0.0.0:9099,CONTROLLER://0.0.0.0:9093

# Name of listener used for communication between brokers.
inter.broker.listener.name=PLAINTEXT

# Listener name, hostname and port the broker or the controller will advertise to clients.
# If not set, it uses the value for "listeners".
advertised.listeners=PLAINTEXT://yourip:9099,CONTROLLER://yourip:9093

# A comma-separated list of the names of the listeners used by the controller.
# If no explicit mapping set in `listener.security.protocol.map`, default will be using PLAINTEXT protocol
# This is required if running in KRaft mode.
controller.listener.names=CONTROLLER

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600

############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/tmp/kraft-combined-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

4. Start the services

docker-compose up -d
# If startup fails, check the logs; run this in the directory containing docker-compose.yml
docker-compose logs -f
# or
docker logs <container-name>

5. Access Kafka-UI

Open http://yourip:9094 in a browser.

From there you can browse the dashboard, topics, and consumers.
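Besides the UI, a quick command-line sanity check is possible. This is only a sketch: it assumes the container name kafka and the 9099 listener from the compose file above, and that the advertised host IP is reachable from inside the container.

# List topics via the broker running inside the container
docker exec kafka /opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9099 --list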


IV. Java Producer and Consumer Example

The producer and consumer logic is written in Java, and Kafka-UI lets you visually follow the message flow. The Maven dependency version should ideally match the Kafka broker version or be newer.

Maven dependency:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.9.1</version>
</dependency>

For convenience, I put everything in a single class.

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;

/**
 * @author shijiangyong
 * @date 2025/10/16 17:50
 */
public class KafkaTest {

    public static void main(String[] args) {
        String bootstrapServers = "yourip:9099";
        String topic = "kafa-test--log";
        // producer(bootstrapServers, topic);
        consumer(bootstrapServers, topic);
    }

    private static void consumer(String bootstrapServers, String topic) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", "shenyu-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "latest");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList(topic));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Consumed message: %s%n", record.value());
                }
            }
        }
    }

    /**
     * Producer
     * @param bootstrapServers
     * @param topic
     */
    private static void producer(String bootstrapServers, String topic) {
        createTopicIfNotExists(bootstrapServers, topic);
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            String message = "test message";
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, "test-key", message);
            producer.send(record, (metadata, exception) -> {
                if (exception == null) {
                    System.out.println("sent to topic: " + metadata.topic());
                } else {
                    exception.printStackTrace();
                }
            });
        }
    }

    /**
     * Create the topic if it does not exist; you can also create it yourself in Kafka-UI.
     * @param bootstrapServers
     * @param topic
     */
    private static void createTopicIfNotExists(String bootstrapServers, String topic) {
        Properties adminProps = new Properties();
        adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        try (AdminClient adminClient = AdminClient.create(adminProps)) {
            // Fetch the list of existing topics
            Set<String> topics = adminClient.listTopics().names().get();
            if (!topics.contains(topic)) {
                // Create the topic (1 partition, 1 replica by default)
                NewTopic newTopic = new NewTopic(topic, 1, (short) 1);
                adminClient.createTopics(Collections.singleton(newTopic)).all().get();
                System.out.println("create topic : " + topic);
            } else {
                System.out.println("topic already exists: " + topic);
            }
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
}

Note: Kafka auto-creates topics by default. If you don't want that, add auto.create.topics.enable=false to server.properties and restart the container, as sketched below.
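A possible way to apply that change, assuming the server.properties mounted from ./kafka/ in the compose file above:

# Append the setting to the mounted config, then restart the broker container
echo "auto.create.topics.enable=false" >> ./kafka/server.properties
docker-compose restart kafka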

Calling the producer to publish a log message; the result is shown in the screenshot:

Calling the consumer to read the log messages; the result is shown in the screenshot:


V. ShenYu Gateway Kafka Consumer Example

Messages in Kafka (screenshot):

  The Kafka consumer code integrated with the ShenYu plugin is shown below; it subscribes to the log topic and consumes it. Once the consumer thread starts (here triggered by @PostConstruct in startConsumer()), it begins receiving log records.

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonDeserializer;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.shenyu.admin.mapper.BizAuditLogMapper;
import org.apache.shenyu.admin.model.entity.BizAuditLogDO;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Collections;
import java.util.Properties;

/**
 * @author shijiangyong
 * @date 2025/10/15 15:00
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class KafkaLogConsumerService {

    private final BizAuditLogMapper bizAuditLogMapper;

    private final Gson gson = new GsonBuilder()
            .registerTypeAdapter(LocalDateTime.class, (JsonDeserializer<LocalDateTime>) (json, type, ctx) ->
                    LocalDateTime.parse(json.getAsString(), DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")))
            .create();

    @PostConstruct
    public void startConsumer() {
        Thread thread = new Thread(this::consumeLoop, "shenyu-kafka-consumer");
        thread.setDaemon(true);
        thread.start();
    }

    private void consumeLoop() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "yourip:9099");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "shenyu-log-gson-consumer");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("shenyu-access-logging"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    // Only process messages without a key
                    if (record.key() != null) {
                        log.info("Ignoring message with key: {}", record.key());
                        continue;
                    }
                    try {
                        BizAuditLogDO logEntry = gson.fromJson(record.value(), BizAuditLogDO.class);
                        bizAuditLogMapper.addDynamicBizAuditLog(logEntry);
                        log.info("Inserted successfully: {}", logEntry.getRequestUri());
                    } catch (Exception e) {
                        log.error("Failed to parse or persist message, payload: {}", record.value(), e);
                    }
                }
            }
        } catch (Exception e) {
            log.error("Kafka consumer terminated with an exception:", e);
        }
    }
}

Every record consumed is handed to the mapper, so each log entry ends up stored in MySQL.


VI. Problems Encountered and Solutions

Problem 1: Kafka fails to start, complaining that meta.properties cannot be found

Error message (screenshot):

Cause:

  The log.dirs directory (/tmp/kraft-combined-logs) was not mounted to the host, so it was wiped when the container restarted and Kafka could not read the meta.properties file that KRaft requires. In KRaft mode (no ZooKeeper), Kafka relies on meta.properties for the broker's unique identity, the cluster id, and related metadata, so startup fails without it.

Solution:

Persist meta.properties to the host by mounting the directory:

- ./kafka/kraft-logs:/tmp/kraft-combined-logs

Problem 2: Kafka initialization fails because meta.properties cannot be written

Error message (screenshot):

Cause:

  The mounted directory on the host was created as root, so the process inside the Kafka container (typically UID 1000) has no permission to write to it.

Solution:

  Fix the ownership of the mounted directory:

sudo chown -R 1000:1000 ./kafka/kraft-logs

Problem 3: The JVM has no permission to write GC logs

Cause:

  By default Kafka writes its GC logs under /kafka/data, but the process inside the container has no write permission on that directory.

Solution:

Fix the ownership of the mounted directory:

sudo chown -R 1000:1000 ./kafka/data

Alternatively, disable GC log output entirely; I have not tried that.
