A Complete Guide to Financial Time Series Forecasting with the FinTSB Framework
1. Introduction
FinTSB is an open-source financial time series forecasting framework developed by the Tongji University finance lab (TongjiFinLab). This guide walks through applying FinTSB to a custom dataset, with a focus on financial index forecasting. We cover the full workflow: environment setup, data preparation, model training, prediction evaluation, and result visualization.
1.1 Overview of the FinTSB Framework
FinTSB is a framework designed specifically for financial time series forecasting. Its main features are:
- Support for a range of modern deep learning models
- A complete data preprocessing pipeline
- A rich set of evaluation metrics
- Support for multivariate time series forecasting
- Visualization tools
1.2 Project Goals
The core goals of this project are to:
- Forecast a single financial index with the FinTSB framework
- Evaluate the accuracy of the forecasts
- Extend the setup to compare forecasts across multiple indices
- Establish a reproducible experimental workflow
2. Environment Setup
2.1 Hardware Requirements
- CPU: 4 or more cores recommended
- RAM: 8 GB or more
- GPU: optional, but recommended to speed up training (NVIDIA GPU with CUDA support)
- Storage: at least 10 GB of free space
2.2 Software Dependencies
First, set up a Python environment and install the required dependencies:
# Create and activate a conda environment (recommended)
conda create -n fintsb python=3.8
conda activate fintsb

# Install the base dependencies
pip install torch==1.10.0+cu113 torchvision==0.11.1+cu113 torchaudio==0.10.0 -f https://download.pytorch.org/whl/cu113/torch_stable.html
pip install numpy pandas matplotlib scikit-learn tqdm

# Install the FinTSB framework
git clone https://github.com/TongjiFinLab/FinTSB.git
cd FinTSB
pip install -e .
2.3 Verifying the Environment
import torch
import numpy as np
import pandas as pd
from fintsb import models, utils

print(f"PyTorch version: {torch.__version__}")
print(f"CUDA available: {torch.cuda.is_available()}")
print(f"FinTSB version: {utils.__version__}")
3. Data Preparation
3.1 Choosing a Data Source
For financial index forecasting, the following data sources are available:
- Yahoo Finance API
- Local CSV/Excel files
- Database connections (MySQL, MongoDB, etc.)
- Third-party financial data APIs (e.g. Alpha Vantage)
This example uses the Yahoo Finance API to fetch S&P 500 index data.
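If your index history instead lives in a local CSV file (one of the options listed above), a minimal loading sketch could look like the following; the file name and the 'Date'/'Close' column names are assumptions about your export format:

import pandas as pd

# Hypothetical local export; adjust the path and column names to your file
local_data = pd.read_csv('my_index_history.csv', parse_dates=['Date'])
local_data = local_data.sort_values('Date').reset_index(drop=True)
print(local_data[['Date', 'Close']].head())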
3.2 Downloading the Data
import yfinance as yf
import pandas as pd

def download_index_data(ticker, start_date, end_date):
    """Download index data from Yahoo Finance.
    :param ticker: index symbol (e.g. '^GSPC' for the S&P 500)
    :param start_date: start date (YYYY-MM-DD)
    :param end_date: end date (YYYY-MM-DD)
    :return: DataFrame with the historical data
    """
    data = yf.download(ticker, start=start_date, end=end_date)
    data.reset_index(inplace=True)
    data['Date'] = pd.to_datetime(data['Date'])
    return data

# Download S&P 500 index data
sp500_data = download_index_data('^GSPC', '2010-01-01', '2023-12-31')
print(sp500_data.head())
3.3 Data Preprocessing
The FinTSB framework expects the data to be normalized and converted into a windowed time series format.
from sklearn.preprocessing import MinMaxScaler

def preprocess_data(data, target_column='Close', sequence_length=60):
    """Preprocess the time series data.
    :param data: raw DataFrame
    :param target_column: column to forecast
    :param sequence_length: length of the sliding window
    :return: processed feature and target arrays plus the fitted scaler
    """
    # Extract the target column
    target = data[[target_column]].values
    # Scale the data to the range [0, 1]
    scaler = MinMaxScaler(feature_range=(0, 1))
    scaled_data = scaler.fit_transform(target)
    # Build the windowed dataset
    X, y = [], []
    for i in range(len(scaled_data) - sequence_length):
        X.append(scaled_data[i:i+sequence_length])
        y.append(scaled_data[i+sequence_length])
    return np.array(X), np.array(y), scaler

# Preprocess the data
X, y, scaler = preprocess_data(sp500_data)
print(f"Feature shape: {X.shape}, target shape: {y.shape}")
3.4 Splitting the Dataset
Split the data chronologically into training, validation, and test sets:
def split_data(X, y, train_ratio=0.7, val_ratio=0.15):
    """Split the dataset into training, validation, and test sets.
    :param X: feature data
    :param y: target data
    :param train_ratio: fraction used for training
    :param val_ratio: fraction used for validation
    :return: the three splits
    """
    total_samples = len(X)
    train_end = int(total_samples * train_ratio)
    val_end = train_end + int(total_samples * val_ratio)
    X_train, y_train = X[:train_end], y[:train_end]
    X_val, y_val = X[train_end:val_end], y[train_end:val_end]
    X_test, y_test = X[val_end:], y[val_end:]
    return (X_train, y_train), (X_val, y_val), (X_test, y_test)

# Split the dataset
(X_train, y_train), (X_val, y_val), (X_test, y_test) = split_data(X, y)
print(f"Train: {X_train.shape}, validation: {X_val.shape}, test: {X_test.shape}")
4. Model Training
FinTSB provides several time series forecasting models, including LSTM and Transformer architectures. We use an LSTM as the running example.
4.1 Model Configuration
from fintsb.models import LSTMModel
from fintsb.trainer import Trainer
from torch.optim import Adam

# Model configuration
config = {
    'input_size': 1,                 # input feature dimension (univariate)
    'hidden_size': 64,               # LSTM hidden size
    'num_layers': 2,                 # number of LSTM layers
    'output_size': 1,                # output dimension
    'dropout': 0.2,                  # dropout rate
    'learning_rate': 0.001,          # learning rate
    'batch_size': 32,                # batch size
    'num_epochs': 100,               # number of training epochs
    'early_stopping_patience': 10    # early stopping patience
}

# Initialize the model
model = LSTMModel(
    input_size=config['input_size'],
    hidden_size=config['hidden_size'],
    num_layers=config['num_layers'],
    output_size=config['output_size'],
    dropout=config['dropout']
)

# Define the optimizer
optimizer = Adam(model.parameters(), lr=config['learning_rate'])

# Define the loss function (mean squared error)
criterion = torch.nn.MSELoss()
4.2 Preparing the Data Loaders
from torch.utils.data import TensorDataset, DataLoader

# Convert to PyTorch tensors
X_train_tensor = torch.FloatTensor(X_train)
y_train_tensor = torch.FloatTensor(y_train)
X_val_tensor = torch.FloatTensor(X_val)
y_val_tensor = torch.FloatTensor(y_val)
X_test_tensor = torch.FloatTensor(X_test)
y_test_tensor = torch.FloatTensor(y_test)

# Create datasets and data loaders
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
val_dataset = TensorDataset(X_val_tensor, y_val_tensor)
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)

train_loader = DataLoader(train_dataset, batch_size=config['batch_size'], shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=config['batch_size'], shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=config['batch_size'], shuffle=False)
4.3 Training Process
# Initialize the trainer
trainer = Trainer(
    model=model,
    optimizer=optimizer,
    criterion=criterion,
    config=config,
    train_loader=train_loader,
    val_loader=val_loader
)

# Start training
train_losses, val_losses = trainer.train()

# Plot the training and validation loss curves
import matplotlib.pyplot as plt

plt.figure(figsize=(10, 6))
plt.plot(train_losses, label='Training Loss')
plt.plot(val_losses, label='Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training and Validation Loss Over Epochs')
plt.legend()
plt.grid(True)
plt.show()
4.4 Saving and Loading the Model
# Save the model
torch.save(model.state_dict(), 'sp500_lstm_model.pth')

# Load the model
loaded_model = LSTMModel(
    input_size=config['input_size'],
    hidden_size=config['hidden_size'],
    num_layers=config['num_layers'],
    output_size=config['output_size'],
    dropout=config['dropout']
)
loaded_model.load_state_dict(torch.load('sp500_lstm_model.pth'))
loaded_model.eval()
5. Model Evaluation
5.1 Evaluation on the Test Set
from fintsb.metrics import calculate_metrics

# Generate predictions on the test set
test_predictions = []
with torch.no_grad():
    for inputs, _ in test_loader:
        outputs = loaded_model(inputs)
        test_predictions.extend(outputs.numpy())

test_predictions = np.array(test_predictions).reshape(-1, 1)
y_test_true = y_test_tensor.numpy()

# Inverse-transform predictions and ground truth back to the price scale
test_predictions = scaler.inverse_transform(test_predictions)
y_test_true = scaler.inverse_transform(y_test_true)

# Compute evaluation metrics
metrics = calculate_metrics(y_test_true, test_predictions)
print("Test set metrics:")
for metric, value in metrics.items():
    print(f"{metric}: {value:.4f}")
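If fintsb.metrics is not available in your installation, or you want to cross-check what calculate_metrics reports, an equivalent set of standard regression metrics can be computed directly with scikit-learn and NumPy. This is a minimal sketch, assuming non-zero true prices for the MAPE term:

from sklearn.metrics import mean_absolute_error, mean_squared_error

def regression_metrics(y_true, y_pred):
    """Minimal fallback: MSE, RMSE, MAE, and MAPE for price forecasts."""
    mse = mean_squared_error(y_true, y_pred)
    mae = mean_absolute_error(y_true, y_pred)
    mape = np.mean(np.abs((y_true - y_pred) / y_true)) * 100  # assumes no zero prices
    return {'MSE': mse, 'RMSE': float(np.sqrt(mse)), 'MAE': mae, 'MAPE': float(mape)}

print(regression_metrics(y_test_true, test_predictions))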
5.2 Visualizing the Predictions
# Prepare date labels (the dates corresponding to the test set)
test_dates = sp500_data['Date'].values[-len(y_test_true):]

plt.figure(figsize=(14, 7))
plt.plot(test_dates, y_test_true, label='Actual Price', color='blue')
plt.plot(test_dates, test_predictions, label='Predicted Price', color='red', linestyle='--')
plt.xlabel('Date')
plt.ylabel('Price')
plt.title('S&P 500 Index: Actual vs Predicted Prices')
plt.legend()
plt.grid(True)
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()
5.3 Multi-Step Forecasting
def multi_step_forecast(model, initial_sequence, steps, scaler):
    """Recursive multi-step forecast.
    :param model: trained model
    :param initial_sequence: initial input window
    :param steps: number of steps to forecast
    :param scaler: fitted scaler used to normalize the data
    :return: forecasted values on the original price scale
    """
    model.eval()
    current_sequence = initial_sequence.copy()
    predictions = []
    with torch.no_grad():
        for _ in range(steps):
            # Prepare the input window
            input_tensor = torch.FloatTensor(current_sequence).unsqueeze(0)
            # Predict the next step
            output = model(input_tensor)
            pred = output.numpy()[0, 0]
            predictions.append(pred)
            # Slide the window forward and append the prediction
            current_sequence = np.roll(current_sequence, -1)
            current_sequence[-1] = pred
    # Inverse-transform the forecasts back to prices
    predictions = scaler.inverse_transform(np.array(predictions).reshape(-1, 1))
    return predictions

# Use the last test window as the initial sequence
initial_seq = X_test[-1]
forecast_steps = 30  # forecast the next 30 days
forecast = multi_step_forecast(loaded_model, initial_seq, forecast_steps, scaler)

# Generate forecast dates (drop the start date so only future dates remain;
# the `closed` argument used here previously was removed in pandas 2.0)
last_date = sp500_data['Date'].values[-1]
forecast_dates = pd.date_range(start=last_date, periods=forecast_steps + 1, freq='D')[1:]

plt.figure(figsize=(14, 7))
plt.plot(test_dates[-60:], y_test_true[-60:], label='Historical Price', color='blue')
plt.plot(forecast_dates, forecast, label='Forecasted Price', color='green', linestyle='--')
plt.xlabel('Date')
plt.ylabel('Price')
plt.title('S&P 500 Index: 30-Day Forecast')
plt.legend()
plt.grid(True)
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()
6. Extending to Multi-Index Forecasting
6.1 Preparing Multi-Index Data
# Download data for multiple indices
indices = {
    'S&P 500': '^GSPC',
    'NASDAQ': '^IXIC',
    'Dow Jones': '^DJI'
}

index_data = {}
for name, ticker in indices.items():
    data = download_index_data(ticker, '2010-01-01', '2023-12-31')
    index_data[name] = data[['Date', 'Close']].rename(columns={'Close': name})

# Merge the series on the Date column
multi_data = index_data['S&P 500']
for name in ['NASDAQ', 'Dow Jones']:
    multi_data = pd.merge(multi_data, index_data[name], on='Date', how='inner')

print(multi_data.head())
6.2 Multivariate Data Preprocessing
def preprocess_multi_data(data, target_column='S&P 500', sequence_length=60):
    """Preprocess multivariate time series data.
    :param data: raw DataFrame
    :param target_column: column to forecast
    :param sequence_length: length of the sliding window
    :return: processed feature and target arrays plus the fitted scalers
    """
    # Separate features and target
    features = data.drop(columns=['Date']).values
    target = data[[target_column]].values
    # Scale features and target independently
    feature_scaler = MinMaxScaler(feature_range=(0, 1))
    target_scaler = MinMaxScaler(feature_range=(0, 1))
    scaled_features = feature_scaler.fit_transform(features)
    scaled_target = target_scaler.fit_transform(target)
    # Build the windowed dataset
    X, y = [], []
    for i in range(len(scaled_features) - sequence_length):
        X.append(scaled_features[i:i+sequence_length])
        y.append(scaled_target[i+sequence_length])
    return np.array(X), np.array(y), feature_scaler, target_scaler

# Preprocess the multivariate data
X_multi, y_multi, feature_scaler, target_scaler = preprocess_multi_data(multi_data)
print(f"Multivariate feature shape: {X_multi.shape}, target shape: {y_multi.shape}")
6.3 Training the Multivariate Model
# Updated model configuration
multi_config = {
    'input_size': X_multi.shape[2],   # input feature dimension (3 indices)
    'hidden_size': 128,               # larger hidden layer
    'num_layers': 3,                  # deeper network
    'output_size': 1,                 # output dimension (forecast one index)
    'dropout': 0.3,                   # higher dropout rate
    'learning_rate': 0.0005,          # smaller learning rate
    'batch_size': 64,                 # larger batch size
    'num_epochs': 150,                # more training epochs
    'early_stopping_patience': 15     # larger early stopping patience
}

# Initialize the multivariate LSTM model
multi_model = LSTMModel(
    input_size=multi_config['input_size'],
    hidden_size=multi_config['hidden_size'],
    num_layers=multi_config['num_layers'],
    output_size=multi_config['output_size'],
    dropout=multi_config['dropout']
)

# Split the dataset
(X_multi_train, y_multi_train), (X_multi_val, y_multi_val), (X_multi_test, y_multi_test) = split_data(X_multi, y_multi)

# Prepare the data loaders
multi_train_dataset = TensorDataset(torch.FloatTensor(X_multi_train), torch.FloatTensor(y_multi_train))
multi_val_dataset = TensorDataset(torch.FloatTensor(X_multi_val), torch.FloatTensor(y_multi_val))
multi_test_dataset = TensorDataset(torch.FloatTensor(X_multi_test), torch.FloatTensor(y_multi_test))

multi_train_loader = DataLoader(multi_train_dataset, batch_size=multi_config['batch_size'], shuffle=True)
multi_val_loader = DataLoader(multi_val_dataset, batch_size=multi_config['batch_size'], shuffle=False)

# Train the multivariate model
multi_optimizer = Adam(multi_model.parameters(), lr=multi_config['learning_rate'])
multi_trainer = Trainer(
    model=multi_model,
    optimizer=multi_optimizer,
    criterion=criterion,
    config=multi_config,
    train_loader=multi_train_loader,
    val_loader=multi_val_loader
)

multi_train_losses, multi_val_losses = multi_trainer.train()
6.4 Evaluating the Multivariate Model
# Generate predictions on the test set
multi_test_loader = DataLoader(multi_test_dataset, batch_size=multi_config['batch_size'], shuffle=False)

multi_test_predictions = []
with torch.no_grad():
    for inputs, _ in multi_test_loader:
        outputs = multi_model(inputs)
        multi_test_predictions.extend(outputs.numpy())

multi_test_predictions = np.array(multi_test_predictions).reshape(-1, 1)
y_multi_test_true = torch.FloatTensor(y_multi_test).numpy()

# Inverse-transform the predictions and ground truth
multi_test_predictions = target_scaler.inverse_transform(multi_test_predictions)
y_multi_test_true = target_scaler.inverse_transform(y_multi_test_true)

# Compute evaluation metrics
multi_metrics = calculate_metrics(y_multi_test_true, multi_test_predictions)
print("Multivariate model test set metrics:")
for metric, value in multi_metrics.items():
    print(f"{metric}: {value:.4f}")

# Compare against the univariate model
print("\nModel comparison:")
print("Metric\t\tSingle\t\tMulti")
for metric in metrics:
    print(f"{metric}\t\t{metrics[metric]:.4f}\t\t{multi_metrics[metric]:.4f}")
6.5 Visualizing the Multi-Index Predictions
# Prepare the test set dates
test_dates = multi_data['Date'].values[-len(y_multi_test_true):]

plt.figure(figsize=(14, 7))
plt.plot(test_dates, y_multi_test_true, label='Actual S&P 500 Price', color='blue')
plt.plot(test_dates, multi_test_predictions, label='Predicted S&P 500 Price (Multi)', color='purple', linestyle='--')
plt.plot(test_dates, test_predictions[-len(y_multi_test_true):], label='Predicted S&P 500 Price (Single)', color='red', linestyle=':')
plt.xlabel('Date')
plt.ylabel('Price')
plt.title('S&P 500 Index: Single vs Multi-variable Model Predictions')
plt.legend()
plt.grid(True)
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()
7. Advanced Topics and Optimization
7.1 Hyperparameter Tuning
from fintsb.tuning import HyperparameterTuner

# Define the search space
search_space = {
    'hidden_size': [64, 128, 256],
    'num_layers': [2, 3, 4],
    'dropout': [0.1, 0.2, 0.3],
    'learning_rate': [0.001, 0.0005, 0.0001],
    'batch_size': [32, 64, 128]
}

# Initialize the tuner
tuner = HyperparameterTuner(
    model_class=LSTMModel,
    search_space=search_space,
    fixed_params={
        'input_size': X_multi.shape[2],
        'output_size': 1
    },
    train_loader=multi_train_loader,
    val_loader=multi_val_loader,
    criterion=criterion,
    max_epochs=50,
    num_trials=20
)

# Run the hyperparameter search
best_params, best_score = tuner.search()
print(f"Best parameters: {best_params}")
print(f"Best validation score: {best_score}")

# Train the final model with the best parameters
best_model = LSTMModel(
    input_size=X_multi.shape[2],
    output_size=1,
    hidden_size=best_params['hidden_size'],
    num_layers=best_params['num_layers'],
    dropout=best_params['dropout']
)

best_optimizer = Adam(best_model.parameters(), lr=best_params['learning_rate'])
best_trainer = Trainer(
    model=best_model,
    optimizer=best_optimizer,
    criterion=criterion,
    config={
        'batch_size': best_params['batch_size'],
        'num_epochs': 200,
        'early_stopping_patience': 20
    },
    train_loader=multi_train_loader,
    val_loader=multi_val_loader
)

best_train_losses, best_val_losses = best_trainer.train()
7.2 Model Ensembling
from fintsb.ensemble import ModelEnsemble

# Build models with different architectures
model1 = LSTMModel(input_size=X_multi.shape[2], hidden_size=64, num_layers=2, output_size=1, dropout=0.2)
model2 = LSTMModel(input_size=X_multi.shape[2], hidden_size=128, num_layers=3, output_size=1, dropout=0.3)
model3 = LSTMModel(input_size=X_multi.shape[2], hidden_size=256, num_layers=2, output_size=1, dropout=0.1)

# Initialize the ensemble
ensemble = ModelEnsemble(models=[model1, model2, model3])

# Train each model in the ensemble
for i, model in enumerate(ensemble.models):
    print(f"Training model {i+1}")
    optimizer = Adam(model.parameters(), lr=0.001)
    trainer = Trainer(
        model=model,
        optimizer=optimizer,
        criterion=criterion,
        config={
            'batch_size': 64,
            'num_epochs': 100,
            'early_stopping_patience': 10
        },
        train_loader=multi_train_loader,
        val_loader=multi_val_loader
    )
    trainer.train()

# Evaluate the ensemble
ensemble_predictions = ensemble.predict(multi_test_loader)
ensemble_predictions = target_scaler.inverse_transform(ensemble_predictions)

ensemble_metrics = calculate_metrics(y_multi_test_true, ensemble_predictions)
print("Ensemble model test set metrics:")
for metric, value in ensemble_metrics.items():
    print(f"{metric}: {value:.4f}")
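If fintsb.ensemble is not available in your installed version, the same idea can be approximated with a simple prediction-averaging ensemble. This is a sketch under that assumption, reusing the trained models and the test loader from above:

def average_ensemble_predict(models, data_loader):
    """Average the predictions of several trained models over a data loader."""
    all_preds = []
    for model in models:
        model.eval()
        preds = []
        with torch.no_grad():
            for inputs, _ in data_loader:
                preds.append(model(inputs).numpy())
        all_preds.append(np.concatenate(preds, axis=0))
    # Mean across models, shape (num_samples, 1)
    return np.mean(all_preds, axis=0)

# e.g. ensemble_predictions = average_ensemble_predict([model1, model2, model3], multi_test_loader)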
7.3 Feature Importance Analysis
from fintsb.interpretability import FeatureImportanceAnalyzer

# Initialize the feature importance analyzer
analyzer = FeatureImportanceAnalyzer(
    model=best_model,
    feature_names=['S&P 500', 'NASDAQ', 'Dow Jones'],
    num_samples=1000
)

# Compute feature importances
importance = analyzer.analyze(multi_test_loader)
print("Feature importances:")
for feature, imp in zip(['S&P 500', 'NASDAQ', 'Dow Jones'], importance):
    print(f"{feature}: {imp:.4f}")

# Visualize the feature importances
plt.figure(figsize=(10, 6))
plt.bar(['S&P 500', 'NASDAQ', 'Dow Jones'], importance)
plt.xlabel('Features')
plt.ylabel('Importance Score')
plt.title('Feature Importance for S&P 500 Prediction')
plt.grid(True)
plt.show()
8. Deployment and Production
8.1 Building a Prediction API
from fastapi import FastAPI
from pydantic import BaseModel
import uvicorn

app = FastAPI()

class PredictionRequest(BaseModel):
    historical_data: list  # historical price sequence
    steps: int = 1         # number of steps to forecast

@app.post("/predict")
async def predict(request: PredictionRequest):
    # The request carries a single price series, so the univariate model and scaler
    # are used here (a multivariate model would need all three index series per step)
    scaled_data = scaler.transform(np.array(request.historical_data).reshape(-1, 1))
    sequence_length = X.shape[1]
    # Make sure the sequence length matches what the model expects
    if len(scaled_data) < sequence_length:
        # Left-pad the sequence with zeros
        padded_data = np.zeros((sequence_length, 1))
        padded_data[-len(scaled_data):] = scaled_data
        current_sequence = padded_data
    else:
        current_sequence = scaled_data[-sequence_length:]
    # Run the recursive forecast
    predictions = []
    with torch.no_grad():
        for _ in range(request.steps):
            input_tensor = torch.FloatTensor(current_sequence).unsqueeze(0)
            output = loaded_model(input_tensor)
            pred = output.numpy()[0, 0]
            predictions.append(float(pred))
            # Slide the window forward
            current_sequence = np.roll(current_sequence, -1)
            current_sequence[-1] = pred
    # Inverse-transform back to prices
    predictions = scaler.inverse_transform(np.array(predictions).reshape(-1, 1))
    return {"predictions": predictions.flatten().tolist()}

# Run the API server
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)
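Once the server is running (locally on port 8000 in this example), the endpoint can be exercised with a small requests client; the payload fields mirror the PredictionRequest model defined above:

import requests

payload = {
    "historical_data": sp500_data['Close'].tail(60).tolist(),  # last 60 closing prices
    "steps": 5
}
response = requests.post("http://localhost:8000/predict", json=payload)
print(response.json())  # {"predictions": [...]}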
8.2 Batch Prediction Pipeline
from fintsb.pipelines import BatchPredictionPipeline

# Initialize the batch prediction pipeline
batch_pipeline = BatchPredictionPipeline(
    model=best_model,
    scaler=target_scaler,
    sequence_length=X_multi.shape[1],
    batch_size=64
)

# Simulate newly arriving data (random walks around the stated index levels)
new_data = pd.DataFrame({
    'Date': pd.date_range(start='2024-01-01', periods=100),
    'S&P 500': 4000 + np.random.normal(loc=0, scale=100, size=100).cumsum(),
    'NASDAQ': 12000 + np.random.normal(loc=0, scale=300, size=100).cumsum(),
    'Dow Jones': 33000 + np.random.normal(loc=0, scale=200, size=100).cumsum()
})

# Run the batch prediction
predictions = batch_pipeline.predict(new_data['S&P 500'].values)

# Visualize the results
plt.figure(figsize=(14, 7))
plt.plot(new_data['Date'], new_data['S&P 500'], label='Actual Price', color='blue')
plt.plot(new_data['Date'][X_multi.shape[1]:], predictions, label='Predicted Price', color='red', linestyle='--')
plt.xlabel('Date')
plt.ylabel('Price')
plt.title('Batch Prediction Results')
plt.legend()
plt.grid(True)
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()
9. Conclusions and Extensions
9.1 Project Summary
Through this guide we have:
- Applied the FinTSB framework to a custom financial index dataset
- Implemented univariate and multivariate time series forecasting models
- Compared the predictive performance of the different models
- Performed model optimization, ensembling, and interpretability analysis
- Built a deployable prediction API and a batch prediction pipeline
9.2 Future Directions
Directions worth exploring next include:
- Integrating more model families (e.g. Transformer, TCN)
- Adding macroeconomic indicators as additional features
- Supporting real-time streaming prediction
- Developing a backtesting framework for trading strategies
- Building an interactive visualization dashboard
9.3 Best Practice Recommendations
- Data quality first: make sure the data is clean and consistent
- Continuous monitoring: regularly evaluate model performance in production
- Version control: manage model and data versions rigorously
- Documentation: record every experiment and its parameters (a minimal logging sketch follows this list)
- Security: protect sensitive financial data and model intellectual property
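As a small illustration of the version control and documentation points above, one lightweight option is to store the training configuration and test metrics next to each model checkpoint. This is a minimal sketch; the file names are illustrative and it assumes the config and metrics objects from the earlier sections are still in scope.

import json

# Record what produced this checkpoint (illustrative file names)
run_record = {
    'model_checkpoint': 'sp500_lstm_model.pth',
    'config': config,                                       # training configuration used
    'test_metrics': {k: float(v) for k, v in metrics.items()},
}
with open('sp500_lstm_run.json', 'w') as f:
    json.dump(run_record, f, indent=2, default=str)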
By following the methods and best practices in this guide, you can use the FinTSB framework effectively for financial time series forecasting and adapt or extend it to your specific needs.