跳转至




2025

Numpy核心语法[3]

在不同的库之间交换数据,常常会遇到格式问题。比如,我们从第三方数据源拿到的行情数据,它们用的时间字段常常会是字符串。有一些库在存储行情时,对 OHLC 这些字段进行了优化,使用了 4 个字节的浮点数,但如果要传给 talib 进行指标计算,就必须先转换成 8 个字节的浮点数,等等,这就有了类型转换的需求。


此外,我们还会遇到需要将 numpy 数据类型转换为 python 内置类型,比如,将 numpy.float64 转换为 float 的情况。

1. 类型转换和 Typing

1.1. Numpy 内部类型转换

Numpy 内部类型转换,我们只需要使用 astype

1
2
3
4
5
6
7
8
x = np.array (['2023-04-01', '2023-04-02', '2023-04-03'])
print (x.astype (dtype='datetime64[D]'))

x = np.array (['2014', '2015'])
print (x.astype (np.int32))

x = np.array ([2014, 2015])
print (x.astype (np.str_))

Tip

如何将 boolean array 转换成整数类型,特别是,将 True 转为 1,False 转为 - 1? 在涉及到阴阳线的相关计算中,我们常常需要将 open > close 这样的条件转换为符号 1 和 - 1,以方便后续计算。这个转换可以用:

1
2
3
>>> x = np.array ([True, False])
>>> x * 2 - 1
... array ([ 1, -1])

1.2. Numpy 类型与 Python 内置类型转换

如果我们要将 Numpy 数组转换成 Python 数组,可以使用 tolist 函数。

1
2
x = np.array ([1, 2, 3])
print (x.tolist ())

我们通过 item () 函数,将 Numpy 数组中的元素转换成 Python 内置类型。

1
2
3
x = np.array (['2023-04-01', '2023-04-02'])
y = x.astype ('M8[s]')
y [0].item ()

Warning

一个容易忽略的事实是,当我们从 Numpy 数组中取出一个标量时,我们都应该把它转换成为 Python 对象后再使用。否则,会发生一些隐藏的错误,比如下面的例子:

1
2
3
4
5
6
import json
x = np.arange (5)
print (json.dumps ([0]))
print (x [0])

json.dumps ([x [0]])

Warning

这里最后一行会出错。提示 type int64 is not JSON serializable。把最后一行换成 json.dumps ([x [0].item ()]) 则可以正常执行。

1.3. Typing

从 Python 3.1 起,就开始引入类型注解 (type annotation),到 Python 3.8,基本上形成了完整的类型注解体系。我们经常看到函数的参数类型注解,比如,下面的代码:

1
2
3
from typing import List
def add (a: List [int], b: int) -> List [int]:
    return [i + b for i in a]

从此,Python 代码也就有了静态类型检查支持。

NumPy 的 Typing 模块提供了一系列类型别名(type aliases)和协议(protocols),使得开发者能够在类型注解中更精确地表达 NumPy 数组的类型信息。这有助于静态分析工具、IDE 以及类型检查器提供更准确的代码补全、类型检查和错误提示。

这个模块提供的主要类型是 ArrayLike, NDArray 和 DType。

1
2
3
import numpy
from numpy.typing import ArrayLike, NDArray, DTypeLike
import numpy as np

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
def calculate_mean (data: ArrayLike) -> float:
    """计算输入数据的平均值,数据可以是任何 ArrayLike 类型"""
    return np.mean (data)

def add_one_to_array (arr: NDArray [np.float64]) -> NDArray [np.float64]:
    """向一个浮点数数组的每个元素加 1,要求输入和输出都是 np.float64 类型的数组"""
    return arr + 1

def convert_to_int (arr: NDArray, dtype: DTypeLike) -> NDArray:
    """将数组转换为指定的数据类型"""
    return arr.astype (dtype)

如果你是在像 vscode 这样的 IDE 中使用上述函数,你就可以看到函数的类型提示。如果传入的参数类型不对,还能在编辑期间,就得到错误提示。

2. 拓展阅读

2.1. Numpy 的数据类型

在 Numpy 中,有以下常见数据类型。每一个数字类型都有一个别名。在需要传入 dtype 参数的地方,一般两者都可以使用。另外,别名在字符串类型、时间和日期类型上,支持得更好。比如,'S5' 是 Ascii 码字符串别外,它除了指定数据类型之外,还指定了字符串长度。datetime64 [S] 除了表明数据是时间日期类型之外,还表明它的精度到秒。


类型 别名
np.int8 i1
np.int16 i2
np.int32 i4
np.int64 i8
np.uint8 u1
np.uint16 u2
np.uint32 u4
np.uint64 u8
np.float16 f2
np.float32 f4,还可指定结尾方式,比如 'f4' 表示大端字节序。其它 float 类型同。
np.float64 f8
np.float128 f16
np.bool_ b1
np.str_ U (后接长度,例如 U10)
np.bytes_ S (后接长度,例如 S5)
np.datetime64 M8 和 M8[D] M8[h] M8[m] M8[s],也可写作 datetime64[D] 等
np.timedelta64 m8 和 m8[D] m8[h] m8[m] m8[s] 等

3. 处理包含 np.nan 的数据

在量化分析中,我们常常会遇到数据为 np.nan 情况。比如,某公司上年利润为负数,今年利润实现正增长,请问要如何表示公司的 YoY 的利润增长呢?


Info

np.nan 是 numpy 中的一个特殊值,表示“Not a Number”,即“不是数字”。注意,在 Numpy 中,尽管 np.nan 不是一个数字,但它确实数字类型。确切地说,它是 float 类型。此外,在 float 类型中,还存在 np.inf(正无穷大)和负无穷大 (np.NINF,或者-np.inf)。

又比如,在计算个股的 RSI 或者移动平均线时,最初的几期数据是无法计算出来的(在回测框架 backtrader 中,它把这种现象称之为技术指标的冷启动)。如果不要求返回的技术指标的取值与输入数据长度一致,则会返回短一些、但全部由有效数据组成的数组;否则,此时我们常常使用 np.NaN 或者 None 来进行填充,以确保返回的数据长度与输入数据长度一致。

