当前位置: 首页 > news >正文

RabbitMQ 工作模式(上)

前言

在 RabbitMQ 中,一共有七种工作模式,我们也可以打开官网了解:
在这里插入图片描述

本章我们先介绍前三种工作模式

(Simple)简单模式

在这里插入图片描述

P:producer 生产者,负责发送消息
C:consumer 消费者,接收消息
Queue: 消息队列

简单模式的特点:一个生产者P,一个消费者C,消息只能被消费一次,也被称为点对点【Point-to-Point】模式

案例:上一篇文章中的快速入门RabbitMQ 就是简单模式的演示,大家可以去看一下代码的实现

Work Queue(工作队列)

在这里插入图片描述
特点:多个消费者共同消费同一条队列的消息,消息不重复
假设队列有 10 条消息,C1 消费了 4 条,C2 消费了 6 条,这就是共同消费。

生产者:

public class Producer {public static void main(String[] args) throws IOException, TimeoutException {//进行绑定ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);factory.setPort(Constants.PORT);factory.setUsername(Constants.NAME);factory.setPassword(Constants.PASSWORD);factory.setVirtualHost(Constants.VIRTUAL_HOST);//建立连接Connection connection = factory.newConnection();//开启信道Channel channel = connection.createChannel();//这里使用默认的交换机//声明队列channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);//发送消息for (int i = 0; i < 10; i++) {String msg = "hello work queue...."+i;channel.basicPublish("",Constants.WORK_QUEUE, null, msg.getBytes());}System.out.println("发送消息成功");//资源释放channel.close();connection.close();}
}

消费者:

public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {//进行参数绑定ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);factory.setPort(Constants.PORT);factory.setUsername(Constants.NAME);factory.setPassword(Constants.PASSWORD);factory.setVirtualHost(Constants.VIRTUAL_HOST);//建立连接Connection connection = factory.newConnection();//开启信道Channel channel = connection.createChannel();//使用默认的交换机//声明队列channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);//消息消费DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到的消息:" + new String(body));}};channel.basicConsume(Constants.WORK_QUEUE, true, consumer);}
}
public class Consumer2 {public static void main(String[] args) throws IOException, TimeoutException {//进行参数绑定ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);factory.setPort(Constants.PORT);factory.setUsername(Constants.NAME);factory.setPassword(Constants.PASSWORD);factory.setVirtualHost(Constants.VIRTUAL_HOST);//建立连接Connection connection = factory.newConnection();//开启信道Channel channel = connection.createChannel();//使用默认的交换机//声明队列channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);//消息消费DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到的消息:" + new String(body));}};channel.basicConsume(Constants.WORK_QUEUE, true, consumer);}
}

交换机概念

这里解释一下为什么上面两种工作模式流程图没有交换机的存在,因为交换机在上面两种工作模式中不起重要作用,为了简化,所以省略了交换机,实际在 RabbitMQ 中 生产者发送的消息是需要通过交换机将消息发送到对应的队列中。

在RabbitMQ 中,一共有四种类型的交换机,分别是 Fanout、Direct、Topic、Headers,不同类型的交换机有着不同的路由策略。
在 AMQP 协议中还有额外的两种类型:System 和 自定义,在RabbitMQ 中我们就不介绍这额外的两种

Fanout:广播模式,将小心发送给所有与该交换机绑定的队列中,对应下面即将讲解的 Publish / Subscribe (发布 / 订阅) 工作模式

Direct:定向,将消息发送给指定的 routing key 的队列中,对应 Routing 模式

Topic:通配符,将消息交给符合指定的 routing pattern (路由模式)的队列

Headers : 该交换机不依赖路由键的批匹配规则来路由消息,而是根据发送的消息内容中的headers 属性进行匹配,headers 类型的交换机性能很差,很少很在工作中遇到。


这里解释一下 routing key 和 binding key 的概念:

在这里插入图片描述

生产者和交换机的联系使用的是 Routing Key,交换机和队列的联系使用的是 Binding Key

在后续的代码中 我们也会将 Binding Key 当作 Routing Key

Publish / Subscribe (发布 / 订阅)

在这里插入图片描述

交换机将消息发送到不同的队列中,类似广播的作用,所有和该交换机绑定的队列都会收到一样的消息

public class Producer {public static void main(String[] args) throws IOException, TimeoutException {//设置 MQ 参数ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);factory.setPort(Constants.PORT);factory.setUsername(Constants.NAME);factory.setPassword(Constants.PASSWORD);factory.setVirtualHost(Constants.VIRTUAL_HOST);//建立连接Connection connection = factory.newConnection();// 开启信道Channel channel = connection.createChannel();//声明交换机channel.exchangeDeclare("publish", BuiltinExchangeType.FANOUT, true, false, null);//声明队列channel.queueDeclare("fanout1", true, false, false, null);channel.queueDeclare("fanout2", true, false, false, null);//绑定队列和交换机channel.queueBind("fanout1","publish","");channel.queueBind("fanout2","publish","");//发送消息for (int i = 0; i < 10; i++) {channel.basicPublish("publish", "", null, ("hello" + i).getBytes());}//关闭资源channel.close();connection.close();}
}

方法参数介绍:

交换机声明:

Exchange.DeclareOk exchangeDeclare(String exchange,
BuiltinExchangeType type,
boolean durable, 
boolean autoDelete,
Map<String, Object> arguments) throws IOException;

exchange:交换机的名称

BuiltinExchangeType type: 交换机的类型,点击 BuiltinExchangeType 可以查看到 这是一个枚举类,一共有四种类型的交换机:DIRECT(“direct”), FANOUT(“fanout”), TOPIC(“topic”), HEADERS(“headers”);

public enum BuiltinExchangeType {DIRECT("direct"), FANOUT("fanout"), TOPIC("topic"), HEADERS("headers");private final String type;BuiltinExchangeType(String type) {this.type = type;}public String getType() {return type;}
}

durable:是否进行持久化

autoDelete: 是否自动删除

arguments:高级特性的设置


交换机与队列的绑定:

Queue.BindOk queueBind(String queue, 
String exchange, 
String routingKey) throws IOException;

queue:队列的名称

exchange: 交换机的名称

routingKey : 交换机和队列绑定的联系词BindingKey,这里写的RoutingKey,本质上是一样的。


消费者代码:

public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {//设置 MQ 参数ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);factory.setPort(Constants.PORT);factory.setUsername(Constants.NAME);factory.setPassword(Constants.PASSWORD);factory.setVirtualHost(Constants.VIRTUAL_HOST);//建立连接Connection connection = factory.newConnection();//开启信道Channel channel = connection.createChannel();//声明队列channel.queueDeclare("fanout1", true, false, false, null);//消费消息Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(new String(body));}};channel.basicConsume("fanout1", true, consumer);}
}
public class Consumer2 {public static void main(String[] args) throws IOException, TimeoutException {//设置 MQ 参数ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);factory.setPort(Constants.PORT);factory.setUsername(Constants.NAME);factory.setPassword(Constants.PASSWORD);factory.setVirtualHost(Constants.VIRTUAL_HOST);//建立连接Connection connection = factory.newConnection();//开启信道Channel channel = connection.createChannel();//声明队列channel.queueDeclare("fanout2", true, false, false, null);//进行消费Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(new String(body));}};channel.basicConsume("fanout2", true, consumer);}
}

相关文章:

  • LabVIEW汽车CAN总线检测系统开发
  • SpringBoot(一)--- Maven基础
  • [人月神话_6] 另外一面 | 一页流程图 | 没有银弹
  • 游戏引擎学习第292天:实现蛇
  • Java文件读写程序
  • 提示工程 - 系统提示(System Prompts)
  • 健康生活:养生实用指南
  • AM32电调学习解读六:main.c文件的函数介绍
  • 在 Vue 中插入 B 站视频
  • 关于 Web 漏洞原理与利用:1. SQL 注入(SQLi)
  • 并发编程(4)
  • Python面试总结
  • STK手动建链+matlab联调
  • 【回眸】发财日记(二)
  • 中科院:LLM工具调用框架TUMS
  • C++笔记-红黑树
  • 【图书管理系统】用户注册系统实现详解
  • linux-----------------库制作与原理(下)
  • 《虚拟即真实:数字人驱动技术在React Native社交中的涅槃》
  • Flask快速入门和问答项目源码
  • 减负举措如何助力基层干部轻装上阵?记者一线调查
  • 视频丨歼-10CE首次实战大放异彩
  • 高瓴、景林旗下公司美股持仓揭晓:双双增持中概股
  • 高途一季度净利润同比增长1108%: “与吴彦祖一起学英语”短时间内就实现了盈利
  • 沃尔玛上财季净利下滑12%:关税带来成本压力,新财季价格涨幅将高于去年
  • 广西壮族自治区党委副书记、自治区政府主席蓝天立接受审查调查