当前位置: 首页 > news >正文

分布式之RabbitMQ的使用(1)

文章目录

  • 什么是RabbitMQ
    • 为什么会产生消息队列?有几个原因
  • RabbitMQ安装
  • 简单队列实现
    • 创建项目
    • 创建项目
    • 引入JAR
    • 初始化项目
    • 创建连接工具类
    • 创建生产者
      • 代码编写
      • 打开命令行窗口查看
    • 创建消费者
      • 写法一:新建类
      • 写法二:使用匿名内部类
      • 两种写法的测试
      • 两种方法对比
  • Work模式
    • 创建生产者
    • 创建消费者
      • 创建消费者01
      • 创建消费者02
    • 测试
  • 公平分发和手动/自动反馈
    • 公平分发
      • 修改消费者01
      • 修改消费者02
      • 运行结果有问题
    • 手动/自动反馈
      • 自动反馈
      • 手动反馈
    • 手动/自动反馈代码实现
      • 修改消费者
      • 修改生产者
      • 运行结果
    • 手动反馈处理失败
      • 讲解
      • 代码演示
      • 运行结果
  • 消息应答和持久化
    • 消息应答
    • RabbitMQ消息持久化
      • 持久化步骤
      • 代码演示
      • 运行结果
  • 订阅模式
    • 生产者
    • 消费者
      • 消费者01
      • 消费者02
    • 运行结果
  • 路由模式
    • 生产者
    • 消费者
      • 消费者01
      • 消费者02
    • 运行结果

什么是RabbitMQ

消息队列(Message Queue,简称MQ),从字面意思上看,本质是个队列,FIFO先入先出,只不过队列中存放的内容是message而已。MQ框架非常之多,比较流行的有RabbitMq、ActiveMq、ZeroMq、kafka,以及阿里开源的RocketMQ。开发语言:Erlang – 面向并发的编程语言

为什么会产生消息队列?有几个原因

不同进程(process)之间传递0消息时,两个进程之间耦合程度过高,改动一个进程,引发必须修改另一个进程,为了隔离这两个进程,在两进程间抽离出一层(一个模块),所有两进程之间传递的消息,都必须通过消息队列来传递,单独修改某一个进程,不会影响另一个;

不同进程(process)之间传递消息时,为了实现标准化,将消息的格式规范化了,并且,某一个进程接受的消息太多,一下子无法处理完,并且也有先后顺序,必须对收到的消息进行排队,因此诞生了事实上的消息队列。

有以下几个模式:

  1. 简单队列
  2. 工作队列
  3. 订阅模式
  4. 路由分发模式
  5. 主题模式

在这里插入图片描述

RabbitMQ安装

请看我之前分布式专栏的博客:RabbitMQ的安装

简单队列实现

在这里插入图片描述

在这里插入图片描述
生产者,队列,消费者

生产者生产消息,消费者拿到消息,RabbitMQ存储消息。

创建项目

在这里插入图片描述

创建项目

在这里插入图片描述
我们这里先使用指定的版本的包。
在这里插入图片描述

引入JAR

<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.7.1</version>
</dependency>

初始化项目

删除java.com.hsh下的所有文件然后新增文件夹test01,test02,test03,utils
在这里插入图片描述

创建连接工具类

在utils文加下新建ConnectionUtils文件

