当前位置: 首页 > news >正文

Python 实现定时查询数据库并发送消息的完整流程

Python 实现定时查询数据库并发送消息的完整流程

简介:在许多实际应用场景中,我们需要定时从数据库中查询特定数据,并将这些数据发送到指定的接口进行后续处理。本文将详细介绍如何使用 Python 编写一个程序,实现定时查询 MySQL 数据库中的数据,生成签名并发送消息到指定接口,同时在消息发送成功后更新数据库中的状态字段。

一、 整体流程概述

(一)整个程序的主要流程如下:

1、根据接口要求生成签名和时间戳,用于后续发送消息时的身份验证。
2、定时查询 MySQL 数据库中的msg表的数据。
3、对于查询到的每一行数据,如果其sfts字段为否,则将该行数据组织成消息内容,并结合之前生成的签名和时间戳发送到指定接口。a
4、在消息发送成功后,更新数据库中该行数据的sfts字段为是。
5、每小时重置一次数据。

(二)数据库数据处理

  1. 设置定时调度事件或者使用触发器,将需要的数据根据需求写入到msg表中。
CREATE DEFINER=`root`@`%` PROCEDURE `insert_zero_rows_data`()
BEGIN
    -- 插入数据到 msg 表
    INSERT INTO msg 
    SELECT 
        -- 插入数据到 msg 表
        '否', --是否发送,默认为未发送
        NOW() -- 记录数据插入的当前时间
    FROM 
       来源表1 gg
    JOIN 
        来源表2 dr ON gg.bywm = dr.table_name
    WHERE 
        dr.TABLE_ROWS = 0
        AND gg.bywm NOT IN (SELECT bywm FROM msg);-- 避免重复插入数据
END

根据实际的业务需求,合理设置上述存储过程的调用周期或者触发条件。例如,我们可以通过以下代码调用该存储过程:
BEGIN
CALL insert_zero_rows_data();
END

(三)python代码实现

  1. 导入必要的库
import hashlib
import base64
import hmac
import time
import requests
import pymysql
import schedule

这里导入了多个库:
hashlib:用于哈希算法,如 SHA256。
base64:用于进行 Base64 编码和解码。
hmac:用于生成 HMAC(Hash - based Message Authentication Code)。
time:用于获取当前时间戳。
requests:用于发送 HTTP 请求。
pymysql:用于连接和操作 MySQL 数据库。
schedule:用于实现定时任务。

  1. 生成签名和时间戳
    注:这里需要根据你的实际情况来进行修改。
def generate_signature():
    """
    生成签名和时间戳
    :return: 签名和时间戳
    """
    secret = "this is secret " # 预定义的密钥,需根据实际情况修改
    timestamp = int(round(time.time())) # 获取当前时间戳
    string_to_sign = f'{timestamp}@{secret}' # 组合时间戳和密钥生成待签名的字符串
    hmac_code = hmac.new(string_to_sign.encode("utf-8"), digestmod=hashlib.sha256).digest()# 使用 HMAC-SHA256 算法生成哈希值
    sign = base64.b64encode(hmac_code).decode('utf-8')# 对哈希值进行 Base64 编码得到签名
    return sign, timestamp

这个函数首先获取当前时间戳,然后将时间戳和一个预定义的密钥(secret)组合成一个字符串。接着使用 HMAC 算法,以这个字符串为输入,使用 SHA256 哈希算法生成一个哈希值,最后将这个哈希值进行 Base64 编码得到签名。函数返回生成的签名和时间戳。

  1. 查询数据库
def query_database():
    """
    查询数据库中 msg 表的数据
    :return: 查询结果
    """
    try:
        connection = pymysql.connect(
            host="host",
            port=port,
            user="user",
            password=r"password",
            database="database"
        )
        with connection.cursor() as cursor:
            sql = "SELECT jkmc, bywm, sydw, sfts, datatime FROM msg"
            cursor.execute(sql)
            results = cursor.fetchall()
        return results
    except pymysql.Error as e:
        print(f"数据库查询出错: {e}")
        return []
    finally:
        if 'connection' in locals() and connection:
            connection.close()

此函数用于连接到指定的 MySQL 数据库,并执行 SQL 查询语句,从msg表中获取jkmc、bywm、sydw、sfts和datatime字段的数据。如果查询过程中出现错误,将打印错误信息并返回一个空列表。无论查询是否成功,最终都会关闭数据库连接。

  1. 发送消息到指定接口
