当前位置: 首页 > news >正文

Python:消息队列(RabbitMQ)应用开发实践

文章目录

  • 前言
  • 一、名词解释
    • 1、核心概念
    • 2、交换器类型
  • 二、消息模式
    • 1、简单模式
    • 2、工作队列
      • 消息持久性
    • 3、发布订阅
    • 4、路由模式
    • 5、主题模式
      • 通配符规则
    • 6、RPC模式
      • RPC工作流程
  • 三、安装部署
    • 1、拉取镜像
    • 2、启动容器
    • 3、使用`docker-compose`
  • 四、应用测试
    • 1、安装依赖
    • 2、创建连接
      • 2.1、引入依赖
      • 2.2、创建连接参数
      • 2.3、建立连接
    • 3、生产消息
    • 4、消费消息
  • 总结


前言

  官网有云:RabbitMQ是一个可靠且成熟的消息传递和流媒体代理,易于在云环境、本地和本地计算机上部署。支持多种开放标准协议,包括 AMQP 1.0MQTT 5.0。提供了许多选项,定义消息如何从发布者发送到一个或多个使用者。路由、筛选、流式处理、联合身份验证等。通过确认消息传输和跨集群复制消息的能力,确保消息是安全的。

一、名词解释

1、核心概念

  • 生产者(Producer):发送消息到队列的应用程序。
  • 消费者(Consumer):从队列中接收消息的应用程序。
  • 交换器(Exchange):接收来自生产者的消息,并根据绑定规则将消息路由到相应的队列。
  • 队列(Queue):存储消息,等待消费者处理。
  • 绑定(Binding):定义交换器和队列之间的关系。
  • 头交换器(Headers Exchange):使用消息头属性进行路由,而不是路由键。

2、交换器类型

  • 直接交换器(Direct Exchange):根据消息的路由键精确匹配队列。
  • 主题交换器(Topic Exchange):根据模式匹配路由键,将消息路由到一个或多个队列。
  • 扇出交换器(Fanout Exchange):将消息广播到所有绑定的队列,不考虑路由键。

二、消息模式

  RabbitMQ提供6种消息模式:简单模式、工作队列、发布订阅、路由模式、主题模式和RPC模式。

1、简单模式

  最简单的队列。

在这里插入图片描述

  • P:生产者,也就是要发送消息的程序。
channel.queue_declare(queue=queue_name)
channel.basic_publish(exchange='',routing_key=queue_name,body='Hello World!'
)

  参数exchange不指定时,消息会发送的默认的Exchange,此时的routing_key必须是queue_name(队列名称)。

  • C:消费者:消息的接受者,会一直等待消息到来。
channel.basic_consume(queue=queue_name,auto_ack=True,on_message_callback=callback
)

  参数 auto_ack=True 表示自动应答,消费者取出消息后,消息队列将删除此消息。

  • Queue:消息队列。可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。

2、工作队列

  在Worker之间分配任务(竞争消费者模式)。两个消费端共同消费同一个队列中的消息,但是一个消息只能被一个消费者获取。

在这里插入图片描述
  通过 BasicQos 方法设置prefetchCount = 1,如:channel.basic_qos(0, 1, False);。使得每个消费者在同一个时间点最多处理1个消息。prefetchCount只有在手动ackauto_ack=False)的情况下才生效,自动ack不生效。

消息持久性

  使用手动ackauto_ack=False)能够确保消息处理失败或者消费者崩溃时任务不会丢失,但是如果RabbitMQ服务器停止、退出或者崩溃时,队列中的消息将丢失。需要做两件事来确保消息不会丢失:

  • 首先,定义队列时指定队列是持久的(durable=True),如:
channel.queue_declare(queue=queue_name, durable=True)

  如果队列已

http://www.dtcms.com/a/280502.html

相关文章:

  • 【C#地图显示教程:实现鼠标绘制图形操作】
  • 开通保存图片权限
  • 如何设计实现开发自助重启工具-01-设计篇
  • eVTOL分布式电推进(DEP)适航审定探究
  • Ajax接收java后端传递的json对象包含长整型被截断导致丢失精度的解决方案
  • 【橘子分布式】Thrift RPC(编程篇)
  • 亚矩阵云手机:破解 Yandex 广告平台多账号风控难题的利器
  • Redis学习系列之——高并发应用的缓存问题(二)
  • JDK1.8函数式编程实战(附日常工作案例,仅此一篇耐心看完彻底搞懂)
  • 17、鸿蒙Harmony Next开发:状态管理(组件拥有的状态和应用拥有的状态)
  • Vue获取上传Excel文件内容并展示在表格中
  • 【人工智能99问】神经网络的工作原理是什么?(4/99)
  • 使用Pydantic开发时,如何将返回数据由snake_case自动转为camel case
  • Mac IDEA启动报错:Error occurred during initialization of VM
  • Linux操作系统从入门到实战(九)Linux开发工具(中)自动化构建-make/Makefile知识讲解
  • ubuntu部署kvm
  • AI-Compass LLM训练框架生态:整合ms-swift、Unsloth、Megatron-LM等核心框架,涵盖全参数/PEFT训练与分布式优化
  • 正则表达式深度解析:从LeetCode 3136题说起
  • 028_分布式部署架构
  • OpenCV图像自动缩放(Autoscaling)函数autoscaling()
  • 2025.7.15总结
  • 用Python构建机器学习模型预测股票趋势:从数据到部署的实战指南
  • 希尔排序:突破传统排序的边界
  • 【Java】【企业级应用】学生信息管理系统项目介绍
  • Mybatis05-动态sql
  • 深度解析 AI 提示词工程(Prompt Engineering)
  • 2025世界机器人大赛ICode专属训练平台图形化小学组答案
  • 光伏设计全方位指南
  • B/S 架构通信原理详解
  • sqli-labs靶场通关笔记:第17关 POST请求的密码重置