跳转至




Index

Numpy应用案例[3]

“去极值是量化分析预处理中不可或缺的一步。在各种方法中,中位数拉回法因其鲁棒性和适应性广泛应用。通过 Numpy 的向量化实现,我们可以轻松完成多资产的去极值操作,显著提升计算效率。”


向量化又一例: 多资产中位数去极值

去极值是量化分析预处理中的常见步骤,在机器学习中也很常见。在各种去极值方法中,中位数拉回是对数据分布特性适应性最广、最鲁棒的一种。

我们先介绍绝对中位差(median absolute deviation)的概念:

MAD=median(Ximedian(X))MAD = median(|X_i - median(X)|)

为了能将 MAD 当成与标准差σ\sigma相一致的估计量,即 \(σ^=k.MAD\hat{\sigma} = k. MAD\)

这里 k 为比例因子常量,如果分布是正态分布,可以计算出: k=1(Φ1(34))1.4826 k = \frac{1}{(\Phi^{-1}(\frac{3}{4}))} \approx 1.4826

基于这个 k 值,取 3 倍则近似于 5。

代码实现如下:

1
2
3
4
5
6
7
from numpy.typing import ArrayLike

def mad_clip(arr: ArrayLike, k: int = 3):
    med = np.median(arr)
    mad = np.median(np.abs(arr - med))

    return np.clip(arr, med - k * mad, med + k * mad)

1
2
3
np.random.seed(78)
arr = np.append(np.random.randint(1, 4, 20), [15, -10])
mad_clip(arr, 3)

这段代码只能对单一资产进行mad_clip。如果要同时对A股所有资产的某种指标去极值,上述方法需要循环5000多次,显然速度较慢。此时,我们可以使用下面的方法:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
def mad_clip(df: Union[NDArray, pd.DataFrame], k: int = 3, axis=1):
    """使用 MAD 3 倍截断法去极值"""

    med = np.median(df, axis=axis).reshape(df.shape[0], -1)
    mad = np.median(np.abs(df - med), axis=axis)

    magic = 1.4826
    offset = k * magic * mad
    med = med.flatten()
    return np.clip(df.T, med - offset, med + offset).T

这一版的 mad_clip 可以接受 numpy ndarray 和 pandas dataframe 作为参数。输入的数据格式是什么,它返回的数据格式就是什么。

我们在np.median调用中,传入了 axis参数。如果axis=0, 表明按列的方向遍历,因此是按行取中位数;axis=1,表明按行的方向遍历,因此是按列取中位数。

我们使用真实数据测试一下:


1
2
3
4
5
6
7
# 加载测试数据
start = datetime.date(2023, 1, 1)
end = datetime.date(2023, 12, 29)
barss = load_bars(start, end, 7)

closes = barss["close"].unstack("asset").iloc[-5:]
closes

输出数据为:

asset/date 002095.XSHE 003042.XSHE 300099.XSHE 301060.XSHE 601689.XSHG 603255.XSHG 688669.XSHG
2023-12-25 23.400000 18.090000 6.10 13.00 73.910004 36.799999 18.080000
2023-12-26 21.059999 17.520000 5.94 12.83 72.879997 37.000000 18.080000
2023-12-27 20.070000 17.590000 6.04 12.84 72.000000 36.840000 18.049999
2023-12-28 20.010000 18.139999 6.11 13.14 72.199997 38.150002 18.440001
2023-12-29 20.270000 18.580000 6.19 13.29 73.500000 37.299999 18.740000

为了测试效果,我们将k设置为较小的值,以观察其效果:

1
mad_clip(closes,k=1)
asset/date 002095.XSHE 003042.XSHE 300099.XSHE 301060.XSHE 601689.XSHG 603255.XSHG 688669.XSHG
2023-12-25 23.400000 18.090000 10.217396 13.00 25.962605 25.962605 18.080000
2023-12-26 21.059999 17.520000 10.296350 12.83 25.863649 25.863649 18.080000
2023-12-27 20.070000 17.590000 10.325655 12.84 25.774343 25.774343 18.049999
2023-12-28 20.010000 18.139999 10.582220 13.14 26.297781 26.297781 18.440001
2023-12-29 20.270000 18.580000 10.659830 13.29 26.820169 26.820169 18.740000

我们看到,原始数据中的73.9被拉回到25.9,6.1被拉回到10.2(以第一行为例),并且都是以行为单位计算的。


min_range: 多少周期以来的最小值?

这是一个很常见的需求,比如,有股谚语云,天量见天价,地量见地价。当行情处在高位,成交量创出一段时间以来的天量之后,后续成交量将难以为继,容易引起下跌;当行情处在低位,成交量创出一段时间以来的地量之后,表明市场人气极度低迷,此时价格容易被操纵,从而引来投机盘。

在通达信公式中有此函数,在麦语言中,对应的方法可能是LOWRANGE。以下是myTT中LowRange函数的实现:

1
2
3
4
5
def LOWRANGE(S):                       
    # LOWRANGE(LOW)表示当前最低价是近多少周期内最低价的最小值 by jqz1226
    rt = np.zeros(len(S))
    for i in range(1,len(S)):  rt[i] = np.argmin(np.flipud(S[:i]>S[i]))
    return rt.astype('int')

这是一个看似简单,但实际上比较难实现的功能。如果我们对上述函数进行测试,会发现它不一定实现了需求(也可能是本文作者对此函数理解有误)。

1
2
3
s = [ 1, 2, 2, 1, 3, 0]

LOWRANGE(np.array(s))

在上述测试中,我们希望得到的输出是[1, 1, 1, 3, 1, 6],但LOWRANG将给出以下输出:


1
array([0, 0, 0, 2, 0, 0])

下面,我们给出该函数的向量化实现。

Warning

该函数在开头的几个输出中,存在出错可能。不影响因子分析,暂未修复。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def min_range(s):
    """计算序列s中,元素i是此前多少个周期以来的最小值

    此方法在个别数字上有bug

    Example:
        >>> s = np.array([5, 7, 7, 6, 5, 8, 2])
        >>> min_range(s)
        array([1, 2, 1, 2, 3, 1, 6])
    """
    n = len(s)

    # handle nan
    filled = np.where(np.isnan(s), -np.inf, s)
    diff = filled[:,None] - filled
    mask = np.triu(np.ones((n, n), dtype=bool), k=1)
    masked = np.ma.array(diff, mask=mask)

    rng = np.arange(n)
    ret = rng - np.argmax(np.ma.where(masked > 0, rng, -1), axis=1)
    ret[0] = 1
    if filled[1] <= filled[0]:
        ret[1] = 2
    return ret

1
2
s = np.array([5, 7, 7, 6, 5, 8, 2])
min_range(s)

最终输出的结果是:

1
array([1, 1, 2, 3, 4, 1, 6])

在第2个7的位置,输出与期望不一致,但此后计算都正确。这个实现非常有技巧,运用了三角矩阵做mask array,从而消解了循环。

均线计算:SMA和分时均线

使用numpy计算移动均线非常简单,使用np.convolve()即可。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
def moving_average(ts: ArrayLike, win: int, padding=True)->np.ndarray:
    kernel = np.ones(win) / win

    arr = np.convolve(ts, kernel, 'valid')
    if padding:
        return np.insert(arr, 0, [np.nan] * (win - 1))
    else:
        return arr

moving_average(np.arange(5), 3)

输出结果为array([nan, nan, 1., 2., 3.])


移动均线是只考虑价格信息的一种均线。分时均价线则则同时纳入成交量和成交价信息的均线,在日内交易中有特别重要的含义。比如,在市场不好的情况下,如果个股价格位于分时均线下方,此前两次上冲均线失败,那么,一旦冲第三次失败,一般认为要尽快卖出。反之亦然。

均价线的计算如下:

如果当前时刻为t,则用开盘以来,直到时刻t为止的成交金额除以成交量,即得到该时刻的累积成交均价。将所有时刻的成交均价连接起来,即构成了分时均价线。

这个功能看似复杂,但由于numpy提供了cumsum函数,因此实际上计算非常简单:

1
2
3
4
5
def intraday_moving_average(bars: DataFrame)->np.ndarray:
    acc_vol = bars["volume"].cumsum()
    acc_money = barss["amount"].cumsum()

    return acc_money / acc_vol

在本环境中,只提供了日线数据,我们以日线代替分钟线进行测试:

1
2
3
4
5
start = datetime.date(2023, 1, 1)
end = datetime.date(2023, 12, 29)
barss = load_bars(start, end, 1)

intraday_moving_average(barss)

计算最大回撤

最大回撤(MDD)是指投资组合从最高点到最低点的最大观察损失,直到达到新的最高点。最大回撤是一定时间周期内的下行风险指标。

MDD=TroughValuePeakValuePeakValue MDD = \frac{Trough Value - Peak Value}{Peak Value}

max drawdown是衡量投资策略风险的重要指标,因此,在empyrical库中有实现。不过,作为策略风险评估指标,empyrical没必要返回duration等信息,也没有实现滑动窗口下的mdd。现在,我们就来实现滑动版本。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
# https://stackoverflow.com/a/21059308
from numpy.lib.stride_tricks import as_strided
import matplotlib.pyplot as plt

def windowed_view(x, window_size):
    """Creat a 2d windowed view of a 1d array.

    `x` must be a 1d numpy array.

    `numpy.lib.stride_tricks.as_strided` is used to create the view.
    The data is not copied.
    Example:

    >>> x = np.array([1, 2, 3, 4, 5, 6])
    >>> windowed_view(x, 3)
    """

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
    """
    array([[1, 2, 3],
           [2, 3, 4],
           [3, 4, 5],
           [4, 5, 6]])
    """
    y = as_strided(x, shape=(x.size - window_size + 1, window_size),
                   strides=(x.strides[0], x.strides[0]))
    return y

def rolling_max_dd(x, window_size, min_periods=1):
    """Compute the rolling maximum drawdown of `x`.

    `x` must be a 1d numpy array.
    `min_periods` should satisfy `1 <= min_periods <= window_size`.

    Returns an 1d array with length `len(x) - min_periods + 1`.
    """
    if min_periods < window_size:
        pad = np.empty(window_size - min_periods)
        pad.fill(x[0])
        x = np.concatenate((pad, x))
    y = windowed_view(x, window_size)
    running_max_y = np.maximum.accumulate(y, axis=1)
    dd = y - running_max_y
    return dd.min(axis=1)

np.random.seed(0)
n = 100
s = np.random.randn(n).cumsum()
win = 20
mdd = rolling_max_dd(s, win, min_periods=1)

plt.plot(s, 'b')
plt.plot(mdd, 'g.')
plt.show()

测试表明,当时序s长度为1000时,rolling_max_dd的计算耗时为100𝜇S。

滑动窗口下,生成的mdd与原序列对照图如下:

该方法中,还简单地封装了一个将一维数组转换为滑动窗口视图的函数,可以在其它地方使用。


寻找自适应参数

很多基于技术指标的交易策略往往指定了固定的阈值。比如,一些人会在RSI 80以上做空,在RSI 20以下做多。即使是用在指数和行业板块上,这样的指标仍然不够精确,因为在上行通道中,RSI的顶点会高于下行通道中的RSI顶点;在下行通道中,RSI的底部则会比上行通道中的RSI底部低很多。

此外,不同的标的,RSI取值范围也不一样。不仅仅是RSI,许多技术指标都存在需要根据当前的市场环境和标的,采用自适应参数的情况。

其中一个方案是使用类似于布林带的方案,使用指标均值的标准差上下界。但这个方案隐含了技术指标均值的数据分布服从正态分布的条件。

我们可以放宽这个条件,改用分位数,即numpy的percentile来确定参数阈值。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
%precision 2

from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

np.random.seed(78)
s = np.random.randn(100)

hbound = np.percentile(s, 95)
lbound = np.percentile(s, 5)

1
2
s[s> hbound]
s[s< lbound]

通过percentile找出来超过上下界的数据,输出如下:

1
2
array([2.09, 2.27, 2.21, 2.12, 2.19])
array([-1.68, -2.4 , -1.97, -1.7 , -1.46])

一旦指标超过95%的分位数(hbound),我们就做空;一旦指标低于5%的分位数(lbound),我们就做多。

这里我们也可以使用中位数极值法。一旦指标超过中位数MAD值的3倍,就发出交易信号 。

Numpy应用案例[2]

“线性回归是量化分析中的常用工具,但在大规模数据中,循环实现效率低下。通过 Numpy 的向量化技巧,我们可以将计算提速百倍,轻松应对滑动窗口和批量计算等复杂需求。”


高维打击:多出来的维度如何解决了向量化难题

动量和反转是最重要的量化因子。有很多种刻画动量的算法,但拟合直线的斜率无疑是最直观的一种。

在上面的图形中,如果两条直线对应着两支股票的均线走势,显然,你更愿意买入橘色的那一支,因为它的斜率更大,也就是涨起来更快一些。


人类的视觉有着强大的模式发现能力。我们很容易看出橙色点的上升趋势更强。但是,如果要让计算机也知道哪一组数据更好,则需要通过直线拟合,找出趋势线,再比较趋势线的斜率才能确定。直线拟合,或者曲线拟合(cureve fit),或者多项式拟合,都是线性回归问题。

在这一章中,我们就来讨论直线拟合的方法,并且要从最普通的实现讲到能提速百倍的向量化实现。

线性回归与最小二乘法

如何从一堆随机的数字中,发现其中可能隐藏的、最能反映它们趋势的直线呢?这个问题从大航海时代开始起就困扰着科学家们。在那个时代,水手们需要通过星相来确定自己的纬度(经度则是通过日冕来计算的),这就需要基于人类的观测,准确地描述天体的行为。

在解决天体观测所引起的误差问题时,法国人阿德里安.玛丽.勒让德(Adrien Marie Legendre)最先发现了最小二乘法(1805年),此后,高斯在1809年发表的著作《天体运动论》中也提出了这一方法。最终,他在1829年,给出了最小二乘法是线性回归最佳拟合的理论证明。这一证明被称为高斯-马尔科夫定理。

Info

在勒让德与高斯之间,存在着一场最小二乘法发现权之争。这场争战一点也不亚于牛顿与莱布尼茨关于微积分发现权之争。不过,勒让德提出最小二乘法时,还只是一种猜想,其理论证明是由高斯完成的。勒让德是18世纪后期,与拉格朗日、拉普拉斯齐名的数学家,被称为三L(取自三个人的名字)。


