File size: 6,262 Bytes
c5555e5 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 | """Microstructure signal study: does order-book imbalance predict the next move?
This is the analysis stage that runs on the ticks gathered by ``collector.py``.
    load_ticks()       read the collected Parquet tick files into one frame
    build_dataset()    turn ticks into (features, forward-return) rows, with
                       strict no-lookahead and no cross-market leakage
    evaluate_signal()  measure whether book imbalance / microprice edge
                       predicts the forward mid move — and, crucially, whether
                       that move is large enough to survive the spread
The edge source under test: bid/ask size imbalance carries short-horizon
directional information. The honest question is not "is there a signal"
(there usually is, weakly) but "does it beat the cost of crossing the
spread to trade it." That comparison is the whole point.
Run on collected data: python -m src.microstructure.study
"""
import logging
from pathlib import Path
import numpy as np
import pandas as pd
# Module-level logger; main() imports ``src.config`` to configure root logging.
logger = logging.getLogger(__name__)
# Default directory that load_ticks() searches recursively for ``*.parquet`` files.
DATA_DIR = Path("data/ticks")
HORIZON = 10  # default forward window for build_dataset(), measured in ticks (rows), not time
STRONG_QUANTILE = 0.2  # default tail size for evaluate_signal(): top 20% vs bottom 20% of imbalance
def load_ticks(data_dir=DATA_DIR) -> pd.DataFrame:
    """Read every collected Parquet file into one DataFrame.

    Searches ``data_dir`` recursively for ``*.parquet`` files, concatenates
    them, parses the ``ts`` column into timezone-aware UTC timestamps and
    returns the rows ordered by ``market_ticker`` first and ``ts`` second,
    with a fresh ``0..n-1`` index.

    Args:
        data_dir: Directory to search; defaults to ``DATA_DIR``.

    Returns:
        All ticks from all files, grouped by market and time-ordered within
        each market.

    Raises:
        FileNotFoundError: If no ``*.parquet`` file exists under ``data_dir``.
    """
    # Sorted paths make the concatenation order deterministic across runs.
    paths = sorted(Path(data_dir).glob("**/*.parquet"))
    if not paths:
        raise FileNotFoundError(
            f"no Parquet files under {data_dir} — run the collector first"
        )
    df = pd.concat((pd.read_parquet(p) for p in paths), ignore_index=True)
    df["ts"] = pd.to_datetime(df["ts"], utc=True)
    return df.sort_values(["market_ticker", "ts"]).reset_index(drop=True)
def build_dataset(ticks: pd.DataFrame, horizon: int = HORIZON) -> pd.DataFrame:
    """Turn raw ticks into (features, label) rows.

    The label is the mid move ``horizon`` ticks into the future. Features use
    only same-tick information. Each market is processed on its own so a
    label never reaches across markets, and the final ``horizon`` ticks of
    each market are dropped — there is no future to label them with. Getting
    this no-lookahead discipline right is the difference between a real
    backtest and a fantasy one.

    Args:
        ticks: Tick rows with the columns ``market_ticker``, ``ts``, ``mid``,
            ``imbalance``, ``microprice`` and ``spread``.
        horizon: Forward window in ticks (rows within one market); must be
            at least 1.

    Returns:
        A frame with the columns ``market_ticker``, ``ts``, ``imbalance``,
        ``microprice_edge``, ``spread``, ``mid`` and ``fwd_return``; rows
        containing any missing value are removed. Empty (but with those
        columns) when no market has more than ``horizon`` ticks.

    Raises:
        ValueError: If ``horizon`` is smaller than 1.
    """
    if horizon < 1:
        # horizon == 0 would make ``.iloc[:-horizon]`` drop every row, and a
        # negative horizon would label each tick with a *past* mid.
        raise ValueError(
            f"horizon must be a positive number of ticks, got {horizon!r}"
        )

    cols = ["market_ticker", "ts", "imbalance", "microprice_edge",
            "spread", "mid", "fwd_return"]
    parts = []
    for ticker, g in ticks.groupby("market_ticker", sort=False):
        if len(g) <= horizon:
            continue  # too short to produce a single labelled row
        # The label is positional (``shift``), so time order inside the market
        # must hold here rather than be assumed of the caller. Mergesort is
        # stable: ticks sharing a timestamp keep their incoming order, and
        # already-sorted input is left unchanged.
        g = g.sort_values("ts", kind="mergesort").reset_index(drop=True)
        fwd_mid = g["mid"].shift(-horizon)
        part = pd.DataFrame({
            "market_ticker": ticker,
            "ts": g["ts"],
            # signed: >0 bid-heavy (assumes raw imbalance is centred on 0.5)
            "imbalance": g["imbalance"] - 0.5,
            "microprice_edge": g["microprice"] - g["mid"],  # signed pressure
            "spread": g["spread"],
            "mid": g["mid"],
            "fwd_return": fwd_mid - g["mid"],  # the label
        }).iloc[:-horizon]  # the last ``horizon`` rows have no future mid
        parts.append(part)

    if not parts:
        return pd.DataFrame(columns=cols)
    return pd.concat(parts, ignore_index=True).dropna().reset_index(drop=True)
