跳转至




Index

Numpy核心语法[3]

在不同的库之间交换数据,常常会遇到格式问题。比如,我们从第三方数据源拿到的行情数据,它们用的时间字段常常会是字符串。有一些库在存储行情时,对 OHLC 这些字段进行了优化,使用了 4 个字节的浮点数,但如果要传给 talib 进行指标计算,就必须先转换成 8 个字节的浮点数,等等,这就有了类型转换的需求。


此外,我们还会遇到需要将 numpy 数据类型转换为 python 内置类型,比如,将 numpy.float64 转换为 float 的情况。

1. 类型转换和 Typing

1.1. Numpy 内部类型转换

Numpy 内部类型转换,我们只需要使用 astype

1
2
3
4
5
6
7
8
x = np.array (['2023-04-01', '2023-04-02', '2023-04-03'])
print (x.astype (dtype='datetime64[D]'))

x = np.array (['2014', '2015'])
print (x.astype (np.int32))

x = np.array ([2014, 2015])
print (x.astype (np.str_))

Tip

如何将 boolean array 转换成整数类型,特别是,将 True 转为 1,False 转为 - 1? 在涉及到阴阳线的相关计算中,我们常常需要将 open > close 这样的条件转换为符号 1 和 - 1,以方便后续计算。这个转换可以用:

1
2
3
>>> x = np.array ([True, False])
>>> x * 2 - 1
... array ([ 1, -1])

1.2. Numpy 类型与 Python 内置类型转换

如果我们要将 Numpy 数组转换成 Python 数组,可以使用 tolist 函数。

1
2
x = np.array ([1, 2, 3])
print (x.tolist ())

我们通过 item () 函数,将 Numpy 数组中的元素转换成 Python 内置类型。

1
2
3
x = np.array (['2023-04-01', '2023-04-02'])
y = x.astype ('M8[s]')
y [0].item ()

Warning

一个容易忽略的事实是,当我们从 Numpy 数组中取出一个标量时,我们都应该把它转换成为 Python 对象后再使用。否则,会发生一些隐藏的错误,比如下面的例子:

1
2
3
4
5
6
import json
x = np.arange (5)
print (json.dumps ([0]))
print (x [0])

json.dumps ([x [0]])

Warning

这里最后一行会出错。提示 type int64 is not JSON serializable。把最后一行换成 json.dumps ([x [0].item ()]) 则可以正常执行。

1.3. Typing

从 Python 3.1 起,就开始引入类型注解 (type annotation),到 Python 3.8,基本上形成了完整的类型注解体系。我们经常看到函数的参数类型注解,比如,下面的代码:

1
2
3
from typing import List
def add (a: List [int], b: int) -> List [int]:
    return [i + b for i in a]

从此,Python 代码也就有了静态类型检查支持。

NumPy 的 Typing 模块提供了一系列类型别名(type aliases)和协议(protocols),使得开发者能够在类型注解中更精确地表达 NumPy 数组的类型信息。这有助于静态分析工具、IDE 以及类型检查器提供更准确的代码补全、类型检查和错误提示。

这个模块提供的主要类型是 ArrayLike, NDArray 和 DType。

1
2
3
import numpy
from numpy.typing import ArrayLike, NDArray, DTypeLike
import numpy as np

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
def calculate_mean (data: ArrayLike) -> float:
    """计算输入数据的平均值,数据可以是任何 ArrayLike 类型"""
    return np.mean (data)

def add_one_to_array (arr: NDArray [np.float64]) -> NDArray [np.float64]:
    """向一个浮点数数组的每个元素加 1,要求输入和输出都是 np.float64 类型的数组"""
    return arr + 1

def convert_to_int (arr: NDArray, dtype: DTypeLike) -> NDArray:
    """将数组转换为指定的数据类型"""
    return arr.astype (dtype)

如果你是在像 vscode 这样的 IDE 中使用上述函数,你就可以看到函数的类型提示。如果传入的参数类型不对,还能在编辑期间,就得到错误提示。

2. 拓展阅读

2.1. Numpy 的数据类型

在 Numpy 中,有以下常见数据类型。每一个数字类型都有一个别名。在需要传入 dtype 参数的地方,一般两者都可以使用。另外,别名在字符串类型、时间和日期类型上,支持得更好。比如,'S5' 是 Ascii 码字符串别外,它除了指定数据类型之外,还指定了字符串长度。datetime64 [S] 除了表明数据是时间日期类型之外,还表明它的精度到秒。


类型 别名
np.int8 i1
np.int16 i2
np.int32 i4
np.int64 i8
np.uint8 u1
np.uint16 u2
np.uint32 u4
np.uint64 u8
np.float16 f2
np.float32 f4,还可指定结尾方式,比如 'f4' 表示大端字节序。其它 float 类型同。
np.float64 f8
np.float128 f16
np.bool_ b1
np.str_ U (后接长度,例如 U10)
np.bytes_ S (后接长度,例如 S5)
np.datetime64 M8 和 M8[D] M8[h] M8[m] M8[s],也可写作 datetime64[D] 等
np.timedelta64 m8 和 m8[D] m8[h] m8[m] m8[s] 等

3. 处理包含 np.nan 的数据

在量化分析中,我们常常会遇到数据为 np.nan 情况。比如,某公司上年利润为负数,今年利润实现正增长,请问要如何表示公司的 YoY 的利润增长呢?


Info

np.nan 是 numpy 中的一个特殊值,表示“Not a Number”,即“不是数字”。注意,在 Numpy 中,尽管 np.nan 不是一个数字,但它确实数字类型。确切地说,它是 float 类型。此外,在 float 类型中,还存在 np.inf(正无穷大)和负无穷大 (np.NINF,或者-np.inf)。

又比如,在计算个股的 RSI 或者移动平均线时,最初的几期数据是无法计算出来的(在回测框架 backtrader 中,它把这种现象称之为技术指标的冷启动)。如果不要求返回的技术指标的取值与输入数据长度一致,则会返回短一些、但全部由有效数据组成的数组;否则,此时我们常常使用 np.NaN 或者 None 来进行填充,以确保返回的数据长度与输入数据长度一致。

但是,如果我们要对返回的数组进行统计,比如求均值、最大值、排序,对包含 np.nan 或者 None 的数组,应该如何处理?

3.1. 包含 np.nan 和 np.inf 的数组运算

在 numpy 中,提供了对带 np.nan 的数组进行运算的支持。比如有以下数组:

1
2
3
4
import numpy as np

x = np.array([1, 2, 3, np.nan, 4, 5])
print(x.mean())

我们将得到一个 nan。实际上,多数情况下,我们希望忽略掉 nan,只对有效数据进行运算,此时得到的结果,我们往往仍然认为是有意义的。

因此,Numpy 提供了许多能处理包含 nan 数据的数组输入的运算函数。下面是一个完整的列表:

在这里,我们以输入 np.array([1, 2, 3, np.nan, np.inf, 4, 5]) 为例

函数 nan 处理 inf 处理 输出
nanmin 忽略 inf 1.0
nanmax 忽略 inf inf
nanmean 忽略 inf inf
nanmedian 忽略 inf 3.5
nanstd 传递 inf nan
nanvar 传递 inf nan
nansum 忽略 inf inf
nanquantile 忽略 inf 2.25
nancumsum 忽略 inf inf
nancumprod 忽略 inf inf

对 np.nan 的处理中,主要是三类,一类是传递,其结果导致最终结果也是 nan,比如,在计算方差和标准差时;一类是忽略,比如在找最小值时,忽略掉 np.nan,在余下的元素中进行运算;但在计算 cumsum 和 cumprod 时,"忽略"意味着在该元素的位置上,使用前值来填充。我们看一个不包含 np.inf 的示例:


1
2
3
x = np.array([1, 2, 3, np.nan, 4, 5])
np.nancumprod(x)
np.nancumsum(x)

输出结果是:

1
2
3
array([  1.,   2.,   6.,   6.,  24., 120.])

array([ 1.,  3.,  6.,  6., 10., 15.])

结果中的第 4 个元素都是由第 3 个元素复制而来的。

如果一个数组中包含 inf,则在任何涉及到排序的操作(比如 max, median, quantile)中,这些元素总是被置于数组的最右侧;如果是代数运算,则结果会被传导为 inf。这些地方,Numpy 的处理方式与我们的直觉是一致的。

除了上述函数,np.isnan 和 np.isinf 函数,也能处理包含 np.nan/np.inf 元素的数组。它们的作用是判断数组中的元素是否为 nan/inf,返回值是一个 bool 数组。

3.2. 包含 None 的数组运算

在上一节中,我们介绍的函数能够处理包含 np.nan 和 np.inf 的数组。但是,在 Python 中,None 是任何类型的一个特殊值,如果一个数组包含 None 元素,我们常常仍然会期望能对它进行 sum, mean, max 等运算。但是,Numpy 并没有专门为此准备对应的函数。


但是,我们可以通过 astype 将数组转换为 float 类型,在此过程中,所有的 None 元素都转换为 np.nan,然后就可以进行运算了。

1
2
x = np.array([3,4,None,55])
x.astype(np.float64)

输出为:array([3., 4., nan, 55.])

3.3. 性能提升

当我们调用 np.nan *函数时,它的性能会比普通的函数慢很多。因此,如果性能是我们关注的问题,我们可以使用 bottleneck 这个库中的同名函数。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
from bottleneck import nanstd
import numpy as np
import random

x = np.random.normal(size = 1_000_000)
pos = random.sample(np.arange(1_000_000).tolist(), 5)
x[pos] = np.nan

%timeit nanstd(x)
%timeit np.nanstd(x)

我们担心数组中 np.nan 元素个数会影响到性能,所以,在上面的示例中,在随机生成数组时,我们只生成了 5 个元素。在随后的一次测试中,我们把 nan 元素的个数增加了 10 倍。实验证明,nan 元素的个数对性能没有什么影响。在所有的测试中,bottlenect 的性能比 Numpy 都要快一倍。

Info

根据 bottleneck 的文档,它的许多函数,要比 Numpy 中的同名函数快 10 倍左右。


这是你的量化母语

正如死亡和税收不可避免,Numpy 和 Pandas 对量化人而言,也具有同样的地位 -- 每个量化人都不可避免地要与这两个库打交道。

如果你去研究一些非常重要的量化库,比如 alphalens, empyrical, backtrader, tushare, akshare, jqdatasdk 等,或者一些非常优秀的量化框架比如 quantaxis, zillionare, vnpy 等等,你就会发现它们都依赖于 numpy 和 pandas。实际上,一个库只要依赖于 pandas,它也必将传递依赖到 numpy。

如果说量化人有一种共同的语言的话,它就是 Numpy 和 Pandas。Numpy 和 Pandas 是量化人的母语。


