当前位置: 首页 > news >正文

【RabbitMQ】多系统下的安装配置与编码使用(python)

文章目录

  • 引言
    • 核心概念
    • RabbitMQ 的常见应用场景
  • 1. 安装 RabbitMQ
    • 在本地安装 RabbitMQ
      • Linux (Ubuntu)
      • MacOS
      • Windows
        • 配置环境变量
  • 2. Python 代码示例
    • 安装 Python 客户端库
    • 示例 1:基本队列通信
      • 生产者(producer.py)
      • 消费者(consumer.py)
      • 运行测试
    • 示例 2:使用交换机(Topic 模式)
      • 生产者(topic_producer.py)
      • 消费者(topic_consumer.py)
      • 运行测试
    • 示例 3:消息持久化与手动确认
      • 生产者(持久化消息)
      • 消费者(手动 ACK)
    • 代码结合:
    • 3. 关键注意事项

引言

RabbitMQ 是一个开源的 消息代理(Message Broker) 软件,实现了 高级消息队列协议(AMQP),用于在分布式系统中存储、转发消息,支持多种消息传递模式。它通过解耦生产者和消费者、异步处理、流量削峰等机制,提升系统的可扩展性、可靠性和灵活性。

本文主要介绍在Linux/MacOS/Windows下的Rabbit的安装,以及以python为示例的具体代码编写。

核心概念

  1. Producer(生产者):发送消息的程序。
  2. Consumer(消费者):接收消息的程序。
  3. Queue(队列):存储消息的缓冲区,消息会一直存在队列中直到被消费。
  4. Exchange(交换机):接收生产者发送的消息,并根据规则(路由键、绑定等)将消息路由到队列。
  5. Binding(绑定):定义 Exchange 和 Queue 之间的关系。
  6. Message(消息):传递的数据,包含有效负载(payload)和元数据(如路由键)。

RabbitMQ 的常见应用场景

  1. 异步处理

    • 示例:用户注册后发送邮件/短信通知。
    • 优势:主流程快速响应,耗时操作异步交给消费者处理。
  2. 应用解耦

    • 示例:订单系统和库存系统通过消息队列通信,避免直接接口调用。
    • 优势:某一系统宕机不影响其他系统,消息可暂存后处理。
  3. 流量削峰

    • 示例:秒杀活动的高并发请求先写入队列,系统按处理能力逐步消费。
    • 优势:避免服务器瞬时过载。
  4. 日志收集

    • 示例:多台服务器将日志发送到队列,由统一服务消费存储。
    • 优势:集中处理,避免日志丢失。

1. 安装 RabbitMQ

在本地安装 RabbitMQ

Linux (Ubuntu)

# 安装 Erlang(RabbitMQ 依赖)
sudo apt-get install -y erlang# 安装 RabbitMQ
sudo apt-get install -y rabbitmq-server# 启动服务
sudo systemctl start rabbitmq-server# 启用管理插件(可选)
sudo rabbitmq-plugins enable rabbitmq_management

MacOS

# 通过 Homebrew 安装
brew install rabbitmq# 启动服务
brew services start rabbitmq# 启用管理插件
rabbitmq-plugins enable rabbitmq_management

Windows

  1. 下载并安装 Erlang。
    在这里插入图片描述

  2. 下载 RabbitMQ 的 Windows 安装包。

    • 安装完成后,勾选“Start RabbitMQ Service”,或之后手动启动rabbtmq服务
      在这里插入图片描述
      在这里插入图片描述
  3. 通过命令行启用管理插件:

    rabbitmq-plugins enable rabbitmq_management
    
配置环境变量

安装RabbitMQ完成后需要配置到系统环境变量,否则直接执行 rabbitmq-plugins enable rabbitmq_management 命令会找不到。

  1. 右键点击“此电脑” → “属性” → “高级系统设置” → “环境变量”。
  2. 在“系统变量”中找到 Path,点击“编辑”。
  3. 添加 RabbitMQ 的 sbin 目录路径(例如):
    C:\Program Files\RabbitMQ Server\rabbitmq_server-3.12.0\sbin
    
  4. 保存后重新打开命令行,即可直接运行 rabbitmq-plugins

在这里插入图片描述

在这里插入图片描述

如果不配置系统环境变量,也可直接在 rabbitmq-plugins.bat 所在的路径(即sbin)下执行命令。

在这里插入图片描述


2. Python 代码示例

安装 Python 客户端库

pip install pika

pika 是 用于与 RabbitMQ 进行通信的Python 客户端库,构建高效、可靠的消息队列系统。


示例 1:基本队列通信

生产者(producer.py)

import pika# 连接到 RabbitMQ 服务器
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()# 声明一个队列(如果不存在则创建)
channel.queue_declare(queue='hello')# 发送消息
channel.basic_publish(exchange='',          # 使用默认交换机routing_key='hello',  # 指定队列名称body='Hello World!'   # 消息内容
)
print(" [x] Sent 'Hello World!'")# 关闭连接
connection.close()

消费者(consumer.py)

