当前位置: 首页 > news >正文

RabbitMQ的详细使用

1. RabbitMQ概述

rabbitMQ是一个广泛使用的消息服务器,采用Erlang语言编写,是一种开源的实现AMQP【高级消息队列协议】的消息中间件。它最初起源于金融系统,它的性能及稳定性都非常出色。简单来说,消息中间件就是指保存数据的一个容器,可以用于两个系统间的数据传递。

消息中间件一般有三种角色:生产者Producer、消费者Consumer、消息队列Broker。生产者发消息到消息队列,消费者再从消息队列中获取数据并处理。

RabbitMQ的【核心用途】如下:

  • 异步处理:比如注册成功后发邮件,不必等待邮件发完才响应用户。
  • 系统解耦:生产者与消费者不直接通信。
  • 流量削峰:高并发场景下缓冲请求,避免系统崩溃。
  • 消息广播/任务分发:如分布式日志、任务调度等。

安装erlangrabbitmq

sudo apt update
sudo apt install erlang-nox rabbitmq-server -y

2. 常用命令

通过rabbitmqctl help [命令]可以查询相关帮助文档,以便于我们使用rabbitmq的命令。rabbitmq的常用命令如下:

  • 用户与权限管理:修改用户权限或角色后,要重启服务器以生效

    # 添加用户
    sudo rabbitmqctl add_user 用户名 密码
    sudo rabbitmqctl add_user admin 123456# 设置用户角色[administrator/super user]
    sudo rabbitmqctl set_user_tags admin administrator# 设置权限[给指定vhost的读/写/配置权限]
    sudo rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"# 查看所有用户
    sudo rabbitmqctl list_users# 查看所有用户权限,默认查看/虚拟主机上的权限
    sudo rabbitmqctl list_permissions
    sudo rabbitmqctl list_permissions -p [hostname]# 删除用户
    sudo rabbitmqctl delete_user admin
    
  • web后台启用:配置完成后访问本机15672端口即可,且登录用户的身份必须是administrator

    # 查看rabbitmq所有插件的状态信息
    sudo rabbitmq-plugins list# 启动Web管理插件
    sudo rabbitmq-plugins enable rabbitmq_management
    
    # 查看rabbitmq节点的运行状态和资源使用情况, 如端口占用15672
    sudo rabbitmqctl status
    
    # 开启ufw防火墙
    # 先放行ssh, 放置开启防火墙后远程断联
    sudo ufw allow ssh
    sudo ufw enable# 放行rabbitmq Web管理页面端口
    sudo ufw allow 15672/tcp# 放行AMQP消息传输端口
    sudo ufw allow 5672/tcp# 重载ufw防火墙规则
    sudo ufw reload
    

    在这里插入图片描述

  • 虚拟主机

    # 创建虚拟主机
    sudo rabbitmqctl add_vhost test# 分配权限给用户
    sudo rabbitmqctl set_permissions -p test admin ".*" ".*" ".*"# 查看所有虚拟主机
    sudo rabbitmqctl list_vhosts# 删除虚拟主机
    rabbitmqctl delete_vhost test
    

3. RabbitMQ的工作模型

rabbitmq工作模型

为了更好的理解rabbitmq的工作模型,可以类别mysqlbroker相当于mysql服务器,虚拟主机相当于数据库,队列相当于表,消息相当于表中的记录。

消息队列中有三个核心要素:消息生产者、消息队列、消息消费者。我们需要知道以下概念:

  • 连接:连接RabbitMQ服务器的TCP长连接。
  • 信道channel:连接中的一个虚拟通道,消息队列发送或接收消息时,都是通过信道进行的。
  • 虚拟主机:一个虚拟分组,相当于一个数据库,每个用户在自己的虚拟主机上创建exchangequeue等。
  • 交换机Exchange:交换机负责从生产者接收消息,并根据交换机类型分发到对应的消息队列中,起到一个路由的作用。
  • 路由键:交换机根据路由键来决定消息分发到哪个队列,路由键是消息的目的地址。
  • 绑定Binding:绑定是队列和交换机的一个关联连接。
  • 队列:存储消息的缓存。
  • 消息:由生产者通过RabbitMQ发送给消费者的信息,消息可以是任何数据,如字符串、user对象、json串等。

4. RabbitMQ交换机类型

4.1 扇形交换机

扇形交换机Fanout ExchangeRabbitMQ中一种基于广播模式的交换机类型,其核心特点是在于忽略消息的路由键,直接将收到的消息复制并广播到所有与之绑定的队列中。其核心机制如下:

  • 消息广播机制:生产者发送到扇形交换机的消息会无差别分发到所有绑定的队列,每个队列都会收到一份相同的消息副本。这种机制类似于无线电广播,所有订阅者【队列】均能接收到相同内容。
  • 路由键完全忽略:与其他交换机不同,扇形交换机不解析路由键,即使消息中定义了路由键,也会被交换机忽略。队列绑定到扇形交换机时也无需指定绑定键。

由于扇形交换机的这两个特性,扇形交换机很适用于以下场景:

  • 系统级广播通知:例如系统公告、配置更新等需要所有模块同时感知的事件。
  • 日志收集与分发:在分布式系统中,同一日志消息可能需要同时写入数据库、磁盘文件、监控系统等多个存储介质。通过扇形交换机,一次发送即可触发多端处理。
  • 事件驱动架构:当多个微服务需要基于同一事件如订单创建执行不同逻辑时,扇形交换机可将事件广播到所有订阅服务,实现解耦的并行处理。

C#使用RabbitMQ-3_发布订阅模式(扇形交换机)_c# rabbitmq 交换机-CSDN博客

