当前位置: 首页 > wzjs >正文

做产品封面的网站大学专业建设的内容

做产品封面的网站,大学专业建设的内容,网站怎么通过流量赚钱,做营销型网站用那个cms好FastAPI 服务器Celery 任务队列RabbitMQ 作为消息代理定时任务处理 首先创建项目结构: c:\Users\Administrator\Desktop\meitu\ ├── app/ │ ├── __init__.py │ ├── main.py │ ├── celery_app.py │ ├── tasks.py │ └── config.py…
  1. FastAPI 服务器
  2. Celery 任务队列
  3. RabbitMQ 作为消息代理
  4. 定时任务处理

首先创建项目结构:

c:\Users\Administrator\Desktop\meitu\
├── app/
│   ├── __init__.py
│   ├── main.py
│   ├── celery_app.py
│   ├── tasks.py
│   └── config.py
├── requirements.txt
└── celery_worker.py
  1. 首先创建 requirements.txt:
fastapi==0.104.1
uvicorn==0.24.0
celery==5.3.4
python-dotenv==1.0.0
requests==2.31.0
  1. 创建配置文件:
from dotenv import load_dotenv
import osload_dotenv()# RabbitMQ配置
RABBITMQ_HOST = os.getenv("RABBITMQ_HOST", "localhost")
RABBITMQ_PORT = os.getenv("RABBITMQ_PORT", "5672")
RABBITMQ_USER = os.getenv("RABBITMQ_USER", "guest")
RABBITMQ_PASS = os.getenv("RABBITMQ_PASS", "guest")# Celery配置
CELERY_BROKER_URL = f"amqp://{RABBITMQ_USER}:{RABBITMQ_PASS}@{RABBITMQ_HOST}:{RABBITMQ_PORT}//"
CELERY_RESULT_BACKEND = "rpc://"# 定时任务配置
CELERY_BEAT_SCHEDULE = {'process-images-every-hour': {'task': 'app.tasks.process_images','schedule': 3600.0,  # 每小时执行一次},'daily-cleanup': {'task': 'app.tasks.cleanup_old_images','schedule': 86400.0,  # 每天执行一次}
}
  1. 创建 Celery 应用:
from celery import Celery
from app.config import CELERY_BROKER_URL, CELERY_RESULT_BACKEND, CELERY_BEAT_SCHEDULEcelery_app = Celery('image_processing',broker=CELERY_BROKER_URL,backend=CELERY_RESULT_BACKEND,include=['app.tasks']
)# 配置定时任务
celery_app.conf.beat_schedule = CELERY_BEAT_SCHEDULE
celery_app.conf.timezone = 'Asia/Shanghai'
  1. 创建任务文件:
from app.celery_app import celery_app
from app.watermark import ImageWatermarker
import os
from datetime import datetime, timedelta@celery_app.task
def add_watermark_task(image_path, text, position='center'):"""异步添加水印任务"""watermarker = ImageWatermarker()try:result_path = watermarker.add_watermark(image_path=image_path,text=text,position=position)return {"status": "success", "output_path": result_path}except Exception as e:return {"status": "error", "message": str(e)}@celery_app.task
def process_images():"""定时处理图片任务"""image_dir = "images/pending"if not os.path.exists(image_dir):return {"status": "error", "message": "Pending directory not found"}processed = 0for image in os.listdir(image_dir):if image.lower().endswith(('.png', '.jpg', '.jpeg')):add_watermark_task.delay(os.path.join(image_dir, image),"自动处理水印",'center')processed += 1return {"status": "success", "processed": processed}@celery_app.task
def cleanup_old_images():"""清理旧图片任务"""output_dir = "images/processed"if not os.path.exists(output_dir):return {"status": "error", "message": "Output directory not found"}threshold_date = datetime.now() - timedelta(days=7)cleaned = 0for image in os.listdir(output_dir):image_path = os.path.join(output_dir, image)if os.path.getctime(image_path) < threshold_date.timestamp():os.remove(image_path)cleaned += 1return {"status": "success", "cleaned": cleaned}
  1. 创建 FastAPI 应用:
from fastapi import FastAPI, File, UploadFile, BackgroundTasks
from fastapi.responses import JSONResponse
import os
from app.tasks import add_watermark_task
from app.celery_app import celery_appapp = FastAPI(title="图片水印处理服务")@app.post("/upload/")
async def upload_image(file: UploadFile = File(...),text: str = "水印文本",position: str = "center"
):# 保存上传的文件file_path = f"images/uploads/{file.filename}"os.makedirs(os.path.dirname(file_path), exist_ok=True)with open(file_path, "wb") as buffer:content = await file.read()buffer.write(content)# 创建异步任务task = add_watermark_task.delay(file_path, text, position)return JSONResponse({"status": "success","message": "图片已上传并加入处理队列","task_id": task.id})@app.get("/task/{task_id}")
async def get_task_status(task_id: str):task = celery_app.AsyncResult(task_id)if task.ready():return {"status": "completed", "result": task.result}return {"status": "processing"}@app.get("/tasks/scheduled")
async def get_scheduled_tasks():return {"tasks": celery_app.conf.beat_schedule}
  1. 创建 Celery worker 启动文件:
from app.celery_app import celery_appif __name__ == '__main__':celery_app.start()

使用说明:

  1. 首先安装依赖:
pip install -r requirements.txt
  1. 确保 RabbitMQ 服务已启动

  2. 启动 FastAPI 服务器:

uvicorn app.main:app --reload
  1. 启动 Celery worker:
celery -A celery_worker.celery_app worker --loglevel=info
  1. 启动 Celery beat(定时任务):
celery -A celery_worker.celery_app beat --loglevel=info

这个系统提供以下功能:

  1. 通过 FastAPI 接口上传图片并异步处理水印
  2. 使用 Celery 处理异步任务队列
  3. 使用 RabbitMQ 作为消息代理
  4. 支持定时任务:
    • 每小时自动处理待处理图片
    • 每天清理一周前的旧图片
  5. 支持任务状态查询
  6. 支持查看计划任务列表

API 端点:

  • POST /upload/ - 上传图片并创建水印任务
  • GET /task/{task_id} - 查询任务状态
  • GET /tasks/scheduled - 查看计划任务列表

文章转载自:

http://YnnYp7vV.dpbdq.cn
http://WdZs1XI0.dpbdq.cn
http://QeC4cI2U.dpbdq.cn
http://1DgvlTop.dpbdq.cn
http://DbqYnjSc.dpbdq.cn
http://BiR4nX5w.dpbdq.cn
http://sh11IWJK.dpbdq.cn
http://E8meiZKL.dpbdq.cn
http://TGcldtJm.dpbdq.cn
http://yyj8li2p.dpbdq.cn
http://hqNcOPs3.dpbdq.cn
http://ribn0kAl.dpbdq.cn
http://MIbyjcVg.dpbdq.cn
http://15CYcibp.dpbdq.cn
http://7rtHB6KV.dpbdq.cn
http://eiZ09Bua.dpbdq.cn
http://nHAPWMaa.dpbdq.cn
http://dA63yF4q.dpbdq.cn
http://ZEBnQZTl.dpbdq.cn
http://VIZRVgm8.dpbdq.cn
http://5B5UnnsJ.dpbdq.cn
http://7Rd4s5Dg.dpbdq.cn
http://Tt1Gmi8B.dpbdq.cn
http://y4PFncvb.dpbdq.cn
http://g6Qxw1Bj.dpbdq.cn
http://PSeR9lWk.dpbdq.cn
http://nA22Ubne.dpbdq.cn
http://3wEvQ2HA.dpbdq.cn
http://FiE7bWb7.dpbdq.cn
http://yQeI5HJv.dpbdq.cn
http://www.dtcms.com/wzjs/617150.html

相关文章:

  • 建个人网站的详细步骤企业多语言网站开发
  • 外贸推广用中文网站wordpress 移动端m
  • 做实验流程图的网站网站自动采集指标
  • 做公众号主页面的有哪些网站晓风彩票门户网站建设
  • 做网站十大公司哪家好wordpress 第三方登录插件
  • 关键词优化排名易下拉软件河北seo推广系统
  • 百度站长平台开绿色收录通道加快网站收录做一家电商网站需要多少钱
  • 安徽建站之星设计之家app
  • 昆山高端网站设计公司看设计作品的网站软件
  • 自己做的网站怎么实现结算功能安徽专业网站建设设计
  • 广告网站建设网白城哪家做网站
  • 网站 备案号查询个人域名备案 网站名称
  • 手机网站设计软件修改网站图标
  • 安乡网站制作前端好还是后端好
  • 不会做网站能做网络销售吗云南网站建设公司排名
  • 网站禁用右键苏州网站建设哪家更好
  • 模板网站怎么建设优化设计做网站
  • wordpress调用网站标题建设网站和网页有啥区别
  • 大气集团企业网站模板流量查询
  • 精神文明地方联盟网站建设北京设计制作公司
  • 网站开发合同 下载杭州房产网我爱我家官网
  • 平台制作网站公司哪家好个人兼职做建设网站
  • 章丘建设局网站杭州公司查询
  • 自己做的网站怎么查国外模板wordpress
  • 惠州专门做网站揭阳企业做网站
  • wap版网站建设方案南充网站建设工作室
  • wordpress移站晨光文具网站建设策划书
  • 桂林做网站公司有哪些常见的电子商务网站有
  • 网站建设接活app自己做服务器的网站吗
  • 之梦网站怎么做seo镇江网络广播电视