在linux上安装kafka,并使用kafka-clients实现消费者
1.安装 java 环境
Kafka依赖 Java运行环境(JDK8或更高版本)
# 安装OpenJDK(推荐)
# CentOS/RHEL 的 yum 源中包名为 java-11-openjdk(openjdk-11-jdk 是 Debian/Ubuntu 的 apt 包名)
yum install java-11-openjdk-devel
# 验证安装
java -version
如果已经安装Java环境,可通过如下方式进行检查
# 检查当前 JAVA_HOME
echo $JAVA_HOME
# 查找Java安装路径
update-alternatives --config java
# 输出示例:/usr/lib/jvm/java-11-openjdk-amd64/bin/java
# 设置JAVA_HOME(替换为你的路径)
export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-11.0.18.0.10-1.el7_9.x86_64
export PATH=$JAVA_HOME/bin:$PATH
# 永久修复(所有终端生效):
# /etc/profile
vim /etc/profile
# 添加以下内容(替换为你的路径)
export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-11.0.18.0.10-1.el7_9.x86_64
export PATH=$JAVA_HOME/bin:$PATH
# 使配置生效
source /etc/profile
2.下载和解压Kafka
从官网下载最新版Kafka(以3.7.0为例)
wget https://archive.apache.org/dist/kafka/3.7.0/kafka_2.13-3.7.0.tgz
tar -xzf kafka_2.13-3.7.0.tgz
mv kafka_2.13-3.7.0 /opt/kafka #移动到/opt目录
cd /opt/kafka
3.配置Kafka
修改配置文件 config/server.properties:
vim config/server.properties
关键配置项:
listeners=PLAINTEXT://0.0.0.0:9092
# 允许外部访问(替换为你的服务器IP或保持localhost。如果需要其他机器能访问你的kafka,就配置为服务器IP)
advertised.listeners=PLAINTEXT://<服务器IP>:9092
# 日志存储目录(确保目录存在且可写)
log.dirs=/tmp/kafka-logs
4.启动Zookeeper和Kafka
Kafka依赖ZooKeeper协调服务,发行包中自带了ZooKeeper的启动脚本和配置(Kafka 3.x 也支持不依赖ZooKeeper的KRaft模式)。先等ZooKeeper启动好了再启动Kafka。
如果启动时内存不足,可以修改zookeeper和kafka的启动脚本,把内存缩小一些,我是缩小了一倍:
#启动ZooKeeper(后台运行)
nohup bin/zookeeper-server-start.sh config/zookeeper.properties > zookeeper.log 2>&1 &
# 启动Kafka(后台运行)
nohup bin/kafka-server-start.sh config/server.properties > kafka.log 2>&1 &
5.测试Kafka
5.1创建topic(localhost或你的服务器IP)
bin/kafka-topics.sh --create --topic test-topic \
--bootstrap-server localhost:9092 \
--partitions 1 --replication-factor 1
5.2启动生产者
bin/kafka-console-producer.sh --topic test-topic --bootstrap-server localhost:9092
5.3启动消费者(新终端)
bin/kafka-console-consumer.sh --topic test-topic --bootstrap-server localhost:9092 --from-beginning
6.停止服务
# 停止Kafka
bin/kafka-server-stop.sh
# 停止ZooKeeper
bin/zookeeper-server-stop.sh
7.常见问题
端口冲突:确保9092(Kafka)和2181(ZooKeeper)端口未被占用。
防火墙:开放端口或关闭防火墙:
日志目录权限:确保Kafka进程有权限写入log.dirs配置的目录。
8.常用命令
#启动ZooKeeper(后台运行)
nohup bin/zookeeper-server-start.sh config/zookeeper.properties > zookeeper.log 2>&1 &
# 启动Kafka(后台运行)
nohup bin/kafka-server-start.sh config/server.properties > kafka.log 2>&1 &
# 创建测试用的 topic
bin/kafka-topics.sh --create --topic lzq-topic \
  --bootstrap-server 123.249.124.105:9092 \
  --partitions 1 --replication-factor 1
