TDengine 中 TDgp 中添加机器学习模型
添加机器学习模型
机器/深度学习模型一般要求训练集样本量足够大,才能取得不错的预测效果。训练过程要消耗一定量的时间和计算资源,并需要根据输入的数据进行定期的训练以及更新模型。TDgpt 内置了 PyTorch 和 Keras 机器学习库。所有使用 PyTorch 或 Keras 开发的模型均可以驱动运行。
本章介绍将预训练完成的机器/深度学习分析模型添加到 TDgpt 中的方法。
准备模型
推荐将模型保存在默认的保存目录 /usr/local/taos/taosanode/model/
中,也可以在目录中建立下一级目录,用以保存模型。下面使用 Keras 开发的基于自编码器 auto encoder
的异常检测模型添加到 TDgpt 为例讲解整个流程。
该模型在 TDgpt 系统中名称为 ‘sample_ad_model’。
训练该模型的代码见:training_ad_model.py。
该模型训练使用了 NAB 的 art_daily_small_noise 数据集。
训练完成得到的模型保存成为了两个文件,点击 此处 下载该模型文件,模型文件说明如下。
sample-ad-autoencoder.keras 模型文件,默认 keras 模型文件格式
sample-ad-autoencoder.info 模型附加参数文件,采用了 joblib 格式保存
保存在合适位置
然后在 /usr/local/taos/taosanode/model
文件目录下建立子目录 sample-ad-autoencoder
,用以保存下载两个模型文件。此时 model
文件夹结构如下:
.
└── model└── sample-ad-autoencoder├── sample-ad-autoencoder.keras└── sample-ad-autoencoder.info
添加模型适配代码
需要在 taosanalytics 目录下添加加载该模型并进行适配的 Python 代码。适配并行运行的代码见 autoencoder.py。
为了方便使用,我们已经将该文件保存在该目录,所以您在执行 show anodes full
命令时候,能够看见该算法模型。
下面详细说明该代码的逻辑。
class _AutoEncoderDetectionService(AbstractAnomalyDetectionService):name = 'sample_ad_model' # 通过 show 命令看到的算法的名称,在此处定义desc = "sample anomaly detection model based on auto encoder"def __init__(self):super().__init__()self.table_name = Noneself.mean = Noneself.std = Noneself.threshold = Noneself.time_interval = Noneself.model = None# 模型文件保存的文件夹名称,如果您更改了文件夹名称,在此处需要同步修改,以确保代码可以正确加载模型文件self.dir = 'sample-ad-autoencoder' self.root_path = conf.get_model_directory()self.root_path = self.root_path + f'/{self.dir}/'# 检查代码中指定的模型文件路径是否存在if not os.path.exists(self.root_path):app_logger.log_inst.error("%s ad algorithm failed to locate default module directory:""%s, not active", self.__class__.__name__, self.root_path)else:app_logger.log_inst.info("%s ad algorithm root path is: %s", self.__class__.__name__,self.root_path)def execute(self):"""异常检测主体执行函数"""if self.input_is_empty():return []if self.model is None:raise FileNotFoundError("not load autoencoder model yet, or load model failed")# 初始化输入进行异常检测的数据array_2d = np.reshape(self.list, (len(self.list), 1))df = pd.DataFrame(array_2d)# 使用 z-score 进行归一化处理normalized_list = (df - self.mean.value) / self.std.valueseq = create_sequences(normalized_list.values, self.time_interval)# 进行模型推理pred_list = self.model.predict(seq)# 计算 MAE 损失值mae_loss = np.mean(np.abs(pred_list - seq), axis=1)mae = mae_loss.reshape((-1))# 大于阈值的设置为异常点anomalies = mae > self.threshold# data i is an anomaly if samples [(i - timesteps + 1) to (i)] are anomaliesad_indices = []for data_idx in range(self.time_interval - 1,len(normalized_list) - self.time_interval + 1):if np.all(anomalies[data_idx - self.time_interval + 1: data_idx]):ad_indices.append(data_idx)# 变换结果,符合输出的约定,支持分析完成return [-1 if i in ad_indices else 1 for i in range(len(self.list))]def set_params(self, params):"""在此函数中进行模型的加载操作,然后在 executor 中调用"""if "model" not in params:raise ValueError("model needs to be specified")name = params['model']# 拼装模型文件的全路径,此处有两个文件,一个模型主体文件,一个信息文件module_file_path = f'{self.root_path}/{name}.keras'module_info_path = f'{self.root_path}/{name}.info'app_logger.log_inst.info("try to load module:%s", module_file_path)# 因为保存成为 keras 格式,所以调用 keras API 加载模型文件if os.path.exists(module_file_path):self.model = keras.models.load_model(module_file_path)else:app_logger.log_inst.error("failed to load autoencoder model file: %s", module_file_path)raise FileNotFoundError(f"{module_file_path} not found")# 加载辅助信息文件if os.path.exists(module_info_path):info = joblib.load(module_info_path)else:app_logger.log_inst.error("failed to load autoencoder model file: %s", module_file_path)raise FileNotFoundError("%s not found", module_info_path)# 初始化模型推理的辅助信息到对象中if info is not None:self.mean = info["mean"]self.std = info["std"]self.threshold = info["threshold"]self.time_interval = info["timesteps"]app_logger.log_inst.info("load ac module success, mean: %f, std: %f, threshold: %f, time_interval: %d",self.mean[0], self.std[0], self.threshold, self.time_interval)else:app_logger.log_inst.error("failed to load %s model", name)raise RuntimeError(f"failed to load model {name}")
使用 SQL 调用模型
该模型已经预置在系统中,您可通过 show anodes full
直接查看。一个新的算法适配完成以后,需要重新启动 taosanode,并执行命令 update all anodes
更新 mnode 的算法列表。
- 通过设置参数
algo=sample_ad_model
,告诉 TDgpt 调用自编码器算法训练的模型(该算法模型在可用算法列表中)。 - 通过设置参数
model=sample-ad-autoencoder
,告诉自编码器加载特定的模型。
--- 在 options 中增加 model 参数 sample-ad-autoencoder, 针对 foo 数据集(表)训练的采用自编码器的异常检测模型进行异常检测
SELECT _wstart, count(*)
FROM foo anomaly_window(val, 'algo=sample_ad_model,model=sample-ad-autoencoder');