所谓直线拟合(或者线性回归),就是要在由(x,y)构成的点集中,找到一条直线,使得所有点到该直线的距离的平方和最小。在Numpy中,我们最常用的方法是通过Polynomial.fit来进行拟合:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
from numpy.polynomial import Polynomial
import matplotlib.pyplot as plt


x = np.linspace(0, 100, 100)

rng = np.random.default_rng(seed = 78)
y = 0.05 * x + 2 + rng.normal(scale = 0.3, size=len(x))

# 绘制这些随机点
plt.scatter(np.arange(100), y, s=5)

# 最小二乘法拟合出趋势线
fitted = Polynomial.fit(x, y, deg=1, domain=[])
y_pred = fitted(x)
plt.plot(y_pred)

# 我们关注的斜率
b, a = fitted.coef
print("slope is {a:.3f}")

在很多教程上,我们看到在Numpy中进行多项式回归,使用的是polyfit。但实际上从1.4起,我们应该使用polynomial模块下的类和方法。polyfit被看成是过时的接口。要注意,尽管polynomial被设计为polyfit等函数的替代物,但在行为上有较大不同。


关于Polynomial.fit,需要注意我们传入了deg=1domain=[]两个参数。指定deg=1,是因为我们希望将由(x,y)表示的点集拟合成为直线;如果希望拟合成为二次曲线,则可以指定deg=2,高次曲线依次类推。

domain是可以省略的一个参数。省略它之后,对我们后面通过fitted(x)求预测直线并没有影响,但是,它对我们要求的斜率(第19行)会有影响。如果在这里我们省略domain = []这个参数,我们得到的系数会是 [2.47, 4.57]。这将与我们造数据时使用的[0.05, 2]相去甚远。而当我们指定domain = []后,我们将看到拟合直线的系数变回为我们期望的[0.05, 2]。

Tip

我们也可以这样调用fit:

1
2
3
fitted = Polynomial.fit(x, y, deg=1, window=(min(x), max(x)))
intercept, slope = fitted.coef
print(f"slope is {slope:.3f}")

或者:

1
2
3
fitted =  Polynomial.fit(x, y, deg=1)
intercept, slope = fitted.convert().coef
print(f"slope is {slope:.3f}")

convert是一个对domain和window进行平移和缩放的转换函数。

线性回归是非常常用的技巧,在scipy, sklearn和statsmodels等库中都有类似的实现。正因为如此,求价格或者均线的斜率因子,在技术上是一件轻而易举的事。


但是,如果我们要求移动线性回归呢?这可能就需要一点技巧了。

移动线性回归

移动线性回归是指对一个时间序列TT和滑动窗口winwin,计算出另一个时间序列SS,使得

Si=Slope(T[iwin+1,iwin+2,...,i]) S_i = Slope(T_{[i-win+1, i-win+2, ..., i]})

最简单直接的实现是通过一个循环:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
from numpy.polynomial import Polynomial
import matplotlib.pyplot as plt


x = np.linspace(0, 100, 100)
y = np.sin(x/10) + x/10

# 曲线拐点
flags = np.sign(np.diff(np.diff(y)))
pivots = np.argwhere(flags[1:] != flags[:-1]).flatten()

plt.plot(y)

win = 10
S = []
for i in range(win, len(y)):
    xi = x[i-win:i]
    yi = y[i-win:i]
    fitted = Polynomial.fit(xi, yi, deg=1, domain=[])
    S.append(fitted.coef[1])

1
2
3
4
5
6
    if i in pivots:
        xj = x[i-win-10:i+10]
        y_pred = fitted(xj)
        plt.plot(xj, y_pred, '--')

print(S)

这段代码计算了从第10个周期起,过去10个点的拟合直线的斜率,并且在曲线的转折点上,我们绘制了它的切线。这些切线,正是通过拟合直线的参数来生成的。


在这里,我们生成曲线的方法是使用了方程x + sin(x)。你会发现,这样生成的曲线,有几分神似股价的波动。这并不奇怪,股价的波动本来就应该可以分解成为一个直流分量与许多正弦波的叠加。直流分量反映了公司的持续经营能力,正弦波则反映了各路短炒资金在该标的上操作留下的痕迹。

回到我们的正题来。现在,我们去掉绘图功能,测试一下这段代码需要执行多长时间:

《因子投资与机器学习策略》喊你上课啦!

面向策略研究员的专业课程,涵盖因子挖掘因子检验和基于机器学习的策略开发三大模块,构建你的个人竞争优势!

    课程助教: 宽粉
  1. 全网独家精讲 Alphalens 分析报告,助你精通因子检验和调优。
  2. 超 400 个独立因子,分类精讲底层逻辑,学完带走 350+ 因子实现。
  3. 课程核心价值观:Learning without thought is labor lost. Know-How & Know-Why.
  4. 三大实用模型,奠定未来研究框架1:聚类算法寻找配对交易标的(中性策略核心)、基于 XGBoost 的资产定价、趋势交易模型。
  5. 领先的教学手段:SBP(Slidev Based Presentation)、INI(In-place Notebook Interaction)和基于 Nbgrader(UCBerkley 使用中)的作业系统。

1. 示例模型思路新颖。未来一段时间,你都可以围绕这些模型增加因子、优化参数,构建出领先的量化策略系统。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
x = np.linspace(0, 100, 100)
y = np.sin(x/10) + x/10

def moving_lsq(ts, win: int):
    x = np.arange(len(ts))
    S = []
    for i in range(win, len(ts)):
        xi = x[i-win:i]
        yi = ts[i-win:i]
        fitted = Polynomial.fit(xi, yi, deg=1, domain=[])
        S.append(fitted.coef[1])
    return S

%timeit moving_lsq(y, 10)

90次循环,总共用去25ms的时间。考虑到A股有超过5000支股票,因此全部计算一次,这将会超过2分钟。

要加速这一求解过程,我们必须借助向量化。但是,这一次,再也没有魔法一样的API可以调用,我们必须挽起袖子,从理解底层的数学原理开始,做出自己的实现。


向量化

考虑到一个有m个点的线性回归,对其中的每一个点,都会有:

如果所有的点都落在同一条直线上,那么意味着以下矩阵方程成立:

Y=Aβ+b Y = A\beta + b

这里AA即为X,β\beta为要求解的系数:

β=(ATA)1ATY \beta = {(A^TA)}^{-1}A^TY

关于公式推导,可以见《Python programming and Numerical Methods - A Guide for Engineers and Scientists》

我们可以手动来验证一下这个公式:

1
2
3
4
x = np.linspace(0, 9, 10)
y = x + np.sin(x/10)

win = 10

1
2
3
4
5
6
A = np.arange(win).reshape((win,1))
pinv = np.dot(np.linalg.inv(np.dot(A.T, A)), A.T)
alpha_1 = np.dot(pinv, y[:, np.newaxis]).item()
alpha_2 = Polynomial.fit(x, y, deg=1, domain=[]).coef[1]

np.isclose(alpha_1, alpha_2, atol=1e-2)

我们用两种方法分别求解了斜率,结果表明,在1e-2的绝对误差约束下,两者是相同的。

注意在第7行中, 我们使用了pinv这个奇怪的变量名。这是因为,在numpy.linalg中存在一个同名函数,正好就是计算$ {(ATA)}A^T$的。

不过,到现在为止,我们仅仅是完成了Polynomial.fit所做的事情。如果我们要一次性计算出所有滑动窗口下的拟合直线斜率,究竟要如何做呢?

注意,我们计算斜率,是通过一个矩阵乘法来实现的。在Numpy中,矩阵乘法天然是向量化的。在第7行中,pinv是一个(1,10)的矩阵,而y则是一个(10,)的向量。如果我们能把滑动窗口下,各组对应的pinv堆叠起来,并且y也堆叠起来成为一个矩阵,那么就能通过矩阵乘法,一次性计算出所有的斜率。

我们先从y看起。当我们对y = np.arange(5)按窗口为3进行滑动时,我们实际上是得到了这样一个矩阵:

[012123234] \begin{bmatrix}0&1&2\\1&2&3\\2&3&4\\\end{bmatrix}

要得到这个矩阵,我们可以使用fancy index:

1
2
3
4
5
y[
    [0, 1, 2],
    [1, 2, 3],
    [2, 3, 4]
]

因此,我们要实现滑动窗口下的y的矩阵,只需要构建出这个fancy index矩阵即可。好消息是,fancy index矩阵非常有规律:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
def extract_windows_vectorized(array, win:int):
    start = 0
    max = len(array) - win + 1

    sub_windows = (
        start +
        # expand_dims are used to convert a 1D array to 2D array.
        np.expand_dims(np.arange(win), 0) +
        np.expand_dims(np.arange(max), 0).T
    )

    return array[sub_windows]

arr_1d = np.arange(10, 20)

extract_windows_vectorized(arr_1d, 4)

如果你觉得这个方法难以理解的话,Numpy已经为我们提供了一个名为as_strided的函数,可以一步到位,实现我们要的功能,并且比上述方法更快(1倍):


1
2
3
4
5
6
7
8
9
from numpy.lib.stride_tricks import as_strided

y = np.arange(3, 10)
stride = y.strdies[0]

win = 4
shape = (len(y) - win + 1, win)
strides = (stride, stride)
as_strided(y, shape, strides)

矩阵pinv由x产生。如果时间序列y有100个周期长,那么x的值将会是从0到99。其中[x0,x1,...,xwin1][x0, x1, ..., x_{win-1}]对应[y0,y1,...,ywin1][y0, y1, ..., y_{win-1}], [x1,x2,...,xwin][x1, x2, ..., x_{win}]对应[y1,y2,...,ywin][y1, y2, ..., y_{win}][xwin,xwin+1,...x1][x_{-win}, x_{-win+1}, ... x_{-1}]对应[ywin,ywin+1,...,y1][y_{-win}, y_{-win+1}, ..., y_{-1}]

因此,我们需要构照的系数矩阵AA即:

1
2
A = as_strided(np.arange(len(y)), shape, strides)
pinv = np.linalg.pinv(A)

接下来的回归运算跟之前一样:

1
alpha = pinv.dot(y).sum(axis = 0)

完整的代码如下:


 1
 2
 3
 4
 5
 6
 7
 8
 9
10
def moving_lsq_vector(ts, win:int):
    stride = ts.strides[0]

    strides = (stride, stride)
    shape = (win,len(ts)-win+1)
    A = as_strided(np.arange(len(ts)), shape= shape, strides=strides)
    pinv = np.linalg.pinv(A)
    y = as_strided(ts, shape=shape, strides = strides)

    return pinv.dot(y).sum(axis=0)

这一版本,比之前使用Polynomial.fit加循环的快了100倍。你可能猜到了,这个100倍近似于我们循环的次数。这就是循环的代价。

现在,运用这里的方法,我们还可以加速别的计算吗?答案是显然的。如果你要快速计算5000支股票过去10天的的5日平均,在获取这些股票最近14天的股价之后,组成了一个(5000, 14)的矩阵AA。现在,我们要做的就是将这个矩阵转换成为3维矩阵,然后再与一个卷积核做乘法:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
from numpy.typing import NDArray
def batch_move_mean(A: NDArray, win:int)->NDArray:
    """批量计算移动平均线

    Args:
        A: (m*n)的价格矩阵。m为股票支数
        win: 移动平均窗口
    Returns:
        (m * (n-win+1))的矩阵.
    """
    kernel = np.ones(win)/win

1
2
3
4
5
    s0, s1 = prices.strides
    m, n = prices.shape

    pm = as_strided(prices, shape=(m, n-win + 1, win), strides=(s0, s1, s1))
    return np.dot(pm, kernel.T).squeeze()

我们通过下面的代码来测试它:

1
2
3
4
5
6
prices = np.array([
    np.arange(0, 14),
    np.arange(10, 24)
])

batch_move_mean(prices, 5)

输出为:

1
2
array([[ 2.,  3.,  4.,  5.,  6.,  7.,  8.,  9., 10., 11.],
       [12., 13., 14., 15., 16., 17., 18., 19., 20., 21.]])

正如我们期待的一样。但快如闪电。


题外话

要不要使用拟合趋势线作为一种动量因子?这是一个值得深入讨论的话题。斜率因子最大的问题,不是所有的时间序列,都有明显的趋势。从数学上看,如果拟合残差较大,说明该时间序列还没有形成明显的趋势,那么斜率因子就不应该投入使用。另一个问题就是线性回归的老问题,即个别outlier对拟合的影响较大。拟合直线究竟是应该使得所有点到该直线的距离和最小,还是应该使得大多数点到该直线的距离和更小(小于前者)?

结论

我们讨论了如何通过numpy来进行线性回归(直线拟合),并且介绍了最新的polynomial API。然后我们介绍了如何利用矩阵运算来实现向量化。核心点是通过as_strided方法将数组升维,再对升维后的数组执行矩阵运算,以实现向量化。我们还用同样的思路,解决了一次性求5000支股票的移动均线问题。您还看到了像fancy index的实用性举例。这一章技巧较多,算是对前面章节的小结。

Numpy应用案例[1]

“在很多量化场景下,我们都需要统计某个事件连续发生的次数,比如连续涨停、N 连阳等。通过 Numpy 的向量化操作,我们可以快速实现这些需求,既高效又简洁。”


1. 连续值统计

在很多量化场景下,我们都需要统计某个事件连续发生了多少次,比如,连续涨跌停、N连阳、计算Connor's RSI中的streaks等等。比如,要判断下列收盘价中,最大的连续涨停次数是多少?最长的N连阳数是多少?

1
2
a = [15.28, 16.81, 18.49, 20.34, 21.2, 20.5, 22.37, 24.61, 27.07, 29.78, 
    32.76, 36.04]

假设我们以10%的涨幅为限,则可以将上述数组转换为:

1
2
pct = np.diff(a) / a[:-1]
pct > 0.1

我们将得到以下数组:

1
2
flags = [True, False,  True, False, False, False,  True, False,  True,
        True,  True]

这仍然不能计算出最大连续涨停次数,但它是很多此类问题的一个基本数据结构,我们将原始的数据按条件转换成类似的数组之后,就可以使用下面的神器了:


 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
from numpy.typing import ArrayLike
from typing import Tuple
import numpy as np

