【RabbitMQ】工作队列和发布/订阅模式的具体实现
文章目录
- 建立连接
- 工作队列模式实现
- 创建队列和交换机
- 生产者代码
- 消费者代码
- 运行程序
- 启动消费者
- 启动生产者
- 发布/订阅模式实现
- 创建队列和交换机
- 生产者代码
- 创建交换机
- 声明两个队列
- 绑定队列和交换机
- 发送消息
- 完整代码
- 消费者代码
- 完整代码
- 运行程序
- 启动生产者
- 启动消费者
建立连接
我们把建立连接时,创建的连接工厂部分创建成常量,方便后面进行使用
- 在
rabbitmq
包下,再创建一个constant
包
package rabbitmq.constant; public class Constants { static public final String HOST = "localhost"; static public final int PORT = 5672; static public final String USER_NAME = "study"; static public final String PASSWORD = "study"; static public final String VIRTUAL_HOST = "coding ";
}
工作队列模式实现
和简单模式相比较,工作队列与之不同的就是有多个消费者,其他都一样。所以我们只需要多添加几个消费者即可
创建队列和交换机
在 Constants
中添加:
// 工作队列模式
public static final String WORK_QUEUE = "work.queue ";
生产者代码
package rabbitmq.work; import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.constant.Constants; import java.io.IOException;
import java.util.concurrent.TimeoutException; public class Producer { public static void main(String[] args) throws IOException, TimeoutException { // 1. 建立连接 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(Constants.HOST); connectionFactory.setPort(Constants.PORT); connectionFactory.setUsername(Constants.USER_NAME); connectionFactory.setPassword(Constants.PASSWORD); connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); Connection connection = connectionFactory.newConnection(); //2. 开启信道 Channel channel = connection.createChannel(); //3. 声明队列 channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null); //4. 发送消息 for (int i = 0; i < 10; i++) { String msg = "hello work queue..." + i; channel.basicPublish("", Constants.WORK_QUEUE, null, msg.getBytes()); } System.out.println("消息发送成功!"); // 5. 资源释放 channel.close(); connection.close(); }
}
消费者代码
package rabbitmq.work; import com.rabbitmq.client.*;
import rabbitmq.constant.Constants; import java.io.IOException;
import java.util.concurrent.TimeoutException; public class Consumer1 { public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { // 1. 建立连接 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(Constants.HOST); connectionFactory.setPort(Constants.PORT); connectionFactory.setUsername(Constants.USER_NAME); connectionFactory.setPassword(Constants.PASSWORD); connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); Connection connection = connectionFactory.newConnection(); //2. 开启信道 Channel channel = connection.createChannel(); //3. 声明队列 channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null); //4. 消费消息 DefaultConsumer consumer = new DefaultConsumer(channel){ // 从队列中收到消息,就会执行的方法 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //TODO System.out.println("接收到消息: " + new String(body)); } }; channel.basicConsume(Constants.WORK_QUEUE, true, consumer); // 等待程序执行完成 Thread.sleep(2000); // 5. 释放资源
// channel.close();
// connection.close(); }
}
- 多个消费者的代码都一样的
运行程序
我们先启动两个消费者,再启动生产者
- 如果先启动生产者,再启动消费者,由于消息较少,处理较快,那么第一个启动的消费者就会瞬间把 10 条消息消费掉,所以我们先启动两个消费者,再启动生产者
启动消费者
我们将两个消费者启动
- 我们可以看到
rabbitmq
客户端里面,work.queue
队列已经被创建了出来
启动生产者
在启动消费者之后,我们启动生产者,发送 10 条消息到队列中
- 我们可以看到,连个该消费者将 10 条消息消费完了
发布/订阅模式实现
在发布/订阅模式中,多了一个 Exchange
角色
Exchange
常见有三种类型,分别代表不同的路由规则
Fanout
: 广播,将消息交给所有绑定到交换机的队列(Publish/Subscribe
)Direct
: 定向,将消息交给符合指定routingKey
的队列 (Routing
模式)Topic
: 通配符,把消息交给符合routing pattern
(路由模式)的队列(Topics
模式)
也就分别对应不同的工作模式
创建队列和交换机
在 Constants
中添加:
// 发布订阅模式
public static final String FANOUT_EXCHANGE = "fanout.exchange";
public static final String FANOUT_QUEUE1 = "fanout.queue1";
public static final String FANOUT_QUEUE2 = "fanout.queue2";
生产者代码
发布/订阅模式的生产者代码和简单模式类似,只是有些变化
- 需要声明交换机
- 需要指出交换机和队列之间的关系
创建交换机
相比于生产者代码和简单模式,这一步是关键的一步。我们需要声明一个交换机,而不是使用默认交换机
channel.exchangeDeclare(Constants.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT, true);
- 我们会使用到
exchangeDeclare()
方法
Exchange.DeclareOk exchangeDeclare(String exchange,
BuiltinExchangeType type,
boolean durable,
boolean autoDelete,
boolean internal,
Map<String, Object> arguments) throws IOException;
参数解释:
exchange
:交换机名称type
:交换机类型Direct("direct")
:定向,直连,routing
Fanout("fanout")
:扇形(广播),每个队列都能收到消息TOPIC("topic")
:通配符HEADERS("headers")
:参数匹配(工作时用到的少)
durable
:是否持久化true
:持久化false
:非持久化- 持久化可以将交换器存盘,在服务器重启的时候不会丢失相关信息
autoDelete
:自动删除- 自动删除的前提是至少有一个对类或者交换器与这个交换器绑定,之后所有与这个交换器绑定的对类或交换器都与此解绑
- 而不是这种理解:当与此交换器连接的客户端都断开时,
RabbitMQ
会自动删除本交换器
internal
:内部使用,一般false
- 如果设置为
true
,表示内部使用 - 客户端程序无法直接发送消息到这个交换器中,只能通过交换器路由到交换器这种方式
- 如果设置为
argument
:参数
声明两个队列
// 如果没有一个这样的队列,会自动创建;如果有,则不创建
channel.queueDeclare(Constants.FANOUT_QUEUE1, true, false, false, null);
channel.queueDeclare(Constants.FANOUT_QUEUE2, true, false, false, null);
绑定队列和交换机
channel.queueBind(Constants.FANOUT_QUEUE1, Constants.FANOUT_EXCHANGE, "");
channel.queueBind(Constants.FANOUT_QUEUE2, Constants.FANOUT_EXCHANGE, "");
- 这里会用到
queueBind()
方法
queueBind(String queue, String exchange, String routingKey)
参数解释:
queue
:对类名称exchange
:交换机名称routingKey
:路由key
,路由规则
- 如果交换机类型为
fanout
,routingKey
设置为“”
,表示每个消费者都能收到全部信息
发送消息
String msg = "hello fanout...";
// 第二个参数 routingKey 为空。因为这是广播模式,交换机收到消息后需要全部转发(绑定的时候设为空,发送的时候也为空
channel.basicPublish(Constants.FANOUT_EXCHANGE, "", null, msg.getBytes());
System.out.println("消息发送成功!");
- 这里会用到
basicPublish()
方法
basicPublish(String exchange, String routingKey, AMQP.BasicProperties props, byte[] body)
参数解释:
Exchange
:交换机名称routingKey
:如果交换机类型为fanout
,routingKey
设置为“”
,表示每个消费者都能收到全部信息
完整代码
package rabbitmq.fanout; import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.constant.Constants; import java.io.IOException;
import java.util.concurrent.TimeoutException; public class Producer { public static void main(String[] args) throws IOException, TimeoutException { // 1. 建立连接 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(Constants.HOST); connectionFactory.setPort(Constants.PORT); connectionFactory.setUsername(Constants.USER_NAME); connectionFactory.setPassword(Constants.PASSWORD); connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); Connection connection = connectionFactory.newConnection(); //2. 开启信道 Channel channel = connection.createChannel(); //3. 声明交换机 /* Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) throws IOException; 参数解释: exchange:交换机名称 type:交换机类型 DIRECT("direct"),定向,直连,routing FANOUT("fanout"),扇形(广播),每个队列都能收到消息 TOPIC("topic"),通配符 HEADERS("headers"),参数匹配(工作中用的少) durable:是否持久化 autoDelete:自动删除 internal:内部使用(一般false) arguments:参数 */ channel.exchangeDeclare(Constants.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT, true); // 4. 声明队列 channel.queueDeclare(Constants.FANOUT_QUEUE1, true, false, false, null); channel.queueDeclare(Constants.FANOUT_QUEUE2, true, false, false, null); // 5. 绑定交换机和队列 channel.queueBind(Constants.FANOUT_QUEUE1, Constants.FANOUT_EXCHANGE, ""); channel.queueBind(Constants.FANOUT_QUEUE2, Constants.FANOUT_EXCHANGE, ""); // 6. 发布消息 String msg = "hello fanout..."; // 第二个参数 routingKey 为空。因为这是广播模式,交换机收到消息后需要全部转发(绑定的时候设为空,发送的时候也为空 channel.basicPublish(Constants.FANOUT_EXCHANGE, "", null, msg.getBytes()); System.out.println("消息发送成功!"); // 7. 释放资源 channel.close(); connection.close(); }
}
消费者代码
主要的步骤为:
- 创建
Channel
- 接收消息,并处理
完整代码
package rabbitmq.fanout; import com.rabbitmq.client.*;
import rabbitmq.constant.Constants; import java.io.IOException;
import java.util.concurrent.TimeoutException; public class Consumer1 { public static void main(String[] args) throws IOException, TimeoutException { // 1. 建立连接 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(Constants.HOST); connectionFactory.setPort(Constants.PORT); connectionFactory.setUsername(Constants.USER_NAME); connectionFactory.setPassword(Constants.PASSWORD); connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); Connection connection = connectionFactory.newConnection(); // 2.建立信道 Channel channel = connection.createChannel(); // 3.声明队列(如果队列已经存在,就不会创建;不存在就会创建) channel.queueDeclare(Constants.FANOUT_QUEUE1, true, false, false, null); // 4. 消费信息 DefaultConsumer consumer = new DefaultConsumer(channel) { // 从队列中收到消息,就会执行的方法 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("接收到消息:" + new String(body)); } }; channel.basicConsume(Constants.FANOUT_QUEUE1, true, consumer); } }
运行程序
启动生产者
- 消息全转发
- 我们可以看到两个队列中分别有了一条消息
- 这就是发布订阅模式,他会把收到的消息都转发
- 交换机绑定了队列
- 这里,我们可以看到交换机和队列之间的绑定关系
启动消费者
消费者 1:
接收到消息:hello fanout...
消费者 2:
接收到消息:hello fanout...