当前位置: 首页 > news >正文

缓存与加速技术实践-Kafka消息队列

目录

#1.1消息队列

  1.1.1什么是消息队列

  1.1.2消息队列的特征

  1.1.3为什么需要消息队列

#2.1ksfka基础与入门

  2.1.1kafka基本概念

  2.1.2kafka相关术语

  2.1.3kafka拓扑架构

#3.1zookeeper概述介绍

  3.1.1zookeeper应用举例

  3.1.2zookeeper的工作原理是什么?

  3.1.3zookeeper集群架构

  3.1.4zookeeper的工作流程

#4.1单节点部署kafka

  4.1.1基础环境

  4.1.2安装zookeeeper

  4.1.3安装kafka

  4.1.4测试

#5.1集群部署kafka

  5.1.1资源列表

  5.1.2基础环境设置

  5.1.3安装zookeerper

  5.1.4安装kafka

  5.1.5测试


1.1消息队列

1.1.1什么是消息队列

   消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象。
   消息队列(Message Queue)是一种应用间的通信方式,消息发送后可以立即返回,由消息系统来确保消息的可靠传递。消息发布者只管把消息发布到 MQ 中而不用管谁来取,消息使用者只管从 MQ 中取消息而不管是谁发布的。这样发布者和使用者都不用知道对方的存在。

1.1.2消息队列的特征

(1)存储
与依赖于使用套接字的基本 TCP 和 UDP 协议的传统请求和响应系统不同,消息队列通常将消息存储在某种类型的缓冲区中,直到目标进程读取这些消息或将其从消息队列中显式移除为止。
(2)异步
与请求和响应系统不同,消息队列通过缓冲消息可以在应用程序中实现一定程度的异步性,允许源进程发送消息并在队列中累积消息,而目标进程则可以挑选消息进行处理。 这样,应用程序就可以在某些故障情况下运行,例如连接断断续续或源进程或目标进程故障。
路由:消息队列还可以提供路由功能,其中多个进程可以在同一队列中读取或写入消息,从而实现广播或单播通信模式。

1.1.3为什么需要消息队列

(1)解耦
允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。
(2)冗余
消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。许多消息队列所采用的 “插入 - 获取 - 删除” 范式中,在把一个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕。

(3)扩展性
因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可。
(4)灵活性 & 峰值处理能力
在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。
(5)可恢复性
系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。
(6)顺序保证:
在大多使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。(Kafka 保证一个 Partition 内的消息的有序性)
(7)缓冲
有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。
(8)异步通信
很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。

2.1kafka基础与入门

2.1.1kafka基本概念

   Kafka 是一种高吞吐量的分布式发布 / 订阅消息系统,这是官方对 kafka 的定义,这样说起来,可能不太好理解,这里简单举个例子:现在是个大数据时代,各种商业、社交、搜索、浏览都会产生大量的数据。那么如何快速收集这些数据,如何实时的分析这些数据,是一个必须要解决的问题,同时,这也形成了一个业务需求模型,即生产者生产(produce)各种数据,消费者(consume)消费(分析、处理)这些数据。那么面对这些需求,如何高效、稳定的完成数据的生产和消费呢?这就需要在生产者与消费者之间,建立一个通信的桥梁,这个桥梁就是消息系统。从微观层面来说,这种业务需求也可理解为不同的系统之间如何传递消息。

    kafka 是 Apache 组织下的一个开源系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于 hadoop 平台的数据分析、低时延的实时系统、storm/spark 流式处理引擎等。kafka 现在已被多家大型公司作为多种类型的数据管道和消息系统使用。

2.1.2kafka相关术语

kafka 的一些核心概念和角色
    Broker:Kafka 集群包含一个或多个服务器,每个服务器被称为 broker(经纪人)。
    Topic:每条发布到 Kafka 集群的消息都有一个分类,这个类别被称为 Topic(主题)。
     Producer:指消息的生产者,负责发布消息到 kafka broker。
     Consumer:指消息的消费者,从 kafka broker 拉取数据,并消费这些已发布的消息。
     Partition:Partition 是物理上的概念,每个 Topic 包含一个或多个 Partition,每个 partition 都是一个有序的队列。partition 中的每条消息都会被分配一个有序的 id(offset)。
    Consumer Group:消费者组,可以给每个 Consumer 指定消费组,若不指定消费者组,则属于默认的 group。
     Message:消息,通信的基本单位,每个 producer 可以向一个 topic 发布一些消息。

