当前位置: 首页 > news >正文

PyTorch量化进阶教程:第四章 Transformer 模型构建与训练

PyTorch量化进阶教程:第四章 Transformer 模型构建与训练

本教程通过深入讲解 Transformer 架构、自注意力机制及时间序列预测,结合 Tushare 数据源和 TA-Lib 技术指标,实现从数据处理到模型训练、回测与策略部署的完整量化交易系统。教程每个环节都通过专业示例和代码实现进行阐释,确保读者能够扎实掌握并灵活运用所学知识。
文中内容仅限技术学习与代码实践参考,市场存在不确定性,技术分析需谨慎验证,不构成任何投资建议。适合量化新手建立系统认知,为策略开发打下基础。

PyTorch量化进阶教程

学习对象

  • 中高级水平的开发者
  • 具备 Python 编程基础和一定的机器学习知识
  • 对 A 股市场有一定了解,熟悉 Tushare 数据源和 TA-Lib 技术指标

教程目标

  • 系统学习 PyTorch 和 Transformer 技术
  • 掌握 Transformer 在时间序列预测和量化交易中的应用
  • 使用 Tushare 数据源和 TA-Lib 技术指标构建量化交易模型
  • 实现从数据获取到模型训练、评估和部署的完整流程

教程目录

第一章 PyTorch 基础

1.1 PyTorch 环境搭建与基本操作
1.2 张量(Tensor)与自动求导机制
1.3 神经网络模块(nn.Module)与优化器
1.4 数据加载与预处理(DataLoader 和 Dataset)

第二章 Transformer 理论详解

2.1 Transformer 架构概述
2.2 自注意力机制(Self-Attention)
2.3 编码器与解码器结构
2.4 Transformer 在时间序列预测中的应用

第三章 A 股数据处理与特征工程

3.1 使用 Tushare 获取 A 股数据
3.2 数据存储与管理(Parquet 文件)
3.3 使用 TA-Lib 计算技术指标
3.4 特征工程与数据预处理

第四章 Transformer 模型构建与训练

4.1 Transformer 模型的 PyTorch 实现
4.2 时间序列预测任务的模型设计
4.3 模型训练与超参数优化
4.4 模型评估与性能分析

第五章 Transformer 在量化交易中的应用

5.1 量化交易策略设计与实现
5.2 回测与风险评估
5.3 策略优化与改进
5.4 模型保存与加载
5.5 ONNX 优化模型

第六章 模型部署与生产化

6.1 部署整体架构设计
6.2 核心部署流程
6.3 关键技术实现
6.4 性能调优路线
6.5 监控指标设计
6.6 总结建议

第四章 Transformer 模型构建与训练

4.1 Transformer 模型的 PyTorch 实现

基于之前对 Transformer 的理论理解和 A 股数据的处理,现在我们来构建一个完整的 Transformer 模型,用于 A 股价格预测。

4.1.1 模型定义

import torch
import torch.nn as nn
import math


