Spring Boot整合RabbitMQ
RabbitMQ在SpringBoot中的使用
Spring官网中如何在Spring中配置RabbitMQ:AMQP :: Spring Boot

在创建项目的时候导入其依赖



添加配置项

工作队列中消息的发送
创建一个队列
/**
 * Spring configuration that declares the queue used by the work-queue example.
 */
@Configuration
public class RabbitMQConfig {

    /**
     * Declares a durable queue named {@code Constants.WORK_QUEUE123} and
     * registers it as the bean {@code workQueue}.
     *
     * @return the queue definition
     */
    @Bean("workQueue")
    public Queue workQueue() {
        return QueueBuilder.durable(Constants.WORK_QUEUE123).build();
    }
}
将队列名称定义为一个常量
/** Name of the queue used by the work-queue example. */
public static final String WORK_QUEUE123="work.queue";
进行消息的发送
/**
 * REST endpoints that publish test messages to RabbitMQ.
 */
@RestController
@RequestMapping("/rabbit")
public class ProducerController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Publishes ten messages to the work queue.
     *
     * @return a fixed confirmation string
     */
    @RequestMapping("/work")
    public String work() {
        for (int i = 0; i < 10; i++) {
            System.out.println("消息发送成功");
            // Empty exchange name: the queue name is passed as the routing key.
            rabbitTemplate.convertAndSend("", Constants.WORK_QUEUE123, "hello rabbitMQ Spring Boot");
        }
        return "消息发送成功";
    }
}
消息的接收

/**
 * Two consumers attached to the same work queue.
 */
@Component
public class WorkListener {

    /**
     * Prints the raw message and the channel it arrived on.
     *
     * @param message the received AMQP message
     * @param channel the channel the message was delivered on
     */
    @RabbitListener(queues = Constants.WORK_QUEUE123)
    public void queueListener1(Message message, Channel channel) {
        String prefix = "监听的队列" + Constants.WORK_QUEUE123;
        String line = prefix + "收到的消息" + message + "channel" + channel;
        System.out.println(line);
    }

    /**
     * Prints the message payload received as a string.
     *
     * @param message the converted message payload
     */
    @RabbitListener(queues = Constants.WORK_QUEUE123)
    public void queueListener2(String message) {
        String prefix = "监听的队列" + Constants.WORK_QUEUE123;
        String line = prefix + "收到的消息" + message;
        System.out.println(line);
    }
}

发布订阅模式(Publish/Subscribe)
在发布订阅模式中多了一个新的角色 Exchange(交换机)
创建两个队列,一个交换机,在创建队列的时候,将队列定义为持久化
/** Declares the first durable queue of the fanout example. */
@Bean("fanoutQueue1")
public Queue fanoutQueue1() {
    final String queueName = Constants.FANOUT_QUEUE1;
    return QueueBuilder.durable(queueName).build();
}

/** Declares the second durable queue of the fanout example. */
@Bean("fanoutQueue2")
public Queue fanoutQueue2() {
    final String queueName = Constants.FANOUT_QUEUE2;
    return QueueBuilder.durable(queueName).build();
}

/** Declares the durable fanout exchange. */
@Bean("fanoutExchange")
public FanoutExchange fanoutExchange() {
    return ExchangeBuilder
            .fanoutExchange(Constants.FANOUT_EXCHANGE)
            .durable(true)
            .build();
}
将队列和交换机进行绑定
/**
 * Binds {@code fanoutQueue1} to the fanout exchange.
 *
 * @param exchange the fanout exchange bean
 * @param queue    the first fanout queue bean
 * @return the binding between the two
 */
@Bean("fanoutBinding1")
public Binding fanoutBinding1(@Qualifier("fanoutExchange") FanoutExchange exchange,
                              @Qualifier("fanoutQueue1") Queue queue) {
    Binding binding = BindingBuilder.bind(queue).to(exchange);
    return binding;
}

/**
 * Binds {@code fanoutQueue2} to the fanout exchange.
 *
 * @param exchange the fanout exchange bean
 * @param queue    the second fanout queue bean
 * @return the binding between the two
 */
@Bean("fanoutBinding2")
public Binding fanoutBinding2(@Qualifier("fanoutExchange") FanoutExchange exchange,
                              @Qualifier("fanoutQueue2") Queue queue) {
    Binding binding = BindingBuilder.bind(queue).to(exchange);
    return binding;
}
将其名字定义为常量
/** Name of the first queue in the fanout example. */
public static final String FANOUT_QUEUE1 = "fanout.queue1";

/** Name of the second queue in the fanout example. */
public static final String FANOUT_QUEUE2 = "fanout.queue2";

