
OpenMetadata Data Quality Management: Adding a Custom Test Type

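Custom Test Types

  • Foreword
  • Development Workflow
    • I. Adding a Test Type
    • II. Architecture
    • III. Python Source Code Walkthrough
      • 1. base_test_handler (validator base class)
      • 2. tableCustomSQLQuery (base class for the custom SQL query test)
      • 3. tableCustomSQLQuery (custom SQL validation for relational databases)
    • IV. Python Development Steps
      • 1. Add the base class for the custom test validator
      • 2. Add the custom test validator implementation
      • 3. Bind the custom test validator to its runtime parameter object
    • V. Pitfalls
      • 1. A test suite cannot bind a test case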

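Foreword

While working with OpenMetadata we needed its data quality module. Our business requirement is to output the execution details of each test case, that is, exactly which rows satisfy the check and which do not. The built-in test types cannot do this, so the platform has to be extended with a custom test type that supports finer-grained validation and result output.

OpenMetadata version used here: 1.10.0

Development Workflow

Official reference: https://blog.open-metadata.org/leveraging-the-power-of-openmetadata-data-quality-framework-385ba2d8eaf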

I. Adding a Test Type

In the end, adding a test type comes down to inserting one row into the test_definition table.

Relevant source: org.openmetadata.service.resources.dqtests.TestDefinitionResource#createOrUpdate

Endpoint:

${ip}:${port}/api/v1/dataQuality/testDefinitions

HTTP method:

POST

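Request parameters:

Parameter descriptions (parameter: description):

  • description: A description of your test. It gives your users more information and detail.
  • entityType: Set to TABLE when defining a table-level test, or to COLUMN when defining a column-level test.
  • name: The name of the test definition. It must be unique across all test definitions.
  • displayName: The name of the test definition as shown in the OpenMetadata UI.
  • testPlatforms: A list of test platforms. For the test to be executable through OpenMetadata, set it to OpenMetadata.
  • supportedDataTypes: (Optional) The list of data types the test definition supports. Users can then create the test only on columns of a supported type. See the OpenMetadata data type definitions for the full list of values.
  • parameterDefinition: The list of parameters used to configure the test.
  • parameterDefinition[0].name: The name of the parameter.
  • parameterDefinition[0].displayName: The display name of the parameter in the UI.
  • parameterDefinition[0].description: A description of the parameter. It populates the tooltip in the UI.
  • parameterDefinition[0].optionValues: The allowed values, if the parameter should be picked from a drop-down menu.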
Example:

    {
      "name": "tableCustomSQLQueryExhance",
      "displayName": "Custom SQL Query Exhance",
      "description": "Test if a custom SQL returns 0 row or `COUNT(<x>) == 0` and show test details",
      "entityType": "TABLE",
      "testPlatforms": ["OpenMetadata"],
      "supportedDataTypes": [],
      "parameterDefinition": [
        {
          "name": "sqlExpression",
          "dataType": "STRING",
          "required": true,
          "description": "SQL expression to run against the table",
          "displayName": "SQL Expression",
          "optionValues": []
        },
        {
          "name": "strategy",
          "dataType": "ARRAY",
          "required": false,
          "description": "Strategy to use to run the custom SQL query (i.e. `SELECT COUNT(<col>)` or `SELECT <col>` (defaults to ROWS)",
          "displayName": "Strategy",
          "optionValues": ["ROWS", "COUNT"]
        },
        {
          "name": "operator",
          "dataType": "STRING",
          "required": false,
          "description": "Operator to use to compare the result of the custom SQL query to the threshold.",
          "displayName": "Operator",
          "optionValues": ["==", ">", ">=", "<", "<=", "!="]
        },
        {
          "name": "threshold",
          "dataType": "NUMBER",
          "required": false,
          "description": "Threshold to use to determine if the test passes or fails (defaults to 0).",
          "displayName": "Threshold",
          "optionValues": []
        },
        {
          "name": "partitionExpression",
          "dataType": "STRING",
          "required": false,
          "description": "Partition expression that will be used to compute the passed/failed row count, if compute row count is enabled.",
          "displayName": "Partition Expression",
          "optionValues": []
        }
      ]
    }

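In practice, the fields that matter most are:

  • name: the unique name of the test type
  • displayName: the rule name shown in the UI
  • entityType: the target type of the test (table level or column level)
  • parameterDefinition: the parameter list, which maps to the parameter form in the UI; any custom parameters are passed this way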
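As a rough illustration of the call described above, the sketch below builds the POST request with Python's standard library without sending it. The host and port (`http://localhost:8585`), the bearer-token header, and the helper name `build_test_definition_request` are assumptions for this example and do not come from the article; the payload is trimmed to a single parameter.

```python
import json
import urllib.request


def build_test_definition_request(base_url: str, token: str, definition: dict):
    """Build (but do not send) the POST request that registers a test definition."""
    body = json.dumps(definition).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url}/api/v1/dataQuality/testDefinitions",
        data=body,
        method="POST",
        headers={
            "Content-Type": "application/json",
            # Assumption: the server expects a JWT bearer token.
            "Authorization": f"Bearer {token}",
        },
    )


# Trimmed-down version of the example payload shown above.
definition = {
    "name": "tableCustomSQLQueryExhance",
    "displayName": "Custom SQL Query Exhance",
    "entityType": "TABLE",
    "testPlatforms": ["OpenMetadata"],
    "parameterDefinition": [
        {
            "name": "sqlExpression",
            "dataType": "STRING",
            "required": True,
            "displayName": "SQL Expression",
        }
    ],
}

# Assumed host, port, and placeholder token.
request = build_test_definition_request(
    "http://localhost:8585", "<jwt-token>", definition
)
# To actually send it: urllib.request.urlopen(request)
```

Keeping request construction separate from sending makes it easy to inspect the URL and body before anything touches a live server.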

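II. Architecture

Data quality validation framework

OpenMetadata's data quality module combines the Strategy pattern with the Adapter pattern to provide uniform validation across many kinds of data sources.

The layout under /metadata/data_quality/validations reflects this design:

  • base/: defines the abstract base classes and the common framework for validation logic. It holds the skeleton of the validation flow (parameter parsing, result assembly, error handling, and so on) and gives every data backend the same interface contract.
  • sqlalchemy/: the concrete implementation for relational databases (MySQL, PostgreSQL, Snowflake, and others). It builds queries with SQLAlchemy, inherits from the abstract classes in base, and adapts the generic validation rules to a SQL execution context.
  • pandas/: targets files (CSV, Parquet) or in-memory DataFrame data and computes locally with Pandas. It inherits from the same base classes, so the validation semantics stay consistent across execution engines.

With this layered design, OpenMetadata can select and load the matching validation strategy according to the data source type (for example a database table versus a local file). This avoids coupling and keeps the framework extensible: supporting a new data source only requires implementing the corresponding adapter, without touching the core validation flow. The architecture is also easier to maintain and leaves room for additional compute engines such as Spark or DuckDB.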
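The layering can be sketched in a few lines. This is not OpenMetadata code: the class names, the `VALIDATORS` registry, and the "at most N rows" rule are all hypothetical, and a plain list stands in for a DataFrame. The point is only that the pass/fail rule is written once in the base class while each backend supplies its own way of obtaining the number.

```python
import sqlite3
from abc import ABC, abstractmethod


class BaseMaxRowsValidator(ABC):
    """Shared skeleton: the pass/fail rule lives here, once."""

    def __init__(self, source, max_rows: int) -> None:
        self.source = source
        self.max_rows = max_rows

    def run_validation(self) -> bool:
        return self._count_rows() <= self.max_rows

    @abstractmethod
    def _count_rows(self) -> int:
        raise NotImplementedError


class SqlMaxRowsValidator(BaseMaxRowsValidator):
    """Backend for SQL sources; source is (connection, table_name)."""

    def _count_rows(self) -> int:
        conn, table = self.source
        return conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]


class InMemoryMaxRowsValidator(BaseMaxRowsValidator):
    """Backend for in-memory data; source is a list of records."""

    def _count_rows(self) -> int:
        return len(self.source)


# Hypothetical registry that picks the backend by source type.
VALIDATORS = {"sql": SqlMaxRowsValidator, "memory": InMemoryMaxRowsValidator}


def get_validator(kind: str, source, max_rows: int) -> BaseMaxRowsValidator:
    return VALIDATORS[kind](source, max_rows)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER)")
conn.executemany("INSERT INTO orders VALUES (?)", [(1,), (2,), (3,)])

sql_ok = get_validator("sql", (conn, "orders"), max_rows=5).run_validation()
mem_ok = get_validator("memory", [{"id": 1}] * 10, max_rows=5).run_validation()
```

Adding a third backend would mean one new subclass and one new registry entry, with no change to `run_validation`.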


III. Python Source Code Walkthrough

We use the custom SQL rule as the example.

1. base_test_handler (validator base class)

Source location: metadata.data_quality.validations.base_test_handler.py

Purpose: provides shared functionality and a uniform interface for test validators

    """
    Base validator class
    """

    from __future__ import annotations

    import reprlib
    from abc import ABC, abstractmethod
    from typing import TYPE_CHECKING, Callable, List, Optional, Type, TypeVar, Union

    from pydantic import BaseModel

    from metadata.data_quality.validations import utils
    from metadata.generated.schema.tests.basic import (
        TestCaseResult,
        TestCaseStatus,
        TestResultValue,
    )
    from metadata.generated.schema.tests.testCase import TestCase, TestCaseParameterValue
    from metadata.generated.schema.type.basic import Timestamp
    from metadata.profiler.processor.runner import QueryRunner

    if TYPE_CHECKING:
        from pandas import DataFrame

    T = TypeVar("T", bound=Callable)
    R = TypeVar("R")
    S = TypeVar("S", bound=BaseModel)


    class BaseTestValidator(ABC):
        """Abstract class for test case handlers
        The runtime_parameter_setter is run after the test case is created to set the runtime parameters.
        This can be useful to resolve complex test parameters based on the parameters given by the user.
        """

        def __init__(
            self,
            runner: Union[QueryRunner, List["DataFrame"]],
            test_case: TestCase,  # the test case object
            execution_date: Timestamp,
        ) -> None:
            self.runner = runner
            self.test_case = test_case
            self.execution_date = execution_date

        # Abstract validation logic (implemented by subclasses)
        @abstractmethod
        def run_validation(self) -> TestCaseResult:
            """Run validation for the given test case

            Returns:
                TestCaseResult:
            """
            raise NotImplementedError

        # Static method: read the value of a named parameter from the test case
        @staticmethod
        def get_test_case_param_value(
            test_case_param_vals: List[TestCaseParameterValue],
            name: str,
            type_: T,
            default: Optional[R] = None,
            pre_processor: Optional[Callable] = None,
        ) -> Optional[Union[R, T]]:
            return utils.get_test_case_param_value(
                test_case_param_vals, name, type_, default, pre_processor
            )

        # Build and return a normalized test result object
        # (handles row counts and percentages)
        def get_test_case_result_object(  # pylint: disable=too-many-arguments
            self,
            execution_date: Timestamp,
            status: TestCaseStatus,
            result: str,
            test_result_value: List[TestResultValue],
            row_count: Optional[int] = None,
            failed_rows: Optional[int] = None,
            passed_rows: Optional[int] = None,
            min_bound: Optional[float] = None,
            max_bound: Optional[float] = None,
        ) -> TestCaseResult:
            """Returns a TestCaseResult object with the given args

            Args:
                execution_date (Union[datetime, float]): test case execution datetime
                status (TestCaseStatus): failed, success, aborted
                result (str): test case result
                test_result_value (List[TestResultValue]): test result value to display in UI
            Returns:
                TestCaseResult:
            """
            test_case_result = TestCaseResult(
                timestamp=execution_date,  # type: ignore
                testCaseStatus=status,
                result=result,
                testResultValue=test_result_value,
                sampleData=None,
                # if users don't set the min/max bound, we'll change the inf/-inf (used for computation) to None
                minBound=None if min_bound == float("-inf") else min_bound,
                maxBound=None if max_bound == float("inf") else max_bound,
            )

            # Compute passed count, failed count, pass rate, and failure rate
            if (row_count is not None and row_count != 0) and (
                # we'll need at least one of these to be not None to compute the other
                (failed_rows is not None)
                or (passed_rows is not None)
            ):
                passed_rows = passed_rows if passed_rows is not None else (row_count - failed_rows)  # type: ignore
                failed_rows = (
                    failed_rows if failed_rows is not None else (row_count - passed_rows)
                )
                test_case_result.passedRows = int(passed_rows)
                test_case_result.failedRows = int(failed_rows)
                test_case_result.passedRowsPercentage = float(passed_rows / row_count) * 100
                test_case_result.failedRowsPercentage = float(failed_rows / row_count) * 100  # type: ignore

            return test_case_result

        # Format the column list by test status
        # (abbreviated on success, full list on failure)
        def format_column_list(self, status: TestCaseStatus, cols: List):
            """Format column list based on the test status

            Args:
                cols: list of columns
            """
            if status == TestCaseStatus.Success:
                return reprlib.repr(cols)
            return cols

        # Return the test status
        def get_test_case_status(self, condition: bool) -> TestCaseStatus:
            """Returns TestCaseStatus based on condition

            Args:
                condition (bool): condition to check
            Returns:
                TestCaseStatus:
            """
            return TestCaseStatus.Success if condition else TestCaseStatus.Failed

        # Get the lower bound
        def get_min_bound(self, param_name: str) -> Optional[float]:
            """get min value for max value in column test case"""
            return self.get_test_case_param_value(
                self.test_case.parameterValues,  # type: ignore
                param_name,
                float,
                default=float("-inf"),
            )

        # Get the upper bound
        def get_max_bound(self, param_name: str) -> Optional[float]:
            """get max value for max value in column test case"""
            return self.get_test_case_param_value(
                self.test_case.parameterValues,  # type: ignore
                param_name,
                float,
                default=float("inf"),
            )

        # Get the predicted value (can be overridden in subclasses)
        def get_predicted_value(self) -> Optional[str]:
            """Get predicted value"""
            return None

        # Read the runtime parameters from the test case parameters
        def get_runtime_parameters(self, setter_class: Type[S]) -> S:
            """Get runtime parameters"""
            for param in self.test_case.parameterValues or []:
                if param.name == setter_class.__name__:
                    return setter_class.model_validate_json(param.value)
            raise ValueError(f"Runtime parameter {setter_class.__name__} not found")
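The row arithmetic inside get_test_case_result_object is easier to follow in isolation. The following is a standalone sketch of that rule, not the library function: `summarize_rows` is a hypothetical helper that returns a tuple instead of filling a TestCaseResult.

```python
from typing import Optional, Tuple


def summarize_rows(
    row_count: Optional[int],
    failed_rows: Optional[int] = None,
    passed_rows: Optional[int] = None,
) -> Optional[Tuple[int, int, float, float]]:
    """Return (passed, failed, passed %, failed %), or None when not computable.

    Mirrors the rule above: a non-zero row_count is required, plus at least
    one of failed_rows / passed_rows; the missing one is derived from the other.
    """
    if not row_count or (failed_rows is None and passed_rows is None):
        return None
    if passed_rows is None:
        passed_rows = row_count - failed_rows
    if failed_rows is None:
        failed_rows = row_count - passed_rows
    return (
        int(passed_rows),
        int(failed_rows),
        passed_rows / row_count * 100,
        failed_rows / row_count * 100,
    )


from_failed = summarize_rows(200, failed_rows=50)
from_passed = summarize_rows(200, passed_rows=150)
empty_table = summarize_rows(0, failed_rows=1)
```

Both calls with 200 rows describe the same split, and the zero-row case returns None rather than dividing by zero.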

2. tableCustomSQLQuery (base class for the custom SQL query test)

Source location: ingestion/src/metadata/data_quality/validations/table/base/tableCustomSQLQuery.py

Purpose: base class for validating custom SQL query tests

    """
    Validator for table custom SQL Query test case
    """

    import traceback
    from abc import abstractmethod
    from enum import Enum
    from typing import cast

    from metadata.data_quality.validations.base_test_handler import BaseTestValidator
    from metadata.generated.schema.tests.basic import (
        TestCaseResult,
        TestCaseStatus,
        TestResultValue,
    )
    from metadata.utils.helpers import evaluate_threshold
    from metadata.utils.logger import test_suite_logger

    logger = test_suite_logger()

    RESULT_ROW_COUNT = "resultRowCount"


    class Strategy(Enum):
        COUNT = "COUNT"
        ROWS = "ROWS"


    class BaseTableCustomSQLQueryValidator(BaseTestValidator):
        """Validator table custom SQL Query test case"""

        def _run_validation(self) -> TestCaseResult:
            """Execute the specific test validation logic

            This method contains the core validation logic that was previously
            in the run_validation method.

            Returns:
                TestCaseResult: The test case result for the overall validation
            """
            # Read the test parameters:
            #   sqlExpression: the SQL query written by the user
            #   operator: comparison operator (<=, >=, ==, ...)
            #   threshold: the expected number of results
            #   strategy: count rows, or return the actual data
            sql_expression = self.get_test_case_param_value(
                self.test_case.parameterValues,  # type: ignore
                "sqlExpression",
                str,
            )
            operator = self.get_test_case_param_value(
                self.test_case.parameterValues, "operator", str, "<="  # type: ignore
            )
            threshold = self.get_test_case_param_value(
                self.test_case.parameterValues,  # type: ignore
                "threshold",
                int,
                default=0,
            )
            strategy = self.get_test_case_param_value(
                self.test_case.parameterValues,  # type: ignore
                "strategy",
                Strategy,
            )

            # Type narrowing for the parameters
            operator = cast(str, operator)  # satisfy mypy
            sql_expression = cast(str, sql_expression)  # satisfy mypy
            threshold = cast(int, threshold)  # satisfy mypy
            strategy = cast(Strategy, strategy)  # satisfy mypy

            try:
                # Core call: run the SQL and get the rows (or the count)
                rows = self._run_results(sql_expression, strategy)
            except Exception as exc:
                msg = f"Error computing {self.test_case.fullyQualifiedName}: {exc}"  # type: ignore
                logger.debug(traceback.format_exc())
                logger.warning(msg)
                return self.get_test_case_result_object(
                    self.execution_date,
                    TestCaseStatus.Aborted,
                    msg,
                    [TestResultValue(name=RESULT_ROW_COUNT, value=None)],
                )

            # If the result is not an int, it is a list of rows: take its length
            len_rows = rows if isinstance(rows, int) else len(rows)

            # Check whether the test passes
            test_passed = evaluate_threshold(
                threshold,
                operator,
                len_rows,
            )

            if test_passed:
                status = TestCaseStatus.Success
                result_value = len_rows
            else:
                status = TestCaseStatus.Failed
                result_value = len_rows

            if self.test_case.computePassedFailedRowCount:
                row_count = self._get_total_row_count_if_needed()
                passed_rows, failed_rows = self._calculate_passed_failed_rows(
                    test_passed, operator, threshold, len_rows, row_count
                )
            else:
                passed_rows = None
                failed_rows = None
                row_count = None

            return self.get_test_case_result_object(
                self.execution_date,
                status,
                f"Found {result_value} row(s). Test query is expected to return {operator} {threshold} row(s).",
                [TestResultValue(name=RESULT_ROW_COUNT, value=str(result_value))],
                row_count=row_count,
                failed_rows=failed_rows,
                passed_rows=passed_rows,
            )

        # Core method: run the SQL and return the rows (or the count)
        @abstractmethod
        def _run_results(self, sql_expression: str, strategy: Strategy = Strategy.ROWS):
            raise NotImplementedError

        @abstractmethod
        def compute_row_count(self):
            """Compute row count for the given column

            Raises:
                NotImplementedError:
            """
            raise NotImplementedError

        def get_row_count(self) -> int:
            """Get row count

            Returns:
                Tuple[int, int]:
            """
            return self.compute_row_count()

        def _get_total_row_count_if_needed(self) -> int:
            """Get total row count if computePassedFailedRowCount is enabled"""
            return self.get_row_count()

        def _calculate_passed_failed_rows(
            self,
            test_passed: bool,
            operator: str,
            threshold: int,
            len_rows: int,
            row_count: int,
        ) -> tuple[int, int]:
            """Calculate passed and failed rows based on test result and operator

            Args:
                test_passed: Whether the test passed
                operator: Comparison operator (>, >=, <, <=, ==)
                threshold: Expected threshold value
                len_rows: Number of rows returned by the test query
                row_count: Total number of rows in the table (or None)
            Returns:
                Tuple of (passed_rows, failed_rows)
            """
            if test_passed:
                return self._calculate_passed_rows_success(operator, len_rows, row_count)
            return self._calculate_passed_rows_failure(
                operator, threshold, len_rows, row_count
            )

        def _calculate_passed_rows_success(
            self, operator: str, len_rows: int, row_count: int
        ) -> tuple[int, int]:
            """Calculate passed/failed rows when test passed"""
            if operator in (">", ">="):
                passed_rows = len_rows
                failed_rows = (row_count - len_rows) if row_count else 0
            elif operator in ("<", "<="):
                passed_rows = row_count - len_rows
                failed_rows = len_rows
            elif operator == "==":
                passed_rows = len_rows
                failed_rows = row_count - len_rows
            else:
                passed_rows = len_rows
                failed_rows = 0
            return max(0, passed_rows), max(0, failed_rows)

        def _calculate_passed_rows_failure(
            self, operator: str, threshold: int, len_rows: int, row_count: int
        ) -> tuple[int, int]:
            """Calculate passed/failed rows when test failed"""
            if operator in (">", ">="):
                return self._calculate_greater_than_failure(len_rows, row_count)
            if operator in ("<", "<="):
                return self._calculate_less_than_failure(len_rows, row_count)
            if operator == "==":
                return self._calculate_equal_failure(threshold, len_rows, row_count)
            failed_rows = row_count if row_count else len_rows
            return 0, max(0, failed_rows)

        def _calculate_greater_than_failure(
            self, len_rows: int, row_count: int
        ) -> tuple[int, int]:
            """Calculate rows for > or >= operator failure (expected more rows)"""
            passed_rows = len_rows
            failed_rows = (row_count - len_rows) if row_count else 0
            return max(0, passed_rows), max(0, failed_rows)

        def _calculate_less_than_failure(
            self, len_rows: int, row_count: int
        ) -> tuple[int, int]:
            """Calculate rows for < or <= operator failure (expected fewer rows)"""
            failed_rows = len_rows
            passed_rows = row_count - failed_rows
            return max(0, passed_rows), max(0, failed_rows)

        def _calculate_equal_failure(
            self, threshold: int, len_rows: int, row_count: int
        ) -> tuple[int, int]:
            """Calculate rows for == operator failure (expected exact count)"""
            if row_count:
                if len_rows > threshold:
                    failed_rows = len_rows - threshold
                    passed_rows = row_count - failed_rows
                else:
                    failed_rows = row_count - len_rows
                    passed_rows = len_rows
            else:
                failed_rows = abs(len_rows - threshold)
                passed_rows = 0
            return max(0, passed_rows), max(0, failed_rows)
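The pass/fail decision above hinges on evaluate_threshold(threshold, operator, len_rows). Its implementation is not shown in this article, so the function below is only a stand-in written for illustration: `threshold_met` and the `OPERATORS` table are hypothetical names, and the comparison direction ("result OP threshold") is inferred from the result message "expected to return {operator} {threshold} row(s)".

```python
import operator as op

# Operator strings offered by the test definition, mapped to comparisons.
OPERATORS = {
    "==": op.eq,
    ">": op.gt,
    ">=": op.ge,
    "<": op.lt,
    "<=": op.le,
    "!=": op.ne,
}


def threshold_met(threshold: int, operator: str, result: int) -> bool:
    """Return True when `result <operator> threshold` holds."""
    try:
        compare = OPERATORS[operator]
    except KeyError:
        raise ValueError(f"Unsupported operator: {operator!r}") from None
    return compare(result, threshold)


# Defaults used by the validator: operator "<=" and threshold 0,
# so the test passes only when the query returns no rows.
passes_with_no_rows = threshold_met(0, "<=", 0)
fails_with_three_rows = threshold_met(0, "<=", 3)
at_least_ten = threshold_met(10, ">=", 12)
```

With the defaults, a custom SQL test behaves as "this query must return nothing", which is why such queries are usually written to select the offending rows.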

3. tableCustomSQLQuery (custom SQL validation for relational databases)

Source location: ingestion/src/metadata/data_quality/validations/table/sqlalchemy/tableCustomSQLQuery.py

Purpose: the custom SQL validation implementation for relational databases

    """
    Validator for table custom SQL Query test case
    """

    from typing import Optional, Tuple, cast

    import sqlparse
    from sqlalchemy import text
    from sqlalchemy.sql import func, select
    from sqlparse.sql import Statement, Token, Where
    from sqlparse.tokens import Keyword

    from metadata.data_quality.validations.mixins.sqa_validator_mixin import (
        SQAValidatorMixin,
    )
    from metadata.data_quality.validations.models import (
        TableCustomSQLQueryRuntimeParameters,
    )
    from metadata.data_quality.validations.table.base.tableCustomSQLQuery import (
        BaseTableCustomSQLQueryValidator,
        Strategy,
    )
    from metadata.generated.schema.tests.basic import TestCaseResult
    from metadata.profiler.metrics.registry import Metrics
    from metadata.profiler.orm.functions.table_metric_computer import TableMetricComputer
    from metadata.profiler.processor.runner import QueryRunner
    from metadata.utils.helpers import is_safe_sql_query
    from metadata.utils.logger import ingestion_logger

    logger = ingestion_logger()


    # Inherits from the base class
    class TableCustomSQLQueryValidator(BaseTableCustomSQLQueryValidator, SQAValidatorMixin):
        """Validator for table custom SQL Query test case"""

        # Parse and modify the SQL query with the sqlparse library
        def _replace_where_clause(
            self, sql_query: str, partition_expression: str
        ) -> Optional[str]:
            """Replace or add WHERE clause in SQL query using sqlparse.

            This method properly handles:
            - Queries with existing WHERE clause (replaces it)
            - Queries without WHERE clause (adds it)
            - Complex queries with joins, subqueries, CTEs
            - Preserves GROUP BY, ORDER BY, LIMIT, etc.

            Args:
                sql_query: Original SQL query
                partition_expression: New WHERE condition (without WHERE keyword)
            Returns:
                Modified SQL query with partition_expression as WHERE clause
            """
            parsed = sqlparse.parse(sql_query)
            if not parsed or len(parsed) == 0:
                return None

            statement: Statement = parsed[0]
            tokens = list(statement.tokens)

            where_idx, where_end_idx, insert_before_idx = self._find_clause_positions(
                tokens
            )
            new_tokens = self._build_new_tokens(
                tokens, where_idx, where_end_idx, insert_before_idx, partition_expression
            )
            return "".join(str(token) for token in new_tokens)

        # Locate the clause positions in the SQL
        def _find_clause_positions(
            self, tokens: list
        ) -> Tuple[Optional[int], Optional[int], Optional[int]]:
            """Find positions of WHERE clause and insertion points in token list.

            Args:
                tokens: List of parsed SQL tokens
            Returns:
                Tuple of (where_idx, where_end_idx, insert_before_idx)
            """
            where_idx = None
            where_end_idx = None
            insert_before_idx = None
            paren_depth = 0

            for i, token in enumerate(tokens):
                paren_depth = self._update_parentheses_depth(token, paren_depth)

                if isinstance(token, Where) and paren_depth == 0:
                    where_idx = i
                    where_end_idx = i + 1
                    break

                if self._should_insert_before_token(token, insert_before_idx, paren_depth):
                    insert_before_idx = i

            return where_idx, where_end_idx, insert_before_idx

        # Track the parenthesis nesting depth
        def _update_parentheses_depth(self, token: Token, current_depth: int) -> int:
            """Update parentheses depth based on token content.

            Args:
                token: SQL token to analyze
                current_depth: Current parentheses depth
            Returns:
                Updated parentheses depth
            """
            if token.ttype is None and hasattr(token, "tokens"):
                paren_count = str(token).count("(") - str(token).count(")")
                return current_depth + paren_count
            elif token.value == "(":
                return current_depth + 1
            elif token.value == ")":
                return current_depth - 1
            return current_depth

        # Decide whether WHERE should be inserted before a given token
        def _should_insert_before_token(
            self, token: Token, insert_before_idx: Optional[int], paren_depth: int
        ) -> bool:
            """Check if WHERE clause should be inserted before this token.

            Args:
                token: SQL token to check
                insert_before_idx: Current insertion index (None if not set)
                paren_depth: Current parentheses depth
            Returns:
                True if WHERE should be inserted before this token
            """
            if insert_before_idx is not None or paren_depth != 0:
                return False
            if token.ttype is not Keyword:
                return False

            clause_keywords = {
                "GROUP BY",
                "ORDER BY",
                "HAVING",
                "LIMIT",
                "OFFSET",
                "UNION",
                "EXCEPT",
                "INTERSECT",
            }
            return any(keyword in token.value.upper() for keyword in clause_keywords)

        # Build the modified SQL token list
        def _build_new_tokens(
            self,
            tokens: list,
            where_idx: Optional[int],
            where_end_idx: Optional[int],
            insert_before_idx: Optional[int],
            partition_expression: str,
        ) -> list:
            """Build new token list with WHERE clause inserted or replaced.

            Args:
                tokens: Original token list
                where_idx: Index of existing WHERE clause (None if not found)
                where_end_idx: End index of existing WHERE clause
                insert_before_idx: Index to insert WHERE before (None if append)
                partition_expression: WHERE condition expression
            Returns:
                New list of tokens with WHERE clause
            """
            if where_idx is not None:
                return self._replace_existing_where(
                    tokens, where_idx, where_end_idx, partition_expression
                )
            elif insert_before_idx is not None:
                return self._insert_where_before_clause(
                    tokens, insert_before_idx, partition_expression
                )
            else:
                return self._append_where_clause(tokens, partition_expression)

        def _replace_existing_where(
            self,
            tokens: list,
            where_idx: int,
            where_end_idx: int,
            partition_expression: str,
        ) -> list:
            """Replace existing WHERE clause with new expression.

            Args:
                tokens: Original token list
                where_idx: Index of WHERE clause to replace
                where_end_idx: End index of WHERE clause
                partition_expression: New WHERE condition
            Returns:
                Token list with replaced WHERE clause
            """
            original_where = str(tokens[where_idx])
            trailing_whitespace = self._extract_trailing_whitespace(original_where)

            return (
                tokens[:where_idx]
                + [
                    Token(Keyword, "WHERE"),
                    Token(None, f" {partition_expression}{trailing_whitespace}"),
                ]
                + tokens[where_end_idx:]
            )

        def _insert_where_before_clause(
            self, tokens: list, insert_before_idx: int, partition_expression: str
        ) -> list:
            """Insert WHERE clause before specified token index.

            Args:
                tokens: Original token list
                insert_before_idx: Index to insert WHERE clause before
                partition_expression: WHERE condition expression
            Returns:
                Token list with WHERE clause inserted
            """
            return (
                tokens[:insert_before_idx]
                + [Token(Keyword, "WHERE"), Token(None, f" {partition_expression} ")]
                + tokens[insert_before_idx:]
            )

        def _append_where_clause(self, tokens: list, partition_expression: str) -> list:
            """Append WHERE clause to end of token list.

            Args:
                tokens: Original token list
                partition_expression: WHERE condition expression
            Returns:
                Token list with WHERE clause appended
            """
            return tokens + [
                Token(None, " "),
                Token(Keyword, "WHERE"),
                Token(None, f" {partition_expression}"),
            ]

        def _extract_trailing_whitespace(self, where_clause: str) -> str:
            """Extract trailing whitespace from WHERE clause string.

            Args:
                where_clause: Original WHERE clause string
            Returns:
                Trailing whitespace string
            """
            if not where_clause.split():
                return ""
            last_word = where_clause.split()[-1]
            where_content_end = where_clause.rfind(last_word) + len(last_word)
            if where_content_end < len(where_clause):
                return where_clause[where_content_end:]
            return ""

        # Core method
        def run_validation(self) -> TestCaseResult:
            """Run validation for the given test case

            Returns:
                TestCaseResult:
            """
            # Fetch the runtime parameters
            self.runtime_params = self.get_runtime_parameters(
                TableCustomSQLQueryRuntimeParameters
            )
            # Delegate to the parent class
            return super().run_validation()

        # Core method that executes the SQL
        def _run_results(self, sql_expression: str, strategy: Strategy = Strategy.ROWS):
            """compute result of the test case"""
            # Check that the SQL is safe
            if not is_safe_sql_query(sql_expression):
                raise RuntimeError(f"SQL expression is not safe\n\n{sql_expression}")
            try:
                # Execute the SQL
                cursor = self.runner._session.execute(  # pylint: disable=protected-access
                    text(sql_expression)
                )
                # COUNT strategy returns the count; otherwise return the rows
                if strategy == Strategy.COUNT:
                    result = cursor.scalar()
                    if not isinstance(result, int):
                        raise ValueError(
                            f"When using COUNT strategy, the result must be an integer. Received: {type(result)}\n"
                            "Example: SELECT COUNT(*) FROM table_name WHERE my_value IS NOT NULL"
                        )
                    return result
                return cursor.fetchall()
            except Exception as exc:
                self.runner._session.rollback()  # pylint: disable=protected-access
                raise exc

        def compute_row_count(self) -> Optional[int]:
            """Compute row count for the given column

            Raises:
                NotImplementedError:
            """
            partition_expression = next(
                (
                    param.value
                    for param in self.test_case.parameterValues
                    if param.name == "partitionExpression"
                ),
                None,
            )
            if partition_expression:
                custom_sql = self.get_test_case_param_value(
                    self.test_case.parameterValues,  # type: ignore
                    "sqlExpression",
                    str,
                )
                if custom_sql:
                    modified_query = self._replace_where_clause(
                        custom_sql, partition_expression
                    )
                    if modified_query is None:
                        return None
                    count_query = (
                        f"SELECT COUNT(*) FROM ({modified_query}) AS test_results"
                    )
                    try:
                        result = self.runner.session.execute(text(count_query)).scalar()
                        return result
                    except Exception as exc:
                        logger.error(
                            "Failed to execute custom SQL with partition expression. "
                            f"Query: {count_query}\n"
                            f"Error: {exc}\n",
                            exc_info=True,
                        )
                        self.runner.session.rollback()
                        raise exc
                else:
                    stmt = (
                        select(func.count())
                        .select_from(self.runner.table)
                        .filter(text(partition_expression))
                    )
                    return self.runner.session.execute(stmt).scalar()

            self.runner = cast(QueryRunner, self.runner)
            dialect = self.runner._session.get_bind().dialect.name
            table_metric_computer: TableMetricComputer = TableMetricComputer(
                dialect,
                runner=self.runner,
                metrics=[Metrics.ROW_COUNT],
                conn_config=self.runtime_params.conn_config,
                entity=self.runtime_params.entity,
            )
            row = table_metric_computer.compute()
            if row:
                return dict(row).get(Metrics.ROW_COUNT.value.name())
            return None

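To see the two strategies of _run_results side by side without an OpenMetadata deployment, here is a small sketch that uses the standard-library sqlite3 module in place of the SQLAlchemy session. The `run_results` function, the `users` table, and its data are made up for this example; the safety check and rollback handling of the real method are left out.

```python
import sqlite3
from enum import Enum


class Strategy(Enum):
    COUNT = "COUNT"
    ROWS = "ROWS"


def run_results(conn, sql_expression: str, strategy: Strategy = Strategy.ROWS):
    """Run the SQL and return an int (COUNT) or a list of rows (ROWS)."""
    cursor = conn.execute(sql_expression)
    if strategy is Strategy.COUNT:
        row = cursor.fetchone()
        value = row[0] if row else None
        if not isinstance(value, int):
            raise ValueError(
                "COUNT strategy expects a single integer, "
                "e.g. SELECT COUNT(*) FROM table_name WHERE ..."
            )
        return value
    return cursor.fetchall()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER, email TEXT)")
conn.executemany(
    "INSERT INTO users VALUES (?, ?)",
    [(1, "a@example.com"), (2, None), (3, None)],
)

# ROWS: the query selects the offending rows themselves.
bad_rows = run_results(conn, "SELECT id FROM users WHERE email IS NULL ORDER BY id")
# COUNT: the query returns one number.
bad_count = run_results(
    conn, "SELECT COUNT(*) FROM users WHERE email IS NULL", Strategy.COUNT
)
```

Either way the validator ends up with one number to compare against the threshold, but only the ROWS strategy keeps the row data that a detail report would need.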
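IV. Python Development Steps

Overall development flow (simplified):

We take the official validator for the table custom SQL query test case (tableCustomSQLQuery.py) as the foundation and extend it as tableCustomSQLQueryGrow.py.

In practical terms, once a validator can execute custom SQL, the range of possible extensions grows considerably.

1. Add the base class for the custom test validator

File name: tableCustomSQLQueryGrow.py

File location: /metadata/data_quality/validations/table/base

File content: modeled on /metadata/data_quality/validations/table/base/tableCustomSQLQuery.py

2. Add the custom test validator implementation

File name: tableCustomSQLQueryGrow.py

File location: /metadata/data_quality/validations/table/sqlalchemy

Key change (compare with /metadata/data_quality/validations/table/sqlalchemy/tableCustomSQLQuery.py):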

    def compute_row_count(self) -> Optional[int]:
        """Compute row count for the given column

        Raises:
            NotImplementedError:
        """
        partition_expression = next(
            (
                param.value
                for param in self.test_case.parameterValues
                if param.name == "partitionExpression"
            ),
            None,
        )
        if partition_expression:
            stmt = (
                select(func.count())
                .select_from(self.runner.table)
                .filter(text(partition_expression))
            )
            return self.runner.session.execute(stmt).scalar()

        self.runner = cast(QueryRunner, self.runner)
        dialect = self.runner._session.get_bind().dialect.name
        table_metric_computer: TableMetricComputer = TableMetricComputer(
            dialect,
            runner=self.runner,
            metrics=[Metrics.ROW_COUNT],
            conn_config=self.runtime_params.conn_config,
            entity=self.runtime_params.entity,
        )
        row = table_metric_computer.compute()
        if row:
            return dict(row).get(Metrics.ROW_COUNT.value.name())
        return None

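Code notes:

  • SQL execution: self.runner.session.execute(stmt) runs the SQL on the connection for the target database. stmt can be a custom SQL statement passed in, or a statement built dynamically in Python.
  • Follow-up: the result of self.runner.session.execute(stmt) exposes the table data, the selected columns, and similar information, which can be used to build whatever custom output is needed.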
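One way to turn an execution result into the per-row detail this article is after is sketched below. It uses sqlite3 so that it runs on its own; `fetch_failed_row_details`, the `max_rows` cap, and the sample table are assumptions for this example, not part of OpenMetadata.

```python
import sqlite3
from typing import Any, Dict, List


def fetch_failed_row_details(
    conn, sql_expression: str, max_rows: int = 100
) -> List[Dict[str, Any]]:
    """Run the SQL and return up to max_rows rows as {column: value} dicts."""
    cursor = conn.execute(sql_expression)
    # cursor.description holds one 7-tuple per column; item 0 is the name.
    # With a SQLAlchemy result, the analogous calls would be result.keys()
    # and result.fetchmany(max_rows).
    columns = [col[0] for col in cursor.description]
    return [dict(zip(columns, row)) for row in cursor.fetchmany(max_rows)]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER, email TEXT)")
conn.executemany(
    "INSERT INTO users VALUES (?, ?)",
    [(1, "a@example.com"), (2, None), (3, None)],
)

details = fetch_failed_row_details(
    conn, "SELECT id, email FROM users WHERE email IS NULL ORDER BY id"
)
capped = fetch_failed_row_details(
    conn, "SELECT id, email FROM users ORDER BY id", max_rows=1
)
```

Capping the number of rows keeps the detail payload bounded when a check fails on a large table; where that payload is stored afterwards is up to the custom validator.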

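3. Bind the custom test validator to its runtime parameter object

File name: param_setter_factory.py

File location: /metadata/data_quality/validations/runtime_param_setter

Reason for the change: custom parameters can be defined when the test type is added (see above). For the validator to read them at execution time, it must be bound to a setter class.

File content: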

    class RuntimeParameterSetterFactory:
        """runtime parameter setter factory class"""

        def __init__(self) -> None:
            """Set"""
            self._setter_map: Dict[str, Set[Type[RuntimeParameterSetter]]] = {
                validator_name(TableDiffValidator): {TableDiffParamsSetter},
                validator_name(TableCustomSQLQueryValidator): {
                    TableCustomSQLQueryParamsSetter
                },
                # New binding
                validator_name(TableCustomSQLQueryGrowValidator): {
                    TableCustomSQLQueryParamsSetter
                },
            }

Explanation: this binds TableCustomSQLQueryGrowValidator to the parameter setter class TableCustomSQLQueryParamsSetter.
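The binding is a dictionary lookup keyed by a name derived from the validator class. The sketch below shows that idea with stub classes; the `validator_name` here is a guess at the behavior (strip the "Validator" suffix and lower-case the first letter), written for illustration and not copied from OpenMetadata, and `get_setters` is a hypothetical helper.

```python
from typing import Dict, Set, Type


class RuntimeParameterSetter:
    """Stub for the setter base class."""


class TableCustomSQLQueryParamsSetter(RuntimeParameterSetter):
    """Stub for the setter that resolves runtime parameters."""


class TableCustomSQLQueryValidator:
    """Stub for the built-in validator."""


class TableCustomSQLQueryGrowValidator:
    """Stub for the new custom validator."""


def validator_name(validator_cls: type) -> str:
    """Assumed rule: TableCustomSQLQueryGrowValidator -> tableCustomSQLQueryGrow."""
    name = validator_cls.__name__
    if name.endswith("Validator"):
        name = name[: -len("Validator")]
    return name[0].lower() + name[1:]


SETTER_MAP: Dict[str, Set[Type[RuntimeParameterSetter]]] = {
    validator_name(TableCustomSQLQueryValidator): {TableCustomSQLQueryParamsSetter},
    # New binding for the custom validator
    validator_name(TableCustomSQLQueryGrowValidator): {TableCustomSQLQueryParamsSetter},
}


def get_setters(test_definition_name: str) -> Set[Type[RuntimeParameterSetter]]:
    """Return the setters bound to a name, or an empty set when unbound."""
    return SETTER_MAP.get(test_definition_name, set())


grow_setters = get_setters("tableCustomSQLQueryGrow")
unbound = get_setters("someOtherTest")
```

If the real lookup is name-based in this way, the key derived from the validator class has to line up with the name under which the test definition was registered; that is worth checking when a new validator seems to receive no runtime parameters.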

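V. Pitfalls

1. A test suite cannot bind a test case

Scenario: when adding the test type, one field's value was defined as a JSON string to make parameter passing easier. When the test suite (test_suite) then bound the test case (test_case), the binding failed because the value did not match the Elasticsearch field format. The page shows no error, but the log does.

Impact: the test suite cannot bind the test case.

Key error: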

ERROR [2025-10-31 06:02:20,854] [ - PUT /api/v1/dataQuality/testCases/logicalTestCases] 
o.o.s.s.SearchRepository - Issue in Updating the search document for entity [f5229b38-6861-4174-b71b-1c875c01e317] and entityType [testCase]. 
Reason[Elasticsearch exception [type=document_parsing_exception, reason=[1:1903] failed to parse field [testSuites.incrementalChangeDescription.fieldsUpdated.newValue] of type [text] in document with id 'f5229b38-6861-4174-b71b-1c875c01e317'. Preview of field's value: '
']],
Cause[ElasticsearchException[Elasticsearch exception [type=illegal_state_exception, reason=Can't get text on a START_OBJECT at 1:1717]]], 
Stack [ElasticsearchStatusException[Elasticsearch exception [type=document_parsing_exception, reason=[1:1903] failed to parse field [testSuites.incrementalChangeDescription.fieldsUpdated.newValue] of type [text] in document with id 'f5229b38-6861-4174-b71b-1c875c01e317'. Preview of field's value: '}']]; nested: ElasticsearchException[Elasticsearch exception [type=illegal_state_exception, reason=Can't get text on a START_OBJECT at 1:1717]];

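Key point: reason=Can't get text on a START_OBJECT at 1:1717

Fix: for JSON-formatted strings, avoid starting the value with {. Add a prefix instead, for example JSON={}

When reading the parameter in Python, strip the prefix and then parse the remainder as JSON: s.split('JSON=', 1)[1]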
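A pair of small helpers keeps the prefix convention in one place. This is a minimal sketch: the names `encode_param` and `decode_param` are hypothetical, and the "JSON=" prefix is simply the one suggested above.

```python
import json

PREFIX = "JSON="


def encode_param(value: dict) -> str:
    """Serialize a dict so that the stored string does not start with '{'."""
    return PREFIX + json.dumps(value, ensure_ascii=False)


def decode_param(raw: str) -> dict:
    """Strip the prefix and parse the remainder as JSON."""
    if not raw.startswith(PREFIX):
        raise ValueError(f"Expected a value starting with {PREFIX!r}")
    return json.loads(raw.split(PREFIX, 1)[1])


stored = encode_param({"columns": ["id", "email"], "limit": 10})
restored = decode_param(stored)
```

Splitting with a maximum of one split means a later "JSON=" inside the payload itself is left untouched.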
