跳转至




2025

烛台密码 三角形整理如何提示玄机

本文是几个月前《三角形整理检测》的后续篇,改进了算法,增加了应用场景的讨论。

《匡醍.因子分析与机器学习策略》课程的最后一课是关于深度学习框架在量化交易中的应用的。考虑很多技术交易者都会看图操作,比如艾略特浪型、头肩顶、三角形整理等等。正好CNN在图像模式识别能力上超越了人类,所以,就打算拿三角形整理的检测作为例子。

要通过CNN网络来实现三角形整理检测,首先需要做到数据标注。我们在课程中已经实作出来一个标注工具。不过,我更希望能够使用算法自动检测到三角形整理模式。这篇文章就将介绍我的算法。

如果你希望拿到本文源码,可以加入我们的星球。加入星球三天后,更可以获得我们研究平台的账号。在平台中提供了可以运行、验证notebook版本,你可以完全复现本文的结果。

Note

先通过算法对k线图进行标注,再通过CNN网络进行识别,感觉这个有点Matryoshka doll了。于是我在课程中换了另一个例子,通过4 channel的一维卷积,实现了预测误差1%的准确率。这篇文章算是课程的边脚料重新加工了。

算法的示意图如下:

三角形检测示意图

首先,我们得找到k线图中的波峰和波谷。在图中,波峰是点1和2,波谷则是点3和4。然后我们作通过点1和2的直线,得到压力线;作通过点3和4的直线,得到支撑线。

在Python中,计算两点之间的直线可以通过np.polyfit来实现。通过该函数,将获得直线的斜率。通过两条直线之间的斜率关系,我们可以进一步得到三角形的形态。

如果记Sr为压力线的斜率,记Ss为支撑线的斜率,那么,三角形的形态可以由以下表格来定义:

压力线方向 支撑线方向 角度对比 标记 说明
Sr>0 Ss>0 abs(sr) > abs(ss) 1 上升且发散三角
Sr>0 Ss>0 abs(sr) < abs(ss) 2 上升且收敛三角
Sr>0 Ss<0 abs(sr) > abs(ss) 3 发散偏上升三角
Sr>0 Ss<0 abs(sr) < abs(ss) 4 发散偏下降三角
Sr<0 Ss>0 abs(sr) > abs(ss) 5 下降且收敛三角
Sr<0 Ss>0 abs(sr) < abs(ss) 6 上升且收敛三角
Sr<0 Ss<0 abs(sr) > abs(ss) 7 下降且收敛三角
Sr<0 Ss<0 abs(sr) < abs(ss) 8 下降且发散三角

部分形态如下图所示:

识别算法的实现代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
from zigzag import peak_valley_pivots

def triangle_flag(df, lock_date=None):
    if lock_date is not None:
        peroid_bars = df.loc[:lock_date]
    else:
        peroid_bars = df

    thresh = peroid_bars.close[-120:].pct_change().std() * 3

    pvs = peak_valley_pivots(peroid_bars.close.astype(np.float64), thresh, -1 * thresh)

    if len(pvs) == 0:
        return 0, None, None

    pvs[0] = pvs[-1] = 0
    pos_peaks = np.argwhere(pvs == 1).flatten()[-2:]
    pos_valleys = np.argwhere(pvs == -1).flatten()[-2:]

    if len(pos_peaks) < 2 or len(pos_valleys) < 2:
        return 0, None, None

    minx = min(pos_peaks[0], pos_valleys[0])
    y = df.close[pos_peaks].values
    p = np.polyfit(x=pos_peaks, y=y, deg=1)
    upper_trendline = np.poly1d(p)(np.arange(0, len(df)))

    y = df.close[pos_valleys].values
    v = np.polyfit(x=pos_valleys, y=y, deg=1)
    lower_trendline = np.poly1d(v)(np.arange(0, len(df)))

    sr, ss = p[0], v[0]

    flags = {
        (True, True, True): 1,
        (True, True, False): 2,
        (True, False, True): 3,
        (True, False, False): 4,
        (False, True, True): 5,
        (False, True, False): 6,
        (False, False, True): 7,
        (False, False, False): 8,
    }

    flag = flags[(sr > 0, ss > 0, abs(sr) > abs(ss))]

    return flag, upper_trendline, lower_trendline
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
def show_trendline(asset, df, resist, support, flag, width=600, height=400):
    desc = {
        1: "上升且发散三角",
        2: "上升且收敛三角",
        3: "发散偏上升三角",
        4: "发散偏下降三角",
        5: "下降且收敛三角",
        6: "上升且收敛三角",
        7: "下降且收敛三角",
        8: "下降且发散三角",
    }

    if isinstance(df, pd.DataFrame):
        df = df.reset_index().to_records(index=False)

    title = f"flag: {flag} - {desc[flag]}"
    cs = Candlestick(df, title=title, show_volume=False, show_rsi=False, width=width, height=height)
    cs.add_line("support", np.arange(len(df)), support)
    cs.add_line("resist", np.arange(len(df)), resist)
    cs.plot()