import pikadef callback(ch, method, properties, body):print(f" [x] Received {body}")# 连接到 RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()# 声明队列(确保存在)
channel.queue_declare(queue='hello')# 消费消息
channel.basic_consume(queue='hello',         # 监听的队列on_message_callback=callback,  # 消息处理函数auto_ack=True         # 自动确认消息
)print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()  # 开始阻塞监听

运行测试

  1. 启动消费者:
    python consumer.py
    
  2. 启动生产者:
    python producer.py
    
  3. 消费者会输出:
    [x] Received b'Hello World!'

示例 2:使用交换机(Topic 模式)

生产者(topic_producer.py)

import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()# 声明一个 Topic 类型的交换机
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')# 发送消息到特定路由键
routing_key = 'user.notification.email'
message = 'Email sent to user!'
channel.basic_publish(exchange='topic_logs',routing_key=routing_key,body=message
)
print(f" [x] Sent {routing_key}:{message}")
connection.close()

消费者(topic_consumer.py)

import pika
import sysdef callback(ch, method, properties, body):print(f" [x] {method.routing_key}:{body}")connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()# 声明交换机
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')# 创建临时队列(关闭连接后自动删除)
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue# 绑定队列到交换机,监听特定路由键
routing_key = 'user.notification.*'  # 通配符匹配
channel.queue_bind(exchange='topic_logs',queue=queue_name,routing_key=routing_key
)channel.basic_consume(queue=queue_name,on_message_callback=callback,auto_ack=True
)print(' [*] Waiting for logs. To exit press CTRL+C')
channel.start_consuming()

运行测试

  1. 启动消费者(监听 user.notification.*):
    python topic_consumer.py
    
  2. 启动生产者(发送 user.notification.email):
    python topic_producer.py
    
  3. 消费者会收到匹配路由键的消息。

示例 3:消息持久化与手动确认

生产者(持久化消息)

channel.queue_declare(queue='task_queue', durable=True)  # 队列持久化
channel.basic_publish(exchange='',routing_key='task_queue',body='This is a persistent task',properties=pika.BasicProperties(delivery_mode=2,  # 消息持久化)
)

消费者(手动 ACK)

def callback(ch, method, properties, body):print(f" [x] Processing {body}")# 模拟耗时任务import timetime.sleep(5)print(" [x] Done")ch.basic_ack(delivery_tag=method.delivery_tag)  # 手动确认channel.basic_consume(queue='task_queue',on_message_callback=callback,auto_ack=False  # 关闭自动确认
)

代码结合:

可用把上面的通信代码写成一个项目,把RabbitMQ的部分封装成一个连接类:

https://gitee.com/aiyimu/python/tree/master/RabbitMQ_Practice/RabbitMQ_Practice

3. 关键注意事项

  1. 连接管理
    • 生产环境中使用连接池(如 pika.BlockingConnection 的复用)。
  2. 错误处理
    • 捕获 pika.exceptions.AMQPConnectionError 并实现重连逻辑。
  3. 性能优化
    • 通过 channel.basic_qos(prefetch_count=1) 限制消费者每次只处理一条消息。
  4. 监控
    • 访问 http://localhost:15672(默认账号 guest/guest)查看队列状态。

相关文章:

  • 韩国男女做游戏视频网站不用流量的地图导航软件
  • 网站设计公司简介友情下载网站
  • 郑州一站式网站搭建认真负责中国没有限制的搜索引擎
  • 欧美男女做黄色网站广州网站关键词排名
  • 什么是营销网站网络营销与传统营销的整合
  • 如何搭建手机网站西安百度推广联系方式
  • A2O MAY登上央视《中国音乐TOP榜》舞台,展现新歌榜冠军实力
  • docker repositories.json 文件学习
  • 七天学会SpringCloud分布式微服务——03——一些细节的心得感悟(续)
  • C2远控篇CC++SC转换格式UUID标识MAC物理IPV4地址减少熵值
  • ubuntu22.04系统kubeadm部署k8s高可用集群
  • Docker 部署 Kong云原生API网关
  • GitHub Actions 安全高效地推送 Docker 镜像到 AWS ECR
  • 与 AI 聊天更顺畅:GitHub 项目文件小助手
  • vue + vue-router写登陆验证的同步方法和异步方法,及页面组件的分离和后端代码
  • Vue3+ element 实现导入导出
  • 【Linux】软硬链接,动静态库
  • 简概:ETF动量策略
  • 大模型在坏疽及穿孔性阑尾炎预测与治疗方案制定中的应用研究
  • CCF GESP第十次认证模拟测试
  • 先考 HCIA 还是直接考 HCIP?网络工程师认证选择指南
  • 三轴云台之运动跟踪技术篇
  • uniapp页面间通信uni.$on与通过uni.navigateTo中eventChannal的方式的区别
  • 2025学年湖北省职业院校技能大赛 “信息安全管理与评估”赛项 样题卷(三)
  • Nginx + ELK + Grafana 全球访问热力图
  • php的案例分析----typecho项目