AJAY KASU
Add microstructure signal study pipeline
c5555e5
Raw
History Blame Contribute Delete
6.26 kB
"""Microstructure signal study: does order-book imbalance predict the next move?

This is the analysis stage that runs on the ticks gathered by ``collector.py``.

    load_ticks()       read the collected Parquet tick files into one frame
    build_dataset()    turn ticks into (features, forward-return) rows, with
                       strict no-lookahead and no cross-market leakage
    evaluate_signal()  measure whether book imbalance / microprice edge
                       predicts the forward mid move — and, crucially, whether
                       that move is large enough to survive the spread

The edge source under test: bid/ask size imbalance carries short-horizon
directional information. The honest question is not "is there a signal"
(there usually is, weakly) but "does it beat the cost of crossing the
spread to trade it." That comparison is the whole point.

Run on collected data:   python -m src.microstructure.study
"""
import logging
from pathlib import Path
import numpy as np
import pandas as pd
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Default root directory that load_ticks() searches recursively for *.parquet files.
DATA_DIR = Path("data/ticks")
HORIZON = 10 # default forward window for the label, measured in ticks (rows), not time
STRONG_QUANTILE = 0.2 # "strong signal" = top and bottom 20% of imbalance (each tail)
def load_ticks(data_dir=DATA_DIR) -> pd.DataFrame:
    """Read every collected Parquet file into one time-sorted DataFrame.

    Args:
        data_dir: Directory searched recursively for ``*.parquet`` files.

    Returns:
        All files concatenated into a single frame with a fresh 0..n-1 index,
        ``ts`` parsed as timezone-aware UTC timestamps, and rows ordered by
        ``market_ticker`` and then ``ts``.

    Raises:
        FileNotFoundError: If no Parquet file is found under ``data_dir``.
    """
    # Sorted so the concatenation order does not depend on filesystem order.
    paths = sorted(Path(data_dir).glob("**/*.parquet"))
    if not paths:
        raise FileNotFoundError(
            f"no Parquet files under {data_dir} — run the collector first"
        )

    df = pd.concat((pd.read_parquet(p) for p in paths), ignore_index=True)
    df["ts"] = pd.to_datetime(df["ts"], utc=True)
    return df.sort_values(["market_ticker", "ts"]).reset_index(drop=True)
def build_dataset(ticks: pd.DataFrame, horizon: int = HORIZON) -> pd.DataFrame:
    """Turn raw ticks into (features, label) rows.

    The label is the mid move ``horizon`` ticks into the future. Features use
    only same-tick information. Each market is processed on its own so a
    label never reaches across markets, and the final ``horizon`` ticks of
    each market are dropped — there is no future to label them with. Getting
    this no-lookahead discipline right is the difference between a real
    backtest and a fantasy one.

    Args:
        ticks: Tick frame with at least the columns ``market_ticker``, ``ts``,
            ``mid``, ``imbalance``, ``microprice`` and ``spread``.
        horizon: Forward window in ticks (rows); must be at least 1.

    Returns:
        A frame with columns ``market_ticker``, ``ts``, ``imbalance``,
        ``microprice_edge``, ``spread``, ``mid`` and ``fwd_return``. Rows
        containing any NaN are removed. The frame is empty (same columns) when
        no market has more than ``horizon`` ticks.

    Raises:
        ValueError: If ``horizon`` is less than 1.
    """
    # horizon == 0 would make ``iloc[:-0]`` silently drop every row, and a
    # negative horizon would label each row with a *past* price.
    if horizon < 1:
        raise ValueError(
            f"horizon must be a positive number of ticks, got {horizon!r}"
        )

    cols = ["market_ticker", "ts", "imbalance", "microprice_edge",
            "spread", "mid", "fwd_return"]

    parts = []
    for ticker, g in ticks.groupby("market_ticker", sort=False):
        if len(g) <= horizon:
            continue  # too short to produce a single labelled row

        # The forward shift is positional, so the label is only "the future"
        # if rows are in time order. Sort here rather than trusting the
        # caller; the stable sort keeps the input order of equal timestamps.
        g = g.sort_values("ts", kind="mergesort").reset_index(drop=True)

        mid = g["mid"]
        fwd_mid = mid.shift(-horizon)
        part = pd.DataFrame({
            "market_ticker": ticker,
            "ts": g["ts"],
            "imbalance": g["imbalance"] - 0.5,           # signed: >0 bid-heavy
            "microprice_edge": g["microprice"] - mid,    # signed pressure
            "spread": g["spread"],
            "mid": mid,
            "fwd_return": fwd_mid - mid,                 # the label
        })
        # The last ``horizon`` rows have no future tick to label them with.
        parts.append(part.iloc[:-horizon])

    if not parts:
        return pd.DataFrame(columns=cols)
    return pd.concat(parts, ignore_index=True).dropna().reset_index(drop=True)
