当前位置: 首页 > news >正文

在linux上安装kafka,并使用kafka-clients实现消费者

1.安装 java 环境

Kafka依赖 Java运行环境(JDK8或更高版本)

# 安装OpenJDK(推荐)
yum install openjdk-11-jdk# 验证安装
java -version

如果已经安装Java环境,可通过如下方式进行检查

# 检查当前 JAVA_HOME
echo $JAVA_HOME# 查找Java安装路径
update-alternatives --config java# 输出示例:/usr/lib/jvm/java-11-openjdk-amd64/bin/java# 设置JAVA_HOME(替换为你的路径)
export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-11.0.18.0.10-1.el7_9.x86_64
export PATH=$JAVA_HOME/bin:$PATH#永久修复(所有终端生效):
# /etc/profile
vim /etc/profile# 添加以下内容(替换为你的路径)
export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-11.0.18.0.10-1.el7_9.x86_64
export PATH=$JAVA_HOME/bin:$PATH# 使配置生效
source /etc/profile

2.下载和解压Kafka

从官网下载最新版Kafka(以3.7.0为例)

​wget https://archive.apache.org/dist/kafka/3.7.0/kafka_2.13-3.7.0.tgz
tar -xzf kafka_2.13-3.7.0.tgz
mv kafka_2.13-3.7.0 /opt/kafka  #移动到/opt目录
cd /opt/kafka

3.配置Kafka

修改配置文件 config/server.properties:

vim config/server.properties

关键配置项:

listeners=PLAINTEXT://0.0.0.0:9092
# 允许外部访问(替换为你的服务器IP或保持localhost。如果需要其他机器能访问你的kafka,就配置为服务器IP)
advertised.listeners=PLAINTEXT://<服务器IP>:9092# 日志存储目录(确保目录存在且可写)
log.dirs=/tmp/kafka-logs

4.启动Zookeeper和Kafka

Kafka依赖ZooKeeper协调服务,新版本内置了ZooKeeper(等zookeeper启动好了再启动kafka)
如果启动时内存不足,可以修改zookeeper和kafka的启动脚本,把内存缩小一些,我是缩小了一倍:

#启动ZooKeeper(后台运行)
nohup bin/zookeeper-server-start.sh config/zookeeper.properties > zookeeper.log 2>&1 &#启动Kafka(后台运行)
nohup bin/kafka-server-start.sh config/server.properties > kafka.log 2>&1 &

5.测试Kafka

5.1创建topic(localhost或你的服务器IP)

bin/kafka-topics.sh --create --topic test-topic \
--bootstrap-server localhost:9092 \
--partitions 1 --replication-factor 1

5.2启动生产者

bin/kafka-console-producer.sh --topic test-topic --bootstrap-server localhost:9092

5.3启动消费者(新终端)

bin/kafka-console-consumer.sh --topic test-topic --bootstrap-server localhost:9092 --from-beginning

6.停止服务

# 停止Kafka
bin/kafka-server-stop.sh# 停止ZooKeeper
bin/zookeeper-server-stop.sh

7.常见问题

端口冲突:确保9092(Kafka)和2181(ZooKeeper)端口未被占用。

防火墙:开放端口或关闭防火墙:
日志目录权限:确保Kafka进程有权限写入log.dirs配置的目录。

8.常用命令

#启动ZooKeeper(后台运行)
nohup bin/zookeeper-server-start.sh config/zookeeper.properties > zookeeper.log 2>&1 &#启动Kafka(后台运行)
nohup bin/kafka-server-start.sh config/server.properties > kafka.log 2>&1 &#创建测试用的 topic
bin/kafka-topics.sh --create --topic lzq-topic --bootstrap-server 123.249.124.105:9092 --partitions 1 --replication-factor 1bin/kafka-topics.sh --create --topic lzq-topic \
--bootstrap-server 123.249.124.105:9092 \
--partitions 1 --replication-factor 1#创建生产者
bin/kafka-console-producer.sh --topic lzq-topic --bootstrap-server 123.249.124.105:9092#创建消费者
bin/kafka-console-consumer.sh --topic lzq-topic --bootstrap-server 123.249.124.105:9092 --from-beginning# 停止Kafka
bin/kafka-server-stop.sh# 停止ZooKeeper
bin/zookeeper-server-stop.sh# 3. 列出topic
bin/kafka-topics.sh --list --bootstrap-server 123.249.124.105:9092bin/kafka-topics.sh --list --bootstrap-server localhost:9092ps -ef|grep kafkaps -ef|grep zook# 列出所有活跃的消费者组
bin/kafka-consumer-groups.sh --bootstrap-server 123.249.124.105:9092 --list# 查看特定组的详情
bin/kafka-consumer-groups.sh --bootstrap-server 123.249.124.105:9092 --group app-system-group --describe