/** Name of the fanout exchange. */
public static final String FANOUT_EXCHANGE = "fanout.exchange";
发送消息
/**
 * Publishes ten messages to the fanout exchange with an empty routing key.
 *
 * @return a fixed confirmation string
 */
@RequestMapping("/fanout")
public String fanout() {
    final String payload = "hello spring amqp:fanout...";
    int sent = 0;
    while (sent < 10) {
        System.out.println("消息发送成功");
        rabbitTemplate.convertAndSend(Constants.FANOUT_EXCHANGE, "", payload);
        sent++;
    }
    return "发送成功";
}
接收消息
/**
 * Consumers for the two queues of the fanout example.
 */
@Component
public class FanoutListener {

    /** Prints every payload received from the first fanout queue. */
    @RabbitListener(queues = Constants.FANOUT_QUEUE1)
    public void fanoutQueueListener1(String message) {
        report(Constants.FANOUT_QUEUE1, message);
    }

    /** Prints every payload received from the second fanout queue. */
    @RabbitListener(queues = Constants.FANOUT_QUEUE2)
    public void fanoutQueueListener2(String message) {
        report(Constants.FANOUT_QUEUE2, message);
    }

    /** Writes one line naming the queue and the payload it delivered. */
    private static void report(String queueName, String payload) {
        System.out.println("监听的队列" + queueName + "收到的消息" + payload);
    }
}

路由模式(Routing)
定义队列和交换机
/** Declares the first durable queue of the routing example. */
@Bean("directQueue1")
public Queue directQueue1() {
    final String queueName = Constants.DIRECT_QUEUE1;
    return QueueBuilder.durable(queueName).build();
}

/** Declares the second durable queue of the routing example. */
@Bean("directQueue2")
public Queue directQueue2() {
    final String queueName = Constants.DIRECT_QUEUE2;
    return QueueBuilder.durable(queueName).build();
}

/** Declares the durable direct exchange. */
@Bean("directExchange")
public DirectExchange directExchange() {
    return ExchangeBuilder
            .directExchange(Constants.DIRECT_EXCHANGE)
            .durable(true)
            .build();
}
定义队列和交换机的名字为常量
/** Name of the first queue in the routing example. */
public static final String DIRECT_QUEUE1 = "direct.queue1";

/** Name of the second queue in the routing example. */
public static final String DIRECT_QUEUE2 = "direct.queue2";

/** Name of the direct exchange. */
public static final String DIRECT_EXCHANGE = "direct.exchange";
将队列和交换机进行绑定,并且定义routingKey
/**
 * Binds {@code directQueue1} to the direct exchange with routing key "orange".
 *
 * @param directExchange the direct exchange bean
 * @param queue          the first direct queue bean
 * @return the binding
 */
@Bean("directBlinding1")
public Binding directBlinding1(@Qualifier("directExchange") DirectExchange directExchange,
                               @Qualifier("directQueue1") Queue queue) {
    Binding binding = BindingBuilder.bind(queue)
            .to(directExchange)
            .with("orange");
    return binding;
}

/**
 * Binds {@code directQueue2} to the direct exchange with routing key "black".
 *
 * @param directExchange the direct exchange bean
 * @param queue          the second direct queue bean
 * @return the binding
 */
@Bean("directBlinding2")
public Binding directBlinding2(@Qualifier("directExchange") DirectExchange directExchange,
                               @Qualifier("directQueue2") Queue queue) {
    Binding binding = BindingBuilder.bind(queue)
            .to(directExchange)
            .with("black");
    return binding;
}

/**
 * Binds {@code directQueue2} to the direct exchange a second time, with
 * routing key "orange".
 *
 * @param directExchange the direct exchange bean
 * @param queue          the second direct queue bean
 * @return the binding
 */
@Bean("directBlinding3")
public Binding directBlinding3(@Qualifier("directExchange") DirectExchange directExchange,
                               @Qualifier("directQueue2") Queue queue) {
    Binding binding = BindingBuilder.bind(queue)
            .to(directExchange)
            .with("orange");
    return binding;
}
发送消息
/**
 * Publishes four messages to the direct exchange with routing key "orange".
 *
 * @return a fixed confirmation string
 */
@RequestMapping("/directByOrange")
public String directByOrange() {
    final String payload = "<orange>hello spring amqp:direct...";
    int sent = 0;
    while (sent < 4) {
        System.out.println("消息发送成功");
        rabbitTemplate.convertAndSend(Constants.DIRECT_EXCHANGE, "orange", payload);
        sent++;
    }
    return "发送成功";
}

/**
 * Publishes four messages to the direct exchange with routing key "black".
 *
 * @return a fixed confirmation string
 */
