当前位置: 首页 > news >正文

RabbitMQ的应用

RabbitMQ的应用

    • 7种工作模式介绍
      • Simple(简单模式)
      • Work Queue(⼯作队列)
      • Publish/Subscribe(发布/订阅)
      • Routing(路由模式)
      • Topics(通配符模式)
      • RPC(RPC通信)
      • Publisher Confirms(发布确认)
        • PublishingMessagesIndividually(单独确认)
        • Publishing Messages in Batches(批量确认)
        • Handling Publisher Confirms Asynchronously(异步确认)
        • 三种差异
    • 用Spring Boot整合RabbitMQ
      • Work Queue(⼯作队列)
      • Publish/Subscribe(发布/订阅)
      • Routing(路由模式)
      • Topics(通配符模式)

7种工作模式介绍

下述代码中要在pom文件中引入依赖

<dependency>
 <groupId>com.rabbitmq</groupId>
 <artifactId>amqp-client</artifactId>
 <version>5.7.3</version>
</dependency>

Simple(简单模式)

在这里插入图片描述
P: ⽣产者, 也就是要发送消息的程序
C: 消费者,消息的接收者
Queue: 消息队列, 图中⻩⾊背景部分. 类似⼀个邮箱, 可以缓存消息; ⽣产者向其中投递消息, 消费者从其中取出消息.
特点: ⼀个⽣产者P,⼀个消费者C, 消息只能被消费⼀次. 也称为点对点(Point-to-Point)模式.
生产者代码在上一期可以看到

Work Queue(⼯作队列)

⼀个⽣产者P,多个消费者C1,C2. 在多个消息的情况下, Work Queue 会将消息分派给不同的消费者, 每个消费者都会接收到不同的消息.
特点: 消息不会重复, 分配给不同的消费者.
适⽤场景: 集群环境中做异步处理
在这里插入图片描述
先声明一个全局变量初始化

public class Constants {
public static final String H0ST = "110.41.51.65";
public static final Integer PORT = 15673;
public static final String VIRTUAL_HOST = "bite";
public static final String USER_NAME = "study";
public static final String PASSwORD = "study";
public static final String WORK_QUEUE_NAME = "work_queues";

然后在创建一个生产者

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import constant.Constants;
public class WorkRabbitProducer {
 public static void main(String[] args) throws Exception {
 //1. 创建channel通道
 ConnectionFactory factory = new ConnectionFactory();
 factory.setHost(Constants.HOST);//ip 默认值localhost
 factory.setPort(Constants.PORT); //默认值5672
 factory.setVirtualHost(Constants.VIRTUAL_HOST);//虚拟机名称, 默认 /
 factory.setUsername(Constants.USER_NAME);//⽤⼾名,默认guest
 factory.setPassword(Constants.PASSWORD);//密码, 默认guest
 Connection connection = factory.newConnection();
 Channel channel = connection.createChannel();
 //2. 声明队列
 //如果没有⼀个这样的⼀个队列, 会⾃动创建, 如果有, 则不创建
 channel.queueDeclare(Constants.WORK_QUEUE_NAME, true, false, false, 
null);
 //3. 发送消息
 for (int i = 0; i < 10; i++) {
 String msg = "Hello World" + i;
 
channel.basicPublish("",Constants.WORK_QUEUE_NAME,null,msg.getBytes());
 }
 //4. 释放资源
 channel.close();
 connection.close();
 }
 }

再次创建一个消费者代码

import com.rabbitmq.client.*;
import constant.Constants;
import java.io.IOException;
public class WorkRabbitmqConsumer1 {
 public static void main(String[] args) throws Exception {
 //1. 创建channel通道
 ConnectionFactory factory = new ConnectionFactory();
 factory.setHost(Constants.HOST);//ip 默认值localhost
 factory.setPort(Constants.PORT); //默认值5672
 factory.setVirtualHost(Constants.VIRTUAL_HOST);//虚拟机名称, 默认 /
 factory.setUsername(Constants.USER_NAME);//⽤⼾名,默认guest
 factory.setPassword(Constants.PASSWORD);//密码, 默认guest
 Connection connection = factory.newConnection();
 Channel channel = connection.createChannel();
 //2. 声明队列
 //如果没有⼀个这样的⼀个队列, 会⾃动创建, 如果有, 则不创建
 channel.queueDeclare(Constants.WORK_QUEUE_NAME, true, false, false, 
null);
 //3. 接收消息, 并消费
 DefaultConsumer consumer = new DefaultConsumer(channel) {
 @Override
 public void handleDelivery(String consumerTag, Envelope envelope, 
AMQP.BasicProperties properties, byte[] body) throws IOException {
 System.out.println("接收到消息: " + new String(body));
 }
 };
 channel.basicConsume(Constants.WORK_QUEUE_NAME, true, consumer);
 }
 }

在这里插入图片描述

Publish/Subscribe(发布/订阅)

在这里插入图片描述
Exchange: 交换机 (X).
作⽤: ⽣产者将消息发送到Exchange, 由交换机将消息按⼀定规则路由到⼀个或多个队列中(上图中⽣产者将消息投递到队列中, 实际上这个在RabbitMQ中不会发⽣. )
RabbitMQ交换机有四种类型: fanout,direct, topic, headers, 不同类型有着不同的路由策略. AMQP协议⾥还有另外两种类型, System和⾃定义, 此处不再描述.

