File size: 6,262 Bytes
c5555e5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
"""Microstructure signal study: does order-book imbalance predict the next move?

This is the analysis stage that runs on the ticks gathered by ``collector.py``.

    load_ticks()      read the collected Parquet tick files into one frame
    build_dataset()   turn ticks into (features, forward-return) rows, with
                      strict no-lookahead and no cross-market leakage
    evaluate_signal() measure whether book imbalance / microprice edge
                      predicts the forward mid move — and, crucially, whether
                      that move is large enough to survive the spread

The edge source under test: bid/ask size imbalance carries short-horizon
directional information. The honest question is not "is there a signal"
(there usually is, weakly) but "does it beat the cost of crossing the
spread to trade it." That comparison is the whole point.

Run on collected data:  python -m src.microstructure.study
"""

import logging
from pathlib import Path

import numpy as np
import pandas as pd

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Default directory that load_ticks() searches (recursively) for Parquet files.
DATA_DIR = Path("data/ticks")
# Default label horizon for build_dataset().
HORIZON = 10           # forward window, measured in ticks
# Default tail size for evaluate_signal(); applied to both the top and the
# bottom of the imbalance distribution.
STRONG_QUANTILE = 0.2  # "strong signal" = the most extreme 20% of imbalance


def load_ticks(data_dir=DATA_DIR) -> pd.DataFrame:
    """Read every collected Parquet file into one sorted DataFrame.

    Args:
        data_dir: Directory searched recursively for ``*.parquet`` files.

    Returns:
        All files concatenated into a single frame, with ``ts`` converted to
        timezone-aware UTC timestamps and rows ordered by ``market_ticker``
        and then ``ts`` (i.e. chronological *within* each market).

    Raises:
        FileNotFoundError: If no Parquet file exists under ``data_dir``.
    """
    # Sorted paths make the concatenation order deterministic across runs.
    paths = sorted(Path(data_dir).glob("**/*.parquet"))
    if not paths:
        raise FileNotFoundError(
            f"no Parquet files under {data_dir} — run the collector first"
        )
    frames = [pd.read_parquet(p) for p in paths]
    df = pd.concat(frames, ignore_index=True)
    df["ts"] = pd.to_datetime(df["ts"], utc=True)
    return df.sort_values(["market_ticker", "ts"]).reset_index(drop=True)


def build_dataset(ticks: pd.DataFrame, horizon: int = HORIZON) -> pd.DataFrame:
    """Turn raw ticks into (features, label) rows.

    The label is the mid move ``horizon`` ticks into the future. Features use
    only same-tick information. Each market is processed on its own so a
    label never reaches across markets, and the final ``horizon`` ticks of
    each market are dropped — there is no future to label them with. Getting
    this no-lookahead discipline right is the difference between a real
    backtest and a fantasy one.

    Args:
        ticks: Tick rows with at least the columns ``market_ticker``, ``ts``,
            ``imbalance``, ``microprice``, ``spread`` and ``mid``.
        horizon: Forward window in ticks; must be at least 1.

    Returns:
        A frame with columns ``market_ticker``, ``ts``, ``imbalance``,
        ``microprice_edge``, ``spread``, ``mid`` and ``fwd_return``, with any
        row containing a missing value removed. The frame is empty (but has
        these columns) when no market has more than ``horizon`` ticks.

    Raises:
        ValueError: If ``horizon`` is smaller than 1.
    """
    # horizon == 0 would make ``.iloc[:-horizon]`` select nothing, and a
    # negative horizon would label rows with a *past* move; reject both
    # instead of silently returning a useless dataset.
    if horizon < 1:
        raise ValueError(
            f"horizon must be a positive number of ticks, got {horizon!r}"
        )

    cols = ["market_ticker", "ts", "imbalance", "microprice_edge",
            "spread", "mid", "fwd_return"]
    parts = []
    for ticker, g in ticks.groupby("market_ticker", sort=False):
        # The forward label is positional, so it is only a *future* value if
        # the rows are in time order. A stable sort leaves already-sorted
        # input (and the relative order of equal timestamps) untouched.
        g = g.sort_values("ts", kind="stable").reset_index(drop=True)
        if len(g) <= horizon:
            continue  # too short to produce a single labelled row
        fwd_mid = g["mid"].shift(-horizon)
        part = pd.DataFrame({
            "market_ticker": ticker,
            "ts": g["ts"],
            "imbalance": g["imbalance"] - 0.5,              # signed: >0 bid-heavy
            "microprice_edge": g["microprice"] - g["mid"],  # signed pressure
            "spread": g["spread"],
            "mid": g["mid"],
            "fwd_return": fwd_mid - g["mid"],               # the label
        }).iloc[:-horizon]  # last ``horizon`` rows have no future mid
        parts.append(part)
    if not parts:
        return pd.DataFrame(columns=cols)
    return pd.concat(parts, ignore_index=True).dropna().reset_index(drop=True)