@RequestMapping("/directByBlack")
public String directByBlack() {
    final String payload = "<black>hello spring amqp:direct...";
    int sent = 0;
    while (sent < 4) {
        System.out.println("消息发送成功");
        rabbitTemplate.convertAndSend(Constants.DIRECT_EXCHANGE, "black", payload);
        sent++;
    }
    return "发送成功";
}
接收消息
/**
 * Consumers for the two queues of the routing example.
 */
@Component
public class DirectListener {

    /** Prints every payload received from the first direct queue. */
    @RabbitListener(queues = Constants.DIRECT_QUEUE1)
    public void directQueueListener1(String message) {
        report(Constants.DIRECT_QUEUE1, message);
    }

    /** Prints every payload received from the second direct queue. */
    @RabbitListener(queues = Constants.DIRECT_QUEUE2)
    public void directQueueListener2(String message) {
        report(Constants.DIRECT_QUEUE2, message);
    }

    /** Writes one line naming the queue and the payload it delivered. */
    private static void report(String queueName, String payload) {
        System.out.println("监听的队列" + queueName + "收到的消息" + payload);
    }
}
通过routingKey:orange发送消息

通过routingKey:black发送消息

通配符模式(Topics)
创建队列和交换机
/** Declares the first durable queue of the topics example. */
@Bean("topicQueue1")
public Queue topicQueue1() {
    final String queueName = Constants.TOPIC_QUEUE1;
    return QueueBuilder.durable(queueName).build();
}

/** Declares the second durable queue of the topics example. */
@Bean("topicQueue2")
public Queue topicQueue2() {
    final String queueName = Constants.TOPIC_QUEUE2;
    return QueueBuilder.durable(queueName).build();
}

/** Declares the durable topic exchange. */
@Bean("topicExchange")
public TopicExchange topicExchange() {
    return ExchangeBuilder
            .topicExchange(Constants.TOPIC_EXCHANGE)
            .durable(true)
            .build();
}
定义队列和交换机的名字为常量
/** Name of the first queue in the topics example. */
public static final String TOPIC_QUEUE1 = "topic.queue1";

/** Name of the second queue in the topics example. */
public static final String TOPIC_QUEUE2 = "topic.queue2";