np.random.seed(78)
start = datetime.date(2023, 1, 1)
end = datetime.date(2023, 12, 29)
barss = load_bars(start, end, 4)

for key, df in barss.groupby("asset"):
    df = df.reset_index().set_index("date")
    flag, resist, support = triangle_flag(df)
    if flag != 0:
        show_trendline(key, df, resist, support, flag)

最后,我们来研究一支个股的情况,看看这个算法可能有哪些用途:

1
2
3
4
5
6
7
8
9
start = datetime.date(2023, 1, 1)
end = datetime.date(2023, 12, 29)
barss = load_bars(start, end, ("300814.XSHE", ))

for key, df in barss.groupby("asset"):
    df = df.reset_index().set_index("date")
    flag, resist, support = triangle_flag(df, datetime.date(2023, 9, 11))
    if flag != 0:
        show_trendline(key, df, resist, support, flag, width=800, height=600)

标的从23年4月19日以来,先后出现4次波峰。随着时间的推移,整理形态也不断变化。

7月12,突破之前的形态是发散偏上升三角。

1
2
3
4
5
6
7
8
9
start = datetime.date(2022, 12, 1)
end = datetime.date(2023, 10, 29)
barss = load_bars(start, end, ("300814.XSHE", ))

for key, df in barss.groupby("asset"):
    df = df.reset_index().set_index("date")
    flag, resist, support = triangle_flag(df, datetime.date(2023, 7,19))
    if flag != 0:
        show_trendline(key, df, resist, support, flag, width=800, height=600)

7月12日突破之后,支撑和压力线发生变化。此时可以计算出9月7日的压力位是48元。但当天只冲击到45.6,随后收了上影线。

1
2
3
4
5
6
7
8
9
start = datetime.date(2022, 12, 1)
end = datetime.date(2023, 10, 29)
barss = load_bars(start, end, ("300814.XSHE", ))

for key, df in barss.groupby("asset"):
    df = df.reset_index().set_index("date")
    flag, resist, support = triangle_flag(df, datetime.date(2023, 8,19))
    if flag != 0:
        show_trendline(key, df, resist, support, flag, width=800, height=600)

此时仍然是上升三角,但9月7日未破压力位后,压力线应该使用最新的两个波峰连线。此时的压力线的斜率比之前的要小,显示后续走势会弱一些。

1
2
3
4
5
6
7
8
9
start = datetime.date(2022, 12, 1)
end = datetime.date(2023, 12,29)
barss = load_bars(start, end, ("300814.XSHE", ))

for key, df in barss.groupby("asset"):
    df = df.reset_index().set_index("date")
    flag, resist, support = triangle_flag(df, datetime.date(2023, 9,15))
    if flag != 0:
        show_trendline(key, df, resist, support, flag, width=800, height=600)

