Spaces:
Sleeping
Sleeping
| """ | |
| Polymarket Factor Dashboard β Gradio app for Hugging Face Spaces. | |
| Uses the public, non-trade Polymarket APIs: | |
| - Gamma API https://gamma-api.polymarket.com | |
| - CLOB API https://clob.polymarket.com | |
| """ | |
| from __future__ import annotations | |
| import json | |
| import math | |
| from datetime import datetime, timezone | |
| from typing import Any | |
| import gradio as gr | |
| import numpy as np | |
| import pandas as pd | |
| import plotly.graph_objects as go | |
| import requests | |
| from plotly.subplots import make_subplots | |
| GAMMA = "https://gamma-api.polymarket.com" | |
| CLOB = "https://clob.polymarket.com" | |
| SESSION = requests.Session() | |
| SESSION.headers.update({"User-Agent": "polymarket-dashboard/1.0"}) | |
| TIMEOUT = 20 | |
| CATEGORIES = { | |
| "Top 24h Volume": {"order": "volume24hr", "ascending": False}, | |
| "Top Total Volume": {"order": "volume", "ascending": False}, | |
| "Top Liquidity": {"order": "liquidity", "ascending": False}, | |
| "Most Competitive": {"order": "competitive", "ascending": False}, | |
| "Breaking Hot (24h/total ratio)": {"order": "volume24hr", "ascending": False, "hot": True}, | |
| "Ending Soonest": {"order": "end_date", "ascending": True}, | |
| } | |
| # ---------- API helpers ---------- | |
| def fetch_events(order: str, ascending: bool, limit: int = 40) -> list[dict]: | |
| r = SESSION.get( | |
| f"{GAMMA}/events", | |
| params={ | |
| "active": "true", | |
| "closed": "false", | |
| "order": order, | |
| "ascending": str(ascending).lower(), | |
| "limit": limit, | |
| }, | |
| timeout=TIMEOUT, | |
| ) | |
| r.raise_for_status() | |
| return r.json() | |
| def fetch_price_history(token_id: str, interval: str = "1m", fidelity: int = 60) -> pd.DataFrame: | |
| """Return DataFrame with columns ['t', 'p'] where t is a pandas Timestamp.""" | |
| r = SESSION.get( | |
| f"{CLOB}/prices-history", | |
| params={"market": token_id, "interval": interval, "fidelity": fidelity}, | |
| timeout=TIMEOUT, | |
| ) | |
| r.raise_for_status() | |
| hist = r.json().get("history", []) | |
| if not hist: | |
| return pd.DataFrame(columns=["t", "p"]) | |
| df = pd.DataFrame(hist) | |
| df["t"] = pd.to_datetime(df["t"], unit="s", utc=True) | |
| df["p"] = df["p"].astype(float) | |
| return df.set_index("t").sort_index() | |
| # ---------- Data shaping ---------- | |
| def _parse_list(x: Any) -> list: | |
| if isinstance(x, list): | |
| return x | |
| if isinstance(x, str): | |
| try: | |
| return json.loads(x) | |
| except Exception: | |
| return [] | |
| return [] | |
| def flatten_markets(events: list[dict], hot: bool = False) -> pd.DataFrame: | |
| """One row per *market*, enriched with parent event info.""" | |
| rows = [] | |
| for evt in events: | |
| evt_title = evt.get("title", "") | |
| for m in evt.get("markets", []): | |
| vol = float(m.get("volumeNum") or m.get("volume") or 0) | |
| vol24 = float(m.get("volume24hr") or 0) | |
| liq = float(m.get("liquidityNum") or 0) | |
| token_ids = _parse_list(m.get("clobTokenIds")) | |
| outcomes = _parse_list(m.get("outcomes")) | |
| prices = _parse_list(m.get("outcomePrices")) | |
| if not token_ids or not outcomes: | |
| continue | |
| rows.append({ | |
| "event": evt_title, | |
| "question": m.get("question", ""), | |
| "slug": m.get("slug", ""), | |
| "vol_total": vol, | |
| "vol_24h": vol24, | |
| "liquidity": liq, | |
| "hot_ratio": (vol24 / vol) if vol > 0 else 0.0, | |
| "token_yes": token_ids[0] if len(token_ids) > 0 else None, | |
| "token_no": token_ids[1] if len(token_ids) > 1 else None, | |
| "outcomes": outcomes, | |
| "prices": [float(p) for p in prices] if prices else [], | |
| "end_date": m.get("endDate"), | |
| "condition_id": m.get("conditionId"), | |
| }) | |
| df = pd.DataFrame(rows) | |
| if df.empty: | |
| return df | |
| if hot: | |
| df = df[df["vol_total"] > 50_000] # filter noise | |
| df = df.sort_values("hot_ratio", ascending=False) | |
| return df | |
| def market_label(row: pd.Series) -> str: | |
| q = row["question"] | |
| if len(q) > 85: | |
| q = q[:82] + "β¦" | |
| return f'{q} β’ 24h ${row["vol_24h"]:,.0f} β’ liq ${row["liquidity"]:,.0f}' | |
| # ---------- Factor computations ---------- | |
| def hurst_exponent(series: np.ndarray) -> float: | |
| """R/S Hurst. H<0.5 mean-reverting, 0.5 random walk, >0.5 trending.""" | |
| series = np.asarray(series, dtype=float) | |
| n = len(series) | |
| if n < 20: | |
| return float("nan") | |
| lags = np.unique(np.logspace(0.7, np.log10(n // 2), 12).astype(int)) | |
| lags = lags[lags >= 2] | |
| rs = [] | |
| for lag in lags: | |
| chunks = n // lag | |
| if chunks < 1: | |
| continue | |
| vals = [] | |
| for i in range(chunks): | |
| seg = series[i * lag:(i + 1) * lag] | |
| mean = seg.mean() | |
| dev = seg - mean | |
| Z = np.cumsum(dev) | |
| R = Z.max() - Z.min() | |
| S = seg.std(ddof=0) | |
| if S > 0: | |
| vals.append(R / S) | |
| if vals: | |
| rs.append((lag, np.mean(vals))) | |
| if len(rs) < 4: | |
| return float("nan") | |
| lags_arr = np.log([r[0] for r in rs]) | |
| rs_arr = np.log([r[1] for r in rs]) | |
| slope, _ = np.polyfit(lags_arr, rs_arr, 1) | |
| return float(slope) | |
| def compute_factors(df: pd.DataFrame, sibling_series: list[pd.Series] | None = None) -> dict: | |
| """Momentum, mean-reversion, vol regime, correlation break.""" | |
| out: dict = {} | |
| if df.empty or len(df) < 20: | |
| return {"error": "Not enough history to compute factors."} | |
| p = df["p"].astype(float) | |
| # log-diff returns; avoid p=0 | |
| p_clip = p.clip(lower=1e-6, upper=1 - 1e-6) | |
| logit = np.log(p_clip / (1 - p_clip)) | |
| ret = logit.diff().dropna() | |
| # --- Momentum: 24h and 7d cumulative logit change, normalized | |
| now = df.index[-1] | |
| def window_change(hours: int) -> float: | |
| cutoff = now - pd.Timedelta(hours=hours) | |
| seg = logit.loc[logit.index >= cutoff] | |
| if len(seg) < 2: | |
| return float("nan") | |
| return float(seg.iloc[-1] - seg.iloc[0]) | |
| mom_24h = window_change(24) | |
| mom_7d = window_change(24 * 7) | |
| vol_all = float(ret.std()) if len(ret) > 2 else float("nan") | |
| mom_z_24h = mom_24h / (vol_all * math.sqrt(24)) if vol_all and not math.isnan(mom_24h) else float("nan") | |
| mom_z_7d = mom_7d / (vol_all * math.sqrt(24 * 7)) if vol_all and not math.isnan(mom_7d) else float("nan") | |
| out["momentum"] = { | |
| "24h_dlogit": mom_24h, | |
| "7d_dlogit": mom_7d, | |
| "24h_zscore": mom_z_24h, | |
| "7d_zscore": mom_z_7d, | |
| "label": _momentum_label(mom_z_24h, mom_z_7d), | |
| } | |
| # --- Mean reversion: Hurst + lag-1 autocorr of returns | |
| hurst = hurst_exponent(p.values) | |
| lag1 = float(ret.autocorr(lag=1)) if len(ret) > 10 else float("nan") | |
| out["mean_reversion"] = { | |
| "hurst": hurst, | |
| "lag1_autocorr": lag1, | |
| "label": _mr_label(hurst, lag1), | |
| } | |
| # --- Vol regime: recent vs baseline | |
| ret_24h = ret.loc[ret.index >= now - pd.Timedelta(hours=24)] | |
| ret_7d = ret.loc[ret.index >= now - pd.Timedelta(days=7)] | |
| vol_24h = float(ret_24h.std()) if len(ret_24h) > 2 else float("nan") | |
| vol_7d = float(ret_7d.std()) if len(ret_7d) > 2 else float("nan") | |
| ratio = (vol_24h / vol_7d) if vol_7d and vol_7d > 0 else float("nan") | |
| out["vol_regime"] = { | |
| "vol_24h": vol_24h, | |
| "vol_7d": vol_7d, | |
| "ratio": ratio, | |
| "label": _vol_label(ratio), | |
| } | |
| # --- Correlation break: rolling corr between this market and sibling composite | |
| out["corr_break"] = _corr_break(p, sibling_series) | |
| return out | |
| def _momentum_label(z24: float, z7: float) -> str: | |
| if any(math.isnan(x) for x in (z24, z7)): | |
| return "insufficient" | |
| if z24 > 1.5 and z7 > 0.5: return "STRONG UP" | |
| if z24 < -1.5 and z7 < -0.5: return "STRONG DOWN" | |
| if z24 > 0.7: return "up" | |
| if z24 < -0.7: return "down" | |
| return "flat" | |
| def _mr_label(hurst: float, lag1: float) -> str: | |
| if math.isnan(hurst): return "insufficient" | |
| if hurst < 0.4 or (not math.isnan(lag1) and lag1 < -0.15): | |
| return "MEAN-REVERTING" | |
| if hurst > 0.6: | |
| return "TRENDING" | |
| return "random walk" | |
| def _vol_label(ratio: float) -> str: | |
| if math.isnan(ratio): return "insufficient" | |
| if ratio > 1.6: return "HIGH VOL (regime shift up)" | |
| if ratio < 0.6: return "LOW VOL (calm)" | |
| return "normal" | |
| def _corr_break(p: pd.Series, sibs: list[pd.Series] | None) -> dict: | |
| if not sibs: | |
| return {"label": "no siblings", "corr_recent": None, "corr_baseline": None, "delta": None} | |
| # Align all on common index via forward fill | |
| frame = pd.concat([p.rename("self")] + [s.rename(f"sib{i}") for i, s in enumerate(sibs)], axis=1) | |
| frame = frame.ffill().dropna() | |
| if len(frame) < 48: | |
| return {"label": "insufficient", "corr_recent": None, "corr_baseline": None, "delta": None} | |
| composite = frame.drop(columns="self").mean(axis=1) | |
| merged = pd.concat([frame["self"], composite.rename("comp")], axis=1) | |
| baseline = merged.iloc[:-24].corr().iloc[0, 1] | |
| recent = merged.iloc[-24:].corr().iloc[0, 1] | |
| if pd.isna(baseline) or pd.isna(recent): | |
| return {"label": "insufficient", "corr_recent": None, "corr_baseline": None, "delta": None} | |
| delta = recent - baseline | |
| label = "CORR BREAK" if abs(delta) > 0.35 else "stable" | |
| return { | |
| "corr_recent": float(recent), | |
| "corr_baseline": float(baseline), | |
| "delta": float(delta), | |
| "label": label, | |
| } | |
| # ---------- Plotting ---------- | |
| def build_plot(df: pd.DataFrame, question: str, factors: dict) -> go.Figure: | |
| fig = make_subplots( | |
| rows=2, cols=1, shared_xaxes=True, | |
| row_heights=[0.65, 0.35], | |
| vertical_spacing=0.06, | |
| subplot_titles=("Implied Probability (%)", "Rolling Volatility of Logit Returns"), | |
| ) | |
| p = df["p"] * 100 | |
| fig.add_trace( | |
| go.Scatter(x=df.index, y=p, mode="lines", name="YES %", | |
| line=dict(color="#1f77b4", width=2)), | |
| row=1, col=1, | |
| ) | |
| # 24h rolling mean | |
| ma = p.rolling("24h").mean() | |
| fig.add_trace( | |
| go.Scatter(x=df.index, y=ma, mode="lines", name="24h MA", | |
| line=dict(color="#ff7f0e", width=1, dash="dash")), | |
| row=1, col=1, | |
| ) | |
| # Vol panel | |
| p_clip = df["p"].clip(1e-6, 1 - 1e-6) | |
| logit = np.log(p_clip / (1 - p_clip)) | |
| ret = logit.diff() | |
| roll_vol = ret.rolling("24h").std() | |
| fig.add_trace( | |
| go.Scatter(x=roll_vol.index, y=roll_vol, mode="lines", name="24h rolling Ο", | |
| line=dict(color="#d62728", width=1.5)), | |
| row=2, col=1, | |
| ) | |
| title = question if len(question) <= 110 else question[:107] + "β¦" | |
| fig.update_layout( | |
| title=title, | |
| height=620, | |
| hovermode="x unified", | |
| margin=dict(l=40, r=20, t=60, b=40), | |
| legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1), | |
| template="plotly_white", | |
| ) | |
| fig.update_yaxes(title_text="Prob (%)", row=1, col=1, range=[0, 100]) | |
| fig.update_yaxes(title_text="Ο", row=2, col=1) | |
| return fig | |
| # ---------- Gradio glue ---------- | |
| MARKET_CACHE: dict[str, pd.DataFrame] = {} | |
| def refresh_markets(category: str): | |
| cfg = CATEGORIES[category] | |
| hot = cfg.get("hot", False) | |
| events = fetch_events(cfg["order"], cfg["ascending"], limit=40) | |
| df = flatten_markets(events, hot=hot) | |
| if df.empty: | |
| return gr.Dropdown(choices=[], value=None), "No markets found." | |
| df = df.head(30).reset_index(drop=True) | |
| MARKET_CACHE[category] = df | |
| labels = [market_label(r) for _, r in df.iterrows()] | |
| return gr.Dropdown(choices=labels, value=labels[0]), f"Loaded {len(df)} markets." | |
| def analyze_selected(category: str, label: str, interval: str, fidelity: int): | |
| if category not in MARKET_CACHE or not label: | |
| return None, "Refresh markets first, then pick one.", pd.DataFrame() | |
| df_markets = MARKET_CACHE[category] | |
| labels = [market_label(r) for _, r in df_markets.iterrows()] | |
| if label not in labels: | |
| return None, "Selection out of sync β refresh markets.", pd.DataFrame() | |
| row = df_markets.iloc[labels.index(label)] | |
| token_id = row["token_yes"] | |
| if not token_id: | |
| return None, "No YES token id.", pd.DataFrame() | |
| # Fetch target history | |
| df = fetch_price_history(token_id, interval=interval, fidelity=int(fidelity)) | |
| if df.empty: | |
| return None, "No price history returned for this market.", pd.DataFrame() | |
| # Fetch a few siblings from the same event for corr-break | |
| siblings = [] | |
| try: | |
| evt_title = row["event"] | |
| sibs = df_markets[df_markets["event"] == evt_title] | |
| sibs = sibs[sibs["question"] != row["question"]].head(4) | |
| for _, sr in sibs.iterrows(): | |
| try: | |
| sdf = fetch_price_history(sr["token_yes"], interval=interval, fidelity=int(fidelity)) | |
| if not sdf.empty: | |
| siblings.append(sdf["p"]) | |
| except Exception: | |
| continue | |
| except Exception: | |
| pass | |
| factors = compute_factors(df, sibling_series=siblings) | |
| fig = build_plot(df, row["question"], factors) | |
| # Build human summary | |
| summary = _format_summary(row, df, factors) | |
| # Factor table | |
| tbl = _factor_table(factors) | |
| return fig, summary, tbl | |
| def _format_summary(row: pd.Series, df: pd.DataFrame, factors: dict) -> str: | |
| last_p = float(df["p"].iloc[-1]) * 100 | |
| first_p = float(df["p"].iloc[0]) * 100 | |
| end = row.get("end_date") or "?" | |
| lines = [ | |
| f"### {row['question']}", | |
| f"**Event:** {row['event']}", | |
| f"**Current:** YES = {last_p:.1f}% β’ NO = {100 - last_p:.1f}%", | |
| f"**Period change:** {first_p:.1f}% β {last_p:.1f}% ({last_p - first_p:+.1f} pp)", | |
| f"**24h volume:** ${row['vol_24h']:,.0f} β’ **Total volume:** ${row['vol_total']:,.0f} β’ **Liquidity:** ${row['liquidity']:,.0f}", | |
| f"**Ends:** {end}", | |
| f"**Samples:** {len(df)}", | |
| ] | |
| if "error" not in factors: | |
| m = factors["momentum"]; mr = factors["mean_reversion"]; v = factors["vol_regime"]; c = factors["corr_break"] | |
| lines += [ | |
| "", | |
| "#### Factor signals", | |
| f"- **Momentum:** {m['label']} (24h z={m['24h_zscore']:+.2f}, 7d z={m['7d_zscore']:+.2f})", | |
| f"- **Mean reversion:** {mr['label']} (Hurst={mr['hurst']:.2f}, Οβ={mr['lag1_autocorr']:+.2f})", | |
| f"- **Vol regime:** {v['label']} (Οββh/Οβd = {v['ratio']:.2f})", | |
| f"- **Correlation vs event siblings:** {c['label']}" + ( | |
| f" (baseline={c['corr_baseline']:+.2f}, recent={c['corr_recent']:+.2f}, Ξ={c['delta']:+.2f})" | |
| if c.get("delta") is not None else "" | |
| ), | |
| ] | |
| return "\n".join(lines) | |
| def _factor_table(factors: dict) -> pd.DataFrame: | |
| if "error" in factors: | |
| return pd.DataFrame([{"factor": "β", "value": factors["error"], "signal": ""}]) | |
| m = factors["momentum"]; mr = factors["mean_reversion"]; v = factors["vol_regime"]; c = factors["corr_break"] | |
| rows = [ | |
| {"factor": "Momentum 24h (z)", "value": f"{m['24h_zscore']:+.2f}", "signal": m["label"]}, | |
| {"factor": "Momentum 7d (z)", "value": f"{m['7d_zscore']:+.2f}", "signal": ""}, | |
| {"factor": "Hurst exponent", "value": f"{mr['hurst']:.2f}", "signal": mr["label"]}, | |
| {"factor": "Lag-1 autocorr", "value": f"{mr['lag1_autocorr']:+.2f}", "signal": ""}, | |
| {"factor": "Οββh / Οβd", "value": f"{v['ratio']:.2f}", "signal": v["label"]}, | |
| {"factor": "Corr vs siblings (Ξ)", | |
| "value": f"{c['delta']:+.2f}" if c.get("delta") is not None else "n/a", | |
| "signal": c["label"]}, | |
| ] | |
| return pd.DataFrame(rows) | |
| # ---------- UI ---------- | |
| with gr.Blocks(title="Polymarket Factor Dashboard", theme=gr.themes.Soft()) as demo: | |
| gr.Markdown( | |
| "# Polymarket Factor Dashboard\n" | |
| "Pick a category, load markets, then analyze any market's price behavior " | |
| "with momentum / mean-reversion / volatility-regime / correlation-break detection." | |
| ) | |
| with gr.Row(): | |
| category = gr.Dropdown( | |
| label="Category", | |
| choices=list(CATEGORIES.keys()), | |
| value="Top 24h Volume", | |
| scale=2, | |
| ) | |
| load_btn = gr.Button("Load markets", variant="primary", scale=1) | |
| status = gr.Markdown("", elem_id="status") | |
| market = gr.Dropdown(label="Market", choices=[], value=None, interactive=True) | |
| with gr.Row(): | |
| interval = gr.Dropdown( | |
| label="History interval", | |
| choices=["1h", "6h", "1d", "1w", "1m", "max"], | |
| value="1w", | |
| ) | |
| fidelity = gr.Number(label="Fidelity (minutes per sample)", value=60, precision=0) | |
| analyze_btn = gr.Button("Analyze", variant="primary") | |
| with gr.Row(): | |
| with gr.Column(scale=3): | |
| plot = gr.Plot(label="Price & Volatility") | |
| with gr.Column(scale=2): | |
| summary = gr.Markdown("") | |
| table = gr.Dataframe( | |
| headers=["factor", "value", "signal"], | |
| label="Factor signals", | |
| interactive=False, | |
| wrap=True, | |
| ) | |
| load_btn.click( | |
| fn=refresh_markets, | |
| inputs=[category], | |
| outputs=[market, status], | |
| ) | |
| category.change( | |
| fn=refresh_markets, | |
| inputs=[category], | |
| outputs=[market, status], | |
| ) | |
| analyze_btn.click( | |
| fn=analyze_selected, | |
| inputs=[category, market, interval, fidelity], | |
| outputs=[plot, summary, table], | |
| ) | |
| # Auto-load on start | |
| demo.load( | |
| fn=refresh_markets, | |
| inputs=[category], | |
| outputs=[market, status], | |
| ) | |
| if __name__ == "__main__": | |
| demo.launch() | |