具体地说,Numpy 和 Pandas 不仅为量化人提供了类似于表格的数据结构 -- Numpy structured array 和 Pandas DataFrame -- 这对于包括行情数据在内的诸多数据的中间存储是必不可少的;它还提供了许多基础算法。

比如:

  1. 在配对交易 (pair trade) 中,相关性计算是非常重要的一环。无论是 Numpy 还是 Pandas 都提供了相关性计算函数。
  2. 在 Alpha 101 因子计算中,排序操作是一个基础函数 -- 这是分层回测的基础 -- Pandas 通过 rank 方法来提供这一功能。
  3. Maxdrawdown(最大回测)是衡量策略的重要指标。Numpy 通过 numpy.maximum.accumulate 提供了支持。

类似常用的算法非常多,我们将在本课程中一一介绍它们。

课程编排说明

紧扣量化场景来介绍 Numpy 和 Pandas 是本课的一大特点。我们通过分析重要的、流行度较高的量化库源码,找出其中使用 numpy 和 pandas 的地方,再进行归类的提炼,并结合一些量化社区中常问的相关问题 -- 这些往往是量化人在使用 numpy/pandas 时遇到的困难所在 -- 来进行课程编排,确保既系统讲解这两个重要的库,又保证学员在学习后,能立即将学习到的方法与技巧运用到工作中,迅速提高自己的生产力。

任何高效地学习都离不开高强度的练习。本课程安排了大量的练习。无论是演示代码、还是练习,我们都尽可能安排在量化场景下完成,这样会增强您的代入感。但是,这往往也要求您能理解这些场景和数据。

在编写本课程时,作者阅读了大量书籍、博文、论文和开源项目代码。其中一部分与教材关联度较高的,我们以延伸阅读、脚注的方式提供参考链接。如果学员有时间,也可以阅读这部分内容,以获得跟作者同样的视野景深。但如果你时间紧张,也完全可以跳过这些内容,只关注我们课程内容的主线就好。


本课程是专门为量化交易从业者,比如 quant developer, quant researcher 和 quant pm 等人设计。如果您有基础的金融知识,这门课也适用于其它需要学习 Numpy 和 Pandas 的人。课程内容在丰度和深度上都是市面上少见的。

什么是 Numpy

图片来源:numpy.org

Numpy 是 Python 科学计算中的基础包,它是一款开源软件,允许在保留原有版权声明的前提下,自由使用。它的名字来源于 Numeric Programming(数值编程),其前身是 Numeric 库和 Numarray 库。

Numpy 提供了多维数组对象、各种派生对象(比如掩码数组 -- masked array)以及各种用于数组操作的高性能例程,包括数学、逻辑、形状操作、排序、选择、I/O 、离散傅里叶变换、基本线性代数、基本统计运算、随机模拟等等。下图提供了一个更详细的说明:


Numpy 的底层开发语言是 C 语言,并且进行了相当多的优化,这包括并行化支持、使用 OpenBLAS 和高级 SIMD 指令来优化矩阵操作等等。由于 Python 这种胶水语言的灵活性,使得 Numpy 最终得以作为一个 Python 库发布出来。

Tip

很多人认为要提高量化策略的性能,就必须放弃 Python,改用 C/Rust。这种说法又对又不对。

如果一个 Quanter 不懂得利用 OpenBLAS 和 LAPACK,那么即使用 C 开发出来的算法,也很难比通过 Python 调用 Numpy 来得更快。在 Numpy 中,一个最常见的矩阵乘法,就可能利用到多核机器的并行运算能力(即多线程)和高级 CPU 指令以实现快速的 BLAS/LAPACK 运算。这些知识和技巧,是一般人难以掌握的。

你可以通过下面的方法来查看你的 Numpy 是否利用了 OpenBLAS/LAPACK 及高级 SIMD 指令:

1
2
import numpy as np
np.show_config()

Numpy 广泛运用于学术界、金融界和工业界,具有成熟、快速、稳定和活跃的特点。当前的稳定版本是 2.2.0 版(2025 年 3 月),发布于仅仅 1 个季度之前,这足以说明 Numpy 社区开发的活跃度。

Numpy 还是 Pandas, scipy, statsmodels 和 scikit-learn 等众多知名 Python 库的底层依赖库。

什么是 Pandas

Pandas 是用于数据操作和分析的 Python 软件库。它构造在 Numpy 之上,增加了索引、异构数组等功能(相当于 Numpy 的 Structure Array -- 这个概念我们会在本课程后面详细解释),这使它成为处理表格类数据的有力武器。


Pandas 的名字来源于术语 Panel Data(面板数据)和 Python Data Analysis,前者是计量经济学的一个术语,用以表示对同一个体在多个时期观测的数据集。

自 2010 年成为开源项目以来,pandas 已经发展成为相当大的一个库,开发者社区已发展到超过 2500 名不同的贡献者。

Pandas 提供了 Series, DataFrame 两种数据结构。它曾经还提供了 Panel 这种三维数据结构,但最终放弃了。与 Excel 相比,它能更快速地分析更大的数据(一般小于 1 千万行,主要取决于机器的物理内存)。


延伸阅读

来源:Github readme 项目

如果要推荐一本讲解 Pandas 的书,毫无疑问,没人任何书籍能比 《Python for Data Analysis》 更权威了。因为它是由 Pandas 的创建者 Wes McKinney 撰写的!这本书现在提供有网页版供开放访问。读者也可点击 此链接 阅读。在 2023 年 4 月进行更新后,它现在支持到了 pandas 2.0 版本。

Wes Mckinney 是 Pandas 的创建者和终生仁慈独裁者。他现在居住在田纳西的纳什维尔,是 DataPad 的 CEO 和联合创始人。

Wes Mckinney 本科毕业于 MIT,是数学和统计学博士(杜克大学)。在 AQR 资本管理公司工作期间,学习了 Python 并开始构建 pandas。他同时还是 Apache Arrow 的联合创建者。

从 Pandas 的诞生史来看,毫无疑问,Pandas 就是为金融/量化而生的。Wes Mckinney 创建 Pandas 的初衷,就是要解决用 Microsoft Excel 来进行金融数据分析和统计运算时效率低、并且十分繁琐的问题。在今天,量化巨头 Two Sigma[^two-sigma] 是这个项目的重要赞助商。Pandas 的成功,也促进了 Python 的广泛流行。甚至可以说,Mckinney 以一己之力,开拓了 Python 的生存空间。

创建 Pandas 并没有任何收益,Wes Mckinney 最初主要依靠第一份工作的存款和兼职来生活。这是一个类似于《月亮和六便士》的故事,就连主人公的背景都极其相似,都是金融工作者。幸运地是,Wes Mckinney 获得了成功。如果你对这段故事感兴趣,可以阅读 《可持续发展的开源项目将赢得未来》 这篇文章。

烛台密码 三角形整理如何提示玄机

本文是几个月前《三角形整理检测》的后续篇,改进了算法,增加了应用场景的讨论。

《匡醍.因子分析与机器学习策略》课程的最后一课是关于深度学习框架在量化交易中的应用的。考虑很多技术交易者都会看图操作,比如艾略特浪型、头肩顶、三角形整理等等。正好CNN在图像模式识别能力上超越了人类,所以,就打算拿三角形整理的检测作为例子。

要通过CNN网络来实现三角形整理检测,首先需要做到数据标注。我们在课程中已经实作出来一个标注工具。不过,我更希望能够使用算法自动检测到三角形整理模式。这篇文章就将介绍我的算法。

如果你希望拿到本文源码,可以加入我们的星球。加入星球三天后,更可以获得我们研究平台的账号。在平台中提供了可以运行、验证notebook版本,你可以完全复现本文的结果。

Note

先通过算法对k线图进行标注,再通过CNN网络进行识别,感觉这个有点Matryoshka doll了。于是我在课程中换了另一个例子,通过4 channel的一维卷积,实现了预测误差1%的准确率。这篇文章算是课程的边脚料重新加工了。

算法的示意图如下:

三角形检测示意图

首先,我们得找到k线图中的波峰和波谷。在图中,波峰是点1和2,波谷则是点3和4。然后我们作通过点1和2的直线,得到压力线;作通过点3和4的直线,得到支撑线。

在Python中,计算两点之间的直线可以通过np.polyfit来实现。通过该函数,将获得直线的斜率。通过两条直线之间的斜率关系,我们可以进一步得到三角形的形态。

如果记Sr为压力线的斜率,记Ss为支撑线的斜率,那么,三角形的形态可以由以下表格来定义:

压力线方向 支撑线方向 角度对比 标记 说明
Sr>0 Ss>0 abs(sr) > abs(ss) 1 上升且发散三角
Sr>0 Ss>0 abs(sr) < abs(ss) 2 上升且收敛三角
Sr>0 Ss<0 abs(sr) > abs(ss) 3 发散偏上升三角
Sr>0 Ss<0 abs(sr) < abs(ss) 4 发散偏下降三角
Sr<0 Ss>0 abs(sr) > abs(ss) 5 下降且收敛三角
Sr<0 Ss>0 abs(sr) < abs(ss) 6 上升且收敛三角
Sr<0 Ss<0 abs(sr) > abs(ss) 7 下降且收敛三角
Sr<0 Ss<0 abs(sr) < abs(ss) 8 下降且发散三角

部分形态如下图所示:

识别算法的实现代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
from zigzag import peak_valley_pivots

def triangle_flag(df, lock_date=None):
    if lock_date is not None:
        peroid_bars = df.loc[:lock_date]
    else:
        peroid_bars = df

    thresh = peroid_bars.close[-120:].pct_change().std() * 3

    pvs = peak_valley_pivots(peroid_bars.close.astype(np.float64), thresh, -1 * thresh)

    if len(pvs) == 0:
        return 0, None, None

    pvs[0] = pvs[-1] = 0
    pos_peaks = np.argwhere(pvs == 1).flatten()[-2:]
    pos_valleys = np.argwhere(pvs == -1).flatten()[-2:]

    if len(pos_peaks) < 2 or len(pos_valleys) < 2:
        return 0, None, None

    minx = min(pos_peaks[0], pos_valleys[0])
    y = df.close[pos_peaks].values
    p = np.polyfit(x=pos_peaks, y=y, deg=1)
    upper_trendline = np.poly1d(p)(np.arange(0, len(df)))

    y = df.close[pos_valleys].values
    v = np.polyfit(x=pos_valleys, y=y, deg=1)
    lower_trendline = np.poly1d(v)(np.arange(0, len(df)))

    sr, ss = p[0], v[0]

    flags = {
        (True, True, True): 1,
        (True, True, False): 2,
        (True, False, True): 3,
        (True, False, False): 4,
        (False, True, True): 5,
        (False, True, False): 6,
        (False, False, True): 7,
        (False, False, False): 8,
    }

    flag = flags[(sr > 0, ss > 0, abs(sr) > abs(ss))]

    return flag, upper_trendline, lower_trendline
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
def show_trendline(asset, df, resist, support, flag, width=600, height=400):
    desc = {
        1: "上升且发散三角",
        2: "上升且收敛三角",
        3: "发散偏上升三角",
        4: "发散偏下降三角",
        5: "下降且收敛三角",
        6: "上升且收敛三角",
        7: "下降且收敛三角",
        8: "下降且发散三角",
    }

    if isinstance(df, pd.DataFrame):
        df = df.reset_index().to_records(index=False)

    title = f"flag: {flag} - {desc[flag]}"
    cs = Candlestick(df, title=title, show_volume=False, show_rsi=False, width=width, height=height)
    cs.add_line("support", np.arange(len(df)), support)
    cs.add_line("resist", np.arange(len(df)), resist)
    cs.plot()