在9月7日新的波峰形成后,新的压力线在11月20日的值为48.5,当天的最高点为46.4,再次未破压力位。此后压力线需要重新计算。新的压力线的斜率进一步减小。形态也由此前的上升且发散三角形,转换为上升且收敛三角形,表明已经到了退出的时间。

1
2
3
4
5
6
7
8
9
start = datetime.date(2022, 12, 1)
end = datetime.date(2023, 12,29)
barss = load_bars(start, end, ("300814.XSHE", ))

for key, df in barss.groupby("asset"):
    df = df.reset_index().set_index("date")
    flag, resist, support = triangle_flag(df)
    if flag != 0:
        show_trendline(key, df, resist, support, flag, width=800, height=600)

上述压力线斜率的变化能够表明价格上升是打开了新的通道,还是预期在走弱,这对我们中短线操作很有帮助。

在通过机器学习构建策略时,我们可以把压力线和支撑线斜率的变化(\(\delta{Sr}\), \(\delta{Ss}\))、压力线和支撑线预测出来的值(\(P_{t+1}\), \(V_{t_1}\))等作为特征,那么,我们就可能更精确地预测未来走势。

DeepSeek只是挖了个坑,还不是掘墓人,但中初级程序员是爬不出来了

在我们的《因子分析与机器学习策略》课程中,提供了从2005年到2023年,长达18年的日线数据(共1100多万条记录)供学员进行因子挖掘与验证。最初,我们是通过functools中的lru_cache装饰器,将数据缓存到内存中的。这样一来,除了首次调用时时间会略长(比如,5秒左右)外,此后的调用都是毫秒级的。

问题的提出

但这样也带来一个问题,就是内存占用太大。一次因子分析课程可能会占用5G以上。由于Jupyterlab没有自动关闭idle kernel的能力(这一点在google Colab和kaggle中都有),我们的内存很快就不够用了。

我们的数据是以字典的方式组织,并保存在磁盘上的:

每支股票的键值是股票代码,对应值则是一个Numpy structured array。这样的数据结构看上去比较独特,不过我们稍后就能看到这样组织的原因。

在进行因子分析之前,用户可能会通过指定universe,以及起止时间来加载行情数据。所谓Universe,就是指一个股票池。用户可能有给定的证券列表,也可能只想指定universe的规模;起止时间用来切换观察的时间窗口,这可能是出于性能的考虑(最初进行程序调试时,只需要用一小段行情数据;调试完成后则需要用全部数据进行回测,或者分段观察)。

最终,它要返回一个DataFrame,以date和asset(即股票代码)为双重索引,包含了OHLC,volume等列,并且这些列要根据end进行前复权(这种复权方式称为动态前复权)。此外,还将包含一个amount列,这一列则无须复权。

因此,这个函数的签名是:

1
2
3
4
def load_bars(start_date:datetime.date, 
              end_date:datetime.date, 
              universe: Tuple[str]|int = 500)->pd.DataFrame:
    pass

学员的学习过程是阅读我们的notebook文档,并尝试单元格中的代码,也可能修改这些代码再运行。因此,这是一个交互式的操作,一般来说,只要用户的等待时间不超过3秒,都是可以接受的。如果响应速度低于1秒,则可以认为是理想的。

去掉缓存后,最初的一个实现的运行速度大致是5秒:

1
2
3
start = datetime.date(2023, 12,1)
end = datetime.date(2023, 12,31)
%time load_bars(start, end, 2000)

后面的测试将使用现样的参数。

当然,如果使用更大的universe,则时间还会加长。

由于这个结果超过了3秒,所以,希望能对代码进行一些优化。性能优化是编程中比较有难度的例子,因为它涉及到对程序运行原理的理解,涉及到对多个技术栈的掌握。在这个过程中我探索了Deep Seek R1的能力边界,可供大家参考。

最初的方案