def _pearson_or_zero(x: np.ndarray, y: np.ndarray) -> float:
    """Return the Pearson correlation of ``x`` and ``y``, or 0.0 if either is constant."""
    # np.corrcoef divides by both standard deviations, so a constant series on
    # either side would produce NaN (and a RuntimeWarning) instead of a number.
    if x.std() == 0 or y.std() == 0:
        return 0.0
    return float(np.corrcoef(x, y)[0, 1])


def evaluate_signal(data: pd.DataFrame, strong_q: float = STRONG_QUANTILE) -> dict:
    """Descriptive test of the imbalance / microprice signal.

    Nothing is fitted here: the "signal" is just the sign (or rank) of the
    book pressure. The only quantities estimated from the data are the two
    quantile cut-offs that define a "strong" signal, and they are taken from
    the same sample they are evaluated on. An ML model *fit* on these
    features would instead require a strict out-of-sample time split.

    Args:
        data: Frame with columns ``imbalance``, ``microprice_edge``,
            ``fwd_return`` and ``spread``.
        strong_q: Tail fraction treated as a strong signal on each side;
            must lie in [0, 0.5] so the bullish and bearish buckets do not
            overlap beyond the median.

    Returns:
        ``{"n": 0}`` for empty input; otherwise a dict with keys ``n``,
        ``corr_imbalance``, ``corr_microprice_edge``, ``hit_rate``,
        ``signal_move``, ``mean_spread`` and ``beats_spread``.

    Raises:
        ValueError: If ``strong_q`` is outside [0, 0.5].
    """
    if not 0 <= strong_q <= 0.5:
        raise ValueError(f"strong_q must be within [0, 0.5], got {strong_q!r}")

    n = len(data)
    if n == 0:
        return {"n": 0}

    imb = data["imbalance"].to_numpy()
    edge = data["microprice_edge"].to_numpy()
    fwd = data["fwd_return"].to_numpy()

    corr_imb = _pearson_or_zero(imb, fwd)
    corr_edge = _pearson_or_zero(edge, fwd)

    # Directional hit rate, over rows where signal and move are both non-zero.
    mask = (edge != 0) & (fwd != 0)
    if mask.any():
        hit_rate = float(np.mean(np.sign(edge[mask]) == np.sign(fwd[mask])))
    else:
        hit_rate = float("nan")

    # The honest part: among the strongest signals, how big is the forward
    # move you'd capture — and is it bigger than the spread you must cross?
    # Round-tripping a position pays the full spread, so the bar is
    # signal_move > mean_spread.
    hi = np.quantile(imb, 1 - strong_q)
    lo = np.quantile(imb, strong_q)
    bull = data.loc[data["imbalance"] >= hi, "fwd_return"].mean()
    bear = data.loc[data["imbalance"] <= lo, "fwd_return"].mean()
    # Half the bull-minus-bear gap = average move in the signalled direction.
    signal_move = float((bull - bear) / 2)
    # NOTE(review): this averages the spread over all rows, not only the
    # strong-signal rows that would actually be traded.
    mean_spread = float(data["spread"].mean())

    return {
        "n": n,
        "corr_imbalance": corr_imb,
        "corr_microprice_edge": corr_edge,
        "hit_rate": hit_rate,
        "signal_move": signal_move,   # avg directional forward move captured
        "mean_spread": mean_spread,   # cost to round-trip
        "beats_spread": bool(signal_move > mean_spread),
    }
def render_report(r: dict) -> str:
    """Format the metrics dict produced by ``evaluate_signal`` as a text report.

    Args:
        r: Metrics dict. A missing or zero ``"n"`` means there is no data;
            otherwise the keys ``n``, ``corr_imbalance``,
            ``corr_microprice_edge``, ``hit_rate``, ``signal_move``,
            ``mean_spread`` and ``beats_spread`` are read.

    Returns:
        A multi-line report ending in a verdict line, or a one-line
        "no data" message.
    """
    if r.get("n", 0) == 0:
        return "No data — run the collector to gather ticks first."

    verdict = ("signal exceeds the spread — worth modelling further"
               if r["beats_spread"]
               else "signal does NOT beat the spread — not tradeable as-is")
    return (
        "Microstructure signal study\n"
        f" rows analysed {r['n']:,}\n"
        f" corr(imbalance, fwd) {r['corr_imbalance']:+.4f}\n"
        f" corr(microprice, fwd) {r['corr_microprice_edge']:+.4f}\n"
        f" directional hit rate {r['hit_rate']:.3f}\n"
        f" avg signal move {r['signal_move']:.4f}\n"
        f" avg spread (cost) {r['mean_spread']:.4f}\n"
        f" verdict: {verdict}"
    )
def main():
    """Load the collected ticks, build the labelled dataset, and print the report."""
    # Imported for its side effect only.
    from src import config  # noqa: F401 (configures root logging)

    frame = load_ticks()
    logger.info(
        "loaded %d ticks across %d markets",
        len(frame),
        frame["market_ticker"].nunique(),
    )

    dataset = build_dataset(frame)
    metrics = evaluate_signal(dataset)
    print(render_report(metrics))


if __name__ == "__main__":
    main()