np.random.seed(78)
start = datetime.date(2023, 1, 1)
end = datetime.date(2023, 12, 29)
barss = load_bars(start, end, 4)

for key, df in barss.groupby("asset"):
    df = df.reset_index().set_index("date")
    flag, resist, support = triangle_flag(df)
    if flag != 0:
        show_trendline(key, df, resist, support, flag)

最后,我们来研究一支个股的情况,看看这个算法可能有哪些用途:

1
2
3
4
5
6
7
8
9
start = datetime.date(2023, 1, 1)
end = datetime.date(2023, 12, 29)
barss = load_bars(start, end, ("300814.XSHE", ))

for key, df in barss.groupby("asset"):
    df = df.reset_index().set_index("date")
    flag, resist, support = triangle_flag(df, datetime.date(2023, 9, 11))
    if flag != 0:
        show_trendline(key, df, resist, support, flag, width=800, height=600)

标的从23年4月19日以来,先后出现4次波峰。随着时间的推移,整理形态也不断变化。

7月12,突破之前的形态是发散偏上升三角。

1
2
3
4
5
6
7
8
9
start = datetime.date(2022, 12, 1)
end = datetime.date(2023, 10, 29)
barss = load_bars(start, end, ("300814.XSHE", ))

for key, df in barss.groupby("asset"):
    df = df.reset_index().set_index("date")
    flag, resist, support = triangle_flag(df, datetime.date(2023, 7,19))
    if flag != 0:
        show_trendline(key, df, resist, support, flag, width=800, height=600)

7月12日突破之后,支撑和压力线发生变化。此时可以计算出9月7日的压力位是48元。但当天只冲击到45.6,随后收了上影线。

1
2
3
4
5
6
7
8
9
start = datetime.date(2022, 12, 1)
end = datetime.date(2023, 10, 29)
barss = load_bars(start, end, ("300814.XSHE", ))

for key, df in barss.groupby("asset"):
    df = df.reset_index().set_index("date")
    flag, resist, support = triangle_flag(df, datetime.date(2023, 8,19))
    if flag != 0:
        show_trendline(key, df, resist, support, flag, width=800, height=600)

此时仍然是上升三角,但9月7日未破压力位后,压力线应该使用最新的两个波峰连线。此时的压力线的斜率比之前的要小,显示后续走势会弱一些。

1
2
3
4
5
6
7
8
9
start = datetime.date(2022, 12, 1)
end = datetime.date(2023, 12,29)
barss = load_bars(start, end, ("300814.XSHE", ))

for key, df in barss.groupby("asset"):
    df = df.reset_index().set_index("date")
    flag, resist, support = triangle_flag(df, datetime.date(2023, 9,15))
    if flag != 0:
        show_trendline(key, df, resist, support, flag, width=800, height=600)

在9月7日新的波峰形成后,新的压力线在11月20日的值为48.5,当天的最高点为46.4,再次未破压力位。此后压力线需要重新计算。新的压力线的斜率进一步减小。形态也由此前的上升且发散三角形,转换为上升且收敛三角形,表明已经到了退出的时间。

1
2
3
4
5
6
7
8
9
start = datetime.date(2022, 12, 1)
end = datetime.date(2023, 12,29)
barss = load_bars(start, end, ("300814.XSHE", ))

for key, df in barss.groupby("asset"):
    df = df.reset_index().set_index("date")
    flag, resist, support = triangle_flag(df)
    if flag != 0:
        show_trendline(key, df, resist, support, flag, width=800, height=600)

上述压力线斜率的变化能够表明价格上升是打开了新的通道,还是预期在走弱,这对我们中短线操作很有帮助。

在通过机器学习构建策略时,我们可以把压力线和支撑线斜率的变化(\(\delta{Sr}\), \(\delta{Ss}\))、压力线和支撑线预测出来的值(\(P_{t+1}\), \(V_{t_1}\))等作为特征,那么,我们就可能更精确地预测未来走势。

DeepSeek只是挖了个坑,还不是掘墓人,但中初级程序员是爬不出来了

在我们的《因子分析与机器学习策略》课程中,提供了从2005年到2023年,长达18年的日线数据(共1100多万条记录)供学员进行因子挖掘与验证。最初,我们是通过functools中的lru_cache装饰器,将数据缓存到内存中的。这样一来,除了首次调用时时间会略长(比如,5秒左右)外,此后的调用都是毫秒级的。

问题的提出

但这样也带来一个问题,就是内存占用太大。一次因子分析课程可能会占用5G以上。由于Jupyterlab没有自动关闭idle kernel的能力(这一点在google Colab和kaggle中都有),我们的内存很快就不够用了。

我们的数据是以字典的方式组织,并保存在磁盘上的:

每支股票的键值是股票代码,对应值则是一个Numpy structured array。这样的数据结构看上去比较独特,不过我们稍后就能看到这样组织的原因。

在进行因子分析之前,用户可能会通过指定universe,以及起止时间来加载行情数据。所谓Universe,就是指一个股票池。用户可能有给定的证券列表,也可能只想指定universe的规模;起止时间用来切换观察的时间窗口,这可能是出于性能的考虑(最初进行程序调试时,只需要用一小段行情数据;调试完成后则需要用全部数据进行回测,或者分段观察)。

最终,它要返回一个DataFrame,以date和asset(即股票代码)为双重索引,包含了OHLC,volume等列,并且这些列要根据end进行前复权(这种复权方式称为动态前复权)。此外,还将包含一个amount列,这一列则无须复权。

因此,这个函数的签名是:

1
2
3
4
def load_bars(start_date:datetime.date, 
              end_date:datetime.date, 
              universe: Tuple[str]|int = 500)->pd.DataFrame:
    pass

学员的学习过程是阅读我们的notebook文档,并尝试单元格中的代码,也可能修改这些代码再运行。因此,这是一个交互式的操作,一般来说,只要用户的等待时间不超过3秒,都是可以接受的。如果响应速度低于1秒,则可以认为是理想的。

去掉缓存后,最初的一个实现的运行速度大致是5秒:

1
2
3
start = datetime.date(2023, 12,1)
end = datetime.date(2023, 12,31)
%time load_bars(start, end, 2000)

后面的测试将使用现样的参数。

当然,如果使用更大的universe,则时间还会加长。

由于这个结果超过了3秒,所以,希望能对代码进行一些优化。性能优化是编程中比较有难度的例子,因为它涉及到对程序运行原理的理解,涉及到对多个技术栈的掌握。在这个过程中我探索了Deep Seek R1的能力边界,可供大家参考。

最初的方案

最初的代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
def load_bars_v1(
    start: datetime.date, end: datetime.date, universe: Tuple[str]|int = 500
)->pd.DataFrame:

    if barss is None:
        with open(os.path.join(data_home, "bars_1d_2005_2023.pkl"), "rb") as f:
            barss = pickle.load(f)

    keys = list(barss.keys())
    if isinstance(universe, int):
        if universe == -1:
            selected_keys = keys
        else:
            selected_keys = random.sample(keys, min(universe, len(keys)))
            try:
                pos = selected_keys.index("000001.XSHE")
                swp = selected_keys[0]
                selected_keys[0] = "000001.XSHE"
                selected_keys[pos] = swp
            except ValueError:
                selected_keys[0] = "000001.XSHE"

    else:
        selected_keys = universe

    dfs = []
    for symbol in selected_keys:
        qry = "frame >= @start & frame <= @end"
        df = pd.DataFrame(barss[symbol]).assign(asset=symbol).query(qry)

        if len(df) == 0:
            logger.debug("no bars for %s from %s to %s", symbol, start, end)
            continue
        # 前复权
        last = df.iloc[-1]["factor"]
        adjust_factor = df["factor"] / last
        adjust = ["open", "high", "low", "close", "volume"]
        df.loc[:, adjust] = df.loc[:, adjust].multiply(adjust_factor, axis="index")

        dfs.append(df)

    df = pd.concat(dfs, ignore_index=True)
    df.set_index(["frame", "asset"], inplace=True)
    df.index.names = ["date", "asset"]
    df.drop("factor", axis=1, inplace=True)
    df["price"] = df["open"].shift(-1)
    return df

代码已进行了相当的优化(其中部分也基于AI建议)。比如,将数据保存为字典,先按universe进行筛选,再拼接为dataframe,而不是将所有数据保存为dataframe,通过pandas来按universe进行筛选(将花费数倍时间)。

此外,在进行前复权时,它使用了multiply方法,从而可以一次对多个列进行前复权操作,这一建议正是AI给出来的。

但是,代码中还存在一个for loop,如果消除了这个循环,是否能进一步提升速度呢?

下面,我就使用Deep Seek R1进行尝试。

这是第一轮的prompt:

Quote

我有一个dataframe,以date和asset为联合索引,有open, high, low, close, volume, amount和factor等列。其中factor是复权因子。
现在,要对该数据结构实现以下功能:
1. 筛选出asset 在 selected_symbols列表中,date在[start, end]中的记录 2. 对这些记录,按asset进行分组,然后对 open, high, low, close, volume进行前复权。 3. 结果用dataframe返回,索引仍为date/asset,列为复权后的open, high,low, close, volume字段,以及未处理的amount。
请分别给出基于pandas, pyarrow, polars, duckdb的实现,并建议性能最优的一个。

Deep Seek的思考过程很长,这里略过了。我们直接看它的答案。磁盘读写对速度的影响并不大,所以,我们先把数据从磁盘中加载出来,保存到barss中:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
with open(os.path.join(data_home, "bars_1d_2005_2023.pkl"), "rb") as f:
    data = pickle.load(f)