但是,如果我们要对返回的数组进行统计,比如求均值、最大值、排序,对包含 np.nan 或者 None 的数组,应该如何处理?

3.1. 包含 np.nan 和 np.inf 的数组运算

在 numpy 中,提供了对带 np.nan 的数组进行运算的支持。比如有以下数组:

1
2
3
4
import numpy as np

x = np.array([1, 2, 3, np.nan, 4, 5])
print(x.mean())

我们将得到一个 nan。实际上,多数情况下,我们希望忽略掉 nan,只对有效数据进行运算,此时得到的结果,我们往往仍然认为是有意义的。

因此,Numpy 提供了许多能处理包含 nan 数据的数组输入的运算函数。下面是一个完整的列表:

在这里,我们以输入 np.array([1, 2, 3, np.nan, np.inf, 4, 5]) 为例

函数 nan 处理 inf 处理 输出
nanmin 忽略 inf 1.0
nanmax 忽略 inf inf
nanmean 忽略 inf inf
nanmedian 忽略 inf 3.5
nanstd 传递 inf nan
nanvar 传递 inf nan
nansum 忽略 inf inf
nanquantile 忽略 inf 2.25
nancumsum 忽略 inf inf
nancumprod 忽略 inf inf

对 np.nan 的处理中,主要是三类,一类是传递,其结果导致最终结果也是 nan,比如,在计算方差和标准差时;一类是忽略,比如在找最小值时,忽略掉 np.nan,在余下的元素中进行运算;但在计算 cumsum 和 cumprod 时,"忽略"意味着在该元素的位置上,使用前值来填充。我们看一个不包含 np.inf 的示例:


1
2
3
x = np.array([1, 2, 3, np.nan, 4, 5])
np.nancumprod(x)
np.nancumsum(x)

输出结果是:

1
2
3
array([  1.,   2.,   6.,   6.,  24., 120.])

array([ 1.,  3.,  6.,  6., 10., 15.])

结果中的第 4 个元素都是由第 3 个元素复制而来的。

如果一个数组中包含 inf,则在任何涉及到排序的操作(比如 max, median, quantile)中,这些元素总是被置于数组的最右侧;如果是代数运算,则结果会被传导为 inf。这些地方,Numpy 的处理方式与我们的直觉是一致的。

除了上述函数,np.isnan 和 np.isinf 函数,也能处理包含 np.nan/np.inf 元素的数组。它们的作用是判断数组中的元素是否为 nan/inf,返回值是一个 bool 数组。

3.2. 包含 None 的数组运算

在上一节中,我们介绍的函数能够处理包含 np.nan 和 np.inf 的数组。但是,在 Python 中,None 是任何类型的一个特殊值,如果一个数组包含 None 元素,我们常常仍然会期望能对它进行 sum, mean, max 等运算。但是,Numpy 并没有专门为此准备对应的函数。


但是,我们可以通过 astype 将数组转换为 float 类型,在此过程中,所有的 None 元素都转换为 np.nan,然后就可以进行运算了。

1
2
x = np.array([3,4,None,55])
x.astype(np.float64)

输出为:array([3., 4., nan, 55.])

3.3. 性能提升

当我们调用 np.nan *函数时,它的性能会比普通的函数慢很多。因此,如果性能是我们关注的问题,我们可以使用 bottleneck 这个库中的同名函数。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
from bottleneck import nanstd
import numpy as np
import random

x = np.random.normal(size = 1_000_000)
pos = random.sample(np.arange(1_000_000).tolist(), 5)
x[pos] = np.nan

%timeit nanstd(x)
%timeit np.nanstd(x)

我们担心数组中 np.nan 元素个数会影响到性能,所以,在上面的示例中,在随机生成数组时,我们只生成了 5 个元素。在随后的一次测试中,我们把 nan 元素的个数增加了 10 倍。实验证明,nan 元素的个数对性能没有什么影响。在所有的测试中,bottlenect 的性能比 Numpy 都要快一倍。

Info

根据 bottleneck 的文档,它的许多函数,要比 Numpy 中的同名函数快 10 倍左右。


这是你的量化母语

正如死亡和税收不可避免,Numpy 和 Pandas 对量化人而言,也具有同样的地位 -- 每个量化人都不可避免地要与这两个库打交道。

如果你去研究一些非常重要的量化库,比如 alphalens, empyrical, backtrader, tushare, akshare, jqdatasdk 等,或者一些非常优秀的量化框架比如 quantaxis, zillionare, vnpy 等等,你就会发现它们都依赖于 numpy 和 pandas。实际上,一个库只要依赖于 pandas,它也必将传递依赖到 numpy。

如果说量化人有一种共同的语言的话,它就是 Numpy 和 Pandas。Numpy 和 Pandas 是量化人的母语。


具体地说,Numpy 和 Pandas 不仅为量化人提供了类似于表格的数据结构 -- Numpy structured array 和 Pandas DataFrame -- 这对于包括行情数据在内的诸多数据的中间存储是必不可少的;它还提供了许多基础算法。

比如:

  1. 在配对交易 (pair trade) 中,相关性计算是非常重要的一环。无论是 Numpy 还是 Pandas 都提供了相关性计算函数。
  2. 在 Alpha 101 因子计算中,排序操作是一个基础函数 -- 这是分层回测的基础 -- Pandas 通过 rank 方法来提供这一功能。
  3. Maxdrawdown(最大回测)是衡量策略的重要指标。Numpy 通过 numpy.maximum.accumulate 提供了支持。

类似常用的算法非常多,我们将在本课程中一一介绍它们。

课程编排说明

紧扣量化场景来介绍 Numpy 和 Pandas 是本课的一大特点。我们通过分析重要的、流行度较高的量化库源码,找出其中使用 numpy 和 pandas 的地方,再进行归类的提炼,并结合一些量化社区中常问的相关问题 -- 这些往往是量化人在使用 numpy/pandas 时遇到的困难所在 -- 来进行课程编排,确保既系统讲解这两个重要的库,又保证学员在学习后,能立即将学习到的方法与技巧运用到工作中,迅速提高自己的生产力。