package com.hsh.utils;import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class ConnectionUtils {public static Connection getConnection() {try {Connection connection = null;// 定义一个连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 设置连接地址connectionFactory.setHost("127.0.0.1");// 设置端口号connectionFactory.setPort(5672);// 设置虚拟主机(相当于数据库中的库)connectionFactory.setVirtualHost("/");// 设置用户名connectionFactory.setUsername("guest");// 设置密码connectionFactory.setPassword("guest");connection = connectionFactory.newConnection();return connection;}catch (Exception e){e.printStackTrace();return null;}}
}

创建生产者

代码编写

package com.hsh.test01;import com.hsh.utils.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;public class Producer01 {public static void main(String[] args) {System.out.println("生产者启动...");// 获得连接Connection connection = ConnectionUtils.getConnection();try {// 创建通道Channel channel = connection.createChannel();// 创建队列声明channel.queueDeclare("贝尔摩德", false, false, false, null);// 定义发送消息的数据String message = "秘密让女人更有魅力";// 发送消息channel.basicPublish("", "贝尔摩德", null, message.getBytes());System.out.println("生产者发送消息:" + message);channel.close();connection.close();}catch (Exception e){e.printStackTrace();}}
}

打开命令行窗口查看

打开RabbitMQ的可视化界面,点击
在这里插入图片描述
在这里插入图片描述

设置是否排他:为 true 则设置队列为排他的。如果一个队列被声明为排 他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。这里需要注意 三点:排他队列是基于连接( Connection) 可见的,同 个连接的不同信道 (Channel) 是可以同时访问同一连接创建的排他队列; "首次"是指如果 个连接己经声明了 排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同:即使该队 列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除,这种队列 适用于一个客户端同时发送和读取消息的应用场景

创建消费者

我们需要对上面存到RabbitMQ的数据进行监听,如果RabbitMQ里面有数据就拿。而监听可以单独建一个类,也可写一个匿名内部类。

写法一:新建类

test01编写监听类

package com.hsh.test01;import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;import java.io.IOException;public class ListenerConsumer extends DefaultConsumer {public ListenerConsumer(Channel channel) {super(channel);}@Overridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println("消费者1:" + message);}
}

test01编写消费者

package com.hsh.test01;import com.hsh.utils.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;public class Consumer01 {public static void main(String[] args) {System.out.println("消费者启动...");try {// 获得连接Connection connection = ConnectionUtils.getConnection();// 创建通道Channel channel = connection.createChannel();// 连接队列channel.queueDeclare("贝尔摩德", false, false, false, null);// 监听 true自动反馈channel.basicConsume("贝尔摩德", true, new ListenerConsumer(channel));}catch (Exception e){e.printStackTrace();}}}

写法二:使用匿名内部类

修改test01中的Consumer01

package com.hsh.test01;import com.hsh.utils.ConnectionUtils;
import com.rabbitmq.client.*;import java.io.IOException;public class Consumer01 {public static void main(String[] args) {System.out.println("消费者启动...");try {// 获得连接Connection connection = ConnectionUtils.getConnection();// 创建通道Channel channel = connection.createChannel();// 连接队列channel.queueDeclare("贝尔摩德", false, false, false, null);// 监听 true自动反馈// 使用匿名内部类DefaultConsumer defaultConsumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println("消费者1:" + message);}};// 监听 true自动反馈channel.basicConsume("贝尔摩德", true, defaultConsumer);}catch (Exception e){e.printStackTrace();}}}

两种写法的测试

我们先去删除队列中存入的值
在这里插入图片描述
先开消费者,在开启生产者
在这里插入图片描述

两种方法对比

第一种写法简单,不需要新建一个类,我比较喜欢第一种。

Work模式

一个生产者对应多个消费者,但是只能有一个消费者获得消息!!!
在这里插入图片描述
下面我们的生产者for循环打印20条消息,然后消费者进行使用。

创建生产者

test02中创建Producer02

package com.hsh.test02;import com.hsh.utils.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;public class Producer02 {public static void main(String[] args) {System.out.println("生产者启动...");// 获得连接Connection connection = ConnectionUtils.getConnection();try {// 创建通道Channel channel = connection.createChannel();// 创建队列声明channel.queueDeclare("毛利兰", false, false, false, null);for (int i = 0; i < 20; i++){// 定义发送消息的数据String message = "我并不讨厌等待,因为等待得越久,见到他的时候,我就越开心" + i;// 发送消息channel.basicPublish("", "毛利兰", null, message.getBytes());System.out.println("生产者发送消息:" + message);// 休眠Thread.sleep(200);}channel.close();connection.close();}catch (Exception e){e.printStackTrace();}}
}

创建消费者

创建消费者01

package com.hsh.test02;import com.hsh.utils.ConnectionUtils;
import com.rabbitmq.client.*;import java.io.IOException;public class Consumer01 {public static void main(String[] args) {System.out.println("消费者启动...");try {// 获得连接Connection connection = ConnectionUtils.getConnection();// 创建通道Channel channel = connection.createChannel();// 连接队列channel.queueDeclare("毛利兰", false, false, false, null);// 监听 true自动反馈DefaultConsumer defaultConsumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println("消费者1:" + message);}};// channel.basicConsume("毛利兰", true, defaultConsumer);}catch (Exception e){e.printStackTrace();}}
}

创建消费者02

package com.hsh.test02;import com.hsh.utils.ConnectionUtils;
import com.rabbitmq.client.*;import java.io.IOException;public class Consumer02 {public static void main(String[] args) {System.out.println("消费者启动...");try {// 获得连接Connection connection = ConnectionUtils.getConnection();// 创建通道Channel channel = connection.createChannel();// 连接队列channel.queueDeclare("毛利兰", false, false, false, null);// 监听 true自动反馈DefaultConsumer defaultConsumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println("消费者1:" + message);}};channel.basicConsume("毛利兰", true, defaultConsumer);}catch (Exception e){e.printStackTrace();}}
}

测试

先运行消费者,再运行生产者。

在这里插入图片描述

1.消费者1与消费者2处理的数据条数一样。
2.消费者1偶数
3.消费者2奇数
这种方式叫轮询分发(Round-robin)。

公平分发和手动/自动反馈

公平分发

在这里插入图片描述
现在有2个消费者,所有的奇数的消息都是繁忙的,而偶数则是轻松的。按照轮询的方式,奇数的任务交给了第一个消费者,所以一直在忙个不停。偶数的任务交给另一个消费者,则立即完成任务,然后闲得不行。而RabbitMQ则是不了解这些的。这是因为当消息进入队列,RabbitMQ就会分派消息。它不看消费者为应答的数目,只是盲目的将消息发给轮询指定的消费者。

我们可以给两个消费者分别加个延迟,模拟性能好的执行多一点,性能不好的执行少一点

修改消费者01

我们需要在消费者01加上如下

try {Thread.sleep(100);
} catch (Exception e) {e.printStackTrace();
}

代码演示

package com.hsh.test02;import com.hsh.utils.ConnectionUtils;
import com.rabbitmq.client.*;import java.io.IOException;public class Consumer01 {public static void main(String[] args) {System.out.println("消费者启动...");try {// 获得连接Connection connection = ConnectionUtils.getConnection();// 创建通道Channel channel = connection.createChannel();// 连接队列channel.queueDeclare("毛利兰", false, false, false, null);// 监听 true自动反馈DefaultConsumer defaultConsumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {try {Thread.sleep(100);} catch (Exception e) {e.printStackTrace();}String message = new String(body, "UTF-8");System.out.println("消费者1:" + message);}};// 监听 true自动反馈channel.basicConsume("毛利兰", true, defaultConsumer);}catch (Exception e){e.printStackTrace();}}
}

修改消费者02

我们需要在消费者02加上如下

try {Thread.sleep(500);
} catch (Exception e) {e.printStackTrace();
}

代码演示

package com.hsh.test02;import com.hsh.utils.ConnectionUtils;
import com.rabbitmq.client.*;import java.io.IOException;public class Consumer02 {public static void main(String[] args) {System.out.println("消费者启动...");try {// 获得连接Connection connection = ConnectionUtils.getConnection();// 创建通道Channel channel = connection.createChannel();// 连接队列channel.queueDeclare("毛利兰", false, false, false, null);// 监听 true自动反馈DefaultConsumer defaultConsumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {try {Thread.sleep(500);} catch (Exception e) {e.printStackTrace();}String message = new String(body, "UTF-8");System.out.println("消费者1:" + message);}};channel.basicConsume("毛利兰", true, defaultConsumer);}catch (Exception e){e.printStackTrace();}}
}

运行结果有问题

在这里插入图片描述
并没预期效果,这是因为没有设置手动反馈和限制接收消息个数。

手动/自动反馈

但是我们演示上面的按性能好坏出来不同量的代码编写。我们需要让程序手动反馈否则看不出效果。那么什么是手动反馈,什么是自动反馈。就是我们之前写过的代码中的channel.basicConsume("毛利兰", true, defaultConsumer);

package com.hsh.test02;public class Consumer01 {public static void main(String[] args) {// ....try {//...DefaultConsumer defaultConsumer = new DefaultConsumer(channel){//....};// 这里true就是自动反馈channel.basicConsume("毛利兰", true, defaultConsumer);}catch (Exception e){}}
}

true是自动反馈(NACK),false是手动反馈(ACK)
我举个例子说明自动反馈
比如A是消费者01,B是消费者02,。

自动反馈

自动反馈是接收很多消息后再去进行处理。此时A,B就会争先恐后的拿数据,就会出现对半接收的情况。就不会出现性能好的执行多一点,性能不好的执行少一点的情况。也就是B很菜,但是一直拿,不管我能不能跑的完,很贪心,B拿多了,它还处理的慢。典型的贪多嚼不烂。

手动反馈

只有当我处理完当前的消息后才去拿新的消息。也就是能者多劳。此时就能出现性能好的执行多一点,性能不好的执行少一点的情况。除此之外我们还需要设置每次只允许通道中只有一条消息。

手动/自动反馈代码实现

修改消费者

我们需要在消费者加上channel.basicQos(1);

/*同一时刻服务器只会发一条消息给消费者限制发送给消费者不得超过一条消息*/
channel.basicQos(1);// 接收消息后返回处理完毕
try {String message = new String(body, "UTF-8");System.out.println(envelope.getDeliveryTag()+"消费者1:" + message);
} catch (Exception e) {e.printStackTrace();
} finally {// 手动反馈// 第一个参数:envelope.getDeliveryTag() 当前消息的编号 我在上面的输出打印了可以看看// 第二个参数:false单挑消息应答,true批量应答channel.basicAck(envelope.getDeliveryTag(), false);
}// 改为手动反馈
channel.basicConsume("毛利兰", false, defaultConsumer);

消费者01代码演示

package com.hsh.test02;import com.hsh.utils.ConnectionUtils;
import com.rabbitmq.client.*;import java.io.IOException;
import java.io.UnsupportedEncodingException;public class Consumer01 {public static void main(String[] args) {System.out.println("消费者启动...");try {// 获得连接Connection connection = ConnectionUtils.getConnection();// 创建通道Channel channel = connection.createChannel();channel.basicQos(1);// 连接队列channel.queueDeclare("毛利兰", false, false, false, null);// 监听 true自动反馈DefaultConsumer defaultConsumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {try {Thread.sleep(100);} catch (Exception e) {e.printStackTrace();}// 这里需要包起来,无论报不保存,都需要执行 channel.basicAck(envelope.getDeliveryTag(), false);try {String message = new String(body, "UTF-8");System.out.println(envelope.getDeliveryTag()+"消费者1:" + message);} catch (Exception e) {e.printStackTrace();} finally {// 手动反馈// 第一个参数:envelope.getDeliveryTag() 当前消息的编号 我在上面的输出打印了可以看看// 第二个参数:false单挑消息应答,true批量应答channel.basicAck(envelope.getDeliveryTag(), false);}}};channel.basicConsume("毛利兰", false, defaultConsumer);}catch (Exception e){e.printStackTrace();}}
}

消费者02代码演示

package com.hsh.test02;import com.hsh.utils.ConnectionUtils;
import com.rabbitmq.client.*;import java.io.IOException;public class Consumer02 {public static void main(String[] args) {System.out.println("消费者启动...");try {// 获得连接Connection connection = ConnectionUtils.getConnection();// 创建通道Channel channel = connection.createChannel();channel.basicQos(1);// 连接队列channel.queueDeclare("毛利兰", false, false, false, null);// 监听 true自动反馈DefaultConsumer defaultConsumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {try {Thread.sleep(500);} catch (Exception e) {e.printStackTrace();}// 这里需要包起来,无论报不保存,都需要执行 channel.basicAck(envelope.getDeliveryTag(), false);try {String message = new String(body, "UTF-8");System.out.println(envelope.getDeliveryTag()+"消费者2:" + message);} catch (Exception e) {e.printStackTrace();} finally {// 手动反馈// 第一个参数:envelope.getDeliveryTag() 当前消息的编号 我在上面的输出打印了可以看看// 第二个参数:false单挑消息应答,true批量应答channel.basicAck(envelope.getDeliveryTag(), false);}}};channel.basicConsume("毛利兰", false, defaultConsumer);}catch (Exception e){e.printStackTrace();}}
}

修改生产者

同样的我们也需要在生产者加上channel.basicQos(1);

/*同一时刻服务器只会发一条消息给消费者限制发送给消费者不得超过一条消息*/
channel.basicQos(1);

代码演示

package com.hsh.test02;import com.hsh.utils.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;public class Producer02 {public static void main(String[] args) {System.out.println("生产者启动...");// 获得连接Connection connection = ConnectionUtils.getConnection();try {// 创建通道Channel channel = connection.createChannel();// 保证一次只分发一个channel.basicQos(1);// 创建队列声明channel.queueDeclare("毛利兰", false, false, false, null);for (int i = 0; i < 20; i++){// 定义发送消息的数据String message = "我并不讨厌等待,因为等待得越久,见到他的时候,我就越开心" + i;// 发送消息channel.basicPublish("", "毛利兰", null, message.getBytes());System.out.println("生产者发送消息:" + message);// 休眠Thread.sleep(200);}channel.close();connection.close();}catch (Exception e){e.printStackTrace();}}
}

运行结果

在这里插入图片描述

手动反馈处理失败

讲解

上面如果我们的手动反馈失败了可使用下面的方法。

// 拒绝消息
// 参数1: 消息的编号
// 参数2:表示是否进行批量操作 默认false
// 参数3:被拒绝的消息是否重新入队 
//        当设置为 true时,RabbitMQ 会将被拒绝的消息重新放回原始队列的尾部,以便可以再次被消费
//        当设置为 false时,RabbitMQ 会将消息从队列中删除,不会重新入队
channel.basicNack(envelope.getDeliveryTag(),false,true);

代码演示

我们给消费者02一个模拟异常,看看执行情况。

package com.hsh.test02;import com.hsh.utils.ConnectionUtils;
import com.rabbitmq.client.*;import java.io.IOException;public class Consumer02 {public static void main(String[] args) {System.out.println("消费者启动...");try {// 获得连接Connection connection = ConnectionUtils.getConnection();// 创建通道Channel channel = connection.createChannel();channel.basicQos(1);// 连接队列channel.queueDeclare("毛利兰", false, false, false, null);// 监听 true自动反馈DefaultConsumer defaultConsumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {try {Thread.sleep(500);} catch (Exception e) {e.printStackTrace();}String message ="";// 这里需要包起来,无论报不保存,都需要执行 channel.basicAck(envelope.getDeliveryTag(), false);try {message = new String(body, "UTF-8");// 模拟异常if (envelope.getDeliveryTag()==2){int i = 1/0;}System.out.println(envelope.getDeliveryTag()+"消费者2:" + message);} catch (Exception e) {//  e.printStackTrace();System.out.println("第"+envelope.getDeliveryTag()+"条处理失败,放回队列,内容是:"+message);// 拒绝消息// 参数1: 消息的编号// 参数2:表示是否进行批量操作 默认false// 参数3:被拒绝的消息是否重新入队//        当设置为 true时,RabbitMQ 会将被拒绝的消息重新放回原始队列的尾部,以便可以再次被消费//        当设置为 false时,RabbitMQ 会将消息从队列中删除,不会重新入队channel.basicNack(envelope.getDeliveryTag(),false,true);} finally {// 手动反馈// 第一个参数:envelope.getDeliveryTag() 当前消息的编号 我在上面的输出打印了可以看看// 第二个参数:false单挑消息应答,true批量应答channel.basicAck(envelope.getDeliveryTag(), false);}}};channel.basicConsume("毛利兰", false, defaultConsumer);}catch (Exception e){e.printStackTrace();}}
}

运行结果

在这里插入图片描述

消息应答和持久化

在这里插入图片描述

消息应答

这个相当于在消费者中保证消息不丢失,这个就是上面的手动反馈和自动反馈。这里对上面的手动和自动反馈进行进一步讲解。

ch.basicConsume(QUEUE_NAME,true,consumer);

True:自动确认
只要消息从队列中获取,无论消费者获取到消息后是否成功消费,都认为是消息已经成功消费。一旦rabbitmq将消息分发给消费者,就会从内存中删除。(会丢失数据消息)

False:手动确认
消费者从队列中获取消息后,服务器会将该消息标记为不可用状态,等待消费者的反馈,如果消费者一直没有反馈,那么该消息将一直处于不可用状态。如果有一个消费者挂掉,就会交付给其他消费者。
手动告诉rabbitmq消息处理完成后,rabbitmq删除内存中的消息。
反馈:

//手动回馈
ch.basicAck(envelope.getDeliveryTag(),false);

使用Nack让消息回到队列中

//  处理条数    是否批量处理   是否放回队列   false丢弃
ch.basicNack(envelope.getDeliveryTag(),false,true);

如果rabbitmq挂了,我们的消息任然会丢失!那么就需要对RabbitMQ做消息持久化。

RabbitMQ消息持久化

持久化步骤

这个也是上面的简单队列和Work模式都用过了。只不过当时是设置非持久化。
RabbitMQ消息持久化分为两步

  1. 声明队列持久化
    //声明队列
    boolean b = false;
    channel.queueDeclare(QUEUE_NAME,b,false,false,null);
    
    我们直接将程序中的b=false;改为true是不可以的,因为QUEUE_NAME 已经存,在rabbitmq是不允许重新定义一个已存在的队列。
  2. 设置持久化指令:
    //设置生成者发送消息为持久化信息(要求保存到硬盘上)保存在内存中
    //MessageProperties.PERSISTENT_TEXT_PLAIN,指令完成持久化
    ch.basicPublish("",QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes());
    

代码演示

修改生产者

package com.hsh.test02;import com.hsh.utils.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;public class Producer02 {public static void main(String[] args) {System.out.println("生产者启动...");Connection connection = ConnectionUtils.getConnection();try {Channel channel = connection.createChannel();channel.basicQos(1);// 创建队列声明  参数二是否持久化 我们改为truechannel.queueDeclare("毛利兰", true, false, false, null);for (int i = 0; i < 20; i++){String message = "我并不讨厌等待,因为等待得越久,见到他的时候,我就越开心" + i;// 发送消息 第三个参数表示是否持久化消息// MessageProperties.PERSISTENT_TEXT_PLAIN是持久化消息channel.basicPublish("","毛利兰",MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());System.out.println("生产者发送消息:" + message);Thread.sleep(200);}channel.close();connection.close();}catch (Exception e){e.printStackTrace();}}
}

运行结果

需要到queues中删除队列后再创建。
然后运行生产者
重启RabbitMQ看看消息是否消失。如果不消失说明成功了。
在这里插入图片描述

订阅模式

一个生产者将消息首先发送到交换器,交换器绑定多个队列,然后被监听该队列的消费者所接收并消费。

在这里插入图片描述

一个生产者多个消费者,每个消费者都有自己的队列,生产者没有吧消息直接发送到队列,而是发送到了交换机,每个队列都绑定到交换机,生产者发送的消息,经过交换机,到达队列,实现,一个消息被多个消费者获取的目的。
也就是王者发的公告每个玩家都能收到信息。

生产者

语法

// 创建交换机
//   参数一: 交换机名称
//   参数二: 交换机类型(这个后面讲不着急)
channel.exchangeDeclare("工藤新一", "fanout");// 发送消息
//   参数一: 交换机名称
//   参数二: 队列名称(在简单队列和Work模式已经演示过了)
//   参数三: 消息的持久化
//   参数四: 要发送的消息
channel.basicPublish("工藤新一", "", null, msg.getBytes());

在test03中新建文件Producer01

package com.hsh.test03;import com.hsh.utils.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;public class Producer01 {public static void main(String[] args) {System.out.println("生产者启动...");// 获得连接Connection connection = ConnectionUtils.getConnection();try {// 创建通道Channel channel = connection.createChannel();// 创建交换机//   参数一: 交换机名称//   参数二: 交换机类型(这个后面讲不着急)channel.exchangeDeclare("工藤新一", "fanout");// 定义要发送的消息String msg = "也许杀人需要理由,然而救人是不需要什么理由的";// 发送消息//   参数一: 交换机名称//   参数二: 队列名称(在简单队列和Work模式已经演示过了)//   参数三: 消息的持久化//   参数四: 要发送的消息channel.basicPublish("工藤新一", "", null, msg.getBytes());System.out.println("生产者发送消息:" + msg);channel.close();connection.close();}catch (Exception e){e.printStackTrace();}}
}

在这里插入图片描述
这里我们需要在消费者中绑定交换机才能使用。

消费者

绑定交换机

// 绑定交换机
//   参数一:队列名称
//   参数二:交换机名称
channel.queueBind("贝尔摩德", "酒厂", "");

下面进行演示

消费者01

package com.hsh.test03;import com.hsh.utils.ConnectionUtils;
import com.rabbitmq.client.*;import java.io.IOException;/*** @author xrkhy* @date 2025/9/23 9:44* @description*/
public class Consumer01 {public static void main(String[] args) {System.out.println("消费者启动...");try {// 获得连接Connection connection = ConnectionUtils.getConnection();// 创建通道Channel channel = connection.createChannel();// 连接队列channel.queueDeclare("贝尔摩德", false, false, false, null);// 绑定交换机//   参数一:队列名称//   参数二:交换机名称channel.queueBind("贝尔摩德", "酒厂", "");// 监听 true自动反馈DefaultConsumer defaultConsumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println("消费者1:" + message);}};// 监听channel.basicConsume("贝尔摩德", true, defaultConsumer);}catch (Exception e){e.printStackTrace();}}
}

消费者02

package com.hsh.test03;import com.hsh.utils.ConnectionUtils;
import com.rabbitmq.client.*;import java.io.IOException;/*** @author xrkhy* @date 2025/9/23 9:44* @description*/
public class Comsumer02 {public static void main(String[] args) {System.out.println("消费者启动...");try {// 获得连接Connection connection = ConnectionUtils.getConnection();// 创建通道Channel channel = connection.createChannel();// 连接队列channel.queueDeclare("琴酒", false, false, false, null);// 绑定交换机//   参数一:队列名称//   参数二:交换机名称channel.queueBind("琴酒", "酒厂", "");// 监听 true自动反馈DefaultConsumer defaultConsumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println("消费者2:" + message);}};// 监听channel.basicConsume("琴酒", true, defaultConsumer);}catch (Exception e){e.printStackTrace();}}
}

运行结果

这里先去启动生产者,再去启动消费者,最后再去启动生产者。因为消费者绑定了交换机,但是此时没有运行生产者,也就没有交换机,消费者会报错。最后再次启动生产者是为了发消息。
在这里插入图片描述
打开可视化界面的交换机
在这里插入图片描述
点击交换机看看
在这里插入图片描述

路由模式

在学习之前我们先来知道什么是路由。
如下图
我们的生产者生产了消息,类型属于情感生活。
消费者c1只接收情感生活,游戏娱乐这两种类型。
消费者c2只接收情感生活这一种类型。
消费者c3只接收情感生活,游戏娱乐,体育竞技这三种类型。

我们中间的圆就是路由器,他负责把生产者发送的消息按类型发给不同的消费者。
这里以感情生活类型为例那么就是发给消费者查c1,消费者c2,消费者c3。
如果生产的是体育竞技类型那么就只发给消费者c3。
在这里插入图片描述
下面是运行流程。 Direct:处理路由键
在这里插入图片描述
在这里插入图片描述

生产者

语法

// 创建交换机
//   参数一: 交换机名称
//   参数二: 处理路由键
channel.exchangeDeclare("exchange", "direct");// 发送消息
//   参数一: 交换机名称
//   参数二: 队列名称(在简单队列和Work模式已经演示过了)/路由键
//   参数三: 消息的持久化
//   参数四: 要发送的消息
channel.basicPublish("exchange", "info", null, msg.getBytes());

在test04中写

package com.hsh.test04;import com.hsh.utils.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;public class Producer01 {public static void main(String[] args) {System.out.println("生产者启动...");// 获得连接Connection connection = ConnectionUtils.getConnection();try {// 创建通道Channel channel = connection.createChannel();// 创建交换机//   参数一: 交换机名称//   参数二: 处理路由键channel.exchangeDeclare("exchange", "direct");// 定义要发送的消息String msg = "我是info消息";// 发送消息//   参数一: 交换机名称//   参数二: 队列名称(在简单队列和Work模式已经演示过了)/路由键//   参数三: 消息的持久化//   参数四: 要发送的消息channel.basicPublish("exchange", "info", null, msg.getBytes());System.out.println("生产者发送消息:" + msg);channel.close();connection.close();}catch (Exception e){e.printStackTrace();}}
}

消费者

语法

// 绑定交换机
//   参数一:队列名称
//   参数二:交换机名称
//   参数三:路由key
channel.queueBind("allMessage", "exchange", "error");
channel.queueBind("allMessage", "exchange", "info");
channel.queueBind("allMessage", "exchange", "warning");

消费者01

package com.hsh.test04;import com.hsh.utils.ConnectionUtils;
import com.rabbitmq.client.*;import java.io.IOException;public class Consumer01 {public static void main(String[] args) {System.out.println("消费者启动...");try {// 获得连接Connection connection = ConnectionUtils.getConnection();// 创建通道Channel channel = connection.createChannel();// 连接队列channel.queueDeclare("allMessage", false, false, false, null);// 绑定交换机//   参数一:队列名称//   参数二:交换机名称//   参数三:路由keychannel.queueBind("allMessage", "exchange", "error");channel.queueBind("allMessage", "exchange", "info");channel.queueBind("allMessage", "exchange", "warning");// 监听 true自动反馈DefaultConsumer defaultConsumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println("消费者1:" + message);}};// 监听channel.basicConsume("allMessage", true, defaultConsumer);}catch (Exception e){e.printStackTrace();}}
}

消费者02

package com.hsh.test04;import com.hsh.utils.ConnectionUtils;
import com.rabbitmq.client.*;import java.io.IOException;public class Comsumer02 {public static void main(String[] args) {System.out.println("消费者启动...");try {// 获得连接Connection connection = ConnectionUtils.getConnection();// 创建通道Channel channel = connection.createChannel();// 连接队列channel.queueDeclare("errorMessage", false, false, false, null);// 绑定交换机//   参数一:队列名称//   参数二:交换机名称//   参数三:路由keychannel.queueBind("errorMessage", "exchange", "error");// 监听 true自动反馈DefaultConsumer defaultConsumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println("消费者2:" + message);}};// 监听channel.basicConsume("errorMessage", true, defaultConsumer);}catch (Exception e){e.printStackTrace();}}
}

运行结果

这里还是先去启动生产者,再去启动消费者,最后再去启动生产者
在这里插入图片描述
修改为error,因为两个都有error所以都能访问。
在这里插入图片描述

未完待续…

http://www.dtcms.com/a/398187.html

相关文章:

  • 基于Java后端与Vue前端的MES生产管理系统,涵盖生产调度、资源管控及数据分析,提供全流程可视化支持,包含完整可运行源码,助力企业提升生产效率与管理水平
  • 阿里云ACP云计算和大模型考哪个?
  • RabbitMQ C API 实现 RPC 通信实例
  • Ingress原理:七层流量的路由管家
  • 代理网站推荐做网站公司是干什么的
  • 个人建设门户网站 如何备案网址域名注册信息查询
  • React 19 vs React 18全面对比,掌握最新前端技术趋势
  • 链改2.0倡导者朱幼平:内地RWA代币化是违规的,但RWA数资化是可信可行的!
  • iOS 混淆后崩溃分析与符号化实战,映射表管理、自动化符号化与应急排查流程
  • 【JavaSE】【网络原理】网络层、数据链路层简单介绍
  • PyTorch 神经网络工具箱核心内容
  • Git高效开发:企业级实战指南
  • 外贸营销型网站策划中seo层面包括影楼网站推广
  • ZooKeeper详解
  • RabbitMQ如何构建集群?
  • 【星海随笔】RabbitMQ开发篇
  • 深入理解 RabbitMQ:消息处理全流程与核心能力解析
  • docker安装canal-server(v.1.1.8)【mysql->rabbitMQ】
  • 学习嵌入式的第四十天——ARM
  • 佛山营销网站建设公司益阳市城乡和住房建设部网站
  • Linux磁盘数据挂载以及迁移
  • 【图像算法 - 28】基于YOLO与PyQt5的多路智能目标检测系统设计与实现
  • Android音视频编解码全流程之Muxer
  • 一家做土产网站呼和浩特网站建设信息
  • Android Studio - Android Studio 检查特定资源被引用的情况
  • 借助Aspose.HTML控件,使用 Python 编程创建 HTML 页面
  • 营销型网站建设运营网站建设yuanmus
  • Day67 基本情报技术者 单词表02 编程基础
  • 《Java操作Redis教程:以及序列化概念和实现》
  • 欧拉公式与拉普拉斯变换的关系探讨与深入理解