/** Name of the topic exchange. */
public static final String TOPIC_EXCHANGE = "topic.exchange";
将队列和交换机进行绑定
定义routingKey的通配符规则(* 匹配恰好一个单词,# 匹配零个或多个单词)
/**
 * Binds {@code topicQueue1} to the topic exchange with pattern {@code *.*.mm}.
 *
 * @param topicExchange the topic exchange bean
 * @param queue         the first topic queue bean
 * @return the binding
 */
@Bean("topicBlinding1")
public Binding topicBlinding1(@Qualifier("topicExchange") TopicExchange topicExchange,
                              @Qualifier("topicQueue1") Queue queue) {
    Binding binding = BindingBuilder.bind(queue)
            .to(topicExchange)
            .with("*.*.mm");
    return binding;
}

/**
 * Binds {@code topicQueue2} to the topic exchange with pattern {@code *.nn.*}.
 *
 * @param topicExchange the topic exchange bean
 * @param queue         the second topic queue bean
 * @return the binding
 */
@Bean("topicBlinding2")
public Binding topicBlinding2(@Qualifier("topicExchange") TopicExchange topicExchange,
                              @Qualifier("topicQueue2") Queue queue) {
    Binding binding = BindingBuilder.bind(queue)
            .to(topicExchange)
            .with("*.nn.*");
    return binding;
}

/**
 * Binds {@code topicQueue2} to the topic exchange a second time, with
 * pattern {@code pp.#}.
 *
 * @param topicExchange the topic exchange bean
 * @param queue         the second topic queue bean
 * @return the binding
 */
@Bean("topicBlinding3")
public Binding topicBlinding3(@Qualifier("topicExchange") TopicExchange topicExchange,
                              @Qualifier("topicQueue2") Queue queue) {
    Binding binding = BindingBuilder.bind(queue)
            .to(topicExchange)
            .with("pp.#");
    return binding;
}
发送消息
进行路径的匹配
![]()
/**
 * Publishes one message to the topic exchange using the routing key taken
 * from the request path.
 *
 * @param routingKey routing key supplied as the last path segment
 * @return a fixed confirmation string
 */
@RequestMapping("/topic/{routingKey}")
public String topic(@PathVariable("routingKey") String routingKey) {
    System.out.println("消息发送成功");
    final String payload = "hello spring amqp:topic, my routing key is " + routingKey;
    rabbitTemplate.convertAndSend(Constants.TOPIC_EXCHANGE, routingKey, payload);
    return "发送成功";
}
接收消息
/**
 * Consumers for the two queues of the topics example.
 */
@Component
public class TopicListener {

    /** Prints every payload received from the first topic queue. */
    @RabbitListener(queues = Constants.TOPIC_QUEUE1)
    public void topicQueueListener1(String message) {
        report(Constants.TOPIC_QUEUE1, message);
    }

    /** Prints every payload received from the second topic queue. */
    @RabbitListener(queues = Constants.TOPIC_QUEUE2)
    public void topicQueueListener2(String message) {
        report(Constants.TOPIC_QUEUE2, message);
    }

    /** Writes one line naming the queue and the payload it delivered. */
    private static void report(String queueName, String payload) {
        System.out.println("监听的队列" + queueName + "收到的消息" + payload);
    }
}





由此可见,发送的消息已经全部被消费
基于SpringBoot+RabbitMQ完成通信

实现订单项目和物流项目之间的通信
在创建好项目之后,在项目中配置好RabbitMQ的信息


首先进行队列的声明
@Configuration
public class RabbitConfig {@Bean("workQueue")public Queue workQueue(){return QueueBuilder.durable("queue.build").build();}
在订单系统下单成功后,进行订单消息的发送
@RestController
@RequestMapping("/order")
public class OrderController {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("/createOrder")public String createOrder(){String orderId= UUID.randomUUID().toString();rabbitTemplate.convertAndSend("","queue.build","下单成功"+orderId);return "下单成功";}

![]()

在物流系统中进行消息的接收
/**
 * Consumer of the "queue.build" queue in the logistics system.
 *
 * <p>The listener is declared once at class level and each
 * {@code @RabbitHandler} method handles one payload type. Declaring two
 * independent method-level listeners on the same queue would make them
 * compete for messages, so a payload could be delivered to the method
 * expecting the other type.
 *
 * <p>NOTE(review): dispatch relies on the configured message converter
 * producing a {@code String} or {@code UserInfo} payload; confirm the
 * converter setup on both the producer and the consumer side.
 */
@Component
@RabbitListener(queues = "queue.build")
public class OrderListener {

    /**
     * Handles text payloads.
     *
     * @param message the received text
     */
    @RabbitHandler
    public void listenerQueue(String message) {
        System.out.println("接收到的消息" + message);
    }

    /**
     * Handles {@code UserInfo} payloads.
     *
     * @param userinfo the received object
     */
    @RabbitHandler
    public void ListenerQueue(UserInfo userinfo) {
        System.out.println("接收到的消息" + userinfo);
    }
}
![]()
消息已经被消费
![]()
发送消息格式为对象
创建一个对象
/**
 * Message payload exchanged between the order and logistics examples.
 */
@Data
public class UserInfo {

    /** Identifier carried by the message. */
    private String messageId;

    /** Name carried by the message. */
    private String messageName;
}
使用Json转换器,并将Json转换器绑定到自定义的RabbitTemplate,进行JSON 序列化,跨语言、可读、安全
// 定义一个Json消息转换器@Beanpublic Jackson2JsonMessageConverter jackson2JsonMessageConverter(){return new Jackson2JsonMessageConverter();}@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, Jackson2JsonMessageConverter jackson2JsonMessageConverter){RabbitTemplate rabbitTemplate=new RabbitTemplate(connectionFactory);rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());return rabbitTemplate;}
进行对象消息的发送
/**
 * Generates a random order id, wraps it in a {@code UserInfo} and publishes
 * the object to the "queue.build" queue.
 *
 * @return a fixed confirmation string
 */
@RequestMapping("/createOrder1")
public String createOrder1() {
    String orderId = UUID.randomUUID().toString();
    UserInfo userInfo = new UserInfo();
    userInfo.setMessageId(orderId);
    // Set the name field; a second setMessageId call would overwrite the order id.
    userInfo.setMessageName("发送消息成功");
    rabbitTemplate.convertAndSend("", "queue.build", userInfo);
    return "下单成功";
}
![]()
先在物流系统中配置JSON 消息转换器
@Configuration
public class RabbitConfig {@Beanpublic Jackson2JsonMessageConverter converter() {return new Jackson2JsonMessageConverter();}
进行消息的接收
/**
 * Consumes messages from the "queue.build" queue as {@code UserInfo}
 * objects and prints them.
 *
 * <p>Only {@code @RabbitListener} is used here: {@code @RabbitHandler}
 * marks handler methods of a class-level {@code @RabbitListener} and is not
 * needed on a method-level listener.
 *
 * @param userinfo the received object
 */
@RabbitListener(queues = "queue.build")
public void ListenerQueue(UserInfo userinfo) {
    System.out.println("接收到的消息" + userinfo);
}
![]()
![]()
