Kafka Summary
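I. Kafka Overview
- Background facts: Kafka was originally developed at LinkedIn and is written in Scala and Java. It was open-sourced in 2011, donated to the Apache Software Foundation, and became a top-level Apache project in 2012. It is a distributed messaging system that supports partitions and multiple replicas and, in the version covered here, relies on ZooKeeper for coordination
- Core features: it processes large volumes of data in real time and fits many scenarios, such as Hadoop-based batch systems, low-latency real-time systems, Storm/Spark stream processing engines, web/nginx log and access log processing, and general messaging services
- Motivation: in the big-data era, business, social, search, browsing, and other applications generate massive amounts of information. Two challenges follow: how to collect that information, and how to analyze it in a timely manner. Together they form a producer/consumer model in which producers generate information and consumers process and analyze it. Kafka sits between the two as the messaging system and solves message passing between different systems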
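II. Message Queue Communication Models
(1) Point-to-point model
- Delivery model: based on pulling or polling
- Key characteristics: a message sent to the queue is processed by exactly one consumer; after the producer puts a message on the queue, consumers actively pull messages to consume them
- Pros and cons:
  - Pro: each consumer controls how often it pulls messages
  - Con: a consumer cannot tell whether the queue holds messages waiting to be consumed, so it needs an extra thread to monitor the queue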
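The pull behaviour described above can be illustrated with a small in-memory sketch. It is not Kafka code: the class `PointToPointQueue` and the consumer names are hypothetical, chosen only to show that each message reaches exactly one consumer and that a consumer learns the queue is empty only by polling it.

```python
from collections import deque


class PointToPointQueue:
    """Toy in-memory queue; each message is delivered to exactly one consumer."""

    def __init__(self):
        self._messages = deque()

    def send(self, message):
        # Producer side: append the message to the tail of the queue.
        self._messages.append(message)

    def poll(self):
        # Consumer side: take the oldest message, or None if nothing is waiting.
        return self._messages.popleft() if self._messages else None


queue = PointToPointQueue()
for i in range(3):
    queue.send(f"msg-{i}")

# Two consumers take turns pulling until the queue is drained.
received = {"consumer-a": [], "consumer-b": []}
names = list(received)
turn = 0
while (message := queue.poll()) is not None:
    received[names[turn % len(names)]].append(message)
    turn += 1

# A further poll finds nothing: the consumer only learns this by asking.
empty_poll = queue.poll()
print(received, empty_poll)
```

No message appears in both consumers' lists, which is the defining property of the point-to-point model.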
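(2) Publish-subscribe model
- Delivery model: push-based
- Key characteristics: multiple different subscribers are supported; after the producer puts a message on the queue, the queue actively pushes it to every consumer subscribed to that kind of message (similar to a WeChat official account)
- Pros and cons:
  - Pro: consumers receive pushes passively and do not need to check whether messages are waiting
  - Con: the queue cannot sense how fast each consumer processes messages, so no single push rate suits every consumer. For example, with three consumers that process 8 MB/s, 5 MB/s, and 2 MB/s: at a push rate of 5 MB/s the 2 MB/s consumer is overwhelmed, and at 2 MB/s the 8 MB/s and 5 MB/s consumers leave most of their capacity unused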
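The rate mismatch in this example can be worked through numerically. The sketch below is only an illustration of the arithmetic; the function names `classify` and `backlog_mb` and the consumer labels `c1` to `c3` are hypothetical.

```python
def classify(push_rate, consumer_rates):
    """Label each consumer relative to a single shared push rate (MB/s)."""
    labels = {}
    for name, rate in consumer_rates.items():
        if rate < push_rate:
            labels[name] = "overloaded"
        elif rate > push_rate:
            labels[name] = "underused"
        else:
            labels[name] = "matched"
    return labels


def backlog_mb(push_rate, consumer_rate, seconds):
    """Unprocessed data that piles up at a consumer slower than the push rate."""
    return max(0, push_rate - consumer_rate) * seconds


# Processing speeds from the example above, in MB/s.
consumers = {"c1": 8, "c2": 5, "c3": 2}
at_5 = classify(5, consumers)
at_2 = classify(2, consumers)
print(at_5)
print(at_2)
print(backlog_mb(5, 2, 10))
```

At a push rate of 5 MB/s the slowest consumer falls 3 MB further behind every second, and lowering the rate to 2 MB/s removes that backlog only by leaving the two faster consumers underused.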
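III. Kafka Architecture
- Overall positioning: a high-throughput distributed publish-subscribe messaging system that can handle all the activity stream data of a consumer-scale website, offering high performance, persistence, multi-replica backup, and horizontal scalability
- Core components and their roles:
  - Producer: creates messages; the entry point for messages
  - Broker: a Kafka instance. A server can run one or more brokers, but usually one broker corresponds to one server. Each broker in a cluster has a unique ID (for example broker-0, broker-1)
  - Topic: a category of messages. Kafka stores its data in topics, and multiple topics can be created on each broker
  - Partition: a subdivision of a topic. Each topic can have multiple partitions, whose main purpose is load balancing and higher throughput. Data is not duplicated across partitions of the same topic, and on disk each partition appears as a directory
  - Replication: each partition can have multiple replicas, which serve as backups. When the leader replica fails, one of the followers is chosen as the new leader. The replica count cannot exceed the number of brokers (a default maximum of 10 replicas is also commonly cited). A follower never shares a machine with its leader, so any machine stores at most one replica of a given partition, the leader included
  - Message: the body of each message that is sent
  - Consumer: the party that consumes messages; the exit point for messages
  - Consumer Group: a set of consumers. Within a group, the data of one partition is consumed by only one consumer, while different consumers in the group can consume different partitions of the same topic, which raises throughput
  - ZooKeeper: the Kafka cluster relies on it to store cluster metadata and keep the system available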
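- Message flow:
  - The producer first asks the cluster which broker is the leader of the target partition
  - The producer sends the message to that leader
  - The leader writes the message to its local log file
  - Followers actively pull the message from the leader
  - Each follower writes the message locally and then sends an ACK to the leader
  - Once the leader has ACKs from all replicas, it sends an ACK to the producer (this is the behaviour when the producer requests acknowledgement from all in-sync replicas, acks=all)
- In addition, producers publish data to brokers in push mode. Each message is appended to a partition and written to disk sequentially, which guarantees ordering within a single partition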
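The write path above can be sketched as a small synchronous simulation. This is not the Kafka client or broker API: `Replica`, `Partition`, and `produce` are hypothetical names, the followers are driven in a simple loop rather than fetching asynchronously, and failure handling is left out.

```python
class Replica:
    """One copy of a partition; the list stands in for the on-disk log file."""

    def __init__(self, broker_id):
        self.broker_id = broker_id
        self.log = []


class Partition:
    def __init__(self, leader, followers):
        self.leader = leader
        self.followers = followers

    def produce(self, message):
        # The leader appends the message to its local log first.
        self.leader.log.append(message)
        offset = len(self.leader.log) - 1

        acks = 0
        for follower in self.followers:
            # Each follower pulls whatever it is missing from the leader,
            # appends it in the same order, then acknowledges.
            missing = self.leader.log[len(follower.log):]
            follower.log.extend(missing)
            acks += 1

        # The producer is acknowledged only after every follower has replied.
        if acks != len(self.followers):
            raise RuntimeError("not all replicas acknowledged the write")
        return offset


partition = Partition(Replica(1), [Replica(2), Replica(3)])
offsets = [partition.produce(m) for m in ["a", "b", "c"]]
print(offsets)
print([r.log for r in [partition.leader, *partition.followers]])
```

Because every replica only ever appends, all three logs end up with the same messages in the same order, which is the per-partition ordering guarantee mentioned above.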
IV. Deploying Kafka (on a ZooKeeper Cluster)
(1) Environment
Three hosts form the ZooKeeper cluster: 192.168.100.10 (zookeeper1), 192.168.100.20 (zookeeper2), and 192.168.100.30 (zookeeper3)
(2) Deployment steps
1. Base environment (run on all three hosts)
- Disable the firewall and SELinux, and set up clock synchronization
- Install Java:
  - Create the /opt/software directory, upload jdk-8u181-linux-x64.tar.gz to it, and extract the archive
  - Edit /etc/profile and add the Java environment variables (export JAVA_HOME=/opt/software/jdk1.8.0_181; export PATH=$JAVA_HOME/bin:$PATH; export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar)
  - From zookeeper1, copy the JDK directory and /etc/profile to the other two hosts, then run source /etc/profile on all three hosts so the variables take effect
2. Install ZooKeeper
- On zookeeper1, extract zookeeper-3.4.8.tar.gz and rename the directory to zookeeper. Inside it, create the data and logs directories, then go to conf, copy zoo_sample.cfg to zoo.cfg, and edit it
- In zoo.cfg, set dataDir to /opt/software/zookeeper/data and append server.1=192.168.100.10:2888:3888, server.2=192.168.100.20:2888:3888, and server.3=192.168.100.30:2888:3888 at the end of the file
- On zookeeper1, create a myid file containing 1 in /opt/software/zookeeper/data, then copy the zookeeper directory to the other two hosts
- On zookeeper2 and zookeeper3, write 2 and 3 respectively into the corresponding myid file
- Configure the ZooKeeper environment variables: add export ZOOKEEPER_HOME=/opt/software/zookeeper and export PATH=$PATH:$ZOOKEEPER_HOME/bin to /etc/profile, copy the file to the other two hosts, and run source /etc/profile on all three
- Run zkServer.sh start on all three hosts to start ZooKeeper, then zkServer.sh status to check its state. Each node reports either follower or leader (for example, zookeeper3 may be the leader while zookeeper1 and zookeeper2 are followers)
3. Install Kafka
- On zookeeper1, extract kafka_2.11-2.4.0.tgz and edit config/server.properties: comment out the default broker.id=0 and zookeeper.connect=localhost:2181, then add broker.id=1, zookeeper.connect=192.168.100.10:2181,192.168.100.20:2181,192.168.100.30:2181, and listeners=PLAINTEXT://192.168.100.10:9092
- Copy the kafka_2.11-2.4.0 directory to the other two hosts
- Edit the Kafka configuration on zookeeper2 and zookeeper3: on zookeeper2 set broker.id=2 and listeners=PLAINTEXT://192.168.100.20:9092; on zookeeper3 set broker.id=3 and listeners=PLAINTEXT://192.168.100.30:9092. zookeeper.connect stays the same as on zookeeper1
- On all three hosts, run ./kafka_2.11-2.4.0/bin/kafka-server-start.sh -daemon ./kafka_2.11-2.4.0/config/server.properties to start Kafka
- Run jps to check the processes; both Kafka and QuorumPeerMain (the ZooKeeper process) should be listed
4. Test Kafka
- On zookeeper1, run ./kafka_2.11-2.4.0/bin/kafka-topics.sh --create --zookeeper 192.168.100.10:2181 --replication-factor 1 --partitions 1 --topic test to create the test topic
- On zookeeper2 and zookeeper3, run ./kafka_2.11-2.4.0/bin/kafka-topics.sh --list --zookeeper <host IP>:2181 on each; if both list the test topic, Kafka has been deployed successfully
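The per-host settings in steps 2 and 3 follow one pattern: node N uses myid N, broker.id N, and its own IP in listeners, while the server.N lines and zookeeper.connect are identical everywhere. The sketch below derives those strings from a single host table so the three hosts cannot drift apart. It only builds text; the helper names `zoo_cfg_server_lines`, `zookeeper_connect`, and `kafka_overrides` are hypothetical, and writing the results into zoo.cfg, myid, and server.properties is left to the operator.

```python
# Node ID -> IP address, as used throughout the deployment steps.
HOSTS = {1: "192.168.100.10", 2: "192.168.100.20", 3: "192.168.100.30"}


def zoo_cfg_server_lines(hosts):
    """Lines to append to zoo.cfg; identical on every node."""
    return [f"server.{node_id}={ip}:2888:3888" for node_id, ip in sorted(hosts.items())]


def zookeeper_connect(hosts):
    """Value of zookeeper.connect in server.properties; identical on every node."""
    return ",".join(f"{ip}:2181" for _, ip in sorted(hosts.items()))


def kafka_overrides(node_id, hosts):
    """Settings in server.properties that are specific to one broker."""
    return {
        "broker.id": str(node_id),
        "zookeeper.connect": zookeeper_connect(hosts),
        "listeners": f"PLAINTEXT://{hosts[node_id]}:9092",
    }


for line in zoo_cfg_server_lines(HOSTS):
    print(line)
for node_id in sorted(HOSTS):
    # The myid file on each node holds the same number as its server.N entry.
    print(f"node {node_id}: myid={node_id}", kafka_overrides(node_id, HOSTS))
```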
Worked example:
Deploy the ZooKeeper cluster
192.168.100.10 zookeeper1
192.168.100.20 zookeeper2
192.168.100.30 zookeeper3
The firewall and SELinux have already been disabled
Edit the /etc/hosts file:
[root@zk1 ~]# vim /etc/hosts
....
192.168.100.10 zk1.example.com zk1
192.168.100.20 zk2.example.com zk2
192.168.100.30 zk3.example.com zk3
~
The other two hosts are configured the same way
Install Java (first check for and remove the preinstalled OpenJDK):
[root@zk1 ~]# java -version
openjdk version "1.8.0_181"
OpenJDK Runtime Environment (build 1.8.0_181-b13)
OpenJDK 64-Bit Server VM (build 25.181-b13, mixed mode)
[root@zk1 ~]# rpm -qa | grep java
python-javapackages-3.4.1-11.el7.noarch
java-1.8.0-openjdk-1.8.0.181-7.b13.el7.x86_64
java-1.8.0-openjdk-headless-1.8.0.181-7.b13.el7.x86_64
tzdata-java-2018e-3.el7.noarch
javapackages-tools-3.4.1-11.el7.noarch
[root@zk1 ~]# rpm -e java-1.8.0-openjdk-1.8.0.181-7.b13.el7.x86_64 --nodeps
[root@zk1 ~]# rpm -e java-1.8.0-openjdk-headless-1.8.0.181-7.b13.el7.x86_64 --nodeps
Do the same on the other two hosts, then check all three:
[root@zk1 ~]# rpm -qa | grep java
python-javapackages-3.4.1-11.el7.noarch
tzdata-java-2018e-3.el7.noarch
javapackages-tools-3.4.1-11.el7.noarch
[root@zk2 ~]# rpm -qa | grep java
python-javapackages-3.4.1-11.el7.noarch
tzdata-java-2018e-3.el7.noarch
javapackages-tools-3.4.1-11.el7.noarch
[root@zk3 ~]# rpm -qa | grep java
python-javapackages-3.4.1-11.el7.noarch
tzdata-java-2018e-3.el7.noarch
javapackages-tools-3.4.1-11.el7.noarch
Create /opt/software on all three hosts:
[root@zk1 ~]# mkdir /opt/software
[root@zk2 ~]# mkdir /opt/software
[root@zk3 ~]# mkdir /opt/software
Change into the directory, upload the archive, and extract it:
[root@zk1 ~]# cd /opt/software/
[root@zk1 software]# rz -E
rz waiting to receive.
[root@zk1 software]# ls
jdk-8u181-linux-x64.tar.gz
[root@zk1 software]# tar -xzvf jdk-8u181-linux-x64.tar.gz
....
[root@zk1 software]# ls
jdk1.8.0_181 jdk-8u181-linux-x64.tar.gz
Configure the environment variables:
[root@zk1 software]# cd jdk1.8.0_181/
[root@zk1 jdk1.8.0_181]# ls
bin jre README.html THIRDPARTYLICENSEREADME.txt
COPYRIGHT lib release
include LICENSE src.zip
javafx-src.zip man THIRDPARTYLICENSEREADME-JAVAFX.txt
[root@zk1 jdk1.8.0_181]# vim /etc/profile
....
export JAVA_HOME=/opt/software/jdk1.8.0_181
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
~
[root@zk1 jdk1.8.0_181]# source /etc/profile
[root@zk1 jdk1.8.0_181]# java -version
java version "1.8.0_181"
Java(TM) SE Runtime Environment (build 1.8.0_181-b13)
Java HotSpot(TM) 64-Bit Server VM (build 25.181-b13, mixed mode)
Copy the JDK directory and /etc/profile to zookeeper2 and zookeeper3:
[root@zk1 ~]# scp -r /opt/software/jdk1.8.0_181/ root@zk2:/opt/software/
[root@zk1 ~]# scp -r /opt/software/jdk1.8.0_181/ root@zk3:/opt/software/
[root@zk1 ~]# scp /etc/profile root@zk2:/etc/profile
root@zk2's password:
profile 100% 1962 974.9KB/s 00:00
[root@zk1 ~]# scp /etc/profile root@zk3:/etc/profile
root@zk3's password:
profile 100% 1962 793.3KB/s 00:00
Run source /etc/profile on the other two hosts:
[root@zk2 ~]# source /etc/profile
[root@zk2 ~]# java -version
java version "1.8.0_181"
Java(TM) SE Runtime Environment (build 1.8.0_181-b13)
Java HotSpot(TM) 64-Bit Server VM (build 25.181-b13, mixed mode)
[root@zk3 ~]# source /etc/profile
[root@zk3 ~]# java -version
java version "1.8.0_181"
Java(TM) SE Runtime Environment (build 1.8.0_181-b13)
Java HotSpot(TM) 64-Bit Server VM (build 25.181-b13, mixed mode)
Set up passwordless SSH login:
[root@zk1 ~]# ssh-keygen
Generating public/private rsa key pair.
Enter file in which to save the key (/root/.ssh/id_rsa):
Enter passphrase (empty for no passphrase):
Enter same passphrase again:
Your identification has been saved in /root/.ssh/id_rsa.
Your public key has been saved in /root/.ssh/id_rsa.pub.
The key fingerprint is:
SHA256:frPfpo3TfK6RDzuvweQbEnin7urrX5VykNOLzIAVEQE root@zk1.example.com
The key's randomart image is:
+---[RSA 2048]----+
| E.*= |
| o o |
| . . + . |
| .+ + o|
| S . o=o+.|
| . . Boo |
| . o ooX |
| . +.*=O.|
| o**B+BB+|
+----[SHA256]-----+
[root@zk1 ~]# ssh-copy-id -i ~/.ssh/id_rsa.pub root@zk2
/usr/bin/ssh-copy-id: INFO: Source of key(s) to be installed: "/root/.ssh/id_rsa.pub"
/usr/bin/ssh-copy-id: INFO: attempting to log in with the new key(s), to filter out any that are already installed
/usr/bin/ssh-copy-id: INFO: 1 key(s) remain to be installed -- if you are prompted now it is to install the new keys
root@zk2's password:
Number of key(s) added: 1
Now try logging into the machine, with: "ssh 'root@zk2'"
and check to make sure that only the key(s) you wanted were added.
[root@zk1 ~]# ssh-copy-id -i ~/.ssh/id_rsa.pub root@zk3
/usr/bin/ssh-copy-id: INFO: Source of key(s) to be installed: "/root/.ssh/id_rsa.pub"
/usr/bin/ssh-copy-id: INFO: attempting to log in with the new key(s), to filter out any that are already installed
/usr/bin/ssh-copy-id: INFO: 1 key(s) remain to be installed -- if you are prompted now it is to install the new keys
root@zk3's password:
Number of key(s) added: 1
Now try logging into the machine, with: "ssh 'root@zk3'"
and check to make sure that only the key(s) you wanted were added.
Do the same on the other two hosts
Install ZooKeeper:
Extract the ZooKeeper package:
[root@zk1 ~]# cd /opt/software/
[root@zk1 software]# rz -E
rz waiting to receive.
[root@zk1 software]# ls
jdk1.8.0_181 jdk-8u181-linux-x64.tar.gz zookeeper-3.4.8.tar.gz
[root@zk1 software]# tar -xzvf zookeeper-3.4.8.tar.gz
....
[root@zk1 software]# ls
jdk1.8.0_181 jdk-8u181-linux-x64.tar.gz zookeeper-3.4.8 zookeeper-3.4.8.tar.gz
[root@zk1 software]# mv zookeeper-3.4.8 zookeeper
[root@zk1 software]# cd zookeeper/
[root@zk1 zookeeper]# ls
bin contrib ivy.xml README_packaging.txt zookeeper-3.4.8.jar
build.xml dist-maven lib README.txt zookeeper-3.4.8.jar.asc
CHANGES.txt docs LICENSE.txt recipes zookeeper-3.4.8.jar.md5
conf ivysettings.xml NOTICE.txt src zookeeper-3.4.8.jar.sha1
[root@zk1 zookeeper]# mkdir data
[root@zk1 zookeeper]# mkdir logs
[root@zk1 zookeeper]# cd conf/
[root@zk1 conf]# ls
configuration.xsl log4j.properties zoo_sample.cfg
[root@zk1 conf]# cp zoo_sample.cfg zoo.cfg
[root@zk1 conf]# vim zoo.cfg
.....
# Change the dataDir parameter as follows:
dataDir=/opt/software/zookeeper/data
# Append the following lines at the very end of the file:
server.1=192.168.100.10:2888:3888
server.2=192.168.100.20:2888:3888
server.3=192.168.100.30:2888:3888
~
Write each node's ID into its myid file. The ID must differ per node: 1 on zookeeper1, 2 on zookeeper2, and 3 on zookeeper3:
[root@zk1 conf]# cd ..
[root@zk1 zookeeper]# cd data/
[root@zk1 data]# echo 1 > myid
[root@zk1 data]# cat myid
1
Copy the zookeeper directory to the other two hosts:
[root@zk1 data]# cd /opt/software/
[root@zk1 software]# scp -r zookeeper root@zk2:/opt/software/
.....
[root@zk1 software]# scp -r zookeeper root@zk3:/opt/software/
.....
On zookeeper2, change myid:
[root@zk2 ~]# cd /opt/software/
[root@zk2 software]# ls
jdk1.8.0_181 zookeeper
[root@zk2 software]# cd zookeeper/
[root@zk2 zookeeper]# ls
bin dist-maven logs zookeeper-3.4.8.jar
build.xml docs NOTICE.txt zookeeper-3.4.8.jar.asc
CHANGES.txt ivysettings.xml README_packaging.txt zookeeper-3.4.8.jar.md5
conf ivy.xml README.txt zookeeper-3.4.8.jar.sha1
contrib lib recipes
data LICENSE.txt src
[root@zk2 zookeeper]# cd data/
[root@zk2 data]# ls
myid
[root@zk2 data]# vim myid
2
~
On zookeeper3, change myid:
[root@zk3 ~]# cd /opt/software/
[root@zk3 software]# ls
jdk1.8.0_181 zookeeper
[root@zk3 software]# cd zookeeper/
[root@zk3 zookeeper]# cd data/
[root@zk3 data]# vim myid
3
~
Configure the ZooKeeper environment variables:
[root@zk1 ~]# vim /etc/profile
....
export ZOOKEEPER_HOME=/opt/software/zookeeper
export PATH=$PATH:$ZOOKEEPER_HOME/bin
~
[root@zk1 ~]# source /etc/profile
Copy /etc/profile to the other two hosts:
[root@zk1 ~]# scp /etc/profile root@zk2:/etc/profile
profile 100% 2046 740.9KB/s 00:00
[root@zk1 ~]# scp /etc/profile root@zk3:/etc/profile
profile 100% 2046 902.6KB/s 00:00
Run source /etc/profile on the other two hosts:
[root@zk2 ~]# source /etc/profile
[root@zk3 ~]# source /etc/profile
Start ZooKeeper on all three hosts:
[root@zk1 ~]# zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /opt/software/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[root@zk2 ~]# zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /opt/software/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[root@zk3 ~]# zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /opt/software/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
Check the ZooKeeper status on all three hosts:
[root@zk1 ~]# zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/software/zookeeper/bin/../conf/zoo.cfg
Mode: follower
[root@zk1 ~]# jps
61186 QuorumPeerMain
61275 Jps
[root@zk2 ~]# zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/software/zookeeper/bin/../conf/zoo.cfg
Mode: leader
[root@zk2 ~]# jps
61252 Jps
61151 QuorumPeerMain
[root@zk3 ~]# zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/software/zookeeper/bin/../conf/zoo.cfg
Mode: follower
[root@zk3 ~]# jps
61120 QuorumPeerMain
61208 Jps
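The status output above shows one leader and two followers. A ZooKeeper ensemble keeps serving requests only while a majority of its servers is up, which is why three hosts are used here. The sketch below works through that arithmetic and checks the reported modes; the function names and the `modes` dictionary are hypothetical, with the values copied by hand from the transcript rather than queried from the servers.

```python
def quorum_size(ensemble_size):
    """Smallest number of servers that forms a majority."""
    return ensemble_size // 2 + 1


def tolerated_failures(ensemble_size):
    """How many servers can be down while a majority is still up."""
    return ensemble_size - quorum_size(ensemble_size)


# Modes reported by zkServer.sh status in the transcript above.
modes = {"zk1": "follower", "zk2": "leader", "zk3": "follower"}

leaders = [host for host, mode in modes.items() if mode == "leader"]
healthy = len(leaders) == 1 and len(modes) >= quorum_size(3)

print(quorum_size(3), tolerated_failures(3))
print(leaders, healthy)
```

With three servers the ensemble survives the loss of one; a fourth server would raise the majority to three without increasing the number of failures tolerated.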
Install Kafka
Extract Kafka:
[root@zk1 ~]# rz -E
rz waiting to receive.
[root@zk1 ~]# tar -zxvf kafka_2.11-2.4.0.tgz
......
Edit the Kafka configuration file:
[root@zk1 ~]# cd kafka_2.11-2.4.0/
[root@zk1 kafka_2.11-2.4.0]# ls
bin config libs LICENSE NOTICE site-docs
[root@zk1 kafka_2.11-2.4.0]# cd config/
[root@zk1 config]# ls
connect-console-sink.properties connect-log4j.properties producer.properties
connect-console-source.properties connect-mirror-maker.properties server.properties
connect-distributed.properties connect-standalone.properties tools-log4j.properties
connect-file-sink.properties consumer.properties trogdor.conf
connect-file-source.properties log4j.properties zookeeper.properties
[root@zk1 config]# vim server.properties
Find the following two lines in the configuration file and comment them out (prefix each with #), as shown:
#broker.id=0
.....
#zookeeper.connect=localhost:2181
Then add:
.....
broker.id=1
zookeeper.connect=192.168.100.10:2181,192.168.100.20:2181,192.168.100.30:2181
listeners=PLAINTEXT://192.168.100.10:9092
Copy the Kafka directory to the other two hosts:
[root@zk1 ~]# scp -r kafka_2.11-2.4.0 root@zk2:/root/
.....
[root@zk1 ~]# scp -r kafka_2.11-2.4.0 root@zk3:/root/
.....
Edit the Kafka configuration file on zookeeper2:
[root@zk2 ~]# vim kafka_2.11-2.4.0/config/server.properties
....
broker.id=2
zookeeper.connect=192.168.100.10:2181,192.168.100.20:2181,192.168.100.30:2181
listeners=PLAINTEXT://192.168.100.20:9092
....
Edit the Kafka configuration file on zookeeper3:
[root@zk3 ~]# vim kafka_2.11-2.4.0/config/server.properties
.....
broker.id=3
zookeeper.connect=192.168.100.10:2181,192.168.100.20:2181,192.168.100.30:2181
listeners=PLAINTEXT://192.168.100.30:9092
.....
Start Kafka on all three hosts:
[root@zk1 ~]# ./kafka_2.11-2.4.0/bin/kafka-server-start.sh -daemon ./kafka_2.11-2.4.0/config/server.properties
[root@zk2 ~]# ./kafka_2.11-2.4.0/bin/kafka-server-start.sh -daemon ./kafka_2.11-2.4.0/config/server.properties
[root@zk3 ~]# ./kafka_2.11-2.4.0/bin/kafka-server-start.sh -daemon ./kafka_2.11-2.4.0/config/server.properties
Check with jps:
[root@zk1 ~]# jps
61186 QuorumPeerMain
61954 Kafka
62052 Jps
[root@zk2 ~]# jps
62017 Jps
61932 Kafka
61151 QuorumPeerMain
[root@zk3 ~]# jps
61120 QuorumPeerMain
61872 Kafka
61949 Jps
Test Kafka:
[root@zk1 ~]# ./kafka_2.11-2.4.0/bin/kafka-topics.sh --create --zookeeper 192.168.100.10:2181 --replication-factor 1 --partitions 1 --topic luoqi
Created topic luoqi.
[root@zk2 ~]# ./kafka_2.11-2.4.0/bin/kafka-topics.sh --list --zookeeper 192.168.100.20:2181
luoqi
[root@zk3 ~]# ./kafka_2.11-2.4.0/bin/kafka-topics.sh --list --zookeeper 192.168.100.20:2181
luoqi
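The topic above was created with --replication-factor 1 and --partitions 1, so it has a single copy on one broker. The sketch below is a hypothetical pre-flight check, not part of the Kafka tooling, that applies the rule from the architecture section: the replica count cannot exceed the number of brokers. The function name `validate_topic_request` and its messages are illustrative only.

```python
def validate_topic_request(partitions, replication_factor, broker_count):
    """Return a list of problems with a topic-creation request (empty if none)."""
    problems = []
    if partitions < 1:
        problems.append("partitions must be at least 1")
    if replication_factor < 1:
        problems.append("replication factor must be at least 1")
    if replication_factor > broker_count:
        problems.append(
            f"replication factor {replication_factor} exceeds broker count {broker_count}"
        )
    return problems


# The request used in the test above, against the three-broker cluster.
as_deployed = validate_topic_request(partitions=1, replication_factor=1, broker_count=3)

# One replica per broker is the largest setting this cluster can satisfy.
fully_replicated = validate_topic_request(partitions=3, replication_factor=3, broker_count=3)

# Asking for more replicas than brokers cannot be satisfied.
too_many = validate_topic_request(partitions=1, replication_factor=4, broker_count=3)

print(as_deployed, fully_replicated, too_many)
```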