python中,pika是和rabbitmq交互的官方客户端库,它提供了一套接口,用于连接 RabbitMQ 服务器、声明队列、发布消息、接收消息等。下面使用pika库实现扇形交换机:

  • 生产者:声明交换机、发送消息,不负责创建队列或绑定队列。

    • 一个连接中可以声明多个信道,每个信道可以声明多个交换机。

    • 在同一虚拟主机内,不同信道声明的交换机可以相互使用,例如信道A声明交换机logs_fanout,信道B可直接通过同名交换机发送消息。

    • 交换机在虚拟主机内全局存在,与创建它的信道无关。

    • 扇形交换机会将消息广播到所有绑定的队列,因此若此时无消费者队列绑定,消息会被丢弃。

      import pika# 创建一个TCP阻塞连接
      connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.44.128',port=5672,virtual_host='python_host',credentials=pika.PlainCredentials('bruce', '20041104'),
      ))
      # 创建一个信道
      channel = connection.channel()# 声明交换机
      # 规则: 若交换机不存在,则自动创建; 若已存在, 则校验参数是否匹配
      channel.exchange_declare(exchange='fanout_exchange',  # 交换机名称exchange_type='fanout',  # 交换机类型durable=True,  # 持久化设置
      )# 发布消息到交换机
      message = 'hello rabbitMQ'
      channel.basic_publish(exchange='fanout_exchange',  # 目标交换机名称routing_key='',  # 路由键body=message,  # 消息体properties=pika.BasicProperties(delivery_mode=2,  # 持久化消息, 1表示非持久化消息),
      )print('发送成功')
      # 关闭连接
      # channel.close()
      # connection.close()
      
  • 消费者:声明交换机、声明并绑定队列,且开始消费消息。

    import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.44.128',port=5672,virtual_host='python_host',credentials=pika.PlainCredentials('bruce', '20041104'),
    ))# 确保交换机存在, 否则绑定失败
    channel = connection.channel()
    channel.exchange_declare(exchange='fanout_exchange',exchange_type='fanout',durable=True,
    )# 声明队列1并绑定到交换机
    channel.queue_declare(queue='queue1',durable=True,
    )
    channel.queue_bind(exchange='fanout_exchange',queue='queue1',
    )# 声明队列2并绑定到交换机
    channel.queue_declare(queue='queue2',durable=True,
    )
    channel.queue_bind(exchange='fanout_exchange',queue='queue2',
    )# 定义回调函数
    def callback1(ch, method, properties, body):print(f'queue1: Received message: {body}')ch.basic_ack(delivery_tag=method.delivery_tag)def callback2(ch, method, properties, body):print(f'queue2: Received message: {body}')ch.basic_ack(delivery_tag=method.delivery_tag)# 注册消费者
    channel.basic_consume(queue='queue1', on_message_callback=callback1)
    channel.basic_consume(queue='queue2', on_message_callback=callback2)print('Waiting for messages...')
    channel.start_consuming()"""
    Waiting for messages...
    queue1: Received message: b'hello rabbitMQ'
    queue2: Received message: b'hello rabbitMQ'
    """
    

4.2 直连交换机

直连交换机Direct ExchangeRabbitMQ 中基于精确匹配路由键Routing Key 的交换机类型,其核心机制是:

  • 路由键完全匹配:消息的Routing Key必须与队列绑定时指定的Binding Key
  • 点对点或单播路由:适用于将消息精准投递到特定队列,常用于单消费者处理特定任务的场景。

由于直连交换机的以上特性,适用于以下场景:

  • 订单状态更新:将订单 ID 作为路由键,确保同一订单的消息始终由同一队列处理。
  • 任务分发与负载均衡:多个队列绑定同一路由键时,消息会以轮询方式在队列间分发,实现消费者间的负载均衡。
  • 优先级处理:不同队列绑定不同路由键,区分高优先级任务和普通任务。

RabbitMQ的四种交换机模式_mq的四种模式-CSDN博客

路由键匹配规则如下:

  • 键不匹配时不发送消息:当生产者发送消息时指定的路由键与队列绑定的绑定键完全一致**,消息才会被路由到对应的队列。
  • 键匹配时发送消息:若多个队列绑定相同的路由键,消息会被复制并分发到所有匹配的队列,但直连交换机的默认行为是轮询分发。

代码实现:

  • 生产者

    import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.44.128',port=5672,virtual_host='python_host',credentials=pika.PlainCredentials('bruce', '20041104'),
    ))channel = connection.channel()
    # 声明直连交换机
    channel.exchange_declare(exchange='direct_exchange',exchange_type='direct',durable=True,
    )log_types = ['info', 'warning', 'error']
    for log_type in log_types:message = f'{log_type.upper()}: 这是一个{log_type}日志'channel.basic_publish(exchange='direct_exchange',routing_key=log_type,body=message.encode('utf-8'))print(f'[x] Sent {log_type}: {message}')channel.close()
    connection.close()
    
  • 消费者

    import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.44.128',port=5672,virtual_host='python_host',credentials=pika.PlainCredentials('bruce', '20041104'),
    ))channel = connection.channel()
    # 声明直连交换机
    channel.exchange_declare(exchange='direct_exchange',exchange_type='direct',durable=True,
    )# 监听info日志
    channel.queue_declare(queue='info_queue',durable=True,
    )
    channel.queue_bind(exchange='direct_exchange',queue='info_queue',routing_key='info',
    )def callback1(ch, method, properties, body):print(f"[INFO] Received: {body.decode()}")channel.basic_consume(queue='info_queue',on_message_callback=callback1,auto_ack=True,
    )# 监听err日志
    channel.queue_declare(queue='error_queue',durable=True,
    )
    channel.queue_bind(exchange='direct_exchange',queue='error_queue',routing_key='error',
    )def callback2(ch, method, properties, body):print(f"[ERROR] Received: {body.decode()}")channel.basic_consume(queue='error_queue',on_message_callback=callback2,auto_ack=True,
    )channel.start_consuming()"""
    [INFO] Received: INFO: 这是一个info日志
    [ERROR] Received: ERROR: 这是一个error日志
    """
    

4.3 主题交换机

主题交换机Topic ExchangeRabbitMQ 中基于 通配符匹配路由键 的交换机类型,其核心机制如下:

  • 路由键动态匹配:消息的路由键由 . 分隔的单词组成,交换机通过通配符匹配队列绑定的绑定键,支持两种通配符:
    • *:匹配一个单词,如 order.* 可匹配 order.payment,但不能匹配 order.payment.failed
    • #:匹配零个或多个单词,如 order.# 可匹配 orderorder.paymentorder.payment.failed
  • 灵活的路由策略:通过通配符组合实现多维度消息分类,例如log.error匹配所有错误日志,news.sports.* 匹配所有体育类新闻的子分类。