class MultiHeadAttention(nn.Module):
    """多头注意力机制。

    :param d_model: 输入和输出的维度
    :param num_heads: 注意力头的数量
    """

    def __init__(self, d_model, num_heads):
        super().__init__()
        self.num_heads = num_heads  # 注意力头的数量
        self.d_model = d_model  # 输入和输出的维度
        self.depth = d_model // num_heads  # 每个头的维度

        self.wq = nn.Linear(d_model, d_model)  # 查询线性变换
        self.wk = nn.Linear(d_model, d_model)  # 键线性变换
        self.wv = nn.Linear(d_model, d_model)  # 值线性变换
        self.dense = nn.Linear(d_model, d_model)  # 输出线性变换

    def split_heads(self, x, batch_size):
        """将输入张量分割成多个头。

        :param x: 输入张量 (batch_size, seq_len, d_model)
        :param batch_size: 批次大小
        :return: 分割后的张量 (batch_size, num_heads, seq_len, depth)
        """
        x = x.view(
            batch_size, -1, self.num_heads, self.depth
        )  # 将d_model维度拆分成num_heads和depth
        return x.permute(
            0, 2, 1, 3
        )  # 调整维度顺序为 (batch_size, num_heads, seq_len, depth)

    def forward(self, q, k, v):
        """前向传播函数。

        :param q: 查询张量 (batch_size, seq_len, d_model)
        :param k: 键张量 (batch_size, seq_len, d_model)
        :param v: 值张量 (batch_size, seq_len, d_model)
        :return: 输出张量 (batch_size, seq_len, d_model) 和注意力权重 (batch_size, num_heads, seq_len, seq_len)
        """
        batch_size = q.size(0)  # 获取批次大小

        q = self.wq(q)  # 对查询进行线性变换
        k = self.wk(k)  # 对键进行线性变换
        v = self.wv(v)  # 对值进行线性变换

        q = self.split_heads(q, batch_size)  # 将查询张量分割成多个头
        k = self.split_heads(k, batch_size)  # 将键张量分割成多个头
        v = self.split_heads(v, batch_size)  # 将值张量分割成多个头

        # 计算注意力
        scaled_attention, attention_weights = self.scaled_dot_product_attention(q, k, v)
        scaled_attention = scaled_attention.permute(
            0, 2, 1, 3
        )  # 调整维度顺序为 (batch_size, seq_len, num_heads, depth)
        concat_attention = scaled_attention.reshape(
            batch_size, -1, self.d_model
        )  # 合并头,恢复原始维度

        output = self.dense(concat_attention)  # 进行最终的线性变换
        return output, attention_weights

    def scaled_dot_product_attention(self, q, k, v):
        """计算缩放点积注意力。

        :param q: 查询张量 (batch_size, num_heads, seq_len, depth)
        :param k: 键张量 (batch_size, num_heads, seq_len, depth)
        :param v: 值张量 (batch_size, num_heads, seq_len, depth)
        :return: 注意力输出 (batch_size, num_heads, seq_len, depth) 和注意力权重 (batch_size, num_heads, seq_len, seq_len)
        """
        matmul_qk = torch.matmul(q, k.transpose(-1, -2))  # 计算Q和K的点积
        dk = torch.tensor(k.size(-1), dtype=torch.float32)  # 获取深度dk
        scaled_attention_logits = matmul_qk / torch.sqrt(dk)  # 缩放点积

        attention_weights = torch.softmax(
            scaled_attention_logits, dim=-1
        )  # 计算注意力权重
        output = torch.matmul(attention_weights, v)  # 应用注意力权重到值上
        return output, attention_weights


class EncoderLayer(nn.Module):
    """编码器层。

    :param d_model: 输入和输出的维度
    :param num_heads: 注意力头的数量
    :param dff: 前馈神经网络的中间维度
    :param dropout: dropout 概率,默认为 0.1
    """

    def __init__(self, d_model, num_heads, dff, dropout=0.1):
        super().__init__()
        self.mha = MultiHeadAttention(d_model, num_heads)  # 多头注意力机制
        self.ffn = nn.Sequential(
            nn.Linear(d_model, dff),  # 线性变换到dff维度
            nn.GELU(),  # GELU激活函数
            nn.Linear(dff, d_model),  # 线性变换回d_model维度
        )
        self.layer_norm1 = nn.LayerNorm(d_model)  # 第一个层归一化
        self.layer_norm2 = nn.LayerNorm(d_model)  # 第二个层归一化
        self.dropout = nn.Dropout(dropout)  # Dropout层

    def forward(self, x):
        """前向传播函数。

        :param x: 输入张量 (batch_size, seq_len, d_model)
        :return: 输出张量 (batch_size, seq_len, d_model)
        """
        # 多头注意力
        attn_output, _ = self.mha(x, x, x)  # 计算多头注意力
        attn_output = self.dropout(attn_output)  # 应用dropout
        out1 = self.layer_norm1(x + attn_output)  # 残差连接和层归一化

        # 前馈神经网络
        ffn_output = self.ffn(out1)  # 前馈神经网络
        ffn_output = self.dropout(ffn_output)  # 应用dropout
        out2 = self.layer_norm2(out1 + ffn_output)  # 残差连接和层归一化
        return out2


