当前位置: 首页 > news >正文

RabbitMQ基础

RabbitMQ基础知识

一、RabbitMQ简介

1.1 什么是RabbitMQ?

RabbitMQ是一个开源的消息代理和队列服务器,用来通过普通协议在完全不同的应用之间共享数据。

1.2 核心概念

  • Producer:消息生产者
  • Consumer:消息消费者
  • Exchange:交换机,负责消息路由
  • Queue:队列,存储消息
  • Binding:绑定,交换机和队列之间的关系
  • Channel:信道,建立在Connection上的虚拟连接
  • Connection:TCP连接

1.3 应用场景

  • 异步处理
  • 应用解耦
  • 流量削峰
  • 日志处理
  • 消息通讯

二、安装与配置

2.1 安装RabbitMQ

# Windows安装
# 1. 安装Erlang
# 2. 下载并安装RabbitMQ
# 3. 启用管理插件
rabbitmq-plugins enable rabbitmq_management

# Linux安装
sudo apt-get update
sudo apt-get install rabbitmq-server

# Docker安装
docker run -d --hostname my-rabbit --name rabbit -p 5672:5672 -p 15672:15672 rabbitmq:management

2.2 基础配置

# application.properties
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/

三、交换机类型

3.1 Direct Exchange

// 声明交换机
@Bean
public DirectExchange directExchange() {
    return new DirectExchange("direct.exchange");
}

// 声明队列
@Bean
public Queue directQueue() {
    return new Queue("direct.queue");
}

// 绑定关系
@Bean
public Binding directBinding() {
    return BindingBuilder.bind(directQueue())
            .to(directExchange())
            .with("direct.routingKey");
}

3.2 Topic Exchange

// 声明交换机
@Bean
public TopicExchange topicExchange() {
    return new TopicExchange("topic.exchange");
}

// 声明队列
@Bean
public Queue topicQueue1() {
    return new Queue("topic.queue1");
}

// 绑定关系
@Bean
public Binding topicBinding1() {
    return BindingBuilder.bind(topicQueue1())
            .to(topicExchange())
            .with("topic.#");  // #匹配0个或多个单词
}

3.3 Fanout Exchange

// 声明交换机
@Bean
public FanoutExchange fanoutExchange() {
    return new FanoutExchange("fanout.exchange");
}

// 声明队列
@Bean
public Queue fanoutQueue1() {
    return new Queue("fanout.queue1");
}

// 绑定关系
@Bean
public Binding fanoutBinding1() {
    return BindingBuilder.bind(fanoutQueue1())
            .to(fanoutExchange());
}

3.4 Headers Exchange

// 声明交换机
@Bean
public HeadersExchange headersExchange() {
    return new HeadersExchange("headers.exchange");
}

// 声明队列
@Bean
public Queue headersQueue() {
    return new Queue("headers.queue");
}

// 绑定关系
@Bean
public Binding headersBinding() {
    return BindingBuilder.bind(headersQueue())
            .to(headersExchange())
            .whereAll("header1", "value1").match();
}

四、消息发送与接收

4.1 消息生产者

@Service
public class MessageProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    // 发送消息
    public void sendMessage(String exchange, String routingKey, Object message) {
        rabbitTemplate.convertAndSend(exchange, routingKey, message);
    }

    // 发送带确认的消息
    public void sendMessageWithConfirm(String exchange, String routingKey, Object message) {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);
    }
}

4.2 消息消费者

@Component
public class MessageConsumer {
    // 简单消费
    @RabbitListener(queues = "direct.queue")
    public void receiveMessage(String message) {
        System.out.println("Received message: " + message);
    }

    // 手动确认消费
    @RabbitListener(queues = "direct.queue")
    public void receiveMessageManual(String message, Channel channel, 
            @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        try {
            // 处理消息
            System.out.println("Received message: " + message);
            // 手动确认
            channel.basicAck(tag, false);
        } catch (Exception e) {
            // 消息重回队列
            channel.basicNack(tag, false, true);
        }
    }
}

五、高级特性

5.1 消息确认机制

// 配置
spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true

// 实现确认回调
@Bean
public RabbitTemplate.ConfirmCallback confirmCallback() {
    return (correlationData, ack, cause) -> {
        if (ack) {
            System.out.println("消息发送成功");
        } else {
            System.out.println("消息发送失败:" + cause);
        }
    };
}

// 实现返回回调
@Bean
public RabbitTemplate.ReturnCallback returnCallback() {
    return (message, replyCode, replyText, exchange, routingKey) -> {
        System.out.println("消息路由失败:" + replyText);
    };
}

5.2 死信队列

// 声明死信交换机
@Bean
public DirectExchange deadLetterExchange() {
    return new DirectExchange("dlx.exchange");
}

