当前位置: 首页 > news >正文

使用Spring Boot构建消息通信层

14 消息驱动:如何使用 KafkaTemplate 集成 Kafka?

从今天开始,我们将进入 Spring Boot 中另一个重要话题的讨论,即消息通信。

消息通信是 Web 应用程序中间层组件中的代表性技术体系,主要用于构建复杂而又灵活的业务流程。在互联网应用中,消息通信被认为是实现系统解耦和高并发的关键技术体系。本节课我们将在 SpringCSS 案例中引入消息通信机制来实现多个服务之间的异步交互。

消息通信机制与 SpringCSS 案例

在引入消息通信机制及消息中间件之前,我们先来梳理下 SpringCSS 中的应用场景。

SpringCSS 案例中的消息通信场景

在 SpringCSS 案例中,一个用户的账户信息变动并不会太频繁。因为 account-service 和 customer-service 分别位于两个服务中,为了降低远程交互的成本,很多时候我们会想到先在 customer-service 本地存放一份用户账户的拷贝信息,并在客户工单生成过程时直接从本地数据库中获取用户账户。

在这样的设计和实现方式下,如果某个用户的账户信息发生变化,我们应该如何正确且高效地应对呢?此时消息驱动机制从系统扩展性角度为我们提供了一种很好的实现方案。

在用户账户信息变更时,account-service 首先会发送一个消息告知某个用户账户信息已经发生变化,然后通知所有对该消息感兴趣的服务。而在 SpringCSS 案例中,这个服务就是 customer-service,相当于是这个消息的订阅者和消费者。

通过这种方式,customer-service 就可以快速获取用户账户变更消息,从而正确且高效地处理本地的用户账户数据。

整个场景的示意图见下图:

Drawing 0.png

用户账户更新场景中的消息通信机制

上图中我们发现,消息通信机制使得我们不必花费太大代价即可实现整个交互过程,简单而方便。

消息通信机制简介

消息通信机制的整体工作流程如下图所示:

在这里插入图片描述
消息通信机制示意图

上图中位于流程中间的就是各种消息中间件,消息中间件一般提供了消息的发送客户端和接收客户端组件,这些客户端组件会嵌入业务服务中。

消息的生产者负责产生消息,在实际业务中一般由业务系统充当生产者;而消息的消费者负责消费消息,在实际业务中一般是后台系统负责异步消费。

消息通信有两种基本模型,即发布-订阅(Pub-Sub)模型和点对点(Point to Point)模型,发布-订阅支持生产者消费者之间的一对多关系,而点对点模型中有且仅有一个消费者。

上述概念构成了消息通信系统最基本的模型,围绕这个模型,业界已经有了一些实现规范和工具,代表性的规范有 JMS 、AMQP ,以及它们的实现框架 ActiveMQ 和 RabbitMQ 等,而 Kafka 等工具并不遵循特定的规范,但也提供了消息通信的设计和实现方案。

本节课我们重点关注 Kafka,后续的两个课时中我们再分别介绍 ActiveMQ 和 RabbitMQ。

与前面介绍的 JdbcTemplate 和 RestTemplate 类似,Spring Boot 作为一款支持快速开发的集成性框架,同样提供了一批以 -Template 命名的模板工具类用于实现消息通信。对于 Kafka 而言,这个工具类就是 KafkaTemplate。

使用 KafkaTemplate 集成 Kafka

在讨论如何使用 KafkaTemplate 实现与 Kafka 之间的集成方法之前,我们先来简单了解 Kafka 的基本架构,再引出 Kafka 中的几个核心概念。

Kafka 基本架构

Kafka 基本架构参考下图,从中我们可以看到 Broker、Producer、Consumer、Push、Pull 等消息通信系统常见概念在 Kafka 中都有所体现,生产者使用 Push 模式将消息发布到 Broker,而消费者使用 Pull 模式从 Broker 订阅消息。

在这里插入图片描述

Kafka 基本架构图

在上图中我们注意到,Kafka 架构图中还使用了 Zookeeper。

Zookeeper 中存储了 Kafka 的元数据及消费者消费偏移量(Offset),其作用在于实现 Broker 和消费者之间的负载均衡。因此,如果我们想要运行 Kafka,首先需要启动 Zookeeper,再启动 Kafka 服务器。

在 Kafka 中还存在 Topic 这么一个核心概念,它是 Kafka 数据写入操作的基本单元,每一个 Topic 可以存在多个副本(Replication)以确保其可用性。每条消息属于且仅属于一个 Topic,因此开发人员通过 Kafka 发送消息时,必须指定将该消息发布到哪个 Topic。同样,消费者订阅消息时,也必须指定订阅来自哪个 Topic 的信息。

另一方面,从组成结构上讲,一个 Topic 中又可以包含一个或多个分区(Partition),因此在创建 Topic 时我们可以指定 Partition 个数。

KafkaTemplate 是 Spring 中提供的基于 Kafka 完成消息通信的模板工具类,而要想使用这个模板工具类,我们必须在消息的发送者和消费者应用程序中都添加如下 Maven 依赖:

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>
使用 KafkaTemplate 发送消息

KafkaTemplate 提供了一系列 send 方法用来发送消息,典型的 send 方法定义如下代码所示:

@Override
public ListenableFuture<SendResult<K, V>> send(String topic, @Nullable V data) {
}

在上述方法实际传入了两个参数,一个是消息对应的 Topic,另一个是消息体的内容。通过该方法,我们就能完成最基本的消息发送过程。

请注意,在使用 Kafka 时,我们推荐事先创建好 Topic 供消息生产者和消费者使用, 通过命令行创建 Topic 的方法如下代码所示:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic springcss.account.topic

这里创建了一个名为“springcss.account.topic”的 Topic,并指定它的副本数量和分区数量都是 3。

事实上,我们在调用 KafkaTemplate 的 send 方法时,如果 Kafka 中不存在该方法中指定的 Topic,它就会自动创建一个新的 Topic。

另一方面,KafkaTemplate 也提供了一组 sendDefault 方法,它通过使用默认的 Topic 来发送消息,如下代码所示:

@Override
public ListenableFuture<SendResult<K, V>> sendDefault(V data) {return send(this.defaultTopic, data);
}

从代码中我们可以看到,在上述 sendDefault 方法内部中也是使用了 send 方法完成消息的发送过程。

那么,如何指定这里的 defaultTopic 呢?在 Spring Boot 中,我们可以使用如下配置项完成这个工作。

spring:kafka:bootstrap-servers:- localhost:9092template:default-topic: demo.topic

现在,我们已经了解了通过 KafkaTemplate 发送消息的实现方式,KafkaTemplate 高度抽象了消息的发送过程,整个过程非常简单。

接下来我们切换下视角,看看如何消费所发送的消息。

使用 @KafkaListener 注解消费消息

首先需要强调一点,通过翻阅 KafkaTemplate 提供的类定义,我们并未找到有关接收消息的任何方法,这实际上与 Kafka 的设计思想有很大关系。

这点也与本课程后续要介绍的 JmsTemplate 和 RabbitTemplate 存在很大区别,因为它们都提供了明确的 receive 方法来接收消息。

从前面提供的 Kafka 架构图中我们可以看出,在 Kafka 中消息通过服务器推送给各个消费者,而 Kafka 的消费者在消费消息时,需要提供一个监听器(Listener)对某个 Topic 实现监听,从而获取消息,这也是 Kafka 消费消息的唯一方式。

在 Spring 中提供了一个 @KafkaListener 注解实现监听器,该注解定义如下代码所示:

@Target({ ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE })
@Retention(RetentionPolicy.RUNTIME)
@MessageMapping
@Documented
@Repeatable(KafkaListeners.class)
public @interface KafkaListener {String id() default "";String containerFactory() default "";//消息 TopicString[] topics() default {};//Topic 的模式匹配表达式String topicPattern() default "";//Topic 分区TopicPartition[] topicPartitions() default {};String containerGroup() default "";String errorHandler() default "";//消息分组 IdString groupId() default "";boolean idIsGroup() default true;String clientIdPrefix() default "";String beanRef() default "__listener";
}

上述代码中我们可以看到 @KafkaListener 的定义比较复杂,我把日常开发中常见的几个配置项做了注释。

在使用 @KafkaListener 时,最核心的操作是设置 Topic,而 Kafka 还提供了一个模式匹配表达式可以对目标 Topic 实现灵活设置。

在这里,我们有必要强调下 groupId 这个属性,这就涉及 Kafka 中另一个核心概念:消费者分组(Consumer Group)。

设计消费者组的目的是应对集群环境下的多服务实例问题。显然,如果采用发布-订阅模式会导致一个服务的不同实例可能会消费到同一条消息。

为了解决这个问题,Kafka 中提供了消费者组的概念。一旦我们使用了消费组,一条消息只能被同一个组中的某一个服务实例所消费。

消费者组的基本结构如下图所示:

在这里插入图片描述
Kafka 消费者组示意图

使用 @KafkaListener 注解时,我们把它直接添加在处理消息的方法上即可,如下代码所示:

@KafkaListener(topics = “demo.topic”)
public void handlerEvent(DemoEvent event) {//TODO:添加消息处理逻辑
}

当然,我们还需要在消费者的配置文件中指定用于消息消费的配置项,如下代码所示:

spring:      kafka:bootstrap-servers:- localhost:9092template:default-topic: demo.topicconsumer:group-id: demo.group

可以看到,这里除了指定 template.default-topic 配置项之外,还指定了 consumer. group-id 配置项来指定消费者分组信息。

在 SpringCSS 案例中集成 Kafka

介绍完 KakfaTemplate 的基本原理后,我们将在 SpringCSS 案例中引入 Kafka 实现 account-service 与 customer-service 之间的消息通信。

实现 account-service 消息生产者

首先,我们新建一个 Spring Boot 工程,用来保存用于多个服务之间交互的消息对象,以供各个服务使用。

我们将这个代码工程简单命名为 message,并添加一个代表消息主体的事件 AccountChangedEvent,如下代码所示:

package com.springcss.message;public class AccountChangedEvent implements Serializable {//事件类型private String type;//事件所对应的操作(新增、更新和删除)private String operation;//事件对应的领域模型private AccountMessage accountMessage;//省略 getter/setter
}

上述 AccountChangedEvent 类包含了 AccountMessage 对象本身以及它的操作类型,而 AccountMessage 与 Account 对象的定义完全一致,只不过 AccountMessage 额外实现了用于序列化的 Serializable 接口而已,如下代码所示:

public class AccountMessage implements Serializable {private Long id;private String accountCode;    private String accountName;
}

定义完消息实体之后,我们在 account-service 中引用了一个 message 工程,并添加了一个 KafkaAccountChangedPublisher 类用来实现消息的发布,如下代码所示:

@Component("kafkaAccountChangedPublisher")
public class KafkaAccountChangedPublisher {@Autowiredprivate KafkaTemplate<String, AccountChangedEvent> kafkaTemplate;@Overrideprotected void publishEvent(AccountChangedEvent event) {kafkaTemplate.send(AccountChannels.SPRINGCSS_ACCOUNT_TOPIC, event);}
}

在这里可以看到,我们注入了一个 KafkaTemplate 对象,然后通过它的 send 方法向目标 Topic 发送了消息。

这里的 AccountChannels.SPRINGCSS_ACCOUNT_TOPIC 就是 “springcss.account.topic”,我们需要在 account-service 中的配置文件中指定同一个 Topic,如下代码所示:

spring:kafka:bootstrap-servers:- localhost:9092template:default-topic: springcss.account.topicproducer:keySerializer: org.springframework.kafka.support.serializer.JsonSerializervalueSerializer: org.springframework.kafka.support.serializer.JsonSerializer

注意到这里,我们使用了 JsonSerializer 对发送的消息进行序列化。

实现 customer-service 消息消费者

针对服务消费者 customer-service,我们先来看它的配置信息,如下代码所示:

spring:      kafka:bootstrap-servers:- localhost:9092template:default-topic: springcss.account.topicconsumer:value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializergroup-id: springcss_customerproperties:spring.json.trusted.packages: com.springcss.message