  1. Fanout:⼴播,将消息交给所有绑定到交换机的队列(Publish/Subscribe模式)
  2. Direct:定向,把消息交给符合指定routing key的队列(Routing模式)
  3. Topic:通配符,把消息交给符合routing pattern(路由模式)的队列(Topics模式)
  4. headers类型的交换器不依赖于路由键的匹配规则来路由消息, ⽽是根据发送的消息内容中的headers属性进⾏匹配. headers类型的交换器性能会很差,⽽且也不实⽤,基本上不会看到它的存在.
    Exchange(交换机)只负责转发消息, 不具备存储消息的能⼒, 因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息就会丢失
    RoutingKey: 路由键.⽣产者将消息发给交换器时, 指定的⼀个字符串, ⽤来告诉交换机应该如何处理这个消息.
    Binding Key:绑定. RabbitMQ中通过Binding(绑定)将交换器与队列关联起来, 在绑定的时候⼀般会指定⼀个Binding Key, 这样RabbitMQ就知道如何正确地将消息路由到队列了.
    在这里插入图片描述
    在这里插入图片描述1.在使用绑定的时候,需要的路由键是BindingKey.
    2.在发送消息的时候,需要的路由键是RoutingKey.
    本课程后续也可能把两者合称为RoutingKey,大家根据使用场景来区分.

代码中用全局声明

public static String FANOUT_EXCHANGE_NAME = "test_fanout";
public static String FANOUT_QUEUE_NAME1 = "fanout_queue1";
public static String FANOUT_QUEUE_NAME2 = "fanout_queue2

生产者代码

public class FanoutRabbitProducer {
 public static void main(String[] args) throws Exception {
 //1. 创建channel通道
 ConnectionFactory factory = new ConnectionFactory();
 factory.setHost(Constants.HOST);//ip 默认值localhost
 factory.setPort(Constants.PORT); //默认值5672
 factory.setVirtualHost(Constants.VIRTUAL_HOST);//虚拟机名称, 默认 /
 factory.setUsername(Constants.USER_NAME);//⽤⼾名,默认guest
 factory.setPassword(Constants.PASSWORD);//密码, 默认guest
 Connection connection = factory.newConnection();
 Channel channel = connection.createChannel();
 //2. 创建交换机
 /*
 exchangeDeclare(String exchange, BuiltinExchangeType type, boolean 
durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
 参数:
 1. exchange:交换机名称
 2. type:交换机类型
 * DIRECT("direct"), 定向,直连,routing
 * * FANOUT("fanout"),扇形(⼴播), 每个队列都能收到消息
 * TOPIC("topic"),通配符
 * HEADERS("headers") 参数匹配(⼯作⽤的较少)
 3. durable: 是否持久化
 4. autoDelete: ⾃动删除
 5. internal: 内部使⽤, ⼀般falase
 6. arguments: 参数
 */
 channel.exchangeDeclare(Constants.FANOUT_EXCHANGE_NAME, 
BuiltinExchangeType.FANOUT, true, false, false, null);
 //3. 声明队列
 //如果没有⼀个这样的⼀个队列, 会⾃动创建, 如果有, 则不创建
 channel.queueDeclare(Constants.FANOUT_QUEUE_NAME1, true, false, false, 
null);
 channel.queueDeclare(Constants.FANOUT_QUEUE_NAME2, true, false, false, 
null);
 //4. 绑定队列和交换机
 /*
 queueBind(String queue, String exchange, String routingKey, 
Map<String, Object> arguments)
 参数:
 1. queue: 队列名称
 2. exchange: 交换机名称
 3. routingKey: 路由key, 路由规则
 如果交换机类型为fanout,routingkey设置为"",表⽰每个消费者都可以收到全部信息
 */
 
channel.queueBind(Constants.FANOUT_QUEUE_NAME1,Constants.FANOUT_EXCHANGE_NAME, 
"");
 
channel.queueBind(Constants.FANOUT_QUEUE_NAME2,Constants.FANOUT_EXCHANGE_NAME, 
"");
 //5. 发送消息
 /**
 * basicPublish(String exchange, String routingKey, 
AMQP.BasicProperties props, byte[] body)
 * 参数说明:
 * Exchange: 交换机名称
 * routingKey: 如果交换机类型为fanout,routingkey设置为"",表⽰每个消费者都可以
收到全部信息
 */
 String msg = "hello fanout";
 
channel.basicPublish(Constants.FANOUT_EXCHANGE_NAME,"",null,msg.getBytes());
channel.close();
 connection.close();
 }
 }

消费者代码

import com.rabbitmq.client.*;
import constant.Constants;
import java.io.IOException;
public class FanoutRabbitmqConsumer1 {
 public static void main(String[] args) throws Exception {
 //1. 创建channel通道
 ConnectionFactory factory = new ConnectionFactory();
 factory.setHost(Constants.HOST);//ip 默认值localhost
 factory.setPort(Constants.PORT); //默认值5672
 factory.setVirtualHost(Constants.VIRTUAL_HOST);//虚拟机名称, 默认 /
 factory.setUsername(Constants.USER_NAME);//⽤⼾名,默认guest
 factory.setPassword(Constants.PASSWORD);//密码, 默认guest
 Connection connection = factory.newConnection();
 Channel channel = connection.createChannel();
 //2. 接收消息, 并消费
 DefaultConsumer consumer = new DefaultConsumer(channel) {
 @Override
 public void handleDelivery(String consumerTag, Envelope envelope, 
AMQP.BasicProperties properties, byte[] body) throws IOException {
 System.out.println("接收到消息: " + new String(body));
 }
 };
 channel.basicConsume(Constants.FANOUT_QUEUE_NAME1, true, consumer);
 }

在这里插入图片描述
在这里插入图片描述

Routing(路由模式)

在这里插入图片描述
路由模式是发布订阅模式的变种, 在发布订阅基础上, 增加路由key
发布订阅模式是⽆条件的将所有消息分发给所有消费者, 路由模式是Exchange根据RoutingKey的规则,将数据筛选后发给对应的消费者队列
适合场景: 需要根据特定规则分发消息的场景
生产者代码
创建交换机

channel.exchang
BuiltinExchangeType.DIRECT, true,false,,false, null);

声明队列

channel.queueDeclare(Constants.DIRECT_QUEUE_NAME1, true, false, false, null);
channel.queueDeclare(Constants.DIRECT_QUEUE_NAME2, true, false, false, null);

绑定交换机和队列

//队列1绑定orange
channel.queueBind(Constants.DIRECT_QUEUE_NAME1,Constants.DIRECT_EXCHANGE_NAME,
"orange");
//队列2绑定black,green
channel.queueBind(Constants.DIRECT_QUEUE_NAME2,Constants.DIRECT_EXCHANGE_NAME,
"black");
channel.queueBind(Constants.DIRECT_QUEUE_NAME2,Constants.DIRECT_EXCHANGE_NAME,
green

发送消息

//发送消息时,指定RoutingKey
String msg = "hello direct, I am orange";
channel.basicPublish(Constants.DIRECT_EXCHANGE_NAME,"orange",null, msg·getBytes(
));
String msg_black = "hello direct,I am black";
channel.basicPublish(Constants.DIRECT_EXCHANGE_NAME,"black", null,msg_black.getB
ytes());
String msg-green= "hello direct, I am green";
channel.basicPublish(Constants.DIRECT_EXCHANGE_NAME,"green", nuLl,msg-green.getB
ytes());

声明交换机和队列

public static String DIRECT_EXCHANGE_NAME = "test_direct";
public static String DIRECT_QUEUE_NAME1 = "direct_queue1";
public static String DIRECT_QUEUE_NAME2 = "direct_queue2";

生产者代码

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import constant.Constants;
public class DirectRabbitProducer {
 public static void main(String[] args) throws Exception {
 //1. 创建channel通道
 ConnectionFactory factory = new ConnectionFactory();
 factory.setHost(Constants.HOST);//ip 默认值localhost
 factory.setPort(Constants.PORT); //默认值5672
 factory.setVirtualHost(Constants.VIRTUAL_HOST);//虚拟机名称, 默认 /
 factory.setUsername(Constants.USER_NAME);//⽤⼾名,默认guest
 factory.setPassword(Constants.PASSWORD);//密码, 默认guest
 Connection connection = factory.newConnection();
 Channel channel = connection.createChannel();
 //2. 创建交换机
 channel.exchangeDeclare(Constants.DIRECT_EXCHANGE_NAME, 
BuiltinExchangeType.FANOUT, true, false, false, null);
 //3. 声明队列
 //如果没有⼀个这样的⼀个队列, 会⾃动创建, 如果有, 则不创建
 channel.queueDeclare(Constants.DIRECT_QUEUE_NAME1, true, false, false, 
null);
 channel.queueDeclare(Constants.DIRECT_QUEUE_NAME2, true, false, false, 
null);
//4. 绑定队列和交换机
 //队列1绑定orange
 
channel.queueBind(Constants.DIRECT_QUEUE_NAME1,Constants.DIRECT_EXCHANGE_NAME, 
"orange");
 //队列2绑定black, green
 
channel.queueBind(Constants.DIRECT_QUEUE_NAME2,Constants.DIRECT_EXCHANGE_NAME, 
"black");
 
channel.queueBind(Constants.DIRECT_QUEUE_NAME2,Constants.DIRECT_EXCHANGE_NAME, 
"green");
 //5. 发送消息
 String msg = "hello direct, I am orange";
 
channel.basicPublish(Constants.DIRECT_EXCHANGE_NAME,"orange",null,msg.getBytes(
));
 String msg_black = "hello direct,I am black";
 
channel.basicPublish(Constants.DIRECT_EXCHANGE_NAME,"black",null,msg_black.getB
ytes());
 String msg_green= "hello direct, I am green";
 
channel.basicPublish(Constants.DIRECT_EXCHANGE_NAME,"green",null,msg_green.getB
ytes());
 //6.释放资源
 channel.close();
 connection.close();
 }
 }

消费者代码

mport com.rabbitmq.client.*;
import constant.Constants;
import java.io.IOException;
public class DirectRabbitmqConsumer1 {
 public static void main(String[] args) throws Exception {
 //1. 创建channel通道
 ConnectionFactory factory = new ConnectionFactory();
 factory.setHost(Constants.HOST);//ip 默认值localhost
 factory.setPort(Constants.PORT); //默认值5672
 factory.setVirtualHost(Constants.VIRTUAL_HOST);//虚拟机名称, 默认 /
 factory.setUsername(Constants.USER_NAME);//⽤⼾名,默认guest
 factory.setPassword(Constants.PASSWORD);//密码, 默认guest
 Connection connection = factory.newConnection();
 Channel channel = connection.createChannel();
 //2. 接收消息, 并消费
 DefaultConsumer consumer = new DefaultConsumer(channel) {
 @Override
 public void handleDelivery(String consumerTag, Envelope envelope, 
AMQP.BasicProperties properties, byte[] body) throws IOException {
 System.out.println("接收到消息: " + new String(body));
 }
 };
 channel.basicConsume(Constants.DIRECT_QUEUE_NAME1, true, consumer);
 }
 }

在这里插入图片描述
在这里插入图片描述

Topics(通配符模式)

在这里插入图片描述
路由模式的升级版, 在routingKey的基础上,增加了通配符的功能, 使之更加灵活.
Topics和Routing的基本原理相同,即:⽣产者将消息发给交换机,交换机根据RoutingKey将消息转发给与RoutingKey匹配的队列. 类似于正则表达式的⽅式来定义Routingkey的模式.
RoutingKey 是⼀系列由点( . )分隔的单词, ⽐如 " stock.usd.nyse “, " nyse.vmw “,
" quick.orange.rabbit "
2. BindingKey 和RoutingKey⼀样, 也是点( . )分割的字符串.
3. Binding Key中可以存在两种特殊字符串, ⽤于模糊匹配
* 表⽰⼀个单词
# 表⽰多个单词(0-N个)
⽐如:
• Binding Key 为"d.a.b” 会同时路由到Q1 和Q2
• Binding Key 为"d.a.f” 会路由到Q1
• Binding Key 为"c.e.f" 会路由到Q2
• Binding Key 为"d.b.f" 会被丢弃, 或者返回给⽣产者(需要设置mandatory参数)
不同之处是:routingKey的匹配⽅式不同,Routing模式是相等匹配,topics模式是通配符匹配.
适合场景: 需要灵活匹配和过滤消息的场景
生产者代码

public static String TOPIC_EXCHANGE_NAME = "test_topic";
public static String TOPIC_QUEUE_NAME1 = "topic_queue1";
public static String TOPIC_QUEUE_NAME2 = "topic_queue2";
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import constant.Constants;
public class TopicRabbitProducer {
 public static void main(String[] args) throws Exception {
 //1. 创建channel通道
 ConnectionFactory factory = new ConnectionFactory();
 factory.setHost(Constants.HOST);//ip 默认值localhost
 factory.setPort(Constants.PORT); //默认值5672
 factory.setVirtualHost(Constants.VIRTUAL_HOST);//虚拟机名称, 默认 /
 factory.setUsername(Constants.USER_NAME);//⽤⼾名,默认guest
 factory.setPassword(Constants.PASSWORD);//密码, 默认guest
 Connection connection = factory.newConnection();
 Channel channel = connection.createChannel();
 //2. 创建交换机
 channel.exchangeDeclare(Constants.TOPIC_EXCHANGE_NAME, 
BuiltinExchangeType.TOPIC, true, false, false, null);
 //3. 声明队列
 //如果没有⼀个这样的⼀个队列, 会⾃动创建, 如果有, 则不创建
 channel.queueDeclare(Constants.TOPIC_QUEUE_NAME1, true, false, false, 
null);
 channel.queueDeclare(Constants.TOPIC_QUEUE_NAME2, true, false, false, 
null);
 //4. 绑定队列和交换机
 //队列1绑定error, 仅接收error信息
 channel.queueBind(Constants.TOPIC_QUEUE_NAME1,Constants.TOPIC_EXCHANGE_NAME, 
"*.error");
 //队列2绑定info, error: error,info信息都接收
 
channel.queueBind(Constants.TOPIC_QUEUE_NAME2,Constants.TOPIC_EXCHANGE_NAME, 
"#.info");
 
channel.queueBind(Constants.TOPIC_QUEUE_NAME2,Constants.TOPIC_EXCHANGE_NAME, 
"*.error");
 //5. 发送消息
 String msg = "hello topic, I'm order.error";
 
channel.basicPublish(Constants.TOPIC_EXCHANGE_NAME,"order.error",null,msg.getBy
tes());
 String msg_black = "hello topic, I'm order.pay.info";
 
channel.basicPublish(Constants.TOPIC_EXCHANGE_NAME,"order.pay.info",null,msg_bl
ack.getBytes());
 String msg_green= "hello topic, I'm pay.error";
 
channel.basicPublish(Constants.TOPIC_EXCHANGE_NAME,"pay.error",null,msg_green.g
etBytes());
 //6.释放资源
 channel.close();
 connection.close();
 }
}

消费者代码

import com.rabbitmq.client.*;
import constant.Constants;
import java.io.IOException;
public class TopicRabbitmqConsumer1 {
 public static void main(String[] args) throws Exception {
 //1. 创建channel通道
 ConnectionFactory factory = new ConnectionFactory();
 factory.setHost(Constants.HOST);//ip 默认值localhost
 factory.setPort(Constants.PORT); //默认值5672
 factory.setVirtualHost(Constants.VIRTUAL_HOST);//虚拟机名称, 默认 /
 factory.setUsername(Constants.USER_NAME);//⽤⼾名,默认guest
 factory.setPassword(Constants.PASSWORD);//密码, 默认guest
 Connection connection = factory.newConnection();
 Channel channel = connection.createChannel();
 //2. 接收消息, 并消费
 DefaultConsumer consumer = new DefaultConsumer(channel) {
 @Override
 public void handleDelivery(String consumerTag, Envelope envelope, 
AMQP.BasicProperties properties, byte[] body) throws IOException {
 System.out.println("接收到消息: " + new String(body));
 }
 };
 channel.basicConsume(Constants.TOPIC_QUEUE_NAME1, true, consumer);
 }
}

在这里插入图片描述

RPC(RPC通信)

在这里插入图片描述
在这里插入图片描述
1.客户端发送消息到一个指定的队列,并在消息属性中设置replyTo字段,这个字段指定了一个回调队列,用于接收服务端的响应。
2.服务端接收到请求后,处理请求并发送响应消息到replyTo指定的回调队列
3.客户端在回调队列上等待响应消息.一旦收到响应,客户端会检查消息的correlationld属性,以确保它是所期望的响应
生产者代码

public static String RPC_REQUEST_QUEUE_NAME = "rpc_request_queue";
import com.rabbitmq.client.*;
import constant.Constants;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class RPCClient {
 public static void main(String[] args) throws Exception {
 //1. 创建Channel通道
 ConnectionFactory factory = new ConnectionFactory();
 factory.setHost(Constants.HOST);//ip 默认值localhost
 factory.setPort(Constants.PORT); //默认值5672
 factory.setVirtualHost(Constants.VIRTUAL_HOST);//虚拟机名称, 默认 /
 factory.setUsername(Constants.USER_NAME);//⽤⼾名,默认guest
 factory.setPassword(Constants.PASSWORD);//密码, 默认guest
 Connection connection = factory.newConnection();
 Channel channel = connection.createChannel();
 //2. 声明队列
 channel.queueDeclare(Constants.RPC_REQUEST_QUEUE_NAME, true, false, 
false, null);
 // 唯⼀标志本次请求
 String corrId = UUID.randomUUID().toString();
 // 定义临时队列,并返回⽣成的队列名称
 String replyQueueName = channel.queueDeclare().getQueue();
 // ⽣成发送消息的属性
 AMQP.BasicProperties props = new AMQP.BasicProperties
 .Builder()
 .correlationId(corrId) // 唯⼀标志本次请求
 .replyTo(replyQueueName) // 设置回调队列
 .build();
 // 通过内置交换机, 发送消息
 String message = "hello rpc...";
 channel.basicPublish("", Constants.RPC_REQUEST_QUEUE_NAME, props, 
message.getBytes());
 // 阻塞队列,⽤于存储回调结果
 final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
 //接收服务端的响应
 DefaultConsumer consumer = new DefaultConsumer(channel) {
 @Override
 public void handleDelivery(String consumerTag, Envelope envelope, 
AMQP.BasicProperties properties, byte[] body) throws IOException {
 System.out.println("接收到回调消息:"+ new String(body));
 if (properties.getCorrelationId().equals(corrId)) {
 response.offer(new String(body, "UTF-8"));
 }
 };
 channel.basicConsume(replyQueueName, true, consumer);
 // 获取回调的结果
 String result = response.take();
 System.out.println(" [RPCClient] Result:" + result);
 //释放资源
 channel.close();
 connection.close();
 }
 }
 }

服务端代码

import com.rabbitmq.client.*;
import constant.Constants;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class RPCServer {
 public static void main(String[] args) throws IOException, 
TimeoutException {
 //1. 创建Channel通道
 ConnectionFactory factory = new ConnectionFactory();
 factory.setHost(Constants.HOST);//ip 默认值localhost
 factory.setPort(Constants.PORT); //默认值5672
 factory.setVirtualHost(Constants.VIRTUAL_HOST);//虚拟机名称, 默认 /
 factory.setUsername(Constants.USER_NAME);//⽤⼾名,默认guest
 factory.setPassword(Constants.PASSWORD);//密码, 默认guest
 Connection connection = factory.newConnection();
 Channel channel = connection.createChannel();
 //2. 声明队列
 channel.queueDeclare(Constants.RPC_REQUEST_QUEUE_NAME, true, false, 
false, null);
 //3. 接收消息, 并消费
 // 设置同时最多只能获取⼀个消息
 channel.basicQos(1);
 System.out.println("Awaiting RPC request");
 Consumer consumer = new DefaultConsumer(channel){
 @Override
 public void handleDelivery(String consumerTag, Envelope envelope, 
AMQP.BasicProperties properties, byte[] body) throws IOException {
 AMQP.BasicProperties replyProps = new AMQP.BasicProperties
 .Builder()
 .correlationId(properties.getCorrelationId())
 .build();
 // ⽣成返回
 String message = new String(body);
 String response = "request:"+ message + ", response: 处理成功";
 // 回复消息,通知已经收到请求
 channel.basicPublish( "", properties.getReplyTo(), replyProps, 
response.getBytes());
 // 对消息进⾏应答
 channel.basicAck(envelope.getDeliveryTag(), false);
 }
 };
 channel.basicConsume(Constants.RPC_REQUEST_QUEUE_NAME, false,consumer);
 }

在这里插入图片描述

Publisher Confirms(发布确认)

Publisher Confirms模式是RabbitMQ提供的⼀种确保消息可靠发送到RabbitMQ服务器的机制。在这种模式下,⽣产者可以等待RabbitMQ服务器的确认,以确保消息已经被服务器接收并处理.

  1. ⽣产者将Channel设置为confirm模式(通过调⽤channel.confirmSelect()完成)后, 发布的每⼀条消息都会获得⼀个唯⼀的ID, ⽣产者可以将这些序列号与消息关联起来,以便跟踪消息的状态.
  2. 当消息被RabbitMQ服务器接收并处理后,服务器会异步地向⽣产者发送⼀个确认(ACK)给⽣产者(包含消息的唯⼀ID),表明消息已经送达.在这里插入图片描述
    在这里插入图片描述
    发送方确认机制最大的好处在于它是异步的,生产者可以同时发布消息和等待信道返回确认消息,
    1.当消息最终得到确认之后,生产者可以通过回调方法来处理该确认消息.
    2.如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack(Basic.Nack)命令,生产者同样
    可以在回调方法中处理该nack命令.
    发布确认有三种策略
PublishingMessagesIndividually(单独确认)
static void publishMessagesIndividually() throws Exception {
 try (Connection connection = createConnection()) {
 //创建channel
 Channel ch = connection.createChannel();
 //开启信道确认模式
 ch.confirmSelect();
 //声明队列
 ch.queueDeclare(PUBLISHER_CONFIRMS_QUEUE_NAME1, true, false, true, 
null);
 long start = System.currentTimeMillis();
 //循环发送消息
 for (int i = 0; i < MESSAGE_COUNT; i++) {
 String body = "消息"+ i;
 //发布消息
 ch.basicPublish("", PUBLISHER_CONFIRMS_QUEUE_NAME1, null, 
body.getBytes());
 //等待确认消息.只要消息被确认,这个⽅法就会被返回
 //如果超时过期, 则抛出TimeoutException。如果任何消息被nack(丢失), 
waitForConfirmsOrDie将抛出IOException。
 ch.waitForConfirmsOrDie(5_000);
 }
 long end = System.currentTimeMillis();
 System.out.format("Published %d messages individually in %d ms", 
MESSAGE_COUNT, end - start);
}
}

观察上⾯代码, 会发现这种策略是每发送⼀条消息后就调⽤channel.waitForConfirmsOrDie⽅法,之后等待服务端的确认, 这实际上是⼀种串⾏同步等待的⽅式. 尤其对于持久化的消息来说, 需要等待消息确
认存储在磁盘之后才会返回(调⽤Linux内核的fsync⽅法).

Publishing Messages in Batches(批量确认)
static void publishMessagesInBatch() throws Exception {
 try (Connection connection = createConnection()) {
 //创建信道
 Channel ch = connection.createChannel();
 //信道设置为confirm模式
 ch.confirmSelect();
 //声明队列
 ch.queueDeclare(PUBLISHER_CONFIRMS_QUEUE_NAME2, true, false, true, 
null);
 
 int batchSize = 100;
 int outstandingMessageCount = 0;
 
 long start = System.currentTimeMillis();
 for (int i = 0; i < MESSAGE_COUNT; i++) {
 String body = "消息"+ i;
 //发送消息
 ch.basicPublish("", PUBLISHER_CONFIRMS_QUEUE_NAME2, null, 
body.getBytes());
 outstandingMessageCount++;
 //批量确认消息
 if (outstandingMessageCount == batchSize) {
 ch.waitForConfirmsOrDie(5_000);
 outstandingMessageCount = 0;
 }
 }
 //消息发送完, 还有未确认的消息, 进⾏确认
 if (outstandingMessageCount > 0) {
 ch.waitForConfirmsOrDie(5_000);
 }
 long end = System.currentTimeMillis();
 System.out.format("Published %d messages in batch in %d ms",MESSAGE_COUNT,end - start);
}
}

相⽐于单独确认策略, 批量确认极⼤地提升了confirm的效率, 缺点是出现Basic.Nack或者超时时, 我们不清楚具体哪条消息出了问题. 客⼾端需要将这⼀批次的消息全部重发, 这会带来明显的重复消息数量.
当消息经常丢失时,批量确认的性能应该是不升反降的.

Handling Publisher Confirms Asynchronously(异步确认)
static void handlePublishConfirmsAsynchronously() throws Exception {
 try (Connection connection = createConnection()) {
 Channel ch = connection.createChannel();
 ch.queueDeclare(PUBLISHER_CONFIRMS_QUEUE_NAME3, false, false, true, 
null);
 ch.confirmSelect();
 //有序集合,元素按照⾃然顺序进⾏排序,存储未confirm消息序号
 SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new
TreeSet<>());
 ch.addConfirmListener(new ConfirmListener() {
 @Override
 public void handleAck(long deliveryTag, boolean multiple) throws
IOException {
 //System.out.println("ack, SeqNo: " + deliveryTag +",multiple:" +multiple);
 //multiple 批量
 //confirmSet.headSet(n)⽅法返回当前集合中⼩于n的集合
 if (multiple) {
 //批量确认:将集合中⼩于等于当前序号deliveryTag元素的集合清除,表⽰
这批序号的消息都已经被ack了
 confirmSet.headSet(deliveryTag+1).clear();
 } else {
 //单条确认:将当前的deliveryTag从集合中移除
 confirmSet.remove(deliveryTag);
 }
 }
 @Override
 public void handleNack(long deliveryTag, boolean multiple) throws
IOException {
 System.err.format("deliveryTag: %d, multiple: %b%n", 
deliveryTag, multiple);
 if (multiple) {
 //批量确认:将集合中⼩于等于当前序号deliveryTag元素的集合清除,表⽰
这批序号的消息都已经被ack了
 confirmSet.headSet(deliveryTag+1).clear();
 } else {
 //单条确认:将当前的deliveryTag从集合中移除
 confirmSet.remove(deliveryTag);
 }
 //如果处理失败, 这⾥需要添加处理消息重发的场景. 此处代码省略
 }
 });
 //循环发送消息
 long start = System.currentTimeMillis();
 for (int i = 0; i < MESSAGE_COUNT; i++) {
 String message = "消息" + i;
 //得到下次发送消息的序号, 从1开始
 long nextPublishSeqNo = ch.getNextPublishSeqNo();
 //System.out.println("消息序号:"+ nextPublishSeqNo);
 ch.basicPublish("", PUBLISHER_CONFIRMS_QUEUE_NAME3, null, 
message.getBytes());
 //将序号存⼊集合中
 confirmSet.add(nextPublishSeqNo);
 }
 //消息确认完毕
 while (!confirmSet.isEmpty()){
 	Thread.sleep(10);
 	}
 	long end = System.currentTimeMillis();
 System.out.format("Published %d messages and handled confirms 
asynchronously in %d ms%n", MESSAGE_COUNT, end -start);
}
}
三种差异

异步确认>批量确认>单独确认

用Spring Boot整合RabbitMQ

在下面引入依赖

<!--Spring MVC相关依赖-->
<dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-test</artifactId>
 <scope>test</scope>
</dependency>
<!--RabbitMQ相关依赖-->
<dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

并且配置yml配置,基本信息配置

#配置RabbitMQ的基本信息
spring:
 rabbitmq:
 host: 110.41.51.65
 port: 15673 #默认为5672
 username: study
 password: study
 virtual-host: bite #默认值为

Work Queue(⼯作队列)

生产者代码

public static final String WORK_QUEUE = "work_queue”;

声明队列

import com.bite.rabbitmq.constant.Constants;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
 //1. ⼯作模式队列
 @Bean("workQueue")
 public Queue workQueue() {
 return QueueBuilder.durable(Constants.WORK_QUEUE).build();
 }
}
@RequestMapping("/producer")
@RestController
public class ProducerController {
 @Autowired
 private RabbitTemplate rabbitTemplate;
 @RequestMapping("/work")
 public String work(){
 for (int i = 0; i < 10; i++) {
 //使⽤内置交换机发送消息, routingKey和队列名称保持⼀致
 rabbitTemplate.convertAndSend("", Constants.WORK_QUEUE, "hello spring amqp:work");
 		}
 return "发送成功";
 	}
 }

消费者代码

import com.bite.rabbitmq.constant.Constants;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class WorkListener {
 @RabbitListener(queues = Constants.WORK_QUEUE)
 public void listenerQueue(Message message){
 System.out.println("listener 1["+Constants.WORK_QUEUE+"]收到消息:" + 
message);
 }
 @RabbitListener(queues = Constants.WORK_QUEUE)
 public void listenerQueue2(Message message){
 System.out.println("listener 2["+Constants.WORK_QUEUE+"]收到消息:" + 
message);
 }
 }

在这里插入图片描述
在这里插入图片描述

Publish/Subscribe(发布/订阅)

生产者代码

发布/订阅模式
public static final String FANOUT_QUEUE1 = "fanout_queue1";
public static final String FANOUT_QUEUE2 = "fanout_queue2";
public static final String FANOUT_EXCHANGE_NAME = "fanout_exchange";
//声明2个队列, 观察是否两个队列都收到了消息
@Bean("fanoutQueue1")
public Queue fanoutQueue1() {
 return QueueBuilder.durable(Constants.FANOUT_QUEUE1).build();
}
@Bean("fanoutQueue2")
public Queue fanoutQueue2() {
 return QueueBuilder.durable(Constants.FANOUT_QUEUE2).build();
}
//声明交换机
@Bean("fanoutExchange")
public FanoutExchange fanoutExchange() {
 return
ExchangeBuilder.fanoutExchange(Constants.FANOUT_EXCHANGE_NAME).durable(true).bu
ild();
}
//队列和交换机绑定
@Bean
public Binding fanoutBinding(@Qualifier("fanoutExchange") FanoutExchange 
exchange, @Qualifier("fanoutQueue1") Queue queue) {
 return BindingBuilder.bind(queue).to(exchange);
}
@Bean
public Binding fanoutBinding2(@Qualifier("fanoutExchange") FanoutExchange 
exchange, @Qualifier("fanoutQueue2") Queue queue) {
 return BindingBuilder.bind(queue).to(exchange);
}

使用接口发送消息

@RequestMapping("/fanout")
public String fanoutProduct(){
 //routingKey为空, 表⽰所有队列都可以收到消息
 rabbitTemplate.convertAndSend(Constants.FANOUT_EXCHANGE_NAME, "","hello 
spring boot: fanout");
 return "发送成功";
 }

消费者代码

import com.bite.rabbitmq.constant.Constants;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class FanoutListener {
 //指定监听队列的名称
 @RabbitListener(queues = Constants.FANOUT_QUEUE1)
 public void ListenerQueue(String message){
 System.out.println("["+Constants.FANOUT_QUEUE1+ "]接收到消息:"+ message);
 }
 @RabbitListener(queues = Constants.FANOUT_QUEUE2)
 public void ListenerQueue2(String message){
 System.out.println("["+Constants.FANOUT_QUEUE2+ "]接收到消息:"+ message);
 }
 }

在这里插入图片描述

Routing(路由模式)

生产者代码

public static final String DIRECT_QUEUE1="direct_queuel"
public static final String DIRECT"direct_queue2
public static final String DIRECT_EXCHANGE_NAME = "direct_exchange"
//Routing模式
@Bean("directQueue1")
public Queue routingQueue1() {
 return QueueBuilder.durable(Constants.DIRECT_QUEUE1).build();
}
@Bean("directQueue2")
public Queue routingQueue2() {
 return QueueBuilder.durable(Constants.DIRECT_QUEUE2).build();
}
//声明交换机
@Bean("directExchange")
public DirectExchange directExchange() {
 return
ExchangeBuilder.directExchange(Constants.DIRECT_EXCHANGE_NAME).durable(true).build();
}
//队列和交换机绑定
//队列1绑定orange
@Bean
public Binding directBinding(@Qualifier("directExchange") DirectExchange 
exchange, @Qualifier("directQueue1") Queue queue) {
 return BindingBuilder.bind(queue).to(exchange).with("orange");
}
//队列2绑定black, green
@Bean
public Binding directBinding2(@Qualifier("directExchange") DirectExchange 
exchange, @Qualifier("directQueue2") Queue queue) {
 return BindingBuilder.bind(queue).to(exchange).with("black");
}
@Bean
public Binding directBinding3(@Qualifier("directExchange") DirectExchange 
exchange, @Qualifier("directQueue2") Queue queue) {
 return BindingBuilder.bind(queue).to(exchange).with("green");
 }

消费者代码

import com.bite.rabbitmq.constant.Constants;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class DirectListener {
 //指定监听队列的名称
 @RabbitListener(queues = Constants.DIRECT_QUEUE1)
 public void ListenerQueue(String message){
System.out.println("["+Constants.DIRECT_QUEUE1+ "]接收到消息:"+ message);
 }
 @RabbitListener(queues = Constants.DIRECT_QUEUE2)
 public void ListenerQueue2(String message){
 System.out.println("["+Constants.DIRECT_QUEUE2+ "]接收到消息:"+ message);
 }
}

接口发送消息

@RequestMapping("/direct")
public String directProduct(String routingKey){
 //routingKey作为参数传递
 rabbitTemplate.convertAndSend(Constants.DIRECT_EXCHANGE_NAME, 
routingKey,"hello spring boot: direct "+routingKey);
 return "发送成功";
}

通过调用接口发送routingkey为orange的消息。

Topics(通配符模式)

Topics和Routing模式的区别是:
1.topics模式使用的交换机类型为topic(Routing模式使用的交换机类型为direct)
2.topic类型的交换机在匹配规则上进行了扩展,BindingKey支持通配符匹配
生产者代码

public static final String TOPICS_QUEUE1 = "topics_queue1";
public static final String TOPICS_QUEUE2 = "topics_queue2";
public static final String TOPICS_EXCHANGE_NAME = "topics_exchange";
1
2
3
4
//topic模式
@Bean("topicsQueue1")
public Queue topicsQueue1() {
 return QueueBuilder.durable(Constants.TOPICS_QUEUE1).build();
}
@Bean("topicsQueue2")
public Queue topicsQueue2() {
 return QueueBuilder.durable(Constants.TOPICS_QUEUE2).build();
}
//声明交换机
@Bean("topicExchange")
public TopicExchange topicExchange() {
 return
ExchangeBuilder.topicExchange(Constants.TOPICS_EXCHANGE_NAME).durable(true).bui
ld();
}
//队列和交换机绑定
//队列1绑定error, 仅接收error信息
@Bean
public Binding topicBinding(@Qualifier("topicExchange") TopicExchange 
exchange, @Qualifier("topicsQueue1") Queue queue) {
 return BindingBuilder.bind(queue).to(exchange).with("*.error");
 }
 @Bean
public Binding topicBinding2(@Qualifier("topicExchange") TopicExchange 
exchange, @Qualifier("topicsQueue2") Queue queue) {
 return BindingBuilder.bind(queue).to(exchange).with("#.info");
}
@Bean
public Binding topicBinding3(@Qualifier("topicExchange") TopicExchange 
exchange, @Qualifier("topicsQueue2") Queue queue) {
 return BindingBuilder.bind(queue).to(exchange).with("*.error");
}

接口发送消息

@RequestMapping("/topics")
public String topicProduct(String routingKey){
 //routingKey为空, 表⽰所有队列都可以收到消息
 rabbitTemplate.convertAndSend(Constants.TOPICS_EXCHANGE_NAME, 
routingKey,"hello spring boot: topics "+routingKey);
 return "发送成功";
}

消费者代码

import com.bite.rabbitmq.constant.Constants;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class TopicListener {
 //指定监听队列的名称
 @RabbitListener(queues = Constants.TOPICS_QUEUE1)
 public void ListenerQueue(String message){
 System.out.println("["+Constants.TOPICS_QUEUE1+ "]接收到消息:"+ message);
 }
 @RabbitListener(queues = Constants.TOPICS_QUEUE2)
 public void ListenerQueue2(String message){
 System.out.println("["+Constants.TOPICS_QUEUE2+ "]接收到消息:"+ message);
 }
 }

相关文章:

  • mysql和mongodb
  • React 之 Redux 第三十二节 Redux 常用API及HOOKS,以及Redux Toolkit核心API使用详解
  • 62. 评论日记
  • java 实现文件编码检测的多种方式
  • Podman技术深度解剖:架构、原理与核心特性解析
  • cocos Spine资源及加载
  • JavaScript Map 对象深度解剖
  • HarmonyOS 第2章 Ability的开发,鸿蒙HarmonyOS 应用开发入门
  • 开源FMC 4路千兆网模块
  • Git 基本使用
  • 塑料瓶识别分割数据集labelme格式976张1类别
  • CASAIM与中国中车达成深度合作,助力异形大部件尺寸精准分析
  • TCPIP详解 卷1协议 四 地址解析协议
  • gcc/g++使用
  • agasa文件传输:内网文件互传的高效解决方案
  • 【Kubernetes基础】--查阅笔记1
  • Missashe考研日记-day20
  • svn 分支(branch)和标签(tag)管理
  • Cherry Studio + MCP,从0到1保姆教程,3个场景体验
  • GIT的一些操作
  • 消息人士称泽连斯基已启程前往土耳其
  • 因存在安全隐患,福特公司召回约27.4万辆SUV
  • 125%→10%、24%税率暂停90天,对美关税开始调整
  • 央媒评网红质疑胖东来玉石定价暴利:对碰瓷式维权不能姑息
  • 哈马斯表示已释放一名美以双重国籍被扣押人员
  • 朝着解决问题的正确方向迈进——中美经贸高层会谈牵动世界目光