"""Microstructure signal study: does order-book imbalance predict the next move?

This is the analysis stage that runs on the ticks gathered by ``collector.py``.

  load_ticks()       read the collected Parquet tick files into one frame
  build_dataset()    turn ticks into (features, forward-return) rows, with
                     strict no-lookahead and no cross-market leakage
  evaluate_signal()  measure whether book imbalance / microprice edge predicts
                     the forward mid move — and, crucially, whether that move
                     is large enough to survive the spread

The edge source under test: bid/ask size imbalance carries short-horizon
directional information. The honest question is not "is there a signal" (there
usually is, weakly) but "does it beat the cost of crossing the spread to trade
it." That comparison is the whole point.

Run on collected data:

    python -m src.microstructure.study
"""

import logging
from pathlib import Path

import numpy as np
import pandas as pd

logger = logging.getLogger(__name__)

DATA_DIR = Path("data/ticks")
HORIZON = 10            # forward window, measured in ticks
STRONG_QUANTILE = 0.2   # "strong signal" = the most extreme 20% of imbalance


def load_ticks(data_dir=DATA_DIR) -> pd.DataFrame:
    """Read every collected Parquet file into one time-sorted DataFrame.

    Args:
        data_dir: Directory searched recursively for ``*.parquet`` files.

    Returns:
        All files concatenated, with ``ts`` parsed as UTC timestamps and rows
        ordered by ``market_ticker`` then ``ts`` under a fresh 0..n-1 index.

    Raises:
        FileNotFoundError: If no Parquet file exists under ``data_dir``.
    """
    paths = sorted(Path(data_dir).glob("**/*.parquet"))
    if not paths:
        raise FileNotFoundError(
            f"no Parquet files under {data_dir} — run the collector first"
        )
    df = pd.concat((pd.read_parquet(p) for p in paths), ignore_index=True)
    df["ts"] = pd.to_datetime(df["ts"], utc=True)
    return df.sort_values(["market_ticker", "ts"]).reset_index(drop=True)


def build_dataset(ticks: pd.DataFrame, horizon: int = HORIZON) -> pd.DataFrame:
    """Turn raw ticks into (features, label) rows.

    The label is the mid move ``horizon`` ticks into the future. Features use
    only same-tick information. Each market is processed on its own so a label
    never reaches across markets, and the final ``horizon`` ticks of each
    market are dropped — there is no future to label them with. Getting this
    no-lookahead discipline right is the difference between a real backtest
    and a fantasy one.

    Args:
        ticks: Frame with ``market_ticker``, ``ts``, ``mid``, ``imbalance``,
            ``microprice`` and ``spread`` columns. Rows need not be
            pre-sorted; each market is ordered by ``ts`` here.
        horizon: Forward window in ticks; must be at least 1.

    Returns:
        One row per labelled tick with columns ``market_ticker``, ``ts``,
        ``imbalance``, ``microprice_edge``, ``spread``, ``mid`` and
        ``fwd_return``. Rows containing any NaN are removed. The frame is
        empty (with those columns) when no market has more than ``horizon``
        ticks.

    Raises:
        ValueError: If ``horizon`` is smaller than 1.
    """
    # A zero or negative horizon used to fall through and always yield an
    # empty frame (``iloc[:-0]`` keeps nothing; a negative shift labels
    # backwards and is then wiped by dropna), which reads downstream as
    # "no data collected". Fail loudly instead.
    if horizon < 1:
        raise ValueError(
            f"horizon must be a positive number of ticks, got {horizon!r}"
        )

    cols = ["market_ticker", "ts", "imbalance", "microprice_edge",
            "spread", "mid", "fwd_return"]
    parts = []
    for ticker, g in ticks.groupby("market_ticker", sort=False):
        if len(g) <= horizon:
            continue  # too short to produce a single labelled row
        # The positional shift below is only "the future" if rows are in time
        # order. Enforce that here rather than trusting the caller; the stable
        # sort leaves already-ordered input (and tied timestamps) untouched.
        g = g.sort_values("ts", kind="mergesort").reset_index(drop=True)
        fwd_mid = g["mid"].shift(-horizon)
        part = pd.DataFrame({
            "market_ticker": ticker,
            "ts": g["ts"],
            "imbalance": g["imbalance"] - 0.5,            # signed: >0 bid-heavy
            "microprice_edge": g["microprice"] - g["mid"],  # signed pressure
            "spread": g["spread"],
            "mid": g["mid"],
            "fwd_return": fwd_mid - g["mid"],             # the label
        }).iloc[:-horizon]  # the last `horizon` rows have no forward mid
        parts.append(part)

    if not parts:
        return pd.DataFrame(columns=cols)
    return pd.concat(parts, ignore_index=True).dropna().reset_index(drop=True)