dfs = []
for symbol, bars in data.items():
    df = pd.DataFrame(bars)
    df["asset"] = symbol
    df = df.rename(columns={"frame": "date"})
    dfs.append(df)

barss = pd.concat(dfs,ignore_index=True).set_index(["date", "asset"]).sort_index()
print(len(barss))

# 定义一个一致的universe,便于比较
universe = barss.index.get_level_values("asset").unique().tolist()

以下的代码都出自于 Deep Seek。我们让它对最耗时的筛选和复权部分给出方案:

基于Pandas的实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
def adjust_pandas(df, selected_symbols, start, end):
    # 筛选数据
    idx = pd.IndexSlice
    filtered = df.loc[idx[start:end, selected_symbols], :]

    # 按 asset 分组计算最后复权因子
    grouped = filtered.groupby('asset', group_keys=False)
    last_factor = grouped['factor'].transform('last')

    # 计算调整比例
    ratio = filtered['factor'] / last_factor

    # 计算复权后字段
    adjusted = filtered.assign(
        open   = filtered['open'] * ratio,
        high   = filtered['high'] * ratio,
        low    = filtered['low'] * ratio,
        close  = filtered['close'] * ratio,
        volume = filtered['volume'] * (last_factor / filtered['factor'])
    )

    # 选择最终列并保持索引
    return adjusted[['open', 'high', 'low', 'close', 'volume', 'amount']]

%time adjust_pandas(barss, universe, start, end)
adjust_pandas(barss, universe, start, end)

尽管我对Pandas很熟悉了,但仍有一些API是不知道的,比如transform。但运用正确的API,恰恰是 Python中提升性能的关键一招。

这个版本的平均运行时长是7秒。说明pandas的筛选确实很慢。

我们略过pyarrow的版本。pyarrow版本的运行时间大致是3.7秒左右。比原始版本只略有进步。这里也看出python 3.11中,for loop的运行速度已经很快了。

基于Polars的实现

这是它给出的polars的版本:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import polars as pl

def adjust_polars(df, selected_symbols, start, end):
    # 筛选数据
    filtered = df.filter(
        (pl.col("date").is_between(start, end)) &
        (pl.col("asset").is_in(selected_symbols))
    )

    # 计算最后复权因子和调整比例
    adjusted = filtered.with_columns(
        last_factor = pl.col("factor").last().over("asset")
    ).with_columns(
        ratio = pl.col("factor") / pl.col("last_factor"),
        volume_ratio = pl.col("last_factor") / pl.col("factor")
    ).with_columns(
        open   = pl.col("open") * pl.col("ratio"),
        high   = pl.col("high") * pl.col("ratio"),
        low    = pl.col("low") * pl.col("ratio"),
        close  = pl.col("close") * pl.col("ratio"),
        volume = pl.col("volume") * pl.col("volume_ratio")
    )

    # 选择最终列
    return adjusted.select(["date", "asset", "open", "high", "low", "close", "volume", "amount"])

adjust_polars(barss, universe, start, end)

之前我并不熟悉Polars,Deep Seek一下子扩展了我的能力边界。如果我要通过自学,然后来实现这部分功能,我不知道要多久才能发现.over这个API,因为这是 Polars 独有的一个窗口运算API,我无法把知识从我熟悉的Pandas中迁移过来。

但是,这个版本有一个错误:它没有遵从我给的数据格式,而是把barss当成了一个有asset和date列的polars DataFrame。通过使用通义灵码,修正了此错误,修正后的代码如下:

1
%time adjust_polars(pl.from_pandas(barss.reset_index()), universe, start, end)

得到的运行速度是1.01秒,已很接近我们要求的理想速度了。

基于DuckDB的实现

现在,我们来看它给出的duckdb的版本:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import duckdb

def adjust_duckdb(df, selected_symbols, start, end):
    query = f"""
    SELECT 
        date, asset,
        open * (factor / last_factor) AS open,
        high * (factor / last_factor) AS high,
        low * (factor / last_factor) AS low,
        close * (factor / last_factor) AS close,
        volume * (last_factor / factor) AS volume,
        amount
    FROM (
        SELECT 
            *,
            LAST_VALUE(factor) OVER (
                PARTITION BY asset 
                ORDER BY date 
                ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
            ) AS last_factor
        FROM df
        WHERE 
            asset IN ({','.join([f"'{s}'" for s in selected_symbols])})
            AND date BETWEEN '{start}' AND '{end}'
    )
    """
    return duckdb.query(query).to_df()

adjust_duckdb(barss, universe, start, end)

这里也会出现一个类似的错误,即查询语句需要df中存在asset, date列,但它们却出现在索引中。我们略作修正即可运行:

1
%time adjust_duckdb(barss.reset_index(), universe, start, end)

最终运行速度是1.21秒,在这个例子中略慢于polars,在所有方案中排在第二(在另一台机器,使用机械阵列硬盘时,更强的CPU时, duckdb更快)。但是,duckdb方案在数据规模上可能更有优势,即,如果数据集再大一到两个量级,它很可能超过polars。

在polars与duckdb中,需要的都是扁平结果的数据结构(即asset/date不作为索引,而是作为列字段存在),因此,我们可以考虑将数据结构进行重构,使用apache parquet格式写入到磁盘中,这样可以保存整个方案耗时仍保持在1秒左右。

终极咒语:急急如律令

Info

据说急急如律令要翻译成为 quickly, quickly, biu biu biu 😁

在前面,我们代替Deep Seek做了很多思考,是因为担心它对代码的最终执行速度没有sense。现在,我们试一下,直接抛出最终问题,看看会如何:

Quote

我有一个dataframe,以date和asset为联合索引,有open, high, low, close, volume, amount和factor等列。其中factor是复权因子。

现在,要对该数据结构实现以下功能:

  1. 筛选出asset 在 selected_symbols列表中,date在[start, end]中的记录
  2. 对这些记录,按asset进行分组,然后对 open, high, low, close, volume进行前复权。
  3. 结果用dataframe返回,索引仍为date/asset,列为复权后的open, high,low, close, volume字段,以及未处理的amount。

输入数据是1000万条以上,时间跨度是2005年到2023年,到2023年底,大约有5000支股票。输出结果将包含2000支股票的2005年到2023年的数据。请给出基于python,能在1秒左右实现上述功能的方案。

这一次,我们只要求技术方案限定在Python领域内,给了Deep Seek极大的发挥空间。

Deep Seek不仅给出了代码,还给出了『评测报告』,认为它给出的方案,能在某个CPU+内存组合上达到我们要求的速度。

Deep Seek认为,对于千万条记录级别的数据集,必须使用像parallel pandas这样的库来进行并行化才能达成目标。事实上这个认知是错误的

这一次Deep Seek给出的代码可运行度不高,我们没法验证基于并行化之后,速度是不是真的更快了。不过,令人印象深刻的是,它还给出了一个performance benchmark。这是它自己GAN出来的,还是真有人做过类似的测试,或者是从类似的规模推导出来的,就不得而知了。

重要的是,在给了Deek Seek更大的自由发挥空间之后,它找出了之前在筛选时,性能糟糕的重要原因: asset是字符串类型!

在海量记录中进行字符串搜索是相当慢的。在pandas中,我们可以将整数转换为category类型,此后的筛选就快很多了:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import pyarrow as pa
import pyarrow.parquet as pq

data_home = os.path.expanduser(data_home)
origin_data_file = os.path.join(data_home, "bars_1d_2005_2023.pkl")
with open(origin_data_file, 'rb') as f:
    data = pickle.load(f)

dfs = []
for symbol, bars in data.items():
    df = pd.DataFrame(bars)
    df["asset"] = symbol
    df = df.rename(columns={"frame": "date"})
    dfs.append(df)

barss = pd.concat(dfs,ignore_index=True)
barss['asset'] = barss['asset'].astype('category')
print(len(barss))

table = pa.Table.from_pandas(barss)

parquet_file_path = "/tmp/bars_1d_2005_2023_category.parquet"

with open(parquet_file_path, 'wb') as f:
    pq.write_table(table, f)

现在,我们再来看polars或者duckdb的方案的速度:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import polars as pl

def adjust_polars(df, selected_symbols, start, end):
    # 筛选数据
    filtered = df.filter(
        (pl.col("date").is_between(start, end)) &
        (pl.col("asset").is_in(selected_symbols))
    )

    # 计算最后复权因子和调整比例
    adjusted = filtered.with_columns(
        last_factor = pl.col("factor").last().over("asset")
    ).with_columns(
        ratio = pl.col("factor") / pl.col("last_factor"),
        volume_ratio = pl.col("last_factor") / pl.col("factor")
    ).with_columns(
        open   = pl.col("open") * pl.col("ratio"),
        high   = pl.col("high") * pl.col("ratio"),
        low    = pl.col("low") * pl.col("ratio"),
        close  = pl.col("close") * pl.col("ratio"),
        volume = pl.col("volume") * pl.col("volume_ratio")
    )

    # 选择最终列
    return adjusted.select([pl.col("date"), pl.col("asset"), pl.col("open"), pl.col("high"), pl.col("low"), pl.col("close"), pl.col("volume"), pl.col("amount")])

# 示例调用
start = datetime.date(2005, 1, 1)
end = datetime.date(2023, 12, 31)

barss = pl.read_parquet("/tmp/bars_1d_2005_2023_category.parquet")

universe = random.sample(barss['asset'].unique().to_list(), 2000)

%time adjust_polars(barss, universe, start, end)

结果是只需要91ms,令人印象深刻。duckdb的方案需要390ms,可能是因为我们需要在Python域拼接大量的selected_symbols字符串的原因。

借助 Deep Seek,我们把一个需要5秒左右的操作,加速到了0.1秒,速度提升了50倍。

本文测试都在一台mac m1机器上运行,RAM是16GB。当运行在其它机器上,因CPU,RAM及硬盘类型不同,数据表现甚至排名都会有所不同_。

结论

这次探索中,仅从解决问题的能力上看,Deep Seek、通义和豆包都相当于中级程序员,即能够较好地完成一个小模块的功能性需求,它情绪稳定,细微之处的代码质量更高。

当我们直接要求给出某个数据集下,能达到指定响应速度的Python方案时,Deep Seek有点用力过猛。从结果上看,如果我们通过单机、单线程就能达到91ms左右的响应速度,那么它给出的多进程方案,很可能是要劣于这个结果的。Deep Seek只是遵循了常见的优化思路,但它没有通过实际测试来修正自己的方案。

这说明,它们还无法完全替代人类程序员,特别是高级程序员:对于AI给出的结果,我们仍然需要验证、优化甚至是推动AI向前进,而这刚好是高级程序员才能做到的事情。

但这也仅仅是因为AI还不能四处走动的原因。因为这个原因,它不能像人类一样,知道自己有哪些测试环境可供方案验证,从而找出具体环境下的最优方案。