class DecoderLayer(nn.Module):
    """解码器层。

    :param d_model: 输入和输出的维度
    :param num_heads: 注意力头的数量
    :param dff: 前馈神经网络的中间维度
    :param dropout: dropout 概率,默认为 0.1
    """

    def __init__(self, d_model, num_heads, dff, dropout=0.1):
        super().__init__()
        self.mha1 = MultiHeadAttention(d_model, num_heads)  # 掩码多头注意力
        self.mha2 = MultiHeadAttention(d_model, num_heads)  # 编码器-解码器注意力
        self.ffn = nn.Sequential(
            nn.Linear(d_model, dff),  # 线性变换到dff维度
            nn.GELU(),  # GELU激活函数
            nn.Linear(dff, d_model),  # 线性变换回d_model维度
        )
        self.layer_norm1 = nn.LayerNorm(d_model)  # 第一个层归一化
        self.layer_norm2 = nn.LayerNorm(d_model)  # 第二个层归一化
        self.layer_norm3 = nn.LayerNorm(d_model)  # 第三个层归一化
        self.dropout = nn.Dropout(dropout)  # Dropout层

    def forward(self, x, enc_output):
        """前向传播函数。

        :param x: 输入张量 (batch_size, seq_len, d_model)
        :param enc_output: 编码器输出 (batch_size, src_seq_len, d_model)
        :return: 输出张量 (batch_size, seq_len, d_model) 和两个注意力权重
        """
        # 掩码多头注意力
        attn1, attn_weights1 = self.mha1(x, x, x)  # 计算掩码多头注意力
        attn1 = self.dropout(attn1)  # 应用dropout
        out1 = self.layer_norm1(x + attn1)  # 残差连接和层归一化

        # 编码器-解码器注意力
        attn2, attn_weights2 = self.mha2(
            out1, enc_output, enc_output
        )  # 计算编码器-解码器注意力
        attn2 = self.dropout(attn2)  # 应用dropout
        out2 = self.layer_norm2(out1 + attn2)  # 残差连接和层归一化

        # 前馈神经网络
        ffn_output = self.ffn(out2)  # 前馈神经网络
        ffn_output = self.dropout(ffn_output)  # 应用dropout
        out3 = self.layer_norm3(out2 + ffn_output)  # 残差连接和层归一化
        return out3, attn_weights1, attn_weights2


class PositionalEncoding(nn.Module):
    """位置编码。

    :param d_model: 输入和输出的维度
    :param max_len: 最大序列长度,默认为5000
    :param dropout: dropout 概率,默认为 0.1
    """

    def __init__(self, d_model, max_len=5000, dropout=0.1):
        super().__init__()
        self.dropout = nn.Dropout(dropout)  # Dropout层

        pe = torch.zeros(max_len, d_model)  # 初始化位置编码
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)  # 位置索引
        div_term = torch.exp(
            torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model)
        )  # 用于计算正弦和余弦的位置项
        pe[:, 0::2] = torch.sin(position * div_term)  # 正弦位置编码
        pe[:, 1::2] = torch.cos(position * div_term)  # 余弦位置编码
        pe = pe.unsqueeze(0)  # 增加批次维度
        self.register_buffer("pe", pe)  # 注册位置编码为缓冲区

    def forward(self, x):
        """前向传播函数。

        :param x: 输入张量 (batch_size, seq_len, d_model)
        :return: 添加了位置编码的张量 (batch_size, seq_len, d_model)
        """
        x = x + self.pe[:, : x.size(1)]  # 添加位置编码
        return self.dropout(x)  # 应用dropout


class TransformerModel(nn.Module):
    """Transformer模型。

    :param input_size: 输入特征的维度
    :param d_model: 输入和输出的维度
    :param num_heads: 注意力头的数量
    :param num_layers: 编码器层数
    :param dff: 前馈神经网络的中间维度
    :param dropout: dropout 概率,默认为 0.1
    """

    def __init__(self, input_size, d_model, num_heads, num_layers, dff, dropout=0.1):
        super().__init__()
        self.embedding = nn.Linear(input_size, d_model)  # 输入嵌入
        self.pos_encoding = PositionalEncoding(d_model, dropout=dropout)  # 位置编码
        self.encoder_layers = nn.ModuleList(
            [
                EncoderLayer(d_model, num_heads, dff, dropout)
                for _ in range(num_layers)
            ]  # 编码器层
        )
        self.fc = nn.Linear(d_model, 1)  # 全连接层

    def forward(self, x):
        """前向传播函数。

        :param x: 输入张量 (batch_size, seq_len, input_size)
        :return: 输出张量 (batch_size, 1)
        """
        x = self.embedding(x)  # 输入嵌入
        x = self.pos_encoding(x)  # 位置编码
        for enc_layer in self.encoder_layers:
            x = enc_layer(x)  # 通过每个编码器层
        x = self.fc(x.mean(dim=1))  # 使用全局平均池化并进行最终线性变换
        return x

4.1.2 模型初始化

# 模型参数
input_size = X_train.shape[1]  # 输入特征维度
d_model = 64  # 模型隐藏层维度
num_heads = 8  # 多头注意力头数
num_layers = 3  # 编码器层数
dff = 256  # 前馈网络维度
dropout = 0.1  # Dropout 概率

# 设置设备
device = torch.device(
    "cuda"
    if torch.cuda.is_available()
    else "mps" if torch.backends.mps.is_available() else "cpu"
)
# 初始化模型
model = TransformerModel(input_size, d_model, num_heads, num_layers, dff, dropout).to(
    device
)

