当前位置: 首页 > wzjs >正文

手机网站建设服务器长沙app网站开发

手机网站建设服务器,长沙app网站开发,网络营销和网络推广有什么区别,天津网站建设运营方案Python 实现定时查询数据库并发送消息的完整流程 简介:在许多实际应用场景中,我们需要定时从数据库中查询特定数据,并将这些数据发送到指定的接口进行后续处理。本文将详细介绍如何使用 Python 编写一个程序,实现定时查询 MySQL 数…

Python 实现定时查询数据库并发送消息的完整流程

简介:在许多实际应用场景中,我们需要定时从数据库中查询特定数据,并将这些数据发送到指定的接口进行后续处理。本文将详细介绍如何使用 Python 编写一个程序,实现定时查询 MySQL 数据库中的数据,生成签名并发送消息到指定接口,同时在消息发送成功后更新数据库中的状态字段。

一、 整体流程概述

(一)整个程序的主要流程如下:

1、根据接口要求生成签名和时间戳,用于后续发送消息时的身份验证。
2、定时查询 MySQL 数据库中的msg表的数据。
3、对于查询到的每一行数据,如果其sfts字段为否,则将该行数据组织成消息内容,并结合之前生成的签名和时间戳发送到指定接口。a
4、在消息发送成功后,更新数据库中该行数据的sfts字段为是。
5、每小时重置一次数据。

(二)数据库数据处理

  1. 设置定时调度事件或者使用触发器,将需要的数据根据需求写入到msg表中。
CREATE DEFINER=`root`@`%` PROCEDURE `insert_zero_rows_data`()
BEGIN-- 插入数据到 msg 表INSERT INTO msg SELECT -- 插入数据到 msg 表'否', --是否发送,默认为未发送NOW() -- 记录数据插入的当前时间FROM 来源表1 ggJOIN 来源表2 dr ON gg.bywm = dr.table_nameWHERE dr.TABLE_ROWS = 0AND gg.bywm NOT IN (SELECT bywm FROM msg);-- 避免重复插入数据
END

根据实际的业务需求,合理设置上述存储过程的调用周期或者触发条件。例如,我们可以通过以下代码调用该存储过程:
BEGIN
CALL insert_zero_rows_data();
END

(三)python代码实现

  1. 导入必要的库
import hashlib
import base64
import hmac
import time
import requests
import pymysql
import schedule

这里导入了多个库:
hashlib:用于哈希算法,如 SHA256。
base64:用于进行 Base64 编码和解码。
hmac:用于生成 HMAC(Hash - based Message Authentication Code)。
time:用于获取当前时间戳。
requests:用于发送 HTTP 请求。
pymysql:用于连接和操作 MySQL 数据库。
schedule:用于实现定时任务。

  1. 生成签名和时间戳
    注:这里需要根据你的实际情况来进行修改。
def generate_signature():"""生成签名和时间戳:return: 签名和时间戳"""secret = "this is secret " # 预定义的密钥,需根据实际情况修改timestamp = int(round(time.time())) # 获取当前时间戳string_to_sign = f'{timestamp}@{secret}' # 组合时间戳和密钥生成待签名的字符串hmac_code = hmac.new(string_to_sign.encode("utf-8"), digestmod=hashlib.sha256).digest()# 使用 HMAC-SHA256 算法生成哈希值sign = base64.b64encode(hmac_code).decode('utf-8')# 对哈希值进行 Base64 编码得到签名return sign, timestamp

这个函数首先获取当前时间戳,然后将时间戳和一个预定义的密钥(secret)组合成一个字符串。接着使用 HMAC 算法,以这个字符串为输入,使用 SHA256 哈希算法生成一个哈希值,最后将这个哈希值进行 Base64 编码得到签名。函数返回生成的签名和时间戳。

  1. 查询数据库
def query_database():"""查询数据库中 msg 表的数据:return: 查询结果"""try:connection = pymysql.connect(host="host",port=port,user="user",password=r"password",database="database")with connection.cursor() as cursor:sql = "SELECT jkmc, bywm, sydw, sfts, datatime FROM msg"cursor.execute(sql)results = cursor.fetchall()return resultsexcept pymysql.Error as e:print(f"数据库查询出错: {e}")return []finally:if 'connection' in locals() and connection:connection.close()

此函数用于连接到指定的 MySQL 数据库,并执行 SQL 查询语句,从msg表中获取jkmc、bywm、sydw、sfts和datatime字段的数据。如果查询过程中出现错误,将打印错误信息并返回一个空列表。无论查询是否成功,最终都会关闭数据库连接。

  1. 发送消息到指定接口
def send_message(row, sign, timestamp):"""发送消息到指定接口:param row: 数据库查询结果的一行数据:param sign: 签名:param timestamp: 时间戳:return: 响应对象"""HOOK_TOKEN = "HOOK_TOKEN"url = "url"params = {"hook_token": HOOK_TOKEN}CONTENT = f"监测:\n名称 {row[0]}\n表英文名 {row[1]}\n其他 {row[2]}\n发现问题时间 {row[4]}"payload = {"timestamp": str(timestamp),"sign": sign,"msgType": "text","msgData": {"text": {"content": CONTENT}}}try:response = requests.post(url,params=params,json=payload,headers={'Content-Type': 'application/json'})print(f"Status Code: {response.status_code}")# 打印响应状态码print(f"Response: {response.text}")# 打印响应内容return responseexcept requests.RequestException as e:print(f"消息发送出错: {e}")# 打印发送错误信息return None

