网站建设推广案例网站seo外包靠谱吗
Python 实现定时查询数据库并发送消息的完整流程
简介:在许多实际应用场景中,我们需要定时从数据库中查询特定数据,并将这些数据发送到指定的接口进行后续处理。本文将详细介绍如何使用 Python 编写一个程序,实现定时查询 MySQL 数据库中的数据,生成签名并发送消息到指定接口,同时在消息发送成功后更新数据库中的状态字段。
一、 整体流程概述
(一)整个程序的主要流程如下:
1、根据接口要求生成签名和时间戳,用于后续发送消息时的身份验证。
2、定时查询 MySQL 数据库中的msg表的数据。
3、对于查询到的每一行数据,如果其sfts字段为否,则将该行数据组织成消息内容,并结合之前生成的签名和时间戳发送到指定接口。a
4、在消息发送成功后,更新数据库中该行数据的sfts字段为是。
5、每小时重置一次数据。
(二)数据库数据处理
- 设置定时调度事件或者使用触发器,将需要的数据根据需求写入到msg表中。
CREATE DEFINER=`root`@`%` PROCEDURE `insert_zero_rows_data`()
BEGIN-- 插入数据到 msg 表INSERT INTO msg SELECT -- 插入数据到 msg 表'否', --是否发送,默认为未发送NOW() -- 记录数据插入的当前时间FROM 来源表1 ggJOIN 来源表2 dr ON gg.bywm = dr.table_nameWHERE dr.TABLE_ROWS = 0AND gg.bywm NOT IN (SELECT bywm FROM msg);-- 避免重复插入数据
END
根据实际的业务需求,合理设置上述存储过程的调用周期或者触发条件。例如,我们可以通过以下代码调用该存储过程:
BEGIN
CALL insert_zero_rows_data();
END
(三)python代码实现
- 导入必要的库
import hashlib
import base64
import hmac
import time
import requests
import pymysql
import schedule
这里导入了多个库:
hashlib:用于哈希算法,如 SHA256。
base64:用于进行 Base64 编码和解码。
hmac:用于生成 HMAC(Hash - based Message Authentication Code)。
time:用于获取当前时间戳。
requests:用于发送 HTTP 请求。
pymysql:用于连接和操作 MySQL 数据库。
schedule:用于实现定时任务。
- 生成签名和时间戳
注:这里需要根据你的实际情况来进行修改。
def generate_signature():"""生成签名和时间戳:return: 签名和时间戳"""secret = "this is secret " # 预定义的密钥,需根据实际情况修改timestamp = int(round(time.time())) # 获取当前时间戳string_to_sign = f'{timestamp}@{secret}' # 组合时间戳和密钥生成待签名的字符串hmac_code = hmac.new(string_to_sign.encode("utf-8"), digestmod=hashlib.sha256).digest()# 使用 HMAC-SHA256 算法生成哈希值sign = base64.b64encode(hmac_code).decode('utf-8')# 对哈希值进行 Base64 编码得到签名return sign, timestamp
这个函数首先获取当前时间戳,然后将时间戳和一个预定义的密钥(secret)组合成一个字符串。接着使用 HMAC 算法,以这个字符串为输入,使用 SHA256 哈希算法生成一个哈希值,最后将这个哈希值进行 Base64 编码得到签名。函数返回生成的签名和时间戳。
- 查询数据库
def query_database():"""查询数据库中 msg 表的数据:return: 查询结果"""try:connection = pymysql.connect(host="host",port=port,user="user",password=r"password",database="database")with connection.cursor() as cursor:sql = "SELECT jkmc, bywm, sydw, sfts, datatime FROM msg"cursor.execute(sql)results = cursor.fetchall()return resultsexcept pymysql.Error as e:print(f"数据库查询出错: {e}")return []finally:if 'connection' in locals() and connection:connection.close()
此函数用于连接到指定的 MySQL 数据库,并执行 SQL 查询语句,从msg表中获取jkmc、bywm、sydw、sfts和datatime字段的数据。如果查询过程中出现错误,将打印错误信息并返回一个空列表。无论查询是否成功,最终都会关闭数据库连接。
- 发送消息到指定接口
def send_message(row, sign, timestamp):"""发送消息到指定接口:param row: 数据库查询结果的一行数据:param sign: 签名:param timestamp: 时间戳:return: 响应对象"""HOOK_TOKEN = "HOOK_TOKEN"url = "url"params = {"hook_token": HOOK_TOKEN}CONTENT = f"监测:\n名称 {row[0]}\n表英文名 {row[1]}\n其他 {row[2]}\n发现问题时间 {row[4]}"payload = {"timestamp": str(timestamp),"sign": sign,"msgType": "text","msgData": {"text": {"content": CONTENT}}}try:response = requests.post(url,params=params,json=payload,headers={'Content-Type': 'application/json'})print(f"Status Code: {response.status_code}")# 打印响应状态码print(f"Response: {response.text}")# 打印响应内容return responseexcept requests.RequestException as e:print(f"消息发送出错: {e}")# 打印发送错误信息return None
该函数负责将数据库查询结果的一行数据组织成消息内容,并发送到指定的接口。它首先定义了接口的 URL、请求参数(hook_token)以及消息内容(CONTENT)。然后构建请求负载(payload),其中包含时间戳、签名、消息类型和具体的消息数据。使用requests.post方法发送 POST 请求,并在请求成功或失败时打印相应的状态码和响应信息,最后返回响应对象。
- 更新数据库
def update_database(row):"""更新数据库中 msg 表的 sfts 字段:param row: 数据库查询结果的一行数据"""try:connection = pymysql.connect(host="host",port=port,user="user",password=r"password",database="database")with connection.cursor() as cursor:# 修改更新条件,避免更新多条记录sql = "UPDATE msg SET sfts = '是' WHERE bywm = %s AND datatime = %s"cursor.execute(sql, (row[1], row[4]))connection.commit() # 提交事务except pymysql.Error as e:print(f"数据库更新出错: {e}")connection.rollback()finally:if 'connection' in locals() and connection:connection.close()
此函数用于更新数据库中msg表的sfts字段。它连接到数据库,执行 SQL 更新语句,将符合条件(bywm和datatime匹配)的记录的sfts字段更新为是。如果更新过程中出现错误,将打印错误信息并回滚事务。无论更新是否成功,最终都会关闭数据库连接。
- 主函数
def main():sign, timestamp = generate_signature()results = query_database()for row in results:if row[3] == "否":send_message(row, sign, timestamp)update_database(row)
主函数main首先调用generate_signature函数生成签名和时间戳,然后调用query_database函数获取数据库查询结果。接着遍历查询结果,对于每一行数据,如果其sfts字段为否,则调用send_message函数发送消息,并在消息发送成功后调用update_database函数更新数据库.
- 定时任务设置
注:定时任务可以在代码中部署,也可以在服务器上进行部署(见往期:服务器上任务的定时调度)
本次采用python 定时调度
def job():main()if __name__ == "__main__":# 每五分钟运行一次 job 函数schedule.every(5).minutes.do(job)while True:schedule.run_pending()time.sleep(1)job()
这里定义了一个job函数,其内部调用main函数。在if name == "main"代码块中,使用schedule库设置了一个定时任务,每五分钟运行一次job函数。通过一个无限循环,不断检查是否有定时任务需要执行,并在每次循环中等待 1 秒,同时也额外调用一次job函数(这部分额外调用可能是代码编写时的一个小失误,正常情况下按照schedule的机制,每五分钟执行一次即可,这部分额外调用可能会导致某些逻辑异常,建议根据实际需求进行调整)。
二、注意事项
- 数据库连接配置:确保数据库的主机地址、端口、用户名、密码和数据库名称等配置信息正确无误,否则将无法成功连接数据库。
- 签名和时间戳的有效性:生成的签名和时间戳与接口的验证机制紧密相关,确保接口方的验证逻辑与本地生成逻辑一致,并且时间戳的有效期在接口允许的范围内。
- 错误处理:在实际应用中,应根据具体需求对错误处理进行进一步优化。例如,对于数据库查询失败或消息发送失败的情况,可以记录详细的日志信息,以便后续排查问题。
- 定时任务的稳定性:schedule库在某些复杂环境下可能会出现一些稳定性问题,建议在生产环境中进行充分的测试和监控,确保定时任务能够稳定运行。
通过以上步骤,我们成功实现了一个定时查询数据库并发送消息的 Python 程序,希望这篇文章对你有所帮助。如果你在实际应用中遇到问题,欢迎在评论区留言讨论。
以上代码仅供参考,实际应用中请根据具体需求进行调整和优化。
三、完整代码
import hashlib
import base64
import hmac
import time
import requests
import pymysql
import scheduledef generate_signature():"""生成签名和时间戳:return: 签名和时间戳"""secret = ""timestamp = int(round(time.time()))string_to_sign = f'{timestamp}@{secret}'hmac_code = hmac.new(string_to_sign.encode("utf-8"), digestmod=hashlib.sha256).digest()sign = base64.b64encode(hmac_code).decode('utf-8')return sign, timestampdef query_database():"""查询数据库中 msg 表的数据:return: 查询结果"""try:connection = pymysql.connect(host="",port=,user="",password=r"",database="")with connection.cursor() as cursor:sql = "SELECT jkmc, bywm, sydw, sfts, datatime FROM msg"cursor.execute(sql)results = cursor.fetchall()return resultsexcept pymysql.Error as e:print(f"数据库查询出错: {e}")return []finally:if 'connection' in locals() and connection:connection.close()def send_message(row, sign, timestamp):"""发送消息到指定接口:param row: 数据库查询结果的一行数据:param sign: 签名:param timestamp: 时间戳:return: 响应对象"""HOOK_TOKEN = ""url = ""params = {"hook_token": HOOK_TOKEN}CONTENT = f"监测:\n接口名称 {row[0]}\n表英文名 {row[1]}\n其他 {row[2]}\n发现问题时间 {row[4]}"payload = {"timestamp": str(timestamp),"sign": sign,"msgType": "text","msgData": {"text": {"content": CONTENT}}}try:response = requests.post(url,params=params,json=payload,headers={'Content-Type': 'application/json'})print(f"Status Code: {response.status_code}")print(f"Response: {response.text}")return responseexcept requests.RequestException as e:print(f"消息发送出错: {e}")return Nonedef update_database(row):"""更新数据库中 msg 表的 sfts 字段:param row: 数据库查询结果的一行数据"""try:connection = pymysql.connect(host="",port=,user="",password=r"",database="")with connection.cursor() as cursor:# 修改更新条件,避免更新多条记录sql = "UPDATE msg SET sfts = '是' WHERE bywm = %s AND datatime = %s"cursor.execute(sql, (row[1], row[4]))connection.commit()except pymysql.Error as e:print(f"数据库更新出错: {e}")connection.rollback()finally:if 'connection' in locals() and connection:connection.close()def main():sign, timestamp = generate_signature()results = query_database()for row in results:if row[3] == "否":send_message(row, sign, timestamp)update_database(row)def job():main()if __name__ == "__main__":# 每五分钟运行一次 job 函数schedule.every(5).minutes.do(job)while True:schedule.run_pending()time.sleep(1)job()