# 创建生产者
bin/kafka-console-producer.sh --topic lzq-topic --bootstrap-server 123.249.124.105:9092
# 创建消费者
bin/kafka-console-consumer.sh --topic lzq-topic --bootstrap-server 123.249.124.105:9092 --from-beginning
# 停止Kafka
bin/kafka-server-stop.sh
# 停止ZooKeeper
bin/zookeeper-server-stop.sh
# 列出topic
bin/kafka-topics.sh --list --bootstrap-server 123.249.124.105:9092
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
# 查看进程
ps -ef | grep kafka
ps -ef | grep zook
# 列出所有活跃的消费者组
bin/kafka-consumer-groups.sh --bootstrap-server 123.249.124.105:9092 --list
# 查看特定组的详情
bin/kafka-consumer-groups.sh --bootstrap-server 123.249.124.105:9092 --group app-system-group --describe
9.pom引入依赖
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.4.1</version>
    <scope>compile</scope>
</dependency>
10.创建配置类
package cn.newdt.monitor.custom.config;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * Kafka configuration: exposes a single {@code KafkaConsumer<String, String>}
 * bean built from externalized {@code kafka.*} properties.
 *
 * <p>NOTE: {@link KafkaConsumer} is not thread-safe. All interaction with this
 * bean (except {@code wakeup()}) must happen on one thread; the component that
 * runs the poll loop owns the consumer's lifecycle, including {@code close()}.
 */
@Configuration
public class KafkaConfig {

    // Kafka broker list, format: ip:port[,ip:port...]
    @Value("${kafka.bootstrap.servers:123.249.124.105:9092}")
    private String bootstrapServers;

    // Consumer group name; auto-registered with the broker on first poll
    @Value("${kafka.group.id:app-system-group}")
    private String groupId;

    // Whether consumer offsets are committed to Kafka automatically
    @Value("${kafka.enable.auto.commit:true}")
    private Boolean enableAutoCommit;

    // Auto-commit interval for offsets, in milliseconds
    @Value("${kafka.auto.commit.interval:5000}")
    private Integer autoCommitInterval;

    // Offset reset policy when no valid committed offset exists:
    //   earliest - reset to the oldest available offset
    //   latest   - reset to the newest offset
    //   none     - throw an exception
    @Value("${kafka.auto.offset.reset:latest}")
    private String autoOffsetReset;

    // Max time (ms) the broker blocks a fetch waiting for fetch.min.bytes
    @Value("${kafka.fetch.max.wait:500}")
    private Integer fetchMaxWait;

    // Minimum number of bytes the broker returns per fetch request
    @Value("${kafka.fetch.min.size:1}")
    private Integer fetchMinSize;

    // Heartbeat interval (ms) to the group coordinator
    @Value("${kafka.heartbeat.interval:3000}")
    private Integer heartbeatInterval;

    // Maximum number of records returned by a single poll()
    @Value("${kafka.max.poll.records:500}")
    private Integer maxPollRecords;

    /**
     * Builds the consumer bean.
     *
     * <p>{@code destroyMethod} is disabled so the Spring container does not
     * invoke {@code close()} from the shutdown thread while the polling thread
     * may still be inside {@code poll()} (KafkaConsumer is single-threaded);
     * the polling component closes the consumer itself.
     *
     * @return a configured String/String Kafka consumer
     */
    @Bean(destroyMethod = "")
    public KafkaConsumer<String, String> kafkaConsumer() {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", bootstrapServers);
        configs.put("group.id", groupId);
        configs.put("enable.auto.commit", enableAutoCommit);
        configs.put("auto.commit.interval.ms", autoCommitInterval);
        configs.put("auto.offset.reset", autoOffsetReset);
        configs.put("fetch.max.wait.ms", fetchMaxWait);
        configs.put("fetch.min.bytes", fetchMinSize);
        configs.put("heartbeat.interval.ms", heartbeatInterval);
        configs.put("max.poll.records", maxPollRecords);

        // Key and value deserializers (messages are plain strings)
        Deserializer<String> keyDeserializer = new StringDeserializer();
        Deserializer<String> valueDeserializer = new StringDeserializer();
        return new KafkaConsumer<>(configs, keyDeserializer, valueDeserializer);
    }
}
11.创建消费者
package cn.newdt.monitor.custom;