4.2 时间序列预测任务的模型设计

4.2.1 输入输出设计

我们将使用过去 n n n 天的特征来预测下一天的收盘价。例如,使用过去 30 天的数据来预测第 31 天的收盘价。

4.2.2 损失函数与优化器选择

我们选择均方误差(MSE)作为损失函数,Adam 优化器用于参数更新。

criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

4.3 模型训练与超参数优化

4.3.1 训练循环的实现

from torch.utils.data import Dataset, DataLoader


class StockDataset(Dataset):
    def __init__(self, X, y, seq_len=30):
        self.X = X
        self.y = y
        self.seq_len = seq_len

    def __len__(self):
        return len(self.X) - self.seq_len

    def __getitem__(self, idx):
        x = self.X[idx : idx + self.seq_len]
        y = self.y[idx + self.seq_len]
        return x, y


# 创建数据集和数据加载器
seq_len = 30  # 序列长度
train_dataset = StockDataset(X_train, y_train.values, seq_len)
test_dataset = StockDataset(X_test, y_test.values, seq_len)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# 训练循环
num_epochs = 50

for epoch in range(num_epochs):
    model.train()
    train_loss = 0.0
    for batch_X, batch_y in train_loader:
        batch_X = batch_X.float().to(device)
        batch_y = batch_y.float().unsqueeze(1).to(device)

        optimizer.zero_grad()
        outputs = model(batch_X)
        loss = criterion(outputs, batch_y)
        loss.backward()
        optimizer.step()

        train_loss += loss.item() * batch_X.size(0)

    train_loss = train_loss / len(train_loader.dataset)

    # 验证
    model.eval()
    val_loss = 0.0
    with torch.no_grad():
        for batch_X, batch_y in test_loader:
            batch_X = batch_X.float().to(device)
            batch_y = batch_y.float().unsqueeze(1).to(device)
            outputs = model(batch_X)
            loss = criterion(outputs, batch_y)
            val_loss += loss.item() * batch_X.size(0)

    val_loss = val_loss / len(test_loader.dataset)

    print(
        f"Epoch: {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}"
    )

输出

Epoch: 1/50, Train Loss: 0.0887, Val Loss: 0.0007
Epoch: 2/50, Train Loss: 0.0046, Val Loss: 0.0023
Epoch: 3/50, Train Loss: 0.0013, Val Loss: 0.0005
Epoch: 4/50, Train Loss: 0.0007, Val Loss: 0.0005
Epoch: 5/50, Train Loss: 0.0006, Val Loss: 0.0004
...
...
Epoch: 47/50, Train Loss: 0.0005, Val Loss: 0.0005
Epoch: 48/50, Train Loss: 0.0005, Val Loss: 0.0005
Epoch: 49/50, Train Loss: 0.0005, Val Loss: 0.0005
Epoch: 50/50, Train Loss: 0.0005, Val Loss: 0.0005

4.3.2 使用 Optuna 进行超参数优化

import optuna