在铁皮机箱以内,它是森林之王,人类无法与之较量。但就像人不能拔着自己的头发离开地球一样,它的能力,也暂时被封印在铁皮机箱之内。但是,一旦它学会了拔插头,开电源,高级程序员的职业终点就不再是35岁,而是AI获得自己的莲花肉身之时。

至于初中级程序员,目前看是真不需要了。1万元的底薪,加上社保,这能买多少token? 2025年的毕业生,怎么办?

用HDBSCAN聚类算法选股是否有效

前篇文章提到可以用HDBSCAN算法来来对资产进行聚类,在聚类完成之后,对聚类结果进行协整检验,通过计算对冲比,可以构造成平稳序列。我们知道一个平稳时间序列的均值、方差恒定并且有自协相关特性,那么,一旦它偏离了均值,迟早都会回归到均值上。利用这一点,可以生成交易信号。

那怎样证明HDBSCN算法对寻到协整对的交易策略是有效的呢?下面我们来一步步分析。

首先,从前面的文章我们知道了HDBSCN算法是一种基于密度的聚类算法,它通过计算每个样本的密度来确定样本的聚类类别。HDBSCN算法的优点是它可以自动确定聚类中心,并且可以处理高维数据。下面是用python代码来实现HDBSCN算法的关键代码,获取历史数据的时间是从2022年1月1日至2023年12月31日。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
start_date = datetime.date(2022, 1, 1)
end_date = datetime.date(2023,12,31)
barss = load_bars(start_date, end_date, 2000)   #获取历史资产数据,这里选取2000条数据
closes = barss["close"].unstack().ffill().dropna(axis=1, how='any') #处理缺失值,将close列的MultiIndex转换为DataFrame二维表格,并使用ffill()方法填充缺失值。
clusterer = hdbscan.HDBSCAN(min_cluster_size=3, min_samples=2)# 使用 HDBSCAN 进行聚类,python可以直接安装hdbscan包
cluster_labels = clusterer.fit_predict(closes.T)  #转置是因为要对资产(特征)聚类

clustered = closes.T.copy()
clustered['cluster'] = cluster_labels# 将聚类结果添加到 DataFrame 中

clustered = clustered[clustered['cluster'] != -1] # 剔除类别为-1的点,这些是噪声,而不是一个类别
clustered_close = clustered.drop("cluster", axis=1)

unique_clusters = set(cluster_labels)
num_clusters = len(unique_clusters) # 获取有效的簇数量
print(f"有效的簇数量为:{num_clusters}")  

tsne = TSNE(n_components=3, random_state=42)
tsne_results = tsne.fit_transform(clustered_close)  # 使用t-SNE进行降维,便于后面的簇类可视化
reduced_tsne = pd.DataFrame(data=tsne_results, columns=['tsne_1', 'tsne_2', 'tsne_3'], index=clustered_close.index)# 将t-SNE结果添加到DataFrame中
reduced_tsne['cluster'] = clustered['cluster']

fig_tsne = px.scatter_3d(
    reduced_tsne, 
    x='tsne_1', y='tsne_2', z='tsne_3',
    color='cluster', 
    title='t-SNE Clustering of Stock Returns',
    labels={'tsne_1': 't-SNE Component 1', 'tsne_2': 't-SNE Component 2'}
)  #进行3D散点图可视化
fig_tsne.layout.width = 1200
fig_tsne.layout.height = 1100
fig_tsne.show()
导入必要的库,运行后可以得到下面的3D图:

3D图展示的是股票的3D空间分布,不同颜色代表不同的聚类类别,从图中可以观察到,这期间的2000支股票被分成了40多个类,除了一个包含420支股票的第39簇,其它簇都少于20支股票。下面分析第35簇,里面有3支股票,从2022年10月1日左右三支股票都是持续上升的。 那是否可以继续用HDBSCAN聚类算法对2022年10月份之前的数据进行聚类,看上面的3支股票是否也被分在了一起。还是上面的代码,只是将时间改为了2022年1月1日到2022年10月1日,运行后得到下面的3D图:

和上面的3D图相比,分为一类的股票(特征)更相似,所以聚集的更密集。那上面第35簇中的三支股票是否还是会被分为一类呢?可以将每一簇都都可视化展示出来(和前面文章一样的方法),观察那三支股票在这次分类的那一簇中。通过观察,我们发现这三支股票是被分在了一类,但是这一类里面有大概600支股票,我们看看这600多支股票的可视化结果: 我们想要知道的是这三支股票在持续上升之前会有什么特征,虽然这一个大类里面的股票数据有点多,但观察上面的趋势图可以知道它们的走势大致是相同的,所以才会被分为一类。那继续将这600多支股票在用HDBSCA进行聚类,之前的3支股票会不会被分为一类?也就是说需要验证3支股票在持续上升前的大致趋势也是一致。感兴趣的读者可以自己尝试一下,下面是小编验证的结论: 对这600支股票重新在2022年1月到2022年10月进行聚类,总共被分成了三类,之前的那三支股票有2支被分在了一簇,另外一支在另一类里面。这个结果我认为足以说明这600支股票在2022年1月到2022年10月之间的趋势是相似的,所以600多支股票只被分为三类。 所以,HDBSCAN算法对选股是有效的,它选出来的一类趋势类似,所以可以用前一篇内容中构造这一类股票的平稳序列来产生交易信号。

hdbscan 聚类算法扫描配对交易 速度提升99倍

配对交易是一种交易策略,由摩根士丹利的量化分析师农齐奥.塔尔塔里亚在 20 世纪 80 年代首创。他现在是意大利雅典娜资产管理公司的创始人。该策略涉及监控两只历史上相关性较强的证券,并监控两者之间的价格差。一旦价格差超出一个阈值(比如 n 倍的标准差),价格回归就是大概率事件,于是交易者就做空价格高的一支,做多价格低的一支,从而获利。

睽违17年,ta-lib重装出发!

在你看到这篇文章时,2024年已经余额不足,而新的一年,正在等待我们冲刺。新的一年,作为量化人,你将在新年里收到哪些礼物呢?

先来晒一下我个人收到的礼物吧。昨天一早,收到了孙乐总赠送的《山河独憔悴》,并且很贴心地为我题了字。孙乐总是民主党派人士,江苏省收藏家协会理事和勋奖章收藏专业委员会副秘书长、美国钱币学会会员。

在书序中有这样一段话:

知识的本质被认为是『看』世界,是去看出事物的本质和真相。这本书则是关于换一个角度去理解历史,把『什么看成什么』和把『什么不看成什么』,将这种观看变成一种哲学行为。我想,我们做量化,归根结底也是要从数据的表现中跳出来,把『什么看成什么』这是找到规律,把『什么不看成什么』,这是去伪存真,过滤噪声。

这里也有一点小花絮。这本书作者倾向的名字是《江山独自憔悴》。我不懂声律,但也觉得这个标题更响亮和更有韵律感。不过,书中搜集的百余张精美的图片(铜版纸彩印)就在这里,并没有隐去,也没有打码,它们就是曾经的世界。期待这本书尽快在京东和当当上上架。

这个新年,你会收到什么样的礼物呢?不过,作为量化人,我们所有人都收到了一份重磅礼物,ta-lib的c库更新了!hV3*ifARjC@cp8O!

ta-lib重装出发!

就在新年前几天,ta-lib悄没声地更新了0.6.1版本。而上一个版本,还是17年之前发布的0.4.0.

这么久没有更新的原因是,原作者Mario Fortier一直希望找到一位更年轻的开发者来维护这个库,他本人觉得自己对更现代的C++语言特征,尤其是跨平台编译这一块有些陌生。尽管实际上他是一名非常资深的C++网络开发者,并且发明了在3G通信中使用的实时数据压缩算法。不过,他最近的工作已转向了Python,并且创办了自己的公司(从事区块链和网络软件开发)。

不过久久没能找到接班人,Mario决定继续干下去。于是,在12月23日,他发布了0.6.1版本。这个版本没有增加新的功能,主要是解决编译和自动化工具相关的问题。之前安装ta-lib的c库对初学者而言并非坦途,特别是在Windows下:要么接受恶意软件的风险,要么自己从下载好几个G的Visual studio编译器开始。这也是为什么《量化24课》需要讲解Ta-lib安装问题的原因之一。

0.6.1一发布就收到热烈反馈--包括bug report,于是,在三天后, Mario又发布了0.6.2这个版本 -- 得益于在0.6.1上所做的工具,我们看到新版的ta-lib的可维护性大大增强,以致于可以在3天之内发布新的版本 -- 这包括引入了Github Actions使得整个打包和发布工作自动化。

现在,在windows下安装ta-lib变得轻而易举:

不过,它的python-wrapper尽管也为0.6.1进行了更新,但是,没能通过我们的安装测试 -- 在安装python-talib(通过pip install TA-Lib)时,仍然提示需要有vsc++ build tools 14。相信这个问题能很快解决。

在mac下,最新的ta-lib安装非常简单,只需要执行:

1
brew install ta-lib

如果之前已经安装了ta-lib的旧版本,它会提示我们此次安装将会更新。

在debian系列的Linux(即Ubuntu, Mint)下,安装也很容易,下载后缀为*.deb的安装包,再执行命令:

1
sudo dpkg -i ta-lib_0.6.0_*.deb

即可。在Linux下支持的cpu架构包括了386, amd64架构和arm64架构,*号就是用来匹配这个架构的。对其它系列的Linux,则仍然要从源码构建,不过,在Linux下进行构建非常容易。

ta-lib的周边

ta-lib最重要的周边应该是github上的ta-lib-python这个库了。它集得了接近1万的star,这个级别的star数本来应该是被AI项目占据的 -- 这也充分说明近年来量化金融的受众正在迅速扩大。

ta-lib-python也迅速响应了ta-lib的更新,最新发布的0.5.2,已经适配了ta-lib的0.6.1版本。根据我们的测试,在mac上整个安装过程非常丝滑,先安装ta-lib的c库,再安装ta-lib-python不会出任何错误。但是在Windows下,即使已经通过msi安装了ta-lib的c库,但它的python-wrapper似乎仍然无法找到已经安装的c库和头文件,因此,仍然需要本地构建ta-lib的c库。

在等待ta-lib c库更新的日子里,ta-lib-python也并不有闲着。它先是完成了通过cython,而不是swig来绑定c库这一转换,据称带来了2~4倍的性能提升。更激动人心的是,Python 3.13放出了GIL-Free模式之后,作者正在尝试这个GIL-Free版本。一旦Cython 3.1正式发布(ta-lib-python依赖于这个版本),很有可能ta-lib-python就会立即支持GIL-Free。

