rabbitmq学习笔记快速使用
主要是快速了解使用,对于强要求比如说数据安全(也就是spring配置先不要求)
那么开始
引入依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId></dependency>
jackson依赖我们只要把他的序列化对象初始化给spring管理以后就可以对数据进行序列化和反序列化了,对比于spring的jdk序列化器,jackson更好用,性价比更强,内存占用更小。
//使用的是Jackson库中的Jackson2JsonMessageConverter类,代替使用jdk自带的序列化@Beanpublic MessageConverter jacksonMessageConvertor(){Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();jackson2JsonMessageConverter.setCreateMessageIds(true);//开启消息id的自动生成功能return jackson2JsonMessageConverter;}
然后是进行队列的声明,交换机的声明,以及队列和交换机的绑定的声明
@Configuration
public class RabbitMqConfig {private static String EXCHANGE_NAME="amq.topic";private static String QUEUE_NAME="alarm.data.topic.queue";private static String CONFIRM_ALARM_QUEUE_NAME="alarm.confirm.data.topic.queue";/*** 声明交换机*/@Beanpublic TopicExchange exchange(){// durable:是否持久化,默认是false// autoDelete:是否自动删除,当没有生产者或者消费者使用此交换机,该交换机会自动删除。return new TopicExchange(EXCHANGE_NAME,true,false);}/*** 声明告警队列* @return*/@Bean("alarmQueue")public Queue alarmQueue(){// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。return new Queue(QUEUE_NAME,true,false,false);}/*** 声明确认告警队列* @return*/@Bean("confirmAlarmQueue")public Queue confirmAlarmQueue(){return new Queue(CONFIRM_ALARM_QUEUE_NAME,true,false,false);}/*** 声明告警队列绑定关系* @param queue* @param topicExchange* @return*/@Beanpublic Binding alarmBinding(@Qualifier("alarmQueue") Queue queue, TopicExchange topicExchange){return BindingBuilder.bind(queue).to(topicExchange).with("server.event.#");}/*** 声明确认告警队列绑定关系* @param queue* @param topicExchange* @return*/@Beanpublic Binding confirmAlarmBinding(@Qualifier("confirmAlarmQueue") Queue queue, TopicExchange topicExchange){return BindingBuilder.bind(queue).to(topicExchange).with("server.event_confirm.#");}
首先是交换机的声明:
直接返回TopicExchange(name,true,false)即可交给spring管理。
然后第二个是是否持久化,第三个是没有使用直接删除。自然用true和false了
队列的声明:
/*** 声明告警队列* @return*/@Bean("alarmQueue")public Queue alarmQueue(){// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。return new Queue(QUEUE_NAME,true,false,false);}
直接new出来queue返回就可以了,我们使用的是rabbitmq里的队列而不是java里面的队列。选择new的时候选择rabbitmq的队列就可以了。
@param1是名字
2是否持久化 我们用true
3是连接关闭后直接删除 我们用false
4没有使用该队列自动删除 我们用false
最后交给spring进行管理我们在后面使用的时候直接用就可以了。
用在哪里呢 用在交换机和队列进行连接时候用,这个时候用注释就可以快速拿到bean进行匹配然后返回连接了。
所以第三个就是交换机和队列的连接了
/*** 声明告警队列绑定关系* @param queue* @param topicExchange* @return*/@Beanpublic Binding alarmBinding(@Qualifier("alarmQueue") Queue queue, TopicExchange topicExchange){return BindingBuilder.bind(queue).to(topicExchange).with("server.event.#");}
使用@Qualifier找到bean然后赋值给Queue 然后使用BindingBuilder直接获取返回给spring管理。
最后那个参数是routingkey
然后是生产消息
@Autowiredprivate RabbitTemplate rabbitTemplate;private static String EXCHANGE_NAME="amq.topic";private static String CONFIRM_ALARM_QUEUE_NAME="alarm.confirm.data.topic.queue";@Testvoid producerAlarmMsg() {String msg = "发送一条告警消息";rabbitTemplate.convertAndSend(EXCHANGE_NAME, "server.event.#",msg);System.out.println("msg = " + msg);}@Testvoid producerConfirmAlarmMsg() {String msg = "发送一条确认告警消息";rabbitTemplate.convertAndSend(CONFIRM_ALARM_QUEUE_NAME, "server.event_confirm.#",msg);System.out.println("msg = " + msg);}
使用注解@rabbitTemplate里面的coverandsent方法,@param1是交换机名字 2是routingkey 3是消息 发送以后他会根据交换机自动去发送给对应的队列
然后是消费者
@Component
public class AlarmConsumer {@Autowiredprivate IAlarmService alarmService;@RabbitListener(queues ="alarm.data.topic.queue",concurrency = "5")public void getAlarmInfo(String data){alarmService.dealAlarmData(data);}@RabbitListener(queues ="alarm.confirm.data.topic.queue",concurrency = "5")public void getConfirmAlarmInfo(String data){alarmService.dealConfirmAlarmData(data);}
}
使用@RabbitListner 第一个是队列的名字 而不是bean的名字 通过队列的名字进行监听 然后处理数据。 后面那个concurrency是并发数。
调用service里面的逻辑进行处理数据
ok这就是rabbitmq的初步使用了。