构建灵活的监控系统:多表存储与动态告警规则设计实践
构建灵活的监控系统
- 构建灵活的监控系统:多表存储与动态告警规则设计实践
- 背景
- 面临的痛点
- 解决方案概览
- 1. 多表存储,公共代码设计
- 2. 动态查询与路径访问
- 3. 基于表达式的告警规则引擎
- 4. 统一动态告警结果持久化
- 系统工作流程详解
- 1. 触发监控任务
- 2. 数据查询(动态多表查询)
- 3. 解析与动态字段访问
- 4. 构造告警规则上下文与函数注册
- 5. 告警规则表达式执行
- 6. 告警结果持久化
- 7. 监控平台展示与后续处理
- 关键代码示例
- 多表查询与动态转换
- 动态路径访问工具
- 表达式引擎告警规则设计
- 统一动态告警结果持久化
- 缓存优化:使用 Redis 缓存表达式和字段映射,提升性能
- 为什么使用缓存?
- 设计思路
- 缓存示例流程
- 伪代码示例
- 总结
构建灵活的监控系统:多表存储与动态告警规则设计实践
背景
在现代仓库管理系统中,货物种类繁多,每种货物的属性和监控需求各不相同。为了满足业务需求,我们需要:
- 为每种货物设计单独的数据库表,存储其特有字段。
- 代码层面只维护一套公共处理逻辑,避免为每种货物写重复代码。
- 支持动态查询和访问多样化字段。
- 设计灵活的告警规则引擎,支持复杂表达式和自定义函数。
- 统一持久化告警结果,支持多表存储。
面临的痛点
-
多表存储,代码复杂
每种货物单独表,字段差异大,若为每种货物写一套代码,维护成本高。 -
动态字段访问难
扩展字段存储为 JSON,访问嵌套字段和数组不便。 -
告警规则多且复杂
规则硬编码难维护,且业务需求经常变更。 -
告警结果存储多样
不同货物告警结果存储表不同,代码重复且难扩展。
解决方案概览
1. 多表存储,公共代码设计
-
数据库设计
- 每种货物有独立表,存储核心字段和特有字段。
- 公共字段统一命名,如
ProductID
、ProductName
、Category
、Extra
(JSON)。 Extra
字段存储不重要或嘈杂的扩展信息。
-
代码设计
- 查询时动态指定表名,返回
[]map[string]interface{}
。 - 通过反射和类型断言,动态处理不同表结构。
- 不为每种货物写结构体,统一用 map 处理。
- 查询时动态指定表名,返回
2. 动态查询与路径访问
- 使用
database/sql
动态扫描列,安全处理 NULL。 - 实现
GetValueByPath
函数,支持点分隔和数组索引访问嵌套字段。
3. 基于表达式的告警规则引擎
- 使用
govaluate
表达式引擎,动态解析和执行规则表达式。 - 支持注册自定义函数,满足复杂业务逻辑。
- 传入上下文为 map,表达式中可直接访问字段。
4. 统一动态告警结果持久化
- 通过反射和路径访问,动态映射字段到数据库列。
- 支持 Extra 字段内嵌数据提取。
- 统一构造插入参数,调用 DAO 层执行。
系统工作流程详解
1. 触发监控任务
- 监控系统定时或事件驱动触发对某个仓库或某类货物的监控任务。
- 任务包含:需要监控的货物类型(对应数据库表)、告警策略、告警条件等信息。
2. 数据查询(动态多表查询)
- 根据任务中指定的货物类型,动态确定对应的数据库表名。
- 调用
GetProductsFromDB(db, tableName, limit)
,执行 SQL 查询,获取货物数据。 - 查询结果是
[]map[string]interface{}
,每条记录是一个 map,key 是字段名,value 是字段值。 - 其中,
Extra
字段是 JSON 字符串,存储该货物的扩展属性。
3. 解析与动态字段访问
- 对每条货物数据,调用
ExtractProductBaseInfo
(或类似逻辑)将公共字段提取出来,Extra
字段反序列化成map[string]interface{}
。 - 需要访问扩展字段时,调用
GetValueByPath(extraMap, "specs.weight")
等路径访问函数,支持嵌套和数组索引。 - 这样,代码无需为每种货物写结构体,动态访问任意字段。
4. 构造告警规则上下文与函数注册
- 将货物数据(包括反序列化后的 Extra)作为上下文
product map[string]interface{}
。 - 调用
GetGoValuateFunction(product, task, strategyID, conditionID)
,动态注册 govaluate 表达式引擎所需的自定义函数。 - 这些函数可以访问上下文中的字段,实现复杂业务逻辑。
5. 告警规则表达式执行
- 取出当前告警条件的表达式字符串(如
"Extra.weight > 100 && IsDiscontinued == 0"
)。 - 调用
ScanProductTypeScan(productID, condition, goValuateFunctions)
:- 使用 govaluate 解析表达式,传入自定义函数。
- 执行表达式,得到布尔结果
isRisk
。
- 如果表达式执行出错,记录日志并跳过该条数据。
6. 告警结果持久化
- 如果
isRisk == true
,说明该货物触发了告警。 - 调用
SaveResultService
,传入:- 结果表名(根据货物类型动态确定)
- 任务信息(AsyncID、AppID、策略ID、条件ID)
- 货物数据
product
和扩展字段extra
- 结果映射字段列表
resultReflectField
(定义了哪些字段映射到结果表的哪些列)
SaveResultService
通过反射和路径访问,动态提取字段值,构造插入参数。- 调用 DAO 层执行 SQL 插入,将告警结果写入对应表。
7. 监控平台展示与后续处理
- 告警结果存储后,监控平台前端可以查询对应结果表,展示告警详情。
- 支持告警通知、统计分析、历史查询等功能。
关键代码示例
多表查询与动态转换
func GetProductsFromDB(db *sql.DB, table string, limit int) ([]map[string]interface{}, error) {sqlStr := fmt.Sprintf("SELECT * FROM %s LIMIT ?", table)rows, err := db.Query(sqlStr, limit)if err != nil {return nil, err}defer rows.Close()columnTypes, err := rows.ColumnTypes()if err != nil {return nil, err}vals := make([]interface{}, len(columnTypes))for i, ct := range columnTypes {switch ct.DatabaseTypeName() {case "VARCHAR", "TEXT":vals[i] = new(sql.NullString)case "BIGINT", "INT":vals[i] = new(sql.NullInt64)case "FLOAT", "DOUBLE", "DECIMAL":vals[i] = new(sql.NullFloat64)case "BOOL", "BOOLEAN":vals[i] = new(sql.NullBool)default:vals[i] = new(sql.NullString)}}var results []map[string]interface{}for rows.Next() {err := rows.Scan(vals...)if err != nil {return nil, err}rowMap := make(map[string]interface{})for i, ct := range columnTypes {colName := ct.Name()switch ct.DatabaseTypeName() {case "VARCHAR", "TEXT":ns := vals[i].(*sql.NullString)if ns.Valid {rowMap[colName] = ns.String} else {rowMap[colName] = ""}case "BIGINT", "INT":ni := vals[i].(*sql.NullInt64)if ni.Valid {rowMap[colName] = ni.Int64} else {rowMap[colName] = int64(0)}case "FLOAT", "DOUBLE", "DECIMAL":nf := vals[i].(*sql.NullFloat64)if nf.Valid {rowMap[colName] = nf.Float64} else {rowMap[colName] = float64(0)}case "BOOL", "BOOLEAN":nb := vals[i].(*sql.NullBool)if nb.Valid {rowMap[colName] = nb.Bool} else {rowMap[colName] = false}default:ns := vals[i].(*sql.NullString)if ns.Valid {rowMap[colName] = ns.String} else {rowMap[colName] = ""}}}results = append(results, rowMap)}return results, nil
}
动态路径访问工具
func GetValueByPath(data interface{}, path string) (interface{}, error) {parts := strings.Split(path, ".")val := reflect.ValueOf(data)for _, part := range parts {if strings.Contains(part, "[") && strings.HasSuffix(part, "]") {idxStart := strings.Index(part, "[")key := part[:idxStart]idxStr := part[idxStart+1 : len(part)-1]if val.Kind() == reflect.Map {val = val.MapIndex(reflect.ValueOf(key))if !val.IsValid() {return nil, fmt.Errorf("key %s not found", key)}} else {return nil, errors.New("expected map for key access")}if val.Kind() == reflect.Slice {idx, err := strconv.Atoi(idxStr)if err != nil {return nil, err}if idx < 0 || idx >= val.Len() {return nil, fmt.Errorf("index %d out of range", idx)}val = val.Index(idx)} else {return nil, errors.New("expected slice for index access")}} else {if val.Kind() == reflect.Map {val = val.MapIndex(reflect.ValueOf(part))if !val.IsValid() {return nil, fmt.Errorf("key %s not found", part)}} else {return nil, errors.New("expected map for key access")}}val = reflect.Indirect(val)}switch val.Kind() {case reflect.String:return val.String(), nilcase reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:return val.Int(), nilcase reflect.Float32, reflect.Float64:return val.Float(), nilcase reflect.Bool:return val.Bool(), nil}return val.Interface(), nil
}
表达式引擎告警规则设计
func GetGoValuateFunction(product map[string]interface{}, task TaskInfo, strategyID, conditionID int64) map[string]govaluate.ExpressionFunction {goValuateFunctions := map[string]govaluate.ExpressionFunction{}// 反序列化 Extra 字段if extraInterface, exist := product["Extra"]; exist {if extraStr, ok := extraInterface.(string); ok {var extra map[string]interface{}if err := json.Unmarshal([]byte(extraStr), &extra); err == nil {product["Extra"] = extra}}}goValuateService := &GoValuateService{Product: product,Task: task,StrategyID: strategyID,ConditionID: conditionID,}for _, service := range GoValuateServiceMap {goValuateFunctions = service(goValuateService).RegistryFunc(goValuateFunctions)}return goValuateFunctions
}func ScanProductTypeScan(productID string, condition Condition, goValuateFunctions map[string]govaluate.ExpressionFunction) bool {expression, err := govaluate.NewEvaluableExpressionWithFunctions(condition.PolicyV2, goValuateFunctions)if err != nil {// 记录错误日志return false}result, err := expression.Evaluate(nil)if err != nil {// 记录错误日志return false}if boolResult, ok := result.(bool); ok {return boolResult}return false
}
统一动态告警结果持久化
func (r *ResultService) SaveResultService(table, asyncID string, appID, strategyID, conditionID int64,product map[string]interface{}, extra map[string]interface{}, resultReflectField []ResultReflectField) error {insMap := map[string]interface{}{"appid": appID,"async_id": asyncID,"strategy_id": strategyID,"condition_id": conditionID,}level, err := r.ConditionDaoClient.GetConditionLevel(conditionID)if err != nil {return err}insMap["level"] = levelfor _, relationInfo := range resultReflectField {var value interface{}if !strings.HasPrefix(relationInfo.InsKey, "Extra.") {value = product[relationInfo.InsKey]} else {newKey := strings.TrimPrefix(relationInfo.InsKey, "Extra.")value, err = GetValueByPath(extra, newKey)if err != nil {// 记录错误,继续continue}}// 处理复杂类型转字符串valReflect := reflect.ValueOf(value)switch valReflect.Kind() {case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64,reflect.Float32, reflect.Float64, reflect.String:// 直接使用default:if relationInfo.ResultType == "string" {strVal, err := json.Marshal(value)if err == nil {value = string(strVal)} else {value = ""}}}insMap[relationInfo.ResultKey] = value}return r.ResultServiceInterface.SaveResultService(table, insMap)
}
缓存优化:使用 Redis 缓存表达式和字段映射,提升性能
为什么使用缓存?
-
表达式解析开销大
govaluate 解析表达式字符串生成抽象语法树(AST)有一定开销,频繁解析影响性能。 -
字段映射动态解析成本高
告警结果持久化时,动态解析字段映射关系耗时,重复解析影响效率。
设计思路
-
表达式缓存
- 使用 Redis 存储表达式字符串对应的序列化表达式对象(或表达式字符串本身,结合本地缓存)。
- 本地内存缓存配合 Redis,减少网络开销。
- key 设计为
expr_cache:{conditionID}
。
-
字段映射缓存
- Redis 存储字段映射 JSON,key 设计为
field_mapping:{conditionID}
。 - 应用启动时或变更时加载到本地缓存。
- Redis 存储字段映射 JSON,key 设计为
缓存示例流程
- 查询本地内存缓存,若无则查询 Redis。
- Redis 无则从数据库加载,更新 Redis 和本地缓存。
- 使用缓存数据执行告警逻辑。
伪代码示例
func GetCachedExpression(conditionID int64, exprStr string, funcs map[string]govaluate.ExpressionFunction) (*govaluate.EvaluableExpression, error) {// 先查本地内存缓存if expr, ok := localExprCache.Load(conditionID); ok {return expr.(*govaluate.EvaluableExpression), nil}// 查 RediscachedExprStr, err := redisClient.Get(ctx, fmt.Sprintf("expr_cache:%d", conditionID)).Result()if err == redis.Nil || cachedExprStr == "" {// Redis无,存入redisClient.Set(ctx, fmt.Sprintf("expr_cache:%d", conditionID), exprStr, time.Hour)cachedExprStr = exprStr}expr, err := govaluate.NewEvaluableExpressionWithFunctions(cachedExprStr, funcs)if err != nil {return nil, err}localExprCache.Store(conditionID, expr)return expr, nil
}
总结
通过以上设计,我们实现了:
- 高复用的多表存储处理,避免重复代码。
- 灵活的动态字段访问,支持复杂嵌套和数组。
- 基于表达式的灵活告警规则引擎,支持自定义函数。
- 统一动态告警结果持久化,支持多表存储。
- 基于 Redis 的缓存优化,显著提升性能和响应速度。