def objective(trial):
    # 超参数搜索空间
    num_heads = trial.suggest_categorical("num_heads", [4, 8])
    base_d_model = trial.suggest_int("base_d_model", 64, 256, step=32)  # 以32为步长
    d_model = (base_d_model // num_heads) * num_heads

    num_layers = trial.suggest_int("num_layers", 2, 4)
    dff = trial.suggest_int("dff", 128, 512, step=64)
    dropout = trial.suggest_float("dropout", 0.1, 0.3)
    lr = trial.suggest_float("lr", 1e-4, 1e-3, log=True)

    # 强制维度约束
    if d_model % num_heads != 0:
        d_model = (d_model // num_heads) * num_heads  # 自动调整为最近的可整除数

    # 初始化模型
    model = TransformerModel(
        input_size, d_model, num_heads, num_layers, dff, dropout
    ).to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    criterion = nn.MSELoss()

    # 训练和验证
    for epoch in range(10):  # 减少训练轮数以加速搜索
        model.train()
        train_loss = 0.0
        for batch_X, batch_y in train_loader:
            batch_X = batch_X.float().to(device)
            batch_y = batch_y.float().unsqueeze(1).to(device)

            optimizer.zero_grad()
            outputs = model(batch_X)
            loss = criterion(outputs, batch_y)
            loss.backward()
            optimizer.step()

            train_loss += loss.item() * batch_X.size(0)

        model.eval()
        val_loss = 0.0
        with torch.no_grad():
            for batch_X, batch_y in test_loader:
                batch_X = batch_X.float().to(device)
                batch_y = batch_y.float().unsqueeze(1).to(device)
                outputs = model(batch_X)
                loss = criterion(outputs, batch_y)
                val_loss += loss.item() * batch_X.size(0)

    return val_loss / len(test_loader.dataset)


# 创建 Optuna 研究
study = optuna.create_study(direction="minimize")
study.optimize(objective, n_trials=10)

# 输出最佳超参数
print("Best trial:")
trial = study.best_trial
print(f"  Value: {trial.value}")
print("  Params: ")
for key, value in trial.params.items():
    print(f"    {key}: {value}")

输出

Best trial:
  Value: 0.0004504568067296774
  Params:
    num_heads: 8
    base_d_model: 160
    num_layers: 3
    dff: 320
    dropout: 0.10135902255908476
    lr: 0.000851687261750414

4.4 模型评估与性能分析

4.4.1 评估指标

我们使用均方误差(MAE)和均方根误差(RMSE)作为评估指标。

def evaluate_model(model, dataloader):
    model.eval()
    predictions = []
    true_values = []
    with torch.no_grad():
        for batch_X, batch_y in dataloader:
            batch_X = batch_X.float().to(device)
            batch_y = batch_y.float().to(device)
            outputs = model(batch_X).squeeze(1)
            predictions.extend(outputs.cpu().numpy())
            true_values.extend(batch_y.cpu().numpy())
    return predictions, true_values


# 获取预测值和真实值
train_predictions, train_true = evaluate_model(model, train_loader)
test_predictions, test_true = evaluate_model(model, test_loader)

# 计算评估指标
from sklearn.metrics import (
    mean_absolute_error,
    root_mean_squared_error,
)

train_mae = mean_absolute_error(train_true, train_predictions)
train_rmse = root_mean_squared_error(train_true, train_predictions)
test_mae = mean_absolute_error(test_true, test_predictions)
test_rmse = root_mean_squared_error(test_true, test_predictions)

print(f"Train MAE: {train_mae:.4f}, Train RMSE: {train_rmse:.4f}")
print(f"Test MAE: {test_mae:.4f}, Test RMSE: {test_rmse:.4f}")

输出

Train MAE: 0.0153, Train RMSE: 0.0206
Test MAE: 0.0161, Test RMSE: 0.0225

4.4.2 模型性能可视化

import matplotlib.pyplot as plt

# 绘制预测结果
plt.figure(figsize=(12, 6))
plt.plot(test_true, label="True Price")
plt.plot(test_predictions, label="Predicted Price")
plt.title("Stock Price Prediction")
plt.xlabel("Time")
plt.ylabel("Price")
plt.legend()
plt.show()

通过以上步骤,我们构建并训练了一个基于 Transformer 的 A 股价格预测模型,并对其性能进行了评估和可视化。

风险提示与免责声明
本文内容基于公开信息研究整理,不构成任何形式的投资建议。历史表现不应作为未来收益保证,市场存在不可预见的波动风险。投资者需结合自身财务状况及风险承受能力独立决策,并自行承担交易结果。作者及发布方不对任何依据本文操作导致的损失承担法律责任。市场有风险,投资须谨慎。

相关文章:

  • C/C++蓝桥杯算法真题打卡(Day12)
  • Python Flask并发demo(http并发与锁)独占接口、monkey功能还不太确定
  • 目标检测 AP 计算 实例 python
  • SpringBoot详细教程(持续更新中...)
  • 不同版本的mysql数据库对于注入的影响
  • 解决Win7下打开Delphi 11.3 后提示“LSP Server 已停止工作“
  • 【Ragflow】8.基于ragflow API 搭建极简聊天Web界面
  • CMake —— 1、CMake简介(附:Windows、Linux下的CMake安装方法,及编译测试结果)
  • C++的位与、位或、左移、右移、异或、位取反
  • c++的封装
  • 大数据与datax1.0
  • HarmonyOS Next~鸿蒙元服务开发指南:核心功能与实践
  • DAY 32 leetcode 242--哈希表.有效的字母异位词
  • PHP Swoole 启动时的进程关系
  • ISIS报文
  • ES6 新特性全面总结
  • 26考研|高等代数:线性空间
  • 关于CodeJava的学习笔记——10
  • 医院信息系统与AI赋能的介绍
  • 【Easylive】获取request对象的两种方式
  • 宠物店网站建设计划书/seo软件服务
  • 公司的网站的设计/免费网络推广公司
  • 江苏建设信息网证书查询/seo系统是什么意思
  • 要看网站是多少/二次感染即将大爆发
  • 工商营业执照注册网站/seo监控系统
  • wap网站为什么没有了/图片优化是什么意思