主题交换机适用于需要动态消息分类或分层路由的场景:

  • 日志分级处理:

    • 绑定键 log.error 路由到告警服务队列。
    • 绑定键 log.info 路由到存储服务队列。
  • 多维度事件订阅:

    • 用户订阅 news.sports 接收所有体育新闻。
    • 订阅 news.weather 接收天气信息。

代码实现:

  • 生产者

    import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.44.128',port=5672,virtual_host='python_host',credentials=pika.PlainCredentials('bruce', '20041104'),
    ))channel = connection.channel()
    # 声明直连交换机
    channel.exchange_declare(exchange='topic_exchange',exchange_type='topic',durable=True,
    )messages = {'auth.info': '用户登录成功','auth.warning': '尝试使用过期密码','kern.critical': '内核崩溃','cron.error': '定时任务失败'
    }for routing_key, message in messages.items():channel.basic_publish(exchange='topic_exchange',routing_key=routing_key,body=message.encode('utf-8'))print(f"[x] Sent {routing_key}: {message}")channel.close()
    connection.close()
    
  • 消费者

    import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.44.128',port=5672,virtual_host='python_host',credentials=pika.PlainCredentials('bruce', '20041104'),
    ))channel = connection.channel()
    # 声明直连交换机
    channel.exchange_declare(exchange='topic_exchange',exchange_type='topic',durable=True,
    )# 接收所有auth开头的路由键
    channel.queue_declare(queue='auth_queue')
    channel.queue_bind(exchange='topic_exchange', queue='auth_queue', routing_key='auth.#')def callback(ch, method, properties, body):print(f"[AUTH] Received ({method.routing_key}): {body.decode()}")channel.basic_consume(queue='auth_queue', on_message_callback=callback, auto_ack=True)
    print('[*] Waiting for auth.# messages...')
    channel.start_consuming()"""
    [AUTH] Received (auth.info): 用户登录成功
    [AUTH] Received (auth.warning): 尝试使用过期密码
    """
    

4.4 头部交换机

头部交换机Headers ExchangeRabbitMQ 中一种基于消息头部属性 进行路由的交换机类型,其核心机制如下:

  • 忽略路由键:与直连、主题交换机不同,头部交换机完全依赖消息的 headers 属性进行匹配,路由键会被忽略。
  • 自定义匹配规则:通过绑定队列时定义的键值对与消息的 headers 属性进行匹配,支持两种匹配模式:
    • x-match=any:消息的 headers任意一个键值对匹配即可路由到队列。
    • x-match=all:消息的 headers所有键值对必须完全匹配 才能路由到队列。

头部交换机灵活高,但是性能较低,所有很少使用,它有如下使用场景:

  • 多条件组合过滤:例如订单消息需同时满足 type=urgentregion=Asia 才触发告警。
  • 动态标签路由:根据消息标签如 priority=highcategory=finance分发到不同处理服务。
  • 替代路由键的场景:当路由键无法表达复杂业务逻辑时,通过 headers 扩展路由维度。