另一个重要的周边是polars-talib,也正在积极开发中,尽管目前还只是0.1.4版本。它是polars的一个扩展,根据测试,计算速度比在pandas中使用talib(ta-lib-python)快了100多倍。

除此之外,一个用rust实现的ta-lib也正在开发中。

美好的事情正在发生,就在这个新年将来到来的时刻。

下一站

ta-lib的下一站更精彩。在扫除了构建和自动化障碍之后,ta-lib将可以以更快的速度、更小的精力来发布新的功能。长期以来,ta-lib缺失了一些深受大家喜爱的技术指标,比如KDJ, PVT, TMO等等。社区已经提出了14个待实现的指标,其中有两个,即RMA和PVI已经排上日程。

Tip

如果你正在发愁挖不到新的因子,建议自己先实现RMA和PVI。毕竟这两个指标优先被排上日程是有原因的。当然,我也很期待Connors RSI和Awesome Oscillator指标的实现。

摇人!

如果你喜欢Quantide公众号的文章,也想和我们一起研究量化,学习、开发、分享,欢迎加入匡醍!我们欢迎 the crazy ones, the misfits,the rebels!

草丛后面藏着什么?雄狮少年2

如果模型预测准确率超过85%,这台印钞机应该值多少马内?

在第13课中,拿决策树介绍了机器学习的原理之后,有的学员已经积极开始思考,之前学习了那么多因子,但都是单因子模型,可否使用决策树模型把这些因子整合到一个策略里呢?

在与学员交流之后,我已经把思路进行了分享。这也是我们第13课的习题,我把参考答案跟大家分享一下。

预告一下,我们训练出来的模型,在验证集和测试集上,三个类别的f1-score都取得到85%左右的惊人分数!

这个模型是如何构建的呢?

特征数据

首先是选定特征,我们选择了以下几个特征,都是我们在课程中介绍过、已经提供了源码的:

  1. 一阶导因子。这个因子能反映个股一段时间内的趋势
  2. 二阶导因子。这个因子能较好地预测短期趋势变盘。
  3. RSI因子。经典技术因子,能在一定程度上预测顶底。
  4. weekday因子。主要增加一个与上述量价信息完全无关的新的维度。

这些因子的实现如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
def d2_factor(df, win: int = 2)->pd.Series:
    close = df.close/df.close[0]
    d1 = close.diff()
    d2 = d1.diff()
    factor = d2.rolling(win).mean()

    return factor

def rsi_factor(df, win: int = 6)->pd.Series:
    return ta.RSI(df.close, win)


def d1_factor(df, win: int = 20)->pd.Series:
    df["log"] = np.log(df.close)
    df["diff"] = df["log"].diff()
    return df["diff"].rolling(win).mean() * -1

def week_day(dt: pd.DatetimeIndex)->pd.Series:
    return dt.weekday

训练数据使用了2018年以来,到2023年底,共6年的数据,随机抽取2000支个股作为样本池。

1
2
3
4
5
start = datetime.date(2023, 1, 1)
end = datetime.date(2023, 12, 29)

np.random.seed(78)
barss = load_bars(start, end, 2000)

上述因子中,前三个因子都是时序因子。但我们挖掘这些因子时,都是通过alphalens进行挖掘的,alphalens通过分层对其进行了排序,所以,我们还要对上述因子按横截面进行排序,这样得到一些rank_*因子。这个方法相法于进行特征增广(Feature Augmentation)。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
features = {
}

def rank_feature(feature):
    return feature.groupby(level=0).rank(pct=True)

for name, func in [("d2", d2_factor),
                   ("rsi", rsi_factor),
                   ("d1", d1_factor)]:
    feature = barss.groupby(level=1).apply(d2_factor).droplevel(0)
    features[name] = feature
    features[f"rank_{name}"] = rank_feature(feature)

# 计算rank
features = pd.DataFrame(features)
features['weekday'] = week_day(features.index.get_level_values(0))

for win in (1, 5, 10):
    features[f"ret_{win}"] = barss.groupby(level=1)["price"].pct_change(win)

features.dropna(how='any', inplace=True)
features.tail()

输出得到的是特征数据集。

其中的"ret_1", "ret_5", "ret_10"是每列对应的未来1,5和10日的收益率。这里延续了之前的做法,对每一个时间点T0,按T1开盘价买入,T2开盘价卖出的规则计算收益率。在训练模型时,一次只使用其中的一列。

回归还是分类?

现在,面临的选择是,如何将上述数据转化为可训练的数据。这里有三个问题:

  1. 如何选择模型?即使我们确定要使用决策树,也还有一个问题,是要使用回归还是分类?
  2. 如何划分训练集和测试集?这是所有问题中看起来最简单的一步。
  3. 我们能把不同样本的数据同时作为训练的输入吗?这个问题看起来有点费思量。

从我们选定的特征来看,要让模型执行回归任务是有点太扯了,做分类更有道理,但也要进行一些转换。我们推荐的方案是使用分类模型,测试集划分先用sklearn自带的train_test_split来完成。

Tip

为什么说从选定的特征来看,不能让模型执行回归任务?把机器学习运用到量化交易,决不是照着模型的文档来套用这么简单,你必须深谙各个模型的原理,以及领域知识如何适配到模型。这些原理,在《因子分析与机器学习策略》中有讲解,老师还提供一对一辅导。

按分类模型进行训练之前,我们还要把ret_1转换成分类标签。转换方法如下:

1
2
3
4
5
6
7
from sklearn.model_selection import train_test_split

X = features.filter(regex='^(?!ret_).*$')
y = np.select((features["ret_1"] > 0.01, 
               features["ret_1"] < -0.01), 
              (1, 2), default=0)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

我们把涨幅大于1%的标注为类别『1』,跌幅大于1%的标为类别『2』,其余的打上标签『0』。标签0也可以认为是无法归类。

通过下面的代码进行训练:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import accuracy_score, classification_report

# 初始化并训练模型
clf = DecisionTreeClassifier(random_state=42)
clf.fit(X_train, y_train)

# 预测并评估模型
y_pred = clf.predict(X_test)

print("Accuracy:", accuracy_score(y_test, y_pred))
print("Classification Report:\n", classification_report(y_test, y_pred))

运行结果让人极度舒适。因为在一个三分类问题中,随机盲猜的准确率是33%,我们首次运行的结果就已经遥遥领先于随机盲猜了。

不过,三类目标的support数量并不均衡,这还是让我们有一点点担忧:目标0的分类一家独大,会不会导致准确率虚高?

再平衡!欠采样后,报告会不会很难看?

让我们先解决这个问题。我们可以采用一种名为欠采样(undersampling)的方法,分别抽取三类样本数中最小个数的样本数进行训练,这样一来,模型看到的训练数据就相对平衡了。

尽管undersampling实现起来很简单,不过,我们还有更简单的方法,就是使用imbalance库:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
from imblearn.under_sampling import RandomUnderSampler

rus = RandomUnderSampler(random_state=42)
X_resampled, y_resampled = rus.fit_resample(X, y)

X_train_resampled, X_test_resampled, y_train_resampled, y_test_resampled = train_test_split(
    X_resampled, y_resampled, test_size=0.2, random_state=42
)

# 训练模型
clf_resampled = DecisionTreeClassifier(random_state=42)
clf_resampled.fit(X_train_resampled, y_train_resampled)

# 预测并评估模型
y_pred_resampled = clf_resampled.predict(X_test_resampled)

print("Resampled Accuracy:", accuracy_score(y_test_resampled, y_pred_resampled))
print("Resampled Classification Report:\n", classification_report(y_test_resampled, y_pred_resampled))

经过类别平衡后,准确率基本不变,但在我们关注的类别1和类别2上,它的准召率还分别提升了8%和3%

Tip

imblearn还提供另一种平衡数据的方法,称为SMOTE(Synthetic Minority Over-sampling Technique),它通过复制或者合成少数类样本来平衡数据集。不过,合成数据这事儿,对金融数据来说,可能不太靠谱,我们宁可严格一点,使用欠采样。如果我们使用SMOTE方法进行过采样,训练后的模型在类别1和类别2上能达到60%和57%的f1-score。

为什么这么优秀?

我们刚刚得到的结果无疑是超出预期的优秀!

在欠采样平衡的版本中,如果模型预测次日买入会盈利,它的精确率是54%,也即在100次预测为盈利的情况下,有54次是正确的;而在余下的不正确的46次中,也只有18次是亏损,剩下的则是涨跌在1%以内的情况

这个分析数据可以通过confusion matrix得到:

1
2
3
4
5
6
7
8
9
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
import matplotlib.pyplot as plt

cm = confusion_matrix(y_test_resampled, y_pred_resampled)

disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=clf_resampled.classes_)
disp.plot(cmap=plt.cm.Blues)
plt.title('Confusion Matrix')
plt.show()

在上图中,预测分类为1的样本中,有15%(3556个)应该是下跌的样本,有27%(6374)个则是未知(或者平盘)的样本。

实际上,这是57:15的盈亏比例,或者说,已经达到了79%的精确率。这有多厉害呢?在索普的赌局中,他计算出赢的概率只要超过50%就会下注。因为只要胜得的天平偏向自己一点点,通过不断的累积中,高频交易最终会积累可观的利润!

这可能跟你在别处看到的模型很不一样。你可能从来没有想到过,机器学习模型在预测中竟然能如此有效。你可能开始寻找一些说辞,模型泛化肯定不好,这些数据只能代表过去等等。但其实这还只是前戏,高潮还在后面。

这个模型在构建过程中,固然还有一些可以探讨的地方,但它确实够好,原因是:

  1. 我们提供了高质量的特征数据。我们没有盲目地追求特征数量。得益于我们在《因子分析与机器学习策略》课程前半部分的深入学习,我们很清楚,什么样的特征能够在一个模型中相融共洽,我们甚至能猜到每个特征在什么节点下产生什么样的贡献。
  2. 我们知道特征能产生什么样的结果,于是使用了正确的任务模型(分类),并且使用了一定的构建技巧(gap)。

Tip

如果你想预测次日的价格呢?这不一定无法做到,在课程练习中,我们就给出过一个示例,在某种情况下能够预测价格可以达到的最高点。随着这些特征的增加,就会有更多的场景被覆盖。换句话说,要预测价格,你得提取跟预测价格相关的特征。

再思考:训练集与测试集的划分

将训练数据划分为训练集与测试集是机器学习中的一个非常基础的步骤。像很多其它教程一样,这里我们使用的是train_test_split函数。它简单好用,但并不适合量化交易

它会随机抽取训练数据和测试数据,这样就撕裂开了数据之间的天然联系,甚至可能导致测试集数据早于训练集数据,从面可能产生未来数据的情况。