def find_runs(x: ArrayLike) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Find runs of consecutive items in an array.

    Args:
        x: the sequence to find runs in

    Returns:
        A tuple of unique values, start indices, and length of runs
    """

    # ensure array
    x = np.asanyarray(x)
    if x.ndim != 1:
        raise ValueError("only 1D array supported")
    n = x.shape[0]

    # handle empty array
    if n == 0:
        return np.array([]), np.array([]), np.array([])

    else:
        # find run starts
        loc_run_start = np.empty(n, dtype=bool)
        loc_run_start[0] = True
        np.not_equal(x[:-1], x[1:], out=loc_run_start[1:])
        run_starts = np.nonzero(loc_run_start)[0]

        run_values = x[loc_run_start]  # find run values
        run_lengths = np.diff(np.append(run_starts, n))  # find run lengths

        return run_values, run_starts, run_lengths

1
2
3
pct = np.diff(a) / a[:-1]
v,s,l = find_runs(pct > 0.099)
(v, s, l)

输出结果为:

(array([ True, False, True]), array([0, 3, 6]), array([3, 3, 5]))

输出结果是一个由三个数组组成的元组,分别表示:

  • value: unique values
  • start: start indices
  • length: length of runs

在上面的输出中,v[0]为True,表示这是一系列涨停的开始,s[0]则是对应的起始位置,此时索引为0; l[0]则表示该连续的涨停次数为3次。同样,我们可以知道,原始数组中,最长连续涨停(v[2])次数为5(l[2]),从索引6(s[2])开始起。

所以,要找出原始序列中的最大连续涨停次数,只需要找到l中的最大值即可。但要解决这个问题依然有一点技巧,我们需要使用第4章中介绍的 mask array。

1
2
3
4
v_ma = np.ma.array(v, mask = ~v)
pos = np.argmax(v_ma * l)

print(f"最大连续涨停次数{l[pos]},从索引{s[pos]}:{a[s[pos]]}开始。")

在这里,mask array的作用是,既不让 v == False 的数据参与计算


(后面的 v_ma * l),又保留这些元素的次序(索引)不变,以便后面我们调用 argmax 函数时,找到的索引跟v, s, l中的对应位置是一致的。

我们创建的v_ma是一个mask array,它的值为:

1
2
3
masked_array(data=[True, --, True],
             mask=[False,  True, False],
       fill_value=True)

当它与另一个整数数组相乘时,True就转化为数字1,这样相乘的结果也仍然是一个mask array:

1
2
3
masked_array(data=[3, --, 5],
             mask=[False,  True, False],
       fill_value=True)

当arg_max作用在mask array时,它会忽略掉mask为True的元素,但保留它们的位置,因此,最终pos的结果为2,对应的 v,s,l中的元素值分别为: True, 6, 5。

如果要统计最长N连涨呢?这是一个比寻找涨停更容易的任务。不过,这一次,我们将不使用mask array来实现:

1
2
3
4
v,s,l = find_runs(np.diff(a) > 0)
pos = np.argmax(v * l)

print(f"最长N连涨次数{l[pos]},从索引{s[pos]}:{a[s[pos]]}开始。")

输出结果是: 最长N连涨次数6,从索引5:20.5开始。


这里的关键是,当Numpy执行乘法时,True会被当成数字1,而False会被当成数字0,于是,乘法结果自然消除了没有连续上涨的部分,从而不干扰argmax的计算。

当然,使用mask array可能在语义上更清楚一些,尽管mask array的速度会慢一点,但正确和易懂常常更重要。

2. 计算 Connor's RSI中的streaks

Connor's RSI(Connor's Relative Strength Index)是一种技术分析指标,它是由Nirvana Systems开发的一种改进版的相对强弱指数(RSI)。Connor's RSI与传统RSI的主要区别在于它考虑了价格连续上涨或下跌的天数,也就是所谓的“连胜”(winning streaks)和“连败”(losing streaks)。这种考虑使得Connor's RSI能够更好地反映市场趋势的强度。

在前面介绍了find_runs函数之后,计算streaks就变得非常简单了。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
def streaks(close):
    result = []
    conds = [close[1:]>close[:-1], close[1:]<close[:-1]]
    flags = np.select(conds, [1, -1], 0)
    v, _, l = find_runs(flags)
    for i in range(len(v)):
        if v[i] == 0:
            result.extend([0] * l[i])
        else:
            result.extend([v[i] * x for x in range(1, (l[i] + 1))])
    return np.insert(result, 0, 0)

这段代码首先将股价序列划分为上涨、下跌和平盘三个子系列,然后对每个子系列计算连续上涨或下跌的天数,并将结果合并成一个新的数组。在streaks中,连续上涨天数要用正数表示,连续下跌天数用负数表示,所以在第5行中,通过np.select将条件数组转换为[1, 0, -1]的序列,后面使用乘法就能得到正确的连续上涨(下跌)天数了。


Numpy核心语法[6]

“Masked Array 是 Numpy 中的重要概念,能帮助我们在保持数据完整性的同时,屏蔽无效值进行运算。而 ufunc 则通过底层 C 实现的向量化操作,让复杂计算变得高效且简洁。”


1. Masked Array

你可能常常在一些接近底层的库中,看到 Numpy masked array 的用法。Masked Array 是 Numpy 中很重要的概念。考虑这样的情景,你有一个数据集,其中包含了一些缺失的数据或者无效值。这些”不合格“的数据,可能以 np.nan,np.inf, None 或者其它仅仅是语法上有效的值来表示(比如,在 COVID-19 数据集中,病例数出现负数)的。如何在保持数据集的完整性不变的前提下,仍然能对数据进行运算呢?

Note

这里有一个真实的例子。你可以在 Kaggle 上找到一个 COVID-19 的数据集,这个数据集中,就包含了累积病例数为负数的情况。该数据集由 Johns Hoopkins University 收集并提供。

很显然,我们无法直接对这些数据进行运算。请看下面的例子:

1
2
3
x = np.array([1, 2, 3, np.inf, np.nan, None])
np.mean(x)
np.nanmean(x)

只要数据中包含 np.nan, np.inf 或者 None,numpy 的函数就无法处理它们。即使数据在语法上合法,但在语义上无效,Numpy 强行进行计算,结果也是错误的。


这里有一个量化中可能遇到的真实场景,某公司有一年的年利润为零,这样使得它的 YoY 利润增长在次年变得无法计算。如果我们需要利用 YoY 数据进一步进行运算,我们就需要屏蔽这一年的无效值。否则,我们会连 YoY 利润的均值都无法计算出来。

这里有一个补救的方法,就是将原数据拷贝一份,并且将无效值替换为 np.nan。此后多数运算,都可以用np.nan*来计算。这个方法我们已经介绍过了。但是,如果你是原始数据的收集者,显然你应该照原样发布数据,任何修改都是不合适的;如果你是数据的应用者,当然应该对数据进行预处理后,才开始运算。但是,你又很可能缺少了对数据进行预处理所必须的信息 -- 你怎么能想到像-1, 0 这样看起来人畜无害的小可爱们,竟然是隐藏着的错误呢?

《因子投资与机器学习策略》喊你上课啦!

面向策略研究员的专业课程,涵盖因子挖掘因子检验和基于机器学习的策略开发三大模块,构建你的个人竞争优势!

    课程助教: 宽粉
  1. 全网独家精讲 Alphalens 分析报告,助你精通因子检验和调优。
  2. 超 400 个独立因子,分类精讲底层逻辑,学完带走 350+ 因子实现。
  3. 课程核心价值观:Learning without thought is labor lost. Know-How & Know-Why.
  4. 三大实用模型,奠定未来研究框架1:聚类算法寻找配对交易标的(中性策略核心)、基于 XGBoost 的资产定价、趋势交易模型。
  5. 领先的教学手段:SBP(Slidev Based Presentation)、INI(In-place Notebook Interaction)和基于 Nbgrader(UCBerkley 使用中)的作业系统。

1. 示例模型思路新颖。未来一段时间,你都可以围绕这些模型增加因子、优化参数,构建出领先的量化策略系统。

为了解决这个问题,Numpy 就提供了 Masked Array。但是我们不打算在 Masked Array 上过多着墨。关于 Masked Array,我们可以借用这样一句话,很多人不需要知道 Masked Array,知道 Masked Array 的人则已经精通它了。

有一点需要注意的是,仅在需要时,使用 Masked Array。因为可能与您相像的相反,Masked Array 不会提高性能,反而,它大大降低了性能:

1
2
3
4
5
6
7
8
import numpy as np

# NUMPY VERSION 1.24.4
g = np.random.random((5000,5000))
indx = np.random.randint(0,4999,(500,2))
g_nan = g.copy()
g_nan[indx] = np.nan
mask =  np.full((5000,5000),False,dtype=bool)

1
2
3
4
5
6
7
mask[indx] = True
g_mask = np.ma.array(g,mask=mask)

%timeit (g_mask + g_mask)**2
# 901 MS ± 52.3 MS PER LOOP ...
%timeit (g_nan + g_nan)**2
# 109 MS ± 72.2 ΜS PER LOOP ...

可以看出,Masked Array 的性能慢了接近 9 倍。

Tip

如果你不得不对含有 np.nan 的数组进行运算,那么可以尝试使用 bottleneck 库中的 nan *函数。由于并不存在 nansquare 函数,但是考虑到求方差的运算中必然包含乘方运算,因此我们可以考虑通过 nanvar 函数来评测 numpy 与 bottleneck 的性能差异。

1
2
3
4
5
6
    %timeit np.var(g_mask)
    # 587 MS ± 37.9 MS PER LOOP ...
    %timeit np.nanvar(g_nan)
    # 281 MS ± 1.46 MS PER ...
    %timeit nanvar(g_nan)
    # 61 MS ± 362 ΜS PER LOOP ...

bottleneck 要比 numpy 快接近 5 倍。如果你使用的 numpy 版本较旧,那么 bottleneck 还会快得更多。


2. ufunc

ufunc 是 Numpy 中的重要概念,它对两个输入数组同时进行逐元素的操作(比如,相加,比较大小等)。在 Numpy 中大约定义了 61 个左右的 ufunc。这些操作都是由底层的 C 语言实现的,并且支持向量化,因此,它们往往具有更快的速度。

比如,在 numpy 中,求数组中的最大值,有两个相似的函数, np.maxnp.maximum可以达成这一目标。后者是 ufunc,前者不是,两者除了用法上有所区别之外,后者的速度也要快一些。

1
2
3
4
5
6
arr = np.random.normal(size=(1_000_000,))

%timeit np.max(arr)
# 801 MS ± 54.7 MS PER LOOP ...
%timeit np.maximum.reduce(arr)
# 775 MS ± 12.1 MS PER LOOP ...

np.maximum作为 ufunc,它本来是要接收两个参数的,并不能用来求一维数组的最大值。这种情况下,我们要使用reduce操作才能得到想要的结果。

这里np.maximum是一个 ufunc,则reduce是 unfunc 对象(在 Python 中,一切都是对象,包括函数)的属性之一。ufunc的其它属性还有accumulateouterreduceat等。

accumulate是 ufunc 中的另一个常用属性,可能你之前已经有所接触。比如,在求最大回撤时,我们就会用到它:


 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
# 模拟一个股价序列
n = 1000
xs = np.random.randn(n).cumsum()

# 最大回撤结束期
i = np.argmax(np.maximum.accumulate(xs) - xs) 

# 最大回撤开始期
j = np.argmax(xs[:i]) 

# 最大回撤
mdd = (xs[j] - xs[i])/xs[j]

plt.plot(xs)
plt.plot([i, j], [xs[i], xs[j]], 'o', color='Red', markersize=10)

50%

简洁就是美。在使用了accumulate之后,我们发现,计算最大回撤竟简单到只有两三行代码即可实现。


ufunc 如此好用,你可能要问,为何我用到的却不多?实际上,你很可能每天都在使用ufunc。许多二元数学操作,它们都是对 ufunc 的封装。比如,当我们调用A + B时,实际上是调用了np.add(A, B)这个 ufunc。二者在功能和性能上都是等价的。其它的 ufunc 还有逻辑运算、比较运算等。只要某种运算接受两个数组作为参数,那么,很可能 Numpy 就已经实现了相应的 ufunc 操作。此外,一些三角函数,尽管只接受一个数组参数,但它们也是 ufunc。

因此,我们需要特别关注和学习的 ufunc 函数,可能主要就是maximumminimum等。这里再举一个在量化场景下,使用maximum的常用例子 -- 求上影线长度。

Tip

长上影线是资产向上攻击失败后留下的痕迹。它对股价后来的走势分析有一定帮助。首先,资金在这个点位发起过攻击,暴露了资金的意图。其次,攻击失败,接下来往往会有洗盘(或者溃败)。股价底部的长上影线,也被有经验的股民称为仙人指路。后面出现拉升的概率较大。上影线出现在高位时,则很可能是见顶信号。此时在较低级别的 k 线上,很可能已经出现均线拐头等比较明显的见顶信号。

现在,我们就来实现长上影线的检测。上影线的定义是:

upper_shadow=highmax(open,close) upper\_shadow = high - max(open, close)

下图也显示了上影线:


如果 upper_shadow > threshold,则可认为出现了长上影线(当然,需要对 upper_shadow 进行归一化)。检测单日的上影线很简单,我们下面的代码将演示如何向量化地求解:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
import numpy as np
import pandas as pd

rng = np.random.default_rng(seed=78)
matrix = rng.uniform(0.98, 1.02, (4, 30)).cumprod(axis=1)
opn = matrix[0]
close = matrix[-1]
high = np.max(matrix, axis=0)

upper_shadow = (high - np.maximum(opn, close))/close
np.round(upper_shadow, 2)

第 10 行的代码完全由 ufunc 组成。这里我们使用了 np.sub(减法), np.maximum, np.divide(除法)。maximum 从两个等长的数组 opn 和 close 中,逐元素比较并取出最大的那一个,组成一个新的数组,该数组也与 opn, close 等长。

如果要求下影线长度,则可以使用 minimum。


版权声明 本课程全部文字、图片、代码、习题等所有材料,除声明引用外,均由作者本人开发。所有草稿版本均通过第三方 git 服务进行管理,作为拥有版权的证明。未经书面作者授权,请勿引用。
本文写作时,少量代码及文本参考了通义灵码生成内容。


  1. https://zh.wikipedia.org/wiki/%E9%97%B0%E7%A7%92 

Numpy核心语法[5]

“日期和时间的处理从来都不简单。时区、夏令时、闰秒等问题让时间计算变得复杂。Numpy 提供了高效的日期时间处理工具,帮助我们轻松应对这些挑战。”


1. 日期和时间

一些第三方数据源传递给我们的行情数据,常常会用字符串形式,或者整数(从 unix epoch time 起)格式来表示行情的时间。比如,akshare 和 tushare 许多接口给出的行情数据就是字符串格式;而 QMT 很多时候,会将行情时间用整数表示。掌握这些格式与 Numpy 的日期时间格式转换、以及 Numpy 到 Python 对象的时间日期转换是非常有必要的。

但是在任何编程语言中,日期和时间的处理从来都不简单。

Info

很少有程序员/研究员了解这一点:日期和时间并不是一个数学上或者物理上的一个客观概念。时区的划分、夏令时本身就是一个政治和法律上的概念;一些地方曾经使用过夏令时,后来又取消了这种做法。其次,关于闰秒 [^闰秒] 的决定,也并不是有章可循的,它是由一个委员会开会来临时决定的。这种决定每年做一次。所有这些决定了我们无法通过一个简单的数学公式来计算时间及其变化,特别是在时区之间的转换。

关于时间,首先我们要了解有所谓的 timezone aware 时间和 timezone naive 时间。当我们说到晚上 8 时开会时,这个时间实际上默认地包含了时区的概念。如果这是一个跨国会议,但你在通知时不告诉与会方时区,就会导致其它人无法准时出席 -- 他们将会在自己时区的晚上 8 时上线。

如果一个时间对象不包含时区,它就是 timezone naive 的;否则,它是 timezone aware 的。但这只是针对时间对象(比如,Python 中的 datetime.datetime)才有的意义;日期对象(比如,Python 中的 datetime.date)是没有时区的。


 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
import pytz
import datetime

# 通过 DATETIME.NOW() 获得的时间没有时区信息
# 返回的是标准时间,即 UTC 时间,等同于调用 UTCNOW()
now = datetime.datetime.now()
print(f"now() without param: {now}, 时区信息{now.tzinfo}")

now = datetime.datetime.utcnow()
print(f"utcnow: {now}, 时区信息{now.tzinfo}")

# 构造 TIMEZONE 对象
cn_tz = pytz.timezone('Asia/Shanghai')
now = datetime.datetime.now(cn_tz)
print(f"现在时间{now}, 时区信息{now.tzinfo}")
print("现在日期:", now.date())

try:
    print(now.date().tzinfo)
except AttributeError:
    print("日期对象没有时区信息")

上述代码将依次输出:

1
2
3
4
5
now() 不带参数2024-05-19 11:03:41.550328, 时区信息 None
utcnow: 2024-05-19 11:03:41.550595, 时区信息 None
现在时间 2024-05-19 19:03:41.550865+08:00, 时区信息 Asia/Shanghai
现在日期2024-05-19
日期对象没有时区信息

不过,限于篇幅,我们对时间问题的介绍只能浅尝辄止。在这里,我们主要关注在 Numpy 中,日期/时间如何表示,它们彼此之间如何比较、转换,以及如何与 Python 对象进行比较和转换。

在 Numpy 中,日期/时间总是用一个 64 位整数(np.datetime64)来表示,此外,还关联了一个表示其单位(比如,纳秒、秒等)的元数据结构。np.datetime64是没有时区概念的。

1
2
3
tm = np.datetime64('1970-01-01T00:00:00')
print(tm)
print(tm.dtype)

这将显示为:

1
2
1970-01-01T00:00:00
datetime64[s]

这里的[s]就是我们前面所说的时间单位。其它常见单位还有[ms][us][ns]等等。

除了从字符串解释之外,我们还可以直接将 Python 对象转换成np.datetime64,反之亦然:

1
2
3
4
5
tm = np.datetimet64(datetime.datetime.now())
print(tm)

print(tm.item())
print(tm.astype(datetime.datetime))

下面我们来看看如何实现不同格式之间的批量转换。这在处理 akshare, tushare 或者 QMT 等第三方数据源提供的行情数据时,非常常见。

首先我们构造一个时间数组。顺便提一句,这里我们将使用np.timedelta64这个时间差分类型:

1
2
3
now = np.datetime64(datetime.datetime.now())
arr = np.array([now + np.timedelta64(i, 'm') for i in range(3)])
arr

输出结果如下:

1
2
3
4
array(['2024-05-19T12:57:47.349178', 
       '2024-05-19T12:58:47.349178',
       '2024-05-19T12:59:47.349178'], 
     dtype='datetime64[us]')

我们可以通过np.datetime64.astype()方法将时间数组转换为 Python 的时间对象:

1
2
3
4
time_arr = arr.astype(datetime.datetime)

# 转换后的数组,每个元素都是 TIMEZONE NAIVE 的 DATETIME 对象
print(type(time_arr[0]))

1
2
3
4
5
6
# !!! 技巧
# 如何把 NP.DATETIME64 数组转换为 PYTHON DATETIME.DATE 数组?
date_arr = arr.astype('datetime64[D]').astype(datetime.date)
# 或者 -- 两者的容器不一样
date_arr = arr.astype('datetime64[D]').tolist()
print(type(date_arr[0]))

这里的关键是,我们之前生成的arr数组,其元素类型为np.datetime64[us]。它到 Python datetime.date的转换将损失精度,所以 Numpy 要求我们显式地指定转换类型。

如何将以字符串表示的时间数组转换为 Numpy datetime64 对象数组呢?答案仍然是 astype() 方法。

1
2
3
4
5
6
# 将时间数组转换为字符串数组
str_arr_time = arr_time.astype(str)
print(str_arr_time)

# 再将字符串数组转换为 DATETIME64 数组,精度指定为 D
str_arr_time.astype('datetime64[D]')

显示结果为:

1
2
3
4
array(['2024-05-19T12:57:47.349178', 
       '2024-05-19T12:58:47.349178',
       '2024-05-19T12:59:47.349178'], 
       dtype='datetime64[us]')

1
2
3
4
array([
    '2024-05-19', 
    '2024-05-19'],               
    dtype='datetime64[D]')

最后,我们给一个 QMT 获取交易日历后的格式转换示例。在 QMT 中,我们通过get_trading_dates来获取交易日历,该函数返回的是一个整数数组,每个元素的数值,是从 unix epoch 以来的毫秒数。

我们可以通过以下方法对其进行转换:

1
2
3
4
import numpy as np

days = get_trading_dates('SH', start_time='', end_time='', count=10)
np.array(days, dtype='datetime64[ms]').astype(datetime.date)

QMT 官方没有直接给出交易日历转换方案,但给出åå了如何将 unix epoch 时间戳转换为 Python 时间对象(但仍以字符串表示):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
import time

def conv_time(ct):
    # conv_time(1476374400000) --> '20161014000000.000'
    local_time = time.localtime(ct / 1000)
    data_head = time.strftime('%Y%m%d%H%M%S', local_time)
    data_secs = (ct - int(ct)) * 1000
    time_stamp = '%s.%03d' % (data_head, data_secs)
    return time_stamp

conv_time(1693152000000)

我们需要对每一个数组元素使用上述解析方法。官方方案的优点是不依赖任何三方库。不过,没有量化程序能离开 Numpy 库,所以,我们的方案并未增加第三方库的依赖。


2. 字符串操作

你的数据源、或者本地存储方案很可能使用 Numpy Structured Array 或者 Rec Array 返回证券列表。很显然,证券列表中一定会包括字符串,因为它一定会存在证券代码列和证券名称列。有一些还会返回证券的地域属性和其它属性,这也往往是字符串。

对证券列表,我们常常有以下查询操作:

  1. 获取在某个板块上市的股票列表,比如,北交所、科创板和创业板与主板的个股交易规则上有一些不同,因此,我们的策略很可能需要单独为这些板块构建。这就有了按板块过滤证券列表的需要。也可能我们要排除 ST,刚上市新股。这些都可以通过字符串操作来实现。
  2. 市场上有时候会出现魔幻的名字炒作。比如龙年炒龙字头(或者含龙的个股)、炒作“东方”、炒作“中”字头。作为量化人,参与这样的炒作固然不可取,但我们要拥有分析市场、看懂市场的能力。

Numpy 中的大多数字符串操作都封装在 numpy.char 这个包下面。它主要提供了一些用于格式化的操作(比如左右填充对齐、大小写转换等)、查找和替换操作。

下面的代码展示了如何从证券列表中过滤创业板:

1
2
import numpy as np
import numpy.char as nc

1
2
3
4
5
6
7
8
9
# 生成 STRUCTURED ARRAY, 字段有 SYMBOL, NAME, IPO DATE
arr = np.array([('600000.SH', '中国平安', '1997-08-19'),
                ('000001.SZ', '平安银行', '1997-08-19'),
                ('301301.SZ', '川宁生物', '2012-01-01')
                ], dtype=[('symbol', 'S10'), ('name', 'S10'), ('ipo_date', 'datetime64[D]')])

def get_cyb(arr):
    mask = np.char.startswith(arr["symbol"], b"30")
    return arr[mask]

Question

我们在查找创业板股票时,使用的是 b"30"来进行匹配。为何要用 b"30"而不是"30"?

注意第 11 行,我们要通过np.char.startswith()来使用startswith函数。任何一个 numpy array 对象都没有这个方法。

".SZ"是我们的数据源给股票编制的交易所代码。不同的数据源,可能使用不同的交易所代码。比如,聚宽数据源会使用.XSHG 表示上交所,.XSHE 表示深交所。现在,如果我们要将上述代码转换为聚宽的格式,应该如何操作?

1
2
3
4
5
# 生成 STRUCTURED ARRAY, 字段有 SYMBOL, NAME, IPO DATE
arr = np.array([('600000.SH', '中国平安', '1997-08-19'),
                ('000001.SZ', '平安银行', '1997-08-19'),
                ('301301.SZ', '川宁生物', '2012-01-01')
                ], dtype=[('symbol', 'U10'), ('name', 'U10'), ('ipo_date', 'datetime64[D]')])

1
2
3
4
5
6
7
8
9
def translate_exchange_code(arr):
    symbols = np.char.replace(arr["symbol"], ".SH", ".XSHG")
    print(symbols)
    symbols = np.char.replace(symbols, ".SZ", ".XSHE")

    arr["symbol"] = symbols
    return arr

translate_exchange_code(arr)

这一次,我们把 symbol 和 name 的定义改为 Unicode 型,以避免我们查找时,要输入像 b"30"这样的字面量。

但输出的结果可能让人意外,因为我们将得到这样的输出:

1
2
3
4
array([('600000.XSH', '中国平安', '1997-08-19'),
       ('000001.XSH', '平安银行', '1997-08-19'),
       ('301301.XSH', '川宁生物', '2012-01-01')],
      dtype=[('symbol', '<U10'), ('name', '<U10'), ('ipo_date', '<M8[D]')])

Question

发生了什么?我们得到了一堆以".XSH"结尾的 symbol,它们本应该是"600000.XSHG"这样的字符串。错在哪里,又该如何修改?

在上面的示例中,如果我们把替换字符串改为空字符串,就实现了删除操作。这里就不演示了。

char 模块还提供了字符串相等比较函数equal:


1
2
3
4
arr = array([('301301.SZ', '川宁生物', '2012-01-01')],
      dtype=[('symbol', '<U10'), ('name', '<U10'), ('ipo_date', '<M8[D]')])

arr[np.char.equal(arr["symbol"], "301301.SZ")]

在这个特殊的场景下,我们也可以直接使用以下语法:

1
arr[arr["symbol"] == "301301.SZ"]

Tip

np.char 下的函数很多,如何记忆?实际上,这些函数多数是 Python 中 str 的方法。如果你熟悉 Pandas,就会发现 Pandas 中也有同样的用法。因此,像upper, lower, strip这样的str函数,你可以直接拿过来用。

Numpy 中的字符串函数另一个比较常用的场景,就是执行格式化。你可以通过ljust, 'center', rjust在显示一个数组前,将它们的各列数据进行左右空格填充,这样,输出时就可以比较整齐。

Question

2024 年 5 月 10 日起,南京化纤走出 7 连板行情,短短 7 日,股价翻倍。市场上还有哪些名字中包含化纤的个股?它们的涨跌是否存在相关性或者跨周期相关性?


Numpy核心语法[4]

“随机数和采样是量化中的高频操作。通过 Numpy 的 random 模块,我们可以轻松生成符合正态分布的收益率数组,并利用 np.cumprod() 计算价格走势,快速模拟资产的夏普率与价格关系。”


1. 随机数和采样

随机数和采样是量化中的高频使用的操作。在造数据方面非常好用。我们在前面的示例中,已经使用过了 normal() 函数,它是来自 numpy.random 模块下的一个重要函数。借由这个函数,我们就能生成随机波动、但总体上来看又是上涨、下跌或者震荡的价格序列。

Tip

我们会在何时需要造价格序列?除了前面讲过的例子外,这里再举一例:我们想知道夏普为SS的资产,它的价格走势是怎么样的?价格走势与夏普的关系如何?要回答这个问题,我们只能使用“蒙”特卡洛方法,造出若干模拟数据,然后计算其夏普并绘图。此时我们一般选造一个符合正态分布的收益率数组,然后对它进行加权(此时即可算出夏普),最后通过 np.cumprod() 函数计算出价格走势,进行绘图。

我们通过一个例子来说明夏普与股价走势之间的关系:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
import numpy as np
from empyrical import sharpe_ratio
import matplotlib.pyplot as plt

returns_ = np.random.normal(0, 0.02, size=100)
legend = []

for alpha in (-0.01, 0, 0.01):
    returns = returns_ + alpha
    prices = np.cumprod(returns + 1)
    sharpe = sharpe_ratio(returns)
    _ = plt.plot(prices)
    legend.append(f"{sharpe:.1f}")

1
2
lines = plt.gca().lines
plt.legend(lines, legend)

从绘制的图形可以看出,当 alpha 为 1%时,夏普率可达 8.2。国内优秀的基金经理可以在一年内,做到 2~3 左右的夏普率。大家可以调整 alpha 这个参数,看看 alpha 与夏普率的关系。

50%

1.1. The legacy: np.random module

迄今为止,我们在网上看到的多数关于 numpy random 的教程,都是使用的 np.random module 下面的函数。除了 normal 方法之外,random 包中还有以下函数:

函数 说明
randint(a,b,shape) 生成在区间 (a,b) 之间,形状为 shape 的随机整数数组。
rand(shape) 生成 shape 形状的随机数组,使用 [0,1) 区间的均匀分布来填充。
random(shape) 生成 shape 形状的随机数组,使用均匀分布填充
randn(d1, d2, ...) 生成 shape 形状的随机数组,使用正态分布来填充。
standard_normal(shape) 生成 shape 形状的随机数组,使用标准正态分布来填充。

函数 说明
normal(loc,scale,shape) 生成 shape 形状的随机数组,使用正态分布来填充,loc 是均值,scale 是标准差。
choice(a,size,replace,p) 从 a 中随机抽取 size 个元素,如果 replace=True, 则允许重复抽取,否则不允许重复抽取。p 表示概率,如果 p=None, 则表示每个元素等概率抽取。
shuffle(a) 将 a 中的元素随机打乱。
seed(seed) 设置随机数种子,如果 seed=None, 则表示使用系统时间作为随机数种子。

可以看出,numpy 为使用同一个功能,往往提供了多个方法。我们记忆这些方法,首先是看生成的随机数分布。最朴素的分布往往有最朴素的名字,比如,rand, randint 和 random 都用来生成均匀分布,而 normal, standard_normal 和 randn 用来生成正态分布。

除了均匀分布之外,Numpy 还提供了许多著名的分布的生成函数,比如 f 分布、gama 分布、hypergeometric(超几何分布),beta, weibull 等等。

在同一类别中,numpy 为什么还要提供多个函数呢?有一些是为了方便那些曾经使用其它知名库(比如 matlab) 的人而提供的。

randn 就是这样的例子,它是 matlab 中一个生成正态随机分布的函数,现在被 numpy 移植过来了。我们这里看到的另一个函数,rand 也是这样。而对应的 random,则是 Numpy 按自己的 API 风格定义的函数。

choice 方法在量化中有比较具体的应用。比如,我们可能想要从一个大的股票池中,随机抽取 10 只股票先进行一个小的试验,然后根据结果,再考虑抽取更多的股票。

seed 函数用来设置随机数生成器的种子。在进行单元测试,或者进行演示时(这两种情况下,我们都需要始终生成相同的随机数序列)非常有用。


1.2. New Style: default_rng

我们在上一节介绍了一些随机数生成函数,但没有介绍它的原理。Numpy 生成的随机数是伪随机数,它们是使用一个随机数生成器(RNG)来生成的。RNG 的输出是随机的,但是相同的输入总是会生成相同的输出。我们调用的每一个方法,实际上是在这个序列上的一个抽取动作(根据输入的 size/shape)。

在 numpy.random 模块中,存在一个全局的 RNG。在我们调用具体的随机函数时,实际上是通过这个全局的 RNG 来产生随机数的。而这个全局的 RNG,总会有人在它之上调用 seed 方法来初始化。这会产生一些问题,因为你不清楚何时、在何地、以哪个参数被人重置了 seed。

由于这个原因,现在已经不推荐直接使用 numpy.random 模块中的这些方法了。更好的方法是,为每一个具体地应用创建一个独立的 RNG,然后在这个对象上,调用相应的方法:

1
2
rng = np.random.default_rng(seed=123)
rng.random(size=10)

rng 是一个 Random Generator 对象,在初始化时,我们需要给它传入一个种子。如果省略,那么 Numpy 会使用系统时间作为种子。

rng 拥有大多数前一节中提到的方法,比如 normal, f, gamma 等;但从 matlab 中移植过来的方法不再出现在这个对象上。另外,randint 被 rng.integers 替代。


除此之外,default_rng 产生的随机数生成器对象,在算法上采用了 PCG64 算法,与之前版本采用的算法相比,它不仅能返回统计上更好的随机数,而且速度上也会快 4 倍。

Warning

在 numpy 中还存在一个 RandomState 类。它使用了较慢的梅森扭曲器生成伪随机数。现在,这个类已经过时,不再推荐使用。

1.3. 数据集平衡示例

我们已经介绍了 choice 的功能,现在我们来举一个例子,如何使用 choice 来平衡数据集。

在监督学习中,我们常常遇到数据不平衡的问题,比如,我们希望训练一个分类器,但是训练集的类别分布不均衡。我们可以通过 choice 方法对数据集进行 under sampling 或者 over sampling 来解决这个问题。

为了便于理解,我们先生成一个不平衡的训练数据集。这个数据集共有 3 列,其中前两列是特征(你可以想像成因子特征),第三列则是标签。

1
2
3
4
5
6
import pandas as pd
import numpy as np

rng = np.random.default_rng(seed=42)
x = rng.random((10,3))
x[:,-1] = rng.choice([0,1], len(x), p=[0.2, 0.8])

我们通过下面的方法对这个数据集进行可视化,以验证它确实是一个不平衡的数据集。

1
2
df = pd.DataFrame(x, columns=['factor1', 'factor2', 'label'])
df.label.value_counts().plot(kind='bar')

运行结果为:

50%

要在此基础上,得到一个新的平衡数据集,我们有两种思路,一种是 under sampling,即从多数类的数据中抽取部分数据,使得它与最小分类的数目相等;另一种是 over sampling,即从少数类的数据中复制部分数据,使得它与最大的类的数目相等。

下面的例子演示了如何进行 under sampling:

1
labels, counts = np.unique(x[:,-1], return_counts=True)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
# 最小分类的标签
min_label = labels[np.argmin(counts)]

# 最小分类样本的数量,作为 UNDER SAMPLING 的样本数量
min_label_count = np.min(counts)

# 最小分类无须抽取,全部提取
results = [
    x[x[:,-1] == min_label]
]

# 对其它分类标签进行遍历,需要先剔除最小分类
for label in np.delete(labels, np.argmin(counts)):
    sampled = rng.choice(x[x[:,-1]== label], min_label_count)
    results.append(sampled)

np.concatenate(results)

这段代码先是找到最小分类及它的数量,然后遍历每个标签,再通过 rng.choice 对其它分类随机抽取最小分类的数量,最后把所有的子集拼接起来。

这段示例代码可用以多个标签的情况。如果要进行 over sampling,只要把其中的 min 换成 max 就可以了。

2. IO 操作

我们直接使用 Numpy 读写文件的场合并不多。提高 IO 读写性能一直都不是 Numpy 的重点,我们也只需要稍加了解即可。


2.1. 读写 CSV 文件

Numpy 可以从 CSV 格式的文本文件中读取数据,主要有以下方法:

api 描述
loadtxt 解析文本格式的表格数据
savetxt 将数据保存为文本文件
genfromtxt 同上,但允许数据中有缺失值,提供了更高级的用法
recfromtxt 是 genfromtxt 的快捷方式,自动推断为 record array
recfromcsv 同上,如果分隔符为逗号,无须额外指定

我们通过下面的示例简单演示一下各自的用法:

1
2
3
4
5
6
7
import io
import numpy 

buffer = io.StringIO("""1,2""")

# 默认情况下,LOADTXT 只能读取浮点数
numpy.loadtxt(buffer, delimiter=",")

这会输出数组array([1., 2.])

1
2
3
4
buffer = io.StringIO("""1,2,hello""")

# 通过指定 DTYPE 参数,可以读取其它类型
numpy.loadtxt(buffer, delimiter=",", dtype=[("age", "i4"), ("score", "f4"), ("name", "U8")])

这样我们将得到一个 Structured Array,其中第三列为字符串类型。如果我们不指定 dtype 参数,那么 loadtxt 将会解析失败。

1
2
3
4
buffer = io.StringIO("""
1,2,hello
""")
numpy.genfromtxt(buffer, delimiter=",")

这一次我们使用了 genfromtxt 来加载数据,但没有指定 dtype 参数,genfromtxt 会将非数字列解析为 nan。因此,这段代码将输出:`array([1., 2., nan])

现在,我们也给 genfromtxt 加上 dtype 参数:

1
2
3
buffer = io.StringIO("""1,2,hello""")

numpy.genfromtxt(buffer, delimiter=",", dtype=[("age", "i4"), ("score", "f4"), ("name", "U8")])

此时我们得到的结果是:array((1, 2., 'hello'), dtype=[('age', '<i4'), ('score', '<f4'), ('name', '<U8')])。注意它是 Structured Array。

recfromtxt 则不需要 dtype, 会自动推断数据类型。

1
2
3
buffer = io.StringIO("""1,2,hello""")

numpy.recfromtxt(buffer,delimiter=",")

这段代码输出为rec.array((1, 2, b'hello'), dtype=[('f0', '<i8'), ('f1', '<i8'), ('f2', 'S5')])。如果推断不准确,我们也可以自己加上 dtype 参数。

如果我们使用 recfromcsv,则连 delimiter 参数都可以省掉。

1
2
buffer = io.StringIO("""age,score,name1,2,hello""")
numpy.recfromcsv(buffer)

输出跟上一例的结果一样。

出于速度考虑,我们还可以使用其它库来解析 CSV 文件,再转换成为 numpy 数组。比如:

1
2
3
4
5
# 利用 CSV.READER() 来解析,比 NUMPY 快 8 倍
np.asarray(list(csv.reader()))

# 利用 PANDAS 来解析,比 NUMPY 快 22 倍
pd.read_csv(buffer).to_records()

2.2. 读写二进制文件

如果我们不需要与外界交换数据,数据都是自产自销型的,也可以使用二进制文件来保存数据。

使用 numpy.save 函数来将单个数组保存数据为二进制文件,使用 numpy.load 函数来读取 numpy.save 保存的数据。这样保存的文件,文件扩展名为.npy。


如果要保存多个数组,则可以使用 savez 命令。这样保存的文件,文件扩展名为.npz。如果有更复杂的需求,可以使用 Hdf5,pyarrow 等库来进行保存数据。

Numpy处理表格数据

一开始,Numpy 的数组只能存放同质的元素,即元素必须有相同的数据类型。但对表格类数据而言,它们往往是由一条条记录组成的,而这些记录,又是由不同数据类型的数据组成的。

如何做到在Numpy中也能处理大规模的表格数据呢?


1. Structured Array

为了满足这种需求,Numpy 扩展出一种名为 Structured Array 的数据格式。它是一种 一维数组,每一个元素都是一个命名元组。

我们可以这样声明一个 Structured Array:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
import numpy as np
import datetime
dtypes = [
        ("frame", "O"),
        ("code", "O"),
        ("open", "f4"),
        ("high", "f4"),
        ("low", "f4"),
        ("close", "f4")
    ]
secs = np.array (
    [
        (datetime.date (2024, 3, 18), "600000", 8.9, 9.1, 8.8, 9),
        (datetime.date (2024, 3, 19), "600000", 8.9, 9.1, 8.8, 9),
    ], dtype = dtypes
)

在这个数据结构中,共有 6 个字段,它们的名字和类型通过 dtype 来定义。这是一个 List [Tuple] 类型。在初始化数据部分,它也是一个 List [Tuple]。


Warning

初学者很容易犯的一个错误,就是使用 List [List] 来初始化 Numpy Structured Array,而不是 List [Tuple] 类型。这会导致 Numpy 在构造数组时,对应不到正确的数据类型,报出一些很奇怪的错误。
比如,下面的初始化是错误的:

1
2
3
4
secs = np.array ([
    [datetime.date (2024, 3, 18), "600000", 8.9, 9.1, 8.8, 9],
    [datetime.date (2024, 3, 19), "600000", 8.9, 9.1, 8.8, 9]
], dtype=dtypes)
这段代码会报告一个难懂的 "Type Error: float () argument must be a string or ..."

我们使用上一节学过的 inspecting 方法来查看 secs 数组的一些特性:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
print (f"secs 的维度是 {secs.ndim}")
print (f"secs 的 shape 是 {secs.shape}")
print (f"secs 的 size 是 {secs.size}")
print (f"secs 的 length 是 {len (secs)}")

print (f"secs [0] 的类型是 {type (secs [0])}")
print (f"secs [0] 的维度是 {secs [0].ndim}")
print (f"secs [0] 的 shape 是 {secs [0].shape}")
print (f"secs [0] 的 size 是 {secs [0].size}")
print (f"secs [0] 的 length 是 {len (secs [0])}")

可以看出,secs 数组是 一维数组,它的 shape (2,) 也正是一维数组的 shape 的表示法。前一节还介绍过这几个属性的关系,大家可以自行验证下是否仍然得到满足。


Tip

这里 size 仍然等于 shape 各元素的取值之积。注意对 secs 而言,它的 size 与 length 是相等的,但对 secs [0] 而言,它的 size 和 length 是不相等的。我们在开发 Zillionare 量化框架时时,遇到过由此产生的一个 bug。

但 secs 的元素类型则是 numpy.void,它在本质上是一个 named tuple,所以,我们可以这样访问其中的任一字段:

1
2
3
4
print (secs [0]["frame"])

# 不使用列名(字段名),使用其序号也是可以的
print (secs [0][0])

我们还可以以列优先的顺序来访问其中的一个 “单元格”:

1
print (secs ["frame"][0])

对表格数据,遍历是很常见的操作,我们可以这样遍历:

1
2
for (frame, code, opn, high, low, close) in secs:
    print (frame, code, opn, high, low, close)

Numpy structured array 在这部分的语法要比 Pandas 的 DataFrame 易用许多。我们在后面介绍 Pandas 时,还会提及这一点。


Warning

修改 cell 值时,索引的先后不能互换:

1
2
3
4
5
6
7
8
    data = np.array ([("aaron", "label")], dtype=[("name", "O"), ("label", "O")])
    filter = data ["name"] == "aaron"

    new_label = "blogger"
    data ["label"][filter] = new_label

    # this won't change
    data [filter]["label"] = new_label
这里的最后一行,并不会生效。

2. 运算类

2.1. 比较和逻辑运算

我们在上一节介绍定位、查找时,已经接触到了数据比较,比如:arr > 1。它的结果将数组中的每一个元素都与 1 进行比较,并且返回一个布尔型的数组。

现在,我们要扩充比较的指令:

函数 描述
all 如果数组中的元素全为真,返回 True。可用以判断一组条件是否同时成立。
any 如果数组中至少有一个元素为真,则返回 True。用以判断一组条件是否至少有一个成立
isclose 判断两个数组中的元素是否一一近似相等,返回所有的比较结果
allclose 判断两个数组中的元素是否全部近似相等

函数 描述
equal 判断两个数组中的元素是否一一相等,返回所有的比较结果。
not_equal 一一判断两个数组中的元素是否不相等,返回所有的比较结果
isfinite 是否为数字且不为无限大
isnan 测试是否为非数字
isnat 测试对象是否不为时间类型
isneginf 测试对象是否为负无限大
isposinf 测试对象是否为正无限大

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
# 开启多行输出模式
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

np.random.seed (78)
returns = np.random.normal (0, 0.03, size=4)
returns
# 判断是否全下跌
np.all (returns <= 0)
np.any (returns <= 0)

# 模拟一个起始价格为 8 元的价格序列
prices = np.cumprod (1+returns) * 8

# 对应的涨停价如下
buy_limit_prices = [8.03, 8.1, 8.1, 8.3]

# 判断是否涨停
np.isclose (prices, buy_limit_prices, atol=1e-2)

Tip

为什么需要存在判断近似相等的函数?这是因为,数字分为整型和浮点型。凡是带小数点的数字,都可以看成浮点型。许多浮点数不能精确表达,所以它们是不会相等的,只能比较两个浮点数的差值,如果差值的绝对值小于某个可以接受的小数,才能认为这两个数近似相等。

因此,如果我们拿到了个股的收盘价和涨停价,要判断此时个股有没有涨停,就只能用 isclose 来进行比较,而不能使用 equal。

参数 atol 表示绝对误差,表示两个浮点数之间的差值小于这个值,就可以认为这两个数近似相等。

除了判断一个数组中的元素要么都为 True,要么至少一个为 True 之外,有时候我们还希望进行模糊一点的判断,比如,如果过去 20 天中,超过 60% 的是收阳线,此时我们可以用 np.count_nonzero,或者 np.sum 来统计数组中为真的情况:

1
2
np.count_nonzero (returns > 0)
np.sum (returns > 0)

在上一节进行比较的示例中,我们都只使用了单个条件。如果我们要按多个条件的组合进行查找,就需要依靠逻辑运算来实现。

在 Numpy 中,逻辑运算既可以通过函数、也可以通过运算符来完成:


函数 运算符 描述 python 等价物
logical_and & 执行逻辑与操作 and
logical_or | 执行逻辑或操作 or
logical_not ~ 执行逻辑或操作 not
logical_xor '^' 执行逻辑异或操作 xor

Tip

如果你对编程语言不是特别熟悉,就会难以理解这里的布尔运算,但它们在量化中运用非常广泛,并且在后面讲 pandas 时,我们还会遇到

逻辑与 a&b 的含义是, 只有当条件 a 与 b 都为真时,表达式才成立 逻辑或 a|b 的含义是,a 与 b 之中,任何一个为真即成立 逻辑非~b 的含义是,如果 b 为真,则表达式不成立,反之则成立 逻辑异或 a ^ b 的含义是,只有两个不同时才为真。

逻辑运算有什么用呢?比如我们在选股时,有以下表格数据:

股票 pe mom
AAPL 30.5 0.1
GOOG 32.3 0.3
TSLA 900.1 0.5
MSFT 35.6 0.05

上述表格可以用 Numpy 的 Structured Array 来表示为:

1
2
3
4
5
6
tickers = np.array ([
    ("APPL", 30.5, 0.1),
    ("GOOG", 32.3, 0.3),
    ("TSLA", 900.1, 0.5),
    ("MSFT", 35.6, 0.05)
], dtype=[("ticker", "O"), ("pe", "f4"), ("mom", "f4")])

现在,我们要找出求 PE < 35, 动量 (mom) > 0.2 的记录,那么我们可以这样构建条件表达式:

1
(tickers ["pe"] < 35) & (tickers ["mom"] > 0.2)

Numpy 会把 pe 这一列的所有值跟 35 进行比较,然后再与 mom 与 0.2 比较的结果进行逻辑与运算,这相当于:

1
np.array ((1,1,0,0)) & np.array ((0, 1, 1, 0))

在 Numpy 中,True 与 1 的值在做逻辑运算时是相等的;0 与 False 也是。

如果不借助于 Numpy 的逻辑操作,我们就要用 Python 的逻辑操作。很不幸,这必须使用循环。如果计算量大,这将会比较耗时间。

Tip

这里解释下异或操作。它比较拧巴。如果两个操作数取值一样,那么结果为 False;否则为 True,非常不团结。

在量化中使用异或操作的例子仍然最可能来自于选股。比如,如果我们要求两个选股条件,只能有一个成立时,才买入;否则不买入,就可以使用异或运算。


Tip

在多个条件中,投资者为什么会想要只有一个条件成立?这可能是因为他们认为这两个条件可能互相冲突,或者他们想要在两种投资策略之间进行平衡。

2.2. 集合运算

在交易中,我们常常要执行调仓操作。做法一般是,选确定新的投资组合,然后与当前的投资组合进行比较,找出需要卖出的股票,以及需要买入的股票。这个操作,就是集合运算。在 Python 中,我们一般是通过 set 语法来实现。

在 Numpy 中,我们可以使用通过以下方法来实现集合运算:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
import numpy as np

# 创建两个一维数组
x = np.array ([1, 2, 3, 4, 5])
y = np.array ([4, 5, 6, 7, 8])

# 计算交集
intersection = np.intersect1d (x, y)
print ("Intersection (交集):", intersection)

# 计算并集
union = np.union1d (x, y)
print ("Union (并集):", union)

diff = np.setdiff1d (x, y)
print ("x - y:", diff)


此外,我们还可能使用 in1d (a1, a2) 方法来判断 a1 中的元素是否都在 a2 中存在。比如,在调仓换股中,如果当前持仓都在买入计划中,则不需要执行调仓。

2.3. 数学运算和统计

Numpy 中数学相关的运算有线性代数运算(当然还有基本代数运算)、统计运算、金融指标运算等等。

2.3.1. 线性代数

线性代数在量化中有重要用途。比如,在现代资产组合理论(MPT)中,我们要计算资产组合收益率及协方差,都要使用矩阵乘法。大家可以参考 投资组合理论与实战 系列文章,下面是其中的一段代码:

1
2
3
...
cov = np.cov (port_returns.T)
port_vol = np.sqrt (np.dot (np.dot (weights, cov), weights.T))

矩阵乘法是线性代数中的一个核心概念,它涉及到两个矩阵的特定元素按照规则相乘并求和,以生成一个新的矩阵。具体来说,如果有一个矩阵 A 为 m×nm \times n 维,另一个矩阵 B 为 n×pn \times p 维,那么它们的乘积 C=ABC = AB 将会是一个 m×pm \times p 维的矩阵。乘法的规则是 A 的每一行与 B 的每一列对应元素相乘后求和。

下面通过一个具体的例子来说明矩阵乘法的过程:

假设我们有两个矩阵 A 和 B:


A=[23 14] A = \begin {bmatrix} 2 & 3 \ 1 & 4 \end {bmatrix} B=[12 31] B = \begin {bmatrix} 1 & 2 \ 3 & 1 \end {bmatrix} 要计算 AB,我们遵循以下步骤:

取 A 的第一行 (2,3)(2, 3) 与的第一列 (1,3)(1,3) 相乘并求和得到 C11=[2×1+3×3=11]C_{11} = [2\times1 + 3\times3 = 11]

同理,取 A 的第一行与 B 的第二列 (2,1)(2, 1) 相乘并求和得到 C12=[2×2+3×1=7]C_{12} = [2\times2 + 3\times1 = 7]

取 A 的第二行 (1,4)(1, 4) 与 B 的第一列相乘并求和得到 C21=[1×1+4×3=13]C_{21} = [1\times1 + 4\times3 = 13]

取 A 的第二行与 B 的第二列相乘并求和得到 C22=[1×2+4×1=5]C_{22} = [1\times2 + 4\times1 = 5]

因此,矩阵 C = AB 为:

C=[117136 ] C = \begin {bmatrix} 11 & 7 \\ 13 & 6 \ \end {bmatrix}

与代数运算不同,矩阵乘法不满足交换律,即一般情况下 ABBAAB \neq BA

在 Numpy 中,我们可以使用 np.dot () 函数来计算矩阵乘法。


上述示例使用 numpy 来表示,即为:

1
2
3
4
A = np.array ([[2,3],[1,4]])
B = np.array ([[1,2],[3,1]])

np.dot (A, B)

最终我们将得到与矩阵 C 相同的结果。

除此之外,矩阵逆运算 (np.linalg.inv) 在计算最优投资组合权重时,用于求解方程组,特征值和特征向量 (np.linalg.eig, np.linalg.svd) 在分析资产回报率的主成分,进行风险分解时使用。

2.3.2. 统计运算

常用的统计运算包括:

函数 描述
np.mean 计算平均值
np.median 计算中位数
np.std 计算标准差
np.var 计算方差
np.min 计算最小值
np.max 计算最大值
np.percentile 用于计算历史数据的分位点
np.quantile 用于计算历史数据的分位数,此函数与 percentile 功能相同
np.corr 用于计算两个变量之间的相关性

np.percentile 与 np.quantile 功能相同,都是用于计算分位数。


两者在参数上略有区别。当我们对同一数组,给 quantile 传入分位点 0.25 时,如果给 percentile 传入分位点 25 时,两者的结果将完全一样。也就是后者要乘以 100。在量化交易中,quantile 用得可能会多一些。

Tip

在 pandas 中存在 quantile 函数,但没有 percentile 函数。

np.percentile(或者 np.quantile)的常见应用是计算 25%, 50% 和 75% 的分位数。用来绘制箱线图(Boxplot)。

此外,我们也常用它来选择自适应参数。比如,在 RSI 的应用中,一般推荐是低于 20(或者 30)作为超卖,此时建议买入;推荐是高于 80(或者 70)作为超买,此时建议卖出。但稍微进行一些统计分析,你就会发现这些阈值并不是固定的。如果我们以过去一段时间的 RSI 作为统计,找出它的 95% 分位作为卖点,15% 作为买点,往往能得到更好的结果。

2.3.3. 量化指标的计算

有一些常用的量化指标的计算,也可以使用 Numpy 来完成,比如,计算移动平均线,就可以使用 Numpy 提供的 convolve 函数。

1
2
3
4
5
import numpy as np
def moving_average (data, window_size):
    return np.convolve(data,
                       np.ones(window_size)/window_size, 
                       'valid')

当然,很多人习惯使用 talib,或者 pandas 的 rolling 函数来进行计算。convolve(卷积)是神经网络 CNN 的核心,正是这个原因,我们这里提一下。

np.convolve 的第二个参数,就是卷积核。这里我们是实现的是简单移动平均,所以,卷积核就是一个由相同的数值组成的数组,它们的长度就是窗口大小,它们的和为 1。

如果我们把卷积核替换成其它值,还可以实现 WMA 等指标。从信号处理的角度看,移动平均是信号平滑的一种,使用不同的卷积核,就可以实现不同的平滑效果。

在量化中,还有一类计算,这里也提一下,就是多项式回归。比如,某两支股票近期都呈上升趋势,我们想知道哪一支涨得更好?这时候我们就可以进行多项式回归,将其拟合成一条直线,再比较它们的斜率。

下面的代码演示了如何使用 Numpy 进行多项式回归。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
import numpy as np
import matplotlib.pyplot as plt

returns = np.random.normal (0, 0.02, size=100)
alpha = 0.01
close = np.cumprod (1 + returns + alpha)

a, b = np.polyfit (np.arange (100), close, deg=1)

# 使用 a, b 构建回归线的 y 值
regression_line = a * np.arange (100) + b

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
# 绘制原始的 close 曲线
plt.figure (figsize=(10, 6))
plt.plot (close, label='Close Price', color='blue')

# 绘制回归线
plt.plot (regression_line, label='Regression Line', color='red', linestyle='--')

# 添加图例、标题和坐标轴标签
plt.title ('Stock Close Price vs Regression Line')
plt.xlabel ('Time Period')
plt.ylabel ('Price')
plt.legend ()

# 显示图表
plt.grid (True)
plt.show ()

这将生成下图:


题图: Photo by Steve Harvey on Unsplash

Numpy核心语法[1]

1. 基本数据结构

NumPy 的核心数据结构是 ndarray(即 n-dimensional array,多维数组)数据结构。这是一个多维度、同质并且大小固定的数组对象。

为了表达记录类型的数据,Numpy又拓展出名为Structured Array的数据结构。


它用一个 void 类型的元组来表示一条记录,从而使得 numpy 也可以用来表达记录型的数据。因此,在 Numpy 中,实际上跟数组有关的数据类型主要是两种。

1. 基本数据结构

前一种数组格式广为人知,我们将以它为例介绍多数 Numpy 操作。而后一种数据格式,在量化中也常常用到,比如,通过聚宽[1]的jqdatasdk获得的行情数据,就允许返回这种数据类型,与 DataFrame 相比,在存取上有不少简便之处。我们将在后面专门用一个章节来介绍。

在使用 Numpy 之前,我们要先安装和导入 Numpy 库:

1
2
# 安装 NUMPY
pip install numpy

一般地,我们通过别名np来导入和使用 numpy:

1
import numpy as np

为了在 Notebook 中运行这些示例时,能更加醒目地显示结果,我们首先定义一个 cprint 函数,它将原样输出提示信息,但对变量使用红色字体来输出,以示区别:


1
2
3
4
5
6
7
8
from termcolor import colored

def cprint(formatter: str, *args):
    colorful = [colored(f"{item}", 'red') for item in args]
    print(formatter.format(*colorful))

# 测试一下 CPRINT
cprint("这是提示信息,后接红色字体输出的变量值:{}", "hello!")

接下来,我们将介绍基本的增删改查操作。

1.1. 创建数组

1.1.1. 通过 Python List 创建

我们可以通过np.array的语法来创建一个简单的数组,在这个语法中,我们可以提供 Python 列表,或者任何具有 Iterable 接口的对象,比如元组。

1
2
arr = np.array([1, 2, 3])
cprint("create a simple numpy array: {}", arr)
1.1.2. 预置特殊数组

很多时候,我们希望 Numpy 为我们创建一些具有特殊值的数组。Numpy 也的确提供了这样的支持,比如:


函数 描述
zeros
zeros_like
创建全 0 的数。zeros_like 接受另一个数组,并生成相同形状和数据类型的 zeros 数组。常用于初始化。以下*_like 类推。
ones
ones_like
创建全 1 的数组
full
full_like
创建一个所有元素都填充为n的数组
empty
empty_like
创建一个空数组
eye
identity
创建单位矩阵
random.random 创建一个随机数组
random.normal 创建一个符合正态分布的随机数组
random.dirichlet 创建一个符合狄利克雷分布的随机数组
arange 创建一个递增数组
linspace 创建一个线性增长数组。与 arange 的区别在于,此方法默认生成全闭区间数组。并且,它的元素之间的间隔可以为浮点数。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
# 创建特殊类型的数组
cprint("全 0 数组:\n{}", np.zeros(3))
cprint("全 1 数组:\n{}", np.ones((2, 3)))
cprint("单位矩阵:\n{}", np.eye(3))
cprint("由数字 5 填充的矩阵:\n{}", np.full((3,2), 5))

cprint("空矩阵:\n{}", np.empty((2, 3)))
cprint("随机矩阵:\n{}",np.random.random(10))
cprint("正态分布的数组:\n{}",np.random.normal(10))
cprint("狄利克雷分布的数组:\n{}",np.random.dirichlet(np.ones(10)))
cprint("顺序增长的数组:\n{}", np.arange(10))
cprint("线性增长数组:\n{}", np.linspace(0, 2, 9))

Warning

尽管 empty 函数的名字暗示它应该生成一个空数组,但实际上生成的数组,每个元素都是有值的,只不过这些值既不是 np.nan,也不是 None,而是随机值。我们在使用 empty 生成的数组之前,一定要对它进行初始化,处理掉这些随机值。

生成正态分布数组很有用。我们在做一些研究时,常常需要生成满足某种条件的价格序列,再进一步研究和比较它的特性。

比如,如果我们想研究上升趋势和下降趋势下的某些指标,就需要有能力先构建出符合趋势的价格序列。下面的例子就演示了如何生成这样的序列,并且绘制图形:

1
2
import numpy as np
import matplotlib.pyplot as plt

1
2
3
4
5
6
7
8
9
returns = np.random.normal(0, 0.02, size=100)

fig, axes = plt.subplots(1, 3, figsize=(12,4))
c0 = np.random.randint(5, 50)

for i, alpha in enumerate((-0.01, 0, 0.01)):
    r = returns + alpha
    close = np.cumprod(1 + r) * c0
    axes[i].plot(close)

绘制的图形如下:


示例中还提到了 Dirichlet(狄利克雷)分布数组。这个数组具有这样的特点,它的所有元素加起来会等于 1。比如,在现代投资组合理论中的有效前沿优化中,我们首先需要初始化各个资产的权重(随机值),并且满足资产权重之和等于 1 的约束(显然!),此时我们就可以使用 Dirichlet[2] 分布。


1.1.3. 通过已有数组转换

我们还可以从已有的数组中,通过复制、切片、重复等方法,创建新的数组:

1
2
3
4
5
6
7
8
# 复制一个数组
cprint("通过 np.copy 创建:{}", np.copy(np.arange(5)))

# 复制数组的另一种方法
cprint("通过 arr.copy: {}", np.arange(5).copy())

# 使用切片,提取原数组的一部分
cprint("通过切片:{}", np.arange(5)[:2])

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
# 合并两个数组
arr = np.concatenate((np.arange(3), np.arange(2)))
cprint("通过 concatenate 合并:{}", arr)

# 重复一个数组
arr = np.repeat(np.arange(3), 2)
cprint("通过 repeat 重复原数组:{}", arr)

# 重复一个数组,注意与 NP.REPEAT 的差异
# NP.TILE 的语义类似于 PYTHON 的 LIST 乘法
arr = np.tile(np.arange(3), 2)
cprint("通过 tile 重复原数组:{}", arr)

Question

np.copy 与 arr.copy 有何不同?在 Numpy 中还有哪些类似函数对,有何规律?


注意在 concatenate 函数中,axis 的作用:

1
2
3
4
5
6
arr = np.arange(6).reshape((3,2))

# 在 ROW 方向上拼接,相当于增加行,默认行为
cprint("按 axis=0 拼接:\n{}", np.concatenate((arr, arr), axis=0))
# 在 COL 方向上拼接,相当于扩展列
cprint("按 axis=1 拼接:\n{}", np.concatenate((arr, arr), axis=1))

1.2. 增加/删除和修改元素

Numpy 数组是固定大小的,一般我们不推荐频繁地往数组中增加或者删除元素。


但如果确实有这种需求,我们可以使用下面的方法来实现增加或者删除:

函数 使用说明
append values添加到arr的末尾。
insert obj(可以是下标、slicing)指定的位置处,插入数值value(可以是标量,也可以是数组)
delete 删除指定下标处的元素

示例如下:

1
2
3
arr = np.arange(6).reshape((3,2))
np.append(arr, [[7,8]], axis=0)
cprint("指定在行的方向上操作、n{}", arr)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
arr = np.arange(6).reshape((3,2))
arr = np.insert(arr.reshape((3,2)), 1, -10)
cprint("不指定 axis,数组被扁平化:\n{}", arr)

arr = np.arange(6).reshape((3,2))
arr = np.insert(arr, 1, (-10, -10), axis=0)
cprint("np.insert:\n{}", arr)

arr = np.delete(arr, [1], axis=1)
cprint("deleting col 1:\n{}", arr)

Tip

请一定运行一下这里的代码,特别是关于 insert 的部分,了解所谓的扁平化是怎么回事。

有时候我们需要修改个别元素的值,应该这样操作:


1
2
3
arr = np.arange(6).reshape(2,3)

arr[0,2] = 3

这里涉及到如何定位一个数组元素的问题,也正是我们下一节的内容。

1.3. 定位、读取和搜索

1.3.1. 索引和切片

Numpy 中索引和切片语法大致类似于 Python,主要区别在于对多维数组的支持:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
arr = np.arange(6).reshape((3,2))
cprint("原始数组:\n{}", arr)

# 切片语法
cprint("按行切片:{}", arr[1, :])
cprint("按列切片:{}", arr[:, -1])
cprint("逆排数组:\n {}", arr[: : -1])

# FANCY INDEXING
cprint("fancy index: 使用下标数组:\n {}", arr[[2, 1, 0]])

上述切片语法在 Python 中也存在,但只能支持到一维,因此,对下面的 Python 数组,类似操作会出错:


1
2
3
arr = np.arange(6).reshape((3,2)).tolist()

arr[1, :]

提示 list indices must be integers or slices, not tuple。

1.3.2. 查找、筛选和替换

在上一节中,我们是通过索引来定位一个数组元素。但很多时候,我们得先通过条件运算把符合要求的索引找出来。这一节将介绍相关方法。

函数 使用说明
np.searchsorted 在有序数组中搜索指定的数值,返回索引。
np.nonzero 返回非零元素的索引,用以查找数组中满足条件的元素。
np.flatnonzero 同 nonzero,但返回输入数组的展平版本中非零的索引。
np.argwere 返回满足条件的元素的索引,相当于 nonzero 的转置版本
np.argmin 返回数组中最小元素的索引(注意不是返回满足条件的最小索引)
np.argmax 返回数组中最大元素的索引
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
# 查找
arr = [0, 2, 2, 2, 3]
pos = np.searchsorted(arr, 2, 'right')
cprint("在数组 {} 中寻找等于 2 的位置,返回 {}, 数值是 {}", 
        arr, pos, arr[pos - 1])

arr = np.arange(6).reshape((2, 3))
cprint("arr[arr > 1]: {}", arr[arr > 1])

# NONZERO 的用法
mask = np.nonzero(arr > 1)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
cprint("nonzero 返回结果是:{}", mask)
cprint("筛选后的数组是:{}", arr[mask])

# ARGWHERE 的用法
mask = np.argwhere(arr > 1)
cprint("argwere 返回的结果是:{}", mask)

# 多维数组不能直接使用 ARGWHERE 结果来筛选
# 下面的语句不能得到正确结果,一般会出现 INDEXERROR
arr[mask]

# 但对一维数组筛选我们可以用:
arr = np.arange(6)
mask = np.argwhere(arr > 1)
arr[mask.flatten()[0]]

# 寻找最大值的索引
arr = [1, 2, 2, 1, 0]
cprint("最大值索引是:{}", np.argmax(arr))

使用 searchsorted 要注意,数组本身一定是有序的,不然不会得出正确结果。

第 10 行到第 21 行代码,显示了如何查找一个数组中符合条件的数据,并且返回它的索引。

argwhere 返回值相当于 nonzero 的转置,在多维数组的情况下,它不能直接用作数组的索引。请自行对比 nonzero 与 argwhere 的用法。

在量化中,有很多情况需要实现筛选功能。比如,在计算上下影线时,我们是用公式(highmax(open,close))/(highlow)(high - max(open, close))/(high - low)来进行计算的。


如果我们要一次性地计算过去 n 个周期的所有上影线,并且不使用循环的话,那么我们就要使用 np.where, np.select 等筛选功能。

下面的例子显示了如何使用 np.select 来计算上影线:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
import pandas as pd
import numpy as np

bars = pd.DataFrame({
    "open": [10, 10.2, 10.1],
    "high": [11, 10.5, 9.3],
    "low": [9.8, 9.8, 9.25],
    "close": [10.1, 10.2, 10.05]
})

max_oc = np.select([bars.close > bars.open, 
                    bars.close <= bars.open], 
                    [bars.close, bars.open])
print(max_oc)

shadow = (bars.high - max_oc)/(bars.high - bars.low)
print(shadow)

np.where 是与 np.select 相近的一个函数,不过它只接受一个条件。

1
2
arr = np.arange(6)
cprint("np.where: {}", np.where(arr > 3, 3, arr))

这段代码实现了将 3 以上的数字截断为 3 的功能。


这种功能被称为 clip,在因子预处理中是非常常用的一个技巧,用来处理异常值 (outlier)。

但它没有办法实现两端截断。此时,但 np.select 能做到,这是 np.where 与 np.select 的主要区别:

1
2
arr = np.arange(6)
cprint("np.select: {}", np.select([arr<2, arr>4], [2, 4], arr))
其结果是,生成的数组,小于 2 的被替换成 2,大于 4 的被替换成 4,其它的保持不变。

1.4. 审视 (inspecting) 数组

当我们调用其它人的库时,往往需要与它们交换数据。这时就可能出现数据格式不兼容的问题。为了有能力进行查错,我们必须掌握查看 Numpy 数组特性的一些方法。

我们先如下生成一个简单的数组,再查看它的各种特性:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
arr = np.ones((3,2))
cprint("dtype is: {}", arr.dtype)
cprint("shape is: {}", arr.shape)
cprint("ndim is: {}", arr.ndim)
cprint("size is: {}", arr.size)
cprint("'len' is also available: {}", len(arr))

# DTYPE
dt = np.dtype('>i4')
cprint("byteorder is: {}", dt.byteorder)
cprint("name of the type is: {}", dt.name)
cprint('is ">i4" a np.int32?: {}', dt.type is np.int32)

1
2
3
4
5
# 复杂的 DTYPE
complex = np.dtype([('name', 'U8'), ('score', 'f4')])
arr = np.array([('Aaron', 85), ('Zoe', 90)], dtype=complex)
cprint("A structured Array: {}", arr)
cprint("Dtype of structured array: {}", arr.dtype)

正如 Python 对象都有自己的数据类型一样,Numpy 数组也有自己的数据类型。我们可以通过arr.dtype来查看数组的数据类型。

从第 3 行到第 6 行,我们分别输出了数组的 shape, ndim, size 和 len 等属性。ndim 告诉我们数组的维度。shape 告诉我们每个维度的 size 是多少。shape 本身是一个 tuple, 这个 tuple 的 size,也等于 ndim。

size 在不带参数时,返回的是 shape 各元素取值的乘积。len 返回的是第一维的长度。

2. 数组操作

我们在前面的例子中,已经看到过一些引起数组形状改变的例子。比如,要生成一个3×23×2的数组,我们先用 np.arange(6) 来生成一个一维数组,再将它的形状改变为 (2, 3)。

另一个例子是使用 np.concatenate,从而改变了数组的行或者列。

2.1. 升维

我们可以通过 reshape, hstack, vstack 来改变数组的维度:


1
2
3
4
5
6
7
8
9
cprint("increase ndim with reshape:\n{}", 
        np.arange(6).reshape((3,2)))

# 将两个一维数组,堆叠为 2*3 的二维数组
cprint("createing from stack: {}", 
        np.vstack((np.arange(3), np.arange(4,7))))

# 将两个 (3,1)数组,堆叠为(3,2)数组
np.hstack((np.array([[1],[2],[3]]), np.array([[4], [5], [6]])))

2.2. 降维

通过 ravel, flatten, reshape, *split 等操作对数组进行降维。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
cprint("ravel: {}", arr.ravel())

cprint("flatten: {}", arr.flatten())

# RESHAPE 也可以用做扁平化
cprint("flatten by reshape: {}", arr.reshape(-1,))

# 使用 HSPLIT, VSPLIT 进行降维
x = np.arange(6).reshape((3, 2))
cprint("split:\n{}", np.hsplit(x, 2))

# RAVEL 与 FLATTEN 的区别:RAVEL 可以操作 PYTHON 的 LIST
np.ravel([[1,2,3],[4, 5, 6]])

这里一共介绍了 4 种方法。ravel 与 flatten 用法比较接近。ravel 的行为与 flatten 类似,只不过 ravel 是 np 的一个函数,可作用于 ArrayLike 的数组。


通过 reshape 来进行扁平化也是常用操作。此外,还介绍了 vsplit, hsplit 函数,它们的作用刚好与 vstack,hstack 相反。

2.3. 转置

此外,对数组进行转置也是此类例子中的一个。比如,在前面我们提到,np.argwhere 的结果,实际上是 np.nonzero 的转置,我们来验证一下:

1
2
3
4
5
x = np.arange(6).reshape(2,3)
cprint("argwhere: {}", np.argwhere(x > 1))

# 我们再来看 NP.NONZERO 的转置
cprint("nonzero: {}", np.array(np.nonzero(x > 1)).T)

两次输出结果完全一样。在这里,我们是通过.T来实现的转置,它是一个语法糖,正式的函数是transpose


当然,由于 reshape 函数极其强大,我们也可以使用它来完成转置:

1
2
3
cprint("transposing array from \n{} to \n{}", 
    np.arange(6).reshape((2,3)),
    np.arange(6).reshape((3,2)))

狄利克雷,德国数学家。他对数论、傅里叶级数理论和其他数学分析学领域有杰出贡献,并被认为是最早给出现代函数定义的数学家之一和解析数论创始人之一。Dirichlet 数组可作为 MPT 求解中的初始值。

比Deepseek还要Deep!起底GBDT做回归预测的秘密

决策树是机器学习中一类重要的算法。它本质是这样一种算法,即将由程序hard-coded的各种if-else逻辑,改写成为可以通过数据训练得到的模型,而该模型在效果上等价于硬编码的if-else逻辑。

1
2
3
4
5
for 有房, 年薪 in [("有", "40万"), ("有", "20万"), ("无", "100万")]:
    if 有房 == "有" and 年薪 > "30万":
        print("见家长!")
    else:
        print("下次一定")

这样做的好处是,大大增强了算法的普适性:只要有标注数据,无须编码,都可以转换成为对应的决策树模型,条件越复杂,这种优越性就表现的越明显。此外,在决策树的训练过程中,也自然地考虑了数据分布的统计特征、加入了容错(只要数据标注是正确的)。

单细胞生物: 决策树

比如,假如我是霸总的助理,要根据他的生活习惯来安排明天是否工作。我收集到过往的数据如下:

1
2
3
4
5
6
7
8
data = {
    '天气': ['晴', '晴', '晴', '晴', '阴', '阴', '雨', '雨'],
    '气温': ['高温', '高温', '舒适', '凉爽', '凉爽', '凉爽', '凉爽', '凉爽'],
    '宜工作': [0, 0, 1, 1, 1, 1, 0, 0],
}

df = pd.DataFrame(data)
df
天气 气温 宜工作
0 高温 0
1 高温 0
2 舒适 1
3 凉爽 1
4 凉爽 1
5 凉爽 1
6 凉爽 0
7 凉爽 0

我们就可以用决策树来训练一个模型,从而为他安排明天的出差。如果哪一天他与某个女艺人热恋了,这样会新增一个判断条件,如果头一天晚上学了英语,第二天就不工作了,这样我们就只需要改数据就行了。

1
2
3
4
5
6
7
8
9
data = {
    '天气': ['晴', '晴', '晴', '晴', '阴', '阴', '雨', '雨'],
    '气温': ['高温', '高温', '舒适', '凉爽', '凉爽', '凉爽', '凉爽', '凉爽'],
    '学英语':[0, 1, 0, 0, 1, 0, 0, 1],
    '宜工作': [0, 0, 1, 1, 1, 1, 0, 0],
}

df = pd.DataFrame(data)
df

下面这个决策树模型简单是简单了点,不过,它涉及到了决策树模型构建的全部过程:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import numpy as np
import pandas as pd
from sklearn.tree import DecisionTreeClassifier, plot_tree
import matplotlib.pyplot as plt

# 创建示例数据
data = {
    '天气': ['晴', '晴', '晴', '晴', '阴', '阴', '雨', '雨'],
    '气温': ['高温', '高温', '舒适', '凉爽', '凉爽', '凉爽', '凉爽', '凉爽'],
    '学英语':[0, 0, 0, 0, 1, 0, 0, 1],
    '宜工作': [0, 0, 1, 1, 0, 1, 0, 0],
}

df = pd.DataFrame(data)
df

# 将分类变量转换为数值
df['天气'] = df['天气'].map({'晴': 0, '阴': 1, '雨': 2})
df['气温'] = df['气温'].map({'高温': 0, '舒适': 1, '凉爽': 2})

X = df[['天气', '气温', '学英语']]
y = df['宜工作']

# 创建并训练决策树模型
clf = DecisionTreeClassifier()
clf.fit(X, y)

# 可视化决策树
plt.figure(figsize=(12, 8))
plot_tree(clf, filled=True, feature_names=['天气', '气温', '学英语'], class_names=['诸事不宜', '宜工作'], fontsize=10)
plt.title('霸总工作否?')
plt.show()

# 预测第二天是否工作
weather = "晴"
temp = "高温"
dating=0

sample = pd.DataFrame([(weather, temp, dating)], columns=["天气", "气温", "学英语"])
sample['天气'] = sample['天气'].map({'晴': 0, '阴': 1, '雨': 2})
sample['气温'] = sample['气温'].map({'高温': 0, '舒适': 1, '凉爽': 2})

prediction = clf.predict(sample)
dating_desc = "没学英语" if dating == 0 else "昨晚学了英语"
if prediction[0] == 1:
    print(weather, temp, dating_desc, "宜工作")
else:
    print(weather, temp, dating_desc, "诸事不宜")

增加

Numpy核心语法[3]

在不同的库之间交换数据,常常会遇到格式问题。比如,我们从第三方数据源拿到的行情数据,它们用的时间字段常常会是字符串。有一些库在存储行情时,对 OHLC 这些字段进行了优化,使用了 4 个字节的浮点数,但如果要传给 talib 进行指标计算,就必须先转换成 8 个字节的浮点数,等等,这就有了类型转换的需求。


此外,我们还会遇到需要将 numpy 数据类型转换为 python 内置类型,比如,将 numpy.float64 转换为 float 的情况。

1. 类型转换和 Typing

1.1. Numpy 内部类型转换

Numpy 内部类型转换,我们只需要使用 astype

1
2
3
4
5
6
7
8
x = np.array (['2023-04-01', '2023-04-02', '2023-04-03'])
print (x.astype (dtype='datetime64[D]'))

x = np.array (['2014', '2015'])
print (x.astype (np.int32))

x = np.array ([2014, 2015])
print (x.astype (np.str_))

Tip

如何将 boolean array 转换成整数类型,特别是,将 True 转为 1,False 转为 - 1? 在涉及到阴阳线的相关计算中,我们常常需要将 open > close 这样的条件转换为符号 1 和 - 1,以方便后续计算。这个转换可以用:

1
2
3
>>> x = np.array ([True, False])
>>> x * 2 - 1
... array ([ 1, -1])

1.2. Numpy 类型与 Python 内置类型转换

如果我们要将 Numpy 数组转换成 Python 数组,可以使用 tolist 函数。

1
2
x = np.array ([1, 2, 3])
print (x.tolist ())

我们通过 item () 函数,将 Numpy 数组中的元素转换成 Python 内置类型。

1
2
3
x = np.array (['2023-04-01', '2023-04-02'])
y = x.astype ('M8[s]')
y [0].item ()

Warning

一个容易忽略的事实是,当我们从 Numpy 数组中取出一个标量时,我们都应该把它转换成为 Python 对象后再使用。否则,会发生一些隐藏的错误,比如下面的例子:

1
2
3
4
5
6
import json
x = np.arange (5)
print (json.dumps ([0]))
print (x [0])

json.dumps ([x [0]])

Warning

这里最后一行会出错。提示 type int64 is not JSON serializable。把最后一行换成 json.dumps ([x [0].item ()]) 则可以正常执行。

1.3. Typing

从 Python 3.1 起,就开始引入类型注解 (type annotation),到 Python 3.8,基本上形成了完整的类型注解体系。我们经常看到函数的参数类型注解,比如,下面的代码:

1
2
3
from typing import List
def add (a: List [int], b: int) -> List [int]:
    return [i + b for i in a]

从此,Python 代码也就有了静态类型检查支持。

NumPy 的 Typing 模块提供了一系列类型别名(type aliases)和协议(protocols),使得开发者能够在类型注解中更精确地表达 NumPy 数组的类型信息。这有助于静态分析工具、IDE 以及类型检查器提供更准确的代码补全、类型检查和错误提示。

这个模块提供的主要类型是 ArrayLike, NDArray 和 DType。

1
2
3
import numpy
from numpy.typing import ArrayLike, NDArray, DTypeLike
import numpy as np

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
def calculate_mean (data: ArrayLike) -> float:
    """计算输入数据的平均值,数据可以是任何 ArrayLike 类型"""
    return np.mean (data)

def add_one_to_array (arr: NDArray [np.float64]) -> NDArray [np.float64]:
    """向一个浮点数数组的每个元素加 1,要求输入和输出都是 np.float64 类型的数组"""
    return arr + 1

def convert_to_int (arr: NDArray, dtype: DTypeLike) -> NDArray:
    """将数组转换为指定的数据类型"""
    return arr.astype (dtype)

如果你是在像 vscode 这样的 IDE 中使用上述函数,你就可以看到函数的类型提示。如果传入的参数类型不对,还能在编辑期间,就得到错误提示。

2. 拓展阅读

2.1. Numpy 的数据类型

在 Numpy 中,有以下常见数据类型。每一个数字类型都有一个别名。在需要传入 dtype 参数的地方,一般两者都可以使用。另外,别名在字符串类型、时间和日期类型上,支持得更好。比如,'S5' 是 Ascii 码字符串别外,它除了指定数据类型之外,还指定了字符串长度。datetime64 [S] 除了表明数据是时间日期类型之外,还表明它的精度到秒。


类型 别名
np.int8 i1
np.int16 i2
np.int32 i4
np.int64 i8
np.uint8 u1
np.uint16 u2
np.uint32 u4
np.uint64 u8
np.float16 f2
np.float32 f4,还可指定结尾方式,比如 'f4' 表示大端字节序。其它 float 类型同。
np.float64 f8
np.float128 f16
np.bool_ b1
np.str_ U (后接长度,例如 U10)
np.bytes_ S (后接长度,例如 S5)
np.datetime64 M8 和 M8[D] M8[h] M8[m] M8[s],也可写作 datetime64[D] 等
np.timedelta64 m8 和 m8[D] m8[h] m8[m] m8[s] 等

3. 处理包含 np.nan 的数据

在量化分析中,我们常常会遇到数据为 np.nan 情况。比如,某公司上年利润为负数,今年利润实现正增长,请问要如何表示公司的 YoY 的利润增长呢?


Info

np.nan 是 numpy 中的一个特殊值,表示“Not a Number”,即“不是数字”。注意,在 Numpy 中,尽管 np.nan 不是一个数字,但它确实数字类型。确切地说,它是 float 类型。此外,在 float 类型中,还存在 np.inf(正无穷大)和负无穷大 (np.NINF,或者-np.inf)。

又比如,在计算个股的 RSI 或者移动平均线时,最初的几期数据是无法计算出来的(在回测框架 backtrader 中,它把这种现象称之为技术指标的冷启动)。如果不要求返回的技术指标的取值与输入数据长度一致,则会返回短一些、但全部由有效数据组成的数组;否则,此时我们常常使用 np.NaN 或者 None 来进行填充,以确保返回的数据长度与输入数据长度一致。

但是,如果我们要对返回的数组进行统计,比如求均值、最大值、排序,对包含 np.nan 或者 None 的数组,应该如何处理?

3.1. 包含 np.nan 和 np.inf 的数组运算

在 numpy 中,提供了对带 np.nan 的数组进行运算的支持。比如有以下数组:

1
2
3
4
import numpy as np

x = np.array([1, 2, 3, np.nan, 4, 5])
print(x.mean())

我们将得到一个 nan。实际上,多数情况下,我们希望忽略掉 nan,只对有效数据进行运算,此时得到的结果,我们往往仍然认为是有意义的。

因此,Numpy 提供了许多能处理包含 nan 数据的数组输入的运算函数。下面是一个完整的列表:

在这里,我们以输入 np.array([1, 2, 3, np.nan, np.inf, 4, 5]) 为例

函数 nan 处理 inf 处理 输出
nanmin 忽略 inf 1.0
nanmax 忽略 inf inf
nanmean 忽略 inf inf
nanmedian 忽略 inf 3.5
nanstd 传递 inf nan
nanvar 传递 inf nan
nansum 忽略 inf inf
nanquantile 忽略 inf 2.25
nancumsum 忽略 inf inf
nancumprod 忽略 inf inf

对 np.nan 的处理中,主要是三类,一类是传递,其结果导致最终结果也是 nan,比如,在计算方差和标准差时;一类是忽略,比如在找最小值时,忽略掉 np.nan,在余下的元素中进行运算;但在计算 cumsum 和 cumprod 时,"忽略"意味着在该元素的位置上,使用前值来填充。我们看一个不包含 np.inf 的示例:


1
2
3
x = np.array([1, 2, 3, np.nan, 4, 5])
np.nancumprod(x)
np.nancumsum(x)

输出结果是:

1
2
3
array([  1.,   2.,   6.,   6.,  24., 120.])

array([ 1.,  3.,  6.,  6., 10., 15.])

结果中的第 4 个元素都是由第 3 个元素复制而来的。

如果一个数组中包含 inf,则在任何涉及到排序的操作(比如 max, median, quantile)中,这些元素总是被置于数组的最右侧;如果是代数运算,则结果会被传导为 inf。这些地方,Numpy 的处理方式与我们的直觉是一致的。

除了上述函数,np.isnan 和 np.isinf 函数,也能处理包含 np.nan/np.inf 元素的数组。它们的作用是判断数组中的元素是否为 nan/inf,返回值是一个 bool 数组。

3.2. 包含 None 的数组运算

在上一节中,我们介绍的函数能够处理包含 np.nan 和 np.inf 的数组。但是,在 Python 中,None 是任何类型的一个特殊值,如果一个数组包含 None 元素,我们常常仍然会期望能对它进行 sum, mean, max 等运算。但是,Numpy 并没有专门为此准备对应的函数。


但是,我们可以通过 astype 将数组转换为 float 类型,在此过程中,所有的 None 元素都转换为 np.nan,然后就可以进行运算了。

1
2
x = np.array([3,4,None,55])
x.astype(np.float64)

输出为:array([3., 4., nan, 55.])

3.3. 性能提升

当我们调用 np.nan *函数时,它的性能会比普通的函数慢很多。因此,如果性能是我们关注的问题,我们可以使用 bottleneck 这个库中的同名函数。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
from bottleneck import nanstd
import numpy as np
import random

x = np.random.normal(size = 1_000_000)
pos = random.sample(np.arange(1_000_000).tolist(), 5)
x[pos] = np.nan

%timeit nanstd(x)
%timeit np.nanstd(x)

我们担心数组中 np.nan 元素个数会影响到性能,所以,在上面的示例中,在随机生成数组时,我们只生成了 5 个元素。在随后的一次测试中,我们把 nan 元素的个数增加了 10 倍。实验证明,nan 元素的个数对性能没有什么影响。在所有的测试中,bottlenect 的性能比 Numpy 都要快一倍。

Info

根据 bottleneck 的文档,它的许多函数,要比 Numpy 中的同名函数快 10 倍左右。