当前位置: 首页 > news >正文

基于 Flask + APScheduler + MySQL 的自动报表系统设计

背景

在日常运维与自动化任务中,定时生成并发送报表是非常常见的场景。比如每天早上汇总前一日任务执行情况、日志统计、业务指标、错误趋势等内容,再通过邮件推送给相关负责人。
本文将介绍一个基于 Flask、Flask-SQLAlchemy、APScheduler、MySQL 组成的轻量级自动报表系统的核心设计原理。

一、系统目标与整体思路

系统的目标非常明确:

通过一个长期运行的 API 服务,接收外部任务上报的数据,并在每天固定时间自动生成邮件报表。

这意味着系统需要同时具备:

  • 持续运行的 HTTP 服务(用于数据接收)
  • 定时任务调度能力(用于报表生成与发送)
  • 可靠的数据存储与查询机制

从架构上看,可以分为三个核心模块:

  1. API 服务器(数据入口)
  2. 数据库层(数据持久化)
  3. 定时调度器(报表生成与发送)

工作机制

  • 外部任务在完成执行后,会构造一个结构化的 JSON 负载,包含任务名称、执行状态(成功/失败)、耗时、日志摘要等关键信息。
  • 通过 HTTP POST 请求将数据发送至预设的 API 端点(例如 /api/report)。
  • API 服务接收到请求后,进行以下处理:
    • 验证请求来源(可选使用 Token 或 IP 白名单);
    • 解析并校验 JSON 数据格式;
    • 将有效数据映射为 ORM 模型对象(如 TaskReport);
    • 利用 Flask-SQLAlchemy 将对象写入 MySQL 数据库,实现持久化存储。

这一设计实现了解耦合:外部任务只需关心自身逻辑和上报动作,无需了解后续的数据处理流程。同时,API 层提供了标准化接口,便于未来扩展更多类型的任务接入。
在这里插入图片描述

系统架构设计

项目采用模块化结构,以 清晰的职责划分 和 易扩展性 为设计核心:

project/
├── app.py                # 主应用入口,初始化 Flask 与调度器
├── config.py             # 配置文件(数据库、调度、邮件等)
├── models.py             # 数据库模型定义
├── services/
│   └── reporting.py      # 报表生成与发送逻辑
└── requirements.txt

.env 环境变量

# Flask environment
FLASK_ENV=development
PORT=5000# Security
SECRET_KEY=heiankey
API_KEY=123456# Database (MySQL)
DB_USER=task_reporter
DB_PASSWORD=zk33kzd3GFEFTfbE
DB_HOST=43.xx4.xxx.xx
DB_PORT=3306
DB_NAME=task_reporter# Optionally, override the full SQLAlchemy URI
# DATABASE_URL=mysql+pymysql://user:pass@host:3306/dbname?charset=utf8mb4# MailMAIL_SERVER=smtp.qq.comMAIL_PORT=465MAIL_USE_TLS=falseMAIL_USE_SSL=trueMAIL_USERNAME=xxxx@qq.comMAIL_PASSWORD=tdgekwgjsxxxxMAIL_SENDER=xxxx@qq.comMAIL_RECIPIENTS=xxxx@qq.com

conifg.py 配置文件

import os
from urllib.parse import quote_plusfrom dotenv import load_dotenvload_dotenv()class Config:"""Application configuration loaded from environment variables."""SECRET_KEY = os.getenv("SECRET_KEY", "dev-secret-key")API_KEY = os.getenv("API_KEY", "change-me")DB_USER = os.getenv("DB_USER", "root")DB_PASSWORD = os.getenv("DB_PASSWORD", "")DB_HOST = os.getenv("DB_HOST", "127.0.0.1")DB_PORT = os.getenv("DB_PORT", "3306")DB_NAME = os.getenv("DB_NAME", "task_reporter")DEFAULT_MYSQL_URI = (f"mysql+pymysql://{DB_USER}:{quote_plus(DB_PASSWORD)}@{DB_HOST}:{DB_PORT}/{DB_NAME}?charset=utf8mb4")SQLALCHEMY_DATABASE_URI = os.getenv("DATABASE_URL", DEFAULT_MYSQL_URI)SQLALCHEMY_TRACK_MODIFICATIONS = False# Mail settingsMAIL_SERVER = os.getenv("MAIL_SERVER", "")MAIL_PORT = int(os.getenv("MAIL_PORT", "0") or 0)MAIL_USE_TLS = os.getenv("MAIL_USE_TLS", "false").lower() in {"1", "true", "yes"}MAIL_USE_SSL = os.getenv("MAIL_USE_SSL", "false").lower() in {"1", "true", "yes"}MAIL_USERNAME = os.getenv("MAIL_USERNAME", "") or NoneMAIL_PASSWORD = os.getenv("MAIL_PASSWORD", "") or NoneMAIL_SENDER = os.getenv("MAIL_SENDER", "") or NoneMAIL_RECIPIENTS = [email.strip() for email in os.getenv("MAIL_RECIPIENTS", "").split(",") if email.strip()]if __name__ == "__main__":print(os.getenv("PORT", "task_reporter"))

