实验指导-基于阿里云函数计算的简单邮件发送服务 之数据库访问中间件
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档
文章目录
- 前言
- 1. 环境准备
- 1.1 创建白名单以及账号信息
- 1.2 获取内网访问方式
- 1.3 查看当前专有网络VPC和交换机vSwitch信息
- 1.4 构建FC自定义公共层,提供 Python sqlalchemy依赖
- 1.5 配置现有FC允许访问VPC
- 2. 创建告警邮件数据库表
- 3. 基于pymysql框架重构告警邮件发送功能
- 3.1 新增邮件发送配置
- 3.2 发送告警邮件
- 4. 基于sqlalchemy框架重构邮件配置管理实现
- 4.1 重构邮件配置管理实现
- 4.2 新增邮件发送配置
- 4.3 发送告警邮件
- 4.4 修改CloudFlow工作流的参数传递
- 总结
前言
1. 环境准备
1.1 创建白名单以及账号信息
PolarDB Mysql
是阿里云的云原生数据库,现在我们使用这个数据库
云原生数据库 PolarDB MySQL 版
点击下面的入门与试用
地区选择应该和函数计算FC一样
直接进入控制台
然后进入这个集群,刷新一下这个集群就变为运行中了
选择集群白名单
然后新建IP白名单分组.增加白名单IP0.0.0.0/0
允许所有的IP访问这个数据库
然后是点击账号管理
创建账号
1.2 获取内网访问方式
点击基本信息,然后往下面翻,就可以看到集群的私网访问地址
1.3 查看当前专有网络VPC和交换机vSwitch信息
专有网络VPC(VirtualPrivateCloud)是阿里云提供的一种隔离的、私有的云上网络环境,允许用户在公共云
上配置和管理一个逻辑隔离的网络区域。每个VPC都是逻辑上完全隔离的,确保了不同用户或业务间的数据和操作
互不影响
还是在基本信息这里上面就可以看到VPC和交换机信息了
1.4 构建FC自定义公共层,提供 Python sqlalchemy依赖
Q:为什么本实验需要使用pymysql和sqlalchemy两个框架,只构建sqlalchemy的公共层?
A:因为创建FCWeb函数时选择阿里云官方提供的Python3.10运行时中已经预装了pymysql,故无需
额外构建新的公共层。
函数计算FC
这样创建应该就可以了
1.5 配置现有FC允许访问VPC
进入函数fun-alarm-email-send
点击配置
然后是高级配置–》编辑----》网络编辑
允许访问 VPC–》自定义配置
VPC和交换机都是刚刚创建好了的,都是MYsql那里的
选择与PolarDB一致的专有网络和交换机。
然后还有一个安全组的选择
云服务器安全组配置
我们在这里创建安全组
VPC就选择刚刚PolarDB
因为内网互通,所以没有配置额外的端口
然后直接部署这个配置
2. 创建告警邮件数据库表
阿里云云数据库统一控制台DMS
这里将会显示可以登录的云数据库实例
我们需要登录这个我们刚刚创建的实例
登录成功以后就会进入sql控制台
然后DMS这里还会显示我们已经登录的实例
这里也有SQL控制台
数据库管理这里就可以创建库db_message了
然后点击db_message,新建一个SQL控制台,自动生成的
CREATE TABLE `tbl_config` (`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '配置记录的唯一标识符',`host` varchar(255) NOT NULL COMMENT '邮件服务器的主机地址',`port` int(5) NOT NULL COMMENT '邮件服务器的端口号',`username` varchar(100) NOT NULL COMMENT '登录邮件服务器的用户名,示例:synx@emample.com',`password` varchar(100) NOT NULL COMMENT '登录邮件服务器的授权码',`sender` varchar(255) NOT NULL COMMENT '邮件发送人的地址,示例:synx@emample.com',`create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP COMMENT '记录创建时间',`update_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '记录最后更新时间',PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='邮件配置表';
3. 基于pymysql框架重构告警邮件发送功能
总共三个文件
app.py# Sanic应用入口
custom_json_encoder.py#自定义JSON编码器,用于处理数据库格式序列化问题
email_config_service.py#邮件配置实现
# filename: custom_json_encoder.py
import json
from datetime import datetimeclass DateTimeEncoder(json.JSONEncoder):def default(self, obj):if isinstance(obj, datetime):return obj.isoformat()return super().default(obj)
# filename: email_config_service.py
import json
from typing import Any, Dict
import pymysql
from custom_json_encoder import DateTimeEncoderdef get_db_connection():"""获取数据库连接,需要自行替换为步骤6.1.1中的数据库连接信息"""return pymysql.connect(host='xxx.aliyuncs.com',port=3306,user='synx',password='******',db='db_message',charset='utf8mb4')async def create_config(data) -> Dict[str, Any]:"""创建配置"""try:with get_db_connection() as connection:with connection.cursor() as cursor:cursor.execute("""INSERT INTO tbl_config (host, port, username, password, sender)VALUES (%s, %s, %s, %s, %s)""", (data['host'], data['port'], data['username'],data['password'], data['sender']))connection.commit()return {"message": "config created successfully"}except Exception as e:connection.rollback()return {"error": str(e)}async def read_config(data) -> Dict[str, Any]:"""读取配置,此处仅做最简单的查询,实际应用中需要根据业务需求进行查询"""try:with get_db_connection() as connection:with connection.cursor() as cursor:cursor.execute("SELECT * FROM tbl_config LIMIT 1")result = cursor.fetchone()if result:config = {"id": result[0],"host": result[1],"port": result[2],"username": result[3],"password": result[4],"sender": result[5],"create_time": result[6],"update_time": result[7],}return json.loads(json.dumps(config, cls=DateTimeEncoder))else:return {"error": "No configuration found"}except Exception as e:return {"error": str(e)}async def update_config(data) -> Dict[str, Any]:"""修改邮件配置"""try:with get_db_connection() as connection:with connection.cursor() as cursor:if not data.get("id"):raise ValueError("id is required")sql = "UPDATE tbl_config SET "params = []for key in ['host', 'port', 'username', 'password', 'sender']:if data.get(key):sql += f"{key}=%s, "params.append(data[key])sql = sql.rstrip(", ") + " WHERE id=%s"params.append(data["id"])if cursor.execute(sql, tuple(params)) == 0:raise ValueError("config not found")connection.commit()return {"message": "Config updated successfully"}except Exception as e:connection.rollback()return {"error": str(e)}async def delete_config(data) -> Dict[str, Any]:"""修改邮件配置"""try:with get_db_connection() as connection:with connection.cursor() as cursor:cursor.execute("DELETE FROM tbl_config WHERE id=%s", (data["id"],))connection.commit()return {"message": "Config deleted successfully"}except Exception as e:connection.rollback()return {"error": str(e)}
其中Host在这里获取,为私网
# -*- coding: utf-8 -*-
import json as std_json
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from smtplib import SMTP
from sanic import Sanic, response
from sanic.response import json
from email_config_service import (create_config,delete_config,read_config,update_config
)app = Sanic("EmailSender")async def send_email(data):# 从数据库中读取邮件配置email_config = await read_config(None)# 创建邮件对象msg = MIMEMultipart()msg['From'] = email_config["sender"]msg['To'] = data.get("recipient")msg['Subject'] = data.get("subject")# 添加邮件正文msg.attach(MIMEText(data.get("body"), 'plain'))# 连接SMTP服务器server = SMTP(email_config["host"], email_config["port"])server.starttls() # 启动TLS加密server.login(email_config["username"], email_config["password"])# 发送邮件server.send_message(msg)# 关闭连接server.quit()return {"message": "Email sent successfully"}@app.route("/invoke", methods=["POST"])
async def send_email_route(request):action = request.json.get("action")data = request.json.get("data", {})actions = {"create_config": create_config,"read_config": read_config,"update_config": update_config,"delete_config": delete_config,"send_email": send_email}func = actions.get(action)if func:return json(await func(data))else:return response.json({"error": "Invalid action"}, status=400)if __name__ == "__main__":app.run(host="0.0.0.0", port=9000, dev=True)
action为"send_email"时则执行发送邮件相关逻辑
然后把这三个py放在我们原来的函数fun-alarm-email-send里面,并且把main.py里面的代码复制到app.py,注释掉原来app.py的内容
并且把公共层换为sqlalchemy-sanic-custom-layer
3.1 新增邮件发送配置
因为我们发送邮件的配置,比如host啥的,都是从数据库中获取的,所以我们要先在数据库中插入数据
{"action": "create_config","data": {"host": "smtp.qq.com","port": 587,"username": "synx@example.com","password": "a","sender": "synx@example.com"}
}
action为create_config表示插入数据data到数据库中,表示调用方法create_config,就是插入配置数据了
然后数据库中也有数据了
3.2 发送告警邮件
{"action": "send_email","data": {"recipient": "smtp.example.com","subject": "告警邮件","body": "告警邮件正文部分",}
}
smtp.example.com是接受者的邮件
send_email表示这次的功能是发送邮件
4. 基于sqlalchemy框架重构邮件配置管理实现
4.1 重构邮件配置管理实现
我们修改email_config_service.py里面的值为
import json
import urllib.parse
from typing import Dict, Anyfrom sqlalchemy import Integer, Column, String, create_engine
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from custom_json_encoder import DateTimeEncoder
Base = declarative_base()class Config(Base):__tablename__ = 'tbl_config'id = Column(Integer, primary_key=True,autoincrement= True)host = Column(String(255))port = Column(Integer)username = Column(String(255))password = Column(String(255))sender = Column(String(255))create_time = Column(String(255))update_time = Column(String(255))def get_db_connection():##这个方法的作用是链接数据库DB_HOST = 'xxxx.com'DB_PORT = 3306DB_USER = 'synx'DB_PASSWORD = 'xxxx'DB_NAME = 'db_message'urllib.parse.quote_plus(DB_PASSWORD)# 会把密码里所有非 ASCII数字字母的字符(特别是:、 / 、@、 % 、 & 、=、空格等)先转成# URL - 安全的百分号编码,再把空格统一变成加号 +。# 典型用途:拼数据库连接串(DSN)时,防止密码里的特殊字符破坏格式:encoded_password = urllib.parse.quote_plus(DB_PASSWORD)DATABASE_URL = f'mysql+pymysql://{DB_USER}:{encoded_password}@{DB_HOST}:{DB_PORT}/{DB_NAME}?charset=utf8mb4'engine = create_engine(DATABASE_URL)Session=sessionmaker(bind=engine)return Session()
# Dict[str, Any] 是 类型注解,告诉阅读者和 IDE:
# 这个函数返回一个 字典(Dict 来自 typing 模块);
# 字典的 键 必须是字符串(str);
# 字典的 值 可以是任意类型(Any)。
async def create_config(data) -> Dict[str, Any]:##往数据库插入配置数据with get_db_connection() as session:try:config = Config(host=data['host'],port=data['port'],username=data['username'],password=data['password'],sender=data['sender'])session.add(config)session.commit()return {"message": "config created successfully"}except SQLAlchemyError as e:session.rollback()return {"error": str(e)}async def read_config(data) -> Dict[str, Any]:"""从数据库中读取配置"""with get_db_connection() as session:try:# 查询数据库Config中第一条数据config = session.query(Config).first()if config:config_dict= {"id": config.id,"host": config.host,"port": config.port,"username": config.username,"password": config.password,"sender": config.sender,"create_time": config.create_time,"update_time": config.update_time}encode_result = json.dumps(config_dict, cls=DateTimeEncoder)return json.loads(encode_result)else:return {"error": "No configuration found"}except SQLAlchemyError as e:return {"error": str(e)}# encode_result = json.dumps(config_dict, cls=DateTimeEncoder)
# return json.loads(encode_result)
# “先把含datetime的对象转成JSON字符串,再立刻拆回普通字典,这样数据库里带日期的记录就能干干净净地返回给前端。”
# encode_result = json.dumps(config_dict, cls=DateTimeEncoder)
# 把 config_dict(可能含 datetime、Decimal 等)序列化成 JSON 字符串。
# DateTimeEncoder 负责把 datetime 字段变成 ISO-8601 字符串,其余字段保持原值。
# return json.loads(encode_result)
# 再把 JSON 字符串反序列化成普通 Python 字典。
# 此时所有值都是基础类型(str、int、float、list、dict),没有 datetime 对象,后续框架自动 jsonify 时不会再报错。async def update_config(data) -> Dict[str, Any]:"""修改邮件配置"""with get_db_connection() as session:try:config_id = data.get("id")if not config_id:return {"error": "Invalid config id"}config = session.query(Config).filter(Config.id == config_id).first()if not config:return {"error": "Config not found"}# 把data[key]的值动态赋给对象config的属性key。因为是修改数据库for key in ["host", "port", "username", "password", "sender"]:if key in data:setattr(config, key, data[key])session.commit()return {"message": "Config updated successfully"}except SQLAlchemyError as e:session.rollback()return {"error": str(e)}async def delete_config(data) -> Dict[str, Any]:"""删除邮件配置"""with get_db_connection() as session:try:config_id = data.get("id")if not config_id:raise ValueError("Invalid config id")config = session.query(Config).filter(Config.id == config_id).first()if not config:return {"error": "Config not found"}session.delete(config)session.commit()return {"message": "Config deleted successfully"}except SQLAlchemyError as e:session.rollback()return {"error": str(e)}
4.2 新增邮件发送配置
还是和原来一样的
4.3 发送告警邮件
4.4 修改CloudFlow工作流的参数传递
这个要怎么修改呢
我们修改的是原来的这个流水线
我们发现
fun-alarm-eamil-send的body也就是json就是input的data
而input就是fun-temperature-and-humidity-data-upload返回的数据
结构是这样·的
{"status" : 1 if t_out_flag or h_out_flag else 0,"message" : "异常" if t_out_flag or h_out_flag else "正常","data" : {#需要自定义收件人邮箱"recipient":"zjdsxc12@qq.com","subject":"告知邮箱-温湿度异常","body":email_body}
data类型是
{#需要自定义收件人邮箱"recipient":"zjdsxc12@qq.com","subject":"告知邮箱-温湿度异常","body":email_body}
但是fun-alarm-eamil-send要接受的body也就是input.data是
{"action": "send_email","data": {"recipient": "zjdsxc12@qq.com","subject": "告警邮件","body": "告警邮件正文部分",}
}
所以要修改fun-temperature-and-humidity-data-upload的返回数据格式
import datetimefrom sanic import Sanic
from sanic.response import jsonapp = Sanic("MyApp")#温度阈值
t_threshold = (25,28)
#湿度阈值
h_threshold = (30,33)# if t_out_flag or h_out_flag:
# "status": 1
# else:
# "status": 0@app.route("/upload",methods=["POST"],name="upload")
@app.route("/invoke",methods=["POST"],name="invoke")
async def data_upload(request):try:data = request.jsonsn = data.get("sn")temperature = data.get("temperature")humidity = data.get("humidity")if not all([sn,temperature,humidity]):return json({"error": "Missing sn, temperature, or humidity"}, status=400)#判断温湿度是否超过阈值#t_out_flag为true就表示超过了阈值t_out_flag = not (t_threshold[0]<=temperature<=t_threshold[1])h_out_flag = not (h_threshold[0]<=humidity<=h_threshold[1])email_body = generate_email_body(sn,temperature,humidity,t_threshold,h_threshold)res = {"status" : 1 if t_out_flag or h_out_flag else 0,"message" : "异常" if t_out_flag or h_out_flag else "正常","data" : {"action": "send_email","data": {"recipient": "zjdsxc12@qq.com","subject": "告警邮件","body": "告警邮件正文部分",}}if t_out_flag or h_out_flag else True,}return json(res)except Exception as e:return json({"error": str(e)}, status=500)
if __name__ == "__main__":app.run(host="0.0.0.0",port=9000)# 作用是允许在字符串中直接嵌入变量或表达式,语法是用 {} 包裹变量名或计算式
def generate_email_body(sn,temperature,humidity,t_threshold,h_threshold):return (f"告警通知:\n\n"f"当前设备{sn}的温湿度数据超出正常范围。\n\n"f"设备温度:{temperature}°c\n"f"温度阈值:{t_threshold[0]}°c-{t_threshold[1]}°c\n\n"f"设备湿度:{humidity}%\n"f"湿度阈值:{h_threshold[0]}%-{h_threshold[1]}%\n\n"f"请尽快检查设备并采取相应措施。\n\n"f"时间:{datetime.datetime.now().isoformat()}")if __name__ == "__main__":app.run(host="0.0.0.0",port=9000)
就修改这一个应该就可以了
点击详情,里面有公网访问地址,就可以调用工作流了
注意里面的公网ip地址是携带者token的哦,不要删除了
还是成功了的