当前位置: 首页 > news >正文

springcloud, nacos使用rabbitMq

这两天有需求需要使用rabbitMQ,正好有时间整理一下rabbitmq的使用方法
第一步, 在pom.xml中引入,这里就不写了.
第二步, 在nacos中添加rabbitmq的配置文件

ryhj:
  mq-queue-name: myqueueName
  mq-exchange-name: myExchangeName
  mq-routing-key: myKey
spring:
  rabbitmq:
    host: 192.168.1.110
    password: mypassword
    port: 5672
    username: yourUsername

上方的名称可以根据需要自己更改
第三步:
生产者的方法。 我们这次使用伊特TestControlller来进行测试

mport cn.hutool.json.JSON;
import cn.hutool.json.JSONUtil;
import com.apex.iot.data.model.DataModel;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.util.Date;

@RestController
@RequestMapping("/test")
public class TestController {
    @Resource
    private RabbitTemplate rabbitTemplate;
    // 此处是在nacos中获取的交换机名字
    @Value("${ryhj.mq-exchange-name}")
    private String mqExchangeName;
    // 此处是在nacos中获取的路由键
    @Value("${ryhj.mq-routing-key}")
    private String mqRoutingKey;

    @RequestMapping("/test")
    public void test(){
        System.err.println(1);
        DataModel data=new DataModel();
        data.setDeviceParentName("吸附机器");
        data.setDeviceId("XL_1");
        data.setDeviceName("吸附");
        data.setIp("127.0.0.1");
        data.setValue(true);
        data.setValueType("a");
        data.setCreateTime(new Date());
        data.setStatus(0);
        JSON parse = JSONUtil.parse(data);
        // 这一步是将数据转变为json的关键,要不然接收后会乱码
        String result = com.alibaba.fastjson.JSON.toJSONString(data);
        rabbitTemplate.convertAndSend(mqExchangeName, mqRoutingKey, result);
    }

}

第四步,消费者写的方法:

package com.apex.iot.rabbitmq;

import com.apex.iot.device.service.DeviceService;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.io.IOException;

/**
 * rabbitMq的mq监听
 * @createDate 2025-03-13
 */

@Component
@Configuration
@Slf4j
public class RabbitMqListener {

	/**引入部分分别是队列名,交换机以及路由键
	**/
    @RabbitHandler
    @RabbitListener(bindings = {
            @QueueBinding(value = @Queue("#{'${ryhj.mq-queue-name}'}"),
                    exchange = @Exchange(value = "#{'${ryhj.mq-exchange-name}'}",type = "direct"),
                    key = "#{'${ryhj.mq-routing-key}'}")
    })
    public void process(String msg, Message message, Channel channel) throws IOException {
        try {
            //数据处理
            System.err.println("msg:"+msg);
            // 手动确认消息
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }catch (Exception e) {
            try {
                // 拒绝消息并重新入队
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
                log.error("处理mq设备数据失败!数据:{}", msg, e);
            }catch (Exception ee) {
                log.error("处理mq设备信息失败!数据:{}", msg, ee);
            }

        }
    }
}

这样写完以后,mq处理基本已经完成了,但是我接收的时候会报错。报错内容如下:

2025-03-18 10:09:22.038  WARN 31944 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : 
Execution of Rabbit message listener failed.

org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Failed to convert message

这个错误信息表明,Spring AMQP 在处理 RabbitMQ 消息时失败了,具体原因是消息转换失败。
所以我们可以在消息监听者这边添加一个mq的配置类,具体如下:

import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {

    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
            ConnectionFactory connectionFactory, MessageConverter messageConverter) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory); // 设置连接工厂
        factory.setMessageConverter(messageConverter);   // 设置消息转换器
        return factory;
    }

}

这样,再次启动,就完成啦。
如果接收有乱码信息出现,如下图:

   sr cn.hutool.json.JSONObject j Na
 v L configt Lcn/hutool/json/JSONConfig;xr cn.hutool.core.map.MapWrapper  T#A_ r L rawt Ljava/util/Map;xpsr java.util.LinkedHashMap4 N\l   Z accessOrderxr java.util.HashMap    `  F 
loadFactorI 	thresholdxp?@     w      t deviceIdt 	QY_XFXL_1t 
deviceNamet 
吸附系列1t deviceParentNamet 吸附机器t ipt 	127.0.0.1t valuesr java.lang.Boolean  r ՜   Z valuexpt 	valueTypet at 
createTimesr java.util.Datehj KYt  xpw    	T
xt statussr java.lang.Integer8 I valuexr java.lang.Number        xp    x q ~  xsr cn.hutool.json.JSONConfig ^  L  Z checkDuplicateZ 
ignoreCaseZ ignoreErrorZ ignoreNullValueZ stripTrailingZerosZ transientSupportL 
dateFormatt Ljava/lang/String;L 
keyComparatort Ljava/util/Comparator;xp   pp

这样,我们在生产者上编辑一下json就可以了。
上方已经备注了。就这一行

String result = com.alibaba.fastjson.JSON.toJSONString(data)

相关文章:

  • 前端面试项目拷打
  • Django:内置和自定义中间件
  • Java集合的底层原理
  • 深入解析ES6+新语法:复杂的迭代器与生成器
  • 【css酷炫效果】纯CSS实现手风琴折叠效果
  • navicat导出文件密码解密
  • vue3二次封装tooltip实现el-table中的show-overflow-tooltip效果
  • 003 SpringCloud整合-LogStash安装及ELK日志收集
  • Spring Boot集成JWT:打造安全的RESTful API
  • Linux上离线安装PyTorch教程:No module named ‘_bz2:No module named ‘_lzma‘
  • 单元测试mock
  • 蓝桥杯备考:特殊01背包问题——》集合subset
  • 两款软件助力图片视频去水印及图像编辑
  • PHP转GO Go语言环境搭建(Day1) 常见问题及解决方案指南
  • Node.js系列(3)--集群部署指南
  • K8S-etcd服务无法启动问题排查
  • Android audio(8)-native音频服务的启动与协作(audiopolicyservice和audioflinger)
  • 网络华为HCIA+HCIP VLAN间通信
  • ubuntu下TFTP服务器搭建
  • [GHCTF 2025]Goph3rrr [127.0.0.1绕过][env命令查找flag]
  • 公募基金行业迎系统性变革:基金公司业绩差必须少收费
  • 创新创业50人论坛开幕在即,双创青年为何选择来上海筑梦?
  • 鸿蒙概念股强势上涨,鸿蒙电脑本月正式发布,生态链即将补全
  • 退休11年后,71岁四川厅官杨家卷被查
  • 上千游客深夜滞留张家界大喊退票?当地通报情况并致歉
  • 挑大梁!一季度北上广等7省份进出口占外贸总值四分之三