当前位置: 首页 > news >正文

【Kafka】Kafka从入门到实战:构建高吞吐量分布式消息系统

Kafka从入门到实战:构建高吞吐量分布式消息系统

一、Kafka概述

Apache Kafka是一个分布式流处理平台,最初由LinkedIn开发,后成为Apache顶级项目。它被设计用于高吞吐量、低延迟的消息处理,能够处理来自多个生产者的海量数据,并将这些数据实时传递给消费者。

Kafka核心特性

  1. 高吞吐量:即使是非常普通的硬件,Kafka也能支持每秒数百万条消息
  2. 可扩展性:集群可以无缝扩展,无需停机
  3. 持久性:消息持久化到磁盘,并支持数据备份防止数据丢失
  4. 分布式:天然支持分布式部署,具有容错能力
  5. 实时性:消息产生后立即对消费者可见

二、Kafka核心概念

1. 基本组件

  • Producer:消息生产者,向Kafka集群发送消息
  • Consumer:消息消费者,从Kafka集群读取消息
  • Broker:Kafka服务器节点,组成Kafka集群
  • Topic:消息类别,生产者发送消息到特定Topic,消费者订阅特定Topic
  • Partition:Topic物理上的分组,一个Topic可以分为多个Partition
  • Replica:Partition的副本,保证数据可靠性
  • Consumer Group:一组Consumer实例共同消费一个Topic

2. 消息存储机制

Kafka采用顺序写入磁盘的方式存储消息,这种设计使得Kafka即使使用普通磁盘也能实现很高的吞吐量。每个Partition是一个有序的、不可变的消息序列,新消息被追加到Partition末尾。

3. 消息传递语义

  • 至少一次(At least once):消息不会丢失,但可能被重复消费
  • 至多一次(At most once):消息可能丢失,但不会被重复消费
  • 精确一次(Exactly once):消息恰好被消费一次

三、Kafka环境搭建

1. 单机版安装

# 下载Kafka
wget https://downloads.apache.org/kafka/3.6.0/kafka_2.13-3.6.0.tgz
tar -xzf kafka_2.13-3.6.0.tgz
cd kafka_2.13-3.6.0# 启动Zookeeper(新版本Kafka已内置,无需单独安装)
bin/zookeeper-server-start.sh config/zookeeper.properties# 启动Kafka
bin/kafka-server-start.sh config/server.properties

2. 集群部署

修改config/server.properties文件:

# 每个broker必须有唯一的id
broker.id=1# 监听地址
listeners=PLAINTEXT://:9092# 日志目录
log.dirs=/tmp/kafka-logs# Zookeeper连接地址
zookeeper.connect=localhost:2181# 副本数量
default.replication.factor=3# 分区数量
num.partitions=3

在不同节点上启动多个Broker实例即组成集群。

四、Kafka基础操作

1. Topic管理

# 创建Topic
bin/kafka-topics.sh --create --topic test-topic \
--bootstrap-server localhost:9092 \
--partitions 3 --replication-factor 1# 查看Topic列表
bin/kafka-topics.sh --list --bootstrap-server localhost:9092# 查看Topic详情
bin/kafka-topics.sh --describe --topic test-topic \
--bootstrap-server localhost:9092# 删除Topic
bin/kafka-topics.sh --delete --topic test-topic \
--bootstrap-server localhost:9092

2. 生产消费消息

# 启动生产者
bin/kafka-console-producer.sh --topic test-topic \
--bootstrap-server localhost:9092# 启动消费者
bin/kafka-console-consumer.sh --topic test-topic \
--bootstrap-server localhost:9092 --from-beginning

五、Java客户端开发

1. 添加依赖

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.6.0</version>
</dependency>

2. 生产者示例

import org.apache.kafka.clients.producer.*;import java.util.Properties;public class SimpleProducer {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// 可选配置props.put("acks", "all"); // 确保所有副本都收到消息props.put("retries", 3); // 发送失败重试次数props.put("linger.ms", 1); // 发送延迟Producer<String, String> producer = new KafkaProducer<>(props);for (int i = 0; i < 100; i++) {ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "key-" + i, "value-" + i);producer.

相关文章:

  • WinUI3开发_使用mica效果
  • Ubuntu 可执行程序自启动方法
  • 【动态规划 数论】P9759 [COCI 2022/2023 #3] Bomboni|普及+
  • Maven 多仓库治理与发布策略深度实践
  • AD学习(3)
  • 教程:PyCharm 中搭建多级隔离的 Poetry 环境(从 Anaconda 到项目专属.venv)
  • pycharm 设置环境出错
  • P3 QT项目----记事本(3.8)
  • 【字节拥抱开源】字节团队开源视频模型 ContentV: 有限算力下的视频生成模型高效训练
  • PostgreSQL 对 IPv6 的支持情况
  • FastAPI核心解密:深入“路径操作”与HTTP方法,构建API的坚实骨架
  • 前端antd,后端fastapi,实现运行系统指令,并打印运行日志
  • Mac如何配置ZSH并使用Oh-my-zsh?让你的终端更加实用、美观
  • 初学 pytest 记录
  • 解决Excel词典(xllex.dll)文件丢失或损坏问题的终极指南:从基础到高级修复技巧
  • 在 JavaScript中编写 Appium 测试(入门)
  • Java求职者面试指南:Spring、Spring Boot、Spring MVC与MyBatis技术解析
  • Spring Boot 与 Kafka 的深度集成实践(一)
  • PHP:Web 开发的经典利器
  • 「混合开发」H5与原生App交互流程方案全面解析
  • 安徽省 政府网站建设的要求/宁波网络推广方式
  • 签名设计在线生成器/西安seo顾问
  • 泉州专业建站品牌/衡水seo营销
  • 租电信网站服务器吗/新闻热点素材
  • 建设银行武威分行网站/如何制作app软件
  • 邢台做网站流程/seo怎么优化简述