当前位置: 首页 > news >正文

Airflow量化入门系列:第四章 A股数据处理与存储优化

Airflow量化入门系列:第四章 A股数据处理与存储优化

本教程系统性地讲解了 Apache Airflow 在 A 股量化交易中的应用,覆盖从基础安装到高级功能的完整知识体系。通过八章内容,读者将掌握 Airflow 的核心概念、任务调度、数据处理、技术指标计算、策略回测及工作流监控、Docker/Kubernetes集成及生产环境优化等关键技能。教程整合 Tushare 数据源、TA-Lib 技术指标库和 VectorBT 策略回测工具,提供丰富实战案例,帮助构建高效、可靠的工作流,助力量化交易实践。
文中内容仅限技术学习与代码实践参考,市场存在不确定性,技术分析需谨慎验证,不构成任何投资建议。适合量化新手建立系统认知,为策略开发打下基础。

AirFlow

学习对象

  • 中高级水平的开发者
  • 具备 Python 编程基础,熟悉基本的 ETL 流程和数据分析工具
  • 希望掌握 Airflow 在量化交易场景中的应用

教程目标

  • 系统掌握 Apache Airflow 的核心功能与高级特性
  • 深入理解 Airflow 在 A 股量化交易工作流中的应用
  • 能够独立设计、部署和维护复杂的量化交易工作流

教程目录

第一章 Airflow基础与量化交易场景

1.1 Airflow 2.10+核心概念与架构
1.2 量化交易中的任务调度需求
1.3 Airflow环境搭建
1.4 DAG设计原则与任务依赖关系
1.5 示例:Tushare数据定时抓取与存储(Parquet格式)

第二章 DAG设计与任务流控制

2.1 DAG生命周期与状态管理
2.2 Operator类型详解(PythonOperator、BashOperator、TaskFlow API)
2.3 任务依赖关系与XCom通信机制
2.4 示例:A股数据预处理流水线(缺失值处理、特征工程)
2.5 错误处理与重试机制

第三章 定时调度与告警集成飞书

3.1 Cron表达式与调度策略
3.2 Airflow Scheduler工作原理
3.3 Airflow Web UI监控与日志分析
3.4 示例:A股日线数据定时更新与异常检测
3.5 告警机制与飞书集成

第四章 A股数据处理与存储优化

4.1 Parquet文件格式优化(分区、压缩)
4.2 Airflow与Tushare API交互最佳实践
4.3 数据管道性能优化(并行任务与资源管理)
4.4 示例:A股基本面数据ETL流程
4.5 数据版本控制与回滚策略

第五章 技术指标计算与特征工程

5.1 TA-Lib集成与自定义指标开发
5.2 并行计算与任务拆分策略
5.3 特征工程流水线设计(滞后特征、滚动窗口)
5.4 数据校验与一致性检查
5.5 示例:A股技术指标流水线(MACD、RSI、布林带)

第六章 VectorBT策略开发与回测

6.1 VectorBT集成与回测流程
6.2 策略参数优化与网格搜索
6.3 回测结果与性能评估
6.4 策略版本管理与持续迭代
6.5 示例:基于动量策略的A股回测

第七章 机器学习模型训练与部署

7.1 Airflow与Scikit-learn集成
7.2 模型训练流水线设计(数据分片、交叉验证)
7.3 模型评估与超参数调优
7.4 模型部署与在线预测
7.5 示例:A股收益率预测模型训练

第八章 高级特性与生产环境优化

8.1 动态任务生成与SubDAG
8.2 Airflow与Kubernetes集成
8.3 安全性与权限管理
8.4 生产环境部署与监控
9.5 性能优化与扩展性设计

第四章 A股数据处理与存储优化

4.1 Parquet文件格式优化(分区、压缩)

Parquet是一种高效的列式存储格式,特别适合处理大规模结构化数据。通过合理的分区策略和压缩算法选择,可以显著提升数据读写效率。

推荐阅读🚀

  • Pandas+PyArrow:股票数据存储 Parquet 入门指引 🔥
  • A股数据存储实战:Parquet技术深度解析

关键知识点

  1. 分区存储:通过partition_cols参数按交易日期分区存储,路径结构为trade_date=YYYYMMDD,减少查询时的I/O操作。
  2. 压缩算法:使用snappy压缩算法,平衡了压缩率和读写速度。
  3. 列式存储:Parquet的列式存储结构允许只读取需要的列,减少内存占用并提升查询效率。

示例:按交易日期分区存储A股日线数据

import os

import pendulum
import tushare as ts

from airflow import DAG
from airflow.configuration import conf
from airflow.operators.python import PythonOperator

