如何使用Spring Context实现消息队列
说明:MQ 组件有很多,有 Kafka、Rabbit MQ、Rabbit MQ,如果不想引入这些组件,还能依靠 Redis 作为 MQ 组件,另外,如果不像引入 MQ 组件,仅想实现某些方法的异步操作,还能使用 Spring Boot 中自带的 @Async 注解。
相关博客
-
Kafka简单使用
-
RabbitMQ安装及简单使用
-
RocketMQ安装和简单使用
-
如何用Redis作为消息队列
本文介绍如何使用 Spring Context 实现消息队列
实现
定义一个消息对象,如下:
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;/*** 消息对象*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class MyMessage {private String messageId;private String message;
}
消息生产者,使用 ApplicationContext 推送消息
import com.hezy.service.mq.message.MyMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;import javax.annotation.Resource;/*** 消息生产者*/
@Slf4j
@Component
public class MyProducer {@Resourceprivate ApplicationContext applicationContext;public void sendMyMessage(String messageId, String message) {MyMessage myMessage = new MyMessage(messageId, message);applicationContext.publishEvent(myMessage);}
}
消息消费者,接收消息,并转发到指定实现类执行
import com.hezy.service.DemoService;
import com.hezy.service.mq.message.MyMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;import javax.annotation.Resource;/*** 消息消费着*/
@Slf4j
@Component
public class MyConsumer {@Resourceprivate DemoService demoService;@EventListener@Async // Spring Event 默认在 Producer 发送的线程,通过 @Async 实现异步public void onMessage(MyMessage message) {log.info("[onMessage][消息内容({})]", message);demoService.doMyMessage(message);}
}
消费方法
import com.hezy.service.mq.message.MyMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;@Service
@Slf4j
public class DemoServiceImpl implements DemoService {/*** 消息消费方法** @param message 消息*/@Overridepublic void doMyMessage(MyMessage message) {log.info("收到消息:{}, 去做一些事情", message);}
}
写一个接口,推送消息
import com.hezy.service.mq.message.MyMessage;
import com.hezy.service.mq.producer.MyProducer;
import org.springframework.web.bind.annotation.*;import javax.annotation.Resource;@RestController
@RequestMapping("/demo")
public class DemoController {@Resourceprivate MyProducer myProducer;@PostMapping("/put")private void test(@RequestBody MyMessage myMessage) {myProducer.sendMyMessage(myMessage.getMessageId(), myMessage.getMessage());}
}
以上代码,pom.xml 如下,功能实现仅使用了 Spring Boot 框架
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.12</version><relativePath/></parent><groupId>com.hezy</groupId><artifactId>spring_context_demo</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>17</maven.compiler.source><maven.compiler.target>17</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency></dependencies>
</project>
启动,调用该接口

控制台可见消息被消费

另外
推送消息是异步操作,如下,在推送消息后线程休眠10秒,消息也会在接口返回结果之前被消费。
@PostMapping("/put")private void test(@RequestBody MyMessage myMessage) throws InterruptedException {myProducer.sendMyMessage(myMessage.getMessageId(), myMessage.getMessage());Thread.sleep(10000L);System.out.println("返回结果");}

消息推送给消费者,是通过消息对象来识别的,只要消息消费者的参数是消息对象,就能接收到该消息。
如果定义了多个消费者,参数是同一个消息对象,这些消费者都能接收到该消息,是广播的方式,如下:

总结
本文介绍了如何使用Spring Context实现异步消息