// 声明死信队列
@Bean
public Queue deadLetterQueue() {
    return new Queue("dlx.queue");
}

// 声明普通队列,并指定死信交换机
@Bean
public Queue normalQueue() {
    Map<String, Object> args = new HashMap<>();
    args.put("x-dead-letter-exchange", "dlx.exchange");
    args.put("x-dead-letter-routing-key", "dlx.routingKey");
    return new Queue("normal.queue", true, false, false, args);
}

5.3 延迟队列

// 声明延迟交换机
@Bean
public CustomExchange delayExchange() {
    Map<String, Object> args = new HashMap<>();
    args.put("x-delayed-type", "direct");
    return new CustomExchange("delay.exchange", "x-delayed-message", 
            true, false, args);
}

// 发送延迟消息
public void sendDelayMessage(String message, int delayTime) {
    rabbitTemplate.convertAndSend("delay.exchange", "delay.routingKey", 
            message, messagePostProcessor -> {
        messagePostProcessor.getMessageProperties()
                .setDelay(delayTime);
        return messagePostProcessor;
    });
}

六、集群部署

6.1 普通集群模式

# 1. 配置hosts
192.168.1.101 rabbit1
192.168.1.102 rabbit2
192.168.1.103 rabbit3

# 2. 同步Erlang Cookie
scp /var/lib/rabbitmq/.erlang.cookie root@rabbit2:/var/lib/rabbitmq/
scp /var/lib/rabbitmq/.erlang.cookie root@rabbit3:/var/lib/rabbitmq/

# 3. 加入集群
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@rabbit1
rabbitmqctl start_app

6.2 镜像集群模式

# 设置镜像策略
rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'

七、监控与运维

7.1 监控指标

  • 队列数量
  • 消息数量
  • 连接数量
  • 消费者数量
  • 交换机数量

7.2 常用命令

# 查看队列
rabbitmqctl list_queues

# 查看交换机
rabbitmqctl list_exchanges

# 查看绑定关系
rabbitmqctl list_bindings

# 查看连接
rabbitmqctl list_connections

# 查看消费者
rabbitmqctl list_consumers

八、最佳实践

8.1 开发规范

  1. 命名规范
    • 使用有意义的交换机和队列名称
    • 使用统一的命名规则
    • 避免特殊字符
  2. 消息处理
    • 保证消息可靠性
    • 处理消息幂等性
    • 合理设置重试机制
  3. 性能优化
    • 合理使用预取数量
    • 控制消息大小
    • 适当的并发数量

8.2 运维规范

  1. 监控告警
    • 监控队列积压
    • 监控消费者状态
    • 监控集群状态
  2. 容量规划
    • 评估消息量
    • 规划硬件资源
    • 合理设置集群规模
  3. 安全管理
    • 访问控制
    • 加密传输
    • 定期备份
http://www.dtcms.com/a/107177.html

相关文章:

  • 【5090d】配置运行和微调大模型所需基础环境【一】
  • 简述竞赛经历在考研复试中的作用
  • rom定制系列------红米note8pro原生安卓12批量线刷 安卓14批量线刷定制功能项 解锁bl后fast刷写
  • Bash 花括号扩展 {start..end} 进阶使用指南——字典生成
  • Linux进程间通信(1)
  • 天梯赛 L2-025 分而治之
  • GoldenEye: 1靶场渗透
  • 第四章,动态路由介绍//////RIP
  • 【Kubernetes】如何使用 kubeadm 搭建 Kubernetes 集群?还有哪些部署工具?
  • 基于昇腾NPU的YOLOv8部署
  • redis一些常用的命令(1)
  • 【零基础入门unity游戏开发——2D篇】SortingGroup(排序分组)组件
  • acwing 每日一题4889. 空调II
  • WinForm真入门(4)——窗体和控件、属性和事件 的基本概念
  • NFC碰一碰到底是什么?具体有什么功能
  • Transformer
  • Vue.js状态管理利器:Vuex核心原理与实战指南
  • VRRP(虚拟路由器冗余协议)、虚拟路由器、master路由器、backup路由器
  • 【算法数学篇】试除法求约数
  • 最长公共子串
  • (六)ASCLIN_UART模块串口DMA模式
  • 完美解决Tensorboard: No dashboards are active for the current data set.问题
  • 云曦3月断网考
  • 48. 旋转图像
  • 图神经网络实战(PyTorch Geometric处理学术网络)
  • Rock Pi 5B Linux虚拟串口设置方法
  • 无人机无线图像回传技术解析!
  • 如果数据包的最后一段特别短,如何处理?
  • 【GPT入门】第31课 ollama运行私有化部署的模型与调试
  • Linux:线程的同步与互斥