# 配置 Tushare API
TS_TOKEN = conf.get("tushare", "api_token")
ts.set_token(TS_TOKEN)
pro = ts.pro_api()

# 数据存储路径
DATA_DIR = conf.get("tushare", "data_folder")
os.makedirs(DATA_DIR, exist_ok=True)


def optimized_data_storage(**kwargs):
    """按日期分区存储A股日线数据。

    :param kwargs: Airflow任务上下文
    """
    # 获取任务执行日期,格式为YYYYMMDD
    execution_date = kwargs["execution_date"].strftime("%Y%m%d")

    # 定义股票代码列表(示例:沪深300成分股)
    stock_codes = ["600000.SH", "600036.SH", "000001.SZ"]

    start_date = execution_date
    end_date = execution_date

    # 按日期分区存储,使用snappy压缩
    data_path = os.path.join(DATA_DIR, "kline")
    os.makedirs(data_path, exist_ok=True)
    contents = os.listdir(data_path)
    # 如果内容列表为空,则是首次抓取数据
    if len(contents) == 0:
        start_date = "20250101"  # 默认开始日期

    try:
        # 抓取指定日期的A股日线数据
        df = pro.daily(
            ts_code=",".join(stock_codes),
            start_date=start_date,
            end_date=end_date,
        )

        df.to_parquet(
            data_path,
            index=False,
            compression="snappy",
            partition_cols=["trade_date"],  # 按日期分区
        )

        print(f"Data saved to {data_path}")

    except Exception as e:
        print(f"Error fetching or writing data: {e}")


# 定义DAG
with DAG(
    "optimized_data_storage",
    description="按日期分区存储A股日线数据",
    schedule_interval="30 16 * * 1-5",  # 工作日16:30执行
    start_date=pendulum.today().add(days=-1),  # DAG开始日期
    tags=["quant", "storage"],
) as dag:
    # 定义任务
    optimize_task = PythonOperator(
        task_id="optimized_data_storage",
        python_callable=optimized_data_storage,
        provide_context=True,
    )

    # 设置任务依赖关系
    optimize_task

4.2 Airflow与Tushare API交互最佳实践

关键知识点

  1. 批量查询:通过批量查询减少API调用次数,提升效率。
  2. 错误处理:对每个股票的API调用添加异常处理,确保单个失败不会影响整体流程。
  3. 数据存储:使用Parquet格式存储数据,便于后续分析。

示例:批量抓取A股基本面数据

import os
from datetime import timedelta

import pandas as pd
import pendulum
import tushare as ts

from airflow import DAG
from airflow.configuration import conf
from airflow.operators.python import PythonOperator

# 配置 Tushare API
TS_TOKEN = conf.get("tushare", "api_token")
ts.set_token(TS_TOKEN)
pro = ts.pro_api()

# 数据存储路径
DATA_DIR = conf.get("tushare", "data_folder")
os.makedirs(DATA_DIR, exist_ok=True)


def fetch_financial_data(**kwargs):
    """批量抓取A股基本面数据并存储为Parquet文件。

    :param kwargs: Airflow任务上下文
    """
    # 获取任务执行日期
    execution_date = kwargs["execution_date"].strftime("%Y%m%d")

    try:
        # 批量获取沪深股通成份股
        hs300_cons = pro.hs_const(hs_type="SH")
        stock_codes = hs300_cons["ts_code"].tolist()
        # 示例:只抓取前5只股票数据
        fetch_stock_codes = stock_codes[:5]

        # 批量查询基本面数据
        financial_data = []
        for code in fetch_stock_codes:
            try:
                df = pro.fina_indicator(
                    ts_code=code,
                    start_date="20240101",  # 默认开始日期
                    end_date=execution_date,
                )
                financial_data.append(df)
            except Exception as e:
                print(f"Error fetching data for {code}: {e}")

        # 合并数据并存储
        if financial_data:
            df = pd.concat(financial_data)
            file_path = os.path.join(DATA_DIR, "financial_data.parquet")
            df.to_parquet(file_path, index=False, compression="snappy")
            print(f"Financial data saved to {file_path}")

    except Exception as e:
        print(f"Error in financial data pipeline: {e}")


# 定义DAG默认参数
default_args = {
    "owner": "airflow",  # DAG所有者
    "depends_on_past": False,  # 是否依赖过去任务
    "email_on_failure": False,  # 任务失败时是否发送邮件
    "email_on_retry": False,  # 任务重试时是否发送邮件
    "retries": 1,  # 任务重试次数
    "retry_delay": timedelta(minutes=5),  # 任务重试间隔
}