代码实现:

  • 生产者

    import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.44.128',port=5672,virtual_host='python_host',credentials=pika.PlainCredentials('bruce', '20041104'),
    ))channel = connection.channel()
    channel.exchange_declare(exchange='headers_exchange',exchange_type='headers',durable=True,
    )messages = [({'type': 'log','level': 'error'}, "系统错误日志"),({'type': 'audit','priority': 'high'}, "高优先级审计日志"),({'type': 'security','priority': 'high'}, "高优先级安全告警"),
    ]for headers, body in messages:properties = pika.BasicProperties(headers=headers)channel.basic_publish(exchange='headers_exchange',routing_key='',  # headers交换机不使用routing_keybody=body.encode('utf-8'),properties=properties)print(f"[x] Sent: {body} with headers: {headers}")channel.close()
    connection.close()
    
  • 消费者

    import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.44.128',port=5672,virtual_host='python_host',credentials=pika.PlainCredentials('bruce', '20041104'),
    ))channel = connection.channel()
    channel.exchange_declare(exchange='headers_exchange',exchange_type='headers',durable=True,
    )# 声明并绑定队列
    channel.queue_declare(queue='headers_queue',durable=True
    )
    channel.queue_bind(exchange='headers_exchange',queue='headers_queue',routing_key='',arguments={'x-match': 'all','type': 'log','level': 'error',}
    )def callback(ch, method, properties, body):print(f"[LOG ERROR] Received: {body.decode()}")channel.basic_consume(queue='headers_queue', on_message_callback=callback, auto_ack=True)
    print('[*] Waiting for log error messages...')
    channel.start_consuming()
    """
    [LOG ERROR] Received: 系统错误日志
    """
    
    import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.44.128',port=5672,virtual_host='python_host',credentials=pika.PlainCredentials('bruce', '20041104'),
    ))channel = connection.channel()
    channel.exchange_declare(exchange='headers_exchange',exchange_type='headers',durable=True,
    )messages = [({'type': 'log','level': 'error'}, "系统错误日志"),({'type': 'audit','priority': 'high'}, "高优先级审计日志"),({'type': 'security','priority': 'high'}, "高优先级安全告警"),
    ]for headers, body in messages:properties = pika.BasicProperties(headers=headers)channel.basic_publish(exchange='headers_exchange',routing_key='',  # headers交换机不使用routing_keybody=body.encode('utf-8'),properties=properties)print(f"[x] Sent: {body} with headers: {headers}")channel.close()
    connection.close()"""
    [LOG ERROR] Received: 系统错误日志
    [LOG ERROR] Received: 高优先级审计日志
    [LOG ERROR] Received: 高优先级安全告警
    """
    

5. TTL

TTL即过期消息,消息的过期时间有以下两种方式:

  • 设置单条消息的过期时间:如果到了时间还没有被消费者接收的消息,称为死信

    import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.44.128',port=5672,virtual_host='python_host',credentials=pika.PlainCredentials('bruce', '20041104'),
    ))channel = connection.channel()
    channel.exchange_declare(exchange='direct_exchange',exchange_type='direct',durable=True,
    )channel.basic_publish(exchange='direct_exchange',routing_key='direct_key',body='hello world',properties=pika.BasicProperties(# 10s后未被消费就会删除expiration='10000',)
    )# 声明队列
    channel.queue_declare(queue='direct_queue')
    channel.queue_bind(exchange='direct_exchange',queue='direct_queue',routing_key='direct_key',
    )print('消息发送成功')
    
  • 通过队列属性设置消息过期时间:统一设置整个进入该队列的消息的过期时间。如果同时也设置了单条消息的过期时间,则以时间小的为准。

    import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.44.128',port=5672,virtual_host='python_host',credentials=pika.PlainCredentials('bruce', '20041104'),
    ))channel = connection.channel()
    channel.exchange_declare(exchange='direct_exchange',exchange_type='direct',durable=True,
    )channel.basic_publish(exchange='direct_exchange',routing_key='direct_key',body='hello world',
    )# 声明队列
    channel.queue_declare(queue='direct_queue',arguments={'x-message-ttl': 10000,},
    )
    channel.queue_bind(exchange='direct_exchange',queue='direct_queue',routing_key='direct_key',
    )print('消息发送成功')
    
  • 设置队列的过期时间

    # 声明队列
    channel.queue_declare(queue='direct_queue',arguments={'x-message-ttl': 10000,# 10s后未被访问就删除'x-expires': 10000},
    )
    

6. RabbitMQ死信机制

死信机制DLQRabbitMQ中处理异常消息的容错机制,用于存储因特定原因无法被正常消费的消息。其核心作用体现在:

  • 异常消息隔离:防止无效消息阻塞正常业务队列,避免消息丢失。
  • 重试与补偿:支持自动重试、延迟处理或人工干预,如未支付订单超时关闭。
  • 系统稳定性保障:通过TTL和队列容量限制防止系统过载。

6.1 死信消息

死信消息是指因特定原因无法被消费者正常处理的消息。其核心特征为消息失去正常消费路径,需通过特殊机制处理。产生死信的常见原因包括:

  • 消息过期:消息存活时间超过设定阈值,包括两种配置方式:
    • 队列级TTL:通过x-message-ttl参数统一设置队列中所有消息的过期时间。
    • 消息级TTL:发送消息时通过expiration字段单独设置。两者共存时以较小值为准。
  • 队列容量溢出:队列达到最大长度x-max-length参数,新消息进入时,最早的消息会被标记为死信。
  • 消息被拒绝:消费者明确拒绝【basic.rejectbasic.nack】且设置requeue=false,不再重新入队。例如订单支付失败后被标记为不可重试。

6.2 死信队列

死信队列Dead Letter Queue, DLQ是存储死信消息的特殊队列,需配合死信交换机DLX使用。其核心作用为

  • 容错保障:防止因异常消息丢失导致业务中断。
  • 延迟处理:结合TTL实现延迟任务,如订单15分钟未支付自动关闭。
  • 问题追溯:集中存储异常消息用于后续审计或修复。

6.2.1 消息过期

# 生产者
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.44.128',port=5672,virtual_host='python_host',credentials=pika.PlainCredentials('bruce', '20041104'),
))channel = connection.channel()
# 声明死信交换机和死信队列
channel.exchange_declare(exchange='dlx_exchange', exchange_type='fanout', durable=True)
channel.queue_declare(queue='dlx_queue', durable=True)
channel.queue_bind(exchange='dlx_exchange', queue='dlx_queue')channel.exchange_declare(exchange='main_exchange', exchange_type='direct', durable=True)# 声明主队列
channel.queue_declare(queue='main_queue',durable=True,arguments={'x-dead-letter-exchange': 'dlx_exchange',    # 死信交换机'x-dead-letter-routing-key': '',             # fanout 不需要'x-message-ttl': 5000                         # 消息存活 5 秒}
)
channel.queue_bind(exchange='main_exchange', queue='main_queue', routing_key='main_key')# 发送消息
channel.basic_publish(exchange='main_exchange',routing_key='main_key',body='这是一条会过期的消息',properties=pika.BasicProperties(delivery_mode=2  # 持久化消息)
)print("[x] 消息已发送,等待过期转入死信队列")
connection.close()
# 消费者
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.44.128',port=5672,virtual_host='python_host',credentials=pika.PlainCredentials('bruce', '20041104'),
))channel = connection.channel()
channel.queue_declare(queue='dlx_queue', durable=True)def callback(ch, method, properties, body):print(f"[DLX] 收到死信消息: {body.decode()}")channel.basic_consume(queue='dlx_queue', on_message_callback=callback, auto_ack=True)print("[*] 正在监听死信队列 dlx_queue...")
channel.start_consuming()

6.2.2 队列达到最大长度

当队列达到最大长度后,之后再进来的消息会入队,而最先入队的消息会变成死信而进入死信队列。