2.1.3kafka拓扑架构

     一个典型的 Kafka 集群包含若干 Producer,若干 broker、若干 Consumer Group,以及一个 Zookeeper 集群。Kafka 通过 Zookeeper 管理集群配置,选举 leader,以及在 Consumer Group 发生变化时进行 rebalance。Producer 使用 push 模式将消息发布到 broker,Consumer 使用 pull 模式从 broker 订阅并消费消息。典型架构如下图所示:

3.1zookeeper概念介绍

     ZooKeeper 是一种分布式协调技术,所谓分布式协调技术主要是用来解决分布式环境当中多个进程之间的同步控制,让他们有序的去访问某种共享资源,防止造成资源竞争(脑裂)的后果。脑裂是指在主备切换时,由于切换不彻底或其他原因,导致客户端和 Slave 误以为出现两个 activemaster,最终使得整个集群处于混乱状态

     这里首先介绍下什么是分布式系统,所谓分布式系统就是在不同地域分布的多个服务器,共同组成的一个应用系统来为用户提供服务,在分布式系统中最重要的是进程的调度,这里假设有一个分布在三个地域的服务器组成的一个应用系统,在第一台机器上挂载了一个资源,然后这三个地域分布的应用进程都要竞争这个资源,但我们又不希望多个进程同时进行访问,这个时候就需要一个协调器(锁),来让它们有序的来访问这个资源。这个协调器就是分布式系统中经常提到的那个锁,例如进程 1 在使用该资源的时候,会先去获得这把锁,进程 1 获得锁以后会对该资源保持独占,此时其它进程就无法访问该资源,进程 1 在用完该资源以后会将该锁释放掉,以便让其它进程来获得锁。由此可见,通过这个锁机制,就可以保证分布式系统中多个进程能够有序的访问该共享资源。这里把这个分布式环境下的这个锁叫作分布式锁。这个分布式锁就是分布式协调技术实现的核心内容。

   目前,在分布式协调技术方面做得比较好的有 Google 的 Chubby,还有 Apache 的 ZooKeeper,它们都是分布式锁的实现者。ZooKeeper 所提供锁服务在分布式领域久经考验,它的可靠性、可用性都是经过理论和实践验证的。

   ZooKeeper 是一种为分布式应用所设计的高可用、高性能的开源协调服务,它提供了一项基本服务:分布式锁服务,同时,也提供了数据的维护和管理机制,如:统一命名服务、状态同步服务、集群管理、分布式消息队列、分布式应用配置项的管理等等。

3.1.1zookeeper应用举例

(1)什么是单点故障问题呢?
     所谓单点故障,就是在一个主从的分布式系统中,主节点负责任务调度分发,从节点负责任务的处理,而当主节点发生故障时,整个应用系统也就瘫痪了,那么这种故障就称为单点故障。那我们的解决方法就是通过对集群 master 角色的选取,来解决分布式系统单点故障的问题。
(2)传统的方式是怎么解决单点故障的?以及有哪些缺点呢?
      传统的方式是采用一个备用节点,这个备用节点定期向主节点发送 ping 包,主节点收到 ping 包以后向备用节点发送回复 Ack 信息,当备用节点收到回复的时候就会认为当前主节点运行正常,让它继续提供服务。而当主节点故障时,备用节点就无法收到回复信息了,此时,备用节点就认为主节点宕机,然后接替它成为新的主节点继续提供服务。