最初的代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
def load_bars_v1(
    start: datetime.date, end: datetime.date, universe: Tuple[str]|int = 500
)->pd.DataFrame:

    if barss is None:
        with open(os.path.join(data_home, "bars_1d_2005_2023.pkl"), "rb") as f:
            barss = pickle.load(f)

    keys = list(barss.keys())
    if isinstance(universe, int):
        if universe == -1:
            selected_keys = keys
        else:
            selected_keys = random.sample(keys, min(universe, len(keys)))
            try:
                pos = selected_keys.index("000001.XSHE")
                swp = selected_keys[0]
                selected_keys[0] = "000001.XSHE"
                selected_keys[pos] = swp
            except ValueError:
                selected_keys[0] = "000001.XSHE"

    else:
        selected_keys = universe

    dfs = []
    for symbol in selected_keys:
        qry = "frame >= @start & frame <= @end"
        df = pd.DataFrame(barss[symbol]).assign(asset=symbol).query(qry)

        if len(df) == 0:
            logger.debug("no bars for %s from %s to %s", symbol, start, end)
            continue
        # 前复权
        last = df.iloc[-1]["factor"]
        adjust_factor = df["factor"] / last
        adjust = ["open", "high", "low", "close", "volume"]
        df.loc[:, adjust] = df.loc[:, adjust].multiply(adjust_factor, axis="index")

        dfs.append(df)

    df = pd.concat(dfs, ignore_index=True)
    df.set_index(["frame", "asset"], inplace=True)
    df.index.names = ["date", "asset"]
    df.drop("factor", axis=1, inplace=True)
    df["price"] = df["open"].shift(-1)
    return df

代码已进行了相当的优化(其中部分也基于AI建议)。比如,将数据保存为字典,先按universe进行筛选,再拼接为dataframe,而不是将所有数据保存为dataframe,通过pandas来按universe进行筛选(将花费数倍时间)。

此外,在进行前复权时,它使用了multiply方法,从而可以一次对多个列进行前复权操作,这一建议正是AI给出来的。

但是,代码中还存在一个for loop,如果消除了这个循环,是否能进一步提升速度呢?

下面,我就使用Deep Seek R1进行尝试。

这是第一轮的prompt:

Quote

我有一个dataframe,以date和asset为联合索引,有open, high, low, close, volume, amount和factor等列。其中factor是复权因子。
现在,要对该数据结构实现以下功能:
1. 筛选出asset 在 selected_symbols列表中,date在[start, end]中的记录 2. 对这些记录,按asset进行分组,然后对 open, high, low, close, volume进行前复权。 3. 结果用dataframe返回,索引仍为date/asset,列为复权后的open, high,low, close, volume字段,以及未处理的amount。
请分别给出基于pandas, pyarrow, polars, duckdb的实现,并建议性能最优的一个。

Deep Seek的思考过程很长,这里略过了。我们直接看它的答案。磁盘读写对速度的影响并不大,所以,我们先把数据从磁盘中加载出来,保存到barss中:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
with open(os.path.join(data_home, "bars_1d_2005_2023.pkl"), "rb") as f:
    data = pickle.load(f)

dfs = []
for symbol, bars in data.items():
    df = pd.DataFrame(bars)
    df["asset"] = symbol
    df = df.rename(columns={"frame": "date"})
    dfs.append(df)

barss = pd.concat(dfs,ignore_index=True).set_index(["date", "asset"]).sort_index()
print(len(barss))

# 定义一个一致的universe,便于比较
universe = barss.index.get_level_values("asset").unique().tolist()

以下的代码都出自于 Deep Seek。我们让它对最耗时的筛选和复权部分给出方案:

基于Pandas的实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
def adjust_pandas(df, selected_symbols, start, end):
    # 筛选数据
    idx = pd.IndexSlice
    filtered = df.loc[idx[start:end, selected_symbols], :]

    # 按 asset 分组计算最后复权因子
    grouped = filtered.groupby('asset', group_keys=False)
    last_factor = grouped['factor'].transform('last')

    # 计算调整比例
    ratio = filtered['factor'] / last_factor

    # 计算复权后字段
    adjusted = filtered.assign(
        open   = filtered['open'] * ratio,
        high   = filtered['high'] * ratio,
        low    = filtered['low'] * ratio,
        close  = filtered['close'] * ratio,
        volume = filtered['volume'] * (last_factor / filtered['factor'])
    )

    # 选择最终列并保持索引
    return adjusted[['open', 'high', 'low', 'close', 'volume', 'amount']]