import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.44.128',port=5672,virtual_host='python_host',credentials=pika.PlainCredentials('bruce', '20041104'),
))channel = connection.channel()
# 声明死信交换机和死信队列
channel.exchange_declare(exchange='dlx_exchange', exchange_type='fanout', durable=True)
channel.queue_declare(queue='dlx_queue', durable=True)
channel.queue_bind(exchange='dlx_exchange', queue='dlx_queue')# 主交换机和主队列,设置最大长度为 5 条,并指定死信交换机
channel.exchange_declare(exchange='main_exchange', exchange_type='direct', durable=True)
channel.queue_declare(queue='main_queue',durable=True,arguments={'x-max-length': 5,  # 队列最多容纳 5 条消息'x-dead-letter-exchange': 'dlx_exchange',  # 多余消息进入死信交换机'x-dead-letter-routing-key': 'dlx_queue'}
)
channel.queue_bind(exchange='main_exchange', queue='main_queue', routing_key='main_key')# 向队列发送 10 条消息,超过5条的将转入DLQ
for i in range(10):body = f'消息 {i + 1}'channel.basic_publish(exchange='main_exchange',routing_key='main_key',body=body,properties=pika.BasicProperties(delivery_mode=2)  # 持久化消息)print(f'[x] 发送: {body}')connection.close()

6.2.3 消息被拒绝

在消费者订阅队列时,如果设置了以下配置则表示消费者自动确认消息,如下:但是自动确认带来了很多问题,如消费者接收消息后,需要向数据库中插入一条订单信息,如果是自动确认此时数据库又刚好宕机,则会导致该订单的丢失。

channel.basic_consume(queue='your_queue',on_message_callback=callback,auto_ack=True  # 自动确认
)

当设置auto_ack=False时表示需消费者手动确认消息,如下:如果消费者接收了消息,但是一直没有处理,有以下几种请情况:

  • ack但设置了ttl:会进入死信队列
  • ack且没设置ttl:不会进入死信队列,默认行为是重新入队,触发条件就是连接断开或出现其它异常
import timeimport pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.44.128',port=5672,virtual_host='python_host',credentials=pika.PlainCredentials('bruce', '20041104'),
))channel = connection.channel()is_ok = True
def callback(ch, method, properties, body):if is_ok:# 手动确认消息time.sleep(5)ch.basic_ack(delivery_tag=method.delivery_tag)else:# 拒绝消息ch.basic_nack(delivery_tag=method.delivery_tag)print(f"[X] 收到消息: {body.decode()}")channel.basic_consume(queue='main_queue', on_message_callback=callback, auto_ack=False)print("[*] 正在队列 main_queue...")
channel.start_consuming()

当拒绝消息时,默认情况下也是重新入队,这是因为ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)中的reueue默认为True,当设置为False表示不重新入队,直接进入死信队列。

6.3 延迟队列

延迟队列即利用TTL + 死信队列,实现原理即创建一个延迟队列,设置过期时间和死信交换机,当TTL到期后,消息会被转发到DLX,该死信交换机才是真正的消费队列。常见使用场景如下:

场景说明
订单支付超时取消订单创建后 30 分钟内未支付自动取消
🔁 延迟重试机制第一次失败后 10s 再试,失败后进入延迟队列重试
📮 定时推送通知 / 邮件 / 短信控制何时发送提醒
🚧 限流分发任务控制任务的节奏发送
秒杀/预约倒计时控制提前5分钟提示
import timeimport pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.44.128',port=5672,virtual_host='python_host',credentials=pika.PlainCredentials('bruce', '20041104'),
))channel = connection.channel()# 死信交换机和消费队列
channel.exchange_declare(exchange='real_exchange', exchange_type='direct', durable=True)
channel.queue_declare(queue='real_queue', durable=True)
channel.queue_bind(exchange='real_exchange', queue='real_queue', routing_key='process_key')# 延迟队列: 设置死信交换机 + TTL
channel.exchange_declare(exchange='delay_exchange', exchange_type='direct', durable=True)
channel.queue_declare(queue='delay_queue',durable=True,arguments={'x-dead-letter-exchange': 'real_exchange',           # TTL 到期转发到 real_exchange'x-dead-letter-routing-key': 'process_key',          # 转发 routing_key'x-message-ttl': 10000                                # 延迟时间}
)
channel.queue_bind(exchange='delay_exchange', queue='delay_queue', routing_key='delay_key')# 发送消息到延迟队列
channel.basic_publish(exchange='delay_exchange',routing_key='delay_key',body='这是一个延迟10秒的任务',properties=pika.BasicProperties(delivery_mode=2)
)print('已发送延迟消息')
connection.close()

上述实现延迟队列使用了两个交换机,实际上使用一个交换机也可实现,其原理就是一个交换机创建两个队列,一个用于延迟队列,另一个用于正常消费队列,延迟消息发到delay_queueTTL到期后,转发回同一个交换机,再路由到real_queue

# 1️⃣ 声明一个交换机:负责发消息与转发死信
channel.exchange_declare(exchange='task_exchange', exchange_type='direct', durable=True)# 2️⃣ 真正消费的队列:real_queue
channel.queue_declare(queue='real_queue', durable=True)
channel.queue_bind(exchange='task_exchange', queue='real_queue', routing_key='real')# 3️⃣ 延迟队列 delay_queue:设置TTL和死信交换机,转发回同一个交换机
channel.queue_declare(queue='delay_queue',durable=True,arguments={'x-dead-letter-exchange': 'task_exchange','x-dead-letter-routing-key': 'real','x-message-ttl': 10000  # 延迟10秒}
)
channel.queue_bind(exchange='task_exchange', queue='delay_queue', routing_key='delay')# 4️⃣ 发送消息到延迟队列
channel.basic_publish(exchange='task_exchange',routing_key='delay',  # 进 delay_queue,过期后死信转 real_queuebody='这是延迟10秒的任务',properties=pika.BasicProperties(delivery_mode=2)
)print("[x] 已发送延迟任务")
connection.close()

延迟队列的机制缺陷:队列中第一条消息的 TTL10s,第二条消息的 TTL5s但由于第一条还未过期,第二条消息即使 TTL 到了也无法出队,直到第一条消息被处理或过期,第二条消息才会被 RabbitMQ 扫描并进入死信队列。这就是 RabbitMQ 队列是 FIFO先进先出+ TTL 延迟处理只扫描队头消息 所导致的问题。有以下常见解决方案:

  • TTL + 死信队列:将不同过期时间的消息放在不同的队列中,然后过期后放入到死信队列中【不推荐】。

  • 使用Redis + 定时器

  • 使用RabbitMQ中的插件

    # 下载插件
    wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.9.0/rabbitmq_delayed_message_exchange-3.9.0.ezsudo cp rabbitmq_delayed_message_exchange-3.9.0.ez /usr/lib/rabbitmq/lib/rabbitmq_server-3.9.27/plugins/# 启动插件
    sudo rabbitmq-plugins enable rabbitmq_delayed_message_exchange
    
    import timeimport pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.44.128',port=5672,virtual_host='python_host',credentials=pika.PlainCredentials('bruce', '20041104'),
    ))channel = connection.channel()# 使用插件的 x-delayed-message 类型交换机
    channel.exchange_declare(exchange='delayed_exchange',exchange_type='x-delayed-message',durable=True,# 想要它模仿哪种正常交换机行为arguments={'x-delayed-type': 'direct'}
    )# 绑定队列
    channel.queue_declare(queue='delayed_queue', durable=True)
    channel.queue_bind(exchange='delayed_exchange', queue='delayed_queue', routing_key='delay')# 发布延迟 5000ms 的消息
    channel.basic_publish(exchange='delayed_exchange',routing_key='delay',body='延迟5秒的消息',# 延迟5sproperties=pika.BasicProperties(headers={'x-delay': 5000})
    )print('[x] 发送成功')
    connection.close()
    