相较消息生产者中的配置信息,消息消费者的配置信息多了两个配置项,其中一个是 group-id,通过前面内容的介绍,我们已经知道这是 Kafka 消费者特有的一个配置项,用于指定消费者组。

而另一个配置项是 spring.json.trusted.packages,用于设置 JSON 序列化的可行包名称,这个名称需要与 AccountChangedEvent 类所在的包结构一致,即这里指定的 com.springcss.message。

小结与预告

消息通信机制是应用程序开发过程中常用的一种技术体系。在今天的课程中,我们首先基于 SpringCSS 案例梳理了消息通信机制的应用场景,并给出了这一机制的一些基本概念。然后,基于 Kafka 这款主流的详细中间件,我们使用 Spring Boot 提供的 KafkaTemplate 完成了消息的发送和消费,并将其集成到 SpringCSS 案例中。

15 消息驱动:如何使用 JmsTemplate 集成 ActiveMQ?

14 讲我们介绍了基于 Kafka 和 KafkaTemplate 实现消息发送和消费,并重构了 SpringCSS 案例系统中的 account-service 和 customer-service 服务。今天,我们继续介绍 ActiveMQ,并基于 JmsTemplate 模板工具类为 SpringCSS 案例添加对应的消息通信机制。

JMS 规范与 ActiveMQ

JMS(Java Messaging Service)是一种 Java 消息服务,它基于消息传递语义,提供了一整套经过抽象的公共 API。目前,业界也存在一批 JMS 规范的实现框架,其中具备代表性的是 ActiveMQ。

JMS 规范

JMS 规范提供了一批核心接口供开发人员使用,而这些接口构成了客户端的 API 体系,如下图所示:

图片2.png

JMS 规范中的核心 API

上图中可以看到,我们可以通过 ConnectionFactory 创建 Connection,作为客户端的 MessageProducer 和 MessageConsumer 通过 Connection 提供的会话(Session)与服务器进行交互,而交互的媒介就是各种经过封装、包含目标地址(Destination)的消息。

JMS 的消息由两大部分组成,即消息头(Header)和消息体(Payload)。

消息体只包含具体的业务数据,而消息头包含了 JMS 规范定义的通用属性,比如消息的唯一标识 MessageId、目标地址 Destination、接收消息的时间 Timestamp、有效期 Expiration、优先级 Priority、持久化模式 DeliveryMode 等都是常见的通用属性,这些通用属性构成了消息通信的基础元数据(Meta Data),由消息通信系统默认设置。

JMS 规范中的点对点模型表现为队列(Queue),队列为消息通信提供了一对一顺序发送和消费的机制。点对点模型 API 在通用 API 基础上,专门区分生产者 QueueSender 和消费者 QueueReceiver。

而 Topic 是 JMS 规范中对发布-订阅模型的抽象,JMS 同样提供了专门的 TopicPublisher 和 TopicSubscriber。

对于 Topic 而言,因多个消费者存在同时消费一条消息的情况,所以消息有副本的概念。相较点对点模型,发布-订阅模型通常用于更新、事件、通知等非响应式请求场景。在这些场景中,消费者和生产者之间是透明的,消费者可以通过配置文件进行静态管理,也可以在运行过程中动态被创建,同时还支持取消订阅操作。

ActiveMQ

JMS 规范存在 ActiveMQ、WMQ、TIBCO 等多种第三方实现方式,其中较主流的是 ActiveMQ。

针对 ActiveMQ,目前有两个实现项目可供选择,一个是经典的 5.x 版本,另一个是下一代的 Artemis,关于这两者之间的关系,我们可以简单地认为 Artemis 是 ActiveMQ 的未来版本,代表 ActiveMQ 的发展趋势。因此,本课程我们将使用 Artemis 演示消息通信机制。

如果我们想启动 Artemis 服务,首先需要通过如下所示的命名创建一个服务实例。

artemis.cmd create D:\artemis --user springcss --password springcss_password

然后,执行如下命令,我们就可以正常启动这个 Artemis 服务实例了。

D:\artemis \bin\artemis run

Spring 提供了对 JMS 规范及各种实现的友好集成,通过直接配置 Queue 或 Topic,我们就可以使用 JmsTemplate 提供的各种方法简化对 Artemis 的操作了。

使用 JmsTemplate 集成 ActiveMQ

如果我们想基于 Artemis 使用 JmsTemplate,首先需要在 Spring Boot 应用程序中添加对 spring-boot-starter-artemis 的依赖,如下代码所示:

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-artemis</artifactId>
</dependency>

在讨论如何使用 JmsTemplate 实现消息发送和消费之前,我们先来分析消息生产者和消费者的工作模式。

通常,生产者行为模式单一,而消费者根据消费方式的不同有一些特定的分类,比如常见的有推送型消费者(Push Consumer)和拉取型消费者(Pull Consumer)。

推送型方式指的是应用系统向消费者对象注册一个 Listener 接口并通过回调 Listener 接口方法实现消息消费,而在拉取方式下应用系统通常主动调用消费者的拉取消息方法消费消息,主动权由应用系统控制。

在消息通信的两种基本模型中,发布-订阅模型支持生产者/消费者之间的一对多关系,属于一种典型的推送消费者实现机制;而点对点模型中有且仅有一个消费者,他们主要通过基于间隔性拉取的轮询(Polling)方式进行消息消费。

14 讲我们提到 Kafka 中消费消息的方式是一种典型的推送型消费者,所以 KafkaTemplate 只提供了发送消息的方法而没有提供实现消费消息的方法。而 JmsTemplate 则不同,它同时支持推送型消费和拉取型消费,接下来我们一起看下如何使用JmsTemplate 发送消息。

使用 JmsTemplate 发送消息

JmsTemplate 中存在一批 send 方法用来实现消息发送,如下代码所示:

@Override
public void send(MessageCreator messageCreator) throws JmsException {
}@Override
public void send(final Destination destination, final MessageCreator messageCreator) throws JmsException {
}@Override
public void send(final String destinationName, final MessageCreator messageCreator) throws JmsException {
}

这些方法一方面指定了目标 Destination,另一方面提供了一个用于创建消息对象的 MessageCreator 接口,如下代码所示:

