| """Microstructure signal study: does order-book imbalance predict the next move? |
| |
| This is the analysis stage that runs on the ticks gathered by ``collector.py``. |
| |
| load_ticks() read the collected Parquet tick files into one frame |
| build_dataset() turn ticks into (features, forward-return) rows, with |
| strict no-lookahead and no cross-market leakage |
| evaluate_signal() measure whether book imbalance / microprice edge |
| predicts the forward mid move — and, crucially, whether |
| that move is large enough to survive the spread |
| |
| The edge source under test: bid/ask size imbalance carries short-horizon |
| directional information. The honest question is not "is there a signal" |
| (there usually is, weakly) but "does it beat the cost of crossing the |
| spread to trade it." That comparison is the whole point. |
| |
| Run on collected data: python -m src.microstructure.study |
| """ |
|
|
| import logging |
| from pathlib import Path |
|
|
| import numpy as np |
| import pandas as pd |
|
|
# Module-level logger, named after this module.
logger: logging.Logger = logging.getLogger(__name__)

# Directory that load_ticks() searches recursively for Parquet tick files.
DATA_DIR: Path = Path("data/ticks")
# Default label horizon for build_dataset(): counted in ticks (rows of one
# market), not in wall-clock time.
HORIZON: int = 10
# Default tail fraction for evaluate_signal(): rows at or below this imbalance
# quantile and at or above (1 - this) quantile form the "strong" groups.
STRONG_QUANTILE: float = 0.2
|
|
|
|
def load_ticks(data_dir=DATA_DIR) -> pd.DataFrame:
    """Read every collected Parquet file into one DataFrame.

    Args:
        data_dir: Directory searched recursively for ``*.parquet`` files.

    Returns:
        All files concatenated into one frame whose ``ts`` column is parsed
        as timezone-aware UTC datetimes. Rows are sorted by
        ``market_ticker`` and then ``ts``, so each market's ticks are
        contiguous and chronological, and the index is a fresh 0..n-1 range.

    Raises:
        FileNotFoundError: If no Parquet file exists under ``data_dir``.
    """
    paths = sorted(Path(data_dir).glob("**/*.parquet"))
    if not paths:
        raise FileNotFoundError(
            f"no Parquet files under {data_dir} — run the collector first"
        )
    # Files are read in sorted path order and stacked into a single frame.
    df = pd.concat((pd.read_parquet(p) for p in paths), ignore_index=True)
    df["ts"] = pd.to_datetime(df["ts"], utc=True)
    # Market first, then time: build_dataset() labels by row offset within a
    # market, so each market's rows must be in chronological order.
    return df.sort_values(["market_ticker", "ts"]).reset_index(drop=True)
|
|
|
|
def build_dataset(ticks: pd.DataFrame, horizon: int = HORIZON) -> pd.DataFrame:
    """Turn raw ticks into (features, label) rows.

    The label is the mid move ``horizon`` ticks into the future. Features use
    only same-tick information. Each market is processed on its own so a
    label never reaches across markets, and the final ``horizon`` ticks of
    each market are dropped — there is no future to label them with. Getting
    this no-lookahead discipline right is the difference between a real
    backtest and a fantasy one.

    Args:
        ticks: Tick frame with ``market_ticker``, ``ts``, ``mid``,
            ``imbalance``, ``microprice`` and ``spread`` columns.
        horizon: How many ticks ahead (within the same market) the label
            looks. Must be at least 1.

    Returns:
        A frame with columns ``market_ticker``, ``ts``, ``imbalance``
        (raw imbalance minus 0.5), ``microprice_edge`` (microprice minus
        mid), ``spread``, ``mid`` and ``fwd_return`` (future mid minus
        current mid). Rows containing any NaN are removed. Markets with
        ``horizon`` ticks or fewer contribute nothing; if no market
        qualifies, an empty frame with the same columns is returned.

    Raises:
        ValueError: If ``horizon`` is smaller than 1.
    """
    # horizon == 0 would make ``iloc[:-horizon]`` an empty slice and silently
    # discard all data; a negative horizon would label rows with *past* mids.
    if horizon < 1:
        raise ValueError(
            f"horizon must be a positive number of ticks, got {horizon!r}"
        )

    cols = ["market_ticker", "ts", "imbalance", "microprice_edge",
            "spread", "mid", "fwd_return"]
    parts = []
    for ticker, g in ticks.groupby("market_ticker", sort=False):
        if len(g) <= horizon:
            # Not a single row of this market has a future tick to label it.
            continue
        # The label is defined by row offset, so rows must be chronological.
        # The stable sort is a no-op on already-sorted input and keeps the
        # arrival order of ticks that share a timestamp.
        g = g.sort_values("ts", kind="stable").reset_index(drop=True)
        mid = g["mid"]
        part = pd.DataFrame({
            "market_ticker": ticker,
            "ts": g["ts"],
            # NOTE(review): centring on 0.5 presumes the collector stores
            # imbalance as a share in [0, 1] — confirm against collector.py.
            "imbalance": g["imbalance"] - 0.5,
            "microprice_edge": g["microprice"] - mid,
            "spread": g["spread"],
            "mid": mid,
            "fwd_return": mid.shift(-horizon) - mid,
        })
        # The last ``horizon`` rows have no future mid inside this market.
        parts.append(part.iloc[:-horizon])

    if not parts:
        return pd.DataFrame(columns=cols)
    return pd.concat(parts, ignore_index=True).dropna().reset_index(drop=True)
