当前位置: 首页 > news >正文

实验指导-基于阿里云函数计算的简单邮件发送服务 之数据库访问中间件

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档

文章目录

  • 前言
  • 1. 环境准备
    • 1.1 创建白名单以及账号信息
    • 1.2 获取内网访问方式
    • 1.3 查看当前专有网络VPC和交换机vSwitch信息
    • 1.4 构建FC自定义公共层,提供 Python sqlalchemy依赖
    • 1.5 配置现有FC允许访问VPC
  • 2. 创建告警邮件数据库表
  • 3. 基于pymysql框架重构告警邮件发送功能
    • 3.1 新增邮件发送配置
    • 3.2 发送告警邮件
  • 4. 基于sqlalchemy框架重构邮件配置管理实现
    • 4.1 重构邮件配置管理实现
    • 4.2 新增邮件发送配置
    • 4.3 发送告警邮件
    • 4.4 修改CloudFlow工作流的参数传递
  • 总结


前言

1. 环境准备

1.1 创建白名单以及账号信息

PolarDB Mysql
是阿里云的云原生数据库,现在我们使用这个数据库

云原生数据库 PolarDB MySQL 版

点击下面的入门与试用
在这里插入图片描述

地区选择应该和函数计算FC一样

直接进入控制台

在这里插入图片描述
然后进入这个集群,刷新一下这个集群就变为运行中了

在这里插入图片描述
选择集群白名单

然后新建IP白名单分组.增加白名单IP0.0.0.0/0
允许所有的IP访问这个数据库
在这里插入图片描述

然后是点击账号管理
创建账号

在这里插入图片描述
在这里插入图片描述

1.2 获取内网访问方式

点击基本信息,然后往下面翻,就可以看到集群的私网访问地址

在这里插入图片描述

1.3 查看当前专有网络VPC和交换机vSwitch信息

专有网络VPC(VirtualPrivateCloud)是阿里云提供的一种隔离的、私有的云上网络环境,允许用户在公共云
上配置和管理一个逻辑隔离的网络区域。每个VPC都是逻辑上完全隔离的,确保了不同用户或业务间的数据和操作
互不影响

还是在基本信息这里上面就可以看到VPC和交换机信息了

1.4 构建FC自定义公共层,提供 Python sqlalchemy依赖

Q:为什么本实验需要使用pymysql和sqlalchemy两个框架,只构建sqlalchemy的公共层?
A:因为创建FCWeb函数时选择阿里云官方提供的Python3.10运行时中已经预装了pymysql,故无需
额外构建新的公共层。

函数计算FC

在这里插入图片描述

在这里插入图片描述

这样创建应该就可以了

1.5 配置现有FC允许访问VPC

进入函数fun-alarm-email-send
点击配置
然后是高级配置–》编辑----》网络编辑
允许访问 VPC–》自定义配置
在这里插入图片描述

VPC和交换机都是刚刚创建好了的,都是MYsql那里的

选择与PolarDB一致的专有网络和交换机。

然后还有一个安全组的选择

云服务器安全组配置

我们在这里创建安全组
VPC就选择刚刚PolarDB
因为内网互通,所以没有配置额外的端口

在这里插入图片描述

然后直接部署这个配置

2. 创建告警邮件数据库表

阿里云云数据库统一控制台DMS

在这里插入图片描述

在这里插入图片描述

这里将会显示可以登录的云数据库实例
我们需要登录这个我们刚刚创建的实例

在这里插入图片描述

在这里插入图片描述
登录成功以后就会进入sql控制台

在这里插入图片描述

然后DMS这里还会显示我们已经登录的实例
这里也有SQL控制台
在这里插入图片描述
在这里插入图片描述
数据库管理这里就可以创建库db_message了

在这里插入图片描述
然后点击db_message,新建一个SQL控制台,自动生成的

