RabbitMQ 应用
RabbitMQ 应用
1. 工作模式介绍
1.1 Simple (简单模式)
(1) 概述
特点: 一个生产者, 一个消费者. 消息只能被消费一次. 也称为点对点 (Point-to-Point) 模式.
(2) 代码演示
Producer:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class ProducerDemo {public static void main(String[] args) throws IOException, TimeoutException {//1.建立连接//(1) IP//(2) 端口号//(3) 账号//(4) 密码//(5) 虚拟主机ConnectionFactory factory = new ConnectionFactory();factory.setHost("120.53.237.101");factory.setPort(5672);factory.setUsername("admin");factory.setPassword("admin");factory.setVirtualHost("vhost1");Connection connection = factory.newConnection();//2.开启信道Channel channel = connection.createChannel();//3.声明交换机 (使用内置交换机)//4.声明队列/*** queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,* Map<String,Object> arguments)* 参数说明:* queue:队列名称* durable:可持久化* exclusive:是否独占* autoDelete:是否自动删除* arguments:参数*/channel.queueDeclare("hello", true, false, false, null);//5. 发送消息/*** basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)* 参数说明:* exchange:交换机名称* routingKey:路由标签. (对于内置交换机,routingkey和队列名称保持一致)* props:属性配置* body:消息*/for (int i = 0; i < 10; i++) {String msg = "hello rabbitmq" + i;channel.basicPublish("", "hello", null, msg.getBytes());System.out.println("消息发送成功" + i);}//6. 资源释放channel.close();connection.close();}
}
Consumer:
import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class ConsumerDemo {public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//1. 创建连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("120.53.237.101");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");connectionFactory.setVirtualHost("vhost1");Connection connection = connectionFactory.newConnection();//2. 创建ChannelChannel channel = connection.createChannel();//3. 声明队列 (如果消费者要订阅的队列已经存在, 则可以省略)channel.queueDeclare("hello",true, false, false, null);//4. 消费消息/*** basicConsume(String queue, boolean autoAck, Consumer callback)* 参数说明:* queue: 队列名称* autoAck: 是否自动确认* callback: 接收到消息后, 执行的逻辑*/DefaultConsumer consumer = new DefaultConsumer(channel){//从队列中收到消息, 就会执行的方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息:"+ new String(body));}};channel.basicConsume("hello", true, consumer);//等待程序执行完成Thread.sleep(3000);//5. 释放资源channel.close();connection.close();}
}
1.2 Work Queue (工作队列模式)
(1) 概述
一个生产者 § , 多个消费者 (C1, C2 …).
特点: 消息不会重复, 多个消费者共同消费 Queue 中的消息.
适用场景: 集群环境中做异步处理.
(2) 代码演示
Producer:
package com.wang.rabbitmq.work;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.wang.rabbitmq.constant.Constants;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws IOException, TimeoutException {//1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USER_NAME);//账号connectionFactory.setPassword(Constants.PASSWORD);//密码connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机Connection connection = connectionFactory.newConnection();//2. 开启信道Channel channel = connection.createChannel();//3. 声明队列 (使用内置的交换机)//如果队列不存在, 则创建 ; 如果队列已经存在, 则不创建/*** 第一个参数: 队列名称,这里是 Constants.WORK_QUEUE,指定队列名。* 第二个参数: b,durable,是否持久化。true 表示服务器重启后队列还在。* 第三个参数: b1,exclusive,是否独占。false 表示非独占,多个消费者可访问。* 第四个参数: b2,autoDelete,自动删除。false 表示当最后一个消费者断开后不自动删。* 第五个参数: map,arguments,(额外参数),null 就是没有*/channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);//4. 发送消息for (int i = 0; i < 10; i++) {String msg = "hello work queue...." + i;/*** 第一个参数: exchange,这里是空字符串,说明用默认交换器. 。* 第二个参数: routingKey,这里是队列名,(因为默认交换器下,routingKey 就是队列名,确保消息路由到对应队列)* 第三个参数: basicProperties,消息的属性,比如持久化、优先级等,null 表示使用默认属性。* 第四个参数: 消息本体,转成字节数组发送.*/channel.basicPublish("",Constants.WORK_QUEUE, null, msg.getBytes());}System.out.println("消息发送成功~");//6. 资源释放channel.close();connection.close();}
}
Consumer1:
package com.wang.rabbitmq.work;import com.rabbitmq.client.*;
import com.wang.rabbitmq.constant.Constants;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {//1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前开放端口号connectionFactory.setUsername(Constants.USER_NAME);//账号connectionFactory.setPassword(Constants.PASSWORD); //密码connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机Connection connection = connectionFactory.newConnection();//2. 开启信道Channel channel = connection.createChannel();//3. 声明队列 (使用内置的交换机)//如果队列不存在, 则创建, 如果队列存在, 则不创建channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);//4. 消费消息DefaultConsumer consumer = new DefaultConsumer(channel){//从队列中收到消息, 就会执行的方法@Override/*** consumerTag:消费者标签,标识消费者。* envelope:包含消息元数据,比如消息 ID、投递标签等。* properties:消息的属性,如持久化等设置。* body:消息体,字节数组,需要转字符串。*/public void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息:"+ new String(body));}};/***1.Constants.WORK_QUEUE: 指定从哪个队列消费消息.2.b: true(autoAck):表示是否自动确认消息. true(自动确认)意味着当消息发送给消费者后,RabbitMQ 会立即从队列中删除该消息(无需消费者手动确认);若为 `false`,则需消费者手动发送确认后,消息才会从队列移除。3.consumer:消费者对象,包含处理消息的逻辑.*/channel.basicConsume(Constants.WORK_QUEUE, true, consumer);//6. 资源释放
// channel.close();
// connection.close();}
}
Consumer2:
package com.wang.rabbitmq.work;import com.rabbitmq.client.*;
import com.wang.rabbitmq.constant.Constants;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer2 {public static void main(String[] args) throws IOException, TimeoutException {//1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前开放端口号connectionFactory.setUsername(Constants.USER_NAME);//账号connectionFactory.setPassword(Constants.PASSWORD); //密码connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机Connection connection = connectionFactory.newConnection();//2. 开启信道Channel channel = connection.createChannel();//3. 声明队列 (使用内置的交换机)//如果队列不存在, 则创建, 如果队列存在, 则不创建channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);//4. 消费消息DefaultConsumer consumer = new DefaultConsumer(channel){//从队列中收到消息, 就会执行的方法@Override/*** consumerTag:消费者标签,标识消费者。* envelope:包含消息元数据,比如消息 ID、投递标签等。* properties:消息的属性,如持久化等设置。* body:消息体,字节数组,需要转字符串。*/public void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息:"+ new String(body));}};/***1.Constants.WORK_QUEUE: 指定从哪个队列消费消息.2.b: true(autoAck):表示是否自动确认消息. true(自动确认)意味着当消息发送给消费者后,RabbitMQ 会立即从队列中删除该消息(无需消费者手动确认);若为 `false`,则需消费者手动发送确认后,消息才会从队列移除。3.consumer:消费者对象,包含处理消息的逻辑.*/channel.basicConsume(Constants.WORK_QUEUE, true, consumer);//6. 资源释放
// channel.close();
// connection.close();}
}
1.3 Publish/Subscribe (发布/订阅模式)
(1) 概述
X: 交换机. 交换机将消息按一定规则路由到一个或多个队列上.
交换机有四种类型:
- Fanout: 广播类型. 将消息发给所有绑定到交换机的队列 --> (Publish/Subscribe 模式)
- Direct: 定向类型. 把消息交给符合指定 routing key 的队列 --> (Routing 模式)
- Topic: 通配符类型. 把消息交给符合 routing pattern 的队列 --> (Topics模式)
- headers: headers 类型的交换机不依赖于路由键来路由消息, 而是根据消息内容中的 headers 属性进行匹配. headers 类型的交换器性能会很差, 而且不实用, 基本很少使用.
[!TIP]
注:
路由键 (Routing key). 生产者和交换机之间的路由键称为 Routing key, 交换机和队列之间的路由键称为 Binding key (Binding key也是 Routing key 的一种, 有时候也直接写做 Routing key).
发布/订阅模式中的交换机是 Fanout (广播类型), X 收到消息后会将消息发给所有队列. C1 订阅 Q1 ; C2 订阅 Q2.
(2) 代码演示
Producer:
package com.wang.rabbitmq.fanout;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.wang.rabbitmq.constant.Constants;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws IOException, TimeoutException {//1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前开放端口号connectionFactory.setUsername(Constants.USER_NAME);//账号connectionFactory.setPassword(Constants.PASSWORD); //密码connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机Connection connection = connectionFactory.newConnection();//2. 开启信道Channel channel = connection.createChannel();//3. 声明交换机/*** 第一个参数: 交换器名称,这里 Constants.FANOUT_EXCHANGE,指定交换器的标识。* 第二个参数: 交换器类型,BuiltinExchangeType.FANOUT,说明是扇出类型,也就是广播模式。* 第三个参数: durable,true 表示持久化,服务器重启后交换器还在。*/channel.exchangeDeclare(Constants.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT, true);//4. 声明队列/*** 第一个参数: 队列名称,这里是 Constants.WORK_QUEUE,指定队列名。* 第二个参数: b,durable,是否持久化。true 表示服务器重启后队列还在。* 第三个参数: b1,exclusive,是否独占。false 表示非独占,多个消费者可访问。* 第四个参数: b2,autoDelete,自动删除。false 表示当最后一个消费者断开后不自动删。* 第五个参数: map,arguments,(额外参数),null 就是没有*/channel.queueDeclare(Constants.FANOUT_QUEUE1,true,false,false,null);channel.queueDeclare(Constants.FANOUT_QUEUE2,true,false,false,null);//5. 交换机和队列绑定channel.queueBind(Constants.FANOUT_QUEUE1,Constants.FANOUT_EXCHANGE,"");channel.queueBind(Constants.FANOUT_QUEUE2,Constants.FANOUT_EXCHANGE,"");//6. 发布消息String msg = "hello fanout....";channel.basicPublish(Constants.FANOUT_EXCHANGE,"", null, msg.getBytes());System.out.println("消息发送成功");//7. 释放资源channel.close();connection.close();}
}
Consumer1:
package com.wang.rabbitmq.fanout;import com.rabbitmq.client.*;
import com.wang.rabbitmq.constant.Constants;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {//1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USER_NAME);//账号connectionFactory.setPassword(Constants.PASSWORD);//密码connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机Connection connection = connectionFactory.newConnection();//2. 开启信道Channel channel = connection.createChannel();//3. 声明队列channel.queueDeclare(Constants.FANOUT_QUEUE1,true,false,false,null);//4. 消费消息DefaultConsumer consumer = new DefaultConsumer(channel){//从队列中收到消息, 就会执行的方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息:"+ new String(body));}};channel.basicConsume(Constants.FANOUT_QUEUE1, true, consumer);}
}
Consumer2:
package com.wang.rabbitmq.fanout;import com.rabbitmq.client.*;
import com.wang.rabbitmq.constant.Constants;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer2 {public static void main(String[] args) throws IOException, TimeoutException {//1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前开放端口号connectionFactory.setUsername(Constants.USER_NAME);//账号connectionFactory.setPassword(Constants.PASSWORD); //密码connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机Connection connection = connectionFactory.newConnection();//2. 开启信道Channel channel = connection.createChannel();//3. 声明队列channel.queueDeclare(Constants.FANOUT_QUEUE2,true,false,false,null);//4. 消费消息DefaultConsumer consumer = new DefaultConsumer(channel){//从队列中收到消息, 就会执行的方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息:"+ new String(body));}};channel.basicConsume(Constants.FANOUT_QUEUE2, true, consumer);}
}
1.4 Routing (路由模式)
(1) 概述
发布订阅模式是无条件的将所有消息分发给所有消费者, 而路由模式是 Exchange 根据 RoutingKey 的是否匹配, 将数据筛选后发给对应的消费者队列.
适合场景: 需要根据特定规则分发消息的场景.
如上图, Binding key 有 a, b ,c 三种. 当 Routing key == a 时, 对应消息发给 Q1, Q2 ; 当 Routing key == b / c 时, 对应消息发给 Q2.
(2) 代码演示
Producer:
package com.wang.rabbitmq.direct;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.wang.rabbitmq.constant.Constants;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** 路由模式生产者*/
public class Producer {public static void main(String[] args) throws IOException, TimeoutException {//1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前开放端口号connectionFactory.setUsername(Constants.USER_NAME);//账号connectionFactory.setPassword(Constants.PASSWORD); //密码connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机Connection connection = connectionFactory.newConnection();//2. 开启信道Channel channel = connection.createChannel();//3. 声明交换机channel.exchangeDeclare(Constants.DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT, true);//4. 声明队列channel.queueDeclare(Constants.DIRECT_QUEUE1, true, false, false, null);channel.queueDeclare(Constants.DIRECT_QUEUE2, true, false, false, null);//5. 绑定交换机和队列channel.queueBind(Constants.DIRECT_QUEUE1, Constants.DIRECT_EXCHANGE, "a");channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "a");channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "b");channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "c");//6. 发送消息String msg = "hello direct, my routingkey is a....";channel.basicPublish(Constants.DIRECT_EXCHANGE,"a", null, msg.getBytes());String msg_b = "hello direct, my routingkey is b....";channel.basicPublish(Constants.DIRECT_EXCHANGE,"b", null, msg_b.getBytes());String msg_c = "hello direct, my routingkey is c....";channel.basicPublish(Constants.DIRECT_EXCHANGE,"c", null, msg_c.getBytes());System.out.println("消息发送成功");//7. 释放资源channel.close();connection.close();}
}
Consumer1:
package com.wang.rabbitmq.direct;import com.rabbitmq.client.*;
import com.wang.rabbitmq.constant.Constants;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {//1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前开放端口号connectionFactory.setUsername(Constants.USER_NAME);//账号connectionFactory.setPassword(Constants.PASSWORD); //密码connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机Connection connection = connectionFactory.newConnection();//2. 开启信道Channel channel = connection.createChannel();//3. 声明队列channel.queueDeclare(Constants.DIRECT_QUEUE1,true,false,false,null);//4. 消费消息DefaultConsumer consumer = new DefaultConsumer(channel){//从队列中收到消息, 就会执行的方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息:"+ new String(body));}};channel.basicConsume(Constants.DIRECT_QUEUE1, true, consumer);}
}
Consumer2:
package com.wang.rabbitmq.direct;import com.rabbitmq.client.*;
import com.wang.rabbitmq.constant.Constants;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer2 {public static void main(String[] args) throws IOException, TimeoutException {//1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前开放端口号connectionFactory.setUsername(Constants.USER_NAME);//账号connectionFactory.setPassword(Constants.PASSWORD); //密码connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机Connection connection = connectionFactory.newConnection();//2. 开启信道Channel channel = connection.createChannel();//3. 声明队列channel.queueDeclare(Constants.DIRECT_QUEUE2,true,false,false,null);//4. 消费消息DefaultConsumer consumer = new DefaultConsumer(channel){//从队列中收到消息, 就会执行的方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息:"+ new String(body));}};channel.basicConsume(Constants.DIRECT_QUEUE2, true, consumer);}
}
1.5 Topics (通配符模式)
(1) 概述
通配符模式是路由模式的升级版. 在路由模式的基础上添加了通配符, 使得匹配规则更加灵活.
(2) 代码演示
Producer:
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import constant.Constants;public class TopicRabbitProducer {public static String TOPIC_EXCHANGE_NAME = "test_topic";public static String TOPIC_QUEUE_NAME1 = "topic_queue1";public static String TOPIC_QUEUE_NAME2 = "topic_queue2";public static void main(String[] args) throws Exception {//1. 创建channel通道ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST); //ip 默认值localhostfactory.setPort(Constants.PORT); //默认值5672factory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟机名称,默认 /factory.setUsername(Constants.USER_NAME); //用户名,默认guestfactory.setPassword(Constants.PASSWORD); //密码,默认guestConnection connection = factory.newConnection();Channel channel = connection.createChannel();//2. 创建交换机channel.exchangeDeclare(Constants.TOPIC_EXCHANGE_NAME,BuiltinExchangeType.TOPIC, true, false, false, null);//3. 声明队列//如果没有这样的一个队列,会自动创建,如果有,则不创建channel.queueDeclare(Constants.TOPIC_QUEUE_NAME1, true, false, false, null);channel.queueDeclare(Constants.TOPIC_QUEUE_NAME2, true, false, false, null);//4. 绑定队列和交换机//队列1绑定error,仅接收error信息channel.queueBind(Constants.TOPIC_QUEUE_NAME1,Constants.TOPIC_EXCHANGE_NAME,"*.error");//队列2绑定info,error: error,info信息都接收channel.queueBind(Constants.TOPIC_QUEUE_NAME2,Constants.TOPIC_EXCHANGE_NAME,"#.info");channel.queueBind(Constants.TOPIC_QUEUE_NAME2,Constants.TOPIC_EXCHANGE_NAME,"*.error");//5. 发送消息String msg = "hello topic, I'm order.error";channel.basicPublish(Constants.TOPIC_EXCHANGE_NAME,"order.error",null,msg.getBytes());String msg_black = "hello topic, I'm order.pay.info";channel.basicPublish(Constants.TOPIC_EXCHANGE_NAME,"order.pay.info",null,msg_black.getBytes());String msg_green= "hello topic, I'm pay.error";channel.basicPublish(Constants.TOPIC_EXCHANGE_NAME,"pay.error",null,msg_green.getBytes());//6.释放资源channel.close();connection.close();}
}
Consumer:
import com.rabbitmq.client.*;
import constant.Constants;
import java.io.IOException;public class TopicRabbitmqConsumer1 {public static void main(String[] args) throws Exception {//1. 创建channel通道ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST); //ip 默认值localhostfactory.setPort(Constants.PORT); //默认值5672factory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟机名称,默认 /factory.setUsername(Constants.USER_NAME); //用户名,默认guestfactory.setPassword(Constants.PASSWORD); //密码,默认guestConnection connection = factory.newConnection();Channel channel = connection.createChannel();//2. 接收消息,并消费DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息: " + new String(body));}};channel.basicConsume(Constants.TOPIC_QUEUE_NAME1, true, consumer);}
}
1.6 RPC (RPC 通信)
(1) 概述
RPC 通信中, 没有生产者和消费者,只有客户端和服务端.
客户端发送请求, 接收响应 ; 服务端接收请求, 发送响应.
工作流程:
- 客户端发送消息到一个指定的队列, 并在消息属性中设置 replyTo 字段, 这个字段指定了一个回调队列, 表示在这个队列里接收服务端响应.
- 服务端接收到请求后, 处理请求并发送响应到指定的回调队列.
- 客户端在回调队列上等待响应消息. 一旦收到响应, 客户端会检查消息的 correlationId 属性, 以确保同一请求对应的响应.
(2) 代码演示
客户端:
package com.wang.rabbitmq.rpc;import com.rabbitmq.client.*;
import com.wang.rabbitmq.constant.Constants;import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;/*** rpc 客户端* 1. 发送请求* 2. 接收响应*/
public class RpcClient {public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前开放端口号connectionFactory.setUsername(Constants.USER_NAME);//账号connectionFactory.setPassword(Constants.PASSWORD); //密码connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机Connection connection = connectionFactory.newConnection();//2. 开启信道Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(Constants.RPC_REQUEST_QUEUE, true, false, false, null);channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE, true, false, false, null);//3. 发送请求String msg = "hello rpc...";//设置请求的唯一标识 (correlationId)String correlationID = UUID.randomUUID().toString();//设置请求的相关属性AMQP.BasicProperties props = new AMQP.BasicProperties().builder().correlationId(correlationID).replyTo(Constants.RPC_RESPONSE_QUEUE).build();channel.basicPublish("", Constants.RPC_REQUEST_QUEUE, props, msg.getBytes());//4. 接收响应//使用阻塞队列, 来存储响应信息final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String respMsg = new String(body);System.out.println("接收到回调消息: "+ respMsg);if (correlationID.equals(properties.getCorrelationId())){//如果correlationID校验一致response.offer(respMsg);}}};channel.basicConsume(Constants.RPC_RESPONSE_QUEUE, true, consumer);String result = response.take();System.out.println("[RPC Client 响应结果]:"+ result);}
}
服务端:
package com.wang.rabbitmq.rpc;import com.rabbitmq.client.*;
import com.wang.rabbitmq.constant.Constants;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** RPC server* 1. 接收请求* 2. 发送响应*/
public class RpcServer {public static void main(String[] args) throws IOException, TimeoutException {//1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前开放端口号connectionFactory.setUsername(Constants.USER_NAME);//账号connectionFactory.setPassword(Constants.PASSWORD); //密码connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机Connection connection = connectionFactory.newConnection();//2. 开启信道Channel channel = connection.createChannel();//3. 接收请求channel.basicQos(1);DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String request = new String(body,"UTF-8");System.out.println("接收到请求:"+ request);String response = "针对request:"+ request +", 响应成功";AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder().correlationId(properties.getCorrelationId()).build();channel.basicPublish("", Constants.RPC_RESPONSE_QUEUE, basicProperties, response.getBytes());channel.basicAck(envelope.getDeliveryTag(), false);}};channel.basicConsume(Constants.RPC_REQUEST_QUEUE, false, consumer);}
}
2. SpringBoot 完成工作模式
详见项目 rabbitmq-spring-demo
(1) 配置文件: application.yml
spring:application:name: rabbitmq-spring-demorabbitmq:host: 120.53.237.101port: 5672username: adminpassword: adminvirtual-host: vhost2
(2) config 包
RabbitMQConfig 类:
(用来配置要使用的 交换机, 队列, 交换机和队列之间的绑定关系)
package com.wang.rabbitmqspringdemo.config;import com.wang.rabbitmqspringdemo.constant.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {//工作模式 (使用默认交换机)@Bean("workQueue")public Queue workQueue(){return QueueBuilder.durable(Constants.WORK_QUEUE).build();}//广播模式@Bean("fanoutQueue1")public Queue fanoutQueue1(){return QueueBuilder.durable(Constants.FANOUT_QUEUE1).build();}@Bean("fanoutQueue2")public Queue fanoutQueue2(){return QueueBuilder.durable(Constants.FANOUT_QUEUE2).build();}@Bean("fanoutExchange")public FanoutExchange fanoutExchange(){return ExchangeBuilder.fanoutExchange(Constants.FANOUT_EXCHANGE).durable(true).build();}@Bean("fanoutQueueBinding1")public Binding fanoutQueueBinding1(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange, @Qualifier("fanoutQueue1") Queue queue){return BindingBuilder.bind(queue).to(fanoutExchange);}@Bean("fanoutQueueBinding2")public Binding fanoutQueueBinding2(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange, @Qualifier("fanoutQueue2") Queue queue){return BindingBuilder.bind(queue).to(fanoutExchange);}//路由模式@Bean("directQueue1")public Queue directQueue1(){return QueueBuilder.durable(Constants.DIRECT_QUEUE1).build();}@Bean("directQueue2")public Queue directQueue2(){return QueueBuilder.durable(Constants.DIRECT_QUEUE2).build();}@Bean("directExchange")public DirectExchange directExchange(){return ExchangeBuilder.directExchange(Constants.DIRECT_EXCHANGE).durable(true).build();}@Bean("directQueueBinding1")public Binding directQueueBinding1(@Qualifier("directExchange") DirectExchange directExchange, @Qualifier("directQueue1") Queue queue){return BindingBuilder.bind(queue).to(directExchange).with("orange");}@Bean("directQueueBinding2")public Binding directQueueBinding2(@Qualifier("directExchange") DirectExchange directExchange, @Qualifier("directQueue2") Queue queue){return BindingBuilder.bind(queue).to(directExchange).with("black");}@Bean("directQueueBinding3")public Binding directQueueBinding3(@Qualifier("directExchange") DirectExchange directExchange, @Qualifier("directQueue2") Queue queue){return BindingBuilder.bind(queue).to(directExchange).with("orange");}//通配符模式@Bean("topicQueue1")public Queue topicQueue1(){return QueueBuilder.durable(Constants.TOPIC_QUEUE1).build();}@Bean("topicQueue2")public Queue topicQueue2(){return QueueBuilder.durable(Constants.TOPIC_QUEUE2).build();}@Bean("topicExchange")public TopicExchange topicExchange(){return ExchangeBuilder.topicExchange(Constants.TOPIC_EXCHANGE).durable(true).build();}@Bean("topicQueueBinding1")public Binding topicQueueBinding1(@Qualifier("topicExchange") TopicExchange topicExchange, @Qualifier("topicQueue1") Queue queue){return BindingBuilder.bind(queue).to(topicExchange).with("*.orange.*");}@Bean("topicQueueBinding2")public Binding topicQueueBinding2(@Qualifier("topicExchange") TopicExchange topicExchange, @Qualifier("topicQueue2") Queue queue){return BindingBuilder.bind(queue).to(topicExchange).with("*.*.rabbit");}@Bean("topicQueueBinding3")public Binding topicQueueBinding3(@Qualifier("topicExchange") TopicExchange topicExchange, @Qualifier("topicQueue2") Queue queue){return BindingBuilder.bind(queue).to(topicExchange).with("lazy.#");}
}
(3) Constant 包
Constants 类:
(用来配置常量 (交换机名称, 队列名称) )
package com.wang.rabbitmqspringdemo.constant;public class Constants {public static final String WORK_QUEUE = "work.queue";//发布订阅模式public static final String FANOUT_QUEUE1 = "fanout.queue1";public static final String FANOUT_QUEUE2 = "fanout.queue2";public static final String FANOUT_EXCHANGE = "fanout.exchange";//路由模式public static final String DIRECT_QUEUE1 = "direct.queue1";public static final String DIRECT_QUEUE2 = "direct.queue2";public static final String DIRECT_EXCHANGE = "direct.exchange";//通配符模式public static final String TOPIC_QUEUE1 = "topic.queue1";public static final String TOPIC_QUEUE2 = "topic.queue2";public static final String TOPIC_EXCHANGE = "topic.exchange";
}
(4) controller 包
ProducerController 类:
(写生产者代码)
使用 rabbbitTemplate对象的 convertAndSend 方法来发送消息.
package com.wang.rabbitmqspringdemo.controller;import com.wang.rabbitmqspringdemo.constant.Constants;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RequestMapping("/producer")
@RestController
public class ProducerController {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("/work")public String work(){for (int i = 0; i < 10; i++) {//使用默认交换机, RoutingKey 和队列名称一致// 发送消息rabbitTemplate.convertAndSend("", Constants.WORK_QUEUE, "hello spring amqp: work..."+i);}return "发送成功";}@RequestMapping("/fanout")public String fanout(){rabbitTemplate.convertAndSend(Constants.FANOUT_EXCHANGE,"", "hello spring amqp:fanout...");return "发送成功";}@RequestMapping("/direct/{routingKey}")public String direct(@PathVariable("routingKey") String routingKey){rabbitTemplate.convertAndSend(Constants.DIRECT_EXCHANGE, routingKey, "hello spring amqp:direct, my routing key is "+routingKey);return "发送成功";}@RequestMapping("/topic/{routingKey}")public String topic(@PathVariable("routingKey") String routingKey){rabbitTemplate.convertAndSend(Constants.TOPIC_EXCHANGE, routingKey, "hello spring amqp:topic, my routing key is "+routingKey);return "发送成功";}
}
(5) listener 包
工作队列模式 (WorkListener 类)
package com.wang.rabbitmqspringdemo.listener;import com.rabbitmq.client.Channel;
import com.wang.rabbitmqspringdemo.constant.Constants;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class WorkListener {@RabbitListener(queues = Constants.WORK_QUEUE)public void queueListener1(Message message, Channel channel){System.out.println("listener 1 ["+Constants.WORK_QUEUE+"] 接收到消息:" +message + ",channel:"+channel);}@RabbitListener(queues = Constants.WORK_QUEUE)public void queueListener2(String message){System.out.println("listener 2 ["+Constants.WORK_QUEUE+"] 接收到消息:" +message);}
}
广播模式 (FanoutListener 类)
package com.wang.rabbitmqspringdemo.listener;import com.rabbitmq.client.Channel;
import com.wang.rabbitmqspringdemo.constant.Constants;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class FanoutListener {@RabbitListener(queues = Constants.FANOUT_QUEUE1)public void queueListener1(String message){System.out.println("队列["+ Constants.FANOUT_QUEUE1+"] 接收到消息:" +message);}@RabbitListener(queues = Constants.FANOUT_QUEUE2)public void queueListener2(String message){System.out.println("队列["+Constants.FANOUT_QUEUE2+"] 接收到消息:" +message);}
}
路由模式 (DirectListener 类)
package com.wang.rabbitmqspringdemo.listener;import com.wang.rabbitmqspringdemo.constant.Constants;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class DirectListener {@RabbitListener(queues = Constants.DIRECT_QUEUE1)public void queueListener1(String message){System.out.println("队列["+Constants.DIRECT_QUEUE1+"] 接收到消息:" +message);}@RabbitListener(queues = Constants.DIRECT_QUEUE2)public void queueListener2(String message){System.out.println("队列["+Constants.DIRECT_QUEUE2+"] 接收到消息:" +message);}
}
通配符模式 (TopicListener 类)
package com.wang.rabbitmqspringdemo.listener;import com.wang.rabbitmqspringdemo.constant.Constants;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class TopicListener {@RabbitListener(queues = Constants.TOPIC_QUEUE1)public void queueListener1(String message){System.out.println("队列["+Constants.TOPIC_QUEUE1+"] 接收到消息:" +message);}@RabbitListener(queues = Constants.TOPIC_QUEUE2)public void queueListener2(String message){System.out.println("队列["+Constants.TOPIC_QUEUE2+"] 接收到消息:" +message);}
}