Python 实现定时查询数据库并发送消息的完整流程
Python 实现定时查询数据库并发送消息的完整流程
简介:在许多实际应用场景中,我们需要定时从数据库中查询特定数据,并将这些数据发送到指定的接口进行后续处理。本文将详细介绍如何使用 Python 编写一个程序,实现定时查询 MySQL 数据库中的数据,生成签名并发送消息到指定接口,同时在消息发送成功后更新数据库中的状态字段。
一、 整体流程概述
(一)整个程序的主要流程如下:
1、根据接口要求生成签名和时间戳,用于后续发送消息时的身份验证。
2、定时查询 MySQL 数据库中的msg表的数据。
3、对于查询到的每一行数据,如果其sfts字段为否,则将该行数据组织成消息内容,并结合之前生成的签名和时间戳发送到指定接口。a
4、在消息发送成功后,更新数据库中该行数据的sfts字段为是。
5、每小时重置一次数据。
(二)数据库数据处理
- 设置定时调度事件或者使用触发器,将需要的数据根据需求写入到msg表中。
CREATE DEFINER=`root`@`%` PROCEDURE `insert_zero_rows_data`()
BEGIN
-- 插入数据到 msg 表
INSERT INTO msg
SELECT
-- 插入数据到 msg 表
'否', --是否发送,默认为未发送
NOW() -- 记录数据插入的当前时间
FROM
来源表1 gg
JOIN
来源表2 dr ON gg.bywm = dr.table_name
WHERE
dr.TABLE_ROWS = 0
AND gg.bywm NOT IN (SELECT bywm FROM msg);-- 避免重复插入数据
END
根据实际的业务需求,合理设置上述存储过程的调用周期或者触发条件。例如,我们可以通过以下代码调用该存储过程:
BEGIN
CALL insert_zero_rows_data();
END
(三)python代码实现
- 导入必要的库
import hashlib
import base64
import hmac
import time
import requests
import pymysql
import schedule
这里导入了多个库:
hashlib:用于哈希算法,如 SHA256。
base64:用于进行 Base64 编码和解码。
hmac:用于生成 HMAC(Hash - based Message Authentication Code)。
time:用于获取当前时间戳。
requests:用于发送 HTTP 请求。
pymysql:用于连接和操作 MySQL 数据库。
schedule:用于实现定时任务。
- 生成签名和时间戳
注:这里需要根据你的实际情况来进行修改。
def generate_signature():
"""
生成签名和时间戳
:return: 签名和时间戳
"""
secret = "this is secret " # 预定义的密钥,需根据实际情况修改
timestamp = int(round(time.time())) # 获取当前时间戳
string_to_sign = f'{timestamp}@{secret}' # 组合时间戳和密钥生成待签名的字符串
hmac_code = hmac.new(string_to_sign.encode("utf-8"), digestmod=hashlib.sha256).digest()# 使用 HMAC-SHA256 算法生成哈希值
sign = base64.b64encode(hmac_code).decode('utf-8')# 对哈希值进行 Base64 编码得到签名
return sign, timestamp
这个函数首先获取当前时间戳,然后将时间戳和一个预定义的密钥(secret)组合成一个字符串。接着使用 HMAC 算法,以这个字符串为输入,使用 SHA256 哈希算法生成一个哈希值,最后将这个哈希值进行 Base64 编码得到签名。函数返回生成的签名和时间戳。
- 查询数据库
def query_database():
"""
查询数据库中 msg 表的数据
:return: 查询结果
"""
try:
connection = pymysql.connect(
host="host",
port=port,
user="user",
password=r"password",
database="database"
)
with connection.cursor() as cursor:
sql = "SELECT jkmc, bywm, sydw, sfts, datatime FROM msg"
cursor.execute(sql)
results = cursor.fetchall()
return results
except pymysql.Error as e:
print(f"数据库查询出错: {e}")
return []
finally:
if 'connection' in locals() and connection:
connection.close()
此函数用于连接到指定的 MySQL 数据库,并执行 SQL 查询语句,从msg表中获取jkmc、bywm、sydw、sfts和datatime字段的数据。如果查询过程中出现错误,将打印错误信息并返回一个空列表。无论查询是否成功,最终都会关闭数据库连接。
- 发送消息到指定接口
def send_message(row, sign, timestamp):
"""
发送消息到指定接口
:param row: 数据库查询结果的一行数据
:param sign: 签名
:param timestamp: 时间戳
:return: 响应对象
"""
HOOK_TOKEN = "HOOK_TOKEN"
url = "url"
params = {
"hook_token": HOOK_TOKEN
}
CONTENT = f"监测:\n名称 {row[0]}\n表英文名 {row[1]}\n其他 {row[2]}\n发现问题时间 {row[4]}"
payload = {
"timestamp": str(timestamp),
"sign": sign,
"msgType": "text",
"msgData": {
"text": {
"content": CONTENT
}
}
}
try:
response = requests.post(
url,
params=params,
json=payload,
headers={'Content-Type': 'application/json'}
)
print(f"Status Code: {response.status_code}")# 打印响应状态码
print(f"Response: {response.text}")# 打印响应内容
return response
except requests.RequestException as e:
print(f"消息发送出错: {e}")# 打印发送错误信息
return None
该函数负责将数据库查询结果的一行数据组织成消息内容,并发送到指定的接口。它首先定义了接口的 URL、请求参数(hook_token)以及消息内容(CONTENT)。然后构建请求负载(payload),其中包含时间戳、签名、消息类型和具体的消息数据。使用requests.post方法发送 POST 请求,并在请求成功或失败时打印相应的状态码和响应信息,最后返回响应对象。
- 更新数据库
def update_database(row):
"""
更新数据库中 msg 表的 sfts 字段
:param row: 数据库查询结果的一行数据
"""
try:
connection = pymysql.connect(
host="host",
port=port,
user="user",
password=r"password",
database="database"
)
with connection.cursor() as cursor:
# 修改更新条件,避免更新多条记录
sql = "UPDATE msg SET sfts = '是' WHERE bywm = %s AND datatime = %s"
cursor.execute(sql, (row[1], row[4]))
connection.commit() # 提交事务
except pymysql.Error as e:
print(f"数据库更新出错: {e}")
connection.rollback()
finally:
if 'connection' in locals() and connection:
connection.close()
此函数用于更新数据库中msg表的sfts字段。它连接到数据库,执行 SQL 更新语句,将符合条件(bywm和datatime匹配)的记录的sfts字段更新为是。如果更新过程中出现错误,将打印错误信息并回滚事务。无论更新是否成功,最终都会关闭数据库连接。
- 主函数
def main():
sign, timestamp = generate_signature()
results = query_database()
for row in results:
if row[3] == "否":
send_message(row, sign, timestamp)
update_database(row)
主函数main首先调用generate_signature函数生成签名和时间戳,然后调用query_database函数获取数据库查询结果。接着遍历查询结果,对于每一行数据,如果其sfts字段为否,则调用send_message函数发送消息,并在消息发送成功后调用update_database函数更新数据库.
- 定时任务设置
注:定时任务可以在代码中部署,也可以在服务器上进行部署(见往期:服务器上任务的定时调度)
本次采用python 定时调度
def job():
main()
if __name__ == "__main__":
# 每五分钟运行一次 job 函数
schedule.every(5).minutes.do(job)
while True:
schedule.run_pending()
time.sleep(1)
job()
这里定义了一个job函数,其内部调用main函数。在if name == "main"代码块中,使用schedule库设置了一个定时任务,每五分钟运行一次job函数。通过一个无限循环,不断检查是否有定时任务需要执行,并在每次循环中等待 1 秒,同时也额外调用一次job函数(这部分额外调用可能是代码编写时的一个小失误,正常情况下按照schedule的机制,每五分钟执行一次即可,这部分额外调用可能会导致某些逻辑异常,建议根据实际需求进行调整)。
二、注意事项
- 数据库连接配置:确保数据库的主机地址、端口、用户名、密码和数据库名称等配置信息正确无误,否则将无法成功连接数据库。
- 签名和时间戳的有效性:生成的签名和时间戳与接口的验证机制紧密相关,确保接口方的验证逻辑与本地生成逻辑一致,并且时间戳的有效期在接口允许的范围内。
- 错误处理:在实际应用中,应根据具体需求对错误处理进行进一步优化。例如,对于数据库查询失败或消息发送失败的情况,可以记录详细的日志信息,以便后续排查问题。
- 定时任务的稳定性:schedule库在某些复杂环境下可能会出现一些稳定性问题,建议在生产环境中进行充分的测试和监控,确保定时任务能够稳定运行。
通过以上步骤,我们成功实现了一个定时查询数据库并发送消息的 Python 程序,希望这篇文章对你有所帮助。如果你在实际应用中遇到问题,欢迎在评论区留言讨论。
以上代码仅供参考,实际应用中请根据具体需求进行调整和优化。
三、完整代码
import hashlib
import base64
import hmac
import time
import requests
import pymysql
import schedule
def generate_signature():
"""
生成签名和时间戳
:return: 签名和时间戳
"""
secret = ""
timestamp = int(round(time.time()))
string_to_sign = f'{timestamp}@{secret}'
hmac_code = hmac.new(string_to_sign.encode("utf-8"), digestmod=hashlib.sha256).digest()
sign = base64.b64encode(hmac_code).decode('utf-8')
return sign, timestamp
def query_database():
"""
查询数据库中 msg 表的数据
:return: 查询结果
"""
try:
connection = pymysql.connect(
host="",
port=,
user="",
password=r"",
database=""
)
with connection.cursor() as cursor:
sql = "SELECT jkmc, bywm, sydw, sfts, datatime FROM msg"
cursor.execute(sql)
results = cursor.fetchall()
return results
except pymysql.Error as e:
print(f"数据库查询出错: {e}")
return []
finally:
if 'connection' in locals() and connection:
connection.close()
def send_message(row, sign, timestamp):
"""
发送消息到指定接口
:param row: 数据库查询结果的一行数据
:param sign: 签名
:param timestamp: 时间戳
:return: 响应对象
"""
HOOK_TOKEN = ""
url = ""
params = {
"hook_token": HOOK_TOKEN
}
CONTENT = f"监测:\n接口名称 {row[0]}\n表英文名 {row[1]}\n其他 {row[2]}\n发现问题时间 {row[4]}"
payload = {
"timestamp": str(timestamp),
"sign": sign,
"msgType": "text",
"msgData": {
"text": {
"content": CONTENT
}
}
}
try:
response = requests.post(
url,
params=params,
json=payload,
headers={'Content-Type': 'application/json'}
)
print(f"Status Code: {response.status_code}")
print(f"Response: {response.text}")
return response
except requests.RequestException as e:
print(f"消息发送出错: {e}")
return None
def update_database(row):
"""
更新数据库中 msg 表的 sfts 字段
:param row: 数据库查询结果的一行数据
"""
try:
connection = pymysql.connect(
host="",
port=,
user="",
password=r"",
database=""
)
with connection.cursor() as cursor:
# 修改更新条件,避免更新多条记录
sql = "UPDATE msg SET sfts = '是' WHERE bywm = %s AND datatime = %s"
cursor.execute(sql, (row[1], row[4]))
connection.commit()
except pymysql.Error as e:
print(f"数据库更新出错: {e}")
connection.rollback()
finally:
if 'connection' in locals() and connection:
connection.close()
def main():
sign, timestamp = generate_signature()
results = query_database()
for row in results:
if row[3] == "否":
send_message(row, sign, timestamp)
update_database(row)
def job():
main()
if __name__ == "__main__":
# 每五分钟运行一次 job 函数
schedule.every(5).minutes.do(job)
while True:
schedule.run_pending()
time.sleep(1)
job()