当前位置: 首页 > news >正文

RabbitMQ操作实战

1.RabbitMQ安装

RabbitMQ Windows 安装、配置、使用 - 小白教程-腾讯云开发者社区-腾讯云下载erlang:http://www.erlang.org/downloads/https://cloud.tencent.com/developer/article/2192340

Windows 10安装RabbitMQ及延时消息插件rabbitmq_delayed_message_exchange - 民工黑猫 - 博客园安装RabbitMQ服务器 第一步:下载erlang 原因:RabbitMQ服务端代码是使用并发式语言Erlang编写的,安装Rabbit MQ的前提是安装Erlang。下载地址:http://www.erlang.org/downloads 第二步:下载RabbitMQ 下载地址:https://https://www.cnblogs.com/yyee/p/14281111.html

2.生产端确保消息发送到交换机和路由键

application.yml

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    publisher-confirm-type: CORRELATED # 交换机的确认
    publisher-returns: true # 队列的确认
logging:
  level:
    com.atguigu.mq.config: info

RabbitConfig

@Configuration
@Slf4j
public class RabbitConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void initRabbitTemplate() {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnsCallback(this);
    }

    // 发送到交换机-成功或失败
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        log.info("confirm() 回调函数打印 correlationData:{}, ack:{}, cause:{}", correlationData, ack, cause);
    }

    // 发送到队列-失败
    @Override
    public void returnedMessage(ReturnedMessage returned) {
        log.info("returnedMessage() 回调函数 msg:{}, replyCode:{}, replyText:{}, exchange:{}, routingKey:{}",
                new String(returned.getMessage().getBody()), returned.getReplyCode(), returned.getReplyText(), returned.getExchange(), returned.getRoutingKey());
    }
}
RabbitMQTest


    public static final String EXCHANGE_DIRECT = "exchange.direct.order";
    public static final String ROUTING_KEY = "order";



    @Autowired
    private RabbitTemplate rabbitTemplate;

    //发消息-生产端确保消息发到交换机和路由键
    rabbitTemplate.convertAndSend(EXCHANGE_DIRECT + "~", ROUTING_KEY + "~", "Message Test Confirm~~~ ~~~");

3.消费端手动ack

application.yml

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    listener:
      simple:
        acknowledge-mode: manual # 把消息确认模式改为手动确认
        prefetch: 1 # 每次从队列中取回消息的数量
MyMessageListener

    public static final String QUEUE_NAME  = "queue.order";

    //接消息-消费端手动ack
    //@RabbitListener(queues = {QUEUE_NAME})
    public void processMessage(String dataString, Message message, Channel channel) throws IOException {
        //获取消息的deliveryTag
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            //核心操作
            log.info("消费端 消息内容:" + dataString);
            System.out.println(10 / 0);
            //核心操作成功返ACK
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            //获取消息是否重复投递
            Boolean redelivered = message.getMessageProperties().getRedelivered();
            //核心操作失败返NACK
            if (!redelivered) {
                //第一次投递,重新放回队列 (requeue:是否重新放回队列)
                channel.basicNack(deliveryTag, false, true);
                //channel.basicReject(deliveryTag, true);
            } else {
                //重复投递,不重新放回队列 (requeue:是否重新放回队列)
                channel.basicNack(deliveryTag, false, false);
                //channel.basicReject(deliveryTag, false);
            }
        }
    }

4.消息设置超时时间(通过消息后置处理器)


        rabbitTemplate.convertAndSend(EXCHANGE_TIMEOUT, ROUTING_KEY_TIMEOUT, "Test timeout", message -> {
            message.getMessageProperties().setExpiration("4000");
            return message;
        });

5.延迟消息(延迟插件)

Windows 10安装RabbitMQ及延时消息插件rabbitmq_delayed_message_exchange - 民工黑猫 - 博客园安装RabbitMQ服务器 第一步:下载erlang 原因:RabbitMQ服务端代码是使用并发式语言Erlang编写的,安装Rabbit MQ的前提是安装Erlang。下载地址:http://www.erlang.org/downloads 第二步:下载RabbitMQ 下载地址:https://https://www.cnblogs.com/yyee/p/14281111.htmlRabbitMQ延迟插件下载地址

Community Plugins | RabbitMQ RabbitMQ设置延迟消息的交换机

生产端发送延时消息(通过消息后置处理器)

        //延迟消息-延时插件(最多俩天内)
        String msg = "Test delay message by plugin " + new SimpleDateFormat("HH:mm:ss").format(new Date());
        rabbitTemplate.convertAndSend(EXCHANGE_DELAY, ROUTING_KEY_DELAY, msg, message -> {
            //x-delay参数必须基于x-delayed-message-exchange插件才能生效
            message.getMessageProperties().setHeader("x-delay", "10000");
            return message;
        });

消费端消费延时消息


    @RabbitListener(queues = {QUEUE_DELAY})
    public void processMessageDelay(String dataString, Message message, Channel channel) throws IOException {
        log.info("[delay message][消息本身]" + dataString);
        log.info("[delay message][当前时间]" + new SimpleDateFormat("HH:mm:ss").format(new Date()));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

相关文章:

  • 迁移学习策略全景解析:从理论到产业落地的技术跃迁
  • 打造更 AI 的操作系统 《龙蜥+超级探访》第三期走进浪潮信息
  • 对rust中的from和into的理解
  • RA-Eco-RA2L1-48PIN-V1.0开发板RTC时钟
  • redis restore 命令的用法
  • 【深度学习】强化学习(RL)-A3C(Asynchronous Advantage Actor-Critic)
  • 快速使用通义千问大模型API + VUE
  • 【时序预测】深度时序预测算法的对比与核心创新点分析
  • ArcGIS Pro中打造精美高程渲染图的全面指南
  • 数据开发面试:DQL,DDL,DTL
  • 【Kubernetes】对资源进行PATCH
  • 【Mysql】:数据库表的三部曲(数据操作 + 类型解析 + 约束规则)
  • sqlmap:自动SQL注入和数据库接管工具
  • JSX基础 —— 识别JS表达式
  • 【鸿蒙Next】系统通知权限申请
  • apk反编译工具
  • html2canvas 实现屏幕截图、生成海报功能
  • C++ 常见面试知识点
  • 为什么java从json中获取值有数据类型,而从xml中获取值没有数据类型?
  • 一个典型的要求: Python | C#实现年月日创建文件夹 时分秒对应文件名的保存路径
  • 用哪个登录网址最好/百度搜索关键词优化
  • wordpress中文论坛/seo搜索排名优化公司
  • 自己做网站卖二手车/抚顺优化seo
  • wordpress文章长/seo网站运营
  • 长春网站建设公司怎么样/浏览器大全
  • 架设个人网站/百度seo推广工具