7. 生产者确认模式

消息的confirm确认机制,是指生产者发布消息后,到达了消息服务器broker里面的exchange交换机,则会给生产者一个应答,生产者接收该应答以确定该消息正常发送到Broker中的交换机,这是消息可靠性投递的重要保障。简而言之,确认模式用于让生产者知道消息是否成功到达 RabbitMQ 服务器

消息的可靠性投递就是要保证消息投递过程中的每一个环节都要成功,确保消息在传输过程中不丢失、不重复、不乱序,那么就一定会牺牲掉部分性能,性能与可靠性是无法兼得的。如果业务实时一致性要求不是特别高的场景,可以牺牲一些可靠性来换取性能。

生产者确实模式包括三种,如下:

模式说明适用场景
单条确认【同步】每发送一条消息就等待 Broker 的确认简单、可靠,但吞吐量低
批量确认【同步】每发送一批消息后一次性等待确认提高性能,但一旦失败难以定位
异步确认注册回调函数,Broker 确认消息时调用回调性能最好,适用于高并发、异步系统

单条确认:最安全,但是效率最低,每条消息都阻塞等待ack确认。当使用channel.confirm_delivery()启用生产者确认模式后,basic_publish()的返回值会发生变化:未启用时返回None,无法判断是否成功;启用后成功路由到交换机则会返回True,否则抛出UnroutableErrorNackError 异常。

import timeimport pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.44.128',port=5672,virtual_host='python_host',credentials=pika.PlainCredentials('bruce', '20041104'),
))channel = connection.channel()
# 启用确认模式
channel.confirm_delivery()channel.exchange_declare(exchange='test_exchange',exchange_type='direct',durable=True,
)
channel.queue_declare(queue='test_queue',durable=True,
)
channel.queue_bind(exchange='test_exchange',queue='test_queue',routing_key='direct_key',
)try:channel.basic_publish(exchange='test_exchange',routing_key='direct_key',body='hello world',properties=pika.BasicProperties(delivery_mode=2),mandatory=True  # 强制要求路由到队列)print('消息发送成功')
except pika.exceptions.UnroutableError:print('消息无法路由到队列')
except pika.exceptions.NackError:print('Broker拒绝接收消息')

多条确认:多条消息一起确认,效率有所提高,但是出现异常时难以定位是哪条消息失败。

import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.44.128',port=5672,virtual_host='python_host',credentials=pika.PlainCredentials('bruce', '20041104'),
))channel = connection.channel()
# 启用确认模式
channel.confirm_delivery()channel.exchange_declare(exchange='test_exchange',exchange_type='direct',durable=True,
)
channel.queue_declare(queue='test_queue',durable=True,
)
channel.queue_bind(exchange='test_exchange',queue='test_queue',routing_key='direct_key',
)try:# 批量发送消息for i in range(100):channel.basic_publish(exchange='test_exchange',routing_key='direct_key',body=f'Msg {i}',properties=pika.BasicProperties(delivery_mode=2)  # 消息持久化)# 等待所有消息的确认结果print("所有消息确认成功")
except pika.exceptions.UnroutableError:print("部分消息无法路由到队列")
except pika.exceptions.NackError:print("Broker拒绝接收部分消息")
except pika.exceptions.ConnectionClosedByBroker:print("Broker连接异常中断!")

异步确认【推荐】:基于aio-pika实现

import asyncio
from aio_pika import connect_robust, Message, DeliveryMode, ExchangeTypeasync def main():# 连接 RabbitMQconnection = await connect_robust("amqp://bruce:20041104@192.168.44.128/python_host",timeout=5)async with connection:# 创建通道, 开启发布确认channel = await connection.channel(publisher_confirms=True)# 声明直连交换机exchange = await channel.declare_exchange("direct_exchange", ExchangeType.DIRECT, durable=True)# 声明队列并绑定queue = await channel.declare_queue("confirm_queue", durable=True)await queue.bind(exchange, routing_key="confirm_key")# 构建消息message = Message(body=b"Hello aio-pika",delivery_mode=DeliveryMode.PERSISTENT  # 持久化消息)try:# 发布消息并等待确认【mandatory=True:无路由则报错】await exchange.publish(message, routing_key="confirm_key", mandatory=True)print("✅ 消息发布并确认成功")except Exception as e:print(f"❌ 消息发布失败或未被确认: {e}")# 启动异步主函数
if __name__ == "__main__":asyncio.run(main())

8. Rabbitmq消息的退回模式

rabbitmq整个消息投递的路径为producer -> exchange -> queue -> consumer

  • 消息从producer -> exchange会返回一个confirmCallback
  • 消息从exchange -> queue投递失败则会返回一个returnCallback

我们可以利用这两个callback来控制消息的可靠性投递。RabbitMQReturn模式用于处理消息成功到达交换机但无法路由到队列的场景。Return模式仅在以下两种情况下触发:

  • 路由键不匹配:消息的路由键与队列绑定规则不符。
  • 队列不存在:目标队列未声明或已被删除。

消息生产者将消息发送到ExchangeExchange尝试将消息路由到队列,若失败则触发Return回调,RabbitMQ将消息退回生产者而不是直接丢弃,并附带错误码或错误描述。