%time adjust_pandas(barss, universe, start, end)
adjust_pandas(barss, universe, start, end)

尽管我对Pandas很熟悉了,但仍有一些API是不知道的,比如transform。但运用正确的API,恰恰是 Python中提升性能的关键一招。

这个版本的平均运行时长是7秒。说明pandas的筛选确实很慢。

我们略过pyarrow的版本。pyarrow版本的运行时间大致是3.7秒左右。比原始版本只略有进步。这里也看出python 3.11中,for loop的运行速度已经很快了。

基于Polars的实现

这是它给出的polars的版本:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import polars as pl

def adjust_polars(df, selected_symbols, start, end):
    # 筛选数据
    filtered = df.filter(
        (pl.col("date").is_between(start, end)) &
        (pl.col("asset").is_in(selected_symbols))
    )

    # 计算最后复权因子和调整比例
    adjusted = filtered.with_columns(
        last_factor = pl.col("factor").last().over("asset")
    ).with_columns(
        ratio = pl.col("factor") / pl.col("last_factor"),
        volume_ratio = pl.col("last_factor") / pl.col("factor")
    ).with_columns(
        open   = pl.col("open") * pl.col("ratio"),
        high   = pl.col("high") * pl.col("ratio"),
        low    = pl.col("low") * pl.col("ratio"),
        close  = pl.col("close") * pl.col("ratio"),
        volume = pl.col("volume") * pl.col("volume_ratio")
    )

    # 选择最终列
    return adjusted.select(["date", "asset", "open", "high", "low", "close", "volume", "amount"])

adjust_polars(barss, universe, start, end)

之前我并不熟悉Polars,Deep Seek一下子扩展了我的能力边界。如果我要通过自学,然后来实现这部分功能,我不知道要多久才能发现.over这个API,因为这是 Polars 独有的一个窗口运算API,我无法把知识从我熟悉的Pandas中迁移过来。

但是,这个版本有一个错误:它没有遵从我给的数据格式,而是把barss当成了一个有asset和date列的polars DataFrame。通过使用通义灵码,修正了此错误,修正后的代码如下:

1
%time adjust_polars(pl.from_pandas(barss.reset_index()), universe, start, end)

得到的运行速度是1.01秒,已很接近我们要求的理想速度了。

基于DuckDB的实现

现在,我们来看它给出的duckdb的版本:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import duckdb

def adjust_duckdb(df, selected_symbols, start, end):
    query = f"""
    SELECT 
        date, asset,
        open * (factor / last_factor) AS open,
        high * (factor / last_factor) AS high,
        low * (factor / last_factor) AS low,
        close * (factor / last_factor) AS close,
        volume * (last_factor / factor) AS volume,
        amount
    FROM (
        SELECT 
            *,
            LAST_VALUE(factor) OVER (
                PARTITION BY asset 
                ORDER BY date 
                ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
            ) AS last_factor
        FROM df
        WHERE 
            asset IN ({','.join([f"'{s}'" for s in selected_symbols])})
            AND date BETWEEN '{start}' AND '{end}'
    )
    """
    return duckdb.query(query).to_df()

adjust_duckdb(barss, universe, start, end)

这里也会出现一个类似的错误,即查询语句需要df中存在asset, date列,但它们却出现在索引中。我们略作修正即可运行:

1
%time adjust_duckdb(barss.reset_index(), universe, start, end)

最终运行速度是1.21秒,在这个例子中略慢于polars,在所有方案中排在第二(在另一台机器,使用机械阵列硬盘时,更强的CPU时, duckdb更快)。但是,duckdb方案在数据规模上可能更有优势,即,如果数据集再大一到两个量级,它很可能超过polars。