正确的做法是按时间顺序划分。在sklearn中还提供了名为TimeSeriesSplit的类,它能够为交叉验证提供正确训练、测试集划分。

你也可以使用我们之前介绍过的tsfresh库中的类似方法来完成这个任务。

不过,在这里,我们还用不上交叉验证,所以,我们手动将数据划分为训练集、验证集和测试集。

Tip

决策树并不支持增量学习。也就是说,你不能拿已经训练好的模型,在此基础上通过新的数据进行新的训练。在这种情况下,如果数据量不足,可能就不适合交叉验证。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
from sklearn.tree import DecisionTreeClassifier

X = features.filter(regex='^(?!ret_).*$')
y = np.select((features["ret_1"] > 0.01, 
               features["ret_1"] < -0.01), 
              (1, 2), default=0)

n = len(X)
X_train = X_train[:int(n*0.7)]
y_train = y_train[:int(n*0.7)]

X_validation = X[int(n*0.7):int(n*0.9)]
y_validation = y[int(n*0.7):int(n*0.9)]

X_test = X[int(n*0.9):]
y_test = y[int(n*0.9):]


clf = DecisionTreeClassifier(random_state=42)
clf.fit(X_train, y_train)

y_pred = clf.predict(X_validation)

from sklearn.metrics import accuracy_score, classification_report

print("Accuracy:", accuracy_score(y_validation, y_pred))
print("Classification Report:\n", classification_report(y_validation, y_pred))

WorldQuant? Word Count!

如果你去商场逛,你会发现,销量最好的店和最好的商品总是占据人气中心。对股票来说也是一样,被新闻和社交媒体频频提起的个股,往往更容易获得更大的成交量。

如果一支个股获得了人气,那它的成交量一定很大,两者之间有强相关关系。但是,成交量落后于人气指标。当一支个股成交量开始放量时,有可能已经追不上了(涨停)。如果我们能提前发现人气指标,就有可能获得提前介入的时机。

那么具体怎么操作(建模)呢?

我们首先讲解一点信息检索的知识,然后介绍如何运用统计学和信息检索的知识,来把上述问题模型化。

TF-IDF

TF 是 Term Frequency 的意思。它是对一篇文档中,某个词语共出现了多少次的一个统计。IDF 则是 Inverse Document Frequency 的意思,大致来说,如果一个词在越多的文档中出现,那么,它携带的信息量就越少。

比如,我们几乎每句话都会用到『的、地、得』,这样的词几乎在每个语境(文档)中都会出现,因此就不携带信息量。新闻业常讲的一句话,狗咬人不是新闻,人咬狗才是新闻,本质上也是前者太常出现,所以就不携带信息量了。

最早发明 TF-IDF 的人应该是康奈尔大学的杰拉德·索尔顿(康奈尔大学的计算机很强)和英国的计算机科学家卡伦·琼斯。到今天,美国计算机协会(ACM)还会每三年颁发一次杰拉德·索尔顿奖,以表彰信息检索领域的突出贡献者。

TF-IDF 的构建过程如下:

假如我们有 3 篇文档,依次是:

  1. 苹果 橙子 香蕉
  2. 苹果 香蕉 香蕉
  3. 橙子 香蕉 梨

看上去不像文档,不过这确实是文档的最简化的形式--就是一堆词的组合(在 TF-IDF 时代,还无法处理词的顺序)。在第 1 篇文档中,橙子、香蕉和苹果词各出现 1 次,每个词的 TF 都记为 1,我们得到:

1
2
3
4
5
TF_1 = {
    '苹果': 1/3,
    '香蕉': 1/3,
    '橙子': 1/3,
}

在第二篇文档中,苹果出现 1 次,香蕉出现 2 次,橙子和梨都没有出现。于是得到:

1
2
3
4
TF_2 = {
    '苹果': 1/3,
    '香蕉': 2/3,
}

第三篇文档中,TF 的计算依次类推。

IDF 实际上是每个词的信息权重,它的计算按以下公式进行:

\[ \text{IDF}(t) = \log \left( \frac{N + 1}{1 + \text{DF}(t)} \right) + 1 \]
  1. DF:每个词在多少篇文档中出现了。
  2. N 是文档总数,在这里共有 3 篇文档,所以\(N=3\)
  3. 公式中,分子与分母都额外加 1,一方面是为了避免 0 作为分母,因为\(DF(t)\)总是正的,另外也是一种 L1 正则化。这是 sklearn 中的做法。

这样我们可以算出所有词的 IDF:

\[ 苹果 = 橙子 = \log \left( \frac{4}{2+1} \right) + 1 = 1.2876 \]
\[ 梨 = \log \left( \frac{4}{1+1} \right) + 1 = 1.6931 \]

因为梨这个词很少出现,所以,一旦它出现,就是人咬狗事件,所以它的信息权重就大。而香蕉则是一个烂大街的词,在 3 篇文档中都有出现过,所有我们惩罚它,让它的信息权重为负:

\[ 香蕉 = \log \left( \frac{4}{3+1} \right) + 1 = 1 \]

最后,我们得到每个词的 TF-IDF:

\[ TF-IDF=TF\times{IDF} \]

这样我们以所有可能的词为列,每篇文档中,出现的词的 TF-IDF 为值,就得到如下稀疏矩阵:

苹果 香蕉 橙子
文档 1 1.2876/3 1/3 1.2876/3 0
文档 2 1.2876/3 1/3 * 2 0 0
文档 3 0 1/3 1.2876/3 1.6931/3

在 sklearn 中,最后还会对每个词的 TF-IDF 进行 L2 归一化。这里就不手动计算了。