public interface MessageCreator {Message createMessage(Session session) throws JMSException;
}

通过 send 方法发送消息的典型实现方式如下代码所示:

public void sendDemoObject(DemoObject demoObject) { jmsTemplate.send("demo.queue", new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { return session.createObjectMessage(demoObject); } 
}

与 KakfaTemplate 不同,JmsTemplate 还提供了一组更为简便的方法实现消息发送,即 convertAndSend 方法,如下代码所示:

public void convertAndSend(Destination destination, final Object message) throws JmsException {}

通过 convertAndSend 方法,我们可以直接传入任意业务对象,且该方法能自动将业务对象转换为消息对象并进行消息发送,具体的示例代码如下所示:

public void sendDemoObject(DemoObject demoObject) { jmsTemplate.convertAndSend("demo.queue", demoObject); 
}

在以上代码中,我们注意到 convertAndSend 方法还存在一批重载方法,它包含了消息后处理功能,如下代码所示:

@Override
public void convertAndSend( Destination destination, final Object message, final MessagePostProcessor postProcessor)throws JmsException {
}

上述方法中的 MessagePostProcessor 就是一种消息后处理器,它用来在构建消息过程中添加自定义的消息属性,它的一种典型的使用方法如下代码所示:

public void sendDemoObject(DemoObject demoObject) { jmsTemplate.convertAndSend("demo.queue", demoObject, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws JMSException { //针对 Message 的处理return message;} 
});

使用 JmsTemplate 的最后一步就是在配置文件中添加配置项,如下代码所示:

spring:artemis:host: localhostport: 61616user: springcsspassword: springcss_password

这里我们指定了 artemis 服务器的地址、端口、用户名和密码等信息。同时,我们也可以在配置文件中指定 Destination 信息,具体配置方式如下代码所示:

spring:jms:template:default-destination: springcss.account.queue
使用 JmsTemplate 消费消息

基于前面的讨论,我们知道 JmsTemplate 同时支持推送型消费和拉取型消费两种消费类型。我们先来看一下如何实现拉取型消费模式。

在 JmsTemplate 中提供了一批 receive 方法供我们从 artemis 中拉取消息,如下代码所示:

public Message receive() throws JmsException {
}public Message receive(Destination destination) throws JmsException {
}public Message receive(String destinationName) throws JmsException {
}

到这一步我们需要注意一点:调用上述方法时,当前线程会发生阻塞,直到一条新消息的到来。针对阻塞场景,这时 receive 方法的使用方式如下代码所示:

public DemoEvent receiveEvent() {Message message = jmsTemplate.receive(“demo.queue”); return (DemoEvent) messageConverter.fromMessage(message);
}

这里我们使用了一个 messageConverter 对象将消息转化为领域对象。

在使用 JmsTemplate 时,我们可以使用 Spring 提供的 MappingJackson2MessageConverter、MarshallingMessageConverter、MessagingMessageConverter,以及 SimpleMessageConverter 实现消息转换,一般系统默认使用 SimpleMessageConverter。而在日常开发过程中,我们通常会使用 MappingJackson2MessageConverter 来完成 JSON 字符串与对象之间的转换。

同时,JmsTemplate 还提供了一组更为高阶的 receiveAndConvert 方法完成消息的接收和转换,如下代码所示:

public Object receiveAndConvert(Destination destination) throws JmsException {}

顾名思义,receiveAndConvert 方法能在接收消息后完成对消息对象的自动转换,使得接收消息的代码更为简单,如下代码所示:

public DemoEvent receiveEvent() { return (DemoEvent)jmsTemplate.receiveAndConvert("demo.queue"); 
}

当然,在消费者端,我们同样需要指定与发送者端完全一致的 MessageConverter 和 Destination 来分别实现消息转换和设置消息目的地。

介绍完拉模式,接下来我们介绍推模式下的消息消费方法,实现方法也很简单,如下代码所示:

@JmsListener(queues = “demo.queue”)
public void handlerEvent(DemoEvent event) {//TODO:添加消息处理逻辑
}

在推模式下,开发人员只需要在 @JmsListener 注解中指定目标队列,就能自动接收来自该队列的消息。

在 SpringCSS 案例中集成 ActiveMQ

ActiveMQ 是本专栏中使用到的第二款消息中间件,因为每款消息中间件都需要设置一些配置信息,所以我们有必要回到 SpringCSS 案例系统,先对配置信息的管理做一些优化。

实现 account-service 消息生产者

首先,我们来回顾下《多维配置:如何使用 Spring Boot 中的配置体系?》的内容介绍,在 Spring Boot 中,我们可以通过 Profile 有效管理针对不同场景和环境的配置信息。

而在 SpringCSS 案例中,Kafka、ActiveMQ 及 16 讲将要介绍的 RabbitMQ 都是消息中间件,在案例系统运行过程中,我们需要选择其中一种中间件演示消息发送和接收到过程,这样我们就需要针对不同的中间件设置不同的 Profile 了。

在 account-service 中,我们可以根据 Profile 构建如下所示的配置文件体系。

Drawing 1.png

account-service 中的配置文件

从以上图中可以看到:根据三种不同的中间件,我们分别提供了三个配置文件。以其中的 application-activemq.yml 为例,其包含的配置项如下代码所示:

spring:jms:template:default-destination: springcss.account.queueartemis:host: localhostport: 61616user: springcsspassword: springcss_passwordembedded:enabled: false

在主配置文件 application.yml 中,我们可以将当前可用的 Profile 设置为 activemq,如下代码所示:

spring:profiles:active: activemq

介绍完配置信息的优化管理方案,我们再来看看实现消息发送的 ActiveMQAccountChangedPublisher 类,如下代码所示:

@Component("activeMQAccountChangedPublisher")
public class ActiveMQAccountChangedPublisher{@Autowiredprivate JmsTemplate jmsTemplate;@Overrideprotected void publishEvent(AccountChangedEvent event) {jmsTemplate.convertAndSend(AccountChannels.SPRINGCSS_ACCOUNT_QUEUE, event, this::addEventSource);}private Message addEventSource(Message message) throws JMSException {message.setStringProperty("EVENT_SYSTEM", "SpringCSS");return message;}
}

以上代码中,我们基于 JmsTemplate 的 convertAndSend 方法完成了消息的发送。同时,我们注意到:这里也使用了另一种实现 MessagePostProcessor 的方法,即 lambda 语法,你可以参考这种语法简化代码的组织方式。

另一方面,在案例中,我们希望使用 MappingJackson2MessageConverter 完成对消息的转换。因此,我们可以在 account-service 中添加一个 ActiveMQMessagingConfig 初始化具体的 MappingJackson2MessageConverter 对象,如下代码所示:

@Configuration
public class ActiveMQMessagingConfig {@Beanpublic MappingJackson2MessageConverter activeMQMessageConverter() {MappingJackson2MessageConverter messageConverter = new MappingJackson2MessageConverter();messageConverter.setTypeIdPropertyName("_typeId");Map<String, Class<?>> typeIdMappings = new HashMap<String, Class<?>>();typeIdMappings.put("accountChangedEvent", AccountChangedEvent.class);messageConverter.setTypeIdMappings(typeIdMappings);return messageConverter;}
}

上述代码的核心作用是定义了 typeId 到 Class 的 Map,这样做的目的在于为消息的转换提供了灵活性。比如我们可以在 account-service 中发送了一个 Id 为“accountChangedEvent”且类型为 AccountChangedEvent 的业务对象,而在消费该消息的场景中,我们只需要指定同一个 Id,对应的消息就可以自动转化为另一种业务对象(不一定是这里使用到的 AccountChangedEvent)。

实现 customer-service 消息消费者

我们先回到 customer-service 服务,看看如何消费来自 account-service 的消息,如下代码所示:

@Component("activeMQAccountChangedReceiver")
public class ActiveMQAccountChangedReceiver {@Autowiredprivate JmsTemplate jmsTemplate;@Overrideprotected AccountChangedEvent receiveEvent() {return (AccountChangedEvent) jmsTemplate.receiveAndConvert(AccountChannels.SPRINGCSS_ACCOUNT_QUEUE);}
}

这里,我们只是简单通过 JmsTemplate 的 receiveAndConvert 方法拉取来自 ActiveMQ 的消息。

请注意,因为 receiveAndConvert 方法的执行过程是阻塞性的拉取行为,所以我们可以实现一个新的 Controller 专门测试该方法的有效性,如下代码所示:

@RestController
@RequestMapping(value="messagereceive")
public class MessageReceiveController {@Autowiredprivate ActiveMQAccountChangedReceiver accountChangedReceiver; @RequestMapping(value = "", method = RequestMethod.GET)public void receiveAccountChangedEvent() {accountChangedReceiver.receiveAccountChangedEvent();}
}

一旦我们访问了这个端点,系统就会拉取 ActiveMQ 中目前尚未消费的消息。如果 ActiveMQ 没有待消费的消息,这个方法就会阻塞,且一直处于等待状态,直到新消息的到来。

如果你想使用消息推送的方式来消费消息,实现过程更加简单,如下代码所示:

@Override
@JmsListener(destination = AccountChannels.SPRINGCSS_ACCOUNT_QUEUE)
public void handlerAccountChangedEvent(AccountChangedEvent event) { AccountMessage account = event.getAccountMessage();String operation = event.getOperation(); System.out.print(accountMessage.getId() + ":" + accountMessage.getAccountCode() + ":" + accountMessage.getAccountName());
}

从以上代码中可以看到,我们可以直接通过 @JmsListener 注解消费从 ActiveMQ 推送过来的消息。这里我们只是把消息打印了出来,你可以根据实际需要对消息进行任何形式的处理。

小结与预告

本节课我们继续介绍基于 JMS 规范的 ActiveMQ 消息中间件,并使用 Spring Boot 提供的 JmsTemplate 完成了消息的发送和消费。同样,我们也将 ActiveMQ 的使用过程集成到 SpringCSS 案例中,并基于 Spring Boot 的配置体系对配置信息的管理过程做了优化。16 讲我们将继续介绍本课程中最后一款要引入的消息中间件 RabbitMQ 及 Spring Boot 中提供的模板工具类 RabbitTemplate。

16 消息驱动:如何使用 RabbitTemplate 集成 RabbitMQ?

15 讲我们介绍了基于 ActiveMQ 和 JmsTemplate 实现消息发送和消费,并重构了 SpringCSS 案例系统中的 account-service 和 customer-service 服务。

今天,我们将介绍另一款主流的消息中间件 RabbitMQ,并基于 RabbitTemplate 模板工具类为 SpringCSS 案例添加对应的消息通信机制。

AMQP 规范与 RabbitMQ

AMQP(Advanced Message Queuing Protocol)是一个提供统一消息服务的应用层标准高级消息队列规范。和 JMS 规范一样,AMQP 描述了一套模块化的组件及组件之间进行连接的标准规则,用于明确客户端与服务器交互的语义。而业界也存在一批实现 AMQP 规范的框架,其中极具代表性的是 RabbitMQ。

AMQP 规范

在 AMQP 规范中存在三个核心组件,分别是交换器(Exchange)、消息队列(Queue)和绑定(Binding)。其中交换器用于接收应用程序发送的消息,并根据一定的规则将这些消息路由发送到消息队列中;消息队列用于存储消息,直到这些消息被消费者安全处理完毕;而绑定定义了交换器和消息队列之间的关联,为它们提供了路由规则。

在 AMQP 规范中并没有明确指明类似 JMS 中一对一的点对点模型和一对多的发布-订阅模型,不过通过控制 Exchange 与 Queue 之间的路由规则,我们可以很容易地模拟 Topic 这种典型消息中间件的概念。

如果存在多个 Queue,Exchange 如何知道把消息发送到哪个 Queue 中呢?

通过 Binding 规则设置路由信息即可。在与多个 Queue 关联之后,Exchange 中会存在一个路由表,这个表中维护着每个 Queue 存储消息的限制条件。

消息中包含一个路由键(Routing Key),它由消息发送者产生,并提供给 Exchange 路由这条消息的标准。而 Exchange 会检查 Routing Key,并结合路由算法决定将消息路由发送到哪个 Queue 中。

通过下面 Exchange 与 Queue 之间的路由关系图,我们可以看到一条来自生产者的消息通过 Exchange 中的路由算法可以发送给一个或多个 Queue,从而实现点对点和发布订阅功能。

在这里插入图片描述

AMQP 路由关系图

上图中,不同的路由算法存在不同的 Exchange 类型,而 AMQP 规范中指定了直接式交换器(Direct Exchange)、广播式交换器(Fanout Exchange)、主题式交换器(Topic Exchange)和消息头式交换器(Header Exchange)这几种 Exchange 类型,不过这一讲我们将重点介绍直接式交换器。

通过精确匹配消息的 Routing Key,直接式交换器可以将消息路由发送到零个或多个队列中,如下图所示:

在这里插入图片描述

Direct Exchange 示意图

RabbitMQ 基本架构

RabbitMQ 使用 Erlang 语言开发的 AMQP 规范标准实现框架,而 ConnectionFactory、Connection、Channel 是 RabbitMQ 对外提供的 API 中最基本的对象,都需要遵循 AMQP 规范的建议。其中,Channel 是应用程序与 RabbitMQ 交互过程中最重要的一个接口,因为我们大部分的业务操作需要通过 Channel 接口完成,如定义 Queue、定义 Exchange、绑定 Queue 与 Exchange、发布消息等。

如果想启动 RabbitMQ,我们只需要运行 rabbitmq-server.sh 文件即可。不过,因为 RabbitMQ 依赖于 Erlang,所以首先我们需要确保安装上 Erlang 环境。

接下来,我们一起看下如何使用 Spring 框架所提供的 RabbitTemplate 模板工具类集成 RabbitMQ。

使用 RabbitTemplate 集成 RabbitMQ

如果想使用 RabbitTemplate 集成 RabbitMQ,首先我们需要在 Spring Boot 应用程序中添加对 spring-boot-starter-amqp 的依赖,如下代码所示:

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
使用 RabbitTemplate 发送消息

和其他模板类一样,RabbitTemplate 也提供了一批 send 方法用来发送消息,如下代码所示:

@Override
public void send(Message message) throws AmqpException {send(this.exchange, this.routingKey, message);
}@Override
public void send(String routingKey, Message message) throws AmqpException {send(this.exchange, routingKey, message);
}@Override
public void send(final String exchange, final String routingKey, final Message message) throws AmqpException {send(exchange, routingKey, message, null);
}

在这里可以看到,我们指定了消息发送的 Exchange 及用于消息路由的路由键 RoutingKey。因为这些 send 方法发送的是原生消息对象,所以在与业务代码进行集成时,我们需要将业务对象转换为 Message 对象,示例代码如下所示:

public void sendDemoObject(DemoObject demoObject) { MessageConverter converter = rabbitTemplate.getMessageConverter(); MessageProperties props = new MessageProperties(); Message message = converter.toMessage(demoObject, props); rabbitTemplate.send("demo.queue", message); 
}

如果我们不想在业务代码中嵌入 Message 等原生消息对象,还可以使用 RabbitTemplate 的 convertAndSend 方法组进行实现,如下代码所示:

@Override
public void convertAndSend(Object object) throws AmqpException {convertAndSend(this.exchange, this.routingKey, object, (CorrelationData) null);
}@Override
public void correlationConvertAndSend(Object object, CorrelationData correlationData) throws AmqpException {convertAndSend(this.exchange, this.routingKey, object, correlationData);
}@Override
public void convertAndSend(String routingKey, final Object object) throws AmqpException {convertAndSend(this.exchange, routingKey, object, (CorrelationData) null);
}@Override
public void convertAndSend(String routingKey, final Object object, CorrelationData correlationData)throws AmqpException {convertAndSend(this.exchange, routingKey, object, correlationData);
}@Override
public void convertAndSend(String exchange, String routingKey, final Object object) throws AmqpException {convertAndSend(exchange, routingKey, object, (CorrelationData) null);
}

上述 convertAndSend 方法组在内部就完成了业务对象向原生消息对象的自动转换过程,因此,我们可以使用如下所示的代码来简化消息发送过程。

public void sendDemoObject(DemoObject demoObject) { rabbitTemplate.convertAndSend("demo.queue", demoObject); 
}

当然,有时候我们需要在消息发送的过程中为消息添加一些属性,这就不可避免需要操作原生 Message 对象,而 RabbitTemplate 也提供了一组 convertAndSend 重载方法应对这种场景,如下代码所示:

@Override
public void convertAndSend(String exchange, String routingKey, final Object message,final MessagePostProcessor messagePostProcessor, CorrelationData correlationData) throws AmqpException {Message messageToSend = convertMessageIfNecessary(message);messageToSend = messagePostProcessor.postProcessMessage(messageToSend, correlationData);send(exchange, routingKey, messageToSend, correlationData);
}

注意这里,我们使用了一个 MessagePostProcessor 类对所生成的消息进行后处理,MessagePostProcessor 的使用方式如下代码所示:

rabbitTemplate.convertAndSend(“demo.queue”, event, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {//针对 Message 的处理return message;}
});

使用 RabbitTemplate 的最后一步是在配置文件中添加配置项,在配置时我们需要指定 RabbitMQ 服务器的地址、端口、用户名和密码等信息,如下代码所示:

spring:rabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guestvirtual-host: DemoHost

注意,出于对多租户和安全因素的考虑,AMQP 还提出了虚拟主机(Virtual Host)概念,因此这里出现了一个 virtual-host 配置项。

Virtual Host 类似于权限控制组,内部可以包含若干个 Exchange 和 Queue。多个不同用户使用同一个 RabbitMQ 服务器提供的服务时,我们可以将其划分为多个 Virtual Host,并在自己的 Virtual Host 中创建相应组件,如下图所示:

图片9.png

添加了 Virtual Host 的 AMQP 模型

使用 RabbitTemplate 消费消息

和 JmsTemplate 一样,使用 RabbitTemplate 消费消息时,我们也可以使用推模式和拉模式。

在拉模式下,使用 RabbitTemplate 的典型示例如下代码所示:

public DemoEvent receiveEvent() {return (DemoEvent) rabbitTemplate.receiveAndConvert(“demo.queue”);
}

这里,我们使用了 RabbitTemplate 中的 receiveAndConvert 方法,该方法可以从一个指定的 Queue 中拉取消息,如下代码所示:

@Override
public Object receiveAndConvert(String queueName) throws AmqpException {return receiveAndConvert(queueName, this.receiveTimeout);
}

这里请注意,内部的 receiveAndConvert 方法中出现了第二个参数 receiveTimeout,这个参数的默认值是 0,意味着即使调用 receiveAndConvert 时队列中没有消息,该方法也会立即返回一个空对象,而不会等待下一个消息的到来,这点与 15 讲介绍的 JmsTemplate 存在本质性的区别。

如果我们想实现与 JmsTemplate 一样的阻塞等待,设置好 receiveTimeout 参数即可,如下代码所示:

public DemoEvent receiveEvent() { return (DemoEvent)rabbitTemplate.receiveAndConvert("demo.queue", 2000ms); 
}

如果不想每次方法调用都指定 receiveTimeout,我们可以在配置文件中通过添加配置项的方式设置 RabbitTemplate 级别的时间,如下代码所示:

spring:rabbitmq:template:receive-timeout: 2000

当然,RabbitTemplate 也提供了一组支持接收原生消息的 receive 方法,但我们还是建议使用 receiveAndConvert 方法实现拉模式下的消息消费。

介绍完拉模式,接下来我们介绍推模式,它的实现方法也很简单,如下代码所示:

@RabbitListener(queues = “demo.queue”)
public void handlerEvent(DemoEvent event) {//TODO:添加消息处理逻辑
}

开发人员在 @RabbitListener 中指定目标队列即可自动接收来自该队列的消息,这种实现方式与 15 讲中介绍的 @JmsListener 完全一致。

在 SpringCSS 案例中集成 RabbitMQ

因为这三种模板工具类的使用方式非常类似,都可以用来提取公共代码形成统一的接口和抽象类,所以作为介绍消息中间件的最后一讲,我们想对 SpringCSS 案例中的三种模板工具类的集成方式进行抽象。

实现 account-service 消息生产者

在消息生产者的 account-service 中,我们提取了如下所示的 AccountChangedPublisher 作为消息发布的统一接口。

public interface AccountChangedPublisher {void publishAccountChangedEvent(Account account, String operation);
}

请注意,这是一个面向业务的接口,没有使用用于消息通信的 AccountChangedEvent 对象。

而我们将在 AccountChangedPublisher 接口的实现类 AbstractAccountChangedPublisher 中完成对 AccountChangedEvent 对象的构建,如下代码所示:

public abstract class AbstractAccountChangedPublisher implements AccountChangedPublisher {@Overridepublic void publishAccountChangedEvent(Account account, String operation) {AccountMessage accountMessage = new AccountMessage(account.getId(), account.getAccountCode(), account.getAccountName());AccountChangedEvent event = new AccountChangedEvent(AccountChangedEvent.class.getTypeName(),operation.toString(), accountMessage);publishEvent(event);}protected abstract void publishEvent(AccountChangedEvent event);
}

AbstractAccountChangedPublisher 是一个抽象类,我们基于传入的业务对象构建了一个消息对象 AccountChangedEvent,并通过 publishEvent 抽象方法发送消息。

针对不同的消息中间件,我们需要分别实现对应的 publishEvent 方法。以 Kafka 为例,我们重构了原有代码并提供了如下所示的 KafkaAccountChangedPublisher 实现类。

@Component("kafkaAccountChangedPublisher")
public class KafkaAccountChangedPublisher extends AbstractAccountChangedPublisher {@Autowiredprivate KafkaTemplate<String, AccountChangedEvent> kafkaTemplate;@Overrideprotected void publishEvent(AccountChangedEvent event) {kafkaTemplate.send(AccountChannels.SPRINGCSS_ACCOUNT_TOPIC, event);}
}

对 RabbitMQ 而言,RabbitMQAccountChangedPublisher 的实现方式也是类似,如下代码所示:

@Component("rabbitMQAccountChangedPublisher")
public class RabbitMQAccountChangedPublisher extends AbstractAccountChangedPublisher {@Autowiredprivate RabbitTemplate rabbitTemplate;@Overrideprotected void publishEvent(AccountChangedEvent event) {rabbitTemplate.convertAndSend(AccountChannels.SPRINGCSS_ACCOUNT_QUEUE, event, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {MessageProperties props = message.getMessageProperties();props.setHeader("EVENT_SYSTEM", "SpringCSS");return message;}});}
}

对于 RabbitMQ 而言,在使用 RabbitMQAccountChangedPublisher 发送消息之前,我们需要先初始化 Exchange、Queue,以及两者之间的 Binding 关系,因此我们实现了如下所示的 RabbitMQMessagingConfig 配置类。

@Configuration
public class RabbitMQMessagingConfig {public static final String SPRINGCSS_ACCOUNT_DIRECT_EXCHANGE = "springcss.account.exchange";public static final String SPRINGCSS_ACCOUNT_ROUTING = "springcss.account.routing";@Beanpublic Queue SpringCssDirectQueue() {return new Queue(AccountChannels.SPRINGCSS_ACCOUNT_QUEUE, true);}@Beanpublic DirectExchange SpringCssDirectExchange() {return new DirectExchange(SPRINGCSS_ACCOUNT_DIRECT_EXCHANGE, true, false);}@Beanpublic Binding bindingDirect() {return BindingBuilder.bind(SpringCssDirectQueue()).to(SpringCssDirectExchange()).with(SPRINGCSS_ACCOUNT_ROUTING);}@Beanpublic Jackson2JsonMessageConverter rabbitMQMessageConverter() {return new Jackson2JsonMessageConverter();}
}

上述代码中初始化了一个 DirectExchange、一个 Queue ,并设置了两者之间的绑定关系,同时我们还初始化了一个 Jackson2JsonMessageConverter 用于在消息发送过程中将消息转化为序列化对象,以便在网络上进行传输。

实现 customer-service 消息消费者

现在,回到 customer-service 服务,我们先看看提取用于接收消息的统一化接口 AccountChangedReceiver,如下代码所示:

public interface AccountChangedReceiver {//Pull 模式下的消息接收方法void receiveAccountChangedEvent();//Push 模式下的消息接收方法void handlerAccountChangedEvent(AccountChangedEvent event);
}

AccountChangedReceiver 分别定义了拉取模式和推送模式下的消息接收方法,同样我们也提取了一个抽象实现类 AbstractAccountChangedReceiver,如下代码所示:

public abstract class AbstractAccountChangedReceiver implements AccountChangedReceiver {@AutowiredLocalAccountRepository localAccountRepository;@Overridepublic void receiveAccountChangedEvent() {AccountChangedEvent event = receiveEvent();handleEvent(event);}protected void handleEvent(AccountChangedEvent event) {AccountMessage account = event.getAccountMessage();String operation = event.getOperation();operateAccount(account, operation);}private void operateAccount(AccountMessage accountMessage, String operation) {System.out.print(accountMessage.getId() + ":" + accountMessage.getAccountCode() + ":" + accountMessage.getAccountName());LocalAccount localAccount = new LocalAccount(accountMessage.getId(), accountMessage.getAccountCode(),accountMessage.getAccountName());if (operation.equals("ADD") || operation.equals("UPDATE")) {localAccountRepository.save(localAccount);} else {localAccountRepository.delete(localAccount);}}protected abstract AccountChangedEvent receiveEvent();
}

这里实现了 AccountChangedReceiver 接口的 receiveAccountChangedEvent 方法,并定义了一个 receiveEvent 抽象方法接收来自不同消息中间件的 AccountChangedEvent 消息。一旦 receiveAccountChangedEvent 方法获取了消息,我们将根据其中的 Account 对象及对应的操作更新本地数据库。

接下来我们看看 AbstractAccountChangedReceiver 中的一个实现类 RabbitMQAccountChangedReceiver,如下代码所示:

@Component("rabbitMQAccountChangedReceiver")
public class RabbitMQAccountChangedReceiver extends AbstractAccountChangedReceiver {@Autowiredprivate RabbitTemplate rabbitTemplate;@Overridepublic AccountChangedEvent receiveEvent() {return (AccountChangedEvent) rabbitTemplate.receiveAndConvert(AccountChannels.SPRINGCSS_ACCOUNT_QUEUE);}@Override@RabbitListener(queues = AccountChannels.SPRINGCSS_ACCOUNT_QUEUE)public void handlerAccountChangedEvent(AccountChangedEvent event) {super.handleEvent(event);}
}

上述 RabbitMQAccountChangedReceiver 同时实现了 AbstractAccountChangedReceiver 的 receiveEvent 抽象方法及 AccountChangedReceiver 接口中的 handlerAccountChangedEvent 方法。其中 receiveEvent 方法用于主动拉取消息,而 handlerAccountChangedEvent 方法用于接受推动过来的消息,在该方法上我们添加了 @RabbitListener 注解。

接着我们来看下同样继承了 AbstractAccountChangedReceiver 抽象类的 KafkaAccountChangedListener 类,如下代码所示:

@Component
public class KafkaAccountChangedListener extends AbstractAccountChangedReceiver {@Override@KafkaListener(topics = AccountChannels.SPRINGCSS_ACCOUNT_TOPIC)public void handlerAccountChangedEvent(AccountChangedEvent event) {super.handleEvent(event);}@Overrideprotected AccountChangedEvent receiveEvent() {return null;}
}

我们知道 Kafka 只能通过推送方式获取消息,所以它只实现了 handlerAccountChangedEvent 方法,而 receiveEvent 方法为空。

小结与预告

这一讲,我们学习了最后一款消息中间件 RabbitMQ,并使用 Spring Boot 提供的 RabbitTemplate 完成了消息的发送和消费。同时,基于三种消息中间件的对接方式,我们提取了它们之间的共同点,并抽取了对应的接口和抽象类,重构了 SpringCSS 系统的实现过程。

Spring 为我们提供了一整套完整的安全解决方案,17 讲我们将对这套解决方案展开讨论。

http://www.dtcms.com/a/482066.html

相关文章:

  • 山东济南seo整站优化公司对其网站建设进行了考察调研
  • MIPI_CSI22_Xilinx IP
  • 【C++STL :stack queue (一) 】STL:stack与queue全解析|深入使用(附高频算法题详解)
  • DevOps工具链对比,云效 vs TikLab哪一款更好用?
  • Kanass,一款超级轻量且简洁的项目管理工具
  • 如何做企业的网站微信如何开通小程序
  • 【从0开始学习Java | 第20篇】网络编程
  • PetaLinux 工程迁移指南
  • Java面试实战:互联网医疗场景中的JVM调优与Spring Boot应用
  • http环境实现通知
  • 分布式雷达 vs 多基地雷达:同频共振的“合唱团”和“乐队”
  • 手机端-adb脚本自动化-真机版
  • Python爬虫常见陷阱:Ajax动态生成内容的URL去重与数据拼接
  • 简繁英3合1企业网站生成管理系统V1.6wordpress如何降级
  • 【学以致用|python自动化办公】OCR批量识别自动存为Excel(批量识别发票)
  • AJAX 实时搜索
  • 详细介绍C++中通过OLE操作excel时,一般会出现哪些异常,这些异常的原因是什么,如何来解决这些异常
  • ES6知识点详解和应用场景
  • 网站平台建设可行性c 网站开发项目教程
  • Webpack 核心知识点详解:proxy、热更新、Loader与Plugin全解析
  • 本地搭建 Jekyll 环境
  • 前端基础之《React(1)—webpack简介》
  • 攻击者利用Discord Webhook通过npm、PyPI和Ruby软件包构建隐蔽C2通道
  • [Spark] Metrics收集流程
  • pyspark并行性能提升经验
  • HTML盒子模型详解
  • 个人电脑做网站违法吗东莞市住建局官网
  • 下载selenium-ide及使用
  • [Spark] 事件总线机制
  • 长春建站公众号wordpress4.7中文主题