在polars与duckdb中,需要的都是扁平结果的数据结构(即asset/date不作为索引,而是作为列字段存在),因此,我们可以考虑将数据结构进行重构,使用apache parquet格式写入到磁盘中,这样可以保存整个方案耗时仍保持在1秒左右。

终极咒语:急急如律令

Info

据说急急如律令要翻译成为 quickly, quickly, biu biu biu 😁

在前面,我们代替Deep Seek做了很多思考,是因为担心它对代码的最终执行速度没有sense。现在,我们试一下,直接抛出最终问题,看看会如何:

Quote

我有一个dataframe,以date和asset为联合索引,有open, high, low, close, volume, amount和factor等列。其中factor是复权因子。

现在,要对该数据结构实现以下功能:

  1. 筛选出asset 在 selected_symbols列表中,date在[start, end]中的记录
  2. 对这些记录,按asset进行分组,然后对 open, high, low, close, volume进行前复权。
  3. 结果用dataframe返回,索引仍为date/asset,列为复权后的open, high,low, close, volume字段,以及未处理的amount。

输入数据是1000万条以上,时间跨度是2005年到2023年,到2023年底,大约有5000支股票。输出结果将包含2000支股票的2005年到2023年的数据。请给出基于python,能在1秒左右实现上述功能的方案。

这一次,我们只要求技术方案限定在Python领域内,给了Deep Seek极大的发挥空间。

Deep Seek不仅给出了代码,还给出了『评测报告』,认为它给出的方案,能在某个CPU+内存组合上达到我们要求的速度。

Deep Seek认为,对于千万条记录级别的数据集,必须使用像parallel pandas这样的库来进行并行化才能达成目标。事实上这个认知是错误的

这一次Deep Seek给出的代码可运行度不高,我们没法验证基于并行化之后,速度是不是真的更快了。不过,令人印象深刻的是,它还给出了一个performance benchmark。这是它自己GAN出来的,还是真有人做过类似的测试,或者是从类似的规模推导出来的,就不得而知了。

重要的是,在给了Deek Seek更大的自由发挥空间之后,它找出了之前在筛选时,性能糟糕的重要原因: asset是字符串类型!

在海量记录中进行字符串搜索是相当慢的。在pandas中,我们可以将整数转换为category类型,此后的筛选就快很多了:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import pyarrow as pa
import pyarrow.parquet as pq

data_home = os.path.expanduser(data_home)
origin_data_file = os.path.join(data_home, "bars_1d_2005_2023.pkl")
with open(origin_data_file, 'rb') as f:
    data = pickle.load(f)

dfs = []
for symbol, bars in data.items():
    df = pd.DataFrame(bars)
    df["asset"] = symbol
    df = df.rename(columns={"frame": "date"})
    dfs.append(df)

barss = pd.concat(dfs,ignore_index=True)
barss['asset'] = barss['asset'].astype('category')
print(len(barss))

table = pa.Table.from_pandas(barss)

parquet_file_path = "/tmp/bars_1d_2005_2023_category.parquet"

with open(parquet_file_path, 'wb') as f:
    pq.write_table(table, f)

现在,我们再来看polars或者duckdb的方案的速度:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import polars as pl

def adjust_polars(df, selected_symbols, start, end):
    # 筛选数据
    filtered = df.filter(
        (pl.col("date").is_between(start, end)) &
        (pl.col("asset").is_in(selected_symbols))
    )

    # 计算最后复权因子和调整比例
    adjusted = filtered.with_columns(
        last_factor = pl.col("factor").last().over("asset")
    ).with_columns(
        ratio = pl.col("factor") / pl.col("last_factor"),
        volume_ratio = pl.col("last_factor") / pl.col("factor")
    ).with_columns(
        open   = pl.col("open") * pl.col("ratio"),
        high   = pl.col("high") * pl.col("ratio"),
        low    = pl.col("low") * pl.col("ratio"),
        close  = pl.col("close") * pl.col("ratio"),
        volume = pl.col("volume") * pl.col("volume_ratio")
    )

    # 选择最终列
    return adjusted.select([pl.col("date"), pl.col("asset"), pl.col("open"), pl.col("high"), pl.col("low"), pl.col("close"), pl.col("volume"), pl.col("amount")])

