Kafka入门-Broker以及文件存储机制
Kafka Broker
Broker实际上就是kafka实例,每一个节点都是独立的Kafka服务器。
Zookeeper中存储的Kafka信息
节点的服役以及退役
服役
首先要重新建立一台全新的服务器105,并且在服务器中安装JDK、Zookeeper、以及Kafka。配置好基础的信息之后,再将节点加入到kafka集群之中。如果是直接拷贝配置好的主机一定要先修改主机的ip地址以及主机名,那么一定要移除kafka的broker.id并且要删除kafka安装目录下的datas以及logs下的所有文件,不然复制的主机和被复制的主机会产生冲突。
rm -rf datas/ logs/
将节点加入到kafka集群之中,只需要在105机器中的kafka安装目录下执行启动命令
bin/kafka-server-start.sh -daemon config/server.properties
启动之后,kafka就会将自己的broker.id注册到zookeeper中,这样就加入了kafka集群。此时虽然加入了集群,但是并没有跟101、102、103之间同步数据,相当于没有起到作用。此时需要执行负载均衡操作,让105能够和其他三台主机一起共同工作。
首先在安装目录下创建一个新文件(直接操作101主机即可)
vim topics-to-move.json
{"topics":[{"topic":"first"}],"version":1
}
执行生成负载均衡计划命令,为0,1,2,3生成负载均衡计划,系统会生产一个负载均衡计划
bin/kafka-reassign-partitions.sh --bootstrap-server 192.168.27.101:9092 --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2,3" --generate
查看计划如果满足要求,那么复制计划,并新建一个文件将复制的计划粘贴到文件中
vim increase-replication-factor.json
执行副本存储计划
bin/kafka-reassign-partitions.sh --bootstrap-server 192.168.27.101:9092 --reassignment-json-file increase-replication-factor.json --execute
验证副本执行计划
bin/kafka-reassign-partitions.sh --bootstrap-server 192.168.27.101:9092 --reassignment-json-file increase-replication-factor.json --verify
此时105主机就承担了一部分的副本存储压力,此时才正式服役。
退役旧节点
退役一台节点时,直接再执行一次负载均衡计划,比如退役105主机,105的broker.id=3
首先创建文件
vim topics-to-move.json
{"topics":[{"topic":"first"}],"version":1
}
执行生成负载均衡计划命令,只为0,1,2生成负载均衡计划,系统会生产一个负载均衡计划
bin/kafka-reassign-partitions.sh --bootstrap-server 192.168.27.101:9092 --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2" --generate
查看生成的计划,如果满足要求,那么复制计划,并将复制的计划粘贴到文件increase-replication-factor.json中
vim increase-replication-factor.json
执行副本存储计划
bin/kafka-reassign-partitions.sh --bootstrap-server 192.168.27.101:9092 --reassignment-json-file increase-replication-factor.json --execute
验证副本执行计划
bin/kafka-reassign-partitions.sh --bootstrap-server 192.168.27.101:9092 --reassignment-json-file increase-replication-factor.json --verify
此时105主机就可以直接停止进行退役。
副本
Kafka使用副本来提高数据可靠性,kafka默认使用一个副本,但是在生产环境中一般配置两个,保证数据可靠性。副本不是越多越好,会增加磁盘存储空间,增加网络中的数据传输,降低效率。
Kafka副本中分为Leader和Follower,Kafka生产者只会把数据发往Leader,然后Follower主动找Leader同步数据。
Kafka分区中的所有副本统称为AR(Assigned Repllicas)
AR=ISR+OSR
ISR:能够和Leader保持同步的Follower集合,ISR包含Leader本身,如果Follower长时间未向Leader发送通信请求或者同步数据,那么该Follower就会被踢出ISR。该时间阈值由replica.lag.time.max.ms参数设定,默认30s,Leader发生故障之后就会从ISR中选举新的Leader。
OSR:表示在Follower与Leader同步时,延迟过多的副本。
Leader的选举流程
如图所示,Leader的选举由AR中的顺序以及是否在ISR存活决定。
Follower故障处理
LEO:每个副本的最后一个offset,LEO其实就是最新的offset + 1。
HW:所有副本中最小的LEO
实际上HW就是记录一个消息的偏移量,在这个消息之前的所有消息是Leader以及所有正常的Follower都有的消息。
当Followers故障时:
- Followers会被临时踢出ISR
- 这个期间Leeder和Follower会继续接收数据
- 当Follower恢复之后,Follower会读取本地磁盘记录的上次HW,并且将文件高于HW的部分截取掉,然后从HW开始向Leader进行同步
- 当重新恢复的Follower的LEO大于等于该Partiton的HW时,就代表Follower已经基本同步了Leader的数据,可以重新加入ISR
Leader故障处理
故障处理也跟LEO、HW相关
当Leader故障时:
- 首先将Leader踢出ISR队列,并从ISR队列选出一个新的Leader
- 为了保证数据在各个副本中一致(数据可能会丢失或者重复),其余的Follower各自将高于HW的部分截掉,然后从新的Leader处同步数据。
分区副本分配
Kafka会为尽量均匀的分配副本在节点上,增强数据的安全性、可靠性。但是我们可以跟之前服役和退役一样,来手动设置分区副本的分配。
正常情况下,Kafka会自动把LeaderPartition均匀分散在各个机器上,来保证每台机器的读写吞吐量都是均匀的,但是如果因为某些Broker宕机,会导致Leader Partition过于集中在其他少部分几台的Brokers上。导致其他机器请求读写压力过高。而宕机的Leader重启之后就成了Follower Partition,读写请求很低,造成集群负载不均衡
文件存储机制
Topic是逻辑上的概念,而Partiton是物理上的概念,每个Partition对应一个log文件,该log文件中存储的是Producer生产的数据。Producer生产的数据会不断的追加到log文件末端,为防止log文件过大导致数据定位效率低下,因此Kafka采取了分片和索引机制,将每个Partition分为多个Segment。每个Segment包括,“.index"偏移量索引文件、”.log"日志文件和".timeindex"时间戳索引文件等文件,这些文件位于一个以topic名称+分区序号为命名规则的文件夹下。
如果需要查看文件内容,那么可以通过kafka的命令进行查看。
[root@centos101 first-0]# kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000000000.index
Dumping ./00000000000000000000.index
offset: 0 position: 0
index为稀疏索引,大约每往log文件写入4kb数据,会往index文件写入一条索引。参数log.index.interval.bytes默认为4kb
index文件中保存的offset为相对offset,这样能确保offset的值所占空间不会过大,因此能将offset的值控制在固定大小
Kafka文件清除策略
Kafka默认的日志保存时间为7天,可以调整以下参数修改保存时间
- log.retention.hour (最低优先级)小时,默认七天
- log.retention.minutes 分钟
- log.retention.ms (最高优先级)毫秒
- log.retention.check.interval.ms 负责设置检查周期,隔一段时间检测是否过期,默认5分钟
日志保存时间和检查周期要进行搭配配置,不然检查周期过长就起不到效果。
Kafka提供的日志清理策略log.cleanup.policy有两种:delete以及compact两种
Delete
- 基于时间:默认开启,以segment中所有记录的最大时间戳作为该文件的过期时间戳,也就是segment中最晚过期的记录过期,才会清除这个segment
- 基于大小:默认关闭,超过设置的所有日志总大小,删除最早的segment。log.retention.bytes,默认为-1,表示无穷大
Compact
compact日志压缩:对于相同的key的不同value值,只保留最后一个版本。开启该策略只需修改log.cleanup.policy = compact.
Kafka高效读写数据
-
Kafka本身是分布式集群,采用分区技术,并行度高
-
读数据采用稀疏索引,可以快速定位要消费的数据
-
顺序写磁盘,写入log文件时是一直追加到文件的末端,使用顺序写,减少了大量磁头寻址的时间
-
页缓存+零拷贝技术