任何高效地学习都离不开高强度的练习。本课程安排了大量的练习。无论是演示代码、还是练习,我们都尽可能安排在量化场景下完成,这样会增强您的代入感。但是,这往往也要求您能理解这些场景和数据。

在编写本课程时,作者阅读了大量书籍、博文、论文和开源项目代码。其中一部分与教材关联度较高的,我们以延伸阅读、脚注的方式提供参考链接。如果学员有时间,也可以阅读这部分内容,以获得跟作者同样的视野景深。但如果你时间紧张,也完全可以跳过这些内容,只关注我们课程内容的主线就好。


本课程是专门为量化交易从业者,比如 quant developer, quant researcher 和 quant pm 等人设计。如果您有基础的金融知识,这门课也适用于其它需要学习 Numpy 和 Pandas 的人。课程内容在丰度和深度上都是市面上少见的。

什么是 Numpy

图片来源:numpy.org

Numpy 是 Python 科学计算中的基础包,它是一款开源软件,允许在保留原有版权声明的前提下,自由使用。它的名字来源于 Numeric Programming(数值编程),其前身是 Numeric 库和 Numarray 库。

Numpy 提供了多维数组对象、各种派生对象(比如掩码数组 -- masked array)以及各种用于数组操作的高性能例程,包括数学、逻辑、形状操作、排序、选择、I/O 、离散傅里叶变换、基本线性代数、基本统计运算、随机模拟等等。下图提供了一个更详细的说明:


Numpy 的底层开发语言是 C 语言,并且进行了相当多的优化,这包括并行化支持、使用 OpenBLAS 和高级 SIMD 指令来优化矩阵操作等等。由于 Python 这种胶水语言的灵活性,使得 Numpy 最终得以作为一个 Python 库发布出来。

Tip

很多人认为要提高量化策略的性能,就必须放弃 Python,改用 C/Rust。这种说法又对又不对。

如果一个 Quanter 不懂得利用 OpenBLAS 和 LAPACK,那么即使用 C 开发出来的算法,也很难比通过 Python 调用 Numpy 来得更快。在 Numpy 中,一个最常见的矩阵乘法,就可能利用到多核机器的并行运算能力(即多线程)和高级 CPU 指令以实现快速的 BLAS/LAPACK 运算。这些知识和技巧,是一般人难以掌握的。

你可以通过下面的方法来查看你的 Numpy 是否利用了 OpenBLAS/LAPACK 及高级 SIMD 指令:

1
2
import numpy as np
np.show_config()

Numpy 广泛运用于学术界、金融界和工业界,具有成熟、快速、稳定和活跃的特点。当前的稳定版本是 2.2.0 版(2025 年 3 月),发布于仅仅 1 个季度之前,这足以说明 Numpy 社区开发的活跃度。

Numpy 还是 Pandas, scipy, statsmodels 和 scikit-learn 等众多知名 Python 库的底层依赖库。

什么是 Pandas

Pandas 是用于数据操作和分析的 Python 软件库。它构造在 Numpy 之上,增加了索引、异构数组等功能(相当于 Numpy 的 Structure Array -- 这个概念我们会在本课程后面详细解释),这使它成为处理表格类数据的有力武器。


Pandas 的名字来源于术语 Panel Data(面板数据)和 Python Data Analysis,前者是计量经济学的一个术语,用以表示对同一个体在多个时期观测的数据集。

自 2010 年成为开源项目以来,pandas 已经发展成为相当大的一个库,开发者社区已发展到超过 2500 名不同的贡献者。

Pandas 提供了 Series, DataFrame 两种数据结构。它曾经还提供了 Panel 这种三维数据结构,但最终放弃了。与 Excel 相比,它能更快速地分析更大的数据(一般小于 1 千万行,主要取决于机器的物理内存)。


延伸阅读

来源:Github readme 项目

如果要推荐一本讲解 Pandas 的书,毫无疑问,没人任何书籍能比 《Python for Data Analysis》 更权威了。因为它是由 Pandas 的创建者 Wes McKinney 撰写的!这本书现在提供有网页版供开放访问。读者也可点击 此链接 阅读。在 2023 年 4 月进行更新后,它现在支持到了 pandas 2.0 版本。

Wes Mckinney 是 Pandas 的创建者和终生仁慈独裁者。他现在居住在田纳西的纳什维尔,是 DataPad 的 CEO 和联合创始人。

Wes Mckinney 本科毕业于 MIT,是数学和统计学博士(杜克大学)。在 AQR 资本管理公司工作期间,学习了 Python 并开始构建 pandas。他同时还是 Apache Arrow 的联合创建者。

从 Pandas 的诞生史来看,毫无疑问,Pandas 就是为金融/量化而生的。Wes Mckinney 创建 Pandas 的初衷,就是要解决用 Microsoft Excel 来进行金融数据分析和统计运算时效率低、并且十分繁琐的问题。在今天,量化巨头 Two Sigma[^two-sigma] 是这个项目的重要赞助商。Pandas 的成功,也促进了 Python 的广泛流行。甚至可以说,Mckinney 以一己之力,开拓了 Python 的生存空间。

创建 Pandas 并没有任何收益,Wes Mckinney 最初主要依靠第一份工作的存款和兼职来生活。这是一个类似于《月亮和六便士》的故事,就连主人公的背景都极其相似,都是金融工作者。幸运地是,Wes Mckinney 获得了成功。如果你对这段故事感兴趣,可以阅读 《可持续发展的开源项目将赢得未来》 这篇文章。

烛台密码 三角形整理如何提示玄机

本文是几个月前《三角形整理检测》的后续篇,改进了算法,增加了应用场景的讨论。

《匡醍.因子分析与机器学习策略》课程的最后一课是关于深度学习框架在量化交易中的应用的。考虑很多技术交易者都会看图操作,比如艾略特浪型、头肩顶、三角形整理等等。正好CNN在图像模式识别能力上超越了人类,所以,就打算拿三角形整理的检测作为例子。

要通过CNN网络来实现三角形整理检测,首先需要做到数据标注。我们在课程中已经实作出来一个标注工具。不过,我更希望能够使用算法自动检测到三角形整理模式。这篇文章就将介绍我的算法。