import timeimport pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.44.128',port=5672,virtual_host='python_host',credentials=pika.PlainCredentials('bruce', '20041104'),
))channel = connection.channel()channel.exchange_declare(exchange='new_exchange',exchange_type='direct',durable=True
)# 定义返回回调
def on_return_callback(ch, method, properties, body):print("⚠️ 消息未被路由")print(f"Exchange: {method.exchange}")print(f"Routing key: {method.routing_key}")print(f"Body: {body.decode()}")channel.add_on_return_callback(on_return_callback)# 发送不可路由消息
channel.basic_publish(exchange='new_exchange',routing_key='non_existing_key',body='这是一条测试消息',properties=pika.BasicProperties(delivery_mode=2),mandatory=True
)# 处理返回的消息事件
channel.connection.process_data_events(time_limit=2)connection.close()"""
⚠️ 消息未被路由
Exchange: new_exchange
Routing key: non_existing_key
Body: 这是一条测试消息
"""

9. 交换机的属性

交换机的核心属性如下:

属性含义示例
exchange交换机名称'my_exchange'
exchange_typedirect / fanout / topic / headers'direct'
durable是否持久化True
auto_delete当没有队列绑定时是否自动删除交换机False
internal是否内部交换机,不能由生产者发送消息False
arguments拓展参数,如备用交换机设置等{'alternate-exchange': 'backup_exchange'}
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()channel.exchange_declare(exchange='main_exchange',exchange_type='direct',durable=True,auto_delete=False,internal=False,arguments={}  # 可以设置备用交换机等参数
)connection.close()