# 定义DAG
with DAG(
    "fetch_financial_data",  # DAG名称
    description="批量抓取A股基本面数据",  # DAG描述
    default_args=default_args,  # 默认参数
    schedule_interval=timedelta(days=1),  # 调度间隔(每天执行一次)
    start_date=pendulum.today().add(days=-1),  # DAG开始日期
    tags=["quant", "tushare"],  # DAG标签
) as dag:
    # 定义任务
    fetch_task = PythonOperator(
        task_id="fetch_financial_data",  # 任务ID
        python_callable=fetch_financial_data,  # 任务执行函数
        provide_context=True,  # 提供任务上下文
    )

    # 设置任务依赖关系
    fetch_task

4.3 数据管道性能优化(并行任务与资源管理)

在数据处理管道中,通过并行任务和资源管理可以显著提升性能。使用CeleryExecutor可以实现任务的分布式处理。

关键知识点

  1. 并行任务:通过CeleryExecutor实现任务的分布式处理,提升并行效率。
  2. 资源管理:配置任务的CPU和内存资源,避免资源竞争。
  3. 任务队列:使用CeleryRedis作为任务队列,确保任务的可靠执行。

CeleryExecutor配置

  1. 配置CeleryExecutor

    修改airflow.cfg文件,添加以下配置:

    [core]
    executor = CeleryExecutor
    
    [celery]
    broker_url = redis://localhost:6379/0
    result_backend = db+postgresql://user:password@localhost:5432/airflow
    worker_concurrency = 4  # 根据实际情况调整并发数
    
  2. 启动Celery Worker

    airflow celery worker
    

4.4 A股基本面数据ETL流程

ETL(Extract, Transform, Load)是数据处理的核心流程。通过优化ETL流程,可以提升数据处理效率。

关键知识点

  1. 数据抽取:批量获取沪深300成分股的基本面数据。
  2. 数据转换:清洗数据并计算特征。
  3. 数据加载:将处理后的数据存储为Parquet文件。

4.5 数据版本控制与回滚策略

在量化交易中,数据版本控制和回滚策略是确保数据一致性和稳定性的关键。

关键知识点

  1. 数据版本控制:通过Git管理数据集的版本,每次数据更新生成一个新的提交。
  2. 回滚策略:使用Git回滚到上一个版本,确保数据一致性。
  3. 数据一致性检查:通过校验和或哈希值确保数据完整性。

数据版本控制与回滚

import os
from datetime import timedelta

import pandas as pd
import pendulum
import tushare as ts

from airflow import DAG
from airflow.configuration import conf
from airflow.operators.python import BranchPythonOperator, PythonOperator

# 配置 Tushare API
TS_TOKEN = conf.get("tushare", "api_token")
ts.set_token(TS_TOKEN)
pro = ts.pro_api()

# 数据存储路径
DATA_DIR = conf.get("tushare", "data_folder")
os.makedirs(DATA_DIR, exist_ok=True)


def fetch_financial_data(**kwargs):
    """批量抓取A股基本面数据并存储为Parquet文件。

    :param kwargs: Airflow任务上下文
    """
    # 获取任务执行日期作为版本号
    execution_date = kwargs["execution_date"].strftime("%Y%m%d")

    try:
        # 批量获取沪深股通成份股
        hs300_cons = pro.hs_const(hs_type="SH")
        stock_codes = hs300_cons["ts_code"].tolist()
        # 示例:只抓取前5只股票数据
        fetch_stock_codes = stock_codes[:5]

        # 批量查询基本面数据
        financial_data = []
        for code in fetch_stock_codes:
            try:
                df = pro.fina_indicator(
                    ts_code=code,
                    start_date="20240101",  # 默认开始日期
                    end_date=execution_date,
                )
                financial_data.append(df)
            except Exception as e:
                print(f"Error fetching data for {code}: {e}")

        # 合并数据并存储
        if financial_data:
            df = pd.concat(financial_data)
            file_path = os.path.join(DATA_DIR, "financial_data.parquet")
            df.to_parquet(file_path, index=False, compression="snappy")
            print(f"Financial data saved to {file_path}")

        return "success"
    except Exception as e:
        print(f"Error in financial data pipeline: {e}")
        return "failure"


def git_commit_and_push(**kwargs):
    """提交数据到Git并推送到远程仓库。

    :param kwargs: Airflow任务上下文
    """
    try:
        # 获取任务执行日期作为提交信息
        commit_msg = kwargs["execution_date"].strftime("%Y%m%d")

        # 提交到Git
        os.system(f"git add {DATA_DIR}/financial_data_v{commit_msg}.parquet")
        os.system(f'git commit -m "Update data for {commit_msg}"')
        os.system("git push origin main")
        print("Data committed and pushed to Git")

    except Exception as e:
        print(f"Error in Git operations: {e}")