def evaluate_signal(data: pd.DataFrame, strong_q: float = STRONG_QUANTILE) -> dict:
    """Descriptive test of the imbalance / microprice signal.

    Every statistic here is parameter-free — the "signal" is just the sign of
    the book pressure, so there is nothing to overfit and full-sample numbers
    are honest. An ML model *fit* on these features would instead require a
    strict out-of-sample time split.

    Args:
        data: Rows with the columns ``imbalance``, ``microprice_edge``,
            ``fwd_return`` and ``spread``.
        strong_q: Tail size; the "strong" buckets are the rows at or above the
            ``1 - strong_q`` imbalance quantile and at or below the
            ``strong_q`` quantile.

    Returns:
        ``{"n": 0}`` for empty input; otherwise a dict with the keys ``n``,
        ``corr_imbalance``, ``corr_microprice_edge``, ``hit_rate``,
        ``signal_move``, ``mean_spread`` and ``beats_spread``. A correlation
        is reported as 0.0 when either series is constant; ``hit_rate`` is
        NaN when no row has both a non-zero edge and a non-zero move.
    """
    n = len(data)
    if n == 0:
        return {"n": 0}

    imb = data["imbalance"].to_numpy()
    edge = data["microprice_edge"].to_numpy()
    fwd = data["fwd_return"].to_numpy()

    corr_imb = _corr_or_zero(imb, fwd)
    corr_edge = _corr_or_zero(edge, fwd)

    # Directional hit rate, over rows where signal and move are both non-zero.
    mask = (edge != 0) & (fwd != 0)
    hit_rate = (float(np.mean(np.sign(edge[mask]) == np.sign(fwd[mask])))
                if mask.any() else float("nan"))

    # The honest part: among the strongest signals, how big is the forward
    # move you'd capture — and is it bigger than the spread you must cross?
    # Round-tripping a position pays the full spread, so the bar is
    # signal_move > mean_spread.
    hi = np.quantile(imb, 1 - strong_q)
    lo = np.quantile(imb, strong_q)
    bull = data.loc[data["imbalance"] >= hi, "fwd_return"].mean()
    bear = data.loc[data["imbalance"] <= lo, "fwd_return"].mean()
    signal_move = float((bull - bear) / 2)
    mean_spread = float(data["spread"].mean())

    return {
        "n": n,
        "corr_imbalance": corr_imb,
        "corr_microprice_edge": corr_edge,
        "hit_rate": hit_rate,
        "signal_move": signal_move,  # avg directional forward move captured
        "mean_spread": mean_spread,  # cost to round-trip
        "beats_spread": bool(signal_move > mean_spread),
    }


def _corr_or_zero(x: np.ndarray, y: np.ndarray) -> float:
    """Return the Pearson correlation of two non-empty arrays, or 0.0 if undefined."""
    # A constant series has zero variance, so the correlation is undefined and
    # np.corrcoef would yield NaN with a RuntimeWarning. Comparing min to max
    # is exact, whereas std() of a constant float array can come out as a tiny
    # non-zero number and slip past a truthiness check.
    if x.min() == x.max() or y.min() == y.max():
        return 0.0
    return float(np.corrcoef(x, y)[0, 1])
def render_report(r: dict) -> str:
    """Format the result dict of ``evaluate_signal`` as a plain-text report.

    Args:
        r: Result dict. When ``n`` is missing or 0 only a "no data" notice is
            produced; otherwise the keys ``n``, ``corr_imbalance``,
            ``corr_microprice_edge``, ``hit_rate``, ``signal_move``,
            ``mean_spread`` and ``beats_spread`` are read.

    Returns:
        The multi-line report, without a trailing newline.
    """
    if r.get("n", 0) == 0:
        return "No data — run the collector to gather ticks first."
    verdict = ("signal exceeds the spread — worth modelling further"
               if r["beats_spread"]
               else "signal does NOT beat the spread — not tradeable as-is")
    return (
        "Microstructure signal study\n"
        f" rows analysed {r['n']:,}\n"
        f" corr(imbalance, fwd) {r['corr_imbalance']:+.4f}\n"
        f" corr(microprice, fwd) {r['corr_microprice_edge']:+.4f}\n"
        f" directional hit rate {r['hit_rate']:.3f}\n"
        f" avg signal move {r['signal_move']:.4f}\n"
        f" avg spread (cost) {r['mean_spread']:.4f}\n"
        f" verdict: {verdict}"
    )
def main():
    """Load the collected ticks, build the labelled dataset and print the report."""
    from src import config  # noqa: F401 (configures root logging)

    tick_frame = load_ticks()
    market_count = tick_frame["market_ticker"].nunique()
    logger.info("loaded %d ticks across %d markets",
                len(tick_frame), market_count)

    labelled = build_dataset(tick_frame)
    summary = evaluate_signal(labelled)
    print(render_report(summary))


# Run the study only when executed as a script or via ``python -m``.
if __name__ == "__main__":
    main()
|