备用交换机:备用交换机用于处理未被路由的消息,即 mandatoryFalse 时,消息没有匹配任何队列也不会丢弃,而是转发到备用交换机。备用交换机的私用场景如下:

  • 避免消息因路由失败而丢失。

  • 实现死信逻辑【死信队列】或兜底队列【即备用队列】。

    # 声明备用交换机和队列
    channel.exchange_declare(exchange='backup_exchange', exchange_type='fanout', durable=True)
    channel.queue_declare(queue='unrouted_queue', durable=True)
    channel.queue_bind(exchange='backup_exchange', queue='unrouted_queue')# 声明主交换机
    channel.exchange_declare(exchange='main_exchange',exchange_type='direct',durable=True,arguments={# 指定备用交换机'alternate-exchange': 'backup_exchange'}
    )# 不绑定任何队列,向主交换机发送消息,消息将路由到备用交换机
    channel.basic_publish(exchange='main_exchange',routing_key='nonexistent_key',body='这条消息将进入备用交换机',
    )# 消费备用队列中的消息
    def callback(ch, method, properties, body):print(f"💡 收到备用消息: {body.decode()}")channel.basic_consume(queue='unrouted_queue', on_message_callback=callback, auto_ack=True)
    channel.start_consuming()
    

10. 队列的属性

RabbitMQ队列的属性决定了队列的行为,例如是否持久化、是否自动删除、是否设置最大长度、是否启用死信等。

属性名类型说明
queuestr队列名称
durablebool是否持久化
exclusivebool是否排他队列:仅当前连接可使用,连接断开则自动删除
auto_deletebool如果最后一个消费者断开连接,是否自动删除队列
argumentsdict高级功能配置,如 TTL、死信、最大长度等

对于队列的高级设置,即arguments参数的设置,如下:

属性名类型说明
x-message-ttlintms消息过期时间
x-expiresintms队列过期时间
x-max-lengthint最大消息数
x-max-length-bytesint最大消息大小【字节】
x-dead-letter-exchangestr死信交换机
x-dead-letter-routing-keystr死信路由键
x-overflowstr溢出策略【drop-head、reject-publish】
x-queue-modestr队列存储模式【default、lazy】,lazy表示存储在磁盘上,速度变慢
channel.queue_declare(queue='main_queue',durable=True,arguments={'x-max-length': 5,'x-dead-letter-exchange': 'dlx_exchange','x-queue-mode': 'lazy',  # 消息优先写磁盘}
)

11. 消息的幂等性

消息的幂等性即消息不被重复消费,不管你请求一次还是请求多次,对该资源本身造成的影响应该是相同的,不能因为重复的请求而对该资源重复造成影响。对于数据库的insertdeleteselectupdate来说,insert不具有幂等性;对于http请求来说,post不具有幂等性。当然可以延申到接口幂等性,如果一个接口使用相同参数反复调用,不会造成业务错误,那么这个接口就具有幂等性。如注册接口、发送短信接口都不具有幂等性。

如在一个订单系统,收到一条消息就会插入一个订单,如果这条消息被RabbitMQ发送了两次,而你又没做防重处理,数据库中就可能存在两条一样的订单,这在生产环境中是绝对不允许的。常用如下方法解决幂等性问题:

  • 使用唯一约束:给数据库添加唯一字段,收到重复消息则直接插入失败。

  • 使用redis + 唯一标识

    import uuid
    from django_redis import get_redis_connection
    from django.http import JsonResponsedef create_order(request):redis_conn = get_redis_connection("default")# 从请求中提取幂等 token,前端生成,或者后端下发后使用idempotent_key = request.headers.get("Idempotent-Key")if not idempotent_key:return JsonResponse({"code": 400, "msg": "缺少幂等Key"})redis_key = f"idempotent:{idempotent_key}"# 设置幂等key不存在才设置成功,设置成功说明是首次请求is_first = redis_conn.set(redis_key, "1", nx=True, ex=60)  # 60秒有效期if not is_first:return JsonResponse({"code": 429, "msg": "重复请求,请勿重复提交"})# 模拟业务逻辑:创建订单等order_id = str(uuid.uuid4())[:8]return JsonResponse({"code": 200, "msg": "创建成功", "order_id": order_id})
    

12. 集群模式

RabbitMQ的集群分为两种模式,一种是默认集群模式,一种是镜像集群模式。在rabbitmq集群中所有的节点【一个节点就是一个broker服务器】被归为两类,一类是磁盘节点,一类是内存节点。磁盘节点会把集群的所有消息持久化到磁盘中,而内存节点只会将这些信息保存到内存中,如果该节点宕机或重启,内存节点中的数据会全部丢失,而磁盘节点的数据不会丢失。

所有节点都可以接受连接,处理消息。但是队列和消息默认是节点本地的,即只存储在声明的节点上。集群模式有以下三种:

模式描述
普通模式队列绑定在哪个节点就存储在哪个节点,其他节点访问时需跨节点通信
镜像同步替代镜像队列的高可用队列,使用 Raft 协议,写入需大多数节点确认【推荐使用】
+-------------+         +-------------+         +-------------+
|  node1      | <-----> |   node2     | <-----> |   node3     |
|  (disk)     |         |   (disk)    |         |   (disk)    |
+-------------+         +-------------+         +-------------+↑                        ↑                       ↑用户连接                   消息同步               镜像/Quorum 同步

12.1 默认集群

默认集群指的是将多个RabbitMQ节点【一般在不同机器上】组建成一个集群,共享元数据如交换机、队列元信息、绑定关系等,但消息和队列本身并不自动复制,默认仍只存储在单个节点上。

  • ✅ 所有节点之间共享虚拟主机、用户、权限等元信息。
  • ❌ 队列/消息默认不共享,在哪个节点声明就在哪个节点存在。

默认集群的优缺点如下:

  • 优点:易部署,元数据同步,且可以随时添加节点扩容。
  • 缺点:队列非分布式,队列和消息默认默认只存在一个节点,单点故障时队列不可用;且容错性太差,单点宕机会导致该节点的队列无法消费,数据一致性也很差。

搭建默认集群的步骤如下:假设有三台服务器node1, node2, node3

  • 保证时间同步:sudo apt install ntp

  • 所有服务器安装rabbitmqerlangsudo apt install erlang-nox rabbitmq-server -y

  • 确保.erlang.cookie一致

    # 在node1上
    cat /var/lib/rabbitmq/.erlang.cookie
    # 将其内容复制到node2和node3上
    sudo scp /var/lib/rabbitmq/.erlang.cookie node2:/var/lib/rabbitmq/
    sudo chmod 400 /var/lib/rabbitmq/.erlang.cookie
    sudo chown rabbitmq:rabbitmq /var/lib/rabbitmq/.erlang.cookie
    
  • 配置主机名:确保每台机器能 ping 通对方

    sudo vim /etc/hosts# 添加
    192.168.1.100  node1
    192.168.1.101  node2
    192.168.1.102  node3
    
  • 启动rabbitMQ

    sudo systemctl enable rabbitmq-server
    sudo systemctl start rabbitmq-server
    
  • node2node3上加入集群

    sudo rabbitmqctl stop_app
    sudo rabbitmqctl join_cluster rabbit@node1
    sudo rabbitmqctl start_app# 如果以磁盘节点加入
    sudo rabbitmqctl join_cluster --ram rabbit@node1
    
  • 查看集群状态

    sudo rabbitmqctl cluster_status
    
  • 验证集群是否生效

    # 在任意节点
    sudo rabbitmqctl list_nodes
    # 在node1上声明交换机和队列
    rabbitmqadmin declare queue name=test durable=true
    # 然后在 node2 消费或查看是否能访问该队列
    

12.2 镜像同步

RabbitMQ镜像同步集群Queue Mirroring Cluster,也叫镜像队列集群是指将一个队列的消息同步复制到多个节点上,实现高可用。当主节点宕机后,其他节点可自动接管,从而保障服务不中断。镜像同步是在默认集群的基础上实现的。

配置好默认集群后,执行以下命令:

rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all", "ha-sync-mode":"automatic"}' --apply-to queues
  • ha-mode:镜像模式,可选 all【全节点同步】、exactly【指定节点数】、nodes【指定具体节点】

  • ha-sync-mode:同步方式,automatic 自动同步,manual 需手动触发

    # 查看已配置的策略
    rabbitmqctl list_policies
    
    # 验证集群状态
    rabbitmqctl cluster_status
    

对于镜像集群,有以下注意事项与优化建议:

  • 性能影响:镜像队列会显著增加网络流量和磁盘 I/O,建议根据业务需求选择同步模式:

    • 高频消息场景:优先使用 ha-mode: exactly 限制副本数
    • 低延迟要求:避免跨数据中心镜像,优先局域网内部署
  • 故障恢复

    • 主节点恢复后不会自动重新成为主节点,需通过负载均衡或手动调整

    • 使用 ha-sync-mode: automatic 时,若队列数据量大,可能因同步阻塞影响性能

  • 资源规划

    • 磁盘空间:每个镜像节点需存储完整队列数据,预留充足存储

    • 节点数量:镜像节点数建议为奇数,避免脑裂问题

相关文章:

  • 超详细网络介绍(超全)
  • 5.24本日总结
  • CQF预备知识:Python相关库 -- NumPy 基础知识 - 数组创建
  • Ubuntu20.04 gr-gsm完整安装教程
  • SQL每日一题
  • SQL SERVER常用聚合函数整理及示例
  • 全面指南:使用Node.js和Python连接与操作MongoDB
  • 二十五、面向对象底层逻辑-SpringMVC九大组件之HandlerMapping接口设计
  • 【Linux】进程基本概念与基本操作
  • 【深度学习新浪潮】如何用Dify构建自己的AI Agent?
  • 利用 XML 外部实体注入(XXE)读取文件和探测内部网络
  • 快速排序算法的C++和C语言对比
  • WordPress多语言插件安装与使用教程
  • Java核心基础
  • VMware Flings又又又搬家了
  • SAP全面转向AI战略,S/4HANA悄然隐身
  • 【每日一题】【前缀和优化】【前/后缀最值】牛客练习赛139 B/C题 大卫的密码 (Hard Version) C++
  • 基于netmiko模块实现支持SSH or Telnet的多线程多厂商网络设备自动化巡检脚本
  • 教育信息化2.0时代下学校网络安全治理:零信任架构的创新实践与应用
  • 常见的图像生成模型
  • icp备案查询网站/360收录提交入口
  • 体育设施建设发布有没有网站/百度在线客服系统
  • 极简主义 网站/谷歌手机网页版入口
  • 今日国际新闻最新头条10条/seo优化设计
  • 如何利用网站做产品推广/浏览广告赚佣金的app
  • 如何建立自己的免费网站/广州seo网站推广