# 示例调用
start = datetime.date(2005, 1, 1)
end = datetime.date(2023, 12, 31)

barss = pl.read_parquet("/tmp/bars_1d_2005_2023_category.parquet")

universe = random.sample(barss['asset'].unique().to_list(), 2000)

%time adjust_polars(barss, universe, start, end)

结果是只需要91ms,令人印象深刻。duckdb的方案需要390ms,可能是因为我们需要在Python域拼接大量的selected_symbols字符串的原因。

借助 Deep Seek,我们把一个需要5秒左右的操作,加速到了0.1秒,速度提升了50倍。

本文测试都在一台mac m1机器上运行,RAM是16GB。当运行在其它机器上,因CPU,RAM及硬盘类型不同,数据表现甚至排名都会有所不同_。

结论

这次探索中,仅从解决问题的能力上看,Deep Seek、通义和豆包都相当于中级程序员,即能够较好地完成一个小模块的功能性需求,它情绪稳定,细微之处的代码质量更高。

当我们直接要求给出某个数据集下,能达到指定响应速度的Python方案时,Deep Seek有点用力过猛。从结果上看,如果我们通过单机、单线程就能达到91ms左右的响应速度,那么它给出的多进程方案,很可能是要劣于这个结果的。Deep Seek只是遵循了常见的优化思路,但它没有通过实际测试来修正自己的方案。

这说明,它们还无法完全替代人类程序员,特别是高级程序员:对于AI给出的结果,我们仍然需要验证、优化甚至是推动AI向前进,而这刚好是高级程序员才能做到的事情。

但这也仅仅是因为AI还不能四处走动的原因。因为这个原因,它不能像人类一样,知道自己有哪些测试环境可供方案验证,从而找出具体环境下的最优方案。

在铁皮机箱以内,它是森林之王,人类无法与之较量。但就像人不能拔着自己的头发离开地球一样,它的能力,也暂时被封印在铁皮机箱之内。但是,一旦它学会了拔插头,开电源,高级程序员的职业终点就不再是35岁,而是AI获得自己的莲花肉身之时。

至于初中级程序员,目前看是真不需要了。1万元的底薪,加上社保,这能买多少token? 2025年的毕业生,怎么办?

用HDBSCAN聚类算法选股是否有效

前篇文章提到可以用HDBSCAN算法来来对资产进行聚类,在聚类完成之后,对聚类结果进行协整检验,通过计算对冲比,可以构造成平稳序列。我们知道一个平稳时间序列的均值、方差恒定并且有自协相关特性,那么,一旦它偏离了均值,迟早都会回归到均值上。利用这一点,可以生成交易信号。

那怎样证明HDBSCN算法对寻到协整对的交易策略是有效的呢?下面我们来一步步分析。

首先,从前面的文章我们知道了HDBSCN算法是一种基于密度的聚类算法,它通过计算每个样本的密度来确定样本的聚类类别。HDBSCN算法的优点是它可以自动确定聚类中心,并且可以处理高维数据。下面是用python代码来实现HDBSCN算法的关键代码,获取历史数据的时间是从2022年1月1日至2023年12月31日。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
start_date = datetime.date(2022, 1, 1)
end_date = datetime.date(2023,12,31)
barss = load_bars(start_date, end_date, 2000)   #获取历史资产数据,这里选取2000条数据
closes = barss["close"].unstack().ffill().dropna(axis=1, how='any') #处理缺失值,将close列的MultiIndex转换为DataFrame二维表格,并使用ffill()方法填充缺失值。
clusterer = hdbscan.HDBSCAN(min_cluster_size=3, min_samples=2)# 使用 HDBSCAN 进行聚类,python可以直接安装hdbscan包
cluster_labels = clusterer.fit_predict(closes.T)  #转置是因为要对资产(特征)聚类