如果你希望拿到本文源码,可以加入我们的星球。加入星球三天后,更可以获得我们研究平台的账号。在平台中提供了可以运行、验证notebook版本,你可以完全复现本文的结果。

Note

先通过算法对k线图进行标注,再通过CNN网络进行识别,感觉这个有点Matryoshka doll了。于是我在课程中换了另一个例子,通过4 channel的一维卷积,实现了预测误差1%的准确率。这篇文章算是课程的边脚料重新加工了。

算法的示意图如下:

三角形检测示意图

首先,我们得找到k线图中的波峰和波谷。在图中,波峰是点1和2,波谷则是点3和4。然后我们作通过点1和2的直线,得到压力线;作通过点3和4的直线,得到支撑线。

在Python中,计算两点之间的直线可以通过np.polyfit来实现。通过该函数,将获得直线的斜率。通过两条直线之间的斜率关系,我们可以进一步得到三角形的形态。

如果记Sr为压力线的斜率,记Ss为支撑线的斜率,那么,三角形的形态可以由以下表格来定义:

压力线方向 支撑线方向 角度对比 标记 说明
Sr>0 Ss>0 abs(sr) > abs(ss) 1 上升且发散三角
Sr>0 Ss>0 abs(sr) < abs(ss) 2 上升且收敛三角
Sr>0 Ss<0 abs(sr) > abs(ss) 3 发散偏上升三角
Sr>0 Ss<0 abs(sr) < abs(ss) 4 发散偏下降三角
Sr<0 Ss>0 abs(sr) > abs(ss) 5 下降且收敛三角
Sr<0 Ss>0 abs(sr) < abs(ss) 6 上升且收敛三角
Sr<0 Ss<0 abs(sr) > abs(ss) 7 下降且收敛三角
Sr<0 Ss<0 abs(sr) < abs(ss) 8 下降且发散三角

部分形态如下图所示:

识别算法的实现代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
from zigzag import peak_valley_pivots

def triangle_flag(df, lock_date=None):
    if lock_date is not None:
        peroid_bars = df.loc[:lock_date]
    else:
        peroid_bars = df

    thresh = peroid_bars.close[-120:].pct_change().std() * 3

    pvs = peak_valley_pivots(peroid_bars.close.astype(np.float64), thresh, -1 * thresh)

    if len(pvs) == 0:
        return 0, None, None

    pvs[0] = pvs[-1] = 0
    pos_peaks = np.argwhere(pvs == 1).flatten()[-2:]
    pos_valleys = np.argwhere(pvs == -1).flatten()[-2:]

    if len(pos_peaks) < 2 or len(pos_valleys) < 2:
        return 0, None, None

    minx = min(pos_peaks[0], pos_valleys[0])
    y = df.close[pos_peaks].values
    p = np.polyfit(x=pos_peaks, y=y, deg=1)
    upper_trendline = np.poly1d(p)(np.arange(0, len(df)))

    y = df.close[pos_valleys].values
    v = np.polyfit(x=pos_valleys, y=y, deg=1)
    lower_trendline = np.poly1d(v)(np.arange(0, len(df)))

    sr, ss = p[0], v[0]

    flags = {
        (True, True, True): 1,
        (True, True, False): 2,
        (True, False, True): 3,
        (True, False, False): 4,
        (False, True, True): 5,
        (False, True, False): 6,
        (False, False, True): 7,
        (False, False, False): 8,
    }

    flag = flags[(sr > 0, ss > 0, abs(sr) > abs(ss))]

    return flag, upper_trendline, lower_trendline
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
def show_trendline(asset, df, resist, support, flag, width=600, height=400):
    desc = {
        1: "上升且发散三角",
        2: "上升且收敛三角",
        3: "发散偏上升三角",
        4: "发散偏下降三角",
        5: "下降且收敛三角",
        6: "上升且收敛三角",
        7: "下降且收敛三角",
        8: "下降且发散三角",
    }

    if isinstance(df, pd.DataFrame):
        df = df.reset_index().to_records(index=False)

    title = f"flag: {flag} - {desc[flag]}"
    cs = Candlestick(df, title=title, show_volume=False, show_rsi=False, width=width, height=height)
    cs.add_line("support", np.arange(len(df)), support)
    cs.add_line("resist", np.arange(len(df)), resist)
    cs.plot()


np.random.seed(78)
start = datetime.date(2023, 1, 1)
end = datetime.date(2023, 12, 29)
barss = load_bars(start, end, 4)

for key, df in barss.groupby("asset"):
    df = df.reset_index().set_index("date")
    flag, resist, support = triangle_flag(df)
    if flag != 0:
        show_trendline(key, df, resist, support, flag)

最后,我们来研究一支个股的情况,看看这个算法可能有哪些用途:

1
2
3
4
5
6
7
8
9
start = datetime.date(2023, 1, 1)
end = datetime.date(2023, 12, 29)
barss = load_bars(start, end, ("300814.XSHE", ))

for key, df in barss.groupby("asset"):
    df = df.reset_index().set_index("date")
    flag, resist, support = triangle_flag(df, datetime.date(2023, 9, 11))
    if flag != 0:
        show_trendline(key, df, resist, support, flag, width=800, height=600)

标的从23年4月19日以来,先后出现4次波峰。随着时间的推移,整理形态也不断变化。

7月12,突破之前的形态是发散偏上升三角。

1
2
3
4
5
6
7
8
9
start = datetime.date(2022, 12, 1)
end = datetime.date(2023, 10, 29)
barss = load_bars(start, end, ("300814.XSHE", ))

for key, df in barss.groupby("asset"):
    df = df.reset_index().set_index("date")
    flag, resist, support = triangle_flag(df, datetime.date(2023, 7,19))
    if flag != 0:
        show_trendline(key, df, resist, support, flag, width=800, height=600)

7月12日突破之后,支撑和压力线发生变化。此时可以计算出9月7日的压力位是48元。但当天只冲击到45.6,随后收了上影线。

1
2
3
4
5
6
7
8
9
start = datetime.date(2022, 12, 1)
end = datetime.date(2023, 10, 29)
barss = load_bars(start, end, ("300814.XSHE", ))

