当前位置: 首页 > news >正文

零基础学习RabbitMQ(4)--RabbitMQ快速入门

我们先创建一个Maven项目引入对应的依赖:


<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.20.0</version>
</dependency>

1. 创建生产者

1.1 建立连接 
//1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();//设置ipconnectionFactory.setHost("162.14.99.141");//设置端口号,默认是5672connectionFactory.setPort(5672);//设置账户密码connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");//设置要使用的虚拟机connectionFactory.setVirtualHost("ty");//获取连接Connection connection = connectionFactory.newConnection();
1.2 创建Channel
//2.创建ChannelChannel channel = connection.createChannel();
1.3 声明交换机

这里我们直接使用内置的交换击就无需声明

1.4 声明队列

        //4.声明队列/*** queueDeclare(String queue, boolean durable, boolean exclusive,* boolean autoDelete, Map<String, Object> arguments) throws IOException;* 参数说明* queue:队列名称* durable: 是否可持久化,持久化的队列会存储在磁盘,服务器重启后消息不丢失* exclusive:是否独占,只能有一个消费者监听队列,* autoDelete:是否自动删除,当没有Consumer时自动删除* arguments:其它的参数,如最大值,有效期等等* */channel.queueDeclare("queue1", true, false, false, null);

如果queue1不存在则会创建该队列

1.5 发送消息
        //5.发送消息/*** void basicPublish(String exchange, String routingKey, boolean mandatory,*      boolean immediate, BasicProperties props, byte[] body)* 参数说明* exchange:交换机名称* routingKey:路由名称,routingKey = 队列名称* props:配置信息* body:发送消息的数据*/String msg = "hello world";//这里的""代表内置交换机,使用内置交换机时,routingKey要和队列名称一样才可以路由到对应的队列channel.basicPublish("", "queue1", null, msg.getBytes());

 

这里的""代表使用的这个默认的交换机 

1.6 资源释放 
        //6.资源释放channel.close();connection.close();

整体代码:


import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws IOException, TimeoutException {//1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();//设置ipconnectionFactory.setHost("162.14.99.141");//设置端口号,默认是5672connectionFactory.setPort(5672);//设置账户密码connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");//设置要使用的虚拟机connectionFactory.setVirtualHost("ty");//获取连接Connection connection = connectionFactory.newConnection();//2.创建ChannelChannel channel = connection.createChannel();//3.声明交换机,这里使用内置的交换机//4.声明队列/*** queueDeclare(String queue, boolean durable, boolean exclusive,* boolean autoDelete, Map<String, Object> arguments) throws IOException;* 参数说明* queue:队列名称* durable: 是否可持久化,持久化的队列会存储在磁盘,服务器重启后消息不丢失* exclusive:是否独占,只能有一个消费者监听队列,* autoDelete:是否自动删除,当没有Consumer时自动删除* arguments:其它的参数,如最大值,有效期等等* */channel.queueDeclare("queue1", true, false, false, null);//5.发送消息/*** void basicPublish(String exchange, String routingKey, boolean mandatory,*      boolean immediate, BasicProperties props, byte[] body)* 参数说明* exchange:交换机名称* routingKey:路由名称,routingKey = 队列名称* props:配置信息* body:发送消息的数据*/String msg = "hello world";//这里的""代表内置交换机,使用内置交换机时,routingKey要和队列名称一样才可以路由到对应的队列channel.basicPublish("", "queue1", null, msg.getBytes());//6.资源释放channel.close();connection.close();}
}

我们运行代码后来到管理界面:

可以看到队列被创建成功了,并且有一条消息,由于我们在程序末尾释放了连接所有这里是没有Connections和Channels的。我们如果把释放资源的代码注释掉,再次运行:

 

 

2. 创建消费者

import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer {public static void main(String[] args) throws IOException, TimeoutException {//1. 创建连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("162.14.99.141");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");connectionFactory.setVirtualHost("ty");Connection connection = connectionFactory.newConnection();//2. 创建ChannelChannel channel = connection.createChannel();//3. 声明队列channel.queueDeclare("queue1", true, false, false, null);//4. 消费信息DefaultConsumer consumer = new DefaultConsumer(channel) {/*** void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)* 定义如何处理接收到的消息,接收到消息时会自动 调用该方法* 参数说明* consumerTag: 消费者标签,* properties: 配置信息* body: 消息的具体内容*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到:" + new String(body));}};//Consumer 用于定义消费者的行为,从RabbitMQ接收消息时,需要提供一个实现了Consumer接口的对象//DefaultConsumer是RabbitMQ提供的一个默认消费者,实现了Consumer接口/*** String basicConsume(String queue, boolean autoAck, Consumer callback);* 参数说明* queue: 消费的队列的名称* autoAck: 是否要自动确认* callback: 接收到消息后的执行逻辑*/channel.basicConsume("queue1", true, consumer);//5. 释放资源channel.close();connection.close();}
}

 

注意:调用basicConsume()方法后,消费者会不断从队列中获取数据,这个操作是由另一个线程执行的,和主线程是异步的,直到连接被释放,这里从调用basicConsume到连接关闭中间消费的数据数量和打印出来的日志不一定是吻合的,有的消息的日志可能还没有打印出来,主线程就关闭了

这里我们在队列中添加一万条消息再调用消费者来演示:

消费者运行结果:

可以看到只打印了一条数据,但是我们通过可视化界面观察队列:

 

发现其中还有9050条数据。

我们可以把关闭连接的代码注释再次运行:

可以看到成功打印出了所有的日志 

相关文章:

  • Rust 和C++工业机器人实践
  • 当SAM遇到声纳图像时之论文阅读
  • TreeMap源码分析 红黑树
  • mac系统快捷键及命令安装
  • LSNet: 基于侧向抑制的神经网络
  • 预测性 SRE 与自动化修复
  • fvcom 网格文件grd制作
  • yolov11安装,训练模型,tensorrtx加速,Qt预测图像
  • mac触摸板设置右键
  • python pyecharts 数据分析及可视化(2)
  • 八股文——JAVA基础:hashCode()方法的作用与意义以及与equals方法的联动
  • 通过阿里云部署n8n工作流自动备份GitHub
  • Gartner《Everything Technical Professionals Need to KnowAbout DA Strategy》学习心得
  • RuoYi-Vue学习环境搭建
  • docker compose基本使用以及示例
  • 云端可视化耦合电磁场:麦克斯韦方程组的应用-AI云计算数值分析和代码验证
  • 学习使用Visual Studio分析.net内存转储文件的基本用法
  • MybatisPlus-03.快速入门-常用注解
  • 横向移动01
  • leetcode437-路径总和III