当前位置: 首页 > news >正文

gevent 高并发、 RabbitMQ 消息队列、Celery 分布式的案例和说明

1. gevent 高并发请求示例

gevent​​:基于协程的Python库,通过异步非阻塞模式实现高并发请求。例如,同时抓取100个网页时,无需等待每个请求完成,提升效率。

import gevent
from gevent import monkey
monkey.patch_all()  # 替换标准库的阻塞IO
import requests

def fetch_url(url):
    try:
        response = requests.get(url, timeout=5)
        print(f"URL: {url} 状态码: {response.status_code} 长度: {len(response.text)}")
    except Exception as e:
        print(f"请求失败 {url}: {str(e)}")

urls = [
    'https://www.baidu.com',
    'https://www.qq.com',
    'https://www.taobao.com',
    # 可添加更多URL
] * 25  # 重复25次达到100个请求

# 创建协程池并发执行
jobs = [gevent.spawn(fetch_url, url) for url in urls]
gevent.joinall(jobs, timeout=10)

运行:

pip install gevent requests
python demo.py

2. RabbitMQ 消息队列示例

消息队列(MQ)​​:如RabbitMQ或Kafka,用于解耦任务生产与消费。例如,将爬虫任务拆分为多个子任务,通过队列分发给不同服务器执行,避免单点故障。

生产者(producer.py)
import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 声明持久化队列
channel.queue_declare(queue='task_queue', durable=True)

# 发送10个测试任务
for i in range(10):
    message = f'任务数据 {i}'
    channel.basic_publish(
        exchange='',
        routing_key='task_queue',
        body=message,
        properties=pika.BasicProperties(
            delivery_mode=2,  # 消息持久化
        ))
    print(f" [x] 已发送 {message}")

connection.close()
消费者(consumer.py)
import pika
import time

def callback(ch, method, properties, body):
    print(f" [x] 收到 {body.decode()}")
    time.sleep(1)  # 模拟任务处理
    ch.basic_ack(delivery_tag=method.delivery_tag)  # 手动确认

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
channel.basic_qos(prefetch_count=1)  # 公平分发
channel.basic_consume(queue='task_queue', on_message_callback=callback)

print(' [*] 等待消息...')
channel.start_consuming()

运行:

pip install pika
# 先启动RabbitMQ服务
python producer.py  # 另一个终端运行
python consumer.py

3. Celery 分布式任务示例

​​Celery​​:Python的分布式任务队列框架,支持定时任务和异步执行。例如,定时抓取新闻网站,任务自动分配到多台机器运行。

创建任务文件(celery_demo.py)
from celery import Celery

app = Celery(
    'tasks',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/0'
)

@app.task
def fetch_news(url):
    # 实际应使用requests等库
    print(f"正在抓取: {url}")
    return f"{url} 抓取完成"

# 定时配置
app.conf.beat_schedule = {
    'every-10-seconds': {
        'task': 'celery_demo.fetch_news',
        'schedule': 10.0,  # 每10秒执行
        'args': ('https://news.example.com',)
    },
}

运行:

pip install celery redis
# 启动Worker
celery -A celery_demo worker --loglevel=info
# 另一个终端启动定时任务
celery -A celery_demo beat --loglevel=info

关键点说明:

  1. gevent

    • 通过协程实现并发(非并行)
    • monkey.patch_all() 替换标准库的阻塞调用
    • 适合I/O密集型场景
  2. RabbitMQ

    • 使用持久化队列防止消息丢失
    • 手动消息确认保证可靠性
    • 公平分发(prefetch_count=1)
  3. Celery

    • 使用Redis作为消息代理和结果存储
    • 支持定时任务(需启动beat)
    • 可分布式部署多个Worker

实际生产环境中需考虑:

  • 错误重试机制
  • 日志记录
  • 资源监控
  • 集群部署配置
http://www.dtcms.com/a/130698.html

相关文章:

  • 论文精度:BoltzFormer:基于Boltzmann采样的动态稀疏注意力机制在小物体图像分析中的应用
  • 心理教育辅导|基于Java+vue的高校心理教育辅导系统(源码+数据库+文档)
  • 【数据结构_6下篇】有关链表的oj题
  • 数据中台、数据湖和数据仓库 区别
  • RTX 5080 PyTorch2.8 Ubuntu24.04 安装Neural Render排坑
  • AI工具导航 快速找到喜欢的AI工具 功能使用介绍
  • 如何评估大模型的性能?有哪些常用的评估指标?
  • Java中的泛型和泛型擦除机制【一文读懂】
  • Java面向对象核心:多态、抽象类与接口实战解析
  • 基本数据类型和引用类型的存储位置问题+复制问题
  • 在VMware中安装虚拟机Ubuntu
  • 文件流---------获取文件的内容到控制台
  • 火影 遇上 python Baby_Brother_GGY
  • TypeScript 的 interface 接口
  • 文件上传靶场
  • 类型转换
  • ArkTS基础语法:从声明到类型的深度解析
  • 系统与网络安全------网络通信原理(5)
  • nlp面试重点
  • 算法差分详解 + 总结
  • lx2160 LSDK21.08 firmware 笔记 - 1.bl31.bin 链接脚本 bl31.ld.S 分析
  • JavaWeb 课堂笔记 —— 09 MySQL 概述 + DDL
  • 基于贝叶斯方法的地震动分析及AI拓展
  • mysql安装-MySQL MGR(Group Replication)+ ProxySQL 架构
  • 前端 react+ant design ,后端 springboot +mysql+redis 全栈项目零基础小白从服务器初始化开始部署上线超详细保姆级教程
  • Ubuntu24.04 编译 Qt5 和 Qt6 源码
  • Android Cmake构建的项目,需不需要配置指定ndk及版本
  • 动态路由, RIP路由协议,RIPv1,RIPv2
  • MarkDown 输出表格的方法
  • 信息安全管理与评估2022国赛正式卷一阶段答案截图