for key, df in barss.groupby("asset"):
    df = df.reset_index().set_index("date")
    flag, resist, support = triangle_flag(df, datetime.date(2023, 8,19))
    if flag != 0:
        show_trendline(key, df, resist, support, flag, width=800, height=600)

此时仍然是上升三角,但9月7日未破压力位后,压力线应该使用最新的两个波峰连线。此时的压力线的斜率比之前的要小,显示后续走势会弱一些。

1
2
3
4
5
6
7
8
9
start = datetime.date(2022, 12, 1)
end = datetime.date(2023, 12,29)
barss = load_bars(start, end, ("300814.XSHE", ))

for key, df in barss.groupby("asset"):
    df = df.reset_index().set_index("date")
    flag, resist, support = triangle_flag(df, datetime.date(2023, 9,15))
    if flag != 0:
        show_trendline(key, df, resist, support, flag, width=800, height=600)

在9月7日新的波峰形成后,新的压力线在11月20日的值为48.5,当天的最高点为46.4,再次未破压力位。此后压力线需要重新计算。新的压力线的斜率进一步减小。形态也由此前的上升且发散三角形,转换为上升且收敛三角形,表明已经到了退出的时间。

1
2
3
4
5
6
7
8
9
start = datetime.date(2022, 12, 1)
end = datetime.date(2023, 12,29)
barss = load_bars(start, end, ("300814.XSHE", ))

for key, df in barss.groupby("asset"):
    df = df.reset_index().set_index("date")
    flag, resist, support = triangle_flag(df)
    if flag != 0:
        show_trendline(key, df, resist, support, flag, width=800, height=600)

上述压力线斜率的变化能够表明价格上升是打开了新的通道,还是预期在走弱,这对我们中短线操作很有帮助。

在通过机器学习构建策略时,我们可以把压力线和支撑线斜率的变化(\(\delta{Sr}\), \(\delta{Ss}\))、压力线和支撑线预测出来的值(\(P_{t+1}\), \(V_{t_1}\))等作为特征,那么,我们就可能更精确地预测未来走势。

DeepSeek只是挖了个坑,还不是掘墓人,但中初级程序员是爬不出来了

在我们的《因子分析与机器学习策略》课程中,提供了从2005年到2023年,长达18年的日线数据(共1100多万条记录)供学员进行因子挖掘与验证。最初,我们是通过functools中的lru_cache装饰器,将数据缓存到内存中的。这样一来,除了首次调用时时间会略长(比如,5秒左右)外,此后的调用都是毫秒级的。

问题的提出

但这样也带来一个问题,就是内存占用太大。一次因子分析课程可能会占用5G以上。由于Jupyterlab没有自动关闭idle kernel的能力(这一点在google Colab和kaggle中都有),我们的内存很快就不够用了。

我们的数据是以字典的方式组织,并保存在磁盘上的:

每支股票的键值是股票代码,对应值则是一个Numpy structured array。这样的数据结构看上去比较独特,不过我们稍后就能看到这样组织的原因。

在进行因子分析之前,用户可能会通过指定universe,以及起止时间来加载行情数据。所谓Universe,就是指一个股票池。用户可能有给定的证券列表,也可能只想指定universe的规模;起止时间用来切换观察的时间窗口,这可能是出于性能的考虑(最初进行程序调试时,只需要用一小段行情数据;调试完成后则需要用全部数据进行回测,或者分段观察)。

最终,它要返回一个DataFrame,以date和asset(即股票代码)为双重索引,包含了OHLC,volume等列,并且这些列要根据end进行前复权(这种复权方式称为动态前复权)。此外,还将包含一个amount列,这一列则无须复权。

因此,这个函数的签名是:

1
2
3
4
def load_bars(start_date:datetime.date, 
              end_date:datetime.date, 
              universe: Tuple[str]|int = 500)->pd.DataFrame:
    pass

学员的学习过程是阅读我们的notebook文档,并尝试单元格中的代码,也可能修改这些代码再运行。因此,这是一个交互式的操作,一般来说,只要用户的等待时间不超过3秒,都是可以接受的。如果响应速度低于1秒,则可以认为是理想的。

去掉缓存后,最初的一个实现的运行速度大致是5秒:

1
2
3
start = datetime.date(2023, 12,1)
end = datetime.date(2023, 12,31)
%time load_bars(start, end, 2000)

后面的测试将使用现样的参数。

当然,如果使用更大的universe,则时间还会加长。

由于这个结果超过了3秒,所以,希望能对代码进行一些优化。性能优化是编程中比较有难度的例子,因为它涉及到对程序运行原理的理解,涉及到对多个技术栈的掌握。在这个过程中我探索了Deep Seek R1的能力边界,可供大家参考。

最初的方案

最初的代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
def load_bars_v1(
    start: datetime.date, end: datetime.date, universe: Tuple[str]|int = 500
)->pd.DataFrame:

    if barss is None:
        with open(os.path.join(data_home, "bars_1d_2005_2023.pkl"), "rb") as f:
            barss = pickle.load(f)

    keys = list(barss.keys())
    if isinstance(universe, int):
        if universe == -1:
            selected_keys = keys
        else:
            selected_keys = random.sample(keys, min(universe, len(keys)))
            try:
                pos = selected_keys.index("000001.XSHE")
                swp = selected_keys[0]
                selected_keys[0] = "000001.XSHE"
                selected_keys[pos] = swp
            except ValueError:
                selected_keys[0] = "000001.XSHE"

    else:
        selected_keys = universe

    dfs = []
    for symbol in selected_keys:
        qry = "frame >= @start & frame <= @end"
        df = pd.DataFrame(barss[symbol]).assign(asset=symbol).query(qry)

        if len(df) == 0:
            logger.debug("no bars for %s from %s to %s", symbol, start, end)
            continue
        # 前复权
        last = df.iloc[-1]["factor"]
        adjust_factor = df["factor"] / last
        adjust = ["open", "high", "low", "close", "volume"]
        df.loc[:, adjust] = df.loc[:, adjust].multiply(adjust_factor, axis="index")

        dfs.append(df)

    df = pd.concat(dfs, ignore_index=True)
    df.set_index(["frame", "asset"], inplace=True)
    df.index.names = ["date", "asset"]
    df.drop("factor", axis=1, inplace=True)
    df["price"] = df["open"].shift(-1)
    return df