|
|
|
|
def _safe_corr(x: np.ndarray, y: np.ndarray) -> float:
    """Return the Pearson correlation of x and y, or 0.0 when it is undefined."""
    # Correlation needs at least two points and variation in BOTH inputs.
    # min == max is an exact constancy test, unlike ``std() == 0`` which
    # floating-point round-off can defeat for a constant array.
    if len(x) < 2 or x.min() == x.max() or y.min() == y.max():
        return 0.0
    return float(np.corrcoef(x, y)[0, 1])


def evaluate_signal(data: pd.DataFrame, strong_q: float = STRONG_QUANTILE) -> dict:
    """Descriptive test of the imbalance / microprice signal.

    Every statistic here is parameter-free — the "signal" is just the sign of
    the book pressure, so there is nothing to overfit and full-sample numbers
    are honest. An ML model *fit* on these features would instead require a
    strict out-of-sample time split.

    Args:
        data: Frame with ``imbalance``, ``microprice_edge``, ``fwd_return``
            and ``spread`` columns.
        strong_q: Tail fraction defining the "strong" groups: rows with
            imbalance at or above the ``1 - strong_q`` quantile versus rows
            at or below the ``strong_q`` quantile. Values above 0.5 make the
            two groups overlap.

    Returns:
        ``{"n": 0}`` for empty input. Otherwise a dict with:
        ``n`` (row count); ``corr_imbalance`` and ``corr_microprice_edge``
        (Pearson correlation of each feature with ``fwd_return``, 0.0 when
        either side is constant); ``hit_rate`` (share of rows where the sign
        of ``microprice_edge`` matches the sign of ``fwd_return``, ignoring
        rows where either is zero; NaN if no such row exists);
        ``signal_move`` (half the gap between the mean ``fwd_return`` of the
        high-imbalance and low-imbalance groups); ``mean_spread`` (mean of
        ``spread`` over all rows); and ``beats_spread``
        (``signal_move > mean_spread``).
    """
    n = len(data)
    if n == 0:
        return {"n": 0}

    imb = data["imbalance"].to_numpy()
    edge = data["microprice_edge"].to_numpy()
    fwd = data["fwd_return"].to_numpy()

    corr_imb = _safe_corr(imb, fwd)
    corr_edge = _safe_corr(edge, fwd)

    # Directional accuracy is only defined where both the signal and the
    # outcome actually have a direction.
    mask = (edge != 0) & (fwd != 0)
    hit_rate = (float(np.mean(np.sign(edge[mask]) == np.sign(fwd[mask])))
                if mask.any() else float("nan"))

    # Compare the average forward move after strongly bid-heavy books with
    # the move after strongly ask-heavy books; half the gap is the average
    # move captured per side.
    hi = np.quantile(imb, 1 - strong_q)
    lo = np.quantile(imb, strong_q)
    bull = data.loc[data["imbalance"] >= hi, "fwd_return"].mean()
    bear = data.loc[data["imbalance"] <= lo, "fwd_return"].mean()
    signal_move = float((bull - bear) / 2)
    mean_spread = float(data["spread"].mean())

    return {
        "n": n,
        "corr_imbalance": corr_imb,
        "corr_microprice_edge": corr_edge,
        "hit_rate": hit_rate,
        "signal_move": signal_move,
        "mean_spread": mean_spread,
        "beats_spread": bool(signal_move > mean_spread),
    }
|
|
|
|
def render_report(r: dict) -> str:
    """Format a signal-evaluation result dict as a plain-text report.

    Args:
        r: Result mapping. When ``n`` is missing or 0, only a "no data"
            message is produced. Otherwise the keys ``n``,
            ``corr_imbalance``, ``corr_microprice_edge``, ``hit_rate``,
            ``signal_move``, ``mean_spread`` and ``beats_spread`` are read.

    Returns:
        A multi-line string with one statistic per line, ending in a verdict
        line chosen by ``beats_spread``.
    """
    if r.get("n", 0) == 0:
        return "No data — run the collector to gather ticks first."
    verdict = ("signal exceeds the spread — worth modelling further"
               if r["beats_spread"]
               else "signal does NOT beat the spread — not tradeable as-is")
    return (
        "Microstructure signal study\n"
        f" rows analysed {r['n']:,}\n"
        f" corr(imbalance, fwd) {r['corr_imbalance']:+.4f}\n"
        f" corr(microprice, fwd) {r['corr_microprice_edge']:+.4f}\n"
        f" directional hit rate {r['hit_rate']:.3f}\n"
        f" avg signal move {r['signal_move']:.4f}\n"
        f" avg spread (cost) {r['mean_spread']:.4f}\n"
        f" verdict: {verdict}"
    )
|
|
|
|
def main():
    """Run the study end to end on the collected ticks and print the report."""
    # NOTE(review): ``config`` is not referenced below — presumably imported
    # for its import-time side effects; confirm before removing.
    from src import config

    tick_frame = load_ticks()
    market_count = tick_frame["market_ticker"].nunique()
    logger.info("loaded %d ticks across %d markets",
                len(tick_frame), market_count)

    results = evaluate_signal(build_dataset(tick_frame))
    print(render_report(results))


# Script entry point: run the study only when executed directly.
if __name__ == "__main__":
    main()
|
|