【量化开发】从0到1实现自己的量化算子系统
一、系统设计与架构 🏗️
在量化交易中,因子计算是策略开发的核心环节。传统的因子开发方式存在以下痛点:
- 代码重复性高:相同的技术指标在不同策略中反复实现
- 维护成本高:修改计算逻辑需要在多处进行
- 性能优化困难:缺乏统一的优化机制
- 扩展性差:添加新因子需要大量重复工作
算子系统通过将复杂因子分解为可复用的原子算子,提供链式组合能力,有效解决了这些问题。💡
系统整体架构设计 🗺️
算子系统的整体架构采用分层设计,各层之间职责清晰,通过标准接口进行交互:
系统核心数据流如下:
核心组件详解 ⚙️
BaseOperator - 算子基类
所有算子的基类,定义了统一接口和核心机制:
- 参数验证:确保输入参数的有效性
- 元数据管理:提供算子描述、输入输出类型等信息
- 执行框架:定义标准的执行流程(预处理→执行→后处理)
class BaseOperator(ABC):def __init__(self, name: Optional[str] = None, **kwargs):self.name = name or self.__class__.__name__self.config = OperatorConfig(**kwargs)self._validate_parameters()@abstractmethoddef execute(self, data, **kwargs) ->:pass
BaseOperator执行流程:
OperatorChain - 算子链
支持算子的链式组合,提供流水线式的处理能力:
- 动态构建:支持运行时添加算子
- 调试模式:记录执行过程,便于问题排查
- 性能分析:统计各算子执行时间,识别性能瓶颈
chain = OperatorChain("动量因子")
chain.add(Returns(periods=20, column='close')) # 计算20日收益率
chain.add(Zscore(window=60)) # 60日标准化
result = chain.execute(data)
算子链执行流程如下图所示:
DataAdapter - 数据适配器
实现多数据格式兼容,支持pandas和polars:
- 透明兼容:上层代码无需关心底层数据格式
- 统一接口:不同数据格式提供一致的操作接口
- 性能优化:针对不同格式进行专门优化
数据适配器工作原理:
数据适配器系统架构:
OperatorRegistry - 算子注册器
管理所有可用算子,支持自动发现和注册:
- 自动注册:通过装饰器自动注册算子
- 运行时发现:动态获取所有可用算子
- 插件扩展:支持第三方算子插件
OperatorRegistry使用方法:
- 自动注册算子:使用
@register_operator
装饰器自动注册算子
@register_operator
class MyCustomOperator(BaseOperator):def __init__(self, name: Optional[str] = None, window: int = 10, **kwargs):self.window = windowsuper().__init__(name, **kwargs)def execute(self, data, **kwargs):# 实现自定义计算逻辑return data.rolling(window=self.window).mean()
- 手动注册算子:通过Registry实例手动注册算子
# 获取全局注册器实例
registry = get_global_registry()# 手动注册算子类
registry.register(MyCustomOperator, "MyCustom")
- 获取已注册算子:从注册器中获取算子类或实例
# 通过名称获取算子类
operator_class = registry.get("MyCustom")# 创建算子实例
operator_instance = operator_class(window=20)# 或者直接在算子链中使用名称
chain = OperatorChain("自定义因子")
chain.add("MyCustom", window=20) # 通过名称添加算子
注册器模式工作流程:
设计模式深度解析 🧩
1. 抽象工厂模式
通过BaseOperator抽象基类,系统提供了统一的算子创建接口。所有算子都继承自BaseOperator,确保了接口的一致性。
# 所有算子都遵循相同接口
class SMA(BaseOperator):def execute(self, data, **kwargs):# SMA实现class RSI(BaseOperator):def execute(self, data, **kwargs):# RSI实现
2. 适配器模式
DataAdapter体系实现了对不同数据格式的透明支持,将数据操作的接口与具体实现分离。
# 适配器基类定义统一接口
class DataFrameAdapter(ABC):@abstractmethoddef rolling(self, window: int) -> 'RollingAdapter':pass@abstractmethoddef add(self, other) -> 'DataFrameAdapter':pass# 具体适配器实现
class PandasAdapter(DataFrameAdapter):def rolling(self, window: int):return PandasRollingAdapter(self._data.rolling(window))
3. 责任链模式
OperatorChain实现了算子的链式处理,每个算子负责处理自己的计算逻辑。
# 算子链按顺序执行每个算子
for operator in self.operators:current_data = operator(current_data, **kwargs)
责任链模式执行流程:
4. 注册器模式
OperatorRegistry提供了算子的自动发现和注册机制。
# 通过装饰器自动注册算子
@register_operator
class SMA(BaseOperator):pass# 运行时获取算子
operator_class = registry.get("SMA")
二、代码实现详解 💻
BaseOperator基类
class BaseOperator(ABC):def __init__(self, name: Optional[str] = None, **kwargs):self.name = name or self.__class__.__name__self.config = OperatorConfig(**kwargs)self._validate_parameters()@abstractmethoddef execute(self, data: Union[pd.DataFrame, pd.Series, Any], **kwargs) -> Union[pd.DataFrame, pd.Series, Any]:passdef __call__(self, data: Union[pd.DataFrame, pd.Series, Any], **kwargs) -> Union[pd.DataFrame, pd.Series, Any]:# 验证输入if not self._validate_input(data):raise ValueError(f"算子 {self.name} 的输入数据类型不支持: {type(data)}")# 预处理processed_data = self._preprocess(data)# 执行计算result = self.execute(processed_data, **kwargs)# 后处理final_result = self._postprocess(result)return final_result
OperatorChain实现
class OperatorChain:def __init__(self, name: Optional[str] = None):self.name = name or f"Chain_{id(self)}"self.operators: List[BaseOperator] = []self.debug_mode = Falseself.profile_mode = Falsedef add(self, operator: Union[BaseOperator, str], **kwargs) -> 'OperatorChain':# 添加算子到链中if isinstance(operator, str):# 通过名称创建算子实例operator_class = self.registry.get(operator)operator_instance = operator_class(**kwargs)elif isinstance(operator, BaseOperator):operator_instance = operatorself.operators.append(operator_instance)return selfdef execute(self, data: Union[pd.DataFrame, pd.Series], **kwargs) -> Union[pd.DataFrame, pd.Series]:# 逐个执行算子data_flow = DataFlow(data)for operator in self.operators:current_data = data_flow.get_current_data()result = operator(current_data, **kwargs)data_flow.context.update_data(result, operator.name)return data_flow.get_current_data()
数据适配器系统
class DataFrameAdapter(ABC):def __init__(self, data: Any):self._data = dataself._format = self._detect_format(data)@abstractmethoddef _detect_format(self, data: Any) -> DataFormat:pass@propertydef format(self) -> DataFormat:return self._format# 数据操作接口@abstractmethoddef rolling(self, window: int, min_periods: Optional[int] = None) -> 'RollingAdapter':pass@abstractmethoddef ewm(self, span: Optional[int] = None, alpha: Optional[float] = None) -> 'EWMAdapter':pass# 数学运算接口@abstractmethoddef add(self, other: Union['DataFrameAdapter', float, int]) -> 'DataFrameAdapter':pass@abstractmethoddef subtract(self, other: Union['DataFrameAdapter', float, int]) -> 'DataFrameAdapter':pass
具体算子实现示例
@register_operator
class SMA(UnaryOperator):def __init__(self, name: Optional[str] = None, window: int = 20, column: Optional[str] = None, **kwargs):self.window = windowself.column = columnsuper().__init__(name, **kwargs)def _validate_parameters(self):if self.window <= 0:raise ValueError("window 必须大于 0")def _create_metadata(self) -> OperatorMetadata:return OperatorMetadata(name=self.name,operator_type=OperatorType.TECHNICAL,description=f"简单移动平均线:计算{self.window}期简单移动平均",input_types=[DataType.SERIES, DataType.DATAFRAME],output_type=DataType.SERIES,parameters={'window': f'int, default={self.window} - 移动平均窗口期','column': 'str, optional - 指定列名(DataFrame输入时)'})def execute(self, data: Union[pd.DataFrame, pd.Series, Any], **kwargs) -> Union[pd.Series, Any]:# 使用适配器系统处理多数据格式adapter = self._create_adapter(data)if adapter is None:# 如果适配器不可用,使用原有逻辑if isinstance(data, pd.DataFrame):if self.column:series = data[self.column]elif 'column' in kwargs:series = data[kwargs['column']]else:series = data.iloc[:, 0]else:series = datareturn series.rolling(window=self.window).mean()# 使用适配器系统if adapter.is_dataframe():if self.column:series_adapter = adapter[self.column]elif 'column' in kwargs:series_adapter = adapter[kwargs['column']]else:# 获取第一列columns = adapter.columns()series_adapter = adapter[columns[0]]else:series_adapter = adapter# 执行滚动均值计算rolling_adapter = series_adapter.rolling(self.window)result_adapter = rolling_adapter.mean()# 返回原始数据格式return self._extract_original_data(result_adapter)
SMA算子执行流程:
三、因子编写实践 🧪
实际因子编写示例
1. 简单技术指标因子
from backtest.calculate_factor.operator_system.operators.technical import RSI, MACD
from backtest.calculate_factor.operator_system.core.operator_chain import OperatorChaindef create_rsi_factor(data):"""创建RSI超买超卖因子"""# RSI小于30为超卖,大于70为超买chain = OperatorChain("RSI因子")chain.add(RSI(window=14, column='close_fq'))chain.add(Between(lower=30, upper=70)) # RSI在30-70之间return chain.execute(data)def create_macd_factor(data):"""创建MACD金叉死叉因子"""chain = OperatorChain("MACD因子")chain.add(MACD(fast_period=12, slow_period=26, signal_period=9, column='close_fq'))# MACD线与信号线的差值chain.add(Subtract(column2='MACD_Signal'))return chain.execute(data)
简单因子构建流程:
2. 多因子组合策略
def create_multifactor_strategy(data):"""创建多因子组合策略"""# 动量因子momentum_chain = OperatorChain("动量因子")momentum_chain.add(Returns(periods=20, column='close_fq'))momentum_chain.add(Zscore(window=60))momentum_factor = momentum_chain.execute(data)# 波动率因子volatility_chain = OperatorChain("波动率因子")volatility_chain.add(RollingStd(window=20, column='close_fq'))volatility_chain.add(Zscore(window=60))volatility_factor = volatility_chain.execute(data)# 价值因子(价格偏离度)value_chain = OperatorChain("价值因子")value_chain.add(SMA(window=20, column='close_fq'))value_chain.add(Divide(column2='close_fq'))value_chain.add(Subtract(value=1.0))value_factor = value_chain.execute(data)# 组合因子(简单加权平均)combined_factor = (momentum_factor - volatility_factor + value_factor) / 3return combined_factor
多因子组合策略架构:
3. 高级因子构建
def create_advanced_factor(data):"""创建高级复合因子"""# 布林带位置因子bb_chain = OperatorChain("布林带因子")bb_chain.add(BollingerBands(window=20, num_std=2.0, column='close_fq'))# 价格在布林带中的位置(0-1之间)bb_position = bb_chain.execute(data)['BB_Position']# 成交量因子volume_chain = OperatorChain("成交量因子")volume_chain.add(Returns(periods=5, column='volume'))volume_chain.add(Zscore(window=20))volume_factor = volume_chain.execute(data)# 量价关系因子price_return_chain = OperatorChain("价格收益率")price_return_chain.add(Returns(periods=5, column='close_fq'))price_return = price_return_chain.execute(data)# 量价相关性因子volume_price_factor = price_return * volume_factor# 综合因子advanced_factor = bb_position * volume_price_factorreturn advanced_factor
高级因子构建流程: