跳转至




Index

[1020] QuanTide Weekly

本周要闻

  • 幻方量化宣布降低对冲全系产品投资仓位至0
  • 9月CPI、PPI及前三季度GDP数据出炉
  • 潘功胜发声,宏观经济政策应更加重视消费

下周看点

  • 周一:最新LPR报价
  • 周二:华为原生鸿蒙之夜新品发布会
  • 周五:多家银行存量房贷利率调整
  • 周日:全球低空经济论坛年会

本周精选

  • 连载!量化人必会的 Numpy 编程(7)

  • 宁波幻方量化公告,将逐步把对冲全系产品投资仓位降低至0,同时自10月28日起免除对冲系列产品后期的管理费。作出改变的原因是,市场环境变化,对冲系列产品难以同时取得收益和缩小风险敞口,潜在收益风险比明显下降,未来收益将明显低于投资人预期。建议投资者适时调整投资组合,市场低位较适合配置指数增强产品,在风险能力匹配前提下,对冲产品可转至多头。(财联社)
  • 10月13日,国家统计局数据显示,9月份全国居民消费价格(CPI)环比持平,同比上涨0.4%,涨幅回落;工业生产者出厂价格(PPI)环比降幅收窄,同比降幅扩大。CPI、PPI同比表现均弱于市场预期。(证券时报网)
  • 10月18日,统计局公布2024年9月经济数据,9月社零当月同比3.2%,固定资产投资累计同比3.4%,工增当月同比5.4%,三季度GDP同比4.6%。前三季度GDP累计同比4.8%。(财联社)
  • 在10月18日的2024金融街论坛上,央行行长潘功胜重磅发声。谈及实现经济的动态平衡,需要把握好几个重点时,他提到宏观经济政策的作用方向应从过去的更多偏向投资,转向消费与投资并重,并更加重视消费。(财联社)

Numpy量化应用案例[4]

突破旗形整理

最近和一位做量化的私募大佬聊了一下行情,他给我发了这张图片。

75%

这个底部点位,他又一次精准命中了(3143那个点,不是3066。周五上证实际下探到3152点)。不过,我更好奇的是他的研究方法,也就图的下半部分。知道大致的底之后,再结合缺口、前低等一些信息,确实有可能比较精准地预测底部点位。


我当时就回了一句,最近忙着上课,等有时间了,把这个三角形检测写出来。

这个检测并不难,写一个教学示例,一个小时的时间足够了。

在分享我的算法之前,先推荐一个外网的方案。同样是教学代码,显然不如我随手写的优雅,先小小自得一下。不过,这样的好处就是,他的代码可能更容易读懂。

所谓旗形整理(或者说三角形检测),就是下面这张图:

在这张图,每次上涨的局部高点连接起来,构成压力线;而下跌的局部低点连起来,构成支撑线。

如果我们再在开始的位置画一条竖线,就构成了一个小旗帜,这就是旗形的来由。


旗形整理的特别之处是,整理何时结束似乎是可以预测的,因为这两条线之间的交易空间会越来越窄。

当它小于一个ATR时,就是整理必须结束,即将选择方向的时候。

下图显示了随时间推移,震荡幅度越来越小的情况。

75%

最终,股价会选择方向。一旦选择方向,就往往会有一波较大的行情(或者下跌):

75%


所以,能够自动化检测旗形整理,有以下作用:

  1. 如果当前处理在旗形整理中,可以设定合理的波段期望。
  2. 检测临近整理结束,可以减仓等待方向。
  3. 一旦方向确定,立即加仓。

现在,我们就来看如何实现。首先,我们有这样一个标的:

75%

这是已经上涨后的。我们再来看它上涨前的:

75%


肉眼来看,一个旗形整理似有若无。

我们的算法分这样几步:

  1. 找到每阶段的峰和谷的坐标
  2. 通过这些坐标及它们的收盘价,进行趋势线拟合
  3. 通过np.poly1d生成趋势线
  4. 将趋势线和k线图画在一张图上
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
def find_peak_pivots(df, win):
    local_high = (df.close.rolling(win)
                    .apply(lambda x: x.argmax()== win-1))
    local_high[:win] = 0

    # find_runs函数是量化24课内容
    v,s,l = find_runs(local_high)

    peaks = []
    i = 0
    while i < len(v):
        if l[i] >= win // 2:
            if s[i] > 0:
                peaks.append(s[i] - 1)
        for j in range(i+1, len(v)):
            if l[j] >= win // 2:
                peaks.append(s[j] - 1)
                i = j
        if j == len(v)-1:
            break

    return peaks

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
def find_valley_pivots(df, win):
    local_min = (df.close.rolling(win)
                .apply(lambda x: x.argmin()== win-1))
    local_min[:win] = 0

    v,s,l = find_runs(local_min)

    valleys = []
    i = 0
    while i < len(v):
        if l[i] >= win // 2:
            if s[i] > 0:
                valleys.append(s[i] - 1)
        for j in range(i+1, len(v)):
            if l[j] >= win // 2:
                valleys.append(s[j] - 1)
                i = j
        if j == len(v)-1:
            break

    return valleys

def trendline(df):
    peaks = find_peak_pivots(df, 20)
    valleys = find_valley_pivots(df, 20)

    y = df.close[peaks].values
    p = np.polyfit(x=peaks, y = y, deg=1)
    upper_trendline = np.poly1d(p)(np.arange(0, len(df)))

    y = df.close[valleys].values
    v = np.polyfit(x=valleys, y = y, deg=1)
    lower_trendline = np.poly1d(v)(np.arange(0, len(df)))

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
    candle = go.Candlestick(x=df.index,
                    open=df['open'],
                    high=df['high'],
                    low=df['low'],
                    close=df['close'],
                    line=dict({"width": 1}),
                    name="K 线",
                    increasing = {
                        "fillcolor":"rgba(255,255,255,0.9)",
                        "line": dict({"color": RED})
                    },
                    decreasing = {
                        "fillcolor": GREEN, 
                        "line": dict(color =  GREEN)
                    })
    upper_trace = go.Scatter(x=df.index, 
                             y=upper_trendline, 
                             mode='lines', 
                             name='压力线')

    lower_trace = go.Scatter(x=df.index, 
                             y=lower_trendline, 
                             mode='lines', 
                             name='支撑线')

    fig = go.Figure(data=[candle,lower_trace, upper_trace])

    fig.show()

最后,我们对该标的在上涨之前的形态进行检测,得到以下结果:


这个结果说明,旗形整理结束时,方向选择受大盘影响,仍有一定不确定性,但没有跌破前低,这是此后能凝聚共识、返身上涨的关键。

我们再来看一个最近一个月翻了7倍的标的:

这是未上涨前的形态:

这是检测出来的旗形整理:


完美捕捉!

当然,这里只是示例代码,在实际运用中,由于我们使用了小样本线性回归,回归结果具有不稳定性,要作为生产代码,还需要辅以其它方法让其预测更稳定。无论如何,我们已经迈出了关键一步。

代码(可运行的ipynb文件)放在知识星球里。正在建设,所以目前是最低价格。

如果有一些代码和术语看不明白(比如为何以ATR来决定整理结束),这些都是我们量化24课的内容,欢迎报名!


好课开讲!


目标清晰 获得感强


为什么选择QuanTide的课程?

第42个因子:年化17.6%,15年累计10倍

题图:第比利斯自由大学,Kahushadze在此任教

《101个公式化因子》是Zura Kahushadze于2015年发表的paper。在这篇paper中,他拿出了在worldquant广泛使用的因子中,便于公式化的因子(约80个),加上其它自创因子,共101个,集结发表在预印论文网站arXiv上。

这一paper甫一发表,便引起业界关注。现在,Alpha101因子已成为国内机构广泛使用的付费因子。但是,Alpha101因子中的公式比较晦涩难懂,使用了自定义的算子、大量魔术数字和数不清的括号嵌套,让无数人不得不从入门到放弃。


然而,如果你因此放弃Alpha101,不能不说是巨大的损失。比如,我们近期对第42个因子进行了回测,发现它在A股有相当好的表现。

Info

回测使用2008年到2022年的数据,随机抽取2000支个股参与回测。考虑到2018年A股才1800支个股左右,这一回测在2018年前几乎是全覆盖。具有很强的代表性。

回测结果表明,这一因子的年代收益达到16.1%, 累计收益达到7倍(15年)。


不过,驾驭Alpha101并不容易。不得不说,它的公式有点晦涩难懂,比如第29号因子,它的公式如下:

1
2
(min(product(rank(rank(scale(log(sum(ts_min(rank(rank((-1 * rank(delta((close - 1),
5))))), 2), 1))))), 1), 5) + ts_rank(delay((-1 * returns), 6), 5))

这只是Alpha101中中等阅读难度的因子。如果我们把它展开,相当于:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
(
    min(
        product(
            rank(
                rank(
                    scale(
                        log(
                            sum(
                                ts_min(
                                    rank(rank((-1 * rank(delta((close - 1), 5))))), 2
                                ),
                                1,
                            )
                        )
                    )
                )
            ),
            1,
        ),
        5,
    )
    + ts_rank(delay((-1 * returns), 6), 5)
)

不仅是了解其含义非常困难,就是实现它也不是件容易的事。而且,Alpha101中还存在大量待优化的部分,以及少部分错误(对于一篇免费、公开的文章,仍然是相当宝贵的资源)。比如,对于42号因子,它仍然有改进空间。这是我们改进后的因子表现(同等条件下,源码仅对学员开放):

我们看到,年化alpha有了1.5%的上涨。而下面这张分层收益图,懂行的人一看就知道,简直是完美。西蒙斯所谓追随美的指引,应该就是指的这种图了。


累积收益图也很完美。A股2008年触顶6124之后,持续下跌数年,但这期间此因子的收益仍然保持上涨。

不过,Alpha101确实很难懂。比如公式001看起来并不复杂:

1
2
3
(rank(Ts_ArgMax(SignedPower((
    (returns < 0) ? stddev(returns, 20) : close), 2.)
    , 5)) -0.5)

但它却做了许多无用操作。它实际上是对现价对近期高点的距离排序,你看明白了吗?所以,这个因子到底有没有效呢?在什么情况下,它会出现出人意料的表现呢?

还有,像这样的因子,从公式到代码,再到结合数据进行因子检验,又该如何操作呢?如果你感兴趣,快来加入我们一起学习吧!

地量见地价?我拿一年的上证数据算了算

多伦多大学校园。2024诺贝尔物理学奖获得者,Geoffrey Hinton在此任教。

股谚云,天量见天价、地量见地价。今天我们就来验证一下。

要把股谚量化,首先要解这道难题:数组中第i个元素是多少周期以来的最小值(最大值)?


比如,有数组如下: 1, 2, 2, 1, 3, 0。那么,第1个元素1,是1周期以来的最小值,第2个元素2,是到目前为止的最大值,所以,也是1周期以来的最小值;但第4个元素1则是从第2个元素以来的最小值,所以它是3周期以来的最小值。

依次计算下去,我们得到这样一个序列: 1, 1, 2, 1, 4, 6。其中的每一项,都是原数组中,对应项到目前为止的最小值。

这个算法有什么用处呢?它可以用在下面的计算当中。

比如,有股谚云,天量见天价,地量见地价。

当行情处在高位,成交量创出一段时间以来的天量之后,后续成交量将难以为继,容易引起下跌;当行情处在低位,成交量创出一段时间以来的地量之后,表明市场人气极度低迷,此时价格容易被操纵,从而引来投机盘。在计算地量时,我们就要知道,当前的成交量是多少期以来的最小值。

比如,如果大盘当前的成交量成为了120天以来的最低量,这时候很可能就会引起大家的关注了。要验证出现地量之后,后面是否真的有行情,就需要进行因子分析或者回测验证。现在的问题是,怎么计算呢?

无脑的双重循环

我们以上面的数组为例,最简单的算法是使用循环:


 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
def min_range_loop(s):
    minranges = [1]
    for i in range(1, len(s)):
        for j in range(i-1, -1, -1):
            if s[j] < s[i]:
                minranges.append(i - j)
                break
        else:
            minranges.append(i+1)
    return minranges

s = [1,2,2,1,3,0]

min_range_loop(s)

输出为:1, 1, 2, 1, 4, 5

这个算法实现用了双重循环,应该比较耗时。我们生成10000个元素的数组跑一下,发现调用一次需要用时9.5ms。

它山之石,myTT的实现

在myTT中有一个类似的函数实现:

1
2
3
4
5
def LOWRANGE(S):                       
    # LOWRANGE(LOW)表示当前最低价是近多少周期内最低价的最小值 by jqz1226
    rt = np.zeros(len(S))
    for i in range(1,len(S)):  rt[i] = np.argmin(np.flipud(S[:i]>S[i]))
    return rt.astype('int')

它应该也是实现元素i是多少周期之前的最小值,只不过从注释上看,该函数多在计算最低价时使用。但实际上序列s是什么没有关系。

这个函数用了一个循环,还使用了flipuid函数,比较有技巧。这个函数的用法演示如下:

1
2
s = [1, 2, 2, 3, 2, 0]
np.all(np.flipud(s) == s[::-1])

也就是它的作用实际上就是翻转数组。

不过,LOWRANGE函数似乎没有实现它声明的功能,不知道是不是对它的功能理解上有错误。当我们用同一个数组进行测试时,得到的结果与双循环的并不一致。

1
2
s = np.array([1, 2, 2, 3, 2, 0])
LOWRANGE(s)

得到的结果是:

1
array([0, 0, 0, 0, 1, 0])

除此之外,如果同样拿10000个元素的数组进行性能测试,LOWRANGE执行时间要60ms,居然跑输给Python双循环。测试环境使用的Python是3.11版本,不得不说Python3.11的优化非常明显。

如果我们要完全消除循环,应该怎么做呢?

烧脑的向量化

如果我们能把数组[1, 2, 2, 3, 2, 0]展开为:

\(\displaystyle \left[\begin{matrix}1.0 & \text{NaN} & \text{NaN} & \text{NaN} & \text{NaN} & \text{NaN}\\1.0 & 2.0 & \text{NaN} & \text{NaN} & \text{NaN} & \text{NaN}\\1.0 & 2.0 & 2.0 & \text{NaN} & \text{NaN} & \text{NaN}\\1.0 & 2.0 & 2.0 & 3.0 & \text{NaN} & \text{NaN}\\1.0 & 2.0 & 2.0 & 3.0 & 2.0 & \text{NaN}\\1.0 & 2.0 & 2.0 & 3.0 & 2.0 & 0.0\end{matrix}\right]\)

然后实现一个函数,接收该矩阵输入,并能独立计算出每一行最后一列是多少个周期以来的最小值,这个问题就得到了求解。

要实现这个功能,我们可以通过numpy的masked array和triu矩阵来实现。


1
2
3
4
n = len(s)
mask = np.triu(np.ones((n, n), dtype=bool), k=1)
masked = np.ma.array(m, mask=mask)
masked

triu中的k参数决定了生成的三角矩阵中主对角线的位置。k=0,对角线取在主对角线上;k<0,对角线取在主对角线之个k个单位;k>0,对角线取在主对角线之上k个单位。

我们将得到以下输出:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
masked_array(
  data=[[1.0, --, --, --, --, --],
        [1.0, 2.0, --, --, --, --],
        [1.0, 2.0, 2.0, --, --, --],
        [1.0, 2.0, 2.0, 3.0, --, --],
        [1.0, 2.0, 2.0, 3.0, 2.0, --],
        [1.0, 2.0, 2.0, 3.0, 2.0, 0.0]],
  mask=[[False,  True,  True,  True,  True,  True],
        [False, False,  True,  True,  True,  True],
        [False, False, False,  True,  True,  True],
        [False, False, False, False,  True,  True],
        [False, False, False, False, False,  True],
        [False, False, False, False, False, False]],
  fill_value=1e+20)

mask flag为True的部分将不会参与运算。如果我们把masked转给sympy,就可以验证这一点:


1
2
3
4
5
6
from sympy import Matrix

n = len(s)
mask = np.triu(np.ones((n, n), dtype=bool), k=1)
masked = np.ma.array(m, mask=mask)
Matrix(masked)

我们得到了与期望中一样的展开矩阵。

\(\displaystyle \left[\begin{matrix}1.0 & \text{NaN} & \text{NaN} & \text{NaN} & \text{NaN} & \text{NaN}\\1.0 & 2.0 & \text{NaN} & \text{NaN} & \text{NaN} & \text{NaN}\\1.0 & 2.0 & 2.0 & \text{NaN} & \text{NaN} & \text{NaN}\\1.0 & 2.0 & 2.0 & 3.0 & \text{NaN} & \text{NaN}\\1.0 & 2.0 & 2.0 & 3.0 & 2.0 & \text{NaN}\\1.0 & 2.0 & 2.0 & 3.0 & 2.0 & 0.0\end{matrix}\right]\)

现在,我们要求解的问题变成,每一行最后一个数是多少周期的最小值。我们进行一个变换:

1
2
3
4
s = np.array([1, 2, 2, 3, 2, 0])
diff = s[-1] - s
rng = np.arange(len(diff))
rng - np.argmax(np.ma.where(diff > 0, rng, -1))

我们用最后一个元素减去数组,然后再比较元素是否大于零,如果大于零,我们就将值设置为索引(rng),否则设置为-1,然后再通过argmax找到最后一个非零值。这样输出元素的最后一个值,就是最小周期数。在此例中是5。

如果s = np.array([1, 2, 2, 3, 2]),那么计算出来的最后一个值是4。 如果s = np.array([1, 2, 2, 3]),这样计算出来的最后一个值是1。 依次类推。这刚好就是在masked array中,按axis = 1计算的结果。

下面是完整的代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
def min_range(s):
    """计算序列s中,元素i是此前多少个周期以来的最小值"""
    n = len(s)

    diff = s[:,None] - s
    mask = np.triu(np.ones((n, n), dtype=bool), k=1)
    masked = np.ma.array(diff, mask=mask)

    rng = np.arange(n)
    ret = rng - np.argmax(np.ma.where(masked > 0, rng, -1), axis=1)
    ret[0] = 1
    if filled[1] <= filled[0]:
        ret[1] = 2
    return ret

我们来验证一下结果:

1
2
s = np.array([1, 2, 2, 3, 2, 0])
min_range(s)

输出结果是1, 1, 2, 1, 4, 6

在最后一个数字上,与loop略有差异。不过,如果是用来寻找地量条件,这个数值一般要比较大才生效,所以,有一点误差可以接受。

消除了两个循环,性能应该有很大的提升吧?

遗憾的是,在同样的测试条件下,这个函数需要822ms,比双循环慢了100倍。花了这么多功夫,还引入了一点小误差,许诺的性能提升不仅没有实现,反而更糟糕了。真是意外啊。

地量见地价?

最后,我们以上证为例,看看这个算法的实际作用。

1
2
3
4
5
6
7
8
import akshare as ak
df = ak.stock_zh_index_daily(symbol="sh000001")

df_one_year = df.tail(250)
df_one_year["minrange"] = min_range_loop(df_one_year["volume"].to_numpy())

ax = df_one_year.plot(x='date', y='close', label='close', color='blue', secondary_y=False)
df_one_year.plot(x='date', y='minrange', label='Min Range', color='red', secondary_y=True, ax=ax)

这里我们使用了akshare数据源,所以,所有人都可以复现。

我们得到的输出如下:

这个图显示了惊人的结果。几乎在每一次地量(大于50天)出现之后,都能立刻迎来一个小的反弹。但大级别的反弹,还需要在地量之后,随着资金不断进场,成交量放大才能出现。

比如,在8月底,上证出现了一年以来的最低地量,随后立即迎来一个小反弹。在反弹失败之后,其它指标也逐渐见底回升,最终迎来了9月底的十年不遇的暴涨行情。

[1013] QuanTide Weekly

本周要闻

  • 10月25日起,存量房贷统一下调!
  • Robotaxi Day草草收场,特斯拉暴跌
  • 一揽子增量财政策略超预期,规模或在5万亿以上
  • 化债概念出炉!

下周看点

  • 周日:9月PPI和CPI指数公布
  • 周一(10月14日)国新办就前三季度进出口数据举办新闻发布会

本周精选

  • 连载!量化人必会的 Numpy 编程(6)

  • 工商银行发布存量房贷利率调整常见问答,透露存量住房贷款都可以调整为不低于LPR-30BP(除京沪深二套外),并在10月25日统一批量调整。以100万、25年期、等额本息计,调整后每月可省支出469元,共节省利息14.06万元。
  • We Robot发布会召开,此前马斯克称其为载入史册,但等到大幕拉开,却只有短短20多分钟的主题介绍,重要技术指标和参数均未公布。随后特斯拉大跌8.78%,其对手莱福特(Lyft)则大涨9.59%。
  • 财政部周六召开发布会,一揽子增量财政政策落地,有分析师认为,保守估计,本次一揽子增量财政政策规模或在5万亿元以上,重点是化债和基层三保。
  • 紧随财政部发布,化债概念受到市场热议。证券时报.数据宝梳理,AMC、城投平台、PPP概念和REITs概念共约37家公司可能受益。在周五大跌中,这些公司多数逆市大涨或者跑赢大盘。

消息来源:东方财富


Numpy量化应用案例[3]

向量化又一例: 多资产中位数去极值

去极值是量化分析预处理中的常见步骤,在机器学习中也很常见。在各种去极值方法中,中位数拉回是对数据分布特性适应性最广、最鲁棒的一种。

我们先介绍绝对中位差(median absolute deviation)的概念:

\[MAD = median(|X_i - median(X)|)\]

为了能将 MAD 当成与标准差\(\sigma\)相一致的估计量,即 \(\(\hat{\sigma} = k. MAD\)\)

这里 k 为比例因子常量,如果分布是正态分布,可以计算出: $$ k = \frac{1}{(\Phi^{-1}(\frac{3}{4}))} \approx 1.4826 $$


基于这个 k 值,取 3 倍则近似于 5。

代码实现如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
from numpy.typing import ArrayLike

def mad_clip(arr: ArrayLike, k: int = 3):
    med = np.median(arr)
    mad = np.median(np.abs(arr - med))

    return np.clip(arr, med - k * mad, med + k * mad)

np.random.seed(78)
arr = np.append(np.random.randint(1, 4, 20), [15, -10])
mad_clip(arr, 3)

这段代码只能对单一资产进行mad_clip。如果要同时对A股所有资产的某种指标去极值,上述方法需要循环5000多次,显然速度较慢。此时,我们可以使用下面的方法:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
def mad_clip(df: Union[NDArray, pd.DataFrame], k: int = 3, axis=1):
    """使用 MAD 3 倍截断法去极值"""

    med = np.median(df, axis=axis).reshape(df.shape[0], -1)
    mad = np.median(np.abs(df - med), axis=axis)

    magic = 1.4826
    offset = k * magic * mad
    med = med.flatten()
    return np.clip(df.T, med - offset, med + offset).T

这一版的 mad_clip 可以接受 numpy ndarray 和 pandas dataframe 作为参数。输入的数据格式是什么,它返回的数据格式就是什么。

我们在np.median调用中,传入了 axis参数。如果axis=0, 表明按列的方向遍历,因此是按行取中位数;axis=1,表明按行的方向遍历,因此是按列取中位数。

我们使用真实数据测试一下:

1
2
3
4
5
6
7
# 加载测试数据
start = datetime.date(2023, 1, 1)
end = datetime.date(2023, 12, 29)
barss = load_bars(start, end, 7)

closes = barss["close"].unstack("asset").iloc[-5:]
closes

输出数据为:

asset/date 002095.XSHE 003042.XSHE 300099.XSHE 301060.XSHE 601689.XSHG 603255.XSHG 688669.XSHG
2023-12-25 23.400000 18.090000 6.10 13.00 73.910004 36.799999 18.080000
2023-12-26 21.059999 17.520000 5.94 12.83 72.879997 37.000000 18.080000
2023-12-27 20.070000 17.590000 6.04 12.84 72.000000 36.840000 18.049999
2023-12-28 20.010000 18.139999 6.11 13.14 72.199997 38.150002 18.440001
2023-12-29 20.270000 18.580000 6.19 13.29 73.500000 37.299999 18.740000

为了测试效果,我们将k设置为较小的值,以观察其效果:

1
mad_clip(closes,k=1)
asset/date 002095.XSHE 003042.XSHE 300099.XSHE 301060.XSHE 601689.XSHG 603255.XSHG 688669.XSHG
2023-12-25 23.400000 18.090000 10.217396 13.00 25.962605 25.962605 18.080000
2023-12-26 21.059999 17.520000 10.296350 12.83 25.863649 25.863649 18.080000
2023-12-27 20.070000 17.590000 10.325655 12.84 25.774343 25.774343 18.049999
2023-12-28 20.010000 18.139999 10.582220 13.14 26.297781 26.297781 18.440001
2023-12-29 20.270000 18.580000 10.659830 13.29 26.820169 26.820169 18.740000

我们看到,原始数据中的73.9被拉回到25.9,6.1被拉回到10.2(以第一行为例),并且都是以行为单位计算的。

min_range: 多少周期以来的最小值?

这是一个很常见的需求,比如,有股谚语云,天量见天价,地量见地价。当行情处在高位,成交量创出一段时间以来的天量之后,后续成交量将难以为继,容易引起下跌;当行情处在低位,成交量创出一段时间以来的地量之后,表明市场人气极度低迷,此时价格容易被操纵,从而引来投机盘。


在通达信公式中有此函数,在麦语言中,对应的方法可能是LOWRANGE。以下是myTT中LowRange函数的实现:

1
2
3
4
5
def LOWRANGE(S):                       
    # LOWRANGE(LOW)表示当前最低价是近多少周期内最低价的最小值 by jqz1226
    rt = np.zeros(len(S))
    for i in range(1,len(S)):  rt[i] = np.argmin(np.flipud(S[:i]>S[i]))
    return rt.astype('int')

这是一个看似简单,但实际上比较难实现的功能。如果我们对上述函数进行测试,会发现它不一定实现了需求(也可能是本文作者对此函数理解有误)。

1
2
3
s = [ 1, 2, 2, 1, 3, 0]

LOWRANGE(np.array(s))

在上述测试中,我们希望得到的输出是[1, 1, 1, 3, 1, 6],但LOWRANG将给出以下输出:

1
array([0, 0, 0, 2, 0, 0])

下面,我们给出该函数的向量化实现。

Warning

该函数在开头的几个输出中,存在出错可能。因不影响因子分析,暂未修复。


 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def min_range(s):
    """计算序列s中,元素i是此前多少个周期以来的最小值

    此方法在个别数字上有bug

    Example:
        >>> s = np.array([5, 7, 7, 6, 5, 8, 2])
        >>> min_range(s)
        array([1, 2, 1, 2, 3, 1, 6])
    """
    n = len(s)

    # handle nan
    filled = np.where(np.isnan(s), -np.inf, s)
    diff = filled[:,None] - filled
    mask = np.triu(np.ones((n, n), dtype=bool), k=1)
    masked = np.ma.array(diff, mask=mask)

    rng = np.arange(n)
    ret = rng - np.argmax(np.ma.where(masked > 0, rng, -1), axis=1)
    ret[0] = 1
    if filled[1] <= filled[0]:
        ret[1] = 2
    return ret

s = np.array([5, 7, 7, 6, 5, 8, 2])
min_range(s)

最终输出的结果是:

1
array([1, 1, 2, 3, 4, 1, 6])

在第2个7的位置,输出与期望不一致,但此后计算都正确。这个实现非常有技巧,运用了三角矩阵做mask array,从而消解了循环。


均线计算:SMA和分时均线

使用numpy计算移动均线非常简单,使用np.convolve()即可。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
def moving_average(ts: ArrayLike, win: int, padding=True)->np.ndarray:
    kernel = np.ones(win) / win

    arr = np.convolve(ts, kernel, 'valid')
    if padding:
        return np.insert(arr, 0, [np.nan] * (win - 1))
    else:
        return arr

moving_average(np.arange(5), 3)

输出结果为array([nan, nan, 1., 2., 3.])

移动均线是只考虑价格信息的一种均线。分时均价线则则同时纳入成交量和成交价信息的均线,在日内交易中有特别重要的含义。比如,在市场不好的情况下,如果个股价格位于分时均线下方,此前两次上冲均线失败,那么,一旦冲第三次失败,一般认为要尽快卖出。反之亦然。

均价线的计算如下:


如果当前时刻为t,则用开盘以来,直到时刻t为止的成交金额除以成交量,即得到该时刻的累积成交均价。将所有时刻的成交均价连接起来,即构成了分时均价线。

这个功能看似复杂,但由于numpy提供了cumsum函数,因此实际上计算非常简单:

1
2
3
4
5
def intraday_moving_average(bars: DataFrame)->np.ndarray:
    acc_vol = bars["volume"].cumsum()
    acc_money = barss["amount"].cumsum()

    return acc_money / acc_vol

在本环境中,只提供了日线数据,我们以日线代替分钟线进行测试:

1
2
3
4
5
start = datetime.date(2023, 1, 1)
end = datetime.date(2023, 12, 29)
barss = load_bars(start, end, 1)

intraday_moving_average(barss)

计算最大回撤

最大回撤(MDD)是指投资组合从最高点到最低点的最大观察损失,直到达到新的最高点。最大回撤是一定时间周期内的下行风险指标。


\[ MDD = \frac{Trough Value - Peak Value}{Peak Value} \]

max drawdown是衡量投资策略风险的重要指标,因此,在empyrical库中有实现。不过,作为策略风险评估指标,empyrical没必要返回duration等信息,也没有实现滑动窗口下的mdd。现在,我们就来实现滑动版本。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# https://stackoverflow.com/a/21059308
from numpy.lib.stride_tricks import as_strided
import matplotlib.pyplot as plt

def windowed_view(x, window_size):
    """Creat a 2d windowed view of a 1d array.

    `x` must be a 1d numpy array.

    `numpy.lib.stride_tricks.as_strided` is used to create the view.
    The data is not copied.

    Example:

    >>> x = np.array([1, 2, 3, 4, 5, 6])
    >>> windowed_view(x, 3)
    array([[1, 2, 3],
           [2, 3, 4],
           [3, 4, 5],
           [4, 5, 6]])
    """
    y = as_strided(x, shape=(x.size - window_size + 1, window_size),
                   strides=(x.strides[0], x.strides[0]))
    return y

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
def rolling_max_dd(x, window_size, min_periods=1):
    """Compute the rolling maximum drawdown of `x`.

    `x` must be a 1d numpy array.
    `min_periods` should satisfy `1 <= min_periods <= window_size`.

    Returns an 1d array with length `len(x) - min_periods + 1`.
    """
    if min_periods < window_size:
        pad = np.empty(window_size - min_periods)
        pad.fill(x[0])
        x = np.concatenate((pad, x))
    y = windowed_view(x, window_size)
    running_max_y = np.maximum.accumulate(y, axis=1)
    dd = y - running_max_y
    return dd.min(axis=1)


np.random.seed(0)
n = 100
s = np.random.randn(n).cumsum()
win = 20

mdd = rolling_max_dd(s, win, min_periods=1)

plt.plot(s, 'b')
plt.plot(mdd, 'g.')

plt.show()

测试表明,当时序s长度为1000时,rolling_max_dd的计算耗时为100𝜇S。

滑动窗口下,生成的mdd与原序列对照图如下:


该方法中,还简单地封装了一个将一维数组转换为滑动窗口视图的函数,可以在其它地方使用。

寻找自适应参数

很多基于技术指标的交易策略往往指定了固定的阈值。比如,一些人会在RSI 80以上做空,在RSI 20以下做多。即使是用在指数和行业板块上,这样的指标仍然不够精确,因为在上行通道中,RSI的顶点会高于下行通道中的RSI顶点;在下行通道中,RSI的底部则会比上行通道中的RSI底部低很多。


此外,不同的标的,RSI取值范围也不一样。不仅仅是RSI,许多技术指标都存在需要根据当前的市场环境和标的,采用自适应参数的情况。

其中一个方案是使用类似于布林带的方案,使用指标均值的标准差上下界。但这个方案隐含了技术指标均值的数据分布服从正态分布的条件。

我们可以放宽这个条件,改用分位数,即numpy的percentile来确定参数阈值。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
%precision 2

from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

np.random.seed(78)
s = np.random.randn(100)

hbound = np.percentile(s, 95)
lbound = np.percentile(s, 5)

s[s> hbound]
s[s< lbound]

通过percentile找出来超过上下界的数据,输出如下:

1
2
array([2.09, 2.27, 2.21, 2.12, 2.19])
array([-1.68, -2.4 , -1.97, -1.7 , -1.46])

一旦指标超过95%的分位数(hbound),我们就做空;一旦指标低于5%的分位数(lbound),我们就做多。

这里我们也可以使用中位数极值法。一旦指标超过中位数MAD值的3倍,就发出交易信号 。


《因子投资与机器学习策略》开课啦!


目标清晰 获得感强


为什么你值得QuanTide的课程?

[0929] QuanTide Weekly

本周要闻

  • 大涨!沪指本周大涨12.8%,沪深300上涨15.7%。
  • 首份市值管理指引文件出炉,明确指数成分股与破净股的市值管理
  • 长江证券:银行、地产、建筑和非银等板块或更有可能受益于破净公司估值提升计划

下周看点

  • 周一:财新发布9月PMI数据
  • OpenAI10月1日起举办2024年度DevDay活动
  • 周二(10月1日)至10月7日休市

本周精选

  • 连载!量化人必会的 Numpy 编程(5)

  • A股本周全线大涨。消息面上,周二国新办举行新闻发布会,介绍金融支持经济高质量发展有关情况。盘前降准、降低存量房贷利率提振市场信心,盘中,资本市场密集释放重磅利好:证监会研究制定“并购6条”,创设首期3000亿元股票回购增持再贷款,首期5000亿元规模证券基金保险公司互换便利操作,支持汇金公司等中长期资金入市,正在研究创设平准基金。 消息来源:东方财富
  • 财联社9月28日:近日,A股首份市值管理指引文件出炉,明确了指数成分股与破净股的市值管理具体要求。业内人士表示,市值管理新政策有助于被低估的优质资产进行重新定价,尤其是破净幅度较深但盈利能力稳定的央国企,可能存在价值重估空间,带来投资机遇。另有分析人士表示,而在破净的同时还具备高股息率的股票,更加值得投资者关注。
  • 央行公开市场7天期逆回购操作利率调整为1.5%。本周央行还对存准金下调了0.5%。

消息来源:财联社


Numpy量化应用案例[2]

动量和反转是最重要的量化因子。有很多种刻画动量的算法,但拟合直线的斜率无疑是最直观的一种。


在上面的图形中,如果两条直线对应着两支股票的均线走势,显然,你更愿意买入橘色的那一支,因为它的斜率更大,也就是涨起来更快一些。

人类的视觉有着强大的模式发现能力。我们很容易看出橙色点的上升趋势更强。但是,如果要让计算机也知道哪一组数据更好,则需要通过直线拟合,找出趋势线,再比较趋势线的斜率才能确定。直线拟合,或者曲线拟合(cureve fit),或者多项式拟合,都是线性回归问题。

在这一章中,我们就来讨论直线拟合的方法,并且要从最普通的实现讲到能提速百倍的向量化实现。

线性回归与最小二乘法

如何从一堆随机的数字中,发现其中可能隐藏的、最能反映它们趋势的直线呢?这个问题从大航海时代开始起就困扰着科学家们。

在那个时代,水手们需要通过星相来确定自己的纬度(经度则是通过日冕来计算的),这就需要基于人类的观测,准确地描述天体的行为。


在解决天体观测所引起的误差问题时,法国人阿德里安.玛丽.勒让德(Adrien Marie Legendre)最先发现了最小二乘法(1805年),此后,高斯在1809年发表的著作《天体运动论》中也提出了这一方法。最终,他在1829年,给出了最小二乘法是线性回归最佳拟合的理论证明。这一证明被称为高斯-马尔科夫定理。

Info

在勒让德与高斯之间,存在着一场最小二乘法发现权之争。这场争战一点也不亚于牛顿与莱布尼茨关于微积分发现权之争。不过,勒让德提出最小二乘法时,还只是一种猜想,其理论证明是由高斯完成的。勒让德是18世纪后期,与拉格朗日、拉普拉斯齐名的数学家,被称为三L(取自三个人的名字)。

所谓直线拟合(或者线性回归),就是要在由(x,y)构成的点集中,找到一条直线,使得所有点到该直线的距离的平方和最小。在Numpy中,我们最常用的方法是通过Polynomial.fit来进行拟合:

1
2
3
4
5
from numpy.polynomial import Polynomial
import matplotlib.pyplot as plt


x = np.linspace(0, 100, 100)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
rng = np.random.default_rng(seed = 78)
y = 0.05 * x + 2 + rng.normal(scale = 0.3, size=len(x))

# 绘制这些随机点
plt.scatter(np.arange(100), y, s=5)

# 最小二乘法拟合出趋势线
fitted = Polynomial.fit(x, y, deg=1, domain=[])
y_pred = fitted(x)
plt.plot(y_pred)

# 我们关注的斜率
b, a = fitted.coef
print("slope is {a:.3f}")

在很多教程上,我们看到在Numpy中进行多项式回归,使用的是polyfit。但实际上从1.4起,我们应该使用polynomial模块下的类和方法。polyfit被看成是过时的接口。要注意,尽管polynomial被设计为polyfit等函数的替代物,但在行为上有较大不同。

关于Polynomial.fit,需要注意我们传入了deg=1domain=[]两个参数。

指定deg=1,是因为我们希望将由(x,y)表示的点集拟合成为直线;如果希望拟合成为二次曲线,则可以指定deg=2,高次曲线依次类推。


domain是可以省略的一个参数。省略它之后,对我们后面通过fitted(x)求预测直线并没有影响,但是,它对我们要求的斜率(第19行)会有影响。如果在这里我们省略domain = []这个参数,我们得到的系数会是 [2.47, 4.57]。这将与我们造数据时使用的[0.05, 2]相去甚远。而当我们指定domain = []后,我们将看到拟合直线的系数变回为我们期望的[0.05, 2]。

Tip

我们也可以这样调用fit:

1
2
3
fitted = Polynomial.fit(x, y, deg=1, window=(min(x), max(x)))
intercept, slope = fitted.coef
print(f"slope is {slope:.3f}")

或者:

1
2
3
fitted =  Polynomial.fit(x, y, deg=1)
intercept, slope = fitted.convert().coef
print(f"slope is {slope:.3f}")

convert是一个对domain和window进行平移和缩放的转换函数。

线性回归是非常常用的技巧,在scipy, sklearn和statsmodels等库中都有类似的实现。


正因为如此,求价格或者均线的斜率因子,在技术上是一件轻而易举的事。

但是,如果我们要求移动线性回归呢?这可能就需要一点技巧了。

移动线性回归

移动线性回归是指对一个时间序列\(T\)和滑动窗口\(win\),计算出另一个时间序列\(S\),使得

\[ S_i = Slope(T_{[i-win+1, i-win+2, ..., i]}) \]

最简单直接的实现是通过一个循环:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
from numpy.polynomial import Polynomial
import matplotlib.pyplot as plt


x = np.linspace(0, 100, 100)
y = np.sin(x/10) + x/10

# 曲线拐点
flags = np.sign(np.diff(np.diff(y)))
pivots = np.argwhere(flags[1:] != flags[:-1]).flatten()

plt.plot(y)

win = 10
S = []

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
for i in range(win, len(y)):
    xi = x[i-win:i]
    yi = y[i-win:i]
    fitted = Polynomial.fit(xi, yi, deg=1, domain=[])
    S.append(fitted.coef[1])

    if i in pivots:
        xj = x[i-win-10:i+10]
        y_pred = fitted(xj)
        plt.plot(xj, y_pred, '--')

print(S)

这段代码计算了从第10个周期起,过去10个点的拟合直线的斜率,并且在曲线的转折点上,我们绘制了它的切线。这些切线,正是通过拟合直线的参数来生成的。


在这里,我们生成曲线的方法是使用了方程x + sin(x)。你会发现,这样生成的曲线,有几分神似股价的波动。这并不奇怪,股价的波动本来就应该可以分解成为一个直流分量与许多正弦波的叠加。直流分量反映了公司的持续经营能力,正弦波则反映了各路短炒资金在该标的上操作留下的痕迹。

回到我们的正题来。现在,我们去掉绘图功能,测试一下这段代码需要执行多长时间:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
x = np.linspace(0, 100, 100)
y = np.sin(x/10) + x/10

def moving_lsq(ts, win: int):
    x = np.arange(len(ts))
    S = []
    for i in range(win, len(ts)):
        xi = x[i-win:i]
        yi = ts[i-win:i]
        fitted = Polynomial.fit(xi, yi, deg=1, domain=[])
        S.append(fitted.coef[1])
    return S

%timeit moving_lsq(y, 10)

90次循环,总共用去25ms的时间。考虑到A股有超过5000支股票,因此全部计算一次,这将会超过2分钟。


要加速这一求解过程,我们必须借助向量化。但是,这一次,再也没有魔法一样的API可以调用,我们必须挽起袖子,从理解底层的数学原理开始,做出自己的实现。

向量化

考虑到一个有m个点的线性回归,对其中的每一个点,都会有:

如果所有的点都落在同一条直线上,那么意味着以下矩阵方程成立:

\[ Y = A\beta + b \]

这里\(A\)即为X,\(\beta\)为要求解的系数:

\[ \beta = {(A^TA)}^{-1}A^TY \]

关于公式推导,可以见《Python programming and Numerical Methods - A Guide for Engineers and Scientists》

我们可以手动来验证一下这个公式:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
x = np.linspace(0, 9, 10)
y = x + np.sin(x/10)

win = 10

A = np.arange(win).reshape((win,1))
pinv = np.dot(np.linalg.inv(np.dot(A.T, A)), A.T)
alpha_1 = np.dot(pinv, y[:, np.newaxis]).item()
alpha_2 = Polynomial.fit(x, y, deg=1, domain=[]).coef[1]

np.isclose(alpha_1, alpha_2, atol=1e-2)

我们用两种方法分别求解了斜率,结果表明,在1e-2的绝对误差约束下,两者是相同的。

注意在第7行中, 我们使用了pinv这个奇怪的变量名。这是因为,在numpy.linalg中存在一个同名函数,正好就是计算\({(A^TA)}^{-1}A^T\)的。

不过,到现在为止,我们仅仅是完成了Polynomial.fit所做的事情。如果我们要一次性计算出所有滑动窗口下的拟合直线斜率,究竟要如何做呢?


注意,我们计算斜率,是通过一个矩阵乘法来实现的。在Numpy中,矩阵乘法天然是向量化的。在第7行中,pinv是一个(1,10)的矩阵,而y则是一个(10,)的向量。如果我们能把滑动窗口下,各组对应的pinv堆叠起来,并且y也堆叠起来成为一个矩阵,那么就能通过矩阵乘法,一次性计算出所有的斜率。

我们先从y看起。当我们对y = np.arange(5)按窗口为3进行滑动时,我们实际上是得到了这样一个矩阵:

\[ \begin{bmatrix}0&1&2\\1&2&3\\2&3&4\\\end{bmatrix} \]

要得到这个矩阵,我们可以使用fancy index:

1
2
3
4
5
y[
    [0, 1, 2],
    [1, 2, 3],
    [2, 3, 4]
]

因此,我们要实现滑动窗口下的y的矩阵,只需要构建出这个fancy index矩阵即可。好消息是,fancy index矩阵非常有规律:


 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
def extract_windows_vectorized(array, win:int):
    start = 0
    max = len(array) - win + 1

    sub_windows = (
        start +
        # expand_dims are used to convert a 1D array to 2D array.
        np.expand_dims(np.arange(win), 0) +
        np.expand_dims(np.arange(max), 0).T
    )

    return array[sub_windows]

arr_1d = np.arange(10, 20)

extract_windows_vectorized(arr_1d, 4)

如果你觉得这个方法难以理解的话,Numpy已经为我们提供了一个名为as_strided的函数,可以一步到位,实现我们要的功能,并且比上述方法更快(1倍):

1
2
3
4
5
6
7
8
9
from numpy.lib.stride_tricks import as_strided

y = np.arange(3, 10)
stride = y.strdies[0]

win = 4
shape = (len(y) - win + 1, win)
strides = (stride, stride)
as_strided(y, shape, strides)

矩阵pinv由x产生。如果时间序列y有100个周期长,那么x的值将会是从0到99。


其中\([x0, x1, ..., x_{win-1}]\)对应\([y0, y1, ..., y_{win-1}]\), \([x1, x2, ..., x_{win}]\)对应\([y1, y2, ..., y_{win}]\)\([x_{-win}, x_{-win+1}, ... x_{-1}]\)对应\([y_{-win}, y_{-win+1}, ..., y_{-1}]\)

因此,我们需要构照的系数矩阵\(A\)即:

1
2
A = as_strided(np.arange(len(y)), shape, strides)
pinv = np.linalg.pinv(A)

接下来的回归运算跟之前一样:

1
alpha = pinv.dot(y).sum(axis = 0)

完整的代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
def moving_lsq_vector(ts, win:int):
    stride = ts.strides[0]

    strides = (stride, stride)
    shape = (win,len(ts)-win+1)
    A = as_strided(np.arange(len(ts)), shape= shape, strides=strides)
    pinv = np.linalg.pinv(A)
    y = as_strided(ts, shape=shape, strides = strides)

    return pinv.dot(y).sum(axis=0)

这一版本,比之前使用Polynomial.fit加循环的快了100倍。你可能猜到了,这个100倍近似于我们循环的次数。这就是循环的代价。


现在,运用这里的方法,我们还可以加速别的计算吗?答案是显然的。如果你要快速计算5000支股票过去10天的的5日平均,在获取这些股票最近14天的股价之后,组成了一个(5000, 14)的矩阵\(A\)。现在,我们要做的就是将这个矩阵转换成为3维矩阵,然后再与一个卷积核做乘法:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
from numpy.typing import NDArray
def batch_move_mean(A: NDArray, win:int)->NDArray:
    """批量计算移动平均线

    Args:
        A: (m*n)的价格矩阵。m为股票支数
        win: 移动平均窗口
    Returns:
        (m * (n-win+1))的矩阵.
    """
    kernel = np.ones(win)/win

    s0, s1 = prices.strides
    m, n = prices.shape

    pm = as_strided(prices, shape=(m, n-win + 1, win), strides=(s0, s1, s1))
    return np.dot(pm, kernel.T).squeeze()

我们通过下面的代码来测试它:

1
2
3
4
5
6
prices = np.array([
    np.arange(0, 14),
    np.arange(10, 24)
])

batch_move_mean(prices, 5)

输出为:

1
2
array([[ 2.,  3.,  4.,  5.,  6.,  7.,  8.,  9., 10., 11.],
       [12., 13., 14., 15., 16., 17., 18., 19., 20., 21.]])

正如我们期待的一样。但快如闪电。

题外话

要不要使用拟合趋势线作为一种动量因子?这是一个值得深入讨论的话题。斜率因子最大的问题,不是所有的时间序列,都有明显的趋势。从数学上看,如果拟合残差较大,说明该时间序列还没有形成明显的趋势,那么斜率因子就不应该投入使用。另一个问题就是线性回归的老问题,即个别outlier对拟合的影响较大。拟合直线究竟是应该使得所有点到该直线的距离和最小,还是应该使得大多数点到该直线的距离和更小(小于前者)?

结论

我们讨论了如何通过numpy来进行线性回归(直线拟合),并且介绍了最新的polynomial API。然后我们介绍了如何利用矩阵运算来实现向量化。


核心点是通过as_strided方法将数组升维,再对升维后的数组执行矩阵运算,以实现向量化。我们还用同样的思路,解决了一次性求5000支股票的移动均线问题。您还看到了像fancy index的实用性举例。这一章技巧较多,算是对前面章节的小结。


《因子投资与机器学习策略》开课啦!


目标清晰 获得感强


为什么你值得QuanTide的课程?

[0922] QuanTide Weekly

本周要闻

  • 何立峰会见中美经济工作组美方代表团
  • 公安机关严查资本市场“小作文”,三名造谣者被罚
  • 证监会全面优化券商风控指标体系
  • 美“生物安全法案”未被纳入参议院2025财年国防授权法案
  • 贵州茅台:拟以30亿元-60亿元回购股份用于注销,上市以来首次发布回购计划

下周看点

  • 中证A500指数周一发布。此前传相关基金提前结束募集
  • 2024深圳eVTOL暨低空经济展开幕,首届中国数字人大会在京举办
  • 周三、周五,再迎ETF及A50交割日!

本周精选

  • 连载!量化人必会的 Numpy 编程 (4)

  • 中美经济工作组9月19日至20日在京会晤。此次会议由财政部副部长廖岷与美国副财长尚博共同主持,两国经济领域相关部门到会交流。何立峰20日会见美国财政部副部长尚博一行。
  • 公安机关近日依法查处一起自媒体运营人员恶意编造网络谣言进行吸粉引流、非法牟利、扰乱社会秩序的案件。经公安部网安局调查,刘某(男,36岁)、陈某(男,46岁)、邵某(男,26岁)为博取关注、吸粉引流、谋取利益,故意编造发布涉转融通谣言信息,误导公众认知,涉嫌扰乱金融秩序。
  • 证监会发布《证券公司风险控制指标计算标准规定》。业内人士指出,此举预计将释放近千亿元资金,促进有效提升资本使用效率,加大服务实体经济和居民财富管理力度。
  • 美当地时间19日,美参议院军事委员会对外公布了参议院版本2025财年国防授权法案(简称NDAA),其中纳入93项修正案,但不包含“生物安全法案”相关提案。此前,美众议院已通过不包含生物安全法案的NDAA,下一步美参众两院将对NDAA进行谈判合并。此前受相关传闻影响,CXO板块持续受到压制。
  • 美股三大指数周五收盘涨跌不一,均录得周线两连涨。道指续创新高,本周累涨1.61%;标普累涨1.36%;纳指累涨1.49%。

消息来源:财联社


Numpy量化场景应用案例[1]

连续值统计

在很多量化场景下,我们都需要统计某个事件连续发生了多少次,比如,连续涨跌停、N连阳、计算Connor's RSI中的streaks等等。比如,要判断下列收盘价中,最大的连续涨停次数是多少?最长的N连阳数是多少?

1
2
a = [15.28, 16.81, 18.49, 20.34, 21.2, 20.5, 
     22.37, 24.61, 27.07, 29.78, 32.76, 36.04]

假设我们以10%的涨幅为限,则可以将上述数组转换为:

1
2
pct = np.diff(a) / a[:-1]
pct > 0.1

我们将得到以下数组:

1
2
flags = [True, False,  True, False, False, False,  True, False,  True,
        True,  True]

这仍然不能计算出最大连续涨停次数,但它是很多此类问题的一个基本数据结构,我们将原始的数据按条件转换成类似的数组之后,就可以使用下面的神器了:


 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
from numpy.typing import ArrayLike
from typing import Tuple
import numpy as np

def find_runs(x: ArrayLike) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Find runs of consecutive items in an array.

    Args:
        x: the sequence to find runs in

    Returns:
        A tuple of unique values, start indices, and length of runs
    """

    # ensure array
    x = np.asanyarray(x)
    if x.ndim != 1:
        raise ValueError("only 1D array supported")
    n = x.shape[0]

    # handle empty array
    if n == 0:
        return np.array([]), np.array([]), np.array([])

    else:
        # find run starts
        loc_run_start = np.empty(n, dtype=bool)
        loc_run_start[0] = True
        np.not_equal(x[:-1], x[1:], out=loc_run_start[1:])
        run_starts = np.nonzero(loc_run_start)[0]

        # find run values
        run_values = x[loc_run_start]

        # find run lengths
        run_lengths = np.diff(np.append(run_starts, n))

        return run_values, run_starts, run_lengths


pct = np.diff(a) / a[:-1]
v,s,l = find_runs(pct > 0.099)
(v, s, l)

输出结果为:

(array([ True, False, True]), array([0, 3, 6]), array([3, 3, 5]))

输出结果是一个由三个数组组成的元组,分别表示:


  • value: unique values
  • start: start indices
  • length: length of runs

在上面的输出中,v[0]为True,表示这是一系列涨停的开始,s[0]则是对应的起始位置,此时索引为0; l[0]则表示该连续的涨停次数为3次。同样,我们可以知道,原始数组中,最长连续涨停(v[2])次数为5(l[2]),从索引6(s[2])开始起。

所以,要找出原始序列中的最大连续涨停次数,只需要找到l中的最大值即可。但要解决这个问题依然有一点技巧,我们需要使用第4章中介绍的 mask array。

1
2
3
4
v_ma = np.ma.array(v, mask = ~v)
pos = np.argmax(v_ma * l)

print(f"最大连续涨停次数{l[pos]},从索引{s[pos]}:{a[s[pos]]}开始。")

在这里,mask array的作用是,既不让 v == False 的数据参与计算(后面的 v_ma * l),又保留这些元素的次序(索引)不变,以便后面我们调用 argmax 函数时,找到的索引跟v, s, l中的对应位置是一致的。

我们创建的v_ma是一个mask array,它的值为:

1
2
3
masked_array(data=[True, --, True],
             mask=[False,  True, False],
       fill_value=True)

当它与另一个整数数组相乘时,True就转化为数字1,这样相乘的结果也仍然是一个mask array:


1
2
3
masked_array(data=[3, --, 5],
             mask=[False,  True, False],
       fill_value=True)

当arg_max作用在mask array时,它会忽略掉mask为True的元素,但保留它们的位置,因此,最终pos的结果为2,对应的 v,s,l中的元素值分别为: True, 6, 5。

如果要统计最长N连涨呢?这是一个比寻找涨停更容易的任务。不过,这一次,我们将不使用mask array来实现:

1
2
3
4
v,s,l = find_runs(np.diff(a) > 0)

pos = np.argmax(v * l)
print(f"最长N连涨次数{l[pos]},从索引{s[pos]}:{a[s[pos]]}开始。")

输出结果是: 最长N连涨次数6,从索引5:20.5开始。

这里的关键是,当Numpy执行乘法时,True会被当成数字1,而False会被当成数字0,于是,乘法结果自然消除了没有连续上涨的部分,从而不干扰argmax的计算。

当然,使用mask array可能在语义上更清楚一些,尽管mask array的速度会慢一点,但正确和易懂常常更重要。

计算 Connor's RSI中的streaks

Connor's RSI(Connor's Relative Strength Index)是一种技术分析指标,它是由Nirvana Systems开发的一种改进版的相对强弱指数(RSI)。Connor's RSI与传统RSI的主要区别在于它考虑了价格连续上涨或下跌的天数,也就是所谓的“连胜”(winning streaks)和“连败”(losing streaks)。这种考虑使得Connor's RSI能够更好地反映市场趋势的强度。


在前面介绍了find_runs函数之后,计算streaks就变得非常简单了。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
def streaks(close):
    result = []
    conds = [close[1:]>close[:-1], close[1:]<close[:-1]]

    flags = np.select(conds, [1, -1], 0)
    v, _, l = find_runs(flags)
    for i in range(len(v)):
        if v[i] == 0:
            result.extend([0] * l[i])
        else:
            result.extend([v[i] * x for x in range(1, (l[i] + 1))])

    return np.insert(result, 0, 0)

这段代码首先将股价序列划分为上涨、下跌和平盘三个子系列,然后对每个子系列计算连续上涨或下跌的天数,并将结果合并成一个新的数组。在streaks中,连续上涨天数要用正数表示,连续下跌天数用负数表示,所以在第5行中,通过np.select将条件数组转换为[1, 0, -1]的序列,后面使用乘法就能得到正确的连续上涨(下跌)天数了。

中位数极值法去极值

在因子分析中,我们常常需要对数据进行去极值处理,以减少对异常值的影响。中位数极值法(Median Absolute Deviation,MAD)是一种常用的去极值方法,它通过计算数据中中位数(median)和绝对离差(absolute deviation)来确定异常值。

这里需要先介绍绝对中位差(median absolute deviation) 的概念:

\[MAD = median(|X_i - median(X)|)\]

为了能 MAD 当成与标准差\(\sigma\)估计相一致的估计量,即


\[\hat{\sigma} = k. MAD\]

这里 k 为比例因子常量,如果分布是正态分布,可以计算出: $$ k = \frac{1}{(\Phi^{-1}(\frac{3}{4}))} \approx 1.4826 $$

基于这个 k 值,取 3 倍则近似于 5。

在对多个资产同时执行去极值时,我们可以使用下面的方法,以实现向量化并行操作:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
def mad_clip(df: Union[NDArray, pd.DataFrame], k: int = 3, axis=1):
    """使用 MAD 3 倍截断法去极值

    Args:
        df: 输入数据,要求索引为日期,资产名为列,单元格值为因子的宽表
        k: 截断倍数。
        axis: 截断方向
    """

    med = np.median(df, axis=axis).reshape(df.shape[0], -1)
    mad = np.median(np.abs(df - med), axis=axis)

    return np.clip(df.T, med.flatten() - k * 1.4826 * mad,
                   med.flatten() + k * mad).T


《因子投资与机器学习策略》开课啦!


目标清晰 获得感强


为什么你值得QuanTide的课程?

节前迎来揪心一幕!谁来告诉我,A股现在有没有低估?

节前迎来揪心一幕,主要指数均创出今年最低周收盘。很自然,我们也想知道,现在处于什么状态,存在着低估机会吗?这篇文章,我们从市盈利的角度来探讨是存在机会,还是要警惕陷阱。

我们通过akshare来获取沪指导市盈率。实际上,akshare中的这个数据又来自乐咕乐股网站。

1
2
3
4
5
6
7
import akshare as ak

pe = ak.stock_market_pe_lg(symbol="上证")
pe.set_index("日期", inplace=True)
pe.index.name = "date"
pe.rename(columns={"平均市盈率": "pe", "指数": "price"}, inplace=True)
pe.tail(15)

我们将得到从1999年以来的所有数据。它的格式如下:

表1 市盈率与指数

低估的八月?

我们可以通过quantile函数,找出PE的25%, 50%和75%分位数:

1
2
3
4
5
percentiles = []
for i in range(1, 4):
    percentiles.append(pe["pe"].quantile(i/4))

percentiles

我们发现,从1999年以来,25%, 50%和75%分位数分别是 13.9, 17.4和 33.5。

那么,到上个月底(2024年8月),沪指的市盈率处于什么位置呢?

我们可以用以下方法来计算出这个位置:

1
2
3
rank = pe.rank().loc[datetime.date(2024,8,30), "pe"]
percentile = rank / len(pe)
percentile

结果显示,当前的市盈率处在10.6%分位数左右,也就是属于1999年以来较低的位置。

如果仅根据统计数据来看,显然沪指在2024年8月底,是被低估了,换句话说,这里出现了买入机会。

但是,分位数是静态的,它不能反映数据的运行趋势

趋势中的PE

如果你打开东财的客户端,找到沪指的资料页,就可以看到PE的历史走势。近十年的走势图如下:

不过这似乎也给出不了更多信息。从这个图来看,沪指目前也是在低估中。

那么,我们把历年的价格走势叠加到PE走势上,看看PE的峰谷是否真的会对应价格的峰谷。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import matplotlib.pyplot as plt
import matplotlib.dates as mdates

fig, ax1 = plt.subplots(figsize=(60,6))

color = "tab:blue"
ax1.plot(pe["price"], label="Index", color=color)
ax1.set_xlabel("Year")
ax1.set_ylabel("Index", color="tab:blue")
ax1.xaxis.set_major_locator(mdates.MonthLocator(bymonth=[2, 5, 8, 11]))
ax1.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m')) 

color = "tab:red"
ax2 = ax1.twinx()
ax2.plot(pe["pe"], label="PE", color=color)
ax2.set_ylabel("PE", color=color)

for i in range(1, 4):
    quantile = pe["pe"].quantile(i/4)
    ax2.axhline(quantile, color='gray', linestyle='--', label=f"{i/4:02.0%}")

plt.title("Index vs PE")

fig.tight_layout()
for date in pe.index[::3]:
    ax1.axvline(date, color='gray', linestyle=':', linewidth=0.5)
plt.gcf().autofmt_xdate()

plt.legend(loc="upper left")
plt.show()

我们会得到如下所示的绘图:

1999年-2024年

从图中可以看出,从1999年到2024年间,总体上存在沪指上涨,PE下降的趋势。沪指上涨应该是反应了GDP增长的长期趋势;而PE下降,则主要是由于资产供应数量的增加,导致资产泡沫不断被挤出的趋势。

为了能研究细节部分,我们将上图切割成为5个子图:

1999年1月-2004年5月

2004年6月-2009年5月

2009年6月-2014年7月

2014年6月-2019年10月

2019年8月-2024年8月

思考与结论

  • 思考: 在哪些阶段,PE走势与沪指完全一致?在这种走势中,谁是因变量,谁是自变量?它反应什么样的现象?

  • 回答:在1999年到2001年,2005年8月到2008年5月,以及2014年5月到1015年5月等时间段PE的走势与沪指完全一致。在这种走势中,由于市场炒作十分激烈,公司盈利的变化相对于股价的变化而言,完全可以忽略不计,PE的走势完全由股价决定,从而导致PE曲线与股价走势极为相似。一旦出现这种现象,就说明市场已经过度投机了。

  • 思考:大约从2023年8月起,似乎可以看出,沪指的下降速度快于PE的下降速度。比如,2024年2月,沪指出现阶段低点,PE也出现阶段低点。但2024年9月,沪指创新低之后,PE并没有创新低。这种背离反映了什么现象?能否用一个常见的术语来表述它?

  • 回答: 在2024年9月,沪指创新低之后,PE并没有创新低,出现了背离。这种情况其实从峰值分析也可以看出来。比如,2024年6月(参见表我1),PE值创2023年8月以来的新高,但沪指却低于2023年8月后任何一个高点。这种背离可以用低市盈率陷阱来描述。这个指标提醒我们,当前指数虽然在下跌,但上市公司的盈利能力也可能在下降,从而出现指数小涨,PE大涨;指数跌得多,PE跌得少的情况。

因此,如果仅从分位数统计来看,当下的A股是低估的。但如果考虑到市盈率总体上一直在下降的趋势,以及最近一年来PE与指数涨跌的背离情况,判断A股是否已经低估还存有疑问,应该纳入更多维度进行判断。

文章来源于《因子分析与机器学习策略》第2课的习题。

[0915] QuanTide Weekly

本周要闻

  • 主要指数创今年最低周收盘,也是5年最低周收盘
  • 8月有增有降,广义货币增长6.5%,狭义货币下降7.3%。
  • 一意孤行!美提高部分对华301关税 中方:强烈不满 坚决反对
  • 茅台业绩说明会之后,本周白酒指数再跌3.21%

下周看点

  • 周三(晚),美联储公布议息决议。
  • 周五央行公布最新LPR利率。
  • A股将有2只新股发行,创业板1只,北交所1支

本周精选

  • 连载!量化人必会的 Numpy 编程 (3)

本周要闻

  • 本周大盘继续调整,主要指数均创出今年最低周收盘,也是5年最低周收盘,其中,上证指数2700点大关岌岌可危,深证成指跌破8000点大关。本周留下一个9点的跳空缺口,这是今年第一次周K线留下跳空缺口新民晚报
  • 9月13日,央行发布8月社会融资规模统计数据,央行有关部门负责人进行了解读,指出央行认真贯彻落实党中央、国务院决策部署,稳健的货币政策灵活适度、精准有效,强化逆周期调节,为经济社会发展营造良好的货币金融环境。红网.财富频道
  • 9月13日,美国贸易代表办公室宣布将提高部分中国商品的301关税。中方对此强烈不满,坚决反对。自9月27日起,中国制造的电动汽车的关税将上调至100%,太阳能电池上调至50%,电动汽车电池、关键矿产、钢铁、铝、口罩和岸边集装箱起重机将上调至25%,而包括半导体芯片在内的其他产品的关税上调也将在未来两年内生效。东方财富

量化人必会的Numpy编程(3)

1. 处理包含 np.nan 的数据

在量化分析中,我们常常会遇到数据为 np.nan 情况。比如,某公司上年利润为负数,今年利润实现正增长,请问要如何表示公司的 YoY 的利润增长呢?

Info

np.nan 是 numpy 中的一个特殊值,表示“Not a Number”,即“不是数字”。注意,在 Numpy 中,尽管 np.nan 不是一个数字,但它确实数字类型。确切地说,它是 float 类型。此外,在 float 类型中,还存在 np.inf(正无穷大)和负无穷大 (np.NINF,或者-np.inf)。

又比如,在计算个股的 RSI 或者移动平均线时,最初的几期数据是无法计算出来的(在回测框架 backtrader 中,它把这种现象称之为技术指标的冷启动)。如果不要求返回的技术指标的取值与输入数据长度一致,则会返回短一些、但全部由有效数据组成的数组;否则,此时我们常常使用 np.NaN 或者 None 来进行填充,以确保返回的数据长度与输入数据长度一致。

但是,如果我们要对返回的数组进行统计,比如求均值、最大值、排序,对包含 np.nan 或者 None 的数组,应该如何处理?


1.1. 包含 np.nan 和 np.inf 的数组运算

在 numpy 中,提供了对带 np.nan 的数组进行运算的支持。比如有以下数组:

1
2
3
4
import numpy as np

x = np.array([1, 2, 3, np.nan, 4, 5])
print(x.mean())

我们将得到一个 nan。实际上,多数情况下,我们希望忽略掉 nan,只对有效数据进行运算,此时得到的结果,我们往往仍然认为是有意义的。

因此,Numpy 提供了许多能处理包含 nan 数据的数组输入的运算函数。下面是一个完整的列表:

在这里,我们以输入 np.array([1, 2, 3, np.nan, np.inf, 4, 5]) 为例

函数 nan 处理 inf 处理 输出
nanmin 忽略 inf 1.0
nanmax 忽略 inf inf
nanmean 忽略 inf inf
nanmedian 忽略 inf 3.5
nanstd 传递 inf nan
nanvar 传递 inf nan
nansum 忽略 inf inf
nanquantile 忽略 inf 2.25
nancumsum 忽略 inf inf
nancumprod 忽略 inf inf

对 np.nan 的处理中,主要是三类,一类是传递,其结果导致最终结果也是 nan,比如,在计算方差和标准差时;一类是忽略,比如在找最小值时,忽略掉 np.nan,在余下的元素中进行运算;但在计算 cumsum 和 cumprod 时,"忽略"意味着在该元素的位置上,使用前值来填充。我们看一个不包含 np.inf 的示例:

1
2
3
x = np.array([1, 2, 3, np.nan, 4, 5])
np.nancumprod(x)
np.nancumsum(x)
输出结果是:

1
2
3
array([  1.,   2.,   6.,   6.,  24., 120.])

array([ 1.,  3.,  6.,  6., 10., 15.])
结果中的第 4 个元素都是由第 3 个元素复制而来的。

如果一个数组中包含 inf,则在任何涉及到排序的操作(比如 max, median, quantile)中,这些元素总是被置于数组的最右侧;如果是代数运算,则结果会被传导为 inf。这些地方,Numpy 的处理方式与我们的直觉是一致的。

除了上述函数,np.isnan 和 np.isinf 函数,也能处理包含 np.nan/np.inf 元素的数组。它们的作用是判断数组中的元素是否为 nan/inf,返回值是一个 bool 数组。

1.2. 包含 None 的数组运算

在上一节中,我们介绍的函数能够处理包含 np.nan 和 np.inf 的数组。但是,在 Python 中,None 是任何类型的一个特殊值,如果一个数组包含 None 元素,我们常常仍然会期望能对它进行 sum, mean, max 等运算。但是,Numpy 并没有专门为此准备对应的函数。


但是,我们可以通过 astype 将数组转换为 float 类型,在此过程中,所有的 None 元素都转换为 np.nan,然后就可以进行运算了。

1
2
x = np.array([3,4,None,55])
x.astype(np.float64)

输出为:array([3., 4., nan, 55.])

1.3. 性能提升

当我们调用 np.nan *函数时,它的性能会比普通的函数慢很多。因此,如果性能是我们关注的问题,我们可以使用 bottleneck 这个库中的同名函数。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
from bottleneck import nanstd
import numpy as np
import random

x = np.random.normal(size = 1_000_000)
pos = random.sample(np.arange(1_000_000).tolist(), 5)
x[pos] = np.nan

%timeit nanstd(x)
%timeit np.nanstd(x)

我们担心数组中 np.nan 元素个数会影响到性能,所以,在上面的示例中,在随机生成数组时,我们只生成了 5 个元素。在随后的一次测试中,我们把 nan 元素的个数增加了 10 倍。实验证明,nan 元素的个数对性能没有什么影响。在所有的测试中,bottlenect 的性能比 Numpy 都要快一倍。

Info

根据 bottleneck 的文档,它的许多函数,要比 Numpy 中的同名函数快 10 倍左右。


2. 随机数和采样

随机数和采样是量化中的高频使用的操作。在造数据方面非常好用。我们在前面的示例中,已经使用过了 normal() 函数,它是来自 numpy.random 模块下的一个重要函数。借由这个函数,我们就能生成随机波动、但总体上来看又是上涨、下跌或者震荡的价格序列。

Tip

我们会在何时需要造价格序列?除了前面讲过的例子外,这里再举一例:我们想知道夏普为\(S\)的资产,它的价格走势是怎么样的?价格走势与夏普的关系如何?要回答这个问题,我们只能使用“蒙”特卡洛方法,造出若干模拟数据,然后计算其夏普并绘图。此时我们一般选造一个符合正态分布的收益率数组,然后对它进行加权(此时即可算出夏普),最后通过 np.cumprod() 函数计算出价格走势,进行绘图。

我们通过一个例子来说明夏普与股价走势之间的关系:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
import numpy as np
from empyrical import sharpe_ratio
import matplotlib.pyplot as plt

returns_ = np.random.normal(0, 0.02, size=100)
legend = []
for alpha in (-0.01, 0, 0.01):
    returns = returns_ + alpha
    prices = np.cumprod(returns + 1)
    sharpe = sharpe_ratio(returns)
    _ = plt.plot(prices)
    legend.append(f"{sharpe:.1f}")

lines = plt.gca().lines
plt.legend(lines, legend)

从绘制的图形可以看出,当 alpha 为 1%时,夏普率可达 8.2。国内优秀的基金经理可以在一年内,做到 2~3 左右的夏普率。大家可以调整 alpha 这个参数,看看 alpha 与夏普率的关系。

2.1. The legacy: np.random module

迄今为止,我们在网上看到的多数关于 numpy random 的教程,都是使用的 np.random module 下面的函数。除了 normal 方法之外,random 包中还有以下函数:


函数 说明
randint(a,b,shape) 生成在区间 (a,b) 之间,形状为 shape 的随机整数数组。
rand(shape) 生成 shape 形状的随机数组,使用 [0,1) 区间的均匀分布来填充。
random(shape) 生成 shape 形状的随机数组,使用均匀分布填充
randn(d1, d2, ...) 生成 shape 形状的随机数组,使用正态分布来填充。
standard_normal(shape) 生成 shape 形状的随机数组,使用标准正态分布来填充。
normal(loc,scale,shape) 生成 shape 形状的随机数组,使用正态分布来填充,loc 是均值,scale 是标准差。
choice(a,size,replace,p) 从 a 中随机抽取 size 个元素,如果 replace=True, 则允许重复抽取,否则不允许重复抽取。p 表示概率,如果 p=None, 则表示每个元素等概率抽取。
shuffle(a) 将 a 中的元素随机打乱。
seed(seed) 设置随机数种子,如果 seed=None, 则表示使用系统时间作为随机数种子。

可以看出,numpy 为使用同一个功能,往往提供了多个方法。我们记忆这些方法,首先是看生成的随机数分布。最朴素的分布往往有最朴素的名字,比如,rand, randint 和 random 都用来生成均匀分布,而 normal, standard_normal 和 randn 用来生成正态分布。

除了均匀分布之外,Numpy 还提供了许多著名的分布的生成函数,比如 f 分布、gama 分布、hypergeometric(超几何分布),beta, weibull 等等。

在同一类别中,numpy 为什么还要提供多个函数呢?有一些是为了方便那些曾经使用其它知名库(比如 matlab) 的人而提供的。randn 就是这样的例子,它是 matlab 中一个生成正态随机分布的函数,现在被 numpy 移植过来了。我们这里看到的另一个函数,rand 也是这样。而对应的 random,则是 Numpy 按自己的 API 风格定义的函数。

choice 方法在量化中有比较具体的应用。比如,我们可能想要从一个大的股票池中,随机抽取 10 只股票先进行一个小的试验,然后根据结果,再考虑抽取更多的股票。


seed 函数用来设置随机数生成器的种子。在进行单元测试,或者进行演示时(这两种情况下,我们都需要始终生成相同的随机数序列)非常有用。

2.2. New Style: default_rng

我们在上一节介绍了一些随机数生成函数,但没有介绍它的原理。Numpy 生成的随机数是伪随机数,它们是使用一个随机数生成器(RNG)来生成的。RNG 的输出是随机的,但是相同的输入总是会生成相同的输出。我们调用的每一个方法,实际上是在这个序列上的一个抽取动作(根据输入的 size/shape)。

在 numpy.random 模块中,存在一个全局的 RNG。在我们调用具体的随机函数时,实际上是通过这个全局的 RNG 来产生随机数的。而这个全局的 RNG,总会有人在它之上调用 seed 方法来初始化。这会产生一些问题,因为你不清楚何时、在何地、以哪个参数被人重置了 seed。

由于这个原因,现在已经不推荐直接使用 numpy.random 模块中的这些方法了。更好的方法是,为每一个具体地应用创建一个独立的 RNG,然后在这个对象上,调用相应的方法:

1
2
rng = np.random.default_rng(seed=123)
rng.random(size=10)

rng 是一个 Random Generator 对象,在初始化时,我们需要给它传入一个种子。如果省略,那么 Numpy 会使用系统时间作为种子。

rng 拥有大多数前一节中提到的方法,比如 normal, f, gamma 等;但从 matlab 中移植过来的方法不再出现在这个对象上。


另外,randint 被 rng.integers 替代。

除此之外,default_rng 产生的随机数生成器对象,在算法上采用了 PCG64 算法,与之前版本采用的算法相比,它不仅能返回统计上更好的随机数,而且速度上也会快 4 倍。

Warning

在 numpy 中还存在一个 RandomState 类。它使用了较慢的梅森扭曲器生成伪随机数。现在,这个类已经过时,不再推荐使用。

2.3. 数据集平衡示例

我们已经介绍了 choice 的功能,现在我们来举一个例子,如何使用 choice 来平衡数据集。

在监督学习中,我们常常遇到数据不平衡的问题,比如,我们希望训练一个分类器,但是训练集的类别分布不均衡。我们可以通过 choice 方法对数据集进行 under sampling 或者 over sampling 来解决这个问题。

为了便于理解,我们先生成一个不平衡的训练数据集。这个数据集共有 3 列,其中前两列是特征(你可以想像成因子特征),第三列则是标签。

1
2
3
4
5
6
import pandas as pd
import numpy as np

rng = np.random.default_rng(seed=42)
x = rng.random((10,3))
x[:,-1] = rng.choice([0,1], len(x), p=[0.2, 0.8])

我们通过下面的方法对这个数据集进行可视化,以验证它确实是一个不平衡的数据集。

1
2
df = pd.DataFrame(x, columns=['factor1', 'factor2', 'label'])
df.label.value_counts().plot(kind='bar')

运行结果为:

要在此基础上,得到一个新的平衡数据集,我们有两种思路,一种是 under sampling,即从多数类的数据中抽取部分数据,使得它与最小分类的数目相等;另一种是 over sampling,即从少数类的数据中复制部分数据,使得它与最大的类的数目相等。

下面的例子演示了如何进行 under sampling:


 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
labels, counts = np.unique(x[:,-1], return_counts=True)

# 最小分类的标签
min_label = labels[np.argmin(counts)]

# 最小分类样本的数量,作为 UNDER SAMPLING 的样本数量
min_label_count = np.min(counts)

# 最小分类无须抽取,全部提取
results = [
    x[x[:,-1] == min_label]
]

# 对其它分类标签进行遍历,需要先剔除最小分类
for label in np.delete(labels, np.argmin(counts)):
    sampled = rng.choice(x[x[:,-1]== label], min_label_count)
    results.append(sampled)

np.concatenate(results)

这段代码先是找到最小分类及它的数量,然后遍历每个标签,再通过 rng.choice 对其它分类随机抽取最小分类的数量,最后把所有的子集拼接起来。

这段示例代码可用以多个标签的情况。如果要进行 over sampling,只要把其中的 min 换成 max 就可以了。

3. IO 操作

我们直接使用 Numpy 读写文件的场合并不多。提高 IO 读写性能一直都不是 Numpy 的重点,我们也只需要稍加了解即可。

3.1. 读写 CSV 文件

Numpy 可以从 CSV 格式的文本文件中读取数据,主要有以下方法:


api 描述
loadtxt 解析文本格式的表格数据
savetxt 将数据保存为文本文件
genfromtxt 同上,但允许数据中有缺失值,提供了更高级的用法
recfromtxt 是 genfromtxt 的快捷方式,自动推断为 record array
recfromcsv 同上,如果分隔符为逗号,无须额外指定

我们通过下面的示例简单演示一下各自的用法:

1
2
3
4
5
6
7
import io
import numpy 

buffer = io.StringIO("""1,2""")

# 默认情况下,LOADTXT 只能读取浮点数
numpy.loadtxt(buffer, delimiter=",")

这会输出数组array([1., 2.])

1
2
3
4
buffer = io.StringIO("""1,2,hello""")

# 通过指定 DTYPE 参数,可以读取其它类型
numpy.loadtxt(buffer, delimiter=",", dtype=[("age", "i4"), ("score", "f4"), ("name", "U8")])

这样我们将得到一个 Structured Array,其中第三列为字符串类型。如果我们不指定 dtype 参数,那么 loadtxt 将会解析失败。

1
2
3
4
buffer = io.StringIO("""
1,2,hello
""")
numpy.genfromtxt(buffer, delimiter=",")

这一次我们使用了 genfromtxt 来加载数据,但没有指定 dtype 参数,genfromtxt 会将非数字列解析为 nan。因此,这段代码将输出:`array([1., 2., nan])

现在,我们也给 genfromtxt 加上 dtype 参数:


1
2
3
4
5
buffer = io.StringIO("""
1,2,hello
""")

numpy.genfromtxt(buffer, delimiter=",", dtype=[("age", "i4"), ("score", "f4"), ("name", "U8")])

此时我们得到的结果是:array((1, 2., 'hello'), dtype=[('age', '<i4'), ('score', '<f4'), ('name', '<U8')])。注意它是 Structured Array。

recfromtxt 则不需要 dtype, 会自动推断数据类型。

1
2
3
4
5
buffer = io.StringIO("""
1,2,hello
""")

numpy.recfromtxt(buffer,delimiter=",")

这段代码输出为rec.array((1, 2, b'hello'), dtype=[('f0', '<i8'), ('f1', '<i8'), ('f2', 'S5')])。如果推断不准确,我们也可以自己加上 dtype 参数。

如果我们使用 recfromcsv,则连 delimiter 参数都可以省掉。

1
2
3
4
5
buffer = io.StringIO("""
age,score,name
1,2,hello
""")
numpy.recfromcsv(buffer)

输出跟上一例的结果一样。

出于速度考虑,我们还可以使用其它库来解析 CSV 文件,再转换成为 numpy 数组。比如:


1
2
3
4
5
# 利用 CSV.READER() 来解析,比 NUMPY 快 8 倍
np.asarray(list(csv.reader()))

# 利用 PANDAS 来解析,比 NUMPY 快 22 倍
pd.read_csv(buffer).to_records()

3.2. 读写二进制文件

如果我们不需要与外界交换数据,数据都是自产自销型的,也可以使用二进制文件来保存数据。

使用 numpy.save 函数来将单个数组保存数据为二进制文件,使用 numpy.load 函数来读取 numpy.save 保存的数据。这样保存的文件,文件扩展名为.npy。

如果要保存多个数组,则可以使用 savez 命令。这样保存的文件,文件扩展名为.npz。

如果有更复杂的需求,可以使用 Hdf5,pyarrow 等库来进行保存数据。

4. 日期和时间

一些第三方数据源传递给我们的行情数据,常常会用字符串形式,或者整数(从 unix epoch time 起)格式来表示行情的时间。比如,akshare 和 tushare 许多接口给出的行情数据就是字符串格式;而 QMT 很多时候,会将行情时间用整数表示。掌握这些格式与 Numpy 的日期时间格式转换、以及 Numpy 到 Python 对象的时间日期转换是非常有必要的。

但是在任何编程语言中,日期和时间的处理从来都不简单。


Info

很少有程序员/研究员了解这一点:日期和时间并不是一个数学上或者物理上的一个客观概念。时区的划分、夏令时本身就是一个政治和法律上的概念;一些地方曾经使用过夏令时,后来又取消了这种做法。其次,关于闰秒 1 的决定,也并不是有章可循的,它是由一个委员会开会来临时决定的。这种决定每年做一次。

所有这些决定了我们无法通过一个简单的数学公式来计算时间及其变化,特别是在时区之间的转换。

关于时间,首先我们要了解有所谓的 timezone aware 时间和 timezone naive 时间。当我们说到晚上 8 时开会时,这个时间实际上默认地包含了时区的概念。如果这是一个跨国会议,但你在通知时不告诉与会方时区,就会导致其它人无法准时出席 -- 他们将会在自己时区的晚上 8 时上线。

如果一个时间对象不包含时区,它就是 timezone naive 的;否则,它是 timezone aware 的。但这只是针对时间对象(比如,Python 中的 datetime.datetime)才有的意义;日期对象(比如,Python 中的 datetime.date)是没有时区的。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
import pytz
import datetime

# 通过 DATETIME.NOW() 获得的时间没有时区信息
# 返回的是标准时间,即 UTC 时间,等同于调用 UTCNOW()
now = datetime.datetime.now()
print(f"now() without param: {now}, 时区信息{now.tzinfo}")

now = datetime.datetime.utcnow()
print(f"utcnow: {now}, 时区信息{now.tzinfo}")

# 构造 TIMEZONE 对象
cn_tz = pytz.timezone('Asia/Shanghai')
now = datetime.datetime.now(cn_tz)
print(f"现在时间{now}, 时区信息{now.tzinfo}")

1
2
3
4
5
6
print("现在日期:", now.date())

try:
    print(now.date().tzinfo)
except AttributeError:
    print("日期对象没有时区信息")

上述代码将依次输出:

1
2
3
4
5
now() 不带参数:2024-05-19 11:03:41.550328, 时区信息 None
utcnow: 2024-05-19 11:03:41.550595, 时区信息 None
现在时间 2024-05-19 19:03:41.550865+08:00, 时区信息 Asia/Shanghai
现在日期:2024-05-19
日期对象没有时区信息

不过,限于篇幅,我们对时间问题的介绍只能浅尝辄止。在这里,我们主要关注在 Numpy 中,日期/时间如何表示,它们彼此之间如何比较、转换,以及如何与 Python 对象进行比较和转换。

在 Numpy 中,日期/时间总是用一个 64 位整数(np.datetime64)来表示,此外,还关联了一个表示其单位(比如,纳秒、秒等)的元数据结构。np.datetime64是没有时区概念的。

1
2
3
tm = np.datetime64('1970-01-01T00:00:00')
print(tm)
print(tm.dtype)

这将显示为:

1
2
1970-01-01T00:00:00
datetime64[s]

这里的[s]就是我们前面所说的时间单位。其它常见单位还有[ms][us][ns]等等。

除了从字符串解释之外,我们还可以直接将 Python 对象转换成np.datetime64,反之亦然:


1
2
3
4
5
tm = np.datetimet64(datetime.datetime.now())
print(tm)

print(tm.item())
print(tm.astype(datetime.datetime))

下面我们来看看如何实现不同格式之间的批量转换。这在处理 akshare, tushare 或者 QMT 等第三方数据源提供的行情数据时,非常常见。

首先我们构造一个时间数组。顺便提一句,这里我们将使用np.timedelta64这个时间差分类型:

1
2
3
now = np.datetime64(datetime.datetime.now())
arr = np.array([now + np.timedelta64(i, 'm') for i in range(3)])
arr

输出结果如下:

1
2
3
4
array(['2024-05-19T12:57:47.349178', 
       '2024-05-19T12:58:47.349178',
       '2024-05-19T12:59:47.349178'], 
     dtype='datetime64[us]')

我们可以通过np.datetime64.astype()方法将时间数组转换为 Python 的时间对象:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
time_arr = arr.astype(datetime.datetime)

# 转换后的数组,每个元素都是 TIMEZONE NAIVE 的 DATETIME 对象
print(type(time_arr[0]))

# !!! 技巧
# 如何把 NP.DATETIME64 数组转换为 PYTHON DATETIME.DATE 数组?
date_arr = arr.astype('datetime64[D]').astype(datetime.date)
# 或者 -- 两者的容器不一样
date_arr = arr.astype('datetime64[D]').tolist()
print(type(date_arr[0]))

这里的关键是,我们之前生成的arr数组,其元素类型为np.datetime64[us]。它到 Python datetime.date的转换将损失精度,所以 Numpy 要求我们显式地指定转换类型。

如何将以字符串表示的时间数组转换为 Numpy datetime64 对象数组呢?答案仍然是 astype() 方法。

1
2
3
4
5
6
# 将时间数组转换为字符串数组
str_arr_time = arr_time.astype(str)
print(str_arr_time)

# 再将字符串数组转换为 DATETIME64 数组,精度指定为 D
str_arr_time.astype('datetime64[D]')

显示结果为:

1
2
3
4
5
6
7
8
9
array(['2024-05-19T12:57:47.349178', 
       '2024-05-19T12:58:47.349178',
       '2024-05-19T12:59:47.349178'], 
       dtype='datetime64[us]')

array([
    '2024-05-19', 
    '2024-05-19'],               
    dtype='datetime64[D]')

最后,我们给一个 QMT 获取交易日历后的格式转换示例。在 QMT 中,我们通过get_trading_dates来获取交易日历,该函数返回的是一个整数数组,每个元素的数值,是从 unix epoch 以来的毫秒数。

我们可以通过以下方法对其进行转换:

1
2
3
4
import numpy as np

days = get_trading_dates('SH', start_time='', end_time='', count=10)
np.array(days, dtype='datetime64[ms]').astype(datetime.date)

QMT 官方没有直接给出交易日历转换方案,但给出了如何将 unix epoch 时间戳转换为 Python 时间对象(但仍以字符串表示):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
import time
def conv_time(ct):
    '''
    conv_time(1476374400000) --> '20161014000000.000'
    '''
    local_time = time.localtime(ct / 1000)
    data_head = time.strftime('%Y%m%d%H%M%S', local_time)
    data_secs = (ct - int(ct)) * 1000
    time_stamp = '%s.%03d' % (data_head, data_secs)
    return time_stamp

conv_time(1693152000000)

我们需要对每一个数组元素使用上述解析方法。官方方案的优点是不依赖任何三方库。不过,没有量化程序能离开 Numpy 库,所以,我们的方案并未增加第三方库的依赖。

字符串操作

你的数据源、或者本地存储方案很可能使用 Numpy Structured Array 或者 Rec Array 返回证券列表。很显然,证券列表中一定会包括字符串,因为它一定会存在证券代码列和证券名称列。有一些还会返回证券的地域属性和其它属性,这也往往是字符串。

对证券列表,我们常常有以下查询操作:

  1. 获取在某个板块上市的股票列表,比如,北交所、科创板和创业板与主板的个股交易规则上有一些不同,因此,我们的策略很可能需要单独为这些板块构建。这就有了按板块过滤证券列表的需要。也可能我们要排除 ST,刚上市新股。这些都可以通过字符串操作来实现。

  1. 市场上有时候会出现魔幻的名字炒作。比如龙年炒龙字头(或者含龙的个股)、炒作“东方”、炒作“中”字头。作为量化人,参与这样的炒作固然不可取,但我们要拥有分析市场、看懂市场的能力。

Numpy 中的大多数字符串操作都封装在 numpy.char 这个包下面。它主要提供了一些用于格式化的操作(比如左右填充对齐、大小写转换等)、查找和替换操作。

下面的代码展示了如何从证券列表中过滤创业板:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
import numpy as np
import numpy.char as nc

# 生成 STRUCTURED ARRAY, 字段有 SYMBOL, NAME, IPO DATE
arr = np.array([('600000.SH', '中国平安', '1997-08-19'),
                ('000001.SZ', '平安银行', '1997-08-19'),
                ('301301.SZ', '川宁生物', '2012-01-01')
                ], dtype=[('symbol', 'S10'), ('name', 'S10'), ('ipo_date', 'datetime64[D]')])

def get_cyb(arr):
    mask = np.char.startswith(arr["symbol"], b"30")
    return arr[mask]

Question

我们在查找创业板股票时,使用的是 b"30"来进行匹配。为何要用 b"30"而不是"30"?

注意第 11 行,我们要通过np.char.startswith()来使用startswith函数。任何一个 numpy array 对象都没有这个方法。

".SZ"是我们的数据源给股票编制的交易所代码。不同的数据源,可能使用不同的交易所代码。比如,聚宽数据源会使用.XSHG 表示上交所,.XSHE 表示深交所。现在,如果我们要将上述代码转换为聚宽的格式,应该如何操作?


 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
# 生成 STRUCTURED ARRAY, 字段有 SYMBOL, NAME, IPO DATE
arr = np.array([('600000.SH', '中国平安', '1997-08-19'),
                ('000001.SZ', '平安银行', '1997-08-19'),
                ('301301.SZ', '川宁生物', '2012-01-01')
                ], dtype=[('symbol', 'U10'), ('name', 'U10'), ('ipo_date', 'datetime64[D]')])

def translate_exchange_code(arr):
    symbols = np.char.replace(arr["symbol"], ".SH", ".XSHG")
    print(symbols)
    symbols = np.char.replace(symbols, ".SZ", ".XSHE")

    arr["symbol"] = symbols
    return arr

translate_exchange_code(arr)

这一次,我们把 symbol 和 name 的定义改为 Unicode 型,以避免我们查找时,要输入像 b"30"这样的字面量。

但输出的结果可能让人意外,因为我们将得到这样的输出:

1
2
3
4
array([('600000.XSH', '中国平安', '1997-08-19'),
       ('000001.XSH', '平安银行', '1997-08-19'),
       ('301301.XSH', '川宁生物', '2012-01-01')],
      dtype=[('symbol', '<U10'), ('name', '<U10'), ('ipo_date', '<M8[D]')])

Question

发生了什么?我们得到了一堆以".XSH"结尾的 symbol,它们本应该是"600000.XSHG"这样的字符串。错在哪里,又该如何修改?

在上面的示例中,如果我们把替换字符串改为空字符串,就实现了删除操作。这里就不演示了。

char 模块还提供了字符串相等比较函数equal:

1
2
3
4
arr = array([('301301.SZ', '川宁生物', '2012-01-01')],
      dtype=[('symbol', '<U10'), ('name', '<U10'), ('ipo_date', '<M8[D]')])

arr[np.char.equal(arr["symbol"], "301301.SZ")]

在这个特殊的场景下,我们也可以直接使用以下语法:

1
arr[arr["symbol"] == "301301.SZ"]

Tip

np.char 下的函数很多,如何记忆?实际上,这些函数多数是 Python 中 str 的方法。如果你熟悉 Pandas,就会发现 Pandas 中也有同样的用法。因此,像upper, lower, strip这样的str函数,你可以直接拿过来用。

Numpy 中的字符串函数另一个比较常用的场景,就是执行格式化。你可以通过ljust, 'center', rjust在显示一个数组前,将它们的各列数据进行左右空格填充,这样,输出时就可以比较整齐。

Question

2024 年 5 月 10 日起,南京化纤走出 7 连板行情,短短 7 日,股价翻倍。市场上还有哪些名字中包含化纤的个股?它们的涨跌是否存在相关性或者跨周期相关性?

Masked Array

你可能常常在一些接近底层的库中,看到 Numpy masked array 的用法。Masked Array 是 Numpy 中很重要的概念。考虑这样的情景,你有一个数据集,其中包含了一些缺失的数据或者无效值。这些”不合格“的数据,可能以 np.nan,np.inf, None 或者其它仅仅是语法上有效的值来表示(比如,在 COVID-19 数据集中,病例数出现负数)的。


如何在保持数据集的完整性不变的前提下,仍然能对数据进行运算呢?

Note

这里有一个真实的例子。你可以在 Kaggle 上找到一个 COVID-19 的数据集,这个数据集中,就包含了累积病例数为负数的情况。该数据集由 Johns Hoopkins University 收集并提供。

很显然,我们无法直接对这些数据进行运算。请看下面的例子:

1
2
3
x = np.array([1, 2, 3, np.inf, np.nan, None])
np.mean(x)
np.nanmean(x)

只要数据中包含 np.nan, np.inf 或者 None,numpy 的函数就无法处理它们。即使数据在语法上合法,但在语义上无效,Numpy 强行进行计算,结果也是错误的。

这里有一个量化中可能遇到的真实场景,某公司有一年的年利润为零,这样使得它的 YoY 利润增长在次年变得无法计算。如果我们需要利用 YoY 数据进一步进行运算,我们就需要屏蔽这一年的无效值。否则,我们会连 YoY 利润的均值都无法计算出来。

这里有一个补救的方法,就是将原数据拷贝一份,并且将无效值替换为 np.nan。此后多数运算,都可以用np.nan*来计算。这个方法我们已经介绍过了。但是,如果你是原始数据的收集者,显然你应该照原样发布数据,任何修改都是不合适的;如果你是数据的应用者,当然应该对数据进行预处理后,才开始运算。但是,你又很可能缺少了对数据进行预处理所必须的信息 -- 你怎么能想到像-1, 0 这样看起来人畜无害的小可爱们,竟然是隐藏着的错误呢?


为了解决这个问题,Numpy 就提供了 Masked Array。但是我们不打算在 Masked Array 上过多着墨。关于 Masked Array,我们可以借用这样一句话,很多人不需要知道 Masked Array,知道 Masked Array 的人则已经精通它了。

有一点需要注意的是,仅在需要时,使用 Masked Array。因为可能与您相像的相反,Masked Array 不会提高性能,反而,它大大降低了性能:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
import numpy as np

# NUMPY VERSION 1.24.4
g = np.random.random((5000,5000))
indx = np.random.randint(0,4999,(500,2))
g_nan = g.copy()
g_nan[indx] = np.nan
mask =  np.full((5000,5000),False,dtype=bool)
mask[indx] = True
g_mask = np.ma.array(g,mask=mask)

%timeit (g_mask + g_mask)**2
# 901 MS ± 52.3 MS PER LOOP ...
%timeit (g_nan + g_nan)**2
# 109 MS ± 72.2 ΜS PER LOOP ...

可以看出,Masked Array 的性能慢了接近 9 倍。

Tip

如果你不得不对含有 np.nan 的数组进行运算,那么可以尝试使用 bottleneck 库中的 nan *函数。由于并不存在 nansquare 函数,但是考虑到求方差的运算中必然包含乘方运算,因此我们可以考虑通过 nanvar 函数来评测 numpy 与 bottleneck 的性能差异。

1
2
3
4
5
6
    %timeit np.var(g_mask)
    # 587 MS ± 37.9 MS PER LOOP ...
    %timeit np.nanvar(g_nan)
    # 281 MS ± 1.46 MS PER ...
    %timeit nanvar(g_nan)
    # 61 MS ± 362 ΜS PER LOOP ...

1
bottleneck 要比 numpy 快接近 5 倍。如果你使用的 numpy 版本较旧,那么 bottleneck 还会快得更多。

ufunc

ufunc 是 Numpy 中的重要概念,它对两个输入数组同时进行逐元素的操作(比如,相加,比较大小等)。在 Numpy 中大约定义了 61 个左右的 ufunc。这些操作都是由底层的 C 语言实现的,并且支持向量化,因此,它们往往具有更快的速度。

比如,在 numpy 中,求数组中的最大值,有两个相似的函数, np.maxnp.maximum可以达成这一目标。后者是 ufunc,前者不是,两者除了用法上有所区别之外,后者的速度也要快一些。

1
2
3
4
5
6
arr = np.random.normal(size=(1_000_000,))

%timeit np.max(arr)
# 801 MS ± 54.7 MS PER LOOP ...
%timeit np.maximum.reduce(arr)
# 775 MS ± 12.1 MS PER LOOP ...

np.maximum作为 ufunc,它本来是要接收两个参数的,并不能用来求一维数组的最大值。这种情况下,我们要使用reduce操作才能得到想要的结果。

这里np.maximum是一个 ufunc,则reduce是 unfunc 对象(在 Python 中,一切都是对象,包括函数)的属性之一。ufunc的其它属性还有accumulateouterreduceat等。

accumulate是 ufunc 中的另一个常用属性,可能你之前已经有所接触。比如,在求最大回撤时,我们就会用到它:


 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
# 模拟一个股价序列
n = 1000
xs = np.random.randn(n).cumsum()

# 最大回撤结束期
i = np.argmax(np.maximum.accumulate(xs) - xs) 

# 最大回撤开始期
j = np.argmax(xs[:i]) 

# 最大回撤
mdd = (xs[j] - xs[i])/xs[j]

plt.plot(xs)
plt.plot([i, j], [xs[i], xs[j]], 'o', color='Red', markersize=10)

简洁就是美。在使用了accumulate之后,我们发现,计算最大回撤竟简单到只有两三行代码即可实现。

ufunc 如此好用,你可能要问,为何我用到的却不多?实际上,你很可能每天都在使用ufunc。许多二元数学操作,它们都是对 ufunc 的封装。


比如,当我们调用A + B时,实际上是调用了np.add(A, B)这个 ufunc。二者在功能和性能上都是等价的。其它的 ufunc 还有逻辑运算、比较运算等。只要某种运算接受两个数组作为参数,那么,很可能 Numpy 就已经实现了相应的 ufunc 操作。此外,一些三角函数,尽管只接受一个数组参数,但它们也是 ufunc。

因此,我们需要特别关注和学习的 ufunc 函数,可能主要就是maximumminimum等。这里再举一个在量化场景下,使用maximum的常用例子 -- 求上影线长度。

Tip

长上影线是资产向上攻击失败后留下的痕迹。它对股价后来的走势分析有一定帮助。首先,资金在这个点位发起过攻击,暴露了资金的意图。其次,攻击失败,接下来往往会有洗盘(或者溃败)。股价底部的长上影线,也被有经验的股民称为仙人指路。后面出现拉升的概率较大。上影线出现在高位时,则很可能是见顶信号。此时在较低级别的 k 线上,很可能已经出现均线拐头等比较明显的见顶信号。

现在,我们就来实现长上影线的检测。上影线的定义是:

\[ upper\_shadow = high - max(open, close) \]

下图显示了上影线:


如果 \(\(upper\_shadow > threshold\)\),则可认为出现了长上影线(当然,需要对 upper_shadow 进行归一化)。检测单日的上影线很简单,我们下面的代码将演示如何向量化地求解:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
import numpy as np
import pandas as pd

rng = np.random.default_rng(seed=78)
matrix = rng.uniform(0.98, 1.02, (4, 30)).cumprod(axis=1)
opn = matrix[0]
close = matrix[-1]
high = np.max(matrix, axis=0)

upper_shadow = (high - np.maximum(opn, close))/close
np.round(upper_shadow, 2)

第 10 行的代码完全由 ufunc 组成。这里我们使用了 np.sub(减法), np.maximum, np.divide(除法)。maximum 从两个等长的数组 opn 和 close 中,逐元素比较并取出最大的那一个,组成一个新的数组,该数组也与 opn, close 等长。

如果要求下影线长度,则可以使用 minimum。


《因子投资与机器学习策略》开课啦!


目标清晰 获得感强


为什么你值得QuanTide的课程?



  1. https://zh.wikipedia.org/wiki/%E9%97%B0%E7%A7%92 

牛人太多:小市值因子之父,毕业论文被大佬狂怼

Rolf W. Banz,瑞士人,70 年代在芝加哥大学获得博士学位,并在该校任教过。此后他在伦敦经营了一家专注于小盘股投资的投资精品店,1991 年出售给 Alliance Capital。职业生涯的最后阶段,他回到了瑞士,在一家瑞士私人银行的资产管理子公司担任了高级职位。

退休后,他是一名博主。最后一篇博文,停留在 2017 年底。

小市值因子之父

Banz 因发现小市值因子一战成名。1981 年,在他的博士毕业论文中写道,基于纽交所长达 40 年的数据,小市值股票月均收益率比其他股票高 0.4%。这个发现深刻地影响了此后的市场。在后 Banz 时代,基金经理超配小盘股一度成了一种信仰。

在国内,研究表明,A 股市场在 2016 年以前,规模因子的显著性甚至超过了欧美等发达国家市场。该因子在 2017-2018 年期间,弱于大市值资产,但从 2021 年起,又跑赢了沪深 300。

这张图来自聚宽平台的回测,数据表明,两年时间里,小市值获得到 51.98%的超额收益,是相当不错的。

Rolf Banz 也因此获得了一个非正式的 title,小市值因子之父。

这个 title,不仅仅是因为小市值因子。如果说威廉. 夏普定义了第一个因子的话,那么第二个因子,小市值因子,则是 Banz 发现的。他完成了一生二,甚至是零生一的这一步。

不被大佬接受的论文

按理说,搞出了这么牛 x 的因子,作为论文答辩应该很顺利,对吧?但是,当 Banz 进行答辩时,意外发生了。

审查论文的都是大牛人,有迈伦·斯科尔斯、默顿·米勒和尤金·法玛,他们后来都获得了诺贝尔奖。但是,没有一个人喜欢 Banz 的发现。尤其是尤金·法玛(他是有效市场理论的提出者)。为什么?

有效市场的一个推论就是,只应该有一个因子,就是市场因子。因为,既然市场是有效的,就不应该存在信息差和错误定价。没有错误定价,就没有市场异象和新因子。

所以,当 Banz 在台上介绍自己的成果时,默顿·米勒就在台下问斯科尔斯,他的错误是什么?一众评委都天然地倾向否认 Banz 的结论。他们出席评审的目的,就是为了证明他是错的!

Banz 在学术成就上,是不如斯科尔斯,法玛他们这些人的,他也是一名著名的金融学家,但没能获得重要级别的大奖。但他提出的小市值因子如果用以投资,会非常有效。

Banz 自己的投资相对于其它金融学家来说比较成功。与之对比的是,斯科尔斯曾经掌管了一个名为长期资本管理的对冲基金,这个基金 1998 年倒闭,损失了数十亿美元。

简单即是美

Banz 的这篇论文并不复杂,只有 16 页纸。论文中并没有使用高深的数学,只是使用了基础的统计科学,GLS 和 OLS。如果当时有计算机可用的话,这个推导过程会显得格外简单。

Quote

if you can’t explain it simply, you don’t understand it well enough. - Einstein

Banz 发表论文时,他研究的数据来自 1926 年到 1975 年。这些数据就在芝大的 CRSP 数据库里。在漫长的 50 多年时间里,一直在静静地等待有人发现她的价值。芝大经济学人才辈出,期间不知道有多少人接触过这份数据。如此低垂和芳香的果实最终却被 Banz 采摘。

我想,这足以说明,在任何时代,在任何圈子,都会有一些低垂的果实,等待有心人去采摘

Banz 在论文中提到,他无法介绍小市值效应产生的原因。

但实际上这是容易解释的。从生物学的角度来看,幼小、成长性和风险总是相伴相随;从经济学的角度来看,一些很大的公司,往往已经做到了行业的天花板,做到了很高的市占率,如果他们也想市值 double 的话,就必须开拓一个新的赛道,并且这个赛道还得跟之前的一样宽。

但是,对大公司来说,创新和死亡都很困难。

如果在 Banz 之前的学者,能跳出计量经济学的窠臼看等市场,同时又具有扎实的统计和数学功底,发现这个因子的时间应该能早很多。

因为造原子弹的最大秘密,就是原子弹是可造的。

后 Banz 时代的 Banz

尽管小市值因子的论文使用的统计学知识非常基础,但扎实二字,其实比我们想像的要难。就这样一篇论文,尽管在形式上无懈可击,但仍然受到大量的质疑。

1999 年,Tyler Shumway and Vincent Warther 指出,Banz 的研究方法存在显著错误,他忘了减去退市股的负面效应。他们的文章发表为《 The Delisting Bias in CRSP's Nasdaq Data and Its Implications for the Size Effect》。

2011 年,Gary A. Miller and Scott A. MacKillop 认为,小市值并不优于大型股。他们使用标普 500 指数来代替大型股,使用 CRSP9-10 指数来代表小型股,两者都可以追溯到 1926 年。根据他们的研究,CRSP 9-10 和 DFA 基金在 1980 到 2000 年的回报率低于标准普尔 500 指数,虽然差距不大。

Note

实际上这个结论很容易误导人。它实际上证明了小市值因子的有效性。因为标普 500 指数的表现,远远超过了大多数基金经理。另外,根据 Miller & MacKillop,小市值因子只是在夏普上低于大型股 -- 这也是 Banz 在论文中就已经指出的事实。

Banz 对这些质疑没有进行回应。尽管他在 2012 年就开始撰写 个人博客,但就像完全没看到这些质疑一样。

[0908] QuanTide Weekly

本周要闻

  • 央行:降准有空间 利率进一步下行面临一定约束
  • 巴菲特再次减持美国银行,这次要做空自己的祖国?
  • 存量房贷下调预期落空,沪指连续三日跌破2800点

下周看点

  • 周一,8月CPI/PPI 数据发布
  • 周一,茅台业绩说明会,对白酒行业的未来发展预判是重要信号

本周精选

  • 那些必读的量化论文
  • 连载!量化人必会的 Numpy 编程 (2)

本周要闻

  • 央行货币政策司邹澜9月5日表示,是否降准降息还需观察经济走势。年初降准的政策效果还在持续显现,目前金融机构的平均法定存款准备金率大约为7%,还有一定的空间。同时,受银行存款向资管产品分流的速度、银行净息差收窄的幅度等因素影响,存贷款利率进一步下行面临一定的约束
  • 当地时间9月5日,美国证券交易委员会披露,巴菲特于9月4日前后连续三日减持美国银行套现约7.6亿美元。今年7月,他曾连续9天减持美国银行,在减持前,美国银行为其第二大重仓股,也是伯克希尔最赚钱的公司之一。
  • 沪指本周下跌2.69%,连续三日跌破2800点。目前沪指PE值处在12%分位左右,属较低位置。消息面上,上周盛传的存量房贷降息预期落空。
  • 证监会离职人员入股拟上市企业新规落地,要求更严、核查范围更广

消息来源:东方财富、财联社


那些必读的量化论文

  1. Portfolio Selection, Markovitz, 1952。在这篇文章里,马科维茨提出了现代资产组合理论(MPT),使他获得了 1990 年诺贝尔经济学奖。该论文通过将风险与回报之间的相关性纳入计算中来扩展了常见的风险回报权衡。
  2. A New Interpretation of Information Rate,Kelly, 1956。本文是当今著名的凯利准则的正式表述。该模型广泛应用于赌场游戏,特别是风险管理。作者推导出了一个公式,该公式确定了最佳分配规模,以便随着时间的推移最大化财富增长。
  3. Capital Asset Prices: A Theory of Market Equilibrium under Conditions of Risk (Sharpe, 1964)。基于Markovitz的工作,CAPM证明了只有一种有效的投资组合,即市场投资组合。这篇文章提出了著名的Beta概念。Sharpe和Markovitz是师生关系,同年获得诺贝尔经济学奖。
  4. Efficient Capital Markets: a Review of Theory and Empirical Work,Fama, 1970。这篇论文是首次提出非常流行的“有效市场假说”概念的开创性著作,尽管现在这一理论受到很大的质疑,但是它的学术价值仍然很高。
  5. The Pricing of Options and Corporate Liabilities, 1973, Black & Scholes。著名的BS公式,利用物理传热方程作为估算期权价格的起点。这也是为什么对冲基金喜欢物理系学生的原因。

  1. Does the Stock Market Overreact? 1985, Bondt & Thaler。这篇文章质疑了有效市场假说,Bondt和Thaler提出,有统计上显着的证据表明相反的情况,投资者往往会对意外的新闻事件反应过度。这也是行为金融学的经典研究之一。行为金融学在近年来是诺奖的大热门。它的底层哲学是主观价值论和与人为本的思想。Thaler也是诺奖获得者,并在电影《大空头》中扮演了自己。
  2. A closed-form GARCH option valuation model,1997, Heston & Nandi。本文提出了一种封闭式公式,用于评估现货资产,并使用广义自回归条件异方差(GARCH) 模型对其方差进行建模。由于其复杂性和实用性, GARCH 模型在 20 世纪 90 年代估计波动率方面广受欢迎,金融业也积极采用它们。
  3. Optimal Execution of Portfolio Transactions,2000,Almgren & Chriss。对于每个负责完善交易执行算法的量化开发人员来说,这篇论文绝对是必读的学术文献。文章指出,价格波动来自外生性(市场本身的波动)和内生性(自己的订单对市场造成的影响)。这是一种量子效应!作者通过最小化交易成本和波动风险的组合,形式化了一种执行和衡量交易执行绩效的方法。
  4. Incorporating Signals into Optimal Trading,2017,Lehalle。与 Almgren 和 Chris (2000) 的工作非常相似,本文讨论的是最佳交易执行。作者通过将马尔可夫信号纳入最优交易框架,进一步完善了该领域所做的工作,并针对带有漂移的随机过程(Ornstein-Uhlenbeck 过程)的资产特殊情况得出了最优交易策略。
  5. The Performance of Mutual Funds in the Period 1945-1964, Michael Jessen。Sharpe在他的文章里提出了Beta这个概念,而Alpha这个概念,就是由Jessen在这篇论文中提出的。

  1. Common risk factors in the returns on stocks and bonds, 1993, Eugene F. Fama。在这篇文章里,Famma提出了三因子。
  2. Review of Financial Studies,2017, Stambaugh, Yuan。这篇论文出现的比较晚,因此可以把之前出现的、与因子投资相关的重要论文都评述一篇,因而就成为了我们快速了解行业的一篇重要论文。这篇文章发表比较晚,但也有了952次引用。
  3. 151 Trading Strategies, 2018, Zura Kakushadze。作者来自world quant,是Alpha101的作者之一。这篇论文引用了大量的论文(2000+),也是我们进行泛读的好材料。

量化人必会的 NUMPY 编程 (2) - 核心语法

1. Structured Array

一开始,Numpy的数组只能存放同质的元素,即这些元素都必须有同样的数据类型。但对很多表格类数据,它们往往是由一条条记录组成的,而这些记录,又是由不同数据类型的数据组成的。比如,以最常见的行情数据来讲,它就必须至少包含时间、证券代码和OHLC等数据。

为了满足这种需求,Numpy扩展出一种名为Structured Array的数据格式。它是一种一维数组,每一个元素都是一个命名元组。

我们可以这样声明一个Structured Array:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
import numpy as np
import datetime
dtypes = [
        ("frame", "O"),
        ("code", "O"),
        ("open", "f4"),
        ("high", "f4"),
        ("low", "f4"),
        ("close", "f4")
    ]
secs = np.array(
    [
        (datetime.date(2024, 3, 18), "600000", 8.9, 9.1, 8.8, 9),
        (datetime.date(2024, 3, 19), "600000", 8.9, 9.1, 8.8, 9),
    ], dtype = dtypes
)

在这个数据结构中,共有6个字段,它们的名字和类型通过dtype来定义。这是一个List[Tuple]类型。在初始化数据部分,它也是一个List[Tuple]。

Warning

初学者很容易犯的一个错误,就是使用List[List]来初始化Numpy Structured Array,而不是List[Tuple]类型。这会导致Numpy在构造数组时,对应不到正确的数据类型,报出一些很奇怪的错误。
比如,下面的初始化是错误的:

1
2
3
4
secs = np.array([
    [datetime.date(2024, 3, 18), "600000", 8.9, 9.1, 8.8, 9],
    [datetime.date(2024, 3, 19), "600000", 8.9, 9.1, 8.8, 9]
], dtype=dtypes)
这段代码会报告一个难懂的"Type Error: float() argument must be a string or ..."

我们使用上一节学过的inspecting方法来查看secs数组的一些特性:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
print(f"secs的维度是{secs.ndim}")
print(f"secs的shape是{secs.shape}")
print(f"secs的size是{secs.size}")
print(f"secs的length是{len(secs)}")

print(f"secs[0]的类型是{type(secs[0])}")
print(f"secs[0]的维度是{secs[0].ndim}")
print(f"secs[0]的shape是{secs[0].shape}")
print(f"secs[0]的size是{secs[0].size}")
print(f"secs[0]的length是{len(secs[0])}")

可以看出,secs数组是一维数组,它的shape (2,)也正是一维数组的shape的表示法。前一节还介绍过这几个属性的关系,大家可以自行验证下是否仍然得到满足。


但secs的元素类型则是numpy.void,它在本质上是一个named tuple,所以,我们可以这样访问其中的任一字段:

1
2
3
4
print(secs[0]["frame"])

# 不使用列名(字段名),使用其序号也是可以的
print(secs[0][0])

我们还可以以列优先的顺序来访问其中的一个“单元格”:

1
print(secs["frame"][0])

对表格数据,遍历是很常见的操作,我们可以这样遍历:

1
2
for (frame, code, opn, high, low, close) in secs:
    print(frame, code, opn, high, low, close)

Numpy structured array在这部分的语法要比Pandas的DataFrame易用许多。我们在后面介绍Pandas时,还会提及这一点。

2. 运算类

2.1. 比较和逻辑运算

我们在上一节介绍定位、查找时,已经接触到了比较,比如:arr > 1。它的结果将数组中的每一个元素都与1进行比较,并且返回一个布尔型的数组。

现在,我们要扩充比较的指令:


函数 描述
all 如果数组中的元素全为真,返回True。可用以判断一组条件是否同时成立。
any 如果数组中至少有一个元素为真,则返回True。用以判断一组条件是否至少有一个成立
isclose 判断两个数组中的元素是否一一近似相等,返回所有的比较结果
allclose 判断两个数组中的元素是否全部近似相等
equal 判断两个数组中的元素是否一一相等,返回所有的比较结果。
not_equal 一一判断两个数组中的元素是否不相等,返回所有的比较结果
isfinite 是否为数字且不为无限大
isnan 测试是否为非数字
isnat 测试对象是否不为时间类型
isneginf 测试对象是否为负无限大
isposinf 测试对象是否为正无限大
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
# 开启多行输出模式
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

np.random.seed(78)
returns = np.random.normal(0, 0.03, size=4)
returns
# 判断是否全下跌
np.all(returns <= 0)
np.any(returns <= 0)

# 模拟一个起始价格为8元的价格序列
prices = np.cumprod(1+returns) * 8

# 对应的涨停价如下
buy_limit_prices = [8.03, 8.1, 8.1, 8.3]

# 判断是否涨停
np.isclose(prices, buy_limit_prices, atol=1e-2)

除了判断一个数组中的元素要么都为True,要么至少一个为True之外,有时候我们还希望进行模糊一点的判断,比如,如果过去20天中,超过60%的是收阳线,此时我们可以用np.count_nonzero,或者np.sum来统计数组中为真的情况:


1
2
np.count_nonzero(returns > 0)
np.sum(returns > 0)

在上一节进行比较的示例中,我们都只使用了单个条件。如果我们要按多个条件的组合进行查找,就需要依靠逻辑运算来实现。

在Numpy中,逻辑运算既可以通过函数、也可以通过运算符来完成:

函数 运算符 描述 python等价物
logical_and & 执行逻辑与操作 and
logical_or | 执行逻辑或操作 or
logical_not ~ 执行逻辑或操作 not
logical_xor '^' 执行逻辑异或操作 xor

逻辑运算有什么用呢?比如我们在选股时,有以下表格数据:

股票 pe mom
AAPL 30.5 0.1
GOOG 32.3 0.3
TSLA 900.1 0.5
MSFT 35.6 0.05

上述表格可以用Numpy的Structured Array来表示为:

1
2
3
4
5
6
tickers = np.array([
    ("APPL", 30.5, 0.1),
    ("GOOG", 32.3, 0.3),
    ("TSLA", 900.1, 0.5),
    ("MSFT", 35.6, 0.05)
], dtype=[("ticker", "O"), ("pe", "f4"), ("mom", "f4")])

现在,我们要找出求PE < 35, 动量 (mom) > 0.2的记录,那么我们可以这样构建条件表达式:


1
(tickers["pe"] < 35) & (tickers["mom"] > 0.2)

Numpy会把pe这一列的所有值跟35进行比较,然后再与mom与0.2比较的结果进行逻辑与运算,这相当于:

1
np.array((1,1,0,0)) & np.array((0, 1, 1, 0))
在Numpy中,True与1的值在做逻辑运算时是相等的;0与False也是。

如果不借助于Numpy的逻辑操作,我们就要用Python的逻辑操作。很不幸,这必须使用循环。如果计算量大,这将会比较耗时间。

在量化中使用异或操作的例子仍然最可能来自于选股。比如,如果我们要求两个选股条件,只能有一个成立时,才买入;否则不买入,就可以使用异或运算。

2.2. 集合运算

在交易中,我们常常要执行调仓操作。做法一般是,选确定新的投资组合,然后与当前的投资组合进行比较,找出需要卖出的股票,以及需要买入的股票。这个操作,就是集合运算。在Python中,我们一般是通过set语法来实现。

在Numpy中,我们可以使用通过以下方法来实现集合运算:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
import numpy as np

# 创建两个一维数组
x = np.array([1, 2, 3, 4, 5])
y = np.array([4, 5, 6, 7, 8])

# 计算交集
intersection = np.intersect1d(x, y)
print("Intersection (交集):", intersection)

# 计算并集
union = np.union1d(x, y)
print("Union (并集):", union)


1
2
diff = np.setdiff1d(x, y)
print("x - y:", diff)

此外,我们还可能使用in1d(a1, a2)方法来判断a1 中的元素是否都在 a2 中存在。比如,在调仓换股中,如果当前持仓都在买入计划中,则不需要执行调仓。

2.3. 数学运算和统计

Numpy中数学相关的运算有线性代数运算(当然还有基本代数运算)、统计运算、金融指标运算等等。

2.3.1. 线性代数

线性代数在量化中有重要用途。比如,在现代资产组合理论(MPT)中,我们要计算资产组合收益率及协方差,都要使用矩阵乘法。大家可以参考投资组合理论与实战系列文章,下面是其中的一段代码:

1
2
3
...
cov = np.cov(port_returns.T)
port_vol = np.sqrt(np.dot(np.dot(weights, cov), weights.T))

矩阵乘法是线性代数中的一个核心概念,它涉及到两个矩阵的特定元素按照规则相乘并求和,以生成一个新的矩阵。具体来说,如果有一个矩阵A 为 \(m \times n\) 维,另一个矩阵B 为 \(n \times p\) 维,那么它们的乘积 \(C = AB\)将会是一个\(m \times p\)维的矩阵。乘法的规则是A的每一行与B的每一列对应元素相乘后求和。

下面通过一个具体的例子来说明矩阵乘法的过程:

假设我们有两个矩阵A和B:


$$ A = \begin{bmatrix} 2 & 3 \ 1 & 4 \end{bmatrix} $$ 和 $$ B = \begin{bmatrix} 1 & 2 \ 3 & 1 \end{bmatrix} $$ 要计算AB,我们遵循以下步骤:

取A的第一行 \((2, 3)\) 与的第一列\((1,3)\) 相乘并求和得到\(C_{11} = [2\times1 + 3\times3 = 11]\)

同理,取A的第一行与B的第二列\((2, 1)\)相乘并求和得到\(C_{12} = [2\times2 + 3\times1 = 7]\)

取A的第二行\((1, 4)\)与B的第一列相乘并求和得到\(C_{21} = [1\times1 + 4\times3 = 13]\)

取A的第二行与B的第二列相乘并求和得到\(C_{22} = [1\times2 + 4\times1 = 5]\)

因此,矩阵C = AB为:

\[ C = \begin{bmatrix} 11 & 7 \\ 13 & 6 \ \end{bmatrix} \]

与代数运算不同,矩阵乘法不满足交换律,即一般情况下\(AB \neq BA\)

在Numpy中,我们可以使用np.dot()函数来计算矩阵乘法。


上述示例使用numpy来表示,即为:

1
2
3
4
A = np.array([[2,3],[1,4]])
B = np.array([[1,2],[3,1]])

np.dot(A, B)

最终我们将得到与矩阵C相同的结果。

除此之外,矩阵逆运算(np.linalg.inv)在计算最优投资组合权重时,用于求解方程组,特征值和特征向量(np.linalg.eig, np.linalg.svd)在分析资产回报率的主成分,进行风险分解时使用。

2.3.2. 统计运算

常用的统计运算包括:

函数 描述
np.mean 计算平均值
np.median 计算中位数
np.std 计算标准差
np.var 计算方差
np.min 计算最小值
np.max 计算最大值
np.percentile 用于计算历史数据的分位点
np.quantile 用于计算历史数据的分位数,此函数与percentile功能相同
np.corr 用于计算两个变量之间的相关性

np.percentile与np.quantile功能相同,都是用于计算分位数。两者在参数上略有区别。


当我们对同一数组,给quantile传入分位点0.25时,如果给percentile传入分位点25时,两者的结果将完全一样。也就是后者要乘以100。在量化交易中,quantile用得可能会多一些。

np.percentile(或者np.quantile)的常见应用是计算25%, 50%和75%的分位数。用来绘制箱线图(Boxplot)。

此外,我们也常用它来选择自适应参数。比如,在RSI的应用中,一般推荐是低于20(或者30)作为超卖,此时建议买入;推荐是高于80(或者70)作为超买,此时建议卖出。但稍微进行一些统计分析,你就会发现这些阈值并不是固定的。如果我们以过去一段时间的RSI作为统计,找出它的95%分位作为卖点,15%作为买点,往往能得到更好的结果。

2.3.3. 量化指标的计算

有一些常用的量化指标的计算,也可以使用Numpy来完成,比如,计算移动平均线,就可以使用Numpy提供的convolve函数。

1
2
3
import numpy as np
def moving_average(data, window_size):
    return np.convolve(data, np.ones(window_size)/window_size, 'valid')

当然,很多人习惯使用talib,或者pandas的rolling函数来进行计算。convolve(卷积)是神经网络CNN的核心,正是这个原因,我们这里提一下。

np.convolve的第二个参数,就是卷积核。这里我们是实现的是简单移动平均,所以,卷积核就是一个由相同的数值组成的数组,它们的长度就是窗口大小,它们的和为1。


如果我们把卷积核替换成其它值,还可以实现WMA等指标。从信号处理的角度看,移动平均是信号平滑的一种,使用不同的卷积核,就可以实现不同的平滑效果。

在量化中,还有一类计算,这里也提一下,就是多项式回归。比如,某两支股票近期都呈上升趋势,我们想知道哪一支涨得更好?这时候我们就可以进行多项式回归,将其拟合成一条直线,再比较它们的斜率。

下面的代码演示了如何使用Numpy进行多项式回归。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import numpy as np
import matplotlib.pyplot as plt

returns = np.random.normal(0, 0.02, size=100)
alpha = 0.01
close = np.cumprod(1 + returns + alpha)

a, b = np.polyfit(np.arange(100), close, deg=1)

# 继续之前的代码

# 使用a, b构建回归线的y值
regression_line = a * np.arange(100) + b

# 绘制原始的close曲线
plt.figure(figsize=(10, 6))
plt.plot(close, label='Close Price', color='blue')

# 绘制回归线
plt.plot(regression_line, label='Regression Line', color='red', linestyle='--')

# 添加图例、标题和坐标轴标签
plt.title('Stock Close Price vs Regression Line')
plt.xlabel('Time Period')
plt.ylabel('Price')
plt.legend()

# 显示图表
plt.grid(True)
plt.show()

这将生成下图:

3. 类型转换和Typing

在不同的库之间交换数据,常常会遇到格式问题。比如,我们从第三方数据源拿到的行情数据,它们用的时间字段常常会是字符串(这是代码少写了几行吗?!)。有一些库在存储行情时,对OHLC这些字段进行了优化,使用了4个字节的浮点数,但如果要传给talib进行指标计算,就必须先转换成8个字节的浮点数,等等,这就有了类型转换的需求。

此外,我们还会遇到需要将numpy数据类型转换为python内置类型,比如,将numpy.float64转换为float的情况。


3.1. Numpy内部类型转换

Numpy内部类型转换,我们只需要使用astype函数即可。

1
2
3
4
5
6
7
8
x = np.array(['2023-04-01', '2023-04-02', '2023-04-03'])
print(x.astype(dtype='datetime64[D]'))

x = np.array(['2014', '2015'])
print(x.astype(np.int32))

x = np.array([2014, 2015])
print(x.astype(np.str_))

Tips

如何将boolean array转换成整数类型,特别是,将True转为1,False转为-1? 在涉及到阴阳线的相关计算中,我们常常需要将 open > close这样的条件转换为符号1和-1,以方便后续计算。这个转换可以用:

1
2
3
>>> x = np.array([True, False])
>>> x * 2 - 1
... array([ 1, -1])

3.2. Numpy类型与Python内置类型转换

如果我们要将Numpy数组转换成Python数组,可以使用tolist函数。

1
2
x = np.array([1, 2, 3])
print(x.tolist())

我们通过item()函数,将Numpy数组中的元素转换成Python内置类型。

1
2
3
x = np.array(['2023-04-01', '2023-04-02'])
y = x.astype('M8[s]')
y[0].item()

Warning

一个容易忽略的事实是,当我们从Numpy数组中取出一个标量时,我们都应该把它转换成为Python对象后再使用。否则,会发生一些隐藏的错误,比如下面的例子:

1
2
3
4
5
6
import json
x = np.arange(5)
print(json.dumps([0]))
print(x[0])

json.dumps([x[0]])
这里最后一行会出错。提示type int64 is not JSON serializable。把最后一行换成json.dumps([x[0].item()])则可以正常执行。

3.3. Typing

从Python 3.1起,就开始引入类型注解(type annotation),到Python 3.8,基本上形成了完整的类型注解体系。我们经常看到函数的参数类型注解,比如,下面的代码:

1
2
3
from typing import List
def add(a: List[int], b: int) -> List[int]:
    return [i + b for i in a]

从此,Python代码也就有了静态类型检查支持。

NumPy的Typing模块提供了一系列类型别名(type aliases)和协议(protocols),使得开发者能够在类型注解中更精确地表达NumPy数组的类型信息。这有助于静态分析工具、IDE以及类型检查器提供更准确的代码补全、类型检查和错误提示。

这个模块提供的主要类型是ArrayLike, NDArray和DType。


 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
import numpy
from numpy.typing import ArrayLike, NDArray, DTypeLike
import numpy as np

def calculate_mean(data: ArrayLike) -> float:
    """计算输入数据的平均值,数据可以是任何ArrayLike类型"""
    return np.mean(data)

def add_one_to_array(arr: NDArray[np.float64]) -> NDArray[np.float64]:
    """向一个浮点数数组的每个元素加1,要求输入和输出都是np.float64类型的数组"""
    return arr + 1

def convert_to_int(arr: NDArray, dtype: DTypeLike) -> NDArray:
    """将数组转换为指定的数据类型"""
    return arr.astype(dtype)

如果你是在像vscode这样的IDE中使用上述函数,你就可以看到函数的类型提示。如果传入的参数类型不对,还能在编辑期间,就得到错误提示。

4. 拓展阅读

4.1. Numpy的数据类型

在Numpy中,有以下常见数据类型。每一个数字类型都有一个别名。在需要传入dtype参数的地方,一般两者都可以使用。另外,别名在字符串类型、时间和日期类型上,支持得更好。比如,'S5'是Ascii码字符串别外,它除了指定数据类型之外,还指定了字符串长度。datetime64[S]除了表明数据是时间日期类型之外,还表明它的精度到秒。


类型 别名 类别 别名
np.int8 i1 np.float16 f2
np.int16 i2 np.float32 f4,还可指定结尾方式,比如'f4'表示大端字节序。其它float类型同。
np.int32 i4 np.float64 f8
np.int64 i8 np.float128 f16
np.uint8 u1 np.bool_ b1
np.uint16 u2 np.str_ U (后接长度,例如U10)
np.uint32 u4 np.bytes_ S (后接长度,例如S5)
np.uint64 u8 np.datetime64
np.timedelta64 m8和m8[D] m8[h] m8[m] m8[s]等

《因子投资与机器学习策略》开课啦!


9月8日晚,在线直播,不见不散


点击入会