def _safe_corr(x: np.ndarray, y: np.ndarray) -> float:
    """Pearson correlation of ``x`` and ``y``, or 0.0 if either is constant."""
    # Comparing min to max is an exact constancy test. A std() == 0 check can
    # be defeated by round-off in the mean, and np.corrcoef then divides by
    # (almost) zero, returning NaN with a RuntimeWarning.
    if x.min() == x.max() or y.min() == y.max():
        return 0.0
    return float(np.corrcoef(x, y)[0, 1])


def evaluate_signal(data: pd.DataFrame,
                    strong_q: float = STRONG_QUANTILE) -> dict:
    """Descriptive test of the imbalance / microprice signal.

    Every statistic here is parameter-free — the "signal" is just the sign of
    the book pressure, so there is nothing to overfit and full-sample numbers
    are honest. An ML model *fit* on these features would instead require a
    strict out-of-sample time split.

    Args:
        data: Output of ``build_dataset``; the ``imbalance``,
            ``microprice_edge``, ``fwd_return`` and ``spread`` columns are
            read.
        strong_q: Tail fraction that counts as a "strong" signal. Rows at or
            above the ``1 - strong_q`` imbalance quantile form the bullish
            group, rows at or below the ``strong_q`` quantile the bearish one.

    Returns:
        ``{"n": 0}`` for empty input. Otherwise a dict with keys ``n``,
        ``corr_imbalance``, ``corr_microprice_edge``, ``hit_rate`` (NaN when
        no row has both a non-zero edge and a non-zero move), ``signal_move``,
        ``mean_spread`` and ``beats_spread``. A correlation is reported as 0.0
        when either of its inputs has no variation.
    """
    n = len(data)
    if n == 0:
        return {"n": 0}

    imb = data["imbalance"].to_numpy()
    edge = data["microprice_edge"].to_numpy()
    fwd = data["fwd_return"].to_numpy()

    corr_imb = _safe_corr(imb, fwd)
    corr_edge = _safe_corr(edge, fwd)

    # Directional hit rate, over rows where signal and move are both non-zero.
    mask = (edge != 0) & (fwd != 0)
    hit_rate = (float(np.mean(np.sign(edge[mask]) == np.sign(fwd[mask])))
                if mask.any() else float("nan"))

    # The honest part: among the strongest signals, how big is the forward
    # move you'd capture — and is it bigger than the spread you must cross?
    # Round-tripping a position pays the full spread, so the bar is
    # signal_move > mean_spread.
    hi = np.quantile(imb, 1 - strong_q)
    lo = np.quantile(imb, strong_q)
    bull = data.loc[data["imbalance"] >= hi, "fwd_return"].mean()
    bear = data.loc[data["imbalance"] <= lo, "fwd_return"].mean()
    # Half the bull-minus-bear gap: the average move in the signal's direction.
    signal_move = float((bull - bear) / 2)
    mean_spread = float(data["spread"].mean())

    return {
        "n": n,
        "corr_imbalance": corr_imb,
        "corr_microprice_edge": corr_edge,
        "hit_rate": hit_rate,
        "signal_move": signal_move,   # avg directional forward move captured
        "mean_spread": mean_spread,   # cost to round-trip
        "beats_spread": bool(signal_move > mean_spread),
    }


def render_report(r: dict) -> str:
    """Format the dict returned by ``evaluate_signal`` as a text report.

    Returns a single "no data" line when ``r`` has a missing or zero ``n``.
    """
    if r.get("n", 0) == 0:
        return "No data — run the collector to gather ticks first."
    verdict = ("signal exceeds the spread — worth modelling further"
               if r["beats_spread"]
               else "signal does NOT beat the spread — not tradeable as-is")
    return (
        "Microstructure signal study\n"
        f" rows analysed {r['n']:,}\n"
        f" corr(imbalance, fwd) {r['corr_imbalance']:+.4f}\n"
        f" corr(microprice, fwd) {r['corr_microprice_edge']:+.4f}\n"
        f" directional hit rate {r['hit_rate']:.3f}\n"
        f" avg signal move {r['signal_move']:.4f}\n"
        f" avg spread (cost) {r['mean_spread']:.4f}\n"
        f" verdict: {verdict}"
    )


def main():
    """Load the collected ticks, build the dataset and print the report."""
    from src import config  # noqa: F401  (configures root logging)
    ticks = load_ticks()
    logger.info("loaded %d ticks across %d markets",
                len(ticks), ticks["market_ticker"].nunique())
    data = build_dataset(ticks)
    print(render_report(evaluate_signal(data)))


if __name__ == "__main__":
    main()