这种传统解决单点故障的方法,虽然在一定程度上解决了问题,但是有一个隐患,就是网络问题,可能会存在这样一种情况:主节点并没有出现故障,只是在回复 ack 响应的时候网络发生了故障,这样备用节点就无法收到回复,那么它就会认为主节点出现了故障,接着,备用节点将接管主节点的服务,并成为新的主节点,此时,分布式系统中就出现了两个主节点(双 Master 节点)的情况,双 Master 节点的出现,会导致分布式系统的服务发生混乱。这样的话,整个分布式系统将变得不可用。为了防止出现这种情况,就需要引入 ZooKeeper 来解决这种问题。

3.1.2zookeeper的工作原理是什么?

下面通过三种情形来讲解:
(1)master 启动
在分布式系统中引入 Zookeeper 以后,就可以配置多个主节点,这里以配置两个主节点为例,假定它们是主节点 A 和主节点 B,当两个主节点都启动后,它们都会向 ZooKeeper 中注册节点信息。我们假设主节点 A 注册的节点信息是 master00001,主节点 B 注册的节点信息是 master00002,注册完以后会进行选举,选举有多种算法,这里以编号最小作为选举算法为例,编号最小的节点将在选举中获胜并获得锁成为主节点,也就是主节点 A 将会获得锁成为主节点,然后主节点 B 将被阻塞成为一个备用节点。这样,通过这种方式 Zookeeper 就完成了对两个 Master 进程的调度。完成了主、备节点的分配和协作。
(2)master 故障
如果主节点 A 发生了故障,这时候它在 ZooKeeper 所注册的节点信息会被自动删除,而 ZooKeeper 会自动感知节点的变化,发现主节点 A 故障后,会再次发出选举,这时候主节点 B 将在选举中获胜,替代主节点 A 成为新的主节点,这样就完成了主、被节点的重新选举。
(3)master 恢复
如果主节点 A 恢复了,它会再次向 ZooKeeper 注册自身的节点信息,只不过这时候它注册的节点信息将会变成 master00003 ,而不是原来的信息。ZooKeeper 会感知节点的变化再次发动选举,这时候,主节点 B 在选举中会再次获胜继续担任主节点,主节点 A 会担任备用节点。
    zookeeper 就是通过这样的协调、调度机制如此反复的对集群进行管理和状态同步的。

3.1.3zookeeper集群架构

   zookeeper一般是通过群集架构来提供服务的,下图是zookeeper的基本架构图。

   zookeeper 集群主要角色有 server 和 client,其中 server 又分为 leader、follower 和 observer 三个角色,每个角色的含义如下:
   Leader:领导者角色,主要负责投票的发起和决议,以及更新系统状态。
   follower:跟随着角色,用于接收客户端的请求并返回结果给客户端,在选举过程中参与投票。
   observer:观察者角色,用户接收客户端的请求,并将写请求转发给 leader,同时同步 leader 状态,但是不参与投票。Observer 目的是扩展系统,提高伸缩性。
   client:客户端角色,用于向 zookeeper 发起请求。

3.1.4zookeeper的工作流程

     Zookeeper 修改数据的流程:Zookeeper 集群中每个 Server 在内存中存储了一份数据,在 Zookeeper 启动时,将从实例中选举一个 Server 作为 leader,Leader 负责处理数据更新等操作,当且仅当大多数 Server 在内存中成功修改数据,才认为数据修改成功。
    Zookeeper 写的流程为:客户端 Client 首先和一个 Server 或者 Observe 通信,发起写请求,然后 Server 将写请求转发给 Leader,Leader 再将写请求转发给其它 Server,其它 Server 在接收到写请求后写入数据并响应 Leader,Leader 在接收到大多数写成功回应后,认为数据写成功,最后响应 Client,完成一次写操作过程。

4.1单节点部署kafka

4.1.1基础环境

更改主机名并添加地址映射

[root@localhost ~]#hostnamectl set-hostname kafka1
[root@kafka1 ~]#echo ‘192.168.10.101 kafka1’ >> /etc/hosts

4.1.2安装zookeeper

zookeeper运行依赖java环境,需要先安装java,后安装zookeeper

[root@kafka1 ~]#yum -y install java
[root@kafka1 ~]#tar zxvf apache-zookeeper-3.6.0-bin.tar.gz
[root@kafka1 ~]#mv apache-zookeeper-3.6.0-bin /etc/zookeeper
[root@kafka1 ~]#cd /etc/zookeeper/conf
[root@kafka1 conf]#mv zoo_sample.cfg zoo.cfg