models.py 数据库模块

from flask_sqlalchemy import SQLAlchemy
from sqlalchemy import func# SQLAlchemy instance to be initialized by the application factory
db = SQLAlchemy()class TaskReport(db.Model):"""Task execution report entity stored in the database."""__tablename__ = "task_reports"id = db.Column(db.Integer, primary_key=True)task_id = db.Column(db.String(128), nullable=False, index=True)task_name = db.Column(db.String(255), nullable=False)status = db.Column(db.String(32), nullable=False)message = db.Column(db.String(1024), nullable=True)details = db.Column(db.Text, nullable=True)execution_time = db.Column(db.Float, nullable=True)created_at = db.Column(db.DateTime,nullable=False,server_default=func.now(),index=True,)def __repr__(self) -> str:return f"<TaskReport id={self.id} task_id={self.task_id} status={self.status}>"

reporting.py 发送处理模块


"""
任务执行日报生成与邮件发送模块功能:
1. 从数据库中获取指定时间范围内的任务执行记录。
2. 生成 HTML 格式的日报(成功任务 & 失败任务)。
3. 通过 SMTP 将日报发送到指定邮箱。
"""from __future__ import annotationsimport datetime as dt
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from typing import Iterable, List, Tuplefrom flask import current_app
from app import create_app
from models import TaskReport, dbdef _get_previous_day_range(now: dt.datetime | None = None) -> Tuple[dt.datetime, dt.datetime]:"""获取前一天的时间范围(本地时间)。起始时间为 00:00:00,结束时间为 23:59:59.999999。"""now = now or dt.datetime.now()# prev_day = now.date() - dt.timedelta(days=1)prev_day= dt.date.fromisoformat("2025-08-14")start = dt.datetime.combine(prev_day, dt.time.min)end = dt.datetime.combine(prev_day, dt.time.max)return start, enddef generate_daily_report(start_time: dt.datetime | None = None,end_time: dt.datetime | None = None,
) -> str:"""生成任务执行日报的 HTML 内容。:param start_time: 起始时间(默认前一天 00:00:00):param end_time:   结束时间(默认前一天 23:59:59.999999):return: HTML 报告字符串"""if not start_time or not end_time:start_time, end_time = _get_previous_day_range()# 查询时间范围内的任务执行记录reports: List[TaskReport] = (TaskReport.query.filter(TaskReport.created_at >= start_time).filter(TaskReport.created_at <= end_time).order_by(TaskReport.created_at.asc()).all())# 分类任务记录success_items = [r for r in reports if (r.status or "").lower() == "success"]failure_items = [r for r in reports if (r.status or "").lower() != "success"]return _render_html(success_items, failure_items, start_time, end_time)def _render_html(success_items: Iterable[TaskReport],failure_items: Iterable[TaskReport],start_time: dt.datetime,end_time: dt.datetime,
) -> str:"""渲染 HTML 报表,使用 Bootstrap 美化样式。"""def _row(r: TaskReport) -> str:return (f"<tr>"f"<td>{r.id}</td>"f"<td>{r.task_id}</td>"f"<td>{r.task_name}</td>"f"<td>{r.status}</td>"f"<td>{r.execution_time or ''}</td>"f"<td>{(r.message or '').replace('<', '&lt;').replace('>', '&gt;')}</td>"f"<td>{r.created_at or ''}</td>"f"</tr>")success_rows = "".join(_row(r) for r in success_items) or ("<tr><td colspan=7 class='text-center text-muted'>无成功记录</td></tr>")failure_rows = "".join(_row(r) for r in failure_items) or ("<tr><td colspan=7 class='text-center text-muted'>无失败记录</td></tr>")start_str = start_time.strftime("%Y-%m-%d %H:%M:%S")end_str = end_time.strftime("%Y-%m-%d %H:%M:%S")return f"""<!doctype html><html><head><meta charset="utf-8" /><title>任务执行日报</title><!-- 引入 Bootstrap 5 CDN --><link href="https://cdn.jsdelivr.net/npm/bootstrap@5.3.0/dist/css/bootstrap.min.css" rel="stylesheet"></head><body class="p-4"><div class="container"><h1 class="mb-3">任务执行日报</h1><p class="text-muted">时间范围:{start_str} - {end_str}</p><h3 class="mt-4">✅ 成功任务</h3><table class="table table-bordered table-hover table-sm align-middle"><thead class="table-success"><tr><th>ID</th><th>Task ID</th><th>任务名称</th><th>状态</th><th>耗时(s)</th><th>信息</th><th>创建时间</th></tr></thead><tbody>{success_rows}</tbody></table><h3 class="mt-4">❌ 失败 / 其他状态任务</h3><table class="table table-bordered table-hover table-sm align-middle"><thead class="table-danger"><tr><th>ID</th><th>Task ID</th><th>任务名称</th><th>状态</th><th>耗时(s)</th><th>信息</th><th>创建时间</th></tr></thead><tbody>{failure_rows}</tbody></table></div></body></html>"""def send_email(html_content: str,subject: str | None = None,recipients: Iterable[str] | None = None
) -> None:"""发送 HTML 邮件。:param html_content: 邮件 HTML 内容:param subject: 邮件主题:param recipients: 收件人列表"""app = current_appconfig = app.configserver = config.get("MAIL_SERVER", "")port = int(config.get("MAIL_PORT", 0) or 0)use_tls = bool(config.get("MAIL_USE_TLS", False))use_ssl = bool(config.get("MAIL_USE_SSL", False))username = config.get("MAIL_USERNAME")password = config.get("MAIL_PASSWORD")sender = config.get("MAIL_SENDER", username)recips = list(recipients) if recipients else list(config.get("MAIL_RECIPIENTS", []) or [])if not server or not port:raise RuntimeError("MAIL_SERVER 和 MAIL_PORT 必须配置")if not sender:raise RuntimeError("MAIL_SENDER 必须配置(或 MAIL_USERNAME)")if not recips:raise RuntimeError("收件人为空,请配置 MAIL_RECIPIENTS 或传入 recipients")subject = subject or "任务执行日报"msg = MIMEMultipart("alternative")msg["Subject"] = subjectmsg["From"] = sendermsg["To"] = ", ".join(recips)msg.attach(MIMEText(html_content, "html", "utf-8"))if use_ssl:smtp = smtplib.SMTP_SSL(server, port)else:smtp = smtplib.SMTP(server, port)if use_tls:smtp.starttls()try:if username and password:smtp.login(username, password)smtp.sendmail(sender, recips, msg.as_string())finally:smtp.quit()def generate_and_send_report(start_time: dt.datetime | None = None,end_time: dt.datetime | None = None,subject: str | None = None,recipients: Iterable[str] | None = None,
) -> None:"""生成日报并发送邮件的便捷方法。"""html = generate_daily_report(start_time=start_time, end_time=end_time)print(html)send_email(html, subject=subject, recipients=recipients)if __name__ == "__main__":with create_app().app_context():generate_and_send_report()