def send_message(row, sign, timestamp):
    """
    发送消息到指定接口
    :param row: 数据库查询结果的一行数据
    :param sign: 签名
    :param timestamp: 时间戳
    :return: 响应对象
    """
    HOOK_TOKEN = "HOOK_TOKEN"
    url = "url"
    params = {
        "hook_token": HOOK_TOKEN
    }
    CONTENT = f"监测:\n名称 {row[0]}\n表英文名 {row[1]}\n其他 {row[2]}\n发现问题时间 {row[4]}"
    payload = {
        "timestamp": str(timestamp),
        "sign": sign,
        "msgType": "text",
        "msgData": {
            "text": {
                "content": CONTENT
            }
        }
    }
    try:
        response = requests.post(
            url,
            params=params,
            json=payload,
            headers={'Content-Type': 'application/json'}
        )
        print(f"Status Code: {response.status_code}")# 打印响应状态码
        print(f"Response: {response.text}")# 打印响应内容
        return response
    except requests.RequestException as e:
        print(f"消息发送出错: {e}")# 打印发送错误信息
        return None

该函数负责将数据库查询结果的一行数据组织成消息内容,并发送到指定的接口。它首先定义了接口的 URL、请求参数(hook_token)以及消息内容(CONTENT)。然后构建请求负载(payload),其中包含时间戳、签名、消息类型和具体的消息数据。使用requests.post方法发送 POST 请求,并在请求成功或失败时打印相应的状态码和响应信息,最后返回响应对象。

  1. 更新数据库
def update_database(row):
    """
    更新数据库中 msg 表的 sfts 字段
    :param row: 数据库查询结果的一行数据
    """
    try:
        connection = pymysql.connect(
            host="host",
            port=port,
            user="user",
            password=r"password",
            database="database"
        )
        with connection.cursor() as cursor:
            # 修改更新条件,避免更新多条记录
            sql = "UPDATE msg SET sfts = '是' WHERE bywm = %s AND datatime = %s"
            cursor.execute(sql, (row[1], row[4]))
            connection.commit() # 提交事务
    except pymysql.Error as e:
        print(f"数据库更新出错: {e}")
        connection.rollback()
    finally:
        if 'connection' in locals() and connection:
            connection.close()

此函数用于更新数据库中msg表的sfts字段。它连接到数据库,执行 SQL 更新语句,将符合条件(bywm和datatime匹配)的记录的sfts字段更新为是。如果更新过程中出现错误,将打印错误信息并回滚事务。无论更新是否成功,最终都会关闭数据库连接。

  1. 主函数
def main():
    sign, timestamp = generate_signature()
    results = query_database()
    for row in results:
        if row[3] == "否":
            send_message(row, sign, timestamp)
            update_database(row)

主函数main首先调用generate_signature函数生成签名和时间戳,然后调用query_database函数获取数据库查询结果。接着遍历查询结果,对于每一行数据,如果其sfts字段为否,则调用send_message函数发送消息,并在消息发送成功后调用update_database函数更新数据库.

  1. 定时任务设置
    注:定时任务可以在代码中部署,也可以在服务器上进行部署(见往期:服务器上任务的定时调度)

本次采用python 定时调度

def job():
    main()


if __name__ == "__main__":
    # 每五分钟运行一次 job 函数
    schedule.every(5).minutes.do(job)
    while True:
        schedule.run_pending()
        time.sleep(1)
        job()

这里定义了一个job函数,其内部调用main函数。在if name == "main"代码块中,使用schedule库设置了一个定时任务,每五分钟运行一次job函数。通过一个无限循环,不断检查是否有定时任务需要执行,并在每次循环中等待 1 秒,同时也额外调用一次job函数(这部分额外调用可能是代码编写时的一个小失误,正常情况下按照schedule的机制,每五分钟执行一次即可,这部分额外调用可能会导致某些逻辑异常,建议根据实际需求进行调整)。

二、注意事项

  1. 数据库连接配置:确保数据库的主机地址、端口、用户名、密码和数据库名称等配置信息正确无误,否则将无法成功连接数据库。
  2. 签名和时间戳的有效性:生成的签名和时间戳与接口的验证机制紧密相关,确保接口方的验证逻辑与本地生成逻辑一致,并且时间戳的有效期在接口允许的范围内。
  3. 错误处理:在实际应用中,应根据具体需求对错误处理进行进一步优化。例如,对于数据库查询失败或消息发送失败的情况,可以记录详细的日志信息,以便后续排查问题。
  4. 定时任务的稳定性:schedule库在某些复杂环境下可能会出现一些稳定性问题,建议在生产环境中进行充分的测试和监控,确保定时任务能够稳定运行。

通过以上步骤,我们成功实现了一个定时查询数据库并发送消息的 Python 程序,希望这篇文章对你有所帮助。如果你在实际应用中遇到问题,欢迎在评论区留言讨论。
以上代码仅供参考,实际应用中请根据具体需求进行调整和优化。

