RabbitMQ 声明队列和交换机详解
RabbitMQ 声明队列和交换机详解
一、为什么需要声明队列和交换机?
RabbitMQ
是先声明再使用的机制:
- 队列负责存储消息
- 交换机负责路由消息
- 绑定关系决定消息从交换机到队列的路径
如果没有事先声明:
- 队列/交换机不存在时,发送消息会失败
- 队列没有绑定到交换机时,消息会丢失(除非设置了备用交换机)
二、声明交换机(Exchange)
2.1 交换机参数说明
RabbitMQ 提供四种类型(direct、fanout、topic、headers),声明时需要指定以下参数:
参数 | 类型 | 说明 |
---|---|---|
name | String | 交换机名称(不为空字符串) |
type | String | 类型:direct 、fanout 、topic 、headers |
durable | boolean | 是否持久化(重启 RabbitMQ 后仍存在) |
autoDelete | boolean | 是否自动删除(最后一个队列解绑后删除) |
arguments | Map | 额外参数(如 TTL、死信交换机配置) |
2.2 Java 原生声明交换机
channel.exchangeDeclare("my.direct.exchange", // 交换机名称BuiltinExchangeType.DIRECT, // 类型true, // durablefalse, // autoDeletenull // arguments
);
2.3 Spring AMQP 声明交换机
@Bean
public DirectExchange directExchange() {return new DirectExchange("my.direct.exchange", true, false);
}
Spring 会自动在应用启动时向 RabbitMQ 发送声明请求。
三、声明队列(Queue)
3.1 队列参数说明
参数 | 类型 | 说明 |
---|---|---|
name | String | 队列名称(匿名队列可由 RabbitMQ 自动生成) |
durable | boolean | 是否持久化(消息是否持久化取决于发送时的 deliveryMode ) |
exclusive | boolean | 是否排他队列(仅连接可见,断开即删除) |
autoDelete | boolean | 是否自动删除(最后一个消费者断开时删除) |
arguments | Map | 额外参数(TTL、死信队列、最大长度等) |
3.2 Java 原生声明队列
channel.queueDeclare("my.queue", // 队列名称true, // durablefalse, // exclusivefalse, // autoDeletenull // arguments
);
3.3 Spring AMQP 声明队列
@Bean
public Queue myQueue() {return new Queue("my.queue", true, false, false);
}
四、绑定交换机和队列
4.1 Java 原生绑定
channel.queueBind("my.queue", // 队列名称"my.direct.exchange", // 交换机名称"order.create" // routingKey
);
4.2 Spring AMQP 绑定
@Bean
public Binding binding() {return BindingBuilder.bind(myQueue()).to(directExchange()).with("order.create");
}
五、实战示例
5.2 fanout示例
package com.itheima.consumer.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class FanoutConfig {/*** 声明交换机* @return Fanout类型交换机*/@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("hmall.fanout");}/*** 第1个队列*/@Beanpublic Queue fanoutQueue1(){return new Queue("fanout.queue1");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}/*** 第2个队列*/@Beanpublic Queue fanoutQueue2(){return new Queue("fanout.queue2");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}
}
5.3 direct示例
package com.itheima.consumer.config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class DirectConfig {/*** 声明交换机* @return Direct类型交换机*/@Beanpublic DirectExchange directExchange(){return ExchangeBuilder.directExchange("hmall.direct").build();}/*** 第1个队列*/@Beanpublic Queue directQueue1(){return new Queue("direct.queue1");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue1WithRed(Queue directQueue1, DirectExchange directExchange){return BindingBuilder.bind(directQueue1).to(directExchange).with("red");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue1WithBlue(Queue directQueue1, DirectExchange directExchange){return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");}/*** 第2个队列*/@Beanpublic Queue directQueue2(){return new Queue("direct.queue2");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue2WithRed(Queue directQueue2, DirectExchange directExchange){return BindingBuilder.bind(directQueue2).to(directExchange).with("red");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue2WithYellow(Queue directQueue2, DirectExchange directExchange){return BindingBuilder.bind(directQueue2).to(directExchange).with("yellow");}
}
六、基于注解声明
- Direct模式
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
}
- Topic模式
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue1"),exchange = @Exchange(name = "hmall.topic", type = ExchangeTypes.TOPIC),key = "china.#"
))
public void listenTopicQueue1(String msg){System.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】");
}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue2"),exchange = @Exchange(name = "hmall.topic", type = ExchangeTypes.TOPIC),key = "#.news"
))
public void listenTopicQueue2(String msg){System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】");
}