app.py 启动入口

from functools import wrapsfrom apscheduler.schedulers.background import BackgroundScheduler
from flask import Flask, requestfrom conifg import Config
from models import db, TaskReportdef create_app():app = Flask(__name__)app.config.from_object(Config)# Initialize SQLAlchemydb.init_app(app)# 创建一个后台调度器scheduler = BackgroundScheduler(timezone="Asia/Shanghai")# 添加一个每隔20秒执行一次的定时任务# scheduler.add_job(func=send_alert, trigger="interval", seconds=20)# 添加一个每天早上9点执行的定时任务# scheduler.add_job(func=send_alert, trigger=CronTrigger(hour=9, minute=0))# 启动调度器# scheduler.start()return appapp = create_app()@app.route("/health")
def health() -> tuple[dict, int]:return {"status": "ok"}, 200def require_api_key(func):@wraps(func)def wrapper(*args, **kwargs):provided = request.headers.get("X-API-KEY")if not provided or provided != app.config.get("API_KEY"):return {"error": "Unauthorized failed, please check API_KEY"}, 401return func(*args, **kwargs)return wrapper@app.route("/api/v1/task-report", methods=["POST"])
@require_api_key
def create_task_report() -> tuple[dict, int]:data = request.get_json(silent=True)if not isinstance(data, dict):return {"error": "Invalid JSON body"}, 400required_fields = ["task_id", "task_name", "status"]missing_fields = [f for f in required_fields if f not in data]if missing_fields:return {"error": f"Missing fields: {', '.join(missing_fields)}"}, 400execution_time_value = data.get("execution_time")try:execution_time_parsed = (float(execution_time_value) if execution_time_value is not None else None)except (TypeError, ValueError):return {"error": "execution_time must be a number"}, 400report = TaskReport(task_id=str(data["task_id"]),task_name=str(data["task_name"]),status=str(data["status"]),message=data.get("message"),details=data.get("details"),execution_time=execution_time_parsed,)try:db.session.add(report)db.session.commit()return {"success": True, "id": report.id}, 201except Exception as e:print(e)db.session.rollback()return {"error": "Database error"}, 500if __name__ == "__main__":app.run(debug=True)

requirements.txt 版本依赖

blinker==1.9.0
click==8.2.1
colorama==0.4.6
Flask==3.1.1
itsdangerous==2.2.0
Jinja2==3.1.6
MarkupSafe==3.0.2
python-frontmatter==1.1.0
PyYAML==6.0.2
Werkzeug==3.1.3

在这里插入图片描述
Post请求这个接口
http://127.0.0.1:5000/api/v1/task-report

{"task_id": "db_backup_20230815_023000","task_name": "1夜间数据库全量备份","status": "completed","message": "数据库备份成功完成","details": "{\"database\": \"production_primary\", \"backup_type\": \"full\", \"size\": \"24.7GB\", \"tables\": 42, \"location\": \"/backups/db/2023-08-15_0200.sql.gz\", \"checksum\": \"sha256:9a8b7c6d5e4f3g2h1i0j9k8l7m6n5o4p3q2r1s0\"}","execution_time": 186.4
}

在这里插入图片描述

调度器设计原理(APScheduler)

Flask 本身是一个 Web 框架,并不具备定时调度能力。
因此我们引入 APScheduler(Advanced Python Scheduler),它能在 Flask 应用内部嵌入一个轻量级调度器,用于周期性任务执行。

1. 调度策略

在应用启动时,APScheduler 会被初始化并注册一个定时任务(CronJob):

每天早上 9:00 自动触发报表生成与邮件发送。

通过 Cron 表达式,可以灵活地配置执行周期(如每天、每周、每小时),并可扩展多个任务。

2. 调度与 Flask 的集成

APScheduler 支持与 Flask 的上下文无缝结合,调度函数在执行时可以直接访问 Flask 应用的配置、数据库连接等资源,从而避免了多进程通信的复杂性。

在这里插入图片描述

报表生成与发送原理

1. 数据读取

定时任务触发后,系统会自动执行报表逻辑模块 services/reporting.py 中的核心函数。

该函数通过 SQLAlchemy 查询数据库中**上一周期(如前一天)**的数据,按任务、项目或业务维度进行聚合统计。

2. 报表构建

系统会将统计结果渲染为 HTML 格式报表。
使用 HTML 而非纯文本的好处是显而易见的:

  • 可视化更强(表格、颜色标识、状态图标);
  • 邮件阅读体验更好;
  • 便于直接转发与展示。

3. 邮件发送

报表生成后,通过 Python 内置的 smtplibemail.mime 模块构造 MIME 邮件:

  • 设置主题(如 “每日任务执行报告”)
  • 填充 HTML 内容
  • 指定收件人列表
  • 通过企业邮箱 SMTP 服务器发送

发送过程带有重试机制与异常捕获,保证任务的稳定性。

在这里插入图片描述

http://www.dtcms.com/a/550396.html

相关文章:

  • 建筑设计自学网站怎么开发一个自己的网站
  • go做网站网站建设优化东莞
  • AI智能座舱是什么?
  • 传奇手游网站大全9377网站建设哪便宜
  • 2023/12 JLPT听力原文 问题四
  • 域名备案时网站名称全国信息企业公示系统
  • Tokio的多线程调度器架构:深度解析与实践
  • Ubuntu(①shell脚本)
  • 个人婚礼网站模板网站建设 丽水
  • 服装定制网站模板茂名建站模板搭建
  • VB.NET 与 C# 文件操作文本到二进制文件的读写
  • ROS2系列 (12) : 自定义msg通信接口
  • 建设科技网络网站的意义和目的国产长尾关键词拘挖掘
  • 个人网站备注wordpress 模板 旅游
  • 嘉定南翔网站建设南阳平面设计培训学校
  • HTML做网站的书籍临沂做网站哪家好
  • 购物网站开发含代码织梦网站安装教程视频
  • k8s 实战入门
  • 网站建设与管理报告书先域名 还是先做网站
  • SQL -- GROUP BY 基本语法
  • 简易 建站做外贸自己建网站
  • 带数据库的网站怎么建收到短信说备案被退回但工信部网站上正常啊
  • 莆田专业建站公司手机怎么建立网站
  • 成都的网站建设开发公司如何看网站是谁做的
  • 彩票网站建设哪家公司好贾汪徐州网站开发
  • PyTorch实战:CV模型搭建全指南
  • Spring Boot 常见性能与配置优化
  • 网站广告文案商务网站的建设阶段包括
  • 老显卡老cpu用vllm推理大模型失败Intel(R) Xeon(R) CPU E5-2643 v2
  • wordpress有中文主题吗qq的seo综合查询