代码已进行了相当的优化(其中部分也基于AI建议)。比如,将数据保存为字典,先按universe进行筛选,再拼接为dataframe,而不是将所有数据保存为dataframe,通过pandas来按universe进行筛选(将花费数倍时间)。

此外,在进行前复权时,它使用了multiply方法,从而可以一次对多个列进行前复权操作,这一建议正是AI给出来的。

但是,代码中还存在一个for loop,如果消除了这个循环,是否能进一步提升速度呢?

下面,我就使用Deep Seek R1进行尝试。

这是第一轮的prompt:

Quote

我有一个dataframe,以date和asset为联合索引,有open, high, low, close, volume, amount和factor等列。其中factor是复权因子。
现在,要对该数据结构实现以下功能:
1. 筛选出asset 在 selected_symbols列表中,date在[start, end]中的记录 2. 对这些记录,按asset进行分组,然后对 open, high, low, close, volume进行前复权。 3. 结果用dataframe返回,索引仍为date/asset,列为复权后的open, high,low, close, volume字段,以及未处理的amount。
请分别给出基于pandas, pyarrow, polars, duckdb的实现,并建议性能最优的一个。

Deep Seek的思考过程很长,这里略过了。我们直接看它的答案。磁盘读写对速度的影响并不大,所以,我们先把数据从磁盘中加载出来,保存到barss中:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
with open(os.path.join(data_home, "bars_1d_2005_2023.pkl"), "rb") as f:
    data = pickle.load(f)

dfs = []
for symbol, bars in data.items():
    df = pd.DataFrame(bars)
    df["asset"] = symbol
    df = df.rename(columns={"frame": "date"})
    dfs.append(df)

barss = pd.concat(dfs,ignore_index=True).set_index(["date", "asset"]).sort_index()
print(len(barss))

# 定义一个一致的universe,便于比较
universe = barss.index.get_level_values("asset").unique().tolist()

以下的代码都出自于 Deep Seek。我们让它对最耗时的筛选和复权部分给出方案:

基于Pandas的实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
def adjust_pandas(df, selected_symbols, start, end):
    # 筛选数据
    idx = pd.IndexSlice
    filtered = df.loc[idx[start:end, selected_symbols], :]

    # 按 asset 分组计算最后复权因子
    grouped = filtered.groupby('asset', group_keys=False)
    last_factor = grouped['factor'].transform('last')

    # 计算调整比例
    ratio = filtered['factor'] / last_factor

    # 计算复权后字段
    adjusted = filtered.assign(
        open   = filtered['open'] * ratio,
        high   = filtered['high'] * ratio,
        low    = filtered['low'] * ratio,
        close  = filtered['close'] * ratio,
        volume = filtered['volume'] * (last_factor / filtered['factor'])
    )

    # 选择最终列并保持索引
    return adjusted[['open', 'high', 'low', 'close', 'volume', 'amount']]

%time adjust_pandas(barss, universe, start, end)
adjust_pandas(barss, universe, start, end)

尽管我对Pandas很熟悉了,但仍有一些API是不知道的,比如transform。但运用正确的API,恰恰是 Python中提升性能的关键一招。

这个版本的平均运行时长是7秒。说明pandas的筛选确实很慢。

我们略过pyarrow的版本。pyarrow版本的运行时间大致是3.7秒左右。比原始版本只略有进步。这里也看出python 3.11中,for loop的运行速度已经很快了。

基于Polars的实现

这是它给出的polars的版本:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import polars as pl

def adjust_polars(df, selected_symbols, start, end):
    # 筛选数据
    filtered = df.filter(
        (pl.col("date").is_between(start, end)) &
        (pl.col("asset").is_in(selected_symbols))
    )

    # 计算最后复权因子和调整比例
    adjusted = filtered.with_columns(
        last_factor = pl.col("factor").last().over("asset")
    ).with_columns(
        ratio = pl.col("factor") / pl.col("last_factor"),
        volume_ratio = pl.col("last_factor") / pl.col("factor")
    ).with_columns(
        open   = pl.col("open") * pl.col("ratio"),
        high   = pl.col("high") * pl.col("ratio"),
        low    = pl.col("low") * pl.col("ratio"),
        close  = pl.col("close") * pl.col("ratio"),
        volume = pl.col("volume") * pl.col("volume_ratio")
    )

    # 选择最终列
    return adjusted.select(["date", "asset", "open", "high", "low", "close", "volume", "amount"])

adjust_polars(barss, universe, start, end)

之前我并不熟悉Polars,Deep Seek一下子扩展了我的能力边界。如果我要通过自学,然后来实现这部分功能,我不知道要多久才能发现.over这个API,因为这是 Polars 独有的一个窗口运算API,我无法把知识从我熟悉的Pandas中迁移过来。

但是,这个版本有一个错误:它没有遵从我给的数据格式,而是把barss当成了一个有asset和date列的polars DataFrame。通过使用通义灵码,修正了此错误,修正后的代码如下:

1
%time adjust_polars(pl.from_pandas(barss.reset_index()), universe, start, end)

得到的运行速度是1.01秒,已很接近我们要求的理想速度了。

基于DuckDB的实现

现在,我们来看它给出的duckdb的版本:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import duckdb

def adjust_duckdb(df, selected_symbols, start, end):
    query = f"""
    SELECT 
        date, asset,
        open * (factor / last_factor) AS open,
        high * (factor / last_factor) AS high,
        low * (factor / last_factor) AS low,
        close * (factor / last_factor) AS close,
        volume * (last_factor / factor) AS volume,
        amount
    FROM (
        SELECT 
            *,
            LAST_VALUE(factor) OVER (
                PARTITION BY asset 
                ORDER BY date 
                ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
            ) AS last_factor
        FROM df
        WHERE 
            asset IN ({','.join([f"'{s}'" for s in selected_symbols])})
            AND date BETWEEN '{start}' AND '{end}'
    )
    """
    return duckdb.query(query).to_df()

adjust_duckdb(barss, universe, start, end)