CREATE TABLE `tbl_config` (`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '配置记录的唯一标识符',`host` varchar(255) NOT NULL COMMENT '邮件服务器的主机地址',`port` int(5) NOT NULL COMMENT '邮件服务器的端口号',`username` varchar(100) NOT NULL COMMENT '登录邮件服务器的用户名,示例:synx@emample.com',`password` varchar(100) NOT NULL COMMENT '登录邮件服务器的授权码',`sender` varchar(255) NOT NULL COMMENT '邮件发送人的地址,示例:synx@emample.com',`create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP COMMENT '记录创建时间',`update_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '记录最后更新时间',PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='邮件配置表';

3. 基于pymysql框架重构告警邮件发送功能

总共三个文件

app.py# Sanic应用入口
custom_json_encoder.py#自定义JSON编码器,用于处理数据库格式序列化问题
email_config_service.py#邮件配置实现
# filename: custom_json_encoder.py
import json
from datetime import datetimeclass DateTimeEncoder(json.JSONEncoder):def default(self, obj):if isinstance(obj, datetime):return obj.isoformat()return super().default(obj)
# filename: email_config_service.py
import json
from typing import Any, Dict
import pymysql
from custom_json_encoder import DateTimeEncoderdef get_db_connection():"""获取数据库连接,需要自行替换为步骤6.1.1中的数据库连接信息"""return pymysql.connect(host='xxx.aliyuncs.com',port=3306,user='synx',password='******',db='db_message',charset='utf8mb4')async def create_config(data) -> Dict[str, Any]:"""创建配置"""try:with get_db_connection() as connection:with connection.cursor() as cursor:cursor.execute("""INSERT INTO tbl_config (host, port, username, password, sender)VALUES (%s, %s, %s, %s, %s)""", (data['host'], data['port'], data['username'],data['password'], data['sender']))connection.commit()return {"message": "config created successfully"}except Exception as e:connection.rollback()return {"error": str(e)}async def read_config(data) -> Dict[str, Any]:"""读取配置,此处仅做最简单的查询,实际应用中需要根据业务需求进行查询"""try:with get_db_connection() as connection:with connection.cursor() as cursor:cursor.execute("SELECT * FROM tbl_config LIMIT 1")result = cursor.fetchone()if result:config = {"id": result[0],"host": result[1],"port": result[2],"username": result[3],"password": result[4],"sender": result[5],"create_time": result[6],"update_time": result[7],}return json.loads(json.dumps(config, cls=DateTimeEncoder))else:return {"error": "No configuration found"}except Exception as e:return {"error": str(e)}async def update_config(data) -> Dict[str, Any]:"""修改邮件配置"""try:with get_db_connection() as connection:with connection.cursor() as cursor:if not data.get("id"):raise ValueError("id is required")sql = "UPDATE tbl_config SET "params = []for key in ['host', 'port', 'username', 'password', 'sender']:if data.get(key):sql += f"{key}=%s, "params.append(data[key])sql = sql.rstrip(", ") + " WHERE id=%s"params.append(data["id"])if cursor.execute(sql, tuple(params)) == 0:raise ValueError("config not found")connection.commit()return {"message": "Config updated successfully"}except Exception as e:connection.rollback()return {"error": str(e)}async def delete_config(data) -> Dict[str, Any]:"""修改邮件配置"""try:with get_db_connection() as connection:with connection.cursor() as cursor:cursor.execute("DELETE FROM tbl_config WHERE id=%s", (data["id"],))connection.commit()return {"message": "Config deleted successfully"}except Exception as e:connection.rollback()return {"error": str(e)}

在这里插入图片描述
其中Host在这里获取,为私网

# -*- coding: utf-8 -*-
import json as std_json
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from smtplib import SMTP
from sanic import Sanic, response
from sanic.response import json
from email_config_service import (create_config,delete_config,read_config,update_config
)app = Sanic("EmailSender")async def send_email(data):# 从数据库中读取邮件配置email_config = await read_config(None)# 创建邮件对象msg = MIMEMultipart()msg['From'] = email_config["sender"]msg['To'] = data.get("recipient")msg['Subject'] = data.get("subject")# 添加邮件正文msg.attach(MIMEText(data.get("body"), 'plain'))# 连接SMTP服务器server = SMTP(email_config["host"], email_config["port"])server.starttls()  # 启动TLS加密server.login(email_config["username"], email_config["password"])# 发送邮件server.send_message(msg)# 关闭连接server.quit()return {"message": "Email sent successfully"}@app.route("/invoke", methods=["POST"])
async def send_email_route(request):action = request.json.get("action")data = request.json.get("data", {})actions = {"create_config": create_config,"read_config": read_config,"update_config": update_config,"delete_config": delete_config,"send_email": send_email}func = actions.get(action)if func:return json(await func(data))else:return response.json({"error": "Invalid action"}, status=400)if __name__ == "__main__":app.run(host="0.0.0.0", port=9000, dev=True)

action为"send_email"时则执行发送邮件相关逻辑

在这里插入图片描述
然后把这三个py放在我们原来的函数fun-alarm-email-send里面,并且把main.py里面的代码复制到app.py,注释掉原来app.py的内容

并且把公共层换为sqlalchemy-sanic-custom-layer

3.1 新增邮件发送配置

因为我们发送邮件的配置,比如host啥的,都是从数据库中获取的,所以我们要先在数据库中插入数据

{"action": "create_config","data": {"host": "smtp.qq.com","port": 587,"username": "synx@example.com","password": "a","sender": "synx@example.com"}
}

action为create_config表示插入数据data到数据库中,表示调用方法create_config,就是插入配置数据了

在这里插入图片描述
在这里插入图片描述
然后数据库中也有数据了

3.2 发送告警邮件

{"action": "send_email","data": {"recipient": "smtp.example.com","subject": "告警邮件","body": "告警邮件正文部分",}
}

smtp.example.com是接受者的邮件

send_email表示这次的功能是发送邮件

在这里插入图片描述

在这里插入图片描述

4. 基于sqlalchemy框架重构邮件配置管理实现

4.1 重构邮件配置管理实现

我们修改email_config_service.py里面的值为

import json
import urllib.parse
from typing import Dict, Anyfrom sqlalchemy import Integer, Column, String, create_engine
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from custom_json_encoder import DateTimeEncoder
Base = declarative_base()class Config(Base):__tablename__ = 'tbl_config'id = Column(Integer, primary_key=True,autoincrement= True)host = Column(String(255))port = Column(Integer)username = Column(String(255))password = Column(String(255))sender = Column(String(255))create_time = Column(String(255))update_time = Column(String(255))def get_db_connection():##这个方法的作用是链接数据库DB_HOST = 'xxxx.com'DB_PORT = 3306DB_USER = 'synx'DB_PASSWORD = 'xxxx'DB_NAME = 'db_message'urllib.parse.quote_plus(DB_PASSWORD)# 会把密码里所有非 ASCII数字字母的字符(特别是:/ 、@、 %&=、空格等)先转成# URL - 安全的百分号编码,再把空格统一变成加号 +。# 典型用途:拼数据库连接串(DSN)时,防止密码里的特殊字符破坏格式:encoded_password = urllib.parse.quote_plus(DB_PASSWORD)DATABASE_URL = f'mysql+pymysql://{DB_USER}:{encoded_password}@{DB_HOST}:{DB_PORT}/{DB_NAME}?charset=utf8mb4'engine = create_engine(DATABASE_URL)Session=sessionmaker(bind=engine)return Session()
# Dict[str, Any] 是 类型注解,告诉阅读者和 IDE:
# 这个函数返回一个 字典(Dict 来自 typing 模块);
# 字典的 键 必须是字符串(str);
# 字典的 值 可以是任意类型(Any)。
async def create_config(data) -> Dict[str, Any]:##往数据库插入配置数据with get_db_connection() as session:try:config = Config(host=data['host'],port=data['port'],username=data['username'],password=data['password'],sender=data['sender'])session.add(config)session.commit()return {"message": "config created successfully"}except SQLAlchemyError as e:session.rollback()return {"error": str(e)}async def read_config(data) -> Dict[str, Any]:"""从数据库中读取配置"""with get_db_connection() as session:try:# 查询数据库Config中第一条数据config = session.query(Config).first()if config:config_dict= {"id": config.id,"host": config.host,"port": config.port,"username": config.username,"password": config.password,"sender": config.sender,"create_time": config.create_time,"update_time": config.update_time}encode_result = json.dumps(config_dict, cls=DateTimeEncoder)return json.loads(encode_result)else:return {"error": "No configuration found"}except SQLAlchemyError as e:return {"error": str(e)}# encode_result = json.dumps(config_dict, cls=DateTimeEncoder)
# return json.loads(encode_result)
# “先把含datetime的对象转成JSON字符串,再立刻拆回普通字典,这样数据库里带日期的记录就能干干净净地返回给前端。”
# encode_result = json.dumps(config_dict, cls=DateTimeEncoder)
# 把 config_dict(可能含 datetime、Decimal 等)序列化成 JSON 字符串。
# DateTimeEncoder 负责把 datetime 字段变成 ISO-8601 字符串,其余字段保持原值。
# return json.loads(encode_result)
# 再把 JSON 字符串反序列化成普通 Python 字典。
# 此时所有值都是基础类型(str、intfloat、list、dict),没有 datetime 对象,后续框架自动 jsonify 时不会再报错。async def update_config(data) -> Dict[str, Any]:"""修改邮件配置"""with get_db_connection() as session:try:config_id = data.get("id")if not config_id:return {"error": "Invalid config id"}config = session.query(Config).filter(Config.id == config_id).first()if not config:return {"error": "Config not found"}# 把data[key]的值动态赋给对象config的属性key。因为是修改数据库for key in ["host", "port", "username", "password", "sender"]:if key in data:setattr(config, key, data[key])session.commit()return {"message": "Config updated successfully"}except SQLAlchemyError as e:session.rollback()return {"error": str(e)}async def delete_config(data) -> Dict[str, Any]:"""删除邮件配置"""with get_db_connection() as session:try:config_id = data.get("id")if not config_id:raise ValueError("Invalid config id")config = session.query(Config).filter(Config.id == config_id).first()if not config:return {"error": "Config not found"}session.delete(config)session.commit()return {"message": "Config deleted successfully"}except SQLAlchemyError as e:session.rollback()return {"error": str(e)}

4.2 新增邮件发送配置

还是和原来一样的
在这里插入图片描述
在这里插入图片描述

4.3 发送告警邮件

在这里插入图片描述

在这里插入图片描述

4.4 修改CloudFlow工作流的参数传递

这个要怎么修改呢

在这里插入图片描述

我们修改的是原来的这个流水线

我们发现
fun-alarm-eamil-send的body也就是json就是input的data
而input就是fun-temperature-and-humidity-data-upload返回的数据
在这里插入图片描述
结构是这样·的

{"status" : 1 if t_out_flag or h_out_flag else 0,"message" : "异常" if t_out_flag or h_out_flag else "正常","data" : {#需要自定义收件人邮箱"recipient":"zjdsxc12@qq.com","subject":"告知邮箱-温湿度异常","body":email_body}

data类型是

{#需要自定义收件人邮箱"recipient":"zjdsxc12@qq.com","subject":"告知邮箱-温湿度异常","body":email_body}

但是fun-alarm-eamil-send要接受的body也就是input.data是

{"action": "send_email","data": {"recipient": "zjdsxc12@qq.com","subject": "告警邮件","body": "告警邮件正文部分",}
}

所以要修改fun-temperature-and-humidity-data-upload的返回数据格式

import datetimefrom sanic import Sanic
from sanic.response import jsonapp = Sanic("MyApp")#温度阈值
t_threshold = (25,28)
#湿度阈值
h_threshold = (30,33)# if t_out_flag or h_out_flag:
#     "status": 1
# else:
#     "status": 0@app.route("/upload",methods=["POST"],name="upload")
@app.route("/invoke",methods=["POST"],name="invoke")
async def data_upload(request):try:data = request.jsonsn = data.get("sn")temperature = data.get("temperature")humidity = data.get("humidity")if not all([sn,temperature,humidity]):return json({"error": "Missing sn, temperature, or humidity"}, status=400)#判断温湿度是否超过阈值#t_out_flag为true就表示超过了阈值t_out_flag = not (t_threshold[0]<=temperature<=t_threshold[1])h_out_flag = not (h_threshold[0]<=humidity<=h_threshold[1])email_body = generate_email_body(sn,temperature,humidity,t_threshold,h_threshold)res = {"status" : 1 if t_out_flag or h_out_flag else 0,"message" : "异常" if t_out_flag or h_out_flag else "正常","data" : {"action": "send_email","data": {"recipient": "zjdsxc12@qq.com","subject": "告警邮件","body": "告警邮件正文部分",}}if t_out_flag or h_out_flag else True,}return json(res)except Exception as e:return json({"error": str(e)}, status=500)
if __name__ == "__main__":app.run(host="0.0.0.0",port=9000)# 作用是允许在字符串中直接嵌入变量或表达式,语法是用 {} 包裹变量名或计算式
def generate_email_body(sn,temperature,humidity,t_threshold,h_threshold):return (f"告警通知:\n\n"f"当前设备{sn}的温湿度数据超出正常范围。\n\n"f"设备温度:{temperature}°c\n"f"温度阈值:{t_threshold[0]}°c-{t_threshold[1]}°c\n\n"f"设备湿度:{humidity}%\n"f"湿度阈值:{h_threshold[0]}%-{h_threshold[1]}%\n\n"f"请尽快检查设备并采取相应措施。\n\n"f"时间:{datetime.datetime.now().isoformat()}")if __name__ == "__main__":app.run(host="0.0.0.0",port=9000)

就修改这一个应该就可以了

在这里插入图片描述
点击详情,里面有公网访问地址,就可以调用工作流了

注意里面的公网ip地址是携带者token的哦,不要删除了
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
还是成功了的

总结

http://www.dtcms.com/a/418329.html

相关文章:

  • PPO算法
  • 网站建设公司方维wordpress 上传文件路径
  • gRPC0到1系列之【6】
  • 【Java系列课程·Java学前须知】第3课 JDK,JVM,JRE的区别和优缺
  • JVM栈溢出时如何dump栈信息?
  • 重庆奉节网站建设公司重庆沙坪坝地图全图
  • RK3588芯片与板卡全面解析:旗舰级AIoT与边缘计算的核心
  • 226.翻转二叉树(二叉树算法题)
  • #itertools.product
  • AcWing 1172:祖孙询问 ← 倍增法求LCA(DFS预处理)
  • C语言 分支结构(1)
  • 扭蛋机抽赏小程序:重构线上娱乐的“盲盒式”新体验
  • EtherNet/IP转EtherCAT网关在新能源制造中实现机器人与运动卡数据互通
  • Imatest-Wedge模块
  • 岳阳博物馆网站网站建设想法
  • Day03_STM32F103C8T6学习笔记6-9章(江科大)
  • 专业企业网站搭建服务docker创建wordpress
  • 各大网站的网址网站的功能性
  • ZYNQ平台摄像头性能深度对决:OV7725 vs OV5640全面速度测试与优化实战
  • Qt 界面优化 --- 绘图
  • MySQL Online DDL:高性能表结构变更指南
  • 操作系统:进程调度,创建和终止
  • Kafka09-速答-尚硅谷
  • Jenkins与GitLab-CI的技术对比分析
  • 2025 年 AI 智能体(Agent)发展全景:技术突破、场景落地与产业重构
  • 电子商务的网站建设名词解释网站设计的流程简答题
  • Spark源码中的线程池
  • Kafka06-进阶-尚硅谷
  • TDengine 时序函数 IRATE 用户手册
  • 网站模板源码下载广告网站建设