def _safe_corr(x: np.ndarray, y: np.ndarray) -> float:
    """Return the Pearson correlation of x and y, or 0.0 if it is undefined."""
    # Correlation is undefined when either series is constant (or has fewer
    # than two points); np.corrcoef would return NaN and emit a warning.
    # Comparing max to min is exact, unlike testing a floating-point std.
    if x.size < 2 or x.max() == x.min() or y.max() == y.min():
        return 0.0
    return float(np.corrcoef(x, y)[0, 1])


def evaluate_signal(data: pd.DataFrame, strong_q: float = STRONG_QUANTILE) -> dict:
    """Descriptive test of the imbalance / microprice signal.

    Every statistic here is parameter-free — the "signal" is just the sign of
    the book pressure, so there is nothing to overfit and full-sample numbers
    are honest. An ML model *fit* on these features would instead require a
    strict out-of-sample time split.

    Args:
        data: Rows with columns ``imbalance``, ``microprice_edge``,
            ``fwd_return`` and ``spread``.
        strong_q: Tail fraction that defines a "strong" signal; rows at or
            above the ``1 - strong_q`` imbalance quantile are the bullish
            bucket and rows at or below the ``strong_q`` quantile are the
            bearish bucket.

    Returns:
        ``{"n": 0}`` when ``data`` is empty. Otherwise a dict with keys ``n``,
        ``corr_imbalance``, ``corr_microprice_edge``, ``hit_rate``,
        ``signal_move``, ``mean_spread`` and ``beats_spread``. A correlation
        is reported as 0.0 when either series is constant, and ``hit_rate``
        is NaN when no row has both a non-zero edge and a non-zero move.
    """
    n = len(data)
    if n == 0:
        return {"n": 0}

    imb = data["imbalance"].to_numpy()
    edge = data["microprice_edge"].to_numpy()
    fwd = data["fwd_return"].to_numpy()

    corr_imb = _safe_corr(imb, fwd)
    corr_edge = _safe_corr(edge, fwd)

    # Directional hit rate, over rows where signal and move are both non-zero.
    mask = (edge != 0) & (fwd != 0)
    hit_rate = (float(np.mean(np.sign(edge[mask]) == np.sign(fwd[mask])))
                if mask.any() else float("nan"))

    # The honest part: among the strongest signals, how big is the forward
    # move you'd capture — and is it bigger than the spread you must cross?
    # Round-tripping a position pays the full spread, so the bar is
    # signal_move > mean_spread.
    hi = np.quantile(imb, 1 - strong_q)
    lo = np.quantile(imb, strong_q)
    bull = data.loc[data["imbalance"] >= hi, "fwd_return"].mean()
    bear = data.loc[data["imbalance"] <= lo, "fwd_return"].mean()
    # Half the bull-minus-bear gap: the average move in the signal's direction.
    signal_move = float((bull - bear) / 2)
    mean_spread = float(data["spread"].mean())

    return {
        "n": n,
        "corr_imbalance": corr_imb,
        "corr_microprice_edge": corr_edge,
        "hit_rate": hit_rate,
        "signal_move": signal_move,    # avg directional forward move captured
        "mean_spread": mean_spread,    # cost to round-trip
        "beats_spread": bool(signal_move > mean_spread),
    }


def render_report(r: dict) -> str:
    """Format the result of ``evaluate_signal`` as a plain-text report.

    Args:
        r: Result dict. When its ``n`` entry is missing or 0 no other key is
            read; otherwise ``corr_imbalance``, ``corr_microprice_edge``,
            ``hit_rate``, ``signal_move``, ``mean_spread`` and
            ``beats_spread`` must be present.

    Returns:
        A one-line "no data" message, or a multi-line report that ends with
        a verdict on whether the signal beats the spread.
    """
    if r.get("n", 0) == 0:
        return "No data — run the collector to gather ticks first."
    if r["beats_spread"]:
        verdict = "signal exceeds the spread — worth modelling further"
    else:
        verdict = "signal does NOT beat the spread — not tradeable as-is"
    return (
        "Microstructure signal study\n"
        f"  rows analysed          {r['n']:,}\n"
        f"  corr(imbalance, fwd)   {r['corr_imbalance']:+.4f}\n"
        f"  corr(microprice, fwd)  {r['corr_microprice_edge']:+.4f}\n"
        f"  directional hit rate   {r['hit_rate']:.3f}\n"
        f"  avg signal move        {r['signal_move']:.4f}\n"
        f"  avg spread (cost)      {r['mean_spread']:.4f}\n"
        f"  verdict: {verdict}"
    )


def main():
    """Load the collected ticks, build the dataset and print the report."""
    from src import config  # noqa: F401  (configures root logging)

    ticks = load_ticks()
    market_count = ticks["market_ticker"].nunique()
    logger.info("loaded %d ticks across %d markets", len(ticks), market_count)

    dataset = build_dataset(ticks)
    results = evaluate_signal(dataset)
    print(render_report(results))


# Run the study only when executed as a script/module, not on import.
if __name__ == "__main__":
    main()