我们把每一行称为文档的向量,它代表了文档的特征。如果两篇文档的向量完全相同,那么它们很可能实际上是同一篇文章(近似。因为,即使完全使用同样的词和词频,也可以写出不同的文章。

比如,『可以清心也』这几个字,可以排列成『以清心也可』,也可以排列成『心也可以清』,或者『清心也可以』,都是语句通顺的文章。

插播一则招人启事,这是我司新办公地:

新场子肯定缺人。但这个地方还在注册中,所以提前发招聘信息,算是粉丝福利。

Info

急招课程助理(武汉高校,三个月以上实习生可)若干人。课程助理要求有一定的量化基础,能编辑一些量化方向的文章,热爱学习,有自媒体经验更好。

在实际应用中,我们可以使用 sklearn 的 TfidfVectorizer 来实现 TF-IDF 的计算:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
import pandas as pd
def jieba_tokenizer(text):
    return list(jieba.cut(text))

d1 = "苹果橙子香蕉"
d2 = "苹果香蕉香蕉"
d3 = "橙子香蕉梨"

vectorizer = TfidfVectorizer(tokenizer = jieba_tokenizer)
matrix = vectorizer.fit_transform([d1, d2, d3])

df = pd.DataFrame(matrix.toarray(), columns = vectorizer.get_feature_names_out())
df
橙子 苹果 香蕉
0 0.000000 0.619805 0.619805 0.481334
1 0.000000 0.000000 0.541343 0.840802
2 0.720333 0.547832 0.000000 0.425441

结果与我们手工计算的有所不同,是因为我们在手工计算时,省略了计算量相对较大的L2归一化。

从上面的例子可以看出,TF-IDF 是用来提取文章的特征向量的。有了这个特征向量,就可以通过计算余弦距离,来比较两篇文档是否相似。这可以用在论文查重、信息检索、比较文学和像今日头条这样的图文推荐应用上。

比如,要证明曹雪芹只写了《红楼梦》的前87回,就可以把前87回和后面的文本分别计算TF-IDF,然后计算余弦距离,此时就能看出差别了。

又比如,如果用TF-IDF分析琼瑶的作品,你会发现,如果去掉一些最重要的名词之后,许多她的文章的相似度会比较高。下面是对《还珠格格》分析后,得到的最重要的词汇及其TF-IDF:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
紫薇: 0.0876
皇帝: 0.0754
尔康: 0.0692
皇后: 0.0621
五阿哥: 0.0589
容嬷嬷: 0.0573
小燕子: 0.0556
四阿哥: 0.0548
福晋: 0.0532
金锁: 0.0519

跟你的印象是否一致?但是,TF-IDF的分析方法,在量化交易中有何作用,目前还没有例证。

讲到这里,关于 TF-IDF 在量化交易中的作用,基本上就讲完了。因为,接下来,我们要跳出 TF-IDF 的窠臼,自己构建因子了!

word-count 因子

根据 TF-IDF 的思想,这里提出一个 word-count 因子。它的构建方法是,通过 tushare 获取每天的新闻信息,用 jieba 进行分词,统计每天上市公司名称出现的次数。这是 TF 部分。

在 IDF 构建部分,我们做法与经典方法不一样,但更简单、更适合量化场景。这个方法就是,我们取每个词 TF 的移动平均做为 IDF。这个IDF就构成了每个词的基准噪声,一旦某天某个词的频率显著大于基准噪声,就说明该公司上新闻了!

最后,我们把当天某个词的出现频率除以它的移动平均的读数作为因子。显然,这个数值越大,它携带的信息量也越大,表明该词(也就是该公司)最近在新闻上被频频提起。

获取新闻文本数据

我们可以通过 tushare 的 news 接口来获取新闻。 这个方法是:

1
2
3
4
news = pro.news(src='sina', 
                date=start,
                end_date=end,
)

我们把获取的新闻数据先保存到本地,以免后面还可能进行其它挖掘:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
请在本地尝试以下代码,不要在Quantide Reseach环境中运行!!

```python
def retry_fetch(start, end, offset):
    i = 1
    while True:
        try:
            df =pro.news(**{
                "start_date": start,
                "end_date": end,
                "src": "sina",
                "limit": 1000,
                "offset": offset
            }, fields=[
                "datetime",
                "content",
                "title",
                "channels",
                "score"])
            return df
        except Exception as e:
            print(f"fetch_new failed, retry after {i} hours")
            time.sleep(i * 3600)
            i = min(i*2, 10)

def fetch_news(start, end):
    for i in range(1000):
        offset = i * 1000
        df = retry_fetch(start, end, offset)

        df_start = arrow.get(df.iloc[0]["datetime"]).format("YYYYMMDD_HHmmss")
        df_end = arrow.get(df.iloc[-1]["datetime"]).format("YYYYMMDD_HHmmss")
        df.to_csv(os.path.join(data_home, f"{df_start}_{df_end}.news.csv"))
        if len(df) == 0:
            break

        # tushare 对新闻接口调用次数及单次返回的新闻条数都有限制
        time.sleep(3.5 * 60)
```

在统计新闻中上市公司出现的词频时,我们需要先给 jieba 增加自定义词典,以免出现分词错误。比如,如果不添加关键词『万科 A』,那么它一定会被 jieba 分解为万科和 A 两个词。

增加自定义词典的代码如下:

1
2
3
4
5
6
7
8
def init():
    # get_stock_list 是自定义的函数,用于获取股票列表。在 quantide research 环境可用
    stocks = get_stock_list(datetime.date(2024,11,1), code_only=False)
    stocks = set(stocks.name)
    for name in stocks:
        jieba.add_word(name)

    return stocks

这里得到的证券列表,后面还要使用,所以作为函数返回值。

接下来,就是统计词频了:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
def count_words(news, stocks)->pd.DataFrame:
    data = []
    for dt, content, _ in news.to_records(index=False):
        words = jieba.cut(content)
        word_counts = Counter(words)
        for word, count in word_counts.items():
            if word in stocks:
                data.append((dt, word, count))
    df = pd.DataFrame(data, columns=['date', 'word', 'count'])
    df["date"] = pd.to_datetime(df['date'])
    df.set_index('date', inplace=True)

    return df

tushare 返回的数据共有三列,其中 date, content 是我们关心的字段。公司名词频就从 content 中提取。

然后我们对所有已下载的新闻进行分析,统计每日词频和移动均值:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def count_words_in_files(stocks, ma_groups=None):
    ma_groups = ma_groups or [30, 60, 250]
    # 获取指定日期范围内的数据
    results = []

    files = glob.glob(os.path.join(data_home, "*.news.csv"))
    for file in files:
        news = pd.read_csv(file, index_col=0)

        df = count_words(news, stocks)
        results.append(df)

    df = pd.concat(results)
    df = df.sort_index()
    df = df.groupby("word").resample('D').sum()
    df.drop("word", axis=1, inplace=True)
    df = df.swaplevel()
    unstacked = df.unstack(level="word").fillna(0)
    for win in ma_groups:
        df[f"ma_{win}"] = unstacked.rolling(window=win).mean().stack()

    return df

count_words_in_files(stocks)

最后,完整的代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
import os
import glob
import jieba
from collections import Counter
import time

data_home = "/data/news"
def init():
    stocks = get_stock_list(datetime.date(2024,12,2), code_only=False)
    stocks = set(stocks.name)
    for name in stocks:
        jieba.add_word(name)

    return stocks

def count_words(news, stocks)->pd.DataFrame:
    data = []
    for dt, content, *_ in news.to_records(index=False):
        if content is None or not isinstance(content, str):
            continue

        try:
            words = jieba.cut(content)
            word_counts = Counter(words)
            for word, count in word_counts.items():
                if word in stocks:
                    data.append((dt, word, count))
        except Exception as e:
            print(dt, content)
    df = pd.DataFrame(data, columns=['date', 'word', 'count'])
    df["date"] = pd.to_datetime(df['date'])
    df.set_index('date', inplace=True)

    return df

def count_words_in_files(stocks, ma_groups=None):
    ma_groups = ma_groups or [30, 60, 250]
    # 获取指定日期范围内的数据
    results = []

    files = glob.glob(os.path.join(data_home, "*.news.csv"))
    for file in files:
        news = pd.read_csv(file, index_col=0)

        df = count_words(news, stocks)
        results.append(df)

    df = pd.concat(results)
    df = df.sort_index()
    df = df.groupby("word").resample('D').sum()
    df.drop("word", axis=1, inplace=True)
    df = df.swaplevel()
    unstacked = df.unstack(level="word").fillna(0)
    for win in ma_groups:
        df[f"ma_{win}"] = unstacked.rolling(window=win).mean().stack()

    return df.sort_index(), unstacked.sort_index()

stocks = init()
factor, raw = count_words_in_files(stocks)
factor.tail(20)

这里计算的仍然是原始数据。最终因子化要通过 factor["count"]/factor["ma_30"] 来计算并执行 rank,这里的 ma_30 可以替换为 ma_60, ma_250 等。

跟以往的文章不同,这一次我们没有直接得到好的结果。我们的研究其实多数时候都是寂寞的等待,然后挑一些成功的例子发表而已。毕竟,发表不成功的例子,估计意义不大(很少人看)。

但是这一篇有所不同。我们没有得到结果,主要是因为数据还在积累中。这篇文章从草稿到现在,已经半个月了,但是我仍然没能获取到 1 年以上的新闻数据,所以,无法准确得到每家公司的『基底噪声』,从而也就无法得到每家公司最新的信息熵。但要获得 1 年以上的数据,大概还要一个月左右的时间。所以,先把已经获得的成果发出来。

尽管没有直接的结果,但是我们的研究演示了对文本数据如何建模的一个方法,也演示了如何使用TF-IDF,并且在因子化方向也比较有新意,希望能对读者有所启发。

我们已经抓取的新闻数据截止到今年的 8 月 20 日,每天都会往前追赶大约 10 天左右。这些数据和上述代码,可以在我们的 quantide research 平台上获取和运行。加入星球,即可获得平台账号。

周一到周五,哪天能买股?做对了夏普22.5!

在第12课我们讲了如何从量、价、时、空四个维度来拓展因子(或者策略)。在时间维度上,我们指出从周一到周五,不同的时间点买入,收益是不一样的。这篇文章我们就来揭示下,究竟哪一天买入收益更高。

问题定义如下:

假设我们分别在周一、周二,...,周五以收盘价买入,持有1, 2, 3, 4, 5天,并以收盘价卖出,求平均收益、累积收益和夏普率。我们选择更有代表性的中证1000指数作为标的。

获取行情数据

1
2
3
4
5
6
7
df = pro.index_daily(**{
    "ts_code": "000852.SH"
})

df.index = pd.to_datetime(df.trade_date)
df.sort_index(ascending=True, inplace=True)
df.tail(10)

我们得到的数据将会是从2005年1月4日,到最近的一个交易日为止。在2024年11月间,这将得到约4800条记录。

我们先看一下它的总体走势:

1
df.close.plot()

如果我们从2005年1月4日买入并持有的话,19年间大约是得到5倍的收益。记住这个数字。

计算分组收益

接下来我们计算不同日期买入并持有不同period的收益。这里实际上有一个简单的算法,就是我们先按持有期period计算每天的对应收益,然后再按weekday进行分组,就得到了结果。

我们先给df增加分组标志:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
# 给df增加一列,作为分组标志
df["weekday"] = df.index.map(lambda x: x.weekday())

# 将数字转换为更易阅读的周几
df["weekday"] = df.weekday.map({
    0: "周一",
    1: "周二",
    2: "周三",
    3: "周四",
    4: "周五"
})
df = df[["close", "weekday"]]
df.tail()

此时我们会得到:

close weekday
trade_date
2024-11-11 6579.0054 周一
2024-11-12 6491.9723 周二
2024-11-13 6474.3941 周三
2024-11-14 6272.1911 周四
2024-11-15 6125.5126 周五
2024-11-18 5974.5576 周一
2024-11-19 6130.2848 周二
2024-11-20 6250.8029 周三
2024-11-21 6262.1644 周四
2024-11-22 6030.4882 周五

接下来我们计算每期的收益。

1
2
3
4
for period in range(1, 6):
    df[f"{period}D"] = df.close.pct_change(period).shift(-period)

df.tail(10)

此时我们会得到:

close weekday 1D 2D 3D 4D 5D
trade_date
2024-11-11 6579.0054 周一 -0.013229 -0.015901 -0.046635 -0.068930 -0.091875
2024-11-12 6491.9723 周二 -0.002708 -0.033854 -0.056448 -0.079701 -0.055713
2024-11-13 6474.3941 周三 -0.031231 -0.053886 -0.077202 -0.053149 -0.034535
2024-11-14 6272.1911 周四 -0.023386 -0.047453 -0.022625 -0.003410 -0.001599
2024-11-15 6125.5126 周五 -0.024644 0.000779 0.020454 0.022309 -0.015513
2024-11-18 5974.5576 周一 0.026065 0.046237 0.048139 0.009361 NaN
2024-11-19 6130.2848 周二 0.019659 0.021513 -0.016279 NaN NaN
2024-11-20 6250.8029 周三 0.001818 -0.035246 NaN NaN NaN
2024-11-21 6262.1644 周四 -0.036996 NaN NaN NaN NaN
2024-11-22 6030.4882 周五 NaN NaN NaN NaN NaN

现在,我们来计算不同时间点买入时的累积收益率:

1
2
3
4
5
def cum_weekday_returns(df, period):
    return ((1 + df[f"{period}D"]).cumprod() - 1).reset_index(drop=True)

returns_1d = df.groupby('weekday').apply(lambda x: cum_weekday_returns(x, 1))
returns_1d.swaplevel().unstack().plot()

从累积收益图中我们可以看到,周五买入的收益最高,约为5.47倍。看起来这个结果只比买入并持有略好一点,但实际上,资金占有率只有买入并持有的20%。因此,如果算年化Alpha的话,它要比买入并持有高许多。

当然,我们有更好的指标来评估周五买入策略的效果,即夏普率。我们先来看每天交易的夏普率:

1
2
from empyrical import sharpe_ratio
sharpe_ratio(df.close.pct_change())

我们得到的结果是0.46。下面我们计算从周一到周五,不同时间点买入的夏普率:

1
2
3
4
for tm in ("周一", "周二", "周三", "周四", "周五"):
    returns = returns_1d.swaplevel().unstack()[tm]

    print(tm, f"{sharpe_ratio(returns):.1f}")

从结果中看,周二买入的夏普甚至更高。但周三和周四买入的夏普率都为负,这解释了为什么每日买入的夏普率不高的原因。

终极boss

上面我们仅仅介绍了周五买入,持有一天的收益。考虑到周一、周二买入的夏普都很高,显然,如果周五买入,并持有多天,有可能收益会更高。具体应该持有几天会更好,收益会高多少呢?可能会超出你的想像

你可能读了很多文章,花了很多时间尝试复现它,最终却一无所获:要么代码不完整、要么数据拿不到,或者文章根本就是错的。但我们不想给你带来这样负面的体验。跟本号的其它文章一样,这篇文章的结论是可复现的,并且使用的数据你一样可以获得。你可以加入尝试加入我的星球,通过Quantide Research平台运行和验证本文。如果证实了它的效果,再把代码拷贝到本地,加入你的择时策略中。如果效果不能验证,你也可以退出星球。

1
2
3
4
5
6
def cum_weekday_returns(df, period):
    return ((1 + df[f"{period}D"]).cumprod() - 1).reset_index(drop=True)

for period in range(1, 6):
    returns = df.groupby('weekday').apply(lambda x: cum_weekday_returns(x, period))
    returns.swaplevel().unstack().plot(title=f"持有{period}天")