这里也会出现一个类似的错误,即查询语句需要df中存在asset, date列,但它们却出现在索引中。我们略作修正即可运行:

1
%time adjust_duckdb(barss.reset_index(), universe, start, end)

最终运行速度是1.21秒,在这个例子中略慢于polars,在所有方案中排在第二(在另一台机器,使用机械阵列硬盘时,更强的CPU时, duckdb更快)。但是,duckdb方案在数据规模上可能更有优势,即,如果数据集再大一到两个量级,它很可能超过polars。

在polars与duckdb中,需要的都是扁平结果的数据结构(即asset/date不作为索引,而是作为列字段存在),因此,我们可以考虑将数据结构进行重构,使用apache parquet格式写入到磁盘中,这样可以保存整个方案耗时仍保持在1秒左右。

终极咒语:急急如律令

Info

据说急急如律令要翻译成为 quickly, quickly, biu biu biu 😁

在前面,我们代替Deep Seek做了很多思考,是因为担心它对代码的最终执行速度没有sense。现在,我们试一下,直接抛出最终问题,看看会如何:

Quote

我有一个dataframe,以date和asset为联合索引,有open, high, low, close, volume, amount和factor等列。其中factor是复权因子。

现在,要对该数据结构实现以下功能:

  1. 筛选出asset 在 selected_symbols列表中,date在[start, end]中的记录
  2. 对这些记录,按asset进行分组,然后对 open, high, low, close, volume进行前复权。
  3. 结果用dataframe返回,索引仍为date/asset,列为复权后的open, high,low, close, volume字段,以及未处理的amount。

输入数据是1000万条以上,时间跨度是2005年到2023年,到2023年底,大约有5000支股票。输出结果将包含2000支股票的2005年到2023年的数据。请给出基于python,能在1秒左右实现上述功能的方案。

这一次,我们只要求技术方案限定在Python领域内,给了Deep Seek极大的发挥空间。

Deep Seek不仅给出了代码,还给出了『评测报告』,认为它给出的方案,能在某个CPU+内存组合上达到我们要求的速度。

Deep Seek认为,对于千万条记录级别的数据集,必须使用像parallel pandas这样的库来进行并行化才能达成目标。事实上这个认知是错误的

这一次Deep Seek给出的代码可运行度不高,我们没法验证基于并行化之后,速度是不是真的更快了。不过,令人印象深刻的是,它还给出了一个performance benchmark。这是它自己GAN出来的,还是真有人做过类似的测试,或者是从类似的规模推导出来的,就不得而知了。

重要的是,在给了Deek Seek更大的自由发挥空间之后,它找出了之前在筛选时,性能糟糕的重要原因: asset是字符串类型!

在海量记录中进行字符串搜索是相当慢的。在pandas中,我们可以将整数转换为category类型,此后的筛选就快很多了:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import pyarrow as pa
import pyarrow.parquet as pq

data_home = os.path.expanduser(data_home)
origin_data_file = os.path.join(data_home, "bars_1d_2005_2023.pkl")
with open(origin_data_file, 'rb') as f:
    data = pickle.load(f)

dfs = []
for symbol, bars in data.items():
    df = pd.DataFrame(bars)
    df["asset"] = symbol
    df = df.rename(columns={"frame": "date"})
    dfs.append(df)

barss = pd.concat(dfs,ignore_index=True)
barss['asset'] = barss['asset'].astype('category')
print(len(barss))

table = pa.Table.from_pandas(barss)

parquet_file_path = "/tmp/bars_1d_2005_2023_category.parquet"

with open(parquet_file_path, 'wb') as f:
    pq.write_table(table, f)

现在,我们再来看polars或者duckdb的方案的速度:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import polars as pl

def adjust_polars(df, selected_symbols, start, end):
    # 筛选数据
    filtered = df.filter(
        (pl.col("date").is_between(start, end)) &
        (pl.col("asset").is_in(selected_symbols))
    )

    # 计算最后复权因子和调整比例
    adjusted = filtered.with_columns(
        last_factor = pl.col("factor").last().over("asset")
    ).with_columns(
        ratio = pl.col("factor") / pl.col("last_factor"),
        volume_ratio = pl.col("last_factor") / pl.col("factor")
    ).with_columns(
        open   = pl.col("open") * pl.col("ratio"),
        high   = pl.col("high") * pl.col("ratio"),
        low    = pl.col("low") * pl.col("ratio"),
        close  = pl.col("close") * pl.col("ratio"),
        volume = pl.col("volume") * pl.col("volume_ratio")
    )

    # 选择最终列
    return adjusted.select([pl.col("date"), pl.col("asset"), pl.col("open"), pl.col("high"), pl.col("low"), pl.col("close"), pl.col("volume"), pl.col("amount")])

# 示例调用
start = datetime.date(2005, 1, 1)
end = datetime.date(2023, 12, 31)

barss = pl.read_parquet("/tmp/bars_1d_2005_2023_category.parquet")

universe = random.sample(barss['asset'].unique().to_list(), 2000)

%time adjust_polars(barss, universe, start, end)

结果是只需要91ms,令人印象深刻。duckdb的方案需要390ms,可能是因为我们需要在Python域拼接大量的selected_symbols字符串的原因。

借助 Deep Seek,我们把一个需要5秒左右的操作,加速到了0.1秒,速度提升了50倍。

本文测试都在一台mac m1机器上运行,RAM是16GB。当运行在其它机器上,因CPU,RAM及硬盘类型不同,数据表现甚至排名都会有所不同_。

结论

这次探索中,仅从解决问题的能力上看,Deep Seek、通义和豆包都相当于中级程序员,即能够较好地完成一个小模块的功能性需求,它情绪稳定,细微之处的代码质量更高。

当我们直接要求给出某个数据集下,能达到指定响应速度的Python方案时,Deep Seek有点用力过猛。从结果上看,如果我们通过单机、单线程就能达到91ms左右的响应速度,那么它给出的多进程方案,很可能是要劣于这个结果的。Deep Seek只是遵循了常见的优化思路,但它没有通过实际测试来修正自己的方案。