def git_rollback(**kwargs):
    """回滚到指定版本。

    :param kwargs: Airflow任务上下文
    """
    try:
        # 获取上一个版本的提交哈希
        previous_commit = (
            os.popen("git log -n 1 --skip 1 --pretty=format:%H").read().strip()
        )

        # 回滚到上一个版本
        os.system(f"git reset --hard {previous_commit}")
        print(f"Rolled back to commit {previous_commit}")

    except Exception as e:
        print(f"Error in rollback: {e}")


def decide_next_task(ti):
    """根据fetch_financial_data的结果决定下一步执行哪个任务。

    :param ti: TaskInstance对象
    """
    result = ti.xcom_pull(task_ids="fetch_financial_data")
    if result == "success":
        return "git_commit_and_push"
    else:
        return "git_rollback"


# 定义DAG默认参数
default_args = {
    "owner": "airflow",  # DAG所有者
    "depends_on_past": False,  # 是否依赖过去任务
    "email_on_failure": False,  # 任务失败时是否发送邮件
    "email_on_retry": False,  # 任务重试时是否发送邮件
    "retries": 1,  # 任务重试次数
    "retry_delay": timedelta(minutes=5),  # 任务重试间隔
}

# 定义DAG
with DAG(
    "data_version_control",
    description="数据版本控制与回滚",
    default_args=default_args,  # 默认参数
    schedule_interval=timedelta(days=1),  # 调度间隔(每天执行一次)
    start_date=pendulum.today().add(days=-1),  # DAG开始日期
    tags=["quant", "version"],
) as dag:
    # 定义任务
    fetch_task = PythonOperator(
        task_id="fetch_financial_data",
        python_callable=fetch_financial_data,
        provide_context=True,
    )

    commit_task = PythonOperator(
        task_id="git_commit_and_push",
        python_callable=git_commit_and_push,
        provide_context=True,
    )

    rollback_task = PythonOperator(
        task_id="git_rollback",
        python_callable=git_rollback,
        provide_context=True,
    )

    decide_task = BranchPythonOperator(
        task_id="decide_next_task",
        python_callable=decide_next_task,
        provide_context=True,
    )

    # 设置任务依赖关系
    fetch_task >> decide_task
    decide_task >> [commit_task, rollback_task]

风险提示与免责声明
本文内容基于公开信息研究整理,不构成任何形式的投资建议。历史表现不应作为未来收益保证,市场存在不可预见的波动风险。投资者需结合自身财务状况及风险承受能力独立决策,并自行承担交易结果。作者及发布方不对任何依据本文操作导致的损失承担法律责任。市场有风险,投资须谨慎。

http://www.dtcms.com/a/113649.html

相关文章:

  • 浅谈StarRocks 常见问题解析
  • (5)模拟后——Leonardo的可视化操作
  • 探秘叁仟智盒设备:智慧城市的智能枢纽
  • Django4.0 快速集成jwt
  • ASP.NET Core Web API 参数传递方式
  • NLP简介及其发展历史
  • docker stack常用命令
  • C#结构体(Struct)深度解析:轻量数据容器与游戏开发应用 (Day 20)
  • pinia-plugin-persist、vuex
  • Spring Boot项目连接MySQL数据库及CRUD操作示例
  • Java Timer:老派但好用的“定时任务小闹钟“
  • 【Linux】进程间通信、匿名管道、进程池
  • 将OpenFOAM中的lduMatrix数据转换为CSC稀疏矩阵格式
  • 混合编程的架构
  • Java EE期末总结(第三章)
  • Leedcode刷题 | 回溯算法小总结01
  • kali——masscan
  • Matlab轴承故障信号仿真与故障分析
  • spring-cloud-alibaba-nacos-config使用说明
  • 《K230 从熟悉到...》无线网络
  • LINUX 4 tar -zcvf -jcvf -Jcvf -tf -uf
  • Transformer+BO-SVM多变量时间序列预测(Matlab)
  • 力扣刷题——508.出现次数最多的子树和
  • Docker存储策略深度解析:临时文件 vs 持久化存储选型指南
  • 每日算法-250405
  • 4. 面向对象程序设计
  • 分布式事务解决方案全解析:从经典模式到现代实践
  • 每天五分钟深度学习框架pytorch:搭建LSTM完成手写字体识别任务?
  • 深入探索 Linux Top 命令:15 个实用示例
  • python中的sort使用