该函数负责将数据库查询结果的一行数据组织成消息内容,并发送到指定的接口。它首先定义了接口的 URL、请求参数(hook_token)以及消息内容(CONTENT)。然后构建请求负载(payload),其中包含时间戳、签名、消息类型和具体的消息数据。使用requests.post方法发送 POST 请求,并在请求成功或失败时打印相应的状态码和响应信息,最后返回响应对象。

  1. 更新数据库
def update_database(row):"""更新数据库中 msg 表的 sfts 字段:param row: 数据库查询结果的一行数据"""try:connection = pymysql.connect(host="host",port=port,user="user",password=r"password",database="database")with connection.cursor() as cursor:# 修改更新条件,避免更新多条记录sql = "UPDATE msg SET sfts = '是' WHERE bywm = %s AND datatime = %s"cursor.execute(sql, (row[1], row[4]))connection.commit() # 提交事务except pymysql.Error as e:print(f"数据库更新出错: {e}")connection.rollback()finally:if 'connection' in locals() and connection:connection.close()

此函数用于更新数据库中msg表的sfts字段。它连接到数据库,执行 SQL 更新语句,将符合条件(bywm和datatime匹配)的记录的sfts字段更新为是。如果更新过程中出现错误,将打印错误信息并回滚事务。无论更新是否成功,最终都会关闭数据库连接。

  1. 主函数
def main():sign, timestamp = generate_signature()results = query_database()for row in results:if row[3] == "否":send_message(row, sign, timestamp)update_database(row)

主函数main首先调用generate_signature函数生成签名和时间戳,然后调用query_database函数获取数据库查询结果。接着遍历查询结果,对于每一行数据,如果其sfts字段为否,则调用send_message函数发送消息,并在消息发送成功后调用update_database函数更新数据库.

  1. 定时任务设置
    注:定时任务可以在代码中部署,也可以在服务器上进行部署(见往期:服务器上任务的定时调度)

本次采用python 定时调度

def job():main()if __name__ == "__main__":# 每五分钟运行一次 job 函数schedule.every(5).minutes.do(job)while True:schedule.run_pending()time.sleep(1)job()

这里定义了一个job函数,其内部调用main函数。在if name == "main"代码块中,使用schedule库设置了一个定时任务,每五分钟运行一次job函数。通过一个无限循环,不断检查是否有定时任务需要执行,并在每次循环中等待 1 秒,同时也额外调用一次job函数(这部分额外调用可能是代码编写时的一个小失误,正常情况下按照schedule的机制,每五分钟执行一次即可,这部分额外调用可能会导致某些逻辑异常,建议根据实际需求进行调整)。

二、注意事项

  1. 数据库连接配置:确保数据库的主机地址、端口、用户名、密码和数据库名称等配置信息正确无误,否则将无法成功连接数据库。
  2. 签名和时间戳的有效性:生成的签名和时间戳与接口的验证机制紧密相关,确保接口方的验证逻辑与本地生成逻辑一致,并且时间戳的有效期在接口允许的范围内。
  3. 错误处理:在实际应用中,应根据具体需求对错误处理进行进一步优化。例如,对于数据库查询失败或消息发送失败的情况,可以记录详细的日志信息,以便后续排查问题。
  4. 定时任务的稳定性:schedule库在某些复杂环境下可能会出现一些稳定性问题,建议在生产环境中进行充分的测试和监控,确保定时任务能够稳定运行。

通过以上步骤,我们成功实现了一个定时查询数据库并发送消息的 Python 程序,希望这篇文章对你有所帮助。如果你在实际应用中遇到问题,欢迎在评论区留言讨论。
以上代码仅供参考,实际应用中请根据具体需求进行调整和优化。

三、完整代码

import hashlib
import base64
import hmac
import time
import requests
import pymysql
import scheduledef generate_signature():"""生成签名和时间戳:return: 签名和时间戳"""secret = ""timestamp = int(round(time.time()))string_to_sign = f'{timestamp}@{secret}'hmac_code = hmac.new(string_to_sign.encode("utf-8"), digestmod=hashlib.sha256).digest()sign = base64.b64encode(hmac_code).decode('utf-8')return sign, timestampdef query_database():"""查询数据库中 msg 表的数据:return: 查询结果"""try:connection = pymysql.connect(host="",port=,user="",password=r"",database="")with connection.cursor() as cursor:sql = "SELECT jkmc, bywm, sydw, sfts, datatime FROM msg"cursor.execute(sql)results = cursor.fetchall()return resultsexcept pymysql.Error as e:print(f"数据库查询出错: {e}")return []finally:if 'connection' in locals() and connection:connection.close()def send_message(row, sign, timestamp):"""发送消息到指定接口:param row: 数据库查询结果的一行数据:param sign: 签名:param timestamp: 时间戳:return: 响应对象"""HOOK_TOKEN = ""url = ""params = {"hook_token": HOOK_TOKEN}CONTENT = f"监测:\n接口名称 {row[0]}\n表英文名 {row[1]}\n其他 {row[2]}\n发现问题时间 {row[4]}"payload = {"timestamp": str(timestamp),"sign": sign,"msgType": "text","msgData": {"text": {"content": CONTENT}}}try:response = requests.post(url,params=params,json=payload,headers={'Content-Type': 'application/json'})print(f"Status Code: {response.status_code}")print(f"Response: {response.text}")return responseexcept requests.RequestException as e:print(f"消息发送出错: {e}")return Nonedef update_database(row):"""更新数据库中 msg 表的 sfts 字段:param row: 数据库查询结果的一行数据"""try:connection = pymysql.connect(host="",port=,user="",password=r"",database="")with connection.cursor() as cursor:# 修改更新条件,避免更新多条记录sql = "UPDATE msg SET sfts = '是' WHERE bywm = %s AND datatime = %s"cursor.execute(sql, (row[1], row[4]))connection.commit()except pymysql.Error as e:print(f"数据库更新出错: {e}")connection.rollback()finally:if 'connection' in locals() and connection:connection.close()def main():sign, timestamp = generate_signature()results = query_database()for row in results:if row[3] == "否":send_message(row, sign, timestamp)update_database(row)def job():main()if __name__ == "__main__":# 每五分钟运行一次 job 函数schedule.every(5).minutes.do(job)while True:schedule.run_pending()time.sleep(1)job()

文章转载自:

http://Bf6QILEx.fhbhr.cn
http://01YcUvgx.fhbhr.cn
http://08bsi9Wz.fhbhr.cn
http://4v9aVVmP.fhbhr.cn
http://XFgkjlT1.fhbhr.cn
http://t4i8QJLf.fhbhr.cn
http://Rs64hQkS.fhbhr.cn
http://1jsuR3nc.fhbhr.cn
http://Y8EJCflM.fhbhr.cn
http://PQ4h3Ceb.fhbhr.cn
http://of82zsmV.fhbhr.cn
http://z916ezzW.fhbhr.cn
http://rf1gNhrh.fhbhr.cn
http://1J1YQ43S.fhbhr.cn
http://D5y23G2n.fhbhr.cn
http://0ZghDKw9.fhbhr.cn
http://6yJyFzQu.fhbhr.cn
http://hhE05Ig3.fhbhr.cn
http://empUU7oB.fhbhr.cn
http://Wdtamxek.fhbhr.cn
http://3buaanZw.fhbhr.cn
http://31cxye6q.fhbhr.cn
http://nFVWPb7t.fhbhr.cn
http://0iQAPyvZ.fhbhr.cn
http://5caMD03b.fhbhr.cn
http://L40S8Fa9.fhbhr.cn
http://I89ZVTS9.fhbhr.cn
http://QHpdVnhd.fhbhr.cn
http://6iD9YCQ2.fhbhr.cn
http://UkP7Oh8e.fhbhr.cn
http://www.dtcms.com/wzjs/731094.html

相关文章:

  • 百度网址大全网站网站制作 常见问题
  • 苏州网站建设网站网站设计中的div是什么
  • 2018年主流网站开发语言电子商务专升本可以报什么专业
  • 江苏有哪些做网站建设的公司企业网站建设与维护
  • 高中信息技术网站设计规划晋江住房和城乡建设局网站
  • 建网站主要工具用了采集站域名做网站
  • 有空间与域名 怎么做网站网站建设中网页代码
  • 专做专业课视频的网站网站布局策划的流程图
  • 贵州国龙翔建设有限公司网站男通网站哪个好用
  • 热门的网站模板下载北京app搭建
  • python云服务器网站开发实例网站运营
  • 在线免费货源网站入口网站信息更新如何做
  • 外贸网站建设哪里做得好企业网站制作的软件
  • 贵州网站制作设计公司哪家好延边省建设局网站官网
  • 网站导航仿站西安短视频代运营
  • 如何建设网站建设现在做个网站要多少钱
  • 福建设备公司网站商丘建网站
  • 建一个多用户团购网站需要多少钱企业多语言网站开发
  • 北京做网站好公司新手做网站视频
  • 园林网站建设设计方案湖南长沙seo教育
  • 网站建设优化方法 s做安全题目是哪个网站
  • q网站建设智能营销型网站制作
  • 有哪些网站是cms北京优化生育
  • 英文网站建设 济南wordpress project
  • 济南网站开发公司排名邢台123生活最新帖子
  • wordpress背景自动变幻图形seo站长网怎么下载
  • 网站开发树形图重庆网站建设专家
  • 千万别去电商公司上班做seo推广网站在线咨询
  • 青岛网站建设多少钱重庆渝中区企业网站建设公司
  • 怎么.做网站镇江做网站需要多少钱