Posts
- 第01章-Qlib概述
第1章:Qlib 概述
本章将带你全面了解 Qlib 量化投资框架,包括其背景、特性、架构以及如何快速上手。
目录
1.1 什么是 Qlib
1.1.1 Qlib 的诞生背景
量化投资作为一种系统化的投资方法,已经在全球金融市场中扮演着越来越重要的角色。然而,传统的量化投资研究面临着诸多挑战:
- 数据量大:需要处理海量的历史数据和实时数据
- 模型复杂:从简单的线性模型到复杂的深度学习模型
- 流程繁琐:从数据清洗、特征工程、模型训练到策略回测,环节众多
- 工程复杂:需要将研究成果转化为可部署的生产系统
- 协作困难:研究团队和工程团队之间的协作效率低下
为了解决这些问题,微软亚洲研究院(MSRA)在 2019 年开源了 Qlib —— 一个旨在通过 AI 技术赋能量化投资的 AI 化量化投资平台。
1.1.2 Qlib 的设计理念
Qlib 的设计围绕以下核心理念:
1. AI 为先
Qlib 从设计之初就将 AI 技术作为核心驱动力,支持各种机器学习和深度学习模型,为量化投资提供更强大的工具。
2. 端到端的工作流
Qlib 提供了从数据获取、特征工程、模型训练、策略回测到在线部署的完整工作流,打破了各个环节之间的壁垒。
3. 工程化导向
Qlib 不仅关注研究层面,更注重工程实践,提供了生产级的性能和可靠性。
4. 开放与可扩展
Qlib 采用开源的方式,鼓励社区贡献,同时提供了灵活的扩展机制,支持自定义模型、策略和数据源。
1.1.3 Qlib vs 其他量化框架
特性 Qlib Backtrader Zipline PyAlgoTrade AI 支持 ✅ 原生支持 ⚠️ 有限 ⚠️ 有限 ⚠️ 有限 工作流管理 ✅ 完整 ❌ 无 ❌ 无 ❌ 无 回测引擎 ✅ 高性能 ✅ 成熟 ✅ 成熟 ✅ 基础 特征工程 ✅ 表达式系统 ❌ 无 ❌ 无 ❌ 无 模型支持 ✅ 丰富 ⚠️ 基础 ⚠️ 基础 ⚠️ 基础 在线服务 ✅ 支持 ❌ 无 ❌ 无 ❌ 无 学习曲线 中等 较陡 较陡 较平 文档质量 ✅ 优秀 ✅ 良好 ✅ 良好 ⚠️ 一般 Qlib 的核心优势:
- 第02章-核心概念
第2章:核心概念
本章深入讲解 Qlib 的核心概念,包括表达式系统、数据提供者、数据集、模型和 Recorder 系统。掌握这些概念是熟练使用 Qlib 的基础。
目录
2.1 表达式系统
表达式系统是 Qlib 的核心计算引擎,它提供了一种灵活、高效的方式来计算金融特征。
2.1.1 什么是表达式
表达式(Expression) 是 Qlib 中用于描述数据计算和转换的基本单元。它类似于 Pandas 的操作,但更加灵活和高效。
核心特点:
- 延迟计算:表达式定义后不会立即计算,而是在实际需要时才求值
- 链式操作:支持类似 Pandas 的链式调用
- 智能缓存:自动缓存计算结果,避免重复计算
- 并行化:支持多进程并行计算
2.1.2 Expression 类
所有表达式都继承自
Expression基类:1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34from qlib.data.expression import Expression class Expression(ABC): """表达式基类""" def __init__(self, feature=None): self.feature = feature @abstractmethod def load(self, instrument, start_index, end_index): """加载数据""" pass def __add__(self, other): """支持 + 运算符""" return Add(self, other) def __sub__(self, other): """支持 - 运算符""" return Sub(self, other) def __mul__(self, other): """支持 * 运算符""" return Mul(self, other) def __truediv__(self, other): """支持 / 运算符""" return Div(self, other) def __gt__(self, other): """支持 > 运算符""" return Gt(self, other) # ... 其他运算符2.1.3 表达式类型
1. Feature(特征)
最基础的表达式,直接从数据源加载原始数据:
- 第03章-数据系统架构
第3章:数据系统架构
本章深入讲解 Qlib 的数据系统架构,包括数据层的分层设计、Provider 系统、数据流、缓存机制等核心内容。
目录
3.1 数据层设计
3.1.1 数据层架构
Qlib 的数据层采用分层设计,从下到上依次为:
┌─────────────────────────────────────────────────────┐ │ 用户接口层 │ │ D.features() | D.instruments() | D.calendar() │ ├─────────────────────────────────────────────────────┤ │ 表达式层 │ │ Expression Engine | Operators | Calculator │ ├─────────────────────────────────────────────────────┤ │ Provider层 │ │ LocalProvider | ClientProvider | ArcticProvider │ ├─────────────────────────────────────────────────────┤ │ 存储层 │ │ 本地文件 | 数据库 | API | 第三方数据 │ └─────────────────────────────────────────────────────┘3.1.2 核心组件
1. D 接口
D是 Qlib 提供的统一数据访问接口: - 第04章-表达式系统详解
第4章:表达式系统详解
表达式系统是 Qlib 的核心计算引擎,本章深入讲解表达式的设计原理、各类算子的使用方法以及如何实现自定义算子。
目录
4.1 表达式系统原理
4.1.1 设计思想
Qlib 的表达式系统借鉴了 Pandas 的设计理念,但针对量化投资场景进行了优化:
核心特性:
- 延迟计算:表达式定义后不会立即计算,在实际需要时才求值
- 智能缓存:自动缓存计算结果,避免重复计算
- 并行化:支持多进程并行计算
- 类型推断:自动推断表达式的返回类型
- 扩展性:易于添加自定义算子
4.1.2 Expression 类体系
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80from qlib.data.expression import Expression class Expression(ABC): """表达式基类""" def __init__(self, feature=None): self.feature = feature @abstractmethod def load(self, instrument, start_index, end_index): """ 加载数据 Args: instrument: 证券代码 start_index: 开始索引 end_index: 结束索引 Returns: pd.Series: 数据序列 """ pass def get_extended_window_size(self): """ 获取需要的扩展窗口大小 某些算子需要额外的历史数据才能计算 例如:Mean($close, 20) 需要至少20个历史数据点 Returns: tuple: (left, right) 需要的额外数据点数 """ return 0, 0 # 运算符重载 def __add__(self, other): return Add(self, other) def __sub__(self, other): return Sub(self, other) def __mul__(self, other): return Mul(self, other) def __truediv__(self, other): return Div(self, other) def __gt__(self, other): return Gt(self, other) def __ge__(self, other): return Ge(self, other) def __lt__(self, other): return Lt(self, other) def __le__(self, other): return Le(self, other) def __eq__(self, other): return Eq(self, other) def __ne__(self, other): return Ne(self, other) def __and__(self, other): return And(self, other) def __or__(self, other): return Or(self, other) def __invert__(self): return Not(self) def __neg__(self): return Neg(self) def __pos__(self): return Pos(self)4.1.3 表达式求值流程
定义表达式 ↓ 调用 D.features() ↓ 表达式引擎解析 ↓ 构建表达式树 ↓ 拓扑排序 ↓ 并行计算 ↓ 合并结果 ↓ 返回给用户示例:
- 第05章-特征工程
第5章:特征工程
特征工程是量化投资的核心,本章深入讲解 Qlib 的特征工程体系,包括 Alpha158、Alpha360 因子、自定义因子开发以及因子有效性评估。
目录
5.1 特征工程概述
5.1.1 什么是特征工程
特征工程(Feature Engineering) 是指利用领域知识和数据创建新特征,使机器学习算法能够更好地学习。在量化投资中,特征工程尤为重要:
为什么特征工程重要?
数据决定上限
- 好的模型 + 差的数据 = 差的结果
- 好的模型 + 好的数据 = 好的结果
市场非有效性
- 市场并非完全随机
- 存在各种可利用的规律
- 通过特征工程发现这些规律
模型限制
- 机器学习模型只能学习输入特征包含的信息
- 特征工程决定了模型能"看到"什么
5.1.2 量化因子分类
量化因子 ├── 价格动量因子 │ ├── 收益率动量 │ ├── 价格趋势 │ └── 相对强弱 ├── 波动率因子 │ ├── 历史波动率 │ ├── 实现波动率 │ └── 波动率变化 ├── 量价因子 │ ├── 量价关系 │ ├── 成交量变化 │ └── 换手率 ├── 技术指标因子 │ ├── 趋势指标 │ ├── 动量指标 │ └── 摆动指标 └── 基本面因子 ├── 估值指标 ├── 成长指标 └── 质量指标5.1.3 因子开发流程
1. 因子构思 └── 基于金融理论或经验 ↓ 2. 因子实现 └── 使用表达式系统实现 ↓ 3. 因子计算 └── 在历史数据上计算因子值 ↓ 4. 因子评估 ├── IC/Rank IC 分析 ├── 因子分布分析 ├── 换手率分析 └── 多空收益分析 ↓ 5. 因子优化 ├── 标准化 ├── 去极值 ├── 中性化 └── 正交化 ↓ 6. 因子组合 └── 单因子组合或多因子合成5.1.4 Qlib 因子体系
Qlib 提供了丰富的因子实现:
- 第06章-传统机器学习模型
第6章:传统机器学习模型
本章深入讲解 Qlib 中传统机器学习模型的使用,包括 LightGBM、XGBoost、线性模型等,涵盖模型原理、训练流程、参数调优和模型评估。
目录
6.1 模型概述
6.1.1 Qlib 模型架构
Qlib 提供了统一的模型接口,支持多种机器学习框架:
Model (统一接口) ├── 传统机器学习模型 │ ├── LGBModel (LightGBM) │ ├── XGBModel (XGBoost) │ ├── CatBoostModel (CatBoost) │ └── LinearModel (线性模型) │ └── 深度学习模型 ├── LSTM (长短期记忆网络) ├── GRU (门控循环单元) ├── Transformer (注意力机制) └── GATs (图注意力网络)6.1.2 BaseModel 接口
所有模型都继承自
ModelBase基类: - 第07章-深度学习模型
第07章 深度学习模型
本章将深入探讨 Qlib 中的深度学习模型实现,包括神经网络架构、LSTM、GRU、Transformer 和 GATs 等模型,以及训练技巧和最佳实践。
目录
1. 深度学习概述
1.1 为什么使用深度学习
深度学习在量化投资中有以下优势:
优势 说明 适用场景 自动特征提取 无需手工设计特征 时序数据、图像数据 非线性建模 捕捉复杂非线性关系 市场状态识别 序列建模 处理时间序列依赖关系 趋势跟踪、动量策略 跨股票学习 共享参数提升泛化能力 多因子选股 端到端优化 直接优化预测目标 预测未来收益 1.2 Qlib 中的深度学习模型
Qlib 提供了丰富的深度学习模型实现:
1 2 3 4 5 6 7 8# 模型分类 from qlib.contrib.model.pytorch_nn import DNNModelPytorch # 多层感知机 from qlib.contrib.model.pytorch_lstm import LSTM # LSTM from qlib.contrib.model.pytorch_gru import GRU # GRU from qlib.contrib.model.pytorch_transformer import TransformerModel # Transformer from qlib.contrib.model.pytorch_gats import GATs # 图注意力网络 from qlib.contrib.model.pytorch_tabnet import TabNet # TabNet from qlib.contrib.model.pytorch_tcn import TCN # 时序卷积网络模型对比:
- 第08章-模型集成与优化
第08章 模型集成与优化
本章将深入探讨模型集成与优化技术,包括集成学习、模型融合、知识蒸馏等高级方法,以及如何通过这些技术提升量化投资模型的性能。
目录
1. 集成学习概述
1.1 为什么需要集成学习
在量化投资中,单一模型往往存在以下局限:
问题 说明 集成学习的解决方案 过拟合风险 单一模型容易过拟合训练数据 多样化降低方差 欠拟合风险 单一模型容量有限 组合提升表达能力 预测不稳定 对数据噪声敏感 平均化提升稳定性 局部最优 可能陷入局部最优 多起点探索 1.2 集成学习的分类
集成学习 ├── Bagging (Bootstrap Aggregating) │ ├── 并行训练 │ ├── 降低方差 │ └── 例子:随机森林 ├── Boosting │ ├── 串行训练 │ ├── 降低偏差 │ └── 例子:XGBoost、LightGBM ├── Stacking (Stacked Generalization) │ ├── 层次结构 │ ├── 元学习器 │ └── 例子:多模型融合 └── 混合集成 ├── Bagging + Boosting ├── 模型融合 └── 异构集成1.3 集成学习在 Qlib 中的实现
1 2 3 4# Qlib 提供的集成模型 from qlib.contrib.model.double_ensemble import DEnsembleModel # Double Ensemble from qlib.contrib.model.gbdt import LGBModel # LightGBM (Boosting) from qlib.contrib.model.xgboost import XGBModel # XGBoost (Boosting)2. Bagging 方法
2.1 Bagging 原理
Bagging (Bootstrap Aggregating) 通过自助采样并行训练多个基学习器,然后通过投票或平均进行预测:
- 第09章-交易策略
第09章 交易策略
本章将深入探讨 Qlib 中的交易策略系统,包括策略类型、信号生成、订单执行、仓位管理等核心内容。
目录
1. 交易策略概述
1.1 交易策略的定义
交易策略是量化投资系统中连接预测模型和实际交易的桥梁:
┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ 模型预测 │ ──> │ 交易策略 │ ──> │ 订单执行 │ │ (score) │ │ (strategy) │ │ (orders) │ └─────────────┘ └─────────────┘ └─────────────┘ │ ├── 仓位管理 ├── 风险控制 ├── 订单生成 └── 交易执行1.2 策略分类
分类维度 类型 说明 示例 信号来源 基于模型 使用模型预测分数 TopkDropoutStrategy 基于规则 使用技术指标规则 EMA交叉策略 混合型 模型+规则 信号+过滤器 仓位管理 等权 所有股票等权重 Topk等权 按市值 按市值加权 市值加权组合 优化权重 风险优化 EnhancedIndexing 交易频率 低频 日度、周度 每日调仓 高频 分钟级 分钟级调仓 换手率 低换手 稳定持仓 每次调仓少量 高换手 频繁调仓 每次大幅调仓 1.3 Qlib 中的策略实现
1 2 3 4 5 6 7 8 9 10# Qlib 提供的策略类型 from qlib.contrib.strategy.signal_strategy import ( TopkDropoutStrategy, # Topk Dropout 策略 EnhancedIndexingStrategy, # 增强指数策略 ) from qlib.contrib.strategy.rule_strategy import ( TWAPStrategy, # TWAP 执行策略 SBBStrategyEMA, # SBB 策略 ACStrategy, # AC 策略 )2. 策略架构
2.1 策略基类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17from qlib.strategy.base import BaseStrategy class BaseStrategy(ABC): """策略基类""" @abstractmethod def generate_trade_decision(self, execute_result=None): """ 生成交易决策 参数: - execute_result: 上一次执行结果 返回: - TradeDecision: 交易决策对象 """ pass2.2 策略执行流程
策略执行流程: 1. 初始化 (reset) ├── 设置交易对象 (trade_exchange) ├── 设置交易日历 (trade_calendar) └── 初始化状态 2. 生成决策 (generate_trade_decision) ├── 获取预测信号 (signal) ├── 分析当前仓位 (position) ├── 生成目标仓位 (target_position) └── 生成订单列表 (order_list) 3. 执行订单 (execute) ├── 检查订单合法性 (check_order) ├── 订单撮合 (deal_order) └── 更新仓位 (update_position) 4. 重复步骤 2-32.3 核心组件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25from qlib.backtest.signal import Signal from qlib.backtest.decision import Order, TradeDecisionWO from qlib.backtest.position import Position from qlib.backtest.exchange import Exchange # 信号:模型预测 signal = Signal(df) # pd.DataFrame 或 pd.Series # 订单:交易指令 order = Order( stock_id="SH600000", amount=1000, start_time="2020-01-01", end_time="2020-01-01", direction=Order.BUY, # 买入 ) # 决策:订单集合 decision = TradeDecisionWO([order1, order2, ...], strategy) # 仓位:当前持仓 position = Position() # 交易所:执行环境 exchange = Exchange(...)3. 信号策略
3.1 信号策略基类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19from qlib.contrib.strategy.signal_strategy import BaseSignalStrategy class BaseSignalStrategy(BaseStrategy): """基于信号的策略基类""" def __init__( self, signal=None, # 信号对象 risk_degree=0.95, # 风险度 (仓位比例) trade_exchange=None, **kwargs ): super().__init__(**kwargs) self.signal = signal self.risk_degree = risk_degree def get_risk_degree(self, trade_step=None): """获取当前风险度""" return self.risk_degree3.2 创建信号
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18from qlib.backtest.signal import create_signal_from import pandas as pd # 方式1:从模型和数据集创建 signal = create_signal_from((model, dataset)) # 方式2:从 DataFrame 创建 signal_df = pd.DataFrame({ "score": pred_score # 预测分数 }) signal = create_signal_from(signal_df) # 方式3:从 Series 创建 signal_series = pd.Series(pred_score, index=stock_index) signal = create_signal_from(signal_series) # 方式4:从文件路径创建 signal = create_signal_from("path/to/signal.pkl")3.3 信号查询
1 2 3 4 5 6 7 8 9 10 11 12 13 14# 获取特定时间范围的信号 signal_start_time = "2020-01-01" signal_end_time = "2020-01-31" pred_score = signal.get_signal( start_time=signal_start_time, end_time=signal_end_time ) # 查看信号 print(pred_score.head()) # instrument datetime # SH600000 2020-01-01 0.0123 # SH600000 2020-01-02 0.0234 # ...4. Topk Dropout 策略
4.1 策略原理
Topk Dropout 策略是一个经典的多因子选股策略:
- 第10章-回测系统
第10章 回测系统
本章将深入探讨 Qlib 的回测系统架构,包括交易所模拟、订单执行、仓位管理、绩效评估等核心组件。
目录
1. 回测系统概述
1.1 回测的作用
回测系统是量化投资的核心基础设施,用于验证策略的有效性:
┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ 策略 │ -> │ 回测 │ -> │ 评估 │ │ Strategy │ │ Backtest │ │ Evaluate │ └─────────────┘ └─────────────┘ └─────────────┘ │ ├── 历史数据 ├── 交易所模拟 ├── 订单执行 ├── 仓位管理 └── 绩效计算1.2 回测系统组件
组件 职责 核心类 交易所 模拟市场环境 Exchange 执行器 执行交易决策 Executor 账户 管理资金和仓位 Account 仓位 记录持仓情况 Position 订单 交易指令 Order 决策 交易决策 TradeDecision 报告 绩效分析 Indicator, Report 1.3 Qlib 回测流程
回测流程: 1. 初始化 ├── 创建 Exchange (交易所) ├── 创建 Executor (执行器) ├── 创建 Account (账户) └── 创建 Strategy (策略) 2. 回测循环 for each trading day: a. 策略生成决策 strategy.generate_trade_decision() b. 执行器处理决策 executor.collect_data() c. 交易所撮合订单 exchange.deal_order() d. 更新账户仓位 account.update_bar_end() e. 记录绩效数据 account.get_portfolio_metrics() 3. 生成报告 ├── 计算收益率曲线 ├── 计算风险指标 └── 生成分析报告2. 回测架构
2.1 整体架构
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29""" Qlib 回测系统架构 ┌─────────────────────────────────────────────────────────┐ │ 回测主循环 │ │ (backtest_loop / collect_data_loop) │ └─────────────────────────────────────────────────────────┘ │ ┌───────────────┼───────────────┐ │ │ │ ▼ ▼ ▼ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ Strategy │ │ Executor │ │ Exchange │ │ (策略层) │ │ (执行层) │ │ (市场层) │ ├──────────────┤ ├──────────────┤ ├──────────────┤ │ - 生成决策 │ │ - 处理决策 │ │ - 撮合订单 │ │ - 调整仓位 │ │ - 管理账户 │ │ - 计算成本 │ │ - 风险控制 │ │ - 更新仓位 │ │ - 涨跌停 │ └──────────────┘ └──────┬───────┘ └──────────────┘ │ ▼ ┌──────────────┐ │ Account │ │ (账户层) │ ├──────────────┤ │ - 资金管理 │ │ - 仓位管理 │ │ - 绩效记录 │ └──────────────┘2.2 分层执行
Qlib 支持多层执行,可以处理不同时间粒度的策略:
- 第11章-回测实战
第11章 回测实战
本章将通过完整实例演示如何使用 Qlib 进行策略回测,包括端到端工作流、结果分析、参数优化等内容。
目录
1. 回测实战概述
1.1 实战目标
通过本章学习,你将能够:
- ✅ 掌握完整的回测工作流
- ✅ 使用代码和配置文件两种方式进行回测
- ✅ 分析回测结果并生成报告
- ✅ 进行参数优化和策略对比
- ✅ 处理回测中的常见问题
1.2 回测步骤概览
┌─────────────────────────────────────────────────────────────┐ │ 完整回测流程 │ ├─────────────────────────────────────────────────────────────┤ │ 1. 初始化 Qlib │ │ 2. 准备数据 (特征、标签) │ │ 3. 训练模型 │ │ 4. 配置策略 (信号生成、仓位管理) │ │ 5. 配置回测环境 (交易所、账户、执行器) │ │ 6. 运行回测 │ │ 7. 分析结果 │ │ 8. 优化参数 │ └─────────────────────────────────────────────────────────────┘2. 端到端回测流程
2.1 完整流程代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103import qlib from qlib.data import D from qlib.data.dataset import DatasetH from qlib.contrib.model.gbdt import LGBModel from qlib.contrib.strategy.signal_strategy import TopkDropoutStrategy from qlib.backtest import backtest, executor from qlib.backtest.executor import SimulatorExecutor from qlib.backtest.exchange import Exchange from qlib.contrib.evaluate import risk_analysis import pandas as pd # ==================== 1. 初始化 ==================== qlib.init(provider_uri="~/.qlib/qlib_data/cn_data", region="cn") # ==================== 2. 准备数据 ==================== market = "csi300" benchmark = "SH000300" # 数据时间配置 data_handler_config = { "start_time": "2008-01-01", "end_time": "2020-08-01", "fit_start_time": "2008-01-01", "fit_end_time": "2014-12-31", "instruments": market, } # 创建数据集 dataset = DatasetH( handler={ "class": "Alpha158", "module_path": "qlib.contrib.data.handler", "kwargs": data_handler_config, }, segments={ "train": ("2008-01-01", "2014-12-31"), "valid": ("2015-01-01", "2016-12-31"), "test": ("2017-01-01", "2020-08-01"), } ) # ==================== 3. 训练模型 ==================== model = LGBModel( loss="mse", colsample_bytree=0.8879, learning_rate=0.2, subsample=0.8789, lambda_l1=205.6999, lambda_l2=580.9768, max_depth=8, num_leaves=210, num_threads=20, ) model.fit(dataset) # ==================== 4. 配置策略 ==================== strategy = TopkDropoutStrategy( signal=(model, dataset), topk=50, n_drop=5, ) # ==================== 5. 配置回测环境 ==================== exchange = Exchange( freq="day", start_time="2017-01-01", end_time="2020-08-01", codes=market, deal_price="close", limit_threshold=0.095, open_cost=0.0005, close_cost=0.0015, min_cost=5, ) executor = SimulatorExecutor( time_per_step="day", trade_exchange=exchange, generate_portfolio_metrics=True, ) # ==================== 6. 运行回测 ==================== portfolio_dict, indicator_dict = backtest( start_time="2017-01-01", end_time="2020-08-01", trade_strategy=strategy, trade_executor=executor, ) # ==================== 7. 分析结果 ==================== portfolio_metrics = portfolio_dict["day"][0] report = risk_analysis(portfolio_metrics) print("\n" + "="*50) print("回测结果") print("="*50) print(f"累计收益率: {report['cumulative_return']:.2%}") print(f"年化收益率: {report['annual_return']:.2%}") print(f"波动率: {report['volatility']:.2%}") print(f"夏普比率: {report['sharpe_ratio']:.4f}") print(f"最大回撤: {report['max_drawdown']:.2%}") print(f"信息比率: {report['information_ratio']:.4f}")2.2 使用 workflow 模块
Qlib 提供了更高级的 workflow 模块简化回测流程:
- 第12章-在线服务系统
第12章 在线服务系统
本章将深入探讨 Qlib 的在线服务系统,包括模型部署、滚动训练、实时预测、动态更新等核心内容。
目录
1. 在线服务概述
1.1 为什么需要在线服务
在量化投资的实际应用中,模型需要持续运行并定期更新:
需求 说明 Qlib 解决方案 模型持久化 保存训练好的模型 R.save_objects(model=model)模型更新 根据新数据重新训练 定期滚动训练 实时预测 每日/每周生成最新预测 OnlineManager.get_prediction() 策略管理 动态调整策略权重 根据市场状态切换策略 多版本管理 同时管理多个模型版本 版本控制和 A/B 测试 1.2 在线服务 vs 离线回测
维度 离线回测 在线服务 数据 历史快照 实时增量更新 训练 一次性训练 增量训练 预测 批量预测 实时流式预测 部署 本地执行 远程服务调用 更新 手动触发 自动滚动更新 监控 日志文件 实时监控 1.3 在线服务组件
┌─────────────────────────────────────────┐ │ ┌─────────────┐ ┌───────────────┐ │ RollingOnlineManager │ OnlineManager │ Trainer │ │ (滚动管理) │ (在线管理) │ (模型训练) │ └─────────────────────────────────────────┘ │ │ │ │ ▼ ▼ ▼ 模型服务 策略更新 数据管理2. 系统架构
2.1 整体架构
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18""" Qlib 在线服务系统架构 ┌─────────────────────────────────────────────────┐ │ 前端 (Frontend) │ │ ┌───────────────┤ │ │ │ API Server │ │ ├──────────────┤ │ │ │ 模型部署 Model Serving │ │ 策略更新 Strategy Update │ │ 数据管理 Data Management │ │ └──────────────┘ │ └─────────────────────────────────────────────┘ """ from qlib.contrib.online_srv.rolling_online_management import RollingOnlineManager from qlib.contrib.online_srv.online_manager import OnlineManager from qlib.contrib.online_srv.trainer import Trainer2.2 核心组件职责
组件 职责 说明 RollingOnlineManager 管理滚动训练流程 协调训练任务、版本管理 OnlineManager 管理在线服务 接收预测请求、返回预测结果 Trainer 执行模型训练 支持分布式训练 RollingStrategy 实现滚动策略 为在线服务提供接口 DataManager 管理数据加载和缓存 高效数据访问 3. 滚动训练流程
3.1 流程概览
滚动训练流程: 数据准备 ↓ ┌─────────→ 基础数据 (日K线) ├──────────────→─┤ 特征计算 (滚动窗口) │ │ ┌─┤ ├──────────────→ 数据加载 │ │ │ └───────────┘ │ │ │ │ │ ▼ │ │ ▼ ↓ │ ┌─────────→ 滚动窗口 (20/40/60天) │ ┌───────────────┤ │ │ ┌───────────────┤ │ │ │ │ └──────────────┘ │ ▼ ▼ │ │ │ │ │ │ ▼ ↓ │ └────────────┘ │ │ │ │ │ │ │ │ └───────────────┘ │ │ │ │ │ │ ↓ ┌─────────→ 模型训练 (LightGBM/GRU/LSTM) │ ┌───────────────┤ │ │ │ │ │ └───────────────┘ │ │ │ │ │ │ │ │ │ ↓ ┌───────────────┤ │ │ │ │ └───────────────┘ │ │ │ │ │ │ ▼ ▼ │ │ ▼ ↓ ┌───────────────┤ ▼ │ │ │ │ │ │ │ │ ↓ ┌───────────────┤ │ │ │ │ │ │ │ │ ▼ ▼ │ │ │ ▼ │ ↓ ┌───────────────┘ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ ↓ ┌───────────────┤ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ 模型部署 (Serve) ↓ ┌─────────────────────────────────────────────┘3.2 滚动窗口策略
1 2 3 4 5 6 7 8 9 10 11 12 13# 滚动窗口配置 rolling_config = { "rolling_step": 550, # 滚动步长 "lookback_days": 20, # 向前看20天 "train_start": "2015-01-01", # 训练开始 "train_end": "2019-12-31", # 训练结束 "feature_cols": ["alpha", "close", "high", "low", "volume", "vwap", "factor"], # 特征列 } # 滚动间隔 (每天) # rolling_step = 1 (每天训练一次) # 滚动间隔 (每周) # rolling_step = 5 (每周训练一次)3.3 数据分段策略
1 2 3 4 5 6 7 8 9 10 11 12 13# 训练集、验证集、测试集 # 使用不同时间段进行数据分割 train_config = { "segments": { "train": ("2010-01-01", "2015-12-31"), # 5年数据 "valid": ("2016-01-01", "2017-12-31"), # 1年验证 "test": ("2018-01-01", "2020-12-31"), # 2年测试 } } # 滚动训练时使用第一个训练集 # 每次滚动训练时使用新的数据4. 模型部署
4.1 模型服务架构
1 2 3 4 5 6 7 8 9 10 11 12from qlib.contrib.model.pytorch_nn import DNNModelPytorch import torch # 模型服务类 class ModelServer: def __init__(self, model_path): self.model = torch.load(model_path) self.model.eval() def predict(self, features): with torch.no_grad(): return self.model(torch.tensor(features))4.2 Flask API 示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21from flask import Flask, request, jsonify import torch app = Flask(__name__) @app.route('/predict', methods=['POST']) def predict(): # 获取特征 data = request.json.get('features') # 模型推理 model = get_model() predictions = model.predict(data) return jsonify({ 'predictions': predictions.tolist() }) if __name__ == '__main__': # 加载模型 app.run(host='0.0.0.0', port=5000)5. 实时预测
5.1 OnlineManager 使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23from qlib.contrib.online_srv.online_manager import OnlineManager # 创建在线管理器 online_manager = OnlineManager() # 准备模型服务 # 1. 训练模型 model = LGBModel(loss="mse", learning_rate=0.05, n_estimators=100) model.fit(dataset) # 2. 部署模型 online_manager.reset() online_manager.add_model("lgb_model", model, is_default=True) # 3. 获取预测 pred = online_manager.get_prediction( start_time="2020-01-01", end_time="2020-01-31", model_name="lgb_model", instrument="csi300", ) print(f"预测结果:\n{pred.head()}")5.2 批量预测
1 2 3 4 5 6 7 8 9 10 11 12# 获取多只股票的预测 instruments = ["SH600000", "SH600519", "SZ000001"] predictions = online_manager.get_prediction( start_time="2020-01-01", end_time="2020-01-31", model_name="lgb_model", instrument=instruments, # 批量预测 ) for inst, pred in zip(instruments, predictions): print(f"{inst}: {pred}")5.3 实时预测服务部署
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29from flask import Flask from qlib.contrib.online_srv.online_manager import OnlineManager app = Flask(__name__) # 初始化在线管理器 online_manager = OnlineManager() # 加载模型 online_manager.reset() online_manager.add_model("main_model", model, is_default=True) @app.route('/api/predict', methods=['POST']) def predict(): data = request.json.get('data') # 调用在线管理器预测 predictions = online_manager.get_prediction( start_time=data['start_time'], end_time=data['end_time'], model_name="main_model", instrument=data['instrument'] ) return jsonify({'predictions': predictions.tolist()}) if __name__ == '__main__': # 启动服务 (端口 5001) app.run(debug=True)6. 策略更新
6.1 动态权重调整
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20from qlib.contrib.online_srv.online_manager import OnlineManager # 创建在线管理器 online_manager = OnlineManager() # 获取当前策略 strategy = online_manager.get_strategy("main_strategy") # 更新权重 new_weights = { "SH600000": 0.5, # 降低权重 "SH600519": 0.3, # 提高权重 "SZ000001": 0.2, # 新增股票 } # 更新策略 online_manager.update_strategy_weights( strategy_name="main_strategy", weights=new_weights )6.2 策略切换
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19# 准备两个策略 online_manager.reset() online_manager.add_model("lgb_model", model1, is_default=True) online_manager.add_model("gru_model", model2, is_default=False) # 设置策略A online_manager.update_strategy_weights( strategy_name="strategy_a", weights={"stock1": 0.6, "stock2": 0.4} ) # 设置策略B (默认) # online_manager.set_default_strategy("strategy_b") # 在特定日期切换策略 online_manager.update_strategy_weights( strategy_name="strategy_b", weights={"stock1": 0.3, "stock2": 0.7} # 调整策略B )6.3 版本管理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25# 模型版本控制 version_1 = "v1.0.0" # 当前生产版本 version_2 = "v1.1.0" # 新模型版本 # 部署新模型 online_manager.add_model( model_name="lgb_v2", model=model_v2, is_default=False # 不设为默认 ) # A/B 测试 # 创建测试版本 online_manager.add_model( model_name="lgb_v2_test", model=model_v2_test, is_default=False ) # 上线 if test_successful: online_manager.update_strategy_weights( strategy_name="main_strategy", weights={"lgb_v2": 0.5, "lgb_v2_test": 0.5} # 测试版权重50% )7. 数据管理
7.1 DataManager
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23from qlib.contrib.data.dataset import DatasetH from qlib.contrib.data.handler import Alpha158 # 数据加载器 data_loader = DataManager( provider_uri="~/.qlib/qlib_data/cn_data", region="cn", ) # 加载数据 dataset = DatasetH( handler={ "class": "Alpha158", "module_path": "qlib.contrib.data.handler", "kwargs": { "train": ("2010-01-01", "2020-12-31"), "valid": ("2017-01-01", "2018-12-31"), "test": ("2019-01-01", "2020-12-31"), "filter": True, "freq": "day" } } )7.2 数据缓存
1 2 3 4 5 6 7 8 9# Qlib 自动缓存常用数据 cache_config = { "enable": True, "max_size": 10, # 最大缓存数 "ttl": 3600 * 24, # 缓存时间 (1小时) } qlib.init(provider_uri="~/.qlib/qlib_data/cn_data", region="cn", **cache_config)7.3 数据预处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18from qlib.data.dataset.processor import Processor # 定义数据处理器 processor_config = { "class": "Filter", "module_path": "qlib.contrib.data.dataset.processor", "kwargs": { "filter_expression": "$volume > 1000000", # 过滤成交量 "dropna": True, # 删除NaN值 } } # 应用到数据集 dataset = DatasetH( handler=Alpha158, processor=processor_config, ... )8. 监控与日志
8.1 日志系统
1 2 3 4 5 6 7 8 9 10 11 12 13 14import logging # 配置日志 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) # 使用日志 logger.info("开始训练模型") logger.warning("检测到异常数据") logger.error("模型训练失败")8.2 性能监控
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19import psutil import time # 监控函数 def monitor_performance(func): def wrapper(*args, **kwargs): start_time = time.time() result = func(*args, **kwargs) elapsed = time.time() - start_time if elapsed > 1.0: # 超过1秒 logger.warning(f"{func.__name__} 执行时间: {elapsed:.2f}秒") return result @monitor_performance def train_model(model, dataset): return model.fit(dataset)8.3 实验跟踪
1 2 3 4 5 6 7 8 9from mlflow import log_metric # 使用 MLflow 记录实验 with mlflow.start_run(): log_param({"learning_rate": 0.01}) model.fit(dataset) log_metric({"test_ic": 0.05})9. 实践案例
9.1 完整的在线服务案例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88""" 完整的在线服务系统案例 包括:滚动训练、模型部署、实时预测、策略更新 """ import qlib from qlib.data import D from qlib.contrib.data.dataset import DatasetH from qlib.contrib.model.gbdt import LGBModel from qlib.contrib.strategy.signal_strategy import TopkDropoutStrategy from qlib.backtest import backtest, executor from qlib.contrib.online_srv.rolling_online_management import RollingOnlineManager from qlib.contrib.evaluate import risk_analysis # ==================== 1. 初始化 ==================== print("="*60) print("Qlib 在线服务系统实战案例") print("="*60) qlib.init( provider_uri="~/.qlib/qlib_data/cn_data", region="cn" ) # ==================== 2. 准备数据 ==================== print("\n[1/2] 准备数据...") market = "csi300" benchmark = "SH000300" dataset = DatasetH( handler={ "class": "Alpha158", "module_path": "qlib.contrib.data.handler", "kwargs": { "train": ("2015-01-01", "2018-12-31"), "valid": ("2019-01-01", "2018-12-31"), "test": ("2020-01-01", "2021-12-31"), } } ) # ==================== 3. 训练基模型 ==================== print("\n[3/3] 训练基模型...") model_lgb = LGBModel( loss="mse", learning_rate=0.05, num_leaves=31, max_depth=6, n_estimators=100, ) model_lgb.fit(dataset) print("基线模型训练完成") # ==================== 4. 滚动训练模型 ==================== print("\n[4/4] 滚动训练模型...") # 配置滚动参数 rolling_config = { "rolling_step": 550, # 滚动步长 "lookback_days": 20, "train_start": "2015-01-01", "train_end": "2019-12-31", } # 创建滚动数据处理器 from qlib.contrib.data.handler import RollingHandler handler = RollingHandler( rolling_config=rolling_config, dataset=dataset, ) # 滚动训练 (参考 3.2 节) # 这里简化,实际需要使用 RollingOnlineManager # ... # ==================== 5. 部署模型服务 ==================== print("\n[5/5] 部署模型服务...") # 示例:参考模型部署章节 # ==================== 6. 启动在线服务 ==================== print("\n[6/7] 启动在线服务...") # 创建在线管理器并启动服务 # 参考 5.3 节 print("在线服务已启动")9.2 滚动更新实战
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23from datetime import datetime, timedelta # 获取当前预测 pred = online_manager.get_prediction( start_time=datetime.now(), end_time=datetime.now() + timedelta(days=1), model_name="lgb_model", instrument="csi300", ) print("预测结果:") print(pred) # 检查预测质量 from scipy.stats import pearsonr # 获取历史收益率 returns = ... # 从数据库查询 # 计算 IC ic, p_value = pearsonr(pred, returns) print(f"预测 IC: {ic:.4f}")10. 习题
基础题
简述在线服务系统的主要组件和功能。
- 第13章-强化学习
第13章:强化学习
强化学习(Reinforcement Learning, RL)是机器学习领域的重要分支,通过智能体与环境的交互来学习最优策略。Qlib提供了完善的强化学习框架,支持将量化投资问题建模为RL任务,并提供了多种RL算法和网络架构。
13.1 强化学习概述
13.1.1 RL基本概念
强化学习包含以下核心要素:
1. 智能体(Agent)
- 决策主体,负责观察环境状态并采取行动
- 在量化投资中,Agent可以是交易策略、模型调优器等
2. 环境(Environment)
- Agent交互的外部系统
- 提供状态、接收动作、返回奖励
- 在Qlib中,Environment可以是市场、回测系统等
3. 状态(State)
- 描述当前情况的信息
- 在量化中:市场数据、持仓、资金状态等
4. 动作(Action)
- Agent可以执行的操作
- 在量化中:买入、卖出、调仓、参数调整等
5. 奖励(Reward)
- 衡量动作好坏的反馈信号
- 在量化中:收益、风险调整后收益、信息比率等
6. 策略(Policy)
- 从状态到动作的映射规则
- 表示为π(a|s):在状态s下采取动作a的概率
7. 价值函数(Value Function)
- 评估状态或状态-动作对的价值
- V(s):状态s的期望回报
- Q(s,a):在状态s下采取动作a的期望回报
13.1.2 Qlib中的RL应用场景
Qlib中的强化学习主要用于两个场景:
1. 超参数优化(QlibTuner)
- 将模型超参数调优建模为RL问题
- Agent选择超参数组合,Environment训练模型并返回性能指标
- 自动寻找最优模型配置
2. 策略优化(RL Strategy)
- 将交易决策建模为RL问题
- Agent根据市场状态选择交易动作
- 直接优化投资目标
13.1.3 RL算法分类
基于价值的方法(Value-based)
- DQN(Deep Q-Network)
- Double DQN
- Dueling DQN
基于策略的方法(Policy-based)
- REINFORCE
- Actor-Critic
- PPO(Proximal Policy Optimization)
结合方法
- 第14章-高级数据处理
第14章:高级数据处理
在量化投资中,数据质量直接影响模型效果。Qlib提供了完善的数据处理框架,支持多频率数据融合、缺失值处理、异常值检测、因子标准化等高级功能。本章将详细介绍这些技术的原理和应用。
14.1 多频率数据处理
14.1.1 多频率数据概述
量化投资中常用的数据频率包括:
Tick级别:逐笔交易数据
- 买卖单流、成交价、成交量
- 最高精度,但数据量大、噪声多
分钟级:1min、5min、15min、30min、60min
- 日内交易、高频策略
- 平衡精度与计算成本
日频:日线数据
- 最常用频率
- 适合中长期策略
周频/月频:周线、月线
- 长期趋势分析
- 降低噪声影响
14.1.2 频率转换
Qlib提供了灵活的频率转换工具:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76import pandas as pd import numpy as np from qlib.data import D # 初始化Qlib qlib.init(provider_uri="~/.qlib/qlib_data/cn_data", region="cn") # 1. 升频(低频到高频) def upsampling_data(df, target_freq='1min'): """ 升频:将低频数据转换为高频数据 常用方法: - Forward Fill (ffill): 前值填充 - Linear Interpolation: 线性插值 - Spline Interpolation: 样条插值 """ if target_freq == '1min': # 转换为分钟级 df_upsampled = df.resample('1min').ffill() # 前值填充 # 或使用插值 # df_upsampled = df.resample('1min').interpolate(method='linear') return df_upsampled # 2. 降频(高频到低频) def downsampling_data(df, target_freq='1d'): """ 降频:将高频数据聚合为低频数据 常用聚合方法: - first: 周期第一个值(开盘价) - last: 周期最后一个值(收盘价) - max: 周期最大值(最高价) - min: 周期最小值(最低价) - sum: 周期总和(成交量) - mean: 周期平均 """ if target_freq == '1d': # 日线数据 df_downsampled = df.resample('1D').agg({ 'open': 'first', 'high': 'max', 'low': 'min', 'close': 'last', 'volume': 'sum', 'amount': 'sum', }) elif target_freq == '1w': # 周线数据 df_downsampled = df.resample('1W').agg({ 'open': 'first', 'high': 'max', 'low': 'min', 'close': 'last', 'volume': 'sum', 'amount': 'sum', }) return df_downsampled # 3. 使用Qlib进行频率转换 instruments = D.instruments(market='csi300') # 获取日频数据 daily_data = D.features( instruments, ['$open', '$high', '$low', '$close', '$volume'], start_time='2020-01-01', end_time='2020-12-31', freq='day' ) # 转换为周频 weekly_data = daily_data.resample('1W').last()14.1.3 多频率特征融合
将不同频率的特征结合使用:
- 第15章-完整案例-多因子选股
第15章:完整案例-多因子选股
多因子选股是量化投资中最经典的应用之一。本章将带你通过一个完整的实战案例,从数据准备到策略回测,全面掌握使用Qlib构建多因子选股策略的全流程。
15.1 案例概述
15.1.1 策略思路
核心思想:通过多个维度的因子(估值、成长、质量、动量等)综合评估股票,选出综合得分最高的股票组合。
策略特点:
- 多因子模型:结合多个维度因子
- 风险分散:持有30-50只股票
- 定期调仓:每月调仓一次
- 风险控制:行业中性、市值中性
15.1.2 技术路线
1. 数据准备 → 2. 因子构建 → 3. 因子检验 → 4. 模型训练 → 5. 回测评估 → 6. 参数优化 → 7. 实盘部署15.1.3 案例数据
- 股票池:中证500成分股
- 时间范围:2010-2020
- 数据频率:日频
- 因子库:自定义20+因子
15.2 数据准备
15.2.1 初始化Qlib
1 2 3 4 5 6 7 8 9 10 11 12 13import qlib from qlib.data import D from qlib.constant import REG_CN # 初始化Qlib qlib.init( provider_uri="~/.qlib/qlib_data/cn_data", # 数据路径 region=REG_CN, # 区域:中国 redis_cache=None, # Redis缓存(可选) expression_cache=None, # 表达式缓存(可选) ) print("Qlib initialized successfully!")15.2.2 获取股票池
1 2 3 4 5 6 7# 获取中证500成分股 instruments = D.instruments(market="csi500") print(f"Total stocks: {len(instruments)}") print(f"First 10 stocks: {instruments[:10]}") # 或者自定义股票池 # instruments = ["SH600000", "SH600519", "SZ000001", ...]15.2.3 准备市场数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34import pandas as pd # 定义时间范围 train_period = ("2010-01-01", "2016-12-31") valid_period = ("2017-01-01", "2018-12-31") test_period = ("2019-01-01", "2020-12-31") # 检查数据可用性 def check_data_availability(instruments, start_time, end_time): """检查数据完整性""" print("Checking data availability...") # 获取基本价格数据 fields = ['$open', '$high', '$low', '$close', '$volume', '$turnover'] df = D.features( instruments, fields, start_time=start_time, end_time=end_time, freq='day' ) # 统计 total_expected = len(instruments) * len(pd.date_range(start_time, end_time, freq='B')) total_actual = df.count().sum() coverage = total_actual / total_expected * 100 print(f"Data coverage: {coverage:.2f}%") print(f"Total data points: {total_actual}/{total_expected}") return df # 检查训练集数据 train_data = check_data_availability(instruments, *train_period)15.3 因子构建
15.3.1 因子分类
我们将构建以下几类因子:
- 第16章-完整案例-择时策略
第16章:完整案例-择时策略
择时策略(Market Timing)是指通过预测市场走势来调整仓位,在牛市中增持、熊市中减仓,以期获得超越基准的收益。本章将带你构建一个完整的择时策略,从信号生成到风险控制,全面掌握Qlib在择时中的应用。
16.1 案例概述
16.1.1 策略思路
核心思想:基于市场宏观指标和技术指标,预测市场未来走势,动态调整仓位。
策略类型:
- 趋势跟踪:识别并跟随市场趋势
- 均值回归:在市场偏离均值时反向操作
- 波动率择时:根据波动率调整仓位
- 宏观择时:基于宏观经济指标调整仓位
实现方式:
- 指数择时:预测指数涨跌,调整ETF仓位
- 行业轮动:根据行业强弱进行轮动
- 风险平价:根据风险水平调整资产配置
16.1.2 技术框架
市场信号生成 → 仓位决策模型 → 风险控制 → 执行交易 → 绩效评估16.1.3 案例目标
- 构建多指标择时模型
- 实现动态仓位管理
- 风险控制机制
- 回测与评估
16.2 市场信号构建
16.2.1 技术指标
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240import pandas as pd import numpy as np from qlib.data import D class MarketTechnicalIndicators: """ 市场技术指标 用于预测市场走势 """ def __init__(self, market_index="SH000300"): # 沪深300 self.market_index = market_index def get_market_data(self, start_time, end_time): """获取市场数据""" data = D.features( [self.market_index], ['$open', '$high', '$low', '$close', '$volume', '$turnover'], start_time=start_time, end_time=end_time, freq='day' ) # 展平MultiIndex data.columns = data.columns.droplevel(0) return data def calculate_ma(self, data, periods=[5, 10, 20, 60]): """ 移动平均线 Args: data: 市场数据 periods: 周期列表 Returns: MA指标DataFrame """ ma_df = pd.DataFrame(index=data.index) for period in periods: ma_df[f'MA{period}'] = data['$close'].rolling(period).mean() return ma_df def calculate_ema(self, data, periods=[12, 26]): """ 指数移动平均 Args: data: 市场数据 periods: 周期列表 Returns: EMA指标DataFrame """ ema_df = pd.DataFrame(index=data.index) for period in periods: ema_df[f'EMA{period}'] = data['$close'].ewm(span=period).mean() return ema_df def calculate_macd(self, data, fast=12, slow=26, signal=9): """ MACD指标 Args: data: 市场数据 fast: 快线周期 slow: 慢线周期 signal: 信号线周期 Returns: MACD, Signal, Histogram """ ema_fast = data['$close'].ewm(span=fast).mean() ema_slow = data['$close'].ewm(span=slow).mean() macd = ema_fast - ema_slow signal_line = macd.ewm(span=signal).mean() histogram = macd - signal_line return pd.DataFrame({ 'MACD': macd, 'Signal': signal_line, 'Histogram': histogram, }, index=data.index) def calculate_rsi(self, data, period=14): """ RSI相对强弱指标 Args: data: 市场数据 period: 周期 Returns: RSI值 """ delta = data['$close'].diff() gain = delta.where(delta > 0, 0) loss = -delta.where(delta < 0, 0) avg_gain = gain.rolling(period).mean() avg_loss = loss.rolling(period).mean() rs = avg_gain / (avg_loss + 1e-8) rsi = 100 - (100 / (1 + rs)) return pd.DataFrame({'RSI': rsi}, index=data.index) def calculate_bollinger_bands(self, data, period=20, std_dev=2): """ 布林带 Args: data: 市场数据 period: 周期 std_dev: 标准差倍数 Returns: 上轨、中轨、下轨 """ sma = data['$close'].rolling(period).mean() std = data['$close'].rolling(period).std() upper_band = sma + std_dev * std lower_band = sma - std_dev * std return pd.DataFrame({ 'BB_Upper': upper_band, 'BB_Middle': sma, 'BB_Lower': lower_band, }, index=data.index) def calculate_kdj(self, data, period=9): """ KDJ指标 Args: data: 市场数据 period: 周期 Returns: K, D, J值 """ low_min = data['$low'].rolling(period).min() high_max = data['$high'].rolling(period).max() rsv = (data['$close'] - low_min) / (high_max - low_min + 1e-8) * 100 k = rsv.ewm(alpha=1/3).mean() d = k.ewm(alpha=1/3).mean() j = 3 * k - 2 * d return pd.DataFrame({ 'K': k, 'D': d, 'J': j, }, index=data.index) def calculate_atr(self, data, period=14): """ ATR平均真实波幅 Args: data: 市场数据 period: 周期 Returns: ATR值 """ high_low = data['$high'] - data['$low'] high_close = np.abs(data['$high'] - data['$close'].shift()) low_close = np.abs(data['$low'] - data['$close'].shift()) true_range = pd.concat([high_low, high_close, low_close], axis=1).max(axis=1) atr = true_range.rolling(period).mean() return pd.DataFrame({'ATR': atr}, index=data.index) def calculate_volume_profile(self, data, period=20): """ 成交量指标 Args: data: 市场数据 period: 周期 Returns: 成交量相关指标 """ volume_ma = data['$volume'].rolling(period).mean() volume_ratio = data['$volume'] / volume_ma # OBV(能量潮) price_change = data['$close'].diff() obv_direction = np.sign(price_change).fillna(0) obv = (obv_direction * data['$volume']).cumsum() return pd.DataFrame({ 'Volume_MA': volume_ma, 'Volume_Ratio': volume_ratio, 'OBV': obv, }, index=data.index) def generate_all_signals(self, start_time, end_time): """ 生成所有技术指标信号 Returns: 完整的技术指标DataFrame """ # 获取市场数据 data = self.get_market_data(start_time, end_time) # 计算各指标 ma = self.calculate_ma(data) ema = self.calculate_ema(data) macd = self.calculate_macd(data) rsi = self.calculate_rsi(data) bb = self.calculate_bollinger_bands(data) kdj = self.calculate_kdj(data) atr = self.calculate_atr(data) volume = self.calculate_volume_profile(data) # 合并所有指标 all_signals = pd.concat([ data, ma, ema, macd, rsi, bb, kdj, atr, volume ], axis=1) return all_signals # 使用示例 indicator = MarketTechnicalIndicators(market_index="SH000300") signals = indicator.generate_all_signals("2010-01-01", "2020-12-31") print(signals.head())16.2.2 信号生成
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191class MarketTimingSignalGenerator: """ 择时信号生成器 基于技术指标生成择时信号 """ def __init__(self, signals_df): self.signals_df = signals_df def generate_ma_signal(self, fast=5, slow=20): """ MA交叉信号 规则: - 快线上穿慢线:买入信号 - 快线下穿慢线:卖出信号 """ fast_ma = self.signals_df[f'MA{fast}'] slow_ma = self.signals_df[f'MA{slow}'] # 金叉(快线上穿慢线) golden_cross = (fast_ma > slow_ma) & (fast_ma.shift(1) <= slow_ma.shift(1)) # 死叉(快线下穿慢线) death_cross = (fast_ma < slow_ma) & (fast_ma.shift(1) >= slow_ma.shift(1)) # 信号:1买入,-1卖出,0持有 signal = pd.Series(0, index=self.signals_df.index) signal[golden_cross] = 1 signal[death_cross] = -1 return signal def generate_macd_signal(self): """ MACD信号 规则: - MACD上穿Signal:买入 - MACD下穿Signal:卖出 """ macd = self.signals_df['MACD'] signal_line = self.signals_df['Signal'] # 金叉 golden_cross = (macd > signal_line) & (macd.shift(1) <= signal_line.shift(1)) # 死叉 death_cross = (macd < signal_line) & (macd.shift(1) >= signal_line.shift(1)) signal = pd.Series(0, index=self.signals_df.index) signal[golden_cross] = 1 signal[death_cross] = -1 return signal def generate_rsi_signal(self, overbought=70, oversold=30): """ RSI信号 规则: - RSI > 70:超买,考虑卖出 - RSI < 30:超卖,考虑买入 """ rsi = self.signals_df['RSI'] signal = pd.Series(0, index=self.signals_df.index) # 超卖区域,买入信号 signal[rsi < oversold] = 1 # 超买区域,卖出信号 signal[rsi > overbought] = -1 return signal def generate_bollinger_signal(self): """ 布林带信号 规则: - 价格触及下轨:买入 - 价格触及上轨:卖出 """ close = self.signals_df['$close'] upper = self.signals_df['BB_Upper'] lower = self.signals_df['BB_Lower'] signal = pd.Series(0, index=self.signals_df.index) # 触及下轨 signal[close <= lower] = 1 # 触及上轨 signal[close >= upper] = -1 return signal def generate_kdj_signal(self, overbought=80, oversold=20): """ KDJ信号 规则: - K > D > 80:超买,卖出 - K < D < 20:超卖,买入 - J > 100:超买,卖出 - J < 0:超卖,买入 """ k = self.signals_df['K'] d = self.signals_df['D'] j = self.signals_df['J'] signal = pd.Series(0, index=self.signals_df.index) # 超买 signal[(k > d) & (k > overbought)] = -1 signal[j > 100] = -1 # 超卖 signal[(k < d) & (k < oversold)] = 1 signal[j < 0] = 1 return signal def generate_trend_signal(self, period=20): """ 趋势信号 基于价格与均线的相对位置判断趋势 """ close = self.signals_df['$close'] ma = self.signals_df[f'MA{period}'] signal = pd.Series(0, index=self.signals_df.index) # 价格在均线上方:上涨趋势 signal[close > ma] = 1 # 价格在均线下方:下跌趋势 signal[close < ma] = -1 return signal def generate_volatility_signal(self, period=20): """ 波动率信号 高波动率降低仓位,低波动率增加仓位 """ atr = self.signals_df['ATR'] close = self.signals_df['$close'] # 波动率(ATR/收盘价) volatility = atr / close # 标准化 vol_mean = volatility.rolling(period).mean() vol_std = volatility.rolling(period).std() vol_zscore = (volatility - vol_mean) / (vol_std + 1e-8) # 信号:高波动减仓,低波动增仓 signal = pd.Series(0, index=self.signals_df.index) signal[vol_zscore > 1] = -1 # 高波动 signal[vol_zscore < -1] = 1 # 低波动 return signal def combine_signals(self, signal_dict, weights=None): """ 组合多个信号 Args: signal_dict: 信号字典 {'name': signal_series} weights: 权重字典 Returns: 组合信号 """ if weights is None: # 等权重 weights = {name: 1.0 / len(signal_dict) for name in signal_dict.keys()} # 加权求和 combined = pd.Series(0.0, index=self.signals_df.index) for name, signal in signal_dict.items(): combined += signal * weights.get(name, 0) return combined16.3 仓位决策模型
16.3.1 信号映射到仓位
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83class PositionSizer: """ 仓位决策器 将择时信号转换为具体仓位 """ def __init__(self, method='linear'): """ Args: method: 仓位方法 - 'binary': 二值仓位(0或1) - 'linear': 线性仓位 - 'sigmoid': Sigmoid仓位 - 'discrete': 离散步进仓位 """ self.method = method def signal_to_position(self, signal, max_position=1.0, min_position=0.0): """ 信号转仓位 Args: signal: 信号Series(-1到1) max_position: 最大仓位 min_position: 最小仓位 Returns: 仓位Series """ if self.method == 'binary': # 二值:信号>0满仓,<0空仓 position = pd.Series(min_position, index=signal.index) position[signal > 0] = max_position elif self.method == 'linear': # 线性映射 position = (signal + 1) / 2 # [-1, 1] -> [0, 1] position = position * (max_position - min_position) + min_position position = position.clip(min_position, max_position) elif self.method == 'sigmoid': # Sigmoid映射 import math position = signal.apply(lambda x: 1 / (1 + math.exp(-x * 3))) position = position * (max_position - min_position) + min_position elif self.method == 'discrete': # 离散步进:0%, 25%, 50%, 75%, 100% position = pd.Series(0.0, index=signal.index) position[signal > 0.8] = 1.0 position[(signal > 0.4) & (signal <= 0.8)] = 0.75 position[(signal > 0) & (signal <= 0.4)] = 0.5 position[(signal > -0.4) & (signal <= 0)] = 0.25 position[signal <= -0.4] = 0.0 return position def apply_position_limit(self, position, max_daily_change=0.3): """ 仓位变化限制 Args: position: 目标仓位 max_daily_change: 每日最大仓位变化 Returns: 限制后的仓位 """ # 计算仓位变化 position_change = position.diff() # 限制变化幅度 position_change_limited = position_change.clip( -max_daily_change, max_daily_change ) # 应用限制 position_limited = position.shift(1).fillna(0) + position_change_limited position_limited = position_limited.clip(0, 1) return position_limited16.3.2 多信号融合
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105class MultiSignalTimingModel: """ 多信号择时模型 融合多个择时信号,生成最终仓位 """ def __init__( self, signal_generator, position_sizer, signal_config=None, ): """ Args: signal_generator: 信号生成器 position_sizer: 仓位决策器 signal_config: 信号配置 { 'signal_name': { 'enabled': True, 'weight': 0.2, } } """ self.signal_generator = signal_generator self.position_sizer = position_sizer self.signal_config = signal_config or self._default_config() def _default_config(self): """默认信号配置""" return { 'ma': {'enabled': True, 'weight': 0.3}, 'macd': {'enabled': True, 'weight': 0.2}, 'rsi': {'enabled': True, 'weight': 0.1}, 'bollinger': {'enabled': True, 'weight': 0.1}, 'kdj': {'enabled': True, 'weight': 0.1}, 'trend': {'enabled': True, 'weight': 0.1}, 'volatility': {'enabled': True, 'weight': 0.1}, } def generate_combined_signal(self): """生成组合信号""" signals = {} weights = {} # 生成各信号 if self.signal_config['ma']['enabled']: signals['ma'] = self.signal_generator.generate_ma_signal() weights['ma'] = self.signal_config['ma']['weight'] if self.signal_config['macd']['enabled']: signals['macd'] = self.signal_generator.generate_macd_signal() weights['macd'] = self.signal_config['macd']['weight'] if self.signal_config['rsi']['enabled']: signals['rsi'] = self.signal_generator.generate_rsi_signal() weights['rsi'] = self.signal_config['rsi']['weight'] if self.signal_config['bollinger']['enabled']: signals['bollinger'] = self.signal_generator.generate_bollinger_signal() weights['bollinger'] = self.signal_config['bollinger']['weight'] if self.signal_config['kdj']['enabled']: signals['kdj'] = self.signal_generator.generate_kdj_signal() weights['kdj'] = self.signal_config['kdj']['weight'] if self.signal_config['trend']['enabled']: signals['trend'] = self.signal_generator.generate_trend_signal() weights['trend'] = self.signal_config['trend']['weight'] if self.signal_config['volatility']['enabled']: signals['volatility'] = self.signal_generator.generate_volatility_signal() weights['volatility'] = self.signal_config['volatility']['weight'] # 组合信号 combined = self.signal_generator.combine_signals(signals, weights) return combined, signals def generate_position(self, apply_limit=True, max_daily_change=0.3): """ 生成目标仓位 Args: apply_limit: 是否应用仓位变化限制 max_daily_change: 每日最大仓位变化 Returns: 仓位Series """ # 生成组合信号 combined_signal, individual_signals = self.generate_combined_signal() # 转换为仓位 position = self.position_sizer.signal_to_position(combined_signal) # 应用仓位限制 if apply_limit: position = self.position_sizer.apply_position_limit( position, max_daily_change=max_daily_change ) return position, individual_signals16.4 风险控制
16.4.1 止损止盈
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96class RiskController: """ 风险控制器 实现止损、止盈等功能 """ def __init__( self, stop_loss=None, stop_profit=None, max_drawdown=None, ): """ Args: stop_loss: 止损比例(如0.05表示-5%止损) stop_profit: 止盈比例(如0.15表示+15%止盈) max_drawdown: 最大回撤限制 """ self.stop_loss = stop_loss self.stop_profit = stop_profit self.max_drawdown = max_drawdown def check_stop_loss(self, current_value, entry_value): """ 检查止损 Args: current_value: 当前净值 entry_value: 入场净值 Returns: 是否触发止损 """ if self.stop_loss is None: return False pnl = (current_value - entry_value) / entry_value return pnl < -self.stop_loss def check_stop_profit(self, current_value, entry_value): """ 检查止盈 Args: current_value: 当前净值 entry_value: 入场净值 Returns: 是否触发止盈 """ if self.stop_profit is None: return False pnl = (current_value - entry_value) / entry_value return pnl > self.stop_profit def check_max_drawdown(self, equity_curve): """ 检查最大回撤 Args: equity_curve: 净值曲线 Returns: 是否触发最大回撤限制 """ if self.max_drawdown is None: return False # 计算当前回撤 drawdown = (equity_curve / equity_curve.cummax() - 1).iloc[-1] return drawdown < -self.max_drawdown def adjust_position_for_risk(self, target_position, equity_curve): """ 根据风险调整仓位 Args: target_position: 目标仓位 equity_curve: 净值曲线 Returns: 调整后的仓位 """ adjusted = target_position.copy() # 检查最大回撤 if self.check_max_drawdown(equity_curve): # 减半仓位 adjusted = adjusted * 0.5 return adjusted16.4.2 波动率目标
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54class VolatilityTarget: """ 波动率目标 根据目标波动率动态调整仓位 """ def __init__(self, target_volatility=0.15, lookback=20): """ Args: target_volatility: 目标年化波动率 lookback: 历史波动率计算窗口 """ self.target_volatility = target_volatility self.lookback = lookback def calculate_realized_volatility(self, returns): """ 计算已实现波动率 Args: returns: 收益率Series Returns: 年化波动率 """ daily_vol = returns.rolling(self.lookback).std() annualized_vol = daily_vol * np.sqrt(252) return annualized_vol def adjust_position(self, current_position, returns): """ 根据波动率调整仓位 Args: current_position: 当前仓位 returns: 历史收益率 Returns: 调整后的仓位 """ # 计算当前波动率 realized_vol = self.calculate_realized_volatility(returns) # 目标仓位 = 目标波动率 / 当前波动率 * 当前仓位 adjusted_position = ( self.target_volatility / (realized_vol + 1e-8) * current_position ) # 限制在[0, 1] adjusted_position = adjusted_position.clip(0, 1) return adjusted_position16.5 择时策略实现
16.5.1 完整策略类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75class MarketTimingStrategy: """ 择时策略 基于技术指标的择时策略 """ def __init__( self, market_index="SH000300", signal_config=None, position_method='linear', stop_loss=None, stop_profit=None, max_drawdown=None, target_volatility=None, ): """ Args: market_index: 市场指数 signal_config: 信号配置 position_method: 仓位方法 stop_loss: 止损比例 stop_profit: 止盈比例 max_drawdown: 最大回撤 target_volatility: 目标波动率 """ self.market_index = market_index self.signal_config = signal_config self.position_method = position_method self.stop_loss = stop_loss self.stop_profit = stop_profit self.max_drawdown = max_drawdown self.target_volatility = target_volatility # 初始化组件 self.technical_indicator = MarketTechnicalIndicators(market_index) self.signal_generator = None self.position_sizer = PositionSizer(method=position_method) self.risk_controller = RiskController( stop_loss=stop_loss, stop_profit=stop_profit, max_drawdown=max_drawdown, ) self.volatility_target = None if target_volatility: self.volatility_target = VolatilityTarget(target_volatility) def prepare_data(self, start_time, end_time): """准备数据""" signals_df = self.technical_indicator.generate_all_signals( start_time, end_time ) self.signal_generator = MarketTimingSignalGenerator(signals_df) return signals_df def generate_signals(self, start_time, end_time): """生成择时信号""" # 准备数据 self.prepare_data(start_time, end_time) # 生成组合信号 timing_model = MultiSignalTimingModel( signal_generator=self.signal_generator, position_sizer=self.position_sizer, signal_config=self.signal_config, ) # 生成仓位 position, individual_signals = timing_model.generate_position() return position, individual_signals16.5.2 回测框架
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184class TimingBacktester: """ 择时策略回测器 """ def __init__( self, strategy, initial_capital=1000000, benchmark="SH000300", transaction_cost=0.001, ): """ Args: strategy: 择时策略 initial_capital: 初始资金 benchmark: 基准指数 transaction_cost: 交易成本 """ self.strategy = strategy self.initial_capital = initial_capital self.benchmark = benchmark self.transaction_cost = transaction_cost def run_backtest(self, start_time, end_time): """ 运行回测 Args: start_time: 开始时间 end_time: 结束时间 Returns: 回测结果 """ # 生成仓位信号 target_position, individual_signals = self.strategy.generate_signals( start_time, end_time ) # 获取市场数据 market_data = self.strategy.technical_indicator.get_market_data( start_time, end_time ) # 获取基准数据 benchmark_data = D.features( [self.benchmark], ['$close'], start_time=start_time, end_time=end_time, freq='day' ) benchmark_data.columns = benchmark_data.columns.droplevel(0) # 对齐数据 aligned_data = pd.concat([ target_position.rename('position'), market_data['$close'].rename('close'), benchmark_data['$close'].rename('benchmark'), ], axis=1).dropna() # 计算净值 equity_curve = self._calculate_equity(aligned_data) # 计算基准净值 benchmark_curve = self._calculate_benchmark(aligned_data) # 合并结果 results = pd.DataFrame({ 'strategy': equity_curve, 'benchmark': benchmark_curve, 'position': aligned_data['position'], }) return results def _calculate_equity(self, data): """ 计算策略净值 Args: data: 包含position和close的数据 Returns: 净值Series """ # 初始化 capital = self.initial_capital equity = [capital] # 计算每日收益 for i in range(1, len(data)): # 获取仓位和价格 prev_position = data['position'].iloc[i-1] curr_position = data['position'].iloc[i] prev_price = data['close'].iloc[i-1] curr_price = data['close'].iloc[i] # 计算价格收益 price_return = (curr_price - prev_price) / prev_price # 计算仓位收益(使用前一日仓位) portfolio_return = prev_position * price_return # 交易成本 if abs(curr_position - prev_position) > 0.01: portfolio_return -= self.transaction_cost * abs(curr_position - prev_position) # 更新资金 capital = capital * (1 + portfolio_return) equity.append(capital) # 转换为Series equity_series = pd.Series(equity, index=data.index) equity_normalized = equity_series / self.initial_capital return equity_normalized def _calculate_benchmark(self, data): """ 计算基准净值 Args: data: 包含benchmark的数据 Returns: 基准净值Series """ benchmark_returns = data['benchmark'].pct_change().fillna(0) benchmark_curve = (1 + benchmark_returns).cumprod() return benchmark_curve def analyze_results(self, results): """ 分析回测结果 Args: results: 回测结果DataFrame Returns: 分析报告 """ from qlib.contrib.evaluate import risk_analysis # 策略收益 strategy_returns = results['strategy'].pct_change().fillna(0) # 基准收益 benchmark_returns = results['benchmark'].pct_change().fillna(0) # 计算指标 strategy_metrics = risk_analysis(strategy_returns, freq='day') benchmark_metrics = risk_analysis(benchmark_returns, freq='day') # 超额收益 excess_returns = strategy_returns - benchmark_returns excess_metrics = risk_analysis(excess_returns, freq='day') # 整理报告 report = { 'strategy': { 'total_return': strategy_metrics['cumulative_return'], 'annual_return': strategy_metrics['annual_return'], 'sharpe_ratio': strategy_metrics['sharpe'], 'max_drawdown': strategy_metrics['max_drawdown'], 'win_rate': strategy_metrics['win_rate'], }, 'benchmark': { 'total_return': benchmark_metrics['cumulative_return'], 'annual_return': benchmark_metrics['annual_return'], 'sharpe_ratio': benchmark_metrics['sharpe'], 'max_drawdown': benchmark_metrics['max_drawdown'], }, 'excess': { 'total_return': excess_metrics['cumulative_return'], 'annual_return': excess_metrics['annual_return'], 'sharpe_ratio': excess_metrics['sharpe'], 'information_ratio': excess_metrics['sharpe'], }, } return report16.6 完整案例
16.6.1 运行完整策略
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123import matplotlib.pyplot as plt def run_market_timing_strategy(): """ 运行完整择时策略 """ # 1. 配置策略 signal_config = { 'ma': {'enabled': True, 'weight': 0.3}, 'macd': {'enabled': True, 'weight': 0.2}, 'rsi': {'enabled': True, 'weight': 0.15}, 'bollinger': {'enabled': True, 'weight': 0.1}, 'kdj': {'enabled': True, 'weight': 0.1}, 'trend': {'enabled': True, 'weight': 0.1}, 'volatility': {'enabled': True, 'weight': 0.05}, } strategy = MarketTimingStrategy( market_index="SH000300", # 沪深300 signal_config=signal_config, position_method='linear', stop_loss=0.08, # 8%止损 stop_profit=0.30, # 30%止盈 max_drawdown=0.15, # 15%最大回撤 target_volatility=0.12, # 12%目标波动率 ) # 2. 创建回测器 backtester = TimingBacktester( strategy=strategy, initial_capital=1000000, benchmark="SH000300", transaction_cost=0.002, ) # 3. 运行回测 results = backtester.run_backtest( start_time="2015-01-01", end_time="2020-12-31", ) # 4. 分析结果 report = backtester.analyze_results(results) # 打印报告 print("=" * 50) print("Market Timing Strategy Backtest Report") print("=" * 50) print("\n【Strategy Performance】") print(f"Total Return: {report['strategy']['total_return']:.2%}") print(f"Annual Return: {report['strategy']['annual_return']:.2%}") print(f"Sharpe Ratio: {report['strategy']['sharpe_ratio']:.2f}") print(f"Max Drawdown: {report['strategy']['max_drawdown']:.2%}") print(f"Win Rate: {report['strategy']['win_rate']:.2%}") print("\n【Benchmark Performance】") print(f"Total Return: {report['benchmark']['total_return']:.2%}") print(f"Annual Return: {report['benchmark']['annual_return']:.2%}") print(f"Sharpe Ratio: {report['benchmark']['sharpe_ratio']:.2f}") print(f"Max Drawdown: {report['benchmark']['max_drawdown']:.2%}") print("\n【Excess Return】") print(f"Total Excess: {report['excess']['total_return']:.2%}") print(f"Annual Excess: {report['excess']['annual_return']:.2%}") print(f"Information Ratio: {report['excess']['information_ratio']:.2f}") # 5. 可视化 plot_backtest_results(results) return results, report def plot_backtest_results(results): """ 可视化回测结果 """ fig, axes = plt.subplots(3, 1, figsize=(14, 12)) # 1. 净值曲线 axes[0].plot(results.index, results['strategy'], label='Strategy', linewidth=2) axes[0].plot(results.index, results['benchmark'], label='Benchmark', linewidth=2) axes[0].set_title('Strategy vs Benchmark', fontsize=14) axes[0].set_ylabel('Net Value') axes[0].legend() axes[0].grid(True, alpha=0.3) # 2. 仓位曲线 axes[1].fill_between(results.index, 0, results['position'], alpha=0.3) axes[1].plot(results.index, results['position'], color='orange', linewidth=1) axes[1].set_title('Position Over Time', fontsize=14) axes[1].set_ylabel('Position') axes[1].set_ylim(0, 1.1) axes[1].grid(True, alpha=0.3) # 3. 回撤曲线 strategy_drawdown = (results['strategy'] / results['strategy'].cummax() - 1) benchmark_drawdown = (results['benchmark'] / results['benchmark'].cummax() - 1) axes[2].fill_between( results.index, strategy_drawdown, 0, alpha=0.3, color='red', label='Strategy Drawdown' ) axes[2].plot(results.index, benchmark_drawdown, label='Benchmark Drawdown', linewidth=1) axes[2].set_title('Drawdown', fontsize=14) axes[2].set_ylabel('Drawdown') axes[2].set_xlabel('Date') axes[2].legend() axes[2].grid(True, alpha=0.3) plt.tight_layout() plt.savefig('market_timing_results.png', dpi=300) plt.show() # 运行策略 if __name__ == "__main__": import qlib qlib.init(provider_uri="~/.qlib/qlib_data/cn_data", region="cn") results, report = run_market_timing_strategy()16.6.2 参数优化
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69def optimize_timing_parameters(): """ 优化择时策略参数 """ # 定义参数网格 param_grid = { 'ma_weight': [0.2, 0.3, 0.4], 'macd_weight': [0.1, 0.2, 0.3], 'rsi_weight': [0.1, 0.15, 0.2], 'stop_loss': [None, 0.05, 0.08, 0.10], 'stop_profit': [None, 0.20, 0.30, 0.40], } # 网格搜索 results = [] for ma_w in param_grid['ma_weight']: for macd_w in param_grid['macd_weight']: for rsi_w in param_grid['rsi_weight']: for stop_loss in param_grid['stop_loss']: for stop_profit in param_grid['stop_profit']: # 配置策略 signal_config = { 'ma': {'enabled': True, 'weight': ma_w}, 'macd': {'enabled': True, 'weight': macd_w}, 'rsi': {'enabled': True, 'weight': rsi_w}, 'bollinger': {'enabled': True, 'weight': 0.1}, 'kdj': {'enabled': True, 'weight': 0.1}, 'trend': {'enabled': True, 'weight': 0.1}, 'volatility': {'enabled': True, 'weight': 0.1}, } strategy = MarketTimingStrategy( signal_config=signal_config, stop_loss=stop_loss, stop_profit=stop_profit, ) backtester = TimingBacktester(strategy=strategy) # 回测 results_temp = backtester.run_backtest( start_time="2015-01-01", end_time="2020-12-31", ) report = backtester.analyze_results(results_temp) # 记录结果 results.append({ 'params': { 'ma_weight': ma_w, 'macd_weight': macd_w, 'rsi_weight': rsi_w, 'stop_loss': stop_loss, 'stop_profit': stop_profit, }, 'sharpe': report['excess']['information_ratio'], 'total_return': report['excess']['total_return'], }) # 找到最佳参数 best_result = max(results, key=lambda x: x['sharpe']) print(f"Best parameters: {best_result['params']}") print(f"Best IR: {best_result['sharpe']:.2f}") print(f"Best excess return: {best_result['total_return']:.2%}") return best_result, results16.7 本章小结
本章通过择时策略案例,展示了Qlib在市场择时中的应用:
- 第17章-完整案例-量化CTA
第17章:完整案例-量化CTA
CTA(Commodity Trading Advisor,商品交易顾问)策略是量化投资中的重要分支,主要在期货市场进行趋势跟踪。本章将带你构建完整的量化CTA策略,涵盖趋势识别、仓位管理、风险控制等核心环节。
17.1 CTA策略概述
17.1.1 CTA策略特点
与传统股票策略的区别:
特征 CTA策略 股票策略 标的 期货、商品 股票 方向 多空双向 主要做多 杠杆 高杠杆 低杠杆 周期 短中长期 中长期 风险 高波动 中等波动 CTA策略优势:
- 多空双向交易机会
- 与传统资产相关性低
- 可全天候交易
- 流动性好
17.1.2 CTA策略分类
1. 趋势跟踪(Trend Following)
- 移动平均线交叉
- 突破策略
- 动量策略
2. 均值回归(Mean Reversion)
- 布林带策略
- 统计套利
- 波动率策略
3. 波动率策略(Volatility)
- 波动率突破
- 波动率均值回归
4. 基本面策略(Fundamental)
- 供需平衡
- 升贴水策略
17.1.3 本章案例
我们将构建一个多品种趋势跟踪CTA策略:
- 标的:主要商品期货
- 策略:多时间周期趋势跟踪
- 仓位:波动率加权
- 风险:动态止损
17.2 数据准备
17.2.1 期货数据获取
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70import qlib from qlib.data import D import pandas as pd import numpy as np # 初始化Qlib(需配置期货数据) qlib.init( provider_uri="~/.qlib/qlib_data/cn_futures", # 期货数据路径 region="cn", ) # 获取期货品种列表 def get_futures_pool(universe="main"): """ 获取期货池 Args: universe: 市场范围 - "main": 主力合约 - "all": 所有合约 Returns: 期货代码列表 """ if universe == "main": # 主要商品期货 futures = [ # 农产品 "M0", # 豆粕 "C0", # 玉米 "A0", # 豆一 "Y0", # 豆油 "P0", # 棕榈油 "RM0", # 菜粕 "OI0", # 菜油 "SR0", # 白糖 "CF0", # 棉花 "JD0", # 鸡蛋 # 化工 "TA0", # PTA "MA0", # 甲醇 "PP0", # PP "L0", # 塑料 "V0", # PVC "EG0", # 乙二醇 "RU0", # 橡胶 # 有色金属 "CU0", # 铜 "AL0", # 铝 "ZN0", # 锌 "NI0", # 镍 "SN0", # 锡 # 黑色 "RB0", # 螺纹钢 "HC0", # 热卷 "I0", # 铁矿石 "JM0", # 焦煤 "J0", # 焦炭 # 贵金属 "AU0", # 黄金 "AG0", # 白银 # 能源 "SC0", # 原油 "FU0", # 燃料油 "LU0", # 沥青 ] return futures futures_pool = get_futures_pool(universe="main") print(f"Total futures: {len(futures_pool)}")17.2.2 数据加载与处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101class FuturesDataLoader: """ 期货数据加载器 """ def __init__(self, futures_pool, freq='day'): """ Args: futures_pool: 期货池 freq: 数据频率 """ self.futures_pool = futures_pool self.freq = freq def load_futures_data(self, start_time, end_time): """ 加载期货数据 Args: start_time: 开始时间 end_time: 结束时间 Returns: 期货数据字典 {futures_code: dataframe} """ futures_data = {} for futures_code in self.futures_pool: try: # 获取OHLCV数据 data = D.features( [futures_code], ['$open', '$high', '$low', '$close', '$volume', '$turnover', '$oi'], start_time=start_time, end_time=end_time, freq=self.freq ) # 展平MultiIndex data.columns = data.columns.droplevel(0) # 过滤无效数据 data = data.dropna() if len(data) > 0: futures_data[futures_code] = data except Exception as e: print(f"Error loading {futures_code}: {e}") continue print(f"Successfully loaded {len(futures_data)}/{len(self.futures_pool)} futures") return futures_data def calculate_returns(self, data_dict): """ 计算收益率 Args: data_dict: 数据字典 Returns: 收益率字典 """ returns_dict = {} for code, data in data_dict.items(): # 计算对数收益率 returns = np.log(data['$close'] / data['$close'].shift(1)) returns_dict[code] = returns return returns_dict def calculate_volatility(self, data_dict, window=20): """ 计算波动率 Args: data_dict: 数据字典 window: 窗口期 Returns: 波动率字典 """ vol_dict = {} for code, data in data_dict.items(): # 计算收益率 returns = data['$close'].pct_change() # 计算波动率 volatility = returns.rolling(window).std() * np.sqrt(252) vol_dict[code] = volatility return vol_dict # 使用示例 loader = FuturesDataLoader(futures_pool) data_dict = loader.load_futures_data("2018-01-01", "2020-12-31") returns_dict = loader.calculate_returns(data_dict) vol_dict = loader.calculate_volatility(data_dict)17.3 趋势跟踪策略
17.3.1 单一品种策略
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192class TrendFollowingStrategy: """ 趋势跟踪策略 基于多种趋势识别方法的综合策略 """ def __init__(self, params=None): """ Args: params: 策略参数 """ self.params = params or self._default_params() def _default_params(self): """默认参数""" return { 'fast_ma': 10, 'slow_ma': 30, 'atr_period': 14, 'entry_threshold': 2.0, 'exit_threshold': 1.0, 'position_multiplier': 0.5, # ATR倍数 } def calculate_ma_signal(self, data): """ 移动平均线信号 Args: data: OHLCV数据 Returns: 信号Series (1多头, -1空头, 0空仓) """ fast_ma = data['$close'].rolling(self.params['fast_ma']).mean() slow_ma = data['$close'].rolling(self.params['slow_ma']).mean() # 金叉做多,死叉做空 signal = pd.Series(0, index=data.index) signal[fast_ma > slow_ma] = 1 signal[fast_ma < slow_ma] = -1 return signal def calculate_atr(self, data): """ 计算ATR(Average True Range) Args: data: OHLCV数据 Returns: ATR Series """ period = self.params['atr_period'] high_low = data['$high'] - data['$low'] high_close = np.abs(data['$high'] - data['$close'].shift()) low_close = np.abs(data['$low'] - data['$close'].shift()) true_range = pd.concat([high_low, high_close, low_close], axis=1).max(axis=1) atr = true_range.rolling(period).mean() return atr def calculate_breakout_signal(self, data, lookback=20): """ 突破信号 Donchian Channel突破策略 Args: data: OHLCV数据 lookback: 回看期 Returns: 信号Series """ upper_channel = data['$high'].rolling(lookback).max() lower_channel = data['$low'].rolling(lookback).min() signal = pd.Series(0, index=data.index) # 向上突破做多 signal[data['$close'] > upper_channel.shift(1)] = 1 # 向下突破做空 signal[data['$close'] < lower_channel.shift(1)] = -1 return signal def calculate_momentum_signal(self, data, period=20): """ 动量信号 Args: data: OHLCV数据 period: 周期 Returns: 信号Series """ momentum = data['$close'] / data['$close'].shift(period) - 1 signal = pd.Series(0, index=data.index) signal[momentum > 0] = 1 signal[momentum < 0] = -1 return signal def generate_combined_signal(self, data): """ 生成组合信号 综合多种信号,投票决定方向 Args: data: OHLCV数据 Returns: 信号Series """ # 获取各信号 ma_signal = self.calculate_ma_signal(data) breakout_signal = self.calculate_breakout_signal(data) momentum_signal = self.calculate_momentum_signal(data) # 加权组合 combined_signal = ( 0.4 * ma_signal + 0.4 * breakout_signal + 0.2 * momentum_signal ) # 信号归一化 final_signal = pd.Series(0, index=data.index) final_signal[combined_signal > 0.3] = 1 final_signal[combined_signal < -0.3] = -1 return final_signal def calculate_position_size(self, data, signal): """ 计算仓位大小 基于ATR的波动率调整仓位 Args: data: OHLCV数据 signal: 交易信号 Returns: 仓位Series """ # 计算ATR atr = self.calculate_atr(data) # 基于ATR的仓位 # 仓位 = 信号 * ATR倍数 * (1 / ATR相对值) atr_normalized = atr / atr.rolling(60).mean() position_size = signal * self.params['position_multiplier'] / atr_normalized # 限制仓位范围 position_size = position_size.clip(-1, 1) return position_size def generate_trades(self, data): """ 生成交易指令 Args: data: OHLCV数据 Returns: 交易指令DataFrame """ # 生成信号 signal = self.generate_combined_signal(data) # 计算仓位 position = self.calculate_position_size(data, signal) # 记录交易 trades = pd.DataFrame({ 'signal': signal, 'position': position, 'price': data['$close'], }) return trades17.3.2 多品种组合策略
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143class MultiFuturesCTA: """ 多品种CTA策略 管理多个期货品种的趋势跟踪策略 """ def __init__( self, futures_pool, strategy_params=None, portfolio_method='equal_weight', ): """ Args: futures_pool: 期货池 strategy_params: 策略参数字典 {code: params} portfolio_method: 组合方法 - 'equal_weight': 等权重 - 'volatility_weight': 波动率加权 - 'risk_parity': 风险平价 """ self.futures_pool = futures_pool self.strategy_params = strategy_params or {} self.portfolio_method = portfolio_method # 为每个品种创建策略实例 self.strategies = {} for code in futures_pool: params = self.strategy_params.get(code, {}) self.strategies[code] = TrendFollowingStrategy(params) def generate_portfolio_signals(self, data_dict): """ 生成组合信号 Args: data_dict: 数据字典 Returns: 组合信号DataFrame """ portfolio_signals = pd.DataFrame() for code in self.futures_pool: if code not in data_dict: continue # 获取该品种交易 trades = self.strategies[code].generate_trades(data_dict[code]) # 添加到组合 portfolio_signals[code] = trades['position'] # 处理缺失值 portfolio_signals = portfolio_signals.fillna(0) return portfolio_signals def calculate_portfolio_weights(self, data_dict): """ 计算组合权重 Args: data_dict: 数据字典 Returns: 权重DataFrame """ if self.portfolio_method == 'equal_weight': # 等权重 n = len(self.futures_pool) weights = pd.DataFrame(1.0 / n, index=data_dict[self.futures_pool[0]].index, columns=self.futures_pool) elif self.portfolio_method == 'volatility_weight': # 波动率加权(反比) weights_dict = {} for code in self.futures_pool: if code not in data_dict: continue # 计算波动率 returns = data_dict[code]['$close'].pct_change() volatility = returns.rolling(20).std() # 反比权重 weights_dict[code] = 1.0 / (volatility + 1e-8) weights = pd.DataFrame(weights_dict) weights = weights.div(weights.sum(axis=1), axis=0) elif self.portfolio_method == 'risk_parity': # 风险平价 weights = self._calculate_risk_parity_weights(data_dict) return weights def _calculate_risk_parity_weights(self, data_dict): """ 计算风险平价权重 使每个品种对组合风险的贡献相等 Args: data_dict: 数据字典 Returns: 权重DataFrame """ # 计算协方差矩阵 returns_dict = {} for code in self.futures_pool: if code in data_dict: returns_dict[code] = data_dict[code]['$close'].pct_change() returns_df = pd.DataFrame(returns_dict).dropna() # 滚动计算权重 weights_list = [] for i in range(60, len(returns_df)): # 历史窗口 window_returns = returns_df.iloc[i-60:i] # 协方差矩阵 cov_matrix = window_returns.cov() # 风险平价权重(简化:与波动率反比) volatilities = np.sqrt(np.diag(cov_matrix)) weights = 1.0 / (volatilities + 1e-8) weights = weights / weights.sum() weights_list.append(weights) weights_df = pd.DataFrame( weights_list, index=returns_df.index[60:], columns=self.futures_pool ) return weights_df17.4 风险管理
17.4.1 动态止损
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146class RiskManager: """ 风险管理器 实现止损、限仓等功能 """ def __init__( self, stop_loss_method='atr', stop_loss_threshold=2.0, max_position=0.3, max_total_position=3.0, ): """ Args: stop_loss_method: 止损方法 - 'atr': 基于ATR - 'percentage': 基于百分比 - 'trailing': 跟踪止损 stop_loss_threshold: 止损阈值 max_position: 单品种最大仓位 max_total_position: 组合最大总仓位 """ self.stop_loss_method = stop_loss_method self.stop_loss_threshold = stop_loss_threshold self.max_position = max_position self.max_total_position = max_total_position self.entry_prices = {} def calculate_stop_loss(self, code, current_price, position, atr=None): """ 计算止损价格 Args: code: 品种代码 current_price: 当前价格 position: 当前仓位 atr: ATR值 Returns: 止损价格 """ if position == 0: return None if code not in self.entry_prices: self.entry_prices[code] = current_price entry_price = self.entry_prices[code] if self.stop_loss_method == 'atr' and atr is not None: # ATR止损 stop_distance = atr * self.stop_loss_threshold if position > 0: stop_loss = entry_price - stop_distance else: stop_loss = entry_price + stop_distance elif self.stop_loss_method == 'percentage': # 百分比止损 stop_distance = entry_price * self.stop_loss_threshold if position > 0: stop_loss = entry_price - stop_distance else: stop_loss = entry_price + stop_distance elif self.stop_loss_method == 'trailing': # 跟踪止损 if position > 0: stop_loss = max(entry_price, current_price * 0.95) else: stop_loss = min(entry_price, current_price * 1.05) return stop_loss def check_stop_loss(self, code, current_price, position, atr=None): """ 检查是否触发止损 Args: code: 品种代码 current_price: 当前价格 position: 当前仓位 atr: ATR值 Returns: 是否止损 """ if position == 0: return False stop_loss = self.calculate_stop_loss(code, current_price, position, atr) if position > 0: return current_price < stop_loss else: return current_price > stop_loss def limit_position(self, target_position, current_positions): """ 限制仓位 Args: target_position: 目标仓位 {code: position} current_positions: 当前持仓 {code: position} Returns: 限制后的仓位 """ limited_position = {} total_position = sum(abs(p) for p in target_position.values()) for code, position in target_position.items(): # 单品种限制 limited_position[code] = np.clip( position, -self.max_position, self.max_position ) # 总仓位限制 if total_position > self.max_total_position: scale_factor = self.max_total_position / total_position for code in limited_position: limited_position[code] *= scale_factor return limited_position def update_entry_price(self, code, price, position): """ 更新入场价格 Args: code: 品种代码 price: 价格 position: 仓位 """ if position != 0: self.entry_prices[code] = price elif code in self.entry_prices: del self.entry_prices[code]17.4.2 组合风险控制
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86class PortfolioRiskControl: """ 组合风险控制 控制组合层面的风险 """ def __init__( self, max_drawdown=0.20, target_volatility=0.15, rebalance_threshold=0.05, ): """ Args: max_drawdown: 最大回撤 target_volatility: 目标波动率 rebalance_threshold: 再平衡阈值 """ self.max_drawdown = max_drawdown self.target_volatility = target_volatility self.rebalance_threshold = rebalance_threshold self.peak_equity = None def check_max_drawdown(self, current_equity): """ 检查最大回撤 Args: current_equity: 当前净值 Returns: 是否超过最大回撤 """ if self.peak_equity is None: self.peak_equity = current_equity else: self.peak_equity = max(self.peak_equity, current_equity) drawdown = (current_equity - self.peak_equity) / self.peak_equity return drawdown < -self.max_drawdown def scale_position_by_volatility(self, target_positions, data_dict, lookback=20): """ 根据波动率调整仓位 Args: target_positions: 目标仓位 data_dict: 数据字典 lookback: 回看期 Returns: 调整后的仓位 """ # 计算组合波动率 returns_dict = {} for code, position in target_positions.items(): if position != 0 and code in data_dict: returns = data_dict[code]['$close'].pct_change() returns_dict[code] = returns * position if not returns_dict: return target_positions returns_df = pd.DataFrame(returns_dict) portfolio_returns = returns_df.sum(axis=1) # 计算波动率 portfolio_volatility = portfolio_returns.rolling(lookback).std() * np.sqrt(252) # 调整系数 current_volatility = portfolio_volatility.iloc[-1] scale_factor = self.target_volatility / (current_volatility + 1e-8) # 限制调整范围 scale_factor = np.clip(scale_factor, 0.5, 1.5) # 应用调整 scaled_positions = { code: position * scale_factor for code, position in target_positions.items() } return scaled_positions17.5 回测与评估
17.5.1 CTA回测引擎
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178class CTABacktester: """ CTA策略回测引擎 """ def __init__( self, strategy, risk_manager, initial_capital=1000000, transaction_cost=0.0003, slippage=0.0002, ): """ Args: strategy: CTA策略 risk_manager: 风险管理器 initial_capital: 初始资金 transaction_cost: 交易成本 slippage: 滑点 """ self.strategy = strategy self.risk_manager = risk_manager self.initial_capital = initial_capital self.transaction_cost = transaction_cost self.slippage = slippage def run_backtest(self, start_date, end_date, data_dict): """ 运行回测 Args: start_date: 开始日期 end_date: 结束日期 data_dict: 数据字典 Returns: 回测结果 """ # 初始化 capital = self.initial_capital positions = {code: 0 for code in self.strategy.futures_pool} entry_prices = {code: 0 for code in self.strategy.futures_pool} # 记录 equity_curve = [] positions_history = [] # 遍历日期 dates = list(data_dict[self.strategy.futures_pool[0]].index) start_idx = dates.index(pd.Timestamp(start_date)) end_idx = dates.index(pd.Timestamp(end_date)) for i in range(start_idx, end_idx + 1): date = dates[i] # 当前价格 current_prices = { code: data_dict[code].loc[date, '$close'] for code in self.strategy.futures_pool if date in data_dict[code].index } # 计算当前净值 current_pnl = 0 for code, position in positions.items(): if position != 0 and code in current_prices: price_change = current_prices[code] - entry_prices[code] current_pnl += position * price_change current_equity = capital + current_pnl # 生成目标仓位 target_positions = {} for code in self.strategy.futures_pool: if date in data_dict[code].index: trades = self.strategy.strategies[code].generate_trades( data_dict[code].loc[:date] ) if len(trades) > 0: target_positions[code] = trades['position'].iloc[-1] # 风险控制 target_positions = self.risk_manager.limit_position( target_positions, positions ) # 检查止损 for code, position in positions.items(): if position != 0 and code in current_prices: if self.risk_manager.check_stop_loss( code, current_prices[code], position ): target_positions[code] = 0 # 执行交易 for code, target_position in target_positions.items(): if code not in current_prices: continue current_position = positions[code] trade = target_position - current_position if abs(trade) > 0.01: # 计算交易成本 trade_value = abs(trade) * current_prices[code] cost = trade_value * (self.transaction_cost + self.slippage) capital -= cost positions[code] = target_position if target_position != 0: entry_prices[code] = current_prices[code] # 记录 equity_curve.append(current_equity) positions_history.append(positions.copy()) # 整理结果 results = pd.DataFrame({ 'equity': equity_curve, 'date': dates[start_idx:end_idx+1], }) results.set_index('date', inplace=True) return results def calculate_metrics(self, results): """ 计算绩效指标 Args: results: 回测结果 Returns: 指标字典 """ # 计算收益率 returns = results['equity'].pct_change().fillna(0) # 年化收益 total_return = results['equity'].iloc[-1] / results['equity'].iloc[0] - 1 n_days = len(results) annual_return = (1 + total_return) ** (252 / n_days) - 1 # 波动率 annual_volatility = returns.std() * np.sqrt(252) # 夏普比率 risk_free_rate = 0.03 sharpe_ratio = (annual_return - risk_free_rate) / annual_volatility # 最大回撤 cumulative = (1 + returns).cumprod() max_drawdown = (cumulative / cumulative.cummax() - 1).min() # 胜率 win_rate = (returns > 0).sum() / len(returns) # 盈亏比 winning_returns = returns[returns > 0] losing_returns = returns[returns < 0] profit_loss_ratio = ( winning_returns.mean() / abs(losing_returns.mean()) if len(losing_returns) > 0 else 0 ) metrics = { 'total_return': total_return, 'annual_return': annual_return, 'annual_volatility': annual_volatility, 'sharpe_ratio': sharpe_ratio, 'max_drawdown': max_drawdown, 'win_rate': win_rate, 'profit_loss_ratio': profit_loss_ratio, } return metrics17.5.2 完整回测流程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88def run_cta_backtest(): """ 运行完整CTA回测 """ # 1. 准备数据 futures_pool = get_futures_pool(universe="main") loader = FuturesDataLoader(futures_pool) data_dict = loader.load_futures_data("2018-01-01", "2020-12-31") # 2. 创建策略 strategy_params = {} for code in futures_pool: strategy_params[code] = { 'fast_ma': 10, 'slow_ma': 30, 'atr_period': 14, } strategy = MultiFuturesCTA( futures_pool, strategy_params=strategy_params, portfolio_method='volatility_weight', ) # 3. 风险管理 risk_manager = RiskManager( stop_loss_method='atr', stop_loss_threshold=2.0, max_position=0.3, max_total_position=2.0, ) # 4. 回测 backtester = CTABacktester( strategy=strategy, risk_manager=risk_manager, initial_capital=1000000, transaction_cost=0.0003, ) results = backtester.run_backtest( start_date="2019-01-01", end_date="2020-12-31", data_dict=data_dict, ) # 5. 计算指标 metrics = backtester.calculate_metrics(results) print("=" * 50) print("CTA Strategy Backtest Results") print("=" * 50) print(f"Total Return: {metrics['total_return']:.2%}") print(f"Annual Return: {metrics['annual_return']:.2%}") print(f"Annual Volatility: {metrics['annual_volatility']:.2%}") print(f"Sharpe Ratio: {metrics['sharpe_ratio']:.2f}") print(f"Max Drawdown: {metrics['max_drawdown']:.2%}") print(f"Win Rate: {metrics['win_rate']:.2%}") print(f"Profit/Loss Ratio: {metrics['profit_loss_ratio']:.2f}") # 6. 可视化 import matplotlib.pyplot as plt fig, axes = plt.subplots(2, 1, figsize=(14, 10)) # 净值曲线 axes[0].plot(results.index, results['equity'] / results['equity'].iloc[0]) axes[0].set_title('Equity Curve') axes[0].set_ylabel('Net Value') axes[0].grid(True) # 回撤曲线 drawdown = (results['equity'] / results['equity'].cummax() - 1) axes[1].fill_between(results.index, drawdown, 0, alpha=0.3, color='red') axes[1].set_title('Drawdown') axes[1].set_ylabel('Drawdown') axes[1].set_xlabel('Date') axes[1].grid(True) plt.tight_layout() plt.savefig('cta_backtest_results.png') plt.show() return results, metrics # 运行回测 if __name__ == "__main__": results, metrics = run_cta_backtest()17.6 本章小结
本章通过量化CTA策略案例,展示了Qlib在期货市场的应用:
- 第18章-最佳实践
第18章:最佳实践
本章总结了使用Qlib进行量化投资开发的最佳实践,涵盖代码规范、性能优化、调试技巧、部署方案等实用内容,帮助你构建高质量的量化系统。
18.1 代码规范
18.1.1 项目结构
推荐的Qlib项目目录结构:
project/ ├── config/ # 配置文件 │ ├── base.yaml # 基础配置 │ ├── data.yaml # 数据配置 │ ├── model.yaml # 模型配置 │ └── strategy.yaml # 策略配置 ├── data/ # 数据目录 │ ├── raw/ # 原始数据 │ └── processed/ # 处理后数据 ├── notebooks/ # Jupyter笔记本 ├── scripts/ # 脚本 │ ├── data_preparation.py # 数据准备 │ ├── training.py # 模型训练 │ ├── backtesting.py # 回测 │ └── deployment.py # 部署 ├── src/ # 源代码 │ ├── __init__.py │ ├── data/ # 数据模块 │ │ ├── __init__.py │ │ ├── loader.py # 数据加载 │ │ └── processor.py # 数据处理 │ ├── features/ # 特征模块 │ │ ├── __init__.py │ │ ├── alpha158.py │ │ └── custom.py │ ├── models/ # 模型模块 │ │ ├── __init__.py │ │ ├── lgb_model.py │ │ └── nn_model.py │ ├── strategies/ # 策略模块 │ │ ├── __init__.py │ │ ├── base_strategy.py │ │ └── enhanced_strategy.py │ └── utils/ # 工具模块 │ ├── __init__.py │ ├── logger.py │ ├── config.py │ └── metrics.py ├── tests/ # 测试 │ ├── test_data.py │ ├── test_models.py │ └── test_strategies.py ├── logs/ # 日志 ├── outputs/ # 输出 │ ├── models/ # 模型文件 │ ├── results/ # 结果文件 │ └── figures/ # 图表 ├── requirements.txt # 依赖 ├── setup.py # 安装脚本 ├── README.md # 说明文档 └── .gitignore # Git忽略文件18.1.2 命名规范
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38# 模块名:小写,下划线分隔 # data_loader.py # feature_engineering.py # 类名:大驼峰 class MultiFactorStrategy: pass class LightGBMModel: pass # 函数名:小写,下划线分隔 def calculate_sharpe_ratio(returns): pass def load_data_from_cache(cache_path): pass # 变量名:小写,下划线分隔 train_data = ... test_return = ... sharpe_ratio = ... # 常量:大写,下划线分隔 MAX_POSITION = 0.95 RISK_FREE_RATE = 0.03 DEFAULT_LEARNING_RATE = 0.01 # 私有方法/变量:前缀下划线 def _internal_calculator(self): pass self._internal_state = ... # 布尔变量:is/has前缀 is_valid = True has_data = False can_trade = True18.1.3 注释规范
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45""" 模块级文档字符串 描述模块功能、使用方法等。 """ def calculate_ic(predictions, returns): """ 计算IC值(Information Coefficient) IC是因子值与未来收益率的相关系数,衡量因子预测能力。 Args: predictions (pd.Series): 因子预测值,index为股票代码 returns (pd.Series): 实际收益率,index为股票代码 Returns: float: IC值,范围[-1, 1] - 接近1: 正相关,预测能力强 - 接近-1: 负相关 - 接近0: 无相关性 Raises: ValueError: 当输入长度不一致时 Examples: >>> pred = pd.Series([0.1, 0.2, 0.3], index=['A', 'B', 'C']) >>> ret = pd.Series([0.05, 0.15, 0.25], index=['A', 'B', 'C']) >>> ic = calculate_ic(pred, ret) >>> print(f"IC: {ic:.3f}") Note: 推荐使用Rank IC(Spearman相关)而非Normal IC """ if len(predictions) != len(returns): raise ValueError("预测值和收益率长度不一致") # 使用Spearman相关系数 from scipy.stats import spearmanr ic, _ = spearmanr(predictions, returns) return ic # 行内注释 sharpe = (returns.mean() - risk_free_rate) / returns.std() # 年化夏普比率18.1.4 类型注解
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59from typing import List, Dict, Tuple, Optional, Union import pandas as pd import numpy as np def load_dataset( instruments: List[str], start_time: str, end_time: str, fields: Optional[List[str]] = None, ) -> pd.DataFrame: """ 加载数据集 Args: instruments: 股票列表 start_time: 开始时间 end_time: 结束时间 fields: 字段列表(可选) Returns: 数据DataFrame """ if fields is None: fields = ['$open', '$high', '$low', '$close', '$volume'] # 实现代码 return data def train_model( model_config: Dict[str, Union[str, int, float]], dataset: pd.DataFrame, epochs: int = 100, verbose: bool = True, ) -> Tuple[float, Dict[str, float]]: """ 训练模型 Returns: (训练时间, 指标字典) """ pass class Strategy: """策略基类""" def generate_signal( self, market_data: pd.DataFrame, ) -> pd.Series: """生成信号""" pass def calculate_position( self, signal: pd.Series, max_position: float = 1.0, ) -> pd.Series: """计算仓位""" pass18.2 配置管理
18.2.1 配置文件设计
使用YAML配置文件管理参数:
- 第19章-内部实现解析
第19章:内部实现解析
本章深入Qlib框架的内部实现,解析核心模块的设计原理和实现细节,帮助读者理解Qlib的工作机制,并为扩展开发奠定基础。
19.1 架构设计
19.1.1 整体架构
Qlib采用分层架构设计:
┌─────────────────────────────────────────────────────────┐ │ 应用层 (Application) │ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ │ │ 策略研究 │ │ 回测分析 │ │ 在线服务 │ │ │ └──────────────┘ └──────────────┘ └──────────────┘ │ ├─────────────────────────────────────────────────────────┤ │ 框架层 (Framework) │ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ │ │ 模型管理 │ │ 策略执行 │ │ 绩效评估 │ │ │ └──────────────┘ └──────────────┘ └──────────────┘ │ ├─────────────────────────────────────────────────────────┤ │ 基础层 (Base) │ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ │ │ 数据处理 │ │ 表达式引擎 │ │ 工作流管理 │ │ │ └──────────────┘ └──────────────┘ └──────────────┘ │ ├─────────────────────────────────────────────────────────┤ │ 数据层 (Data) │ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ │ │ Provider │ │ Expression │ │ Cache │ │ │ └──────────────┘ └──────────────┘ └──────────────┘ │ └─────────────────────────────────────────────────────────┘19.1.2 模块依赖关系
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20# 核心模块导入关系 # 1. 数据层 from qlib.data import D # 数据访问接口 from qlib.data.data import BaseProvider # Provider基类 from qlib.data.ops import * # 表达式操作符 # 2. 基础层 from qlib.data.dataset import DatasetH, DatasetDL # 数据集 from qlib.contrib.data.handler import Alpha158 # DataHandler # 3. 框架层 from qlib.contrib.model.gbdt import LGBModel # 模型 from qlib.contrib.strategy.signal_strategy import TopkDropoutStrategy # 策略 from qlib.backtest.executor import SimulatorExecutor # 执行器 from qlib.backtest.exchange import Exchange # 交易所模拟 # 4. 应用层 from qlib.workflow import R # Recorder管理 from qlib.contrib.evaluate import risk_analysis # 绩效分析19.2 数据系统
19.2.1 Provider架构
Qlib的数据访问通过Provider实现:
- 第20章-扩展开发
第20章:扩展开发
Qlib提供了强大的扩展能力,允许用户自定义数据源、因子、模型、策略等组件。本章将详细介绍如何扩展Qlib,构建符合特定需求的量化系统。
20.1 自定义数据源
20.1.1 自定义Provider
通过继承BaseProvider实现自定义数据源:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89from qlib.data.data import BaseProvider import pandas as pd class CustomCSVProvider(BaseProvider): """ 自定义CSV数据Provider 从CSV文件读取数据 """ def __init__(self, csv_dir): """ Args: csv_dir: CSV文件目录 """ super().__init__() self.csv_dir = Path(csv_dir) def get_range(self, instrument, start_time, end_time, fields): """ 获取时间范围数据 Args: instrument: 标的代码 start_time: 开始时间 end_time: 结束时间 fields: 字段列表 Returns: DataFrame """ # 构建文件路径 file_path = self.csv_dir / f"{instrument}.csv" # 读取CSV df = pd.read_csv( file_path, parse_dates=['datetime'], index_col='datetime' ) # 过滤时间范围 df = df.loc[start_time:end_time] # 选择字段 return df[fields] def get_items(self, instrument, time, fields): """获取单个时间点数据""" return self.get_range(instrument, time, time, fields).iloc[0] class DatabaseProvider(BaseProvider): """ 数据库Provider 从SQL数据库读取数据 """ def __init__(self, connection_string): """ Args: connection_string: 数据库连接字符串 """ super().__init__() self.connection_string = connection_string def get_range(self, instrument, start_time, end_time, fields): """从数据库获取数据""" import sqlalchemy # 创建连接 engine = sqlalchemy.create_engine(self.connection_string) # 构建SQL fields_str = ', '.join(fields) sql = f""" SELECT datetime, {fields_str} FROM stock_data WHERE instrument = '{instrument}' AND datetime >= '{start_time}' AND datetime <= '{end_time}' ORDER BY datetime """ # 执行查询 df = pd.read_sql(sql, engine, index_col='datetime') return df20.1.2 注册自定义Provider
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36from qlib.data import D # 注册自定义Provider def register_custom_provider(): """注册自定义Provider""" # 1. 创建Provider实例 provider = CustomCSVProvider(csv_dir="./data/csv") # 2. 注册到Qlib D.register_provider(provider) # 或者使用配置文件 # qlib.init( # provider_uri="./data", # region="cn", # provider={"class": CustomCSVProvider, "csv_dir": "./data/csv"} # ) # 使用自定义Provider if __name__ == "__main__": import qlib # 初始化Qlib qlib.init(provider_uri="~/.qlib/qlib_data/cn_data", region="cn") # 注册自定义Provider register_custom_provider() # 使用自定义数据 data = D.features( ["SH600000"], ["$open", "$high", "$low", "$close"], start_time="2020-01-01", end_time="2020-12-31", )20.2 自定义因子
20.2.1 表达式因子
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66from qlib.expression import Expression import numpy as np import pandas as pd class MyCustomFactor(Expression): """ 自定义因子示例 实现:价格动量因子 计算公式:(当前价 - N天前价) / N天前价 """ def __init__(self, feature, period=20): """ Args: feature: 基础特征(如$close) period: 周期 """ super().__init__() self.feature = feature self.period = period self.children = [feature] def load(self, instrument, start_index, end_index): """ 加载因子数据 Args: instrument: 标的 start_index: 开始索引 end_index: 结束索引 Returns: Series: 因子值 """ # 加载基础数据(需要扩展窗口) extended_start = start_index - self.period data = self.feature.load(instrument, extended_start, end_index) # 计算动量 momentum = (data - data.shift(self.period)) / data.shift(self.period) # 返回目标范围 return momentum.loc[start_index:] def get_extended_window_size(self): """返回需要扩展的窗口大小""" return self.period # 使用自定义因子 def test_custom_factor(): """测试自定义因子""" import qlib from qlib.data.ops import get_expression qlib.init(provider_uri="~/.qlib/qlib_data/cn_data", region="cn") # 创建因子实例 close_feature = get_expression("$close") my_factor = MyCustomFactor(close_feature, period=20) # 计算因子值 data = my_factor.load("SH600000", "2020-01-01", "2020-12-31") print(data.head())20.2.2 Handler因子
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102from qlib.data.dataset.handler import DataHandlerLP from qlib.contrib.data.handler import handler_registry @handler_registry.register("MyAlpha158") class MyAlpha158(DataHandlerLP): """ 自定义Alpha158 Handler 在原有Alpha158基础上添加自定义因子 """ def __init__( self, instruments="csi500", start_time=None, end_time=None, fit_start_time=None, fit_end_time=None, **kwargs, ): # 配置数据加载器 data_loader = { "class": "QlibDataLoader", "module_path": "qlib.data.dataset.loader", "kwargs": { "instruments": instruments, "start_time": start_time, "end_time": end_time, }, } # 定义因子字段 # 包含原有Alpha158字段 + 自定义字段 fields = self._get_fields() super().__init__( instruments=instruments, start_time=start_time, end_time=end_time, fit_start_time=fit_start_time, fit_end_time=fit_end_time, data_loader=data_loader, fields=fields, **kwargs, ) def _get_fields(self): """ 定义因子字段 Returns: list: 因子表达式列表 """ # 这里只展示部分因子 fields = [ # 原有因子 "Ref($close, 1) / $close - 1", "Ref($close, 5) / $close - 1", "Mean($close, 20) / $close - 1", # 自定义因子1: 波动率 "Std($close, 20) / Mean($close, 20)", # 自定义因子2: 量价比 "Mean($volume, 5) / ($close * Mean($close, 5))", # 自定义因子3: 振幅 "($high - $low) / $open", ] return fields # 使用自定义Handler def test_custom_handler(): """测试自定义Handler""" from qlib.data.dataset import DatasetH # 配置Handler handler_config = { "class": "MyAlpha158", "module_path": "__main__", # 或者实际模块路径 } # 创建Dataset dataset_config = { "class": "DatasetH", "module_path": "qlib.data.dataset", "kwargs": { "handler": handler_config, "segments": { "train": ("2010-01-01", "2015-12-31"), "valid": ("2016-01-01", "2017-12-31"), "test": ("2018-01-01", "2020-12-31"), }, }, } dataset = DatasetH(**dataset_config["kwargs"]) data = dataset.prepare('train') print(f"Features shape: {data['features'].shape}")20.3 自定义模型
20.3.1 传统ML模型
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90from qlib.contrib.model import Model from sklearn.ensemble import RandomForestRegressor class CustomRFModel(Model): """ 自定义随机森林模型 """ def __init__(self, **kwargs): """ Args: **kwargs: 模型参数 - n_estimators: 树数量 - max_depth: 最大深度 - min_samples_split: 最小分裂样本数 """ super().__init__(**kwargs) self.params = { 'n_estimators': kwargs.get('n_estimators', 100), 'max_depth': kwargs.get('max_depth', 6), 'min_samples_split': kwargs.get('min_samples_split', 2), } self.model = RandomForestRegressor(**self.params) def fit(self, dataset): """ 训练模型 Args: dataset: 数据集配置 """ from qlib.data.dataset import DatasetH # 准备数据 ds = DatasetH(**dataset['kwargs']) train_data = ds.prepare('train') valid_data = ds.prepare('valid') # 合并训练集和验证集 X_train = train_data['features'] y_train = train_data['label'] # 训练 self.model.fit(X_train, y_train) # 验证 X_valid = valid_data['features'] y_valid = valid_data['label'] y_pred = self.model.predict(X_valid) # 计算IC from scipy.stats import spearmanr ic, _ = spearmanr(y_pred, y_valid) print(f"Validation IC: {ic:.4f}") def predict(self, dataset): """ 预测 Args: dataset: 数据集配置 Returns: 预测结果 """ from qlib.data.dataset import DatasetH ds = DatasetH(**dataset['kwargs']) test_data = ds.prepare('test') X_test = test_data['features'] return self.model.predict(X_test) def save(self, path): """保存模型""" import pickle with open(path, 'wb') as f: pickle.dump(self.model, f) def load(self, path): """加载模型""" import pickle with open(path, 'rb') as f: self.model = pickle.load(f)20.3.2 深度学习模型
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120import torch import torch.nn as nn from qlib.contrib.model.pytorch_model import PytorchModel class CustomNet(nn.Module): """ 自定义神经网络 """ def __init__(self, input_dim, hidden_dim=64): super(CustomNet, self).__init__() self.feature_extractor = nn.Sequential( nn.Linear(input_dim, hidden_dim), nn.BatchNorm1d(hidden_dim), nn.ReLU(), nn.Dropout(0.3), nn.Linear(hidden_dim, hidden_dim // 2), nn.BatchNorm1d(hidden_dim // 2), nn.ReLU(), nn.Dropout(0.3), ) self.predictor = nn.Linear(hidden_dim // 2, 1) def forward(self, x): features = self.feature_extractor(x) output = self.predictor(features) return output.squeeze() class CustomDLModel(PytorchModel): """ 自定义深度学习模型 """ def __init__( self, d_feat=6, hidden_dim=64, **kwargs, ): """ Args: d_feat: 特征维度 hidden_dim: 隐藏层维度 """ super().__init__(**kwargs) self.d_feat = d_feat self.hidden_dim = hidden_dim # 创建模型 self.model = CustomNet(d_feat, hidden_dim) # 优化器 self.optimizer = torch.optim.Adam( self.model.parameters(), lr=kwargs.get('lr', 0.001), ) # 损失函数 self.loss_fn = nn.MSELoss() def fit(self, dataset): """训练模型""" from qlib.data.dataset import DatasetDL # 准备数据 ds = DatasetDL(**dataset['kwargs']) ds.setup_data() train_loader = torch.utils.data.DataLoader( ds, batch_size=self.batch_size, shuffle=True, ) # 训练循环 self.model.train() for epoch in range(self.n_epochs): for batch_idx, (features, labels) in enumerate(train_loader): # 前向传播 predictions = self.model(features.float()) loss = self.loss_fn(predictions, labels.float()) # 反向传播 self.optimizer.zero_grad() loss.backward() self.optimizer.step() # 打印进度 if epoch % 10 == 0: print(f"Epoch {epoch}, Loss: {loss.item():.4f}") def predict(self, dataset): """预测""" from qlib.data.dataset import DatasetDL ds = DatasetDL(**dataset['kwargs']) ds.setup_data() test_loader = torch.utils.data.DataLoader( ds, batch_size=self.batch_size, shuffle=False, ) self.model.eval() predictions = [] with torch.no_grad(): for features, _ in test_loader: pred = self.model(features.float()) predictions.extend(pred.cpu().numpy()) return np.array(predictions)20.4 自定义策略
20.4.1 信号策略
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59from qlib.strategy.base import BaseStrategy class CustomSignalStrategy(BaseStrategy): """ 自定义信号策略 基于自定义信号生成交易决策 """ def __init__( self, signal, topk=30, n_drop=5, risk_degree=0.95, ): """ Args: signal: 信号(模型或因子) topk: 持仓数量 n_drop: 每次调仓换出数量 risk_degree: 风险度(仓位比例) """ super().__init__() self.signal = signal self.topk = topk self.n_drop = n_drop self.risk_degree = risk_degree self.current_position = {} def generate_target_pos(self, start_time, end_time): """ 生成目标持仓 Args: start_time: 开始时间 end_time: 结束时间 Returns: dict: {stock: weight} """ # 获取预测值 if hasattr(self.signal, 'predict'): # 模型预测 predictions = self.signal.predict(start_time, end_time) else: # 因子值 predictions = self.signal.get_value(start_time, end_time) # 选择topk selected = predictions.nlargest(self.topk) # 等权重 target_weight = self.risk_degree / self.topk target_position = {stock: target_weight for stock in selected.index} return target_position20.4.2 执行策略
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81from qlib.backtest.executor import Executor class CustomExecutor(Executor): """ 自定义执行器 实现特殊的交易执行逻辑 """ def __init__( self, time_per_step, trade_exchange, execute_func=None, ): """ Args: time_per_step: 每步时间间隔 trade_exchange: 交易所对象 execute_func: 自定义执行函数 """ super().__init__() self.time_per_step = time_per_step self.trade_exchange = trade_exchange self.execute_func = execute_func def execute(self, trade_strategy, start_time, end_time): """ 执行交易 Args: trade_strategy: 交易策略 start_time: 开始时间 end_time: 结束时间 Returns: (portfolio_result, indicator_result) """ # 如果提供了自定义执行函数 if self.execute_func: return self.execute_func( trade_strategy, self.trade_exchange, start_time, end_time, ) # 否则使用默认执行逻辑 return super().execute(trade_strategy, start_time, end_time) # 自定义执行函数示例 def smart_execute(trade_strategy, exchange, start_time, end_time): """ 智能执行函数 根据市场状况调整交易执行方式 """ # 生成时间序列 time_steps = pd.date_range(start_time, end_time, freq='day') for trade_time in time_steps: # 获取市场波动率 market_volatility = calculate_market_volatility(trade_time, exchange) # 根据波动率调整交易 if market_volatility > 0.03: # 高波动 # 分批交易,降低冲击 target_position = trade_strategy.generate_target_pos( trade_time, trade_time ) execute_in_batches(target_position, exchange, n_batches=3) else: # 低波动 # 正常交易 target_position = trade_strategy.generate_target_pos( trade_time, trade_time ) execute_immediately(target_position, exchange)20.5 自定义评估指标
20.5.1 绩效指标
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73from qlib.contrib.evaluate import risk_analysis def custom_sharpe(returns, risk_free_rate=0.03, freq='day'): """ 自定义夏普比率 Args: returns: 收益率Series risk_free_rate: 无风险利率 freq: 频率 Returns: 夏普比率 """ # 年化收益率 annual_return = returns.mean() * 252 # 年化波动率 annual_volatility = returns.std() * np.sqrt(252) # 夏普比率 sharpe = (annual_return - risk_free_rate) / annual_volatility return sharpe def custom_calmar(returns): """ Calmar比率 年化收益 / 最大回撤 """ # 年化收益 annual_return = returns.mean() * 252 # 最大回撤 cumulative = (1 + returns).cumprod() max_drawdown = (cumulative / cumulative.cummax() - 1).min() # Calmar比率 calmar = annual_return / abs(max_drawdown) return calmar def custom_evaluation(returns): """ 自定义评估函数 整合多个自定义指标 """ metrics = { 'sharpe': custom_sharpe(returns), 'calmar': custom_calmar(returns), 'skew': returns.skew(), 'kurtosis': returns.kurtosis(), } return metrics # 使用自定义指标 def test_custom_metrics(): """测试自定义指标""" # 假设有收益率数据 returns = pd.Series(np.random.randn(252) * 0.02) # 计算自定义指标 metrics = custom_evaluation(returns) print("Custom Metrics:") for name, value in metrics.items(): print(f"{name}: {value:.4f}")20.5.2 因子分析
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57def factor_ic_analysis(predictions, returns): """ 因子IC分析 Args: predictions: 预测值 returns: 实际收益率 Returns: dict: IC统计指标 """ from scipy.stats import spearmanr # Rank IC rank_ic, p_value = spearmanr(predictions, returns) # Normal IC normal_ic = np.corrcoef(predictions, returns)[0, 1] # IC统计 return { 'rank_ic': rank_ic, 'normal_ic': normal_ic, 'p_value': p_value, 'ic_abs_mean': np.abs(predictions - returns).mean(), } def factor_layering_test(predictions, returns, n_layers=5): """ 因子分层测试 Args: predictions: 预测值 returns: 实际收益率 n_layers: 分层数 Returns: DataFrame: 各层统计 """ # 分层 quantiles = pd.qcut(predictions, n_layers, labels=False, duplicates='drop') # 各层收益 layer_returns = [] for i in range(n_layers): mask = quantiles == i layer_return = returns[mask].mean() layer_returns.append(layer_return) # 统计 result = pd.DataFrame({ 'Layer': range(n_layers), 'Return': layer_returns, }) return result20.6 插件系统
20.6.1 创建插件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98class QlibPlugin: """ Qlib插件基类 """ def __init__(self, config=None): self.config = config or {} def register(self): """注册插件""" raise NotImplementedError def unregister(self): """注销插件""" pass class MyCustomPlugin(QlibPlugin): """ 自定义插件示例 集成自定义的Provider、Handler、Model等 """ def __init__(self, config=None): super().__init__(config) self.providers = [] self.handlers = [] self.models = [] def register(self): """注册所有组件""" # 1. 注册Provider from qlib.data import D provider = CustomCSVProvider(self.config.get('csv_dir')) D.register_provider(provider) self.providers.append(provider) # 2. 注册Handler from qlib.contrib.data.handler import handler_registry handler_registry.register(self.config.get('handler_name', 'MyHandler'))(MyAlpha158) self.handlers.append(MyAlpha158) # 3. 注册Model from qlib.contrib.model import model_registry model_registry.register(self.config.get('model_name', 'MyModel'))(CustomRFModel) self.models.append(CustomRFModel) print(f"Plugin registered: {len(self.providers)} providers, {len(self.handlers)} handlers, {len(self.models)} models") def unregister(self): """注销所有组件""" # 清理逻辑 pass # 插件管理器 class PluginManager: """ 插件管理器 管理多个插件的生命周期 """ def __init__(self): self.plugins = [] def load_plugin(self, plugin_class, config=None): """加载插件""" plugin = plugin_class(config) plugin.register() self.plugins.append(plugin) def unload_all(self): """卸载所有插件""" for plugin in self.plugins: plugin.unregister() self.plugins.clear() # 使用示例 if __name__ == "__main__": # 创建插件管理器 plugin_manager = PluginManager() # 加载自定义插件 plugin_manager.load_plugin( MyCustomPlugin, config={ 'csv_dir': './data/csv', 'handler_name': 'MyAlpha158', 'model_name': 'MyRF', } ) # 使用Qlib...20.7 本章小结
本章介绍了Qlib的扩展开发:
- 附录
附录
本附录提供参考资料、常见问题解答等内容,帮助读者更好地使用Qlib。
A. 术语表
术语 英文 说明 IC Information Coefficient 信息系数,衡量因子预测能力 Rank IC Rank Information Coefficient 秩相关系数,使用Spearman相关 ICIR Information Coefficient Information Ratio IC的信息比率,衡量IC稳定性 Sharpe Ratio Sharpe Ratio 夏普比率,风险调整后收益指标 Max Drawdown Maximum Drawdown 最大回撤 Alpha Alpha 超额收益 Beta Beta 系统性风险度量 Information Ratio Information Ratio 信息比率 Turnover Turnover Rate 换手率 Provider Data Provider 数据提供者 Expression Expression 表达式 Handler Data Handler 数据处理器 Dataset Dataset 数据集 Model Model 模型 Strategy Strategy 策略 Executor Executor 执行器 Exchange Exchange 交易所模拟 Recorder Recorder 实验记录器 CTA Commodity Trading Advisor 商品交易顾问 ATR Average True Range 平均真实波幅 MA Moving Average 移动平均线 EMA Exponential Moving Average 指数移动平均 MACD Moving Average Convergence Divergence 平滑异同移动平均线 RSI Relative Strength Index 相对强弱指标 KDJ Stochastic Oscillator 随机指标 OHLCV Open, High, Low, Close, Volume 开高低收成交量 B. 常见问题
B.1 安装问题
Q1: 安装Qlib时出现依赖冲突?
- 第21章-Qlib算子完全指南:90+算子详解与实战
全面介绍Qlib中的90+个量化算子,包括算子分类、使用方法和实战案例