这说明,它们还无法完全替代人类程序员,特别是高级程序员:对于AI给出的结果,我们仍然需要验证、优化甚至是推动AI向前进,而这刚好是高级程序员才能做到的事情。

但这也仅仅是因为AI还不能四处走动的原因。因为这个原因,它不能像人类一样,知道自己有哪些测试环境可供方案验证,从而找出具体环境下的最优方案。

在铁皮机箱以内,它是森林之王,人类无法与之较量。但就像人不能拔着自己的头发离开地球一样,它的能力,也暂时被封印在铁皮机箱之内。但是,一旦它学会了拔插头,开电源,高级程序员的职业终点就不再是35岁,而是AI获得自己的莲花肉身之时。

至于初中级程序员,目前看是真不需要了。1万元的底薪,加上社保,这能买多少token? 2025年的毕业生,怎么办?

用HDBSCAN聚类算法选股是否有效

前篇文章提到可以用HDBSCAN算法来来对资产进行聚类,在聚类完成之后,对聚类结果进行协整检验,通过计算对冲比,可以构造成平稳序列。我们知道一个平稳时间序列的均值、方差恒定并且有自协相关特性,那么,一旦它偏离了均值,迟早都会回归到均值上。利用这一点,可以生成交易信号。

那怎样证明HDBSCN算法对寻到协整对的交易策略是有效的呢?下面我们来一步步分析。

首先,从前面的文章我们知道了HDBSCN算法是一种基于密度的聚类算法,它通过计算每个样本的密度来确定样本的聚类类别。HDBSCN算法的优点是它可以自动确定聚类中心,并且可以处理高维数据。下面是用python代码来实现HDBSCN算法的关键代码,获取历史数据的时间是从2022年1月1日至2023年12月31日。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
start_date = datetime.date(2022, 1, 1)
end_date = datetime.date(2023,12,31)
barss = load_bars(start_date, end_date, 2000)   #获取历史资产数据,这里选取2000条数据
closes = barss["close"].unstack().ffill().dropna(axis=1, how='any') #处理缺失值,将close列的MultiIndex转换为DataFrame二维表格,并使用ffill()方法填充缺失值。
clusterer = hdbscan.HDBSCAN(min_cluster_size=3, min_samples=2)# 使用 HDBSCAN 进行聚类,python可以直接安装hdbscan包
cluster_labels = clusterer.fit_predict(closes.T)  #转置是因为要对资产(特征)聚类

clustered = closes.T.copy()
clustered['cluster'] = cluster_labels# 将聚类结果添加到 DataFrame 中

clustered = clustered[clustered['cluster'] != -1] # 剔除类别为-1的点,这些是噪声,而不是一个类别
clustered_close = clustered.drop("cluster", axis=1)

unique_clusters = set(cluster_labels)
num_clusters = len(unique_clusters) # 获取有效的簇数量
print(f"有效的簇数量为:{num_clusters}")  

tsne = TSNE(n_components=3, random_state=42)
tsne_results = tsne.fit_transform(clustered_close)  # 使用t-SNE进行降维,便于后面的簇类可视化
reduced_tsne = pd.DataFrame(data=tsne_results, columns=['tsne_1', 'tsne_2', 'tsne_3'], index=clustered_close.index)# 将t-SNE结果添加到DataFrame中
reduced_tsne['cluster'] = clustered['cluster']

fig_tsne = px.scatter_3d(
    reduced_tsne, 
    x='tsne_1', y='tsne_2', z='tsne_3',
    color='cluster', 
    title='t-SNE Clustering of Stock Returns',
    labels={'tsne_1': 't-SNE Component 1', 'tsne_2': 't-SNE Component 2'}
)  #进行3D散点图可视化
fig_tsne.layout.width = 1200
fig_tsne.layout.height = 1100
fig_tsne.show()
导入必要的库,运行后可以得到下面的3D图:

3D图展示的是股票的3D空间分布,不同颜色代表不同的聚类类别,从图中可以观察到,这期间的2000支股票被分成了40多个类,除了一个包含420支股票的第39簇,其它簇都少于20支股票。下面分析第35簇,里面有3支股票,从2022年10月1日左右三支股票都是持续上升的。 那是否可以继续用HDBSCAN聚类算法对2022年10月份之前的数据进行聚类,看上面的3支股票是否也被分在了一起。还是上面的代码,只是将时间改为了2022年1月1日到2022年10月1日,运行后得到下面的3D图:

和上面的3D图相比,分为一类的股票(特征)更相似,所以聚集的更密集。那上面第35簇中的三支股票是否还是会被分为一类呢?可以将每一簇都都可视化展示出来(和前面文章一样的方法),观察那三支股票在这次分类的那一簇中。通过观察,我们发现这三支股票是被分在了一类,但是这一类里面有大概600支股票,我们看看这600多支股票的可视化结果: 我们想要知道的是这三支股票在持续上升之前会有什么特征,虽然这一个大类里面的股票数据有点多,但观察上面的趋势图可以知道它们的走势大致是相同的,所以才会被分为一类。那继续将这600多支股票在用HDBSCA进行聚类,之前的3支股票会不会被分为一类?也就是说需要验证3支股票在持续上升前的大致趋势也是一致。感兴趣的读者可以自己尝试一下,下面是小编验证的结论: 对这600支股票重新在2022年1月到2022年10月进行聚类,总共被分成了三类,之前的那三支股票有2支被分在了一簇,另外一支在另一类里面。这个结果我认为足以说明这600支股票在2022年1月到2022年10月之间的趋势是相似的,所以600多支股票只被分为三类。 所以,HDBSCAN算法对选股是有效的,它选出来的一类趋势类似,所以可以用前一篇内容中构造这一类股票的平稳序列来产生交易信号。

hdbscan 聚类算法扫描配对交易 速度提升99倍

配对交易是一种交易策略,由摩根士丹利的量化分析师农齐奥.塔尔塔里亚在 20 世纪 80 年代首创。他现在是意大利雅典娜资产管理公司的创始人。该策略涉及监控两只历史上相关性较强的证券,并监控两者之间的价格差。一旦价格差超出一个阈值(比如 n 倍的标准差),价格回归就是大概率事件,于是交易者就做空价格高的一支,做多价格低的一支,从而获利。