编辑zookeeper配置文件

[root@kafka1 conf]#vi zoo.cfg
dataDir=/etc/zookeeper/zookeeper-data

启动并验证zookeeper

[root@kafkal conf]#cd /etc/zookeeper/
[root@kafkal zookeeper]#mkdir zookeeper-data
[root@kafkal zookeeper]#./bin/zkServer.sh start
[root@kafkal zookeeper]#./bin/zkServer.sh status

4.1.3安装kafka

[root@kafkal ~]#tar zxvf kafka_2.13-2.4.1.tgz
[root@kafkal ~]#mv kafka_2.13-2.4.1 /etc/kafka
[root@kafkal ~]#cd /etc/kafka/

修改kafka配置文件

[root@kafkal kafka]#vi config/server.properties
log.dirs=/etc/kafka/kafka-logs        #60 行

启动kafka

[root@kafkal kafka]#mkdir /etc/kafka/kafka-logs
[root@kafkal kafka]#nohup bin/kafka-server-start.sh config/server.properties &

检查两个端口的开启状态

[root@kafkal kafka]#netstat -anpt | grep 2181
[root@kafkal kafka]#netstat -anpt | grep 9092

4.1.4测试

创建 topic
[root@kafkal kafka]#bin/kafka-topics.sh --create --zookeeper kafkal:2181 --replication-factor 1 --partitions 1 --topic test
列出 topic
[root@kafkal kafka]#bin/kafka-topics.sh --list --zookeeper kafkal:2181
查看 topic
[root@kafkal kafka]#bin/kafka-topics.sh --describe --zookeeper kafkal:2181 --topic test
生产消息
[root@kafkal kafka]#bin/kafka-console-producer.sh --broker-list kafkal:9092 --topic test
消费消息(打开另一个终端,一边生产消息,一边查看消费消息)
[root@kafkal kafka]#bin/kafka-console-consumer.sh --bootstrap-server kafkal:9092 --topic test
删除 topic
[root@kafkal kafka]#bin/kafka-topics.sh --delete --zookeeper kafkal:2181 --topic test

5.1集群部署kafka

5.1.1资源列表

操作系统IP主机名应用
OpenEuler24192.168.10.101kafkalZookeeper,kafka
OpenEuler24192.168.10.102kafka2Zookeeper,kafka
OpenEuler24192.168.10.103kafka2Zookeeper,kafka

5.1.2基础环境设置

关闭防火墙和selinux

[root@localhost ~]#systemctl stop firewalld
[root@localhost ~]# setenforce 0

修改主机名

kafkal
[root@localhost ~]#hostnamectl set-hostname kafkalKafka2
[root@localhost ~]#hostnamectl set-hostname kafka2Kafka3
[root@localhost ~]#hostnamectl set-hostname kafka3添加地址映射(三个节点都要做)
cat > /etc/hosts << EOF
192.168.10.101 kafkal
192.168.10.102 kafka2
192.168.10.103 kafka3

5.1.3安装zookeeper

3个节点都要操作

安装zookeeper

[root@kafkal ~]yum -y install java
[root@kafkal ~]tar zxvf apache-zookeeper-3.6.0-bin.tar.gz
[root@kafkal ~]mv apache-zookeeper-3.6.0-bin /etc/zookeeper
[root@kafkal ~]cd /etc/zookeeper/conf
[root@kafkal conf]mv zoo_sample.cfg zoo.cfg

编辑配置文件

[root@kafkal conf]vi zoo.cfg
dataDir=/etc/zookeeper/zookeeper-data
clientPort=2181
server.1=192.168.10.101:2888:3888
server.2=192.168.10.102:2888:3888
server.3=192.168.10.103:2888:3888

新增数据目录

[root@kafkal conf]cd /etc/zookeeper/
[root@kafkal conf]mkdir zookeeper-data

创建ID节点