9.pom引入依赖

        <dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.4.1</version><scope>compile</scope></dependency>

10.创建配置类

package cn.newdt.monitor.custom.config;import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;
/*** Kafka配置类*/
@Configuration
public class KafkaConfig {@Value("${kafka.bootstrap.servers:123.249.124.105:9092}")private String bootstrapServers;@Value("${kafka.group.id:app-system-group}")private String groupId;@Value("${kafka.enable.auto.commit:true}")private Boolean enableAutoCommit;@Value("${kafka.auto.commit.interval:5000}")private Integer autoCommitInterval;@Value("${kafka.auto.offset.reset:latest}")private String autoOffsetReset;@Value("${kafka.fetch.max.wait:500}")private Integer fetchMaxWait;@Value("${kafka.fetch.min.size:1}")private Integer fetchMinSize;@Value("${kafka.heartbeat.interval:3000}")private Integer heartbeatInterval;@Value("${kafka.max.poll.records:500}")private Integer maxPollRecords;@Beanpublic KafkaConsumer kafkaConsumer() {Map<String, Object> configs = new HashMap<>();// kafka服务端的IP和端口,格式:(ip:port)configs.put("bootstrap.servers", bootstrapServers);// 指定消费组(自己起的组名,启动时会自动把这个组名注册到kafka)configs.put("group.id", groupId);// 开启consumer的偏移量(offset)自动提交到Kafkaconfigs.put("enable.auto.commit", enableAutoCommit);//consumer的偏移量(offset) 自动提交的时间间隔,单位毫秒configs.put("auto.commit.interval.ms", autoCommitInterval);/*偏移量重置策略(在Kafka中没有初始化偏移量或者当前偏移量不存在情况)earliest, 在偏移量无效的情况下, 自动重置为最早的偏移量latest, 在偏移量无效的情况下, 自动重置为最新的偏移量none, 在偏移量无效的情况下, 抛出异常.*/configs.put("auto.offset.reset", autoOffsetReset);// 请求阻塞最大时间configs.put("fetch.max.wait.ms", fetchMaxWait);// 请求应答最小字节数configs.put("fetch.min.bytes", fetchMinSize);// 心跳间隔时间configs.put("heartbeat.interval.ms", heartbeatInterval);// 单次poll最大记录数configs.put("max.poll.records", maxPollRecords);// Key和Value反序列化器Deserializer<String> keyDeserializer = new StringDeserializer();Deserializer<String> valueDeserializer = new StringDeserializer();// 创建Kafka消费者return new KafkaConsumer<>(configs, keyDeserializer, valueDeserializer);}
}

10.创建消费者

package cn.newdt.monitor.custom;import cn.hutool.core.thread.ThreadUtil;
import cn.newdt.base.cmdb.domain.AppSystem;
import cn.newdt.monitor.custom.dao.config.CustomAppSystemMapper;
import cn.newdt.monitor.utils.CommonUtil;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;import java.time.Duration;
import java.util.Collections;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;/*** 监听来自 kafka 的应用系统消息*/
@Component
@Slf4j
public class AppSystemKafkaConsumer implements InitializingBean, DisposableBean {@Autowiredprivate KafkaConsumer kafkaConsumer;@Autowiredprivate CustomAppSystemMapper customAppSystemMapper;@Value("${kafka.topic.name:lzq-topic}")private String topicName;@Value("${kafka.poll.timeout:1000}")private Long pollTimeout;private volatile Thread consumerThread;private final AtomicBoolean running = new AtomicBoolean(false);private static final String LOG_PREFIX = "[kafka同步应用系统]";@Overridepublic void afterPropertiesSet() {consumerThread = new Thread(this::consumeMessages, "kafka-consumer-thread");consumerThread.setDaemon(false);consumerThread.start();}private void consumeMessages() {try {log.info("{}启动线程监听Topic: {}", LOG_PREFIX, topicName);ThreadUtil.sleep(1000);kafkaConsumer.subscribe(Collections.singletonList(topicName));running.set(true);while (running.get()) {try {ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(pollTimeout));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {processMessage(consumerRecord);}} catch (Exception e) {log.error("{}Kafka消费过程中发生异常", LOG_PREFIX, e);ThreadUtil.sleep(5000); // 发生异常时短暂休眠避免频繁重试}}} catch (Exception e) {log.error("{}Kafka消费者线程执行异常", LOG_PREFIX, e);} finally {log.info("{}Kafka消费者线程已停止", LOG_PREFIX);}}private void processMessage(ConsumerRecord<String, String> consumerRecord) {try {String originalMsg = consumerRecord.value();log.info("{}消费消息 - Topic: {}, Partition: {}, Offset: {}, Value: {}",LOG_PREFIX,consumerRecord.topic(),consumerRecord.partition(),consumerRecord.offset(),originalMsg);// todo: 在这里添加具体的业务逻辑处理} catch (Exception e) {log.error("{}处理Kafka消息时发生异常: {}", LOG_PREFIX, consumerRecord.value(), e);}}@Overridepublic void destroy() {log.info("开始关闭Kafka消费者");running.set(false);if (consumerThread != null) {consumerThread.interrupt();try {consumerThread.join(5000); // 等待最多5秒} catch (InterruptedException e) {Thread.currentThread().interrupt();}}if (kafkaConsumer != null) {try {kafkaConsumer.close();log.info("Kafka消费者已关闭");} catch (Exception e) {log.error("关闭Kafka消费者时发生异常", e);}}}}
http://www.dtcms.com/a/406365.html

相关文章:

  • 数据结构——受限表之队列
  • 宁波网站建设公司立找亿企邦郑州高端装修设计公司
  • python整合网站开发技术凌云网络科技有限公司
  • Ansible 生产级自动化指南:Playbook、Handlers、Jinja2 全解析
  • Ansible Playbook:自动化配置管理的利器
  • 光影绘新疆:解锁城市旅游宣传片拍摄全攻略
  • 龙华网站建设专业定制企业静态网页设计制作心得
  • MotionSight论文阅读
  • 大模型为什么RoPE能提升长序列表现?
  • TypeScript类型兼容性
  • 软件介绍下载网站建设广安门外网站建设
  • SpringBoot 统一功能处理:拦截器、统一返回与异常处理
  • MySQL 8.0 核心转储优化指南
  • MySQL 学习笔记 (Part.2)
  • 什么是数据治理?有哪些好用的数据治理平台?
  • 【Dubbo】Rpc与HTTP的区别、Dubbo调用过程
  • 网站需要怎么做的吗wordpress nova
  • php 做的应用网站wordpress 模板之家
  • PDFParser 的pickle.loads 寻找链(源码)wmctf2025-pdf2text
  • 如何在业务端进行正确的 reCAPTCHA 验证时序设计
  • 小九源码-springboot048-基于spring boot心理健康服务系统
  • 网站建设收费标准网络营销技术
  • 信息安全基础知识:04网络安全体系
  • 高古楼网站 做窗子pc 手机站网站制作
  • 郑州企业网站快速优化多少钱搜索词和关键词
  • 视频内容审核API选型指南:10大主流接口功能对比
  • shell 编程(1)——vim安装与使用
  • 【后端】.NET Core API框架搭建(11) --对象映射
  • Django+FastAPI+Vue微服务架构指南
  • 今日策略:年化436%,回撤7%,夏普比5.28, deap因子挖掘重构,附python代码