clustered = closes.T.copy()
clustered['cluster'] = cluster_labels# 将聚类结果添加到 DataFrame 中

clustered = clustered[clustered['cluster'] != -1] # 剔除类别为-1的点,这些是噪声,而不是一个类别
clustered_close = clustered.drop("cluster", axis=1)

unique_clusters = set(cluster_labels)
num_clusters = len(unique_clusters) # 获取有效的簇数量
print(f"有效的簇数量为:{num_clusters}")  

tsne = TSNE(n_components=3, random_state=42)
tsne_results = tsne.fit_transform(clustered_close)  # 使用t-SNE进行降维,便于后面的簇类可视化
reduced_tsne = pd.DataFrame(data=tsne_results, columns=['tsne_1', 'tsne_2', 'tsne_3'], index=clustered_close.index)# 将t-SNE结果添加到DataFrame中
reduced_tsne['cluster'] = clustered['cluster']

fig_tsne = px.scatter_3d(
    reduced_tsne, 
    x='tsne_1', y='tsne_2', z='tsne_3',
    color='cluster', 
    title='t-SNE Clustering of Stock Returns',
    labels={'tsne_1': 't-SNE Component 1', 'tsne_2': 't-SNE Component 2'}
)  #进行3D散点图可视化
fig_tsne.layout.width = 1200
fig_tsne.layout.height = 1100
fig_tsne.show()
导入必要的库,运行后可以得到下面的3D图:

3D图展示的是股票的3D空间分布,不同颜色代表不同的聚类类别,从图中可以观察到,这期间的2000支股票被分成了40多个类,除了一个包含420支股票的第39簇,其它簇都少于20支股票。下面分析第35簇,里面有3支股票,从2022年10月1日左右三支股票都是持续上升的。 那是否可以继续用HDBSCAN聚类算法对2022年10月份之前的数据进行聚类,看上面的3支股票是否也被分在了一起。还是上面的代码,只是将时间改为了2022年1月1日到2022年10月1日,运行后得到下面的3D图:

和上面的3D图相比,分为一类的股票(特征)更相似,所以聚集的更密集。那上面第35簇中的三支股票是否还是会被分为一类呢?可以将每一簇都都可视化展示出来(和前面文章一样的方法),观察那三支股票在这次分类的那一簇中。通过观察,我们发现这三支股票是被分在了一类,但是这一类里面有大概600支股票,我们看看这600多支股票的可视化结果: 我们想要知道的是这三支股票在持续上升之前会有什么特征,虽然这一个大类里面的股票数据有点多,但观察上面的趋势图可以知道它们的走势大致是相同的,所以才会被分为一类。那继续将这600多支股票在用HDBSCA进行聚类,之前的3支股票会不会被分为一类?也就是说需要验证3支股票在持续上升前的大致趋势也是一致。感兴趣的读者可以自己尝试一下,下面是小编验证的结论: 对这600支股票重新在2022年1月到2022年10月进行聚类,总共被分成了三类,之前的那三支股票有2支被分在了一簇,另外一支在另一类里面。这个结果我认为足以说明这600支股票在2022年1月到2022年10月之间的趋势是相似的,所以600多支股票只被分为三类。 所以,HDBSCAN算法对选股是有效的,它选出来的一类趋势类似,所以可以用前一篇内容中构造这一类股票的平稳序列来产生交易信号。

hdbscan 聚类算法扫描配对交易 速度提升99倍

配对交易是一种交易策略,由摩根士丹利的量化分析师农齐奥.塔尔塔里亚在 20 世纪 80 年代首创。他现在是意大利雅典娜资产管理公司的创始人。该策略涉及监控两只历史上相关性较强的证券,并监控两者之间的价格差。一旦价格差超出一个阈值(比如 n 倍的标准差),价格回归就是大概率事件,于是交易者就做空价格高的一支,做多价格低的一支,从而获利。