基于机器学习的基金股票预测:从数据到决策
用Python构建智能投资分析模型
---
引言
在金融市场中,准确预测基金和股票的价格走势一直是投资者和量化分析师追求的目标。随着人工智能技术的发展,机器学习为这一领域带来了新的可能性。本文将详细介绍如何使用Python构建一个完整的基金股票预测系统,涵盖从数据获取到模型部署的全流程。
技术概览
预测目标
· 价格预测:预测未来N天的收盘价
· 趋势分类:判断未来涨跌方向(上涨/下跌)
· 风险评估:评估投资风险等级
技术栈
```python
# 核心库
import pandas as pd
import numpy as np
import yfinance as yf
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from sklearn.preprocessing import StandardScaler
import tensorflow as tf
from tensorflow import keras
import matplotlib.pyplot as plt
import seaborn as sns
```
项目架构设计
系统架构
```
数据层    →  特征工程  →  模型训练  →  预测分析  →  可视化展示
  ↓            ↓            ↓            ↓            ↓
数据获取     技术指标     集成学习     回测验证     交互图表
             时间特征     深度学习     风险评估     报告生成
```
核心模块
1. 数据采集与预处理模块
2. 特征工程模块
3. 多模型训练模块
4. 回测验证模块
5. 可视化分析模块
完整实现代码
1. 数据获取与预处理
```python
class StockDataFetcher:
    """Download price history with yfinance and derive basic return columns."""

    def __init__(self):
        # Most recent successfully downloaded DataFrame; None until then.
        self.data = None

    def fetch_data(self, symbol, start_date, end_date):
        """Download historical data for ``symbol`` between the two dates.

        Args:
            symbol: Ticker passed straight to ``yf.download``.
            start_date: Passed to ``yf.download`` as ``start``.
            end_date: Passed to ``yf.download`` as ``end``.

        Returns:
            The downloaded DataFrame (also stored on ``self.data``), or None
            when the download raised or produced no rows.
        """
        try:
            downloaded = yf.download(symbol, start=start_date, end=end_date)
        except Exception as e:
            # Best-effort boundary: report the problem and signal it with None.
            print(f"数据获取失败: {e}")
            return None

        # A download can come back empty instead of raising; reporting that
        # as a success would only move the failure further downstream.
        if downloaded is None or downloaded.empty:
            print(f"数据获取失败: {symbol} 未返回任何数据")
            return None

        self.data = downloaded
        print(f"成功获取 {symbol} 从 {start_date} 到 {end_date} 的数据")
        return self.data

    def preprocess_data(self, data):
        """Forward-fill gaps and add 'Daily Return' and 'Price Change'.

        Args:
            data: DataFrame with a 'Close' column.

        Returns:
            A new DataFrame; the one passed in is not modified.
        """
        # DataFrame.ffill() replaces the deprecated fillna(method='ffill').
        data = data.ffill()
        # Row-over-row relative and absolute change of the close.
        data['Daily Return'] = data['Close'].pct_change()
        data['Price Change'] = data['Close'].diff()
        return data
# Usage example
fetcher = StockDataFetcher()
stock_data = fetcher.fetch_data('000001.SS', '2020-01-01', '2024-01-01')
# fetch_data returns None when the download fails; stop here with a clear
# error instead of letting preprocess_data fail on a None/empty input.
if stock_data is None or stock_data.empty:
    raise RuntimeError("no price data downloaded for 000001.SS")
processed_data = fetcher.preprocess_data(stock_data)
```
2. 特征工程
```python
class FeatureEngineer:
    """Add technical-indicator and calendar feature columns to a price frame.

    The ``add_*`` methods write their columns into the DataFrame they receive
    (in place) and return that same object.
    """

    def __init__(self):
        self.features = []

    def add_technical_indicators(self, data):
        """Add moving-average, RSI, Bollinger, MACD and volume columns.

        Args:
            data: DataFrame with 'Close' and 'Volume' columns.

        Returns:
            The same DataFrame, with the new columns added.
        """
        close = data['Close']

        # Simple moving averages of the close over 5, 20 and 60 rows.
        for window in (5, 20, 60):
            data[f'MA_{window}'] = close.rolling(window=window).mean()

        # Relative Strength Index (RSI).
        data['RSI'] = self.calculate_rsi(close)
        # Bollinger bands.
        data['BB_upper'], data['BB_lower'] = self.calculate_bollinger_bands(close)
        # MACD line and its signal line.
        data['MACD'], data['MACD_signal'] = self.calculate_macd(close)

        # Volume-based features.
        data['Volume_MA'] = data['Volume'].rolling(window=5).mean()
        data['Price_Volume'] = close * data['Volume']
        return data

    def calculate_rsi(self, prices, window=14):
        """Return the RSI of ``prices``.

        Gains and losses are averaged with a plain rolling mean over
        ``window`` rows. Rows without a full window are NaN.
        """
        delta = prices.diff()
        # Keep only upward moves / only downward moves (as positive numbers);
        # everything else, including the leading NaN, counts as 0.
        gain = delta.where(delta > 0, 0).rolling(window=window).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(window=window).mean()
        rs = gain / loss
        return 100 - (100 / (1 + rs))

    def calculate_bollinger_bands(self, prices, window=20, num_std=2):
        """Return ``(upper_band, lower_band)``.

        The bands are the rolling mean plus/minus ``num_std`` rolling
        standard deviations over ``window`` rows.
        """
        rolling = prices.rolling(window=window)
        center = rolling.mean()
        offset = rolling.std() * num_std
        return center + offset, center - offset

    def calculate_macd(self, prices, fast=12, slow=26, signal=9):
        """Return ``(macd, macd_signal)``.

        ``macd`` is the fast-span EWM mean minus the slow-span EWM mean, and
        ``macd_signal`` is the ``signal``-span EWM mean of ``macd``.
        """
        ema_fast = prices.ewm(span=fast).mean()
        ema_slow = prices.ewm(span=slow).mean()
        macd = ema_fast - ema_slow
        macd_signal = macd.ewm(span=signal).mean()
        return macd, macd_signal

    def add_time_features(self, data):
        """Add calendar columns derived from the index.

        Reads ``dayofweek``, ``month``, ``quarter``, ``year``,
        ``is_month_start`` and ``is_month_end`` from ``data.index``, so the
        index must provide those attributes (e.g. a DatetimeIndex).

        Returns:
            The same DataFrame, with the new columns added.
        """
        index = data.index
        data['DayOfWeek'] = index.dayofweek
        data['Month'] = index.month
        data['Quarter'] = index.quarter
        data['Year'] = index.year
        # Boolean flags stored as 0/1 integers.
        data['IsMonthStart'] = index.is_month_start.astype(int)
        data['IsMonthEnd'] = index.is_month_end.astype(int)
        return data
# Apply feature engineering: technical indicators first, then calendar features.
engineer = FeatureEngineer()
featured_data = engineer.add_time_features(
    engineer.add_technical_indicators(processed_data)
)
```
3. 目标变量定义
```python
class TargetBuilder:
    """Build forward-looking regression and classification targets.

    Targets are derived from the 'Close' column, looking ``prediction_days``
    rows ahead. Each ``create_*`` method writes its columns into the
    DataFrame it receives and returns that same object.
    """

    def __init__(self, prediction_days=5, threshold=0.02):
        self.prediction_days = prediction_days
        self.threshold = threshold  # return cut-off separating the classes

    def create_regression_target(self, data):
        """Add 'Future_Close' and 'Price_Ratio'.

        'Future_Close' is the close ``prediction_days`` rows ahead and
        'Price_Ratio' is its relative change versus the current close. The
        last ``prediction_days`` rows have no future value and are NaN.
        """
        data['Future_Close'] = data['Close'].shift(-self.prediction_days)
        data['Price_Ratio'] = data['Future_Close'] / data['Close'] - 1
        return data

    def create_classification_target(self, data):
        """Add 'Trend' (binary) and 'Trend_3class' (three-way) labels.

        'Trend' is 1.0 when the forward return exceeds ``threshold`` and 0.0
        otherwise. 'Trend_3class' is 0 for a return at or below
        ``-threshold``, 2 for a return above ``threshold`` and 1 in between.
        Rows whose forward return is unknown are NaN in both columns, so a
        missing-target filter drops them instead of treating them as class 0.
        """
        horizon = self.prediction_days
        future_returns = data['Close'].pct_change(horizon).shift(-horizon)

        # NaN > threshold evaluates to False, which would silently label the
        # trailing rows as 0; mask them back to NaN. This makes the column
        # float rather than int.
        is_up = (future_returns > self.threshold).astype(float)
        data['Trend'] = is_up.where(future_returns.notna())

        # Three classes: 0 = large drop, 1 = range-bound, 2 = large rise.
        data['Trend_3class'] = pd.cut(
            future_returns,
            bins=[-np.inf, -self.threshold, self.threshold, np.inf],
            labels=[0, 1, 2],
        )
        return data
# Build the target columns: regression targets first, then class labels.
target_builder = TargetBuilder(prediction_days=5)
featured_data = target_builder.create_classification_target(
    target_builder.create_regression_target(featured_data)
)
```
4. 模型构建与训练
```python
class StockPredictor:
def __init__(self, test_size=0.2):
self.test_size = test_size
self.models = {}
self.scalers = {}
self.performance = {}
def prepare_features(self, data, target_column):
"""准备特征和目标变量"""
# 选择数值型特征
feature_columns = [col for col in data.columns if col not in
['Future_Close', 'Trend', 'Trend_3class', 'Price_Ratio']]
features = data[feature_columns].select_dtypes(include=[np.number])
# 移除包含NaN的行
valid_indices = ~(features.isnull().any(axis=1) | data[target_column].isnull())
X = features[valid_indices]
y = data[target_column][valid_indices]
return X, y, valid_indices
def train_test_split_time_series(self, X, y):
"""时间序列数据的分割"""
split_index = int(len(X) * (1 - self.test_size))
X_train = X.iloc[:split_index]
X_test = X.iloc[split_index:]
y_train = y.iloc[:split_index]
y_test = y.iloc[split_index:]
return X_train, X_test, y_train, y_test
def train_models(self, data, target_type='regression'):
"""训练多个模型"""
if target_type == 'regression':
target_column = 'Price_Ratio'
models = {
'RandomForest': RandomForestRegressor(n_estimators=100, random_state=42),
'GradientBoosting': GradientBoostingRegressor(n_estimators=100, random_state=42),
'LSTM': self.build_lstm_model()
}
else:
target_column = 'Trend'
from sk