节点 1:
[root@kafkal conf]# echo '1' > /etc/zookeeper/zookeeper-data/myid
节点 2:
[root@kafka2 conf]# echo '2' > /etc/zookeeper/zookeeper-data/myid
节点 3:
[root@kafka3 conf]# echo '3' > /etc/zookeeper/zookeeper-data/myid

5.1.4安装kafka

3个节点都要操作 

 安装kafka

[root@kafkal ~]tar zxvf kafka_2.13-2.4.1.tgz
[root@kafkal ~]mv kafka_2.13-2.4.1 /etc/kafka

配置kafka配置文件

[root@kafkal ~]cd /etc/kafka/
[root@kafkal kafka]vi config/server.properties
broker.id=1
##21行  修改,注意其他两个的id分别是2和3listeners=PLAINTEXT://192.168.207.131:9092
#31行  修改,其他节点改成各自的IP地址log.dirs=/etc/kafka/kafka-logs
## 60行  修改num.partitions=1
##65行 分片数量,不能超过节点数zookeeper.connect=192.168.10.101:2181,192.168.10.102:2181,192.168.10.103:2181
##123行,填写集群中各节点的地址和端口

启动kafka服务

[root@kafkal kafka]#mkdir /etc/kafka/kafka-logs
[root@kafkal kafka]#nohup ./bin/kafka-server-start.sh config/server.properties &

5.1.5测试

创建 topic(任意一个节点)
[root@kafkal kafka]#bin/kafka-topics.sh --create --zookeeper kafkal:2181 --replication-factor 1 --partitions 1 --topic test
列出 topic(任意一个节点)
[root@kafkal kafka]#bin/kafka-topics.sh --list --zookeeper kafkal:2181
[root@kafkal kafka]#bin/kafka-topics.sh --list --zookeeper kafka2:2181
[root@kafkal kafka]#bin/kafka-topics.sh --list --zookeeper kafka3:2181
生产消息
[root@kafkal kafka]#bin/kafka-console-producer.sh --broker-list kafkal:9092 --topic test
消费消息
[root@kafkal kafka]#bin/kafka-console-consumer.sh --bootstrap-server kafkal:9092 --topic test
删除 topic
[root@kafkal kafka]#bin/kafka-topics.sh --delete --zookeeper kafkal:2181 --topic test

OK了,家人们今天的笔记分享到此结束,麻烦大家多给我点点赞,还有评论哦~~~在此谢谢家人们啦

相关文章:

  • 网络安全基础:从CIA三元组到密钥交换与消息认证
  • 【软考高级系统架构论文】论 SOA 在企业集成架构设计中的应用
  • 从C++编程入手设计模式——观察者模式
  • TensorFlow 安装与 GPU 驱动兼容(h800)
  • 人工智能学习45-Incep网络
  • 经济法-4- 合同法律制度
  • 从0开始学linux韦东山教程Linux驱动入门实验班(1)
  • Web攻防-XSS跨站Cookie盗取数据包提交网络钓鱼BEEF项目XSS平台危害利用
  • 【软考高级系统架构论文】论软件系统架构风格
  • 【simulink】IEEE5节点系统潮流仿真模型(2机5节点全功能基础模型)
  • 【Java】对象
  • 操作系统内核态和用户态--1-基础认识
  • 操作系统内核态和用户态--2-系统调用是什么?
  • 分布式锁 不同的拒绝策略 应用场景 业务上的思考
  • QT vscode cmake 编译 undefined reference to `vtable for 问题解决
  • 自定义 Hook:在 Vue3 中复用逻辑
  • 【C++】pybind11:生成 Python 可用的动态库
  • 5.3 VSCode使用FFmpeg库
  • 国家级与省级(不含港澳台)标准地图服务网站汇总
  • python的校园兼职系统
  • 北京的做网站的公司/重庆百度推广排名
  • 无锡做网站选优易信/培训机构是干什么的
  • 易语言怎么用网站做背景音乐/sem是什么意思呢
  • 石家庄seo网站优化电话/sem和seo的关系
  • 做公司网站员工保险/武汉seo优化代理
  • 纳森网络做网站多少钱/网络营销的好处和优势