零基础学习RabbitMQ(4)--RabbitMQ快速入门
我们先创建一个Maven项目引入对应的依赖:
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.20.0</version>
</dependency>
1. 创建生产者
1.1 建立连接
//1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();//设置ipconnectionFactory.setHost("162.14.99.141");//设置端口号,默认是5672connectionFactory.setPort(5672);//设置账户密码connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");//设置要使用的虚拟机connectionFactory.setVirtualHost("ty");//获取连接Connection connection = connectionFactory.newConnection();
1.2 创建Channel
//2.创建ChannelChannel channel = connection.createChannel();
1.3 声明交换机
这里我们直接使用内置的交换击就无需声明
1.4 声明队列
//4.声明队列/*** queueDeclare(String queue, boolean durable, boolean exclusive,* boolean autoDelete, Map<String, Object> arguments) throws IOException;* 参数说明* queue:队列名称* durable: 是否可持久化,持久化的队列会存储在磁盘,服务器重启后消息不丢失* exclusive:是否独占,只能有一个消费者监听队列,* autoDelete:是否自动删除,当没有Consumer时自动删除* arguments:其它的参数,如最大值,有效期等等* */channel.queueDeclare("queue1", true, false, false, null);
如果queue1不存在则会创建该队列
1.5 发送消息
//5.发送消息/*** void basicPublish(String exchange, String routingKey, boolean mandatory,* boolean immediate, BasicProperties props, byte[] body)* 参数说明* exchange:交换机名称* routingKey:路由名称,routingKey = 队列名称* props:配置信息* body:发送消息的数据*/String msg = "hello world";//这里的""代表内置交换机,使用内置交换机时,routingKey要和队列名称一样才可以路由到对应的队列channel.basicPublish("", "queue1", null, msg.getBytes());
这里的""代表使用的这个默认的交换机
1.6 资源释放
//6.资源释放channel.close();connection.close();
整体代码:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws IOException, TimeoutException {//1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();//设置ipconnectionFactory.setHost("162.14.99.141");//设置端口号,默认是5672connectionFactory.setPort(5672);//设置账户密码connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");//设置要使用的虚拟机connectionFactory.setVirtualHost("ty");//获取连接Connection connection = connectionFactory.newConnection();//2.创建ChannelChannel channel = connection.createChannel();//3.声明交换机,这里使用内置的交换机//4.声明队列/*** queueDeclare(String queue, boolean durable, boolean exclusive,* boolean autoDelete, Map<String, Object> arguments) throws IOException;* 参数说明* queue:队列名称* durable: 是否可持久化,持久化的队列会存储在磁盘,服务器重启后消息不丢失* exclusive:是否独占,只能有一个消费者监听队列,* autoDelete:是否自动删除,当没有Consumer时自动删除* arguments:其它的参数,如最大值,有效期等等* */channel.queueDeclare("queue1", true, false, false, null);//5.发送消息/*** void basicPublish(String exchange, String routingKey, boolean mandatory,* boolean immediate, BasicProperties props, byte[] body)* 参数说明* exchange:交换机名称* routingKey:路由名称,routingKey = 队列名称* props:配置信息* body:发送消息的数据*/String msg = "hello world";//这里的""代表内置交换机,使用内置交换机时,routingKey要和队列名称一样才可以路由到对应的队列channel.basicPublish("", "queue1", null, msg.getBytes());//6.资源释放channel.close();connection.close();}
}
我们运行代码后来到管理界面:
可以看到队列被创建成功了,并且有一条消息,由于我们在程序末尾释放了连接所有这里是没有Connections和Channels的。我们如果把释放资源的代码注释掉,再次运行:
2. 创建消费者
import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer {public static void main(String[] args) throws IOException, TimeoutException {//1. 创建连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("162.14.99.141");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");connectionFactory.setVirtualHost("ty");Connection connection = connectionFactory.newConnection();//2. 创建ChannelChannel channel = connection.createChannel();//3. 声明队列channel.queueDeclare("queue1", true, false, false, null);//4. 消费信息DefaultConsumer consumer = new DefaultConsumer(channel) {/*** void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)* 定义如何处理接收到的消息,接收到消息时会自动 调用该方法* 参数说明* consumerTag: 消费者标签,* properties: 配置信息* body: 消息的具体内容*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到:" + new String(body));}};//Consumer 用于定义消费者的行为,从RabbitMQ接收消息时,需要提供一个实现了Consumer接口的对象//DefaultConsumer是RabbitMQ提供的一个默认消费者,实现了Consumer接口/*** String basicConsume(String queue, boolean autoAck, Consumer callback);* 参数说明* queue: 消费的队列的名称* autoAck: 是否要自动确认* callback: 接收到消息后的执行逻辑*/channel.basicConsume("queue1", true, consumer);//5. 释放资源channel.close();connection.close();}
}
注意:调用basicConsume()方法后,消费者会不断从队列中获取数据,这个操作是由另一个线程执行的,和主线程是异步的,直到连接被释放,这里从调用basicConsume到连接关闭中间消费的数据数量和打印出来的日志不一定是吻合的,有的消息的日志可能还没有打印出来,主线程就关闭了
这里我们在队列中添加一万条消息再调用消费者来演示:
消费者运行结果:
可以看到只打印了一条数据,但是我们通过可视化界面观察队列:
发现其中还有9050条数据。
我们可以把关闭连接的代码注释再次运行:
可以看到成功打印出了所有的日志