import cn.hutool.core.thread.ThreadUtil;
import cn.newdt.base.cmdb.domain.AppSystem;
import cn.newdt.monitor.custom.dao.config.CustomAppSystemMapper;
import cn.newdt.monitor.utils.CommonUtil;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.time.Duration;
import java.util.Collections;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Listens for application-system messages from Kafka on a dedicated thread.
 *
 * <p>Lifecycle: the poll loop starts in {@link #afterPropertiesSet()} and is
 * stopped in {@link #destroy()}. Because {@link KafkaConsumer} is not
 * thread-safe, shutdown uses {@code wakeup()} (the only thread-safe call) to
 * break a blocked {@code poll()}, and the consumer is closed by the polling
 * thread itself in its {@code finally} block — never from the Spring shutdown
 * thread, which would throw {@code ConcurrentModificationException}.
 */
@Component
@Slf4j
public class AppSystemKafkaConsumer implements InitializingBean, DisposableBean {

    @Autowired
    private KafkaConsumer<String, String> kafkaConsumer;

    @Autowired
    private CustomAppSystemMapper customAppSystemMapper;

    // Topic to subscribe to
    @Value("${kafka.topic.name:lzq-topic}")
    private String topicName;

    // Max time (ms) a single poll() blocks waiting for records
    @Value("${kafka.poll.timeout:1000}")
    private Long pollTimeout;

    private volatile Thread consumerThread;
    private final AtomicBoolean running = new AtomicBoolean(false);

    private static final String LOG_PREFIX = "[kafka同步应用系统]";

    /** Starts the dedicated consumer thread once the bean is fully wired. */
    @Override
    public void afterPropertiesSet() {
        consumerThread = new Thread(this::consumeMessages, "kafka-consumer-thread");
        consumerThread.setDaemon(false);
        consumerThread.start();
    }

    /**
     * Poll loop, confined to the consumer thread. Subscribes, polls until
     * {@link #running} is cleared, then closes the consumer (single-threaded
     * ownership of the KafkaConsumer).
     */
    private void consumeMessages() {
        try {
            log.info("{}启动线程监听Topic: {}", LOG_PREFIX, topicName);
            // Brief delay so the rest of the context can finish starting up
            ThreadUtil.sleep(1000);
            kafkaConsumer.subscribe(Collections.singletonList(topicName));
            running.set(true);
            while (running.get()) {
                try {
                    ConsumerRecords<String, String> consumerRecords =
                            kafkaConsumer.poll(Duration.ofMillis(pollTimeout));
                    for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                        processMessage(consumerRecord);
                    }
                } catch (WakeupException e) {
                    // Raised by destroy() calling wakeup(); only exit if we are
                    // actually shutting down, otherwise keep polling.
                    if (!running.get()) {
                        break;
                    }
                } catch (Exception e) {
                    log.error("{}Kafka消费过程中发生异常", LOG_PREFIX, e);
                    ThreadUtil.sleep(5000); // 发生异常时短暂休眠避免频繁重试
                }
            }
        } catch (Exception e) {
            log.error("{}Kafka消费者线程执行异常", LOG_PREFIX, e);
        } finally {
            // Close on the owning thread — KafkaConsumer is not thread-safe.
            closeConsumerQuietly();
            log.info("{}Kafka消费者线程已停止", LOG_PREFIX);
        }
    }

    /** Closes the consumer, logging (not propagating) any failure. */
    private void closeConsumerQuietly() {
        if (kafkaConsumer != null) {
            try {
                kafkaConsumer.close();
                log.info("Kafka消费者已关闭");
            } catch (Exception e) {
                log.error("关闭Kafka消费者时发生异常", e);
            }
        }
    }

    /**
     * Handles a single record. Exceptions are contained per-record so one bad
     * message cannot kill the poll loop.
     */
    private void processMessage(ConsumerRecord<String, String> consumerRecord) {
        try {
            String originalMsg = consumerRecord.value();
            log.info("{}消费消息 - Topic: {}, Partition: {}, Offset: {}, Value: {}",
                    LOG_PREFIX,
                    consumerRecord.topic(),
                    consumerRecord.partition(),
                    consumerRecord.offset(),
                    originalMsg);
            // todo: 在这里添加具体的业务逻辑处理
        } catch (Exception e) {
            log.error("{}处理Kafka消息时发生异常: {}", LOG_PREFIX, consumerRecord.value(), e);
        }
    }

    /**
     * Shutdown hook: signal the loop to stop, wake a blocked poll() via the
     * thread-safe {@code wakeup()}, and wait briefly for the thread to finish.
     * Closing the consumer happens on the consumer thread, not here.
     */
    @Override
    public void destroy() {
        log.info("开始关闭Kafka消费者");
        running.set(false);
        if (kafkaConsumer != null) {
            kafkaConsumer.wakeup(); // thread-safe; interrupts a blocked poll()
        }
        if (consumerThread != null) {
            try {
                consumerThread.join(5000); // 等待最多5秒
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}