三、完整代码

import hashlib
import base64
import hmac
import time
import requests
import pymysql
import schedule


def generate_signature():
    """
    生成签名和时间戳
    :return: 签名和时间戳
    """
    secret = ""
    timestamp = int(round(time.time()))
    string_to_sign = f'{timestamp}@{secret}'
    hmac_code = hmac.new(string_to_sign.encode("utf-8"), digestmod=hashlib.sha256).digest()
    sign = base64.b64encode(hmac_code).decode('utf-8')
    return sign, timestamp


def query_database():
    """
    查询数据库中 msg 表的数据
    :return: 查询结果
    """
    try:
        connection = pymysql.connect(
            host="",
            port=,
            user="",
            password=r"",
            database=""
        )
        with connection.cursor() as cursor:
            sql = "SELECT jkmc, bywm, sydw, sfts, datatime FROM msg"
            cursor.execute(sql)
            results = cursor.fetchall()
        return results
    except pymysql.Error as e:
        print(f"数据库查询出错: {e}")
        return []
    finally:
        if 'connection' in locals() and connection:
            connection.close()


def send_message(row, sign, timestamp):
    """
    发送消息到指定接口
    :param row: 数据库查询结果的一行数据
    :param sign: 签名
    :param timestamp: 时间戳
    :return: 响应对象
    """
    HOOK_TOKEN = ""
    url = ""
    params = {
        "hook_token": HOOK_TOKEN
    }
    CONTENT = f"监测:\n接口名称 {row[0]}\n表英文名 {row[1]}\n其他 {row[2]}\n发现问题时间 {row[4]}"
    payload = {
        "timestamp": str(timestamp),
        "sign": sign,
        "msgType": "text",
        "msgData": {
            "text": {
                "content": CONTENT
            }
        }
    }
    try:
        response = requests.post(
            url,
            params=params,
            json=payload,
            headers={'Content-Type': 'application/json'}
        )
        print(f"Status Code: {response.status_code}")
        print(f"Response: {response.text}")
        return response
    except requests.RequestException as e:
        print(f"消息发送出错: {e}")
        return None


def update_database(row):
    """
    更新数据库中 msg 表的 sfts 字段
    :param row: 数据库查询结果的一行数据
    """
    try:
        connection = pymysql.connect(
            host="",
            port=,
            user="",
            password=r"",
            database=""
        )
        with connection.cursor() as cursor:
            # 修改更新条件,避免更新多条记录
            sql = "UPDATE msg SET sfts = '是' WHERE bywm = %s AND datatime = %s"
            cursor.execute(sql, (row[1], row[4]))
            connection.commit()
    except pymysql.Error as e:
        print(f"数据库更新出错: {e}")
        connection.rollback()
    finally:
        if 'connection' in locals() and connection:
            connection.close()


def main():
    sign, timestamp = generate_signature()
    results = query_database()
    for row in results:
        if row[3] == "否":
            send_message(row, sign, timestamp)
            update_database(row)


def job():
    main()


if __name__ == "__main__":
    # 每五分钟运行一次 job 函数
    schedule.every(5).minutes.do(job)
    while True:
        schedule.run_pending()
        time.sleep(1)
        job()

相关文章:

  • Eureka Server 数据同步原理深度解析
  • Go红队开发—编解码工具
  • 2025年02月26日Github流行趋势
  • C++之vector
  • 如何在工控机上实现机器视觉检测?
  • Vue05
  • 计算机毕业设计SpringBoot+Vue.js英语知识应用网站(源码+文档+PPT+讲解)
  • 如何下载MinGW-w64到MATLAB
  • 解决Docker Desktop启动后Docker Engine stopped问题
  • 进入DeepSeek部署第一阵营后,奇墨科技推进多元应用场景落地
  • 小红的回文子串
  • CSS 实现波浪效果
  • Ubuntu 下 nginx-1.24.0 源码分析 - ngx_modules
  • 前端Npm面试题及参考答案
  • 深度剖析数据分析职业成长阶梯
  • Ubuntu20.04下各类常用软件及库安装汇总
  • 解锁浏览器内置API,助力跨标签/跨页面数据通信
  • 详解:事务注解 @Transactional
  • 【后端开发面试题】每日 3 题(四)
  • 【Python LeetCode 专题】面试经典 150 题
  • 淮安市做网站的公司/免费发广告的平台
  • 今天合肥疫情最新情况/站内优化主要从哪些方面进行
  • wordpress cn/seo就业哪家好
  • 推广网站的方法有/游戏推广员一个月能赚多少
  • 外贸网站建设销售常用语/seo关键词排名如何
  • 北京建设工程招标信息网/北京优化互联网公司