""" main.py — Gradio interface with integrated ML probability filter. Pipeline: OHLCV Data │ ▼ Rule Engine (regime + volume + scoring + veto) │ ├─► Vetoed → skip (no ML call, save compute) │ └─► Approved by rules │ ▼ ML Filter (LightGBM / HGBM probability) │ ├─► prob < threshold → FILTERED (shown as ML_REJECT) │ └─► prob >= threshold → Risk Engine → Final setup │ ▼ Ranked output with ML prob overlay """ import logging import sys import time from typing import List, Optional, Dict, Any import gradio as gr from config import ( DEFAULT_SYMBOLS, TOP_N_DEFAULT, DEFAULT_ACCOUNT_EQUITY, TIMEFRAME, CANDLE_LIMIT, ) from data_fetcher import fetch_multiple, fetch_instruments from regime import detect_regime from volume_analysis import analyze_volume from risk_engine import evaluate_risk from veto import apply_veto, veto_summary from scorer import compute_structure_score, score_token, rank_tokens, format_score_bar, quality_tier from feature_builder import build_feature_dict, validate_features from ml_filter import TradeFilter logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", stream=sys.stdout, ) logger = logging.getLogger("main") # Load ML filter once at startup (None if not yet trained) _TRADE_FILTER: Optional[TradeFilter] = TradeFilter.load_or_none() _TREND_ICON = {"bullish": "▲", "ranging": "◆", "bearish": "▼"} _BREAK_LABEL = {1: "↑UP", -1: "↓DN", 0: " — "} _DIR_LABEL = {1: "LONG", -1: "SHORT", 0: "NONE"} def infer_direction(trend: str, breakout: int) -> int: if trend == "bullish" or breakout == 1: return 1 if trend == "bearish" or breakout == -1: return -1 return 0 def analyze_single( symbol: str, df, account_equity: float, consec_losses: int = 0, equity_drawdown_pct: float = 0.0, use_ml: bool = True, ) -> Dict[str, Any]: # ── RULE ENGINE ─────────────────────────────────────────────────────────── regime_data = detect_regime(df) volume_data = analyze_volume(df, atr_series=regime_data["atr_series"]) structure_sc = compute_structure_score(regime_data) direction = infer_direction(regime_data["trend"], volume_data["breakout"]) vetoed, veto_reason = apply_veto(regime_data, volume_data, structure_sc, direction=direction) scores = score_token(regime_data, volume_data, vetoed) # ── ML FILTER ───────────────────────────────────────────────────────────── ml_prob = None ml_approved = None ml_reject_reason = "" if use_ml and _TRADE_FILTER is not None and not vetoed: try: feat = build_feature_dict(regime_data, volume_data, scores) if validate_features(feat): result = _TRADE_FILTER.predict(regime_data, volume_data, scores) ml_prob = result.probability ml_approved = result.approved ml_reject_reason = result.reject_reason else: ml_approved = None # pass through if features invalid except Exception as e: logger.warning(f"{symbol}: ML filter error: {e}") ml_approved = None # ── RISK ENGINE ─────────────────────────────────────────────────────────── # Only compute full risk if not vetoed by rules AND not rejected by ML final_approved = ( not vetoed and (ml_approved is None or ml_approved) ) risk_data = evaluate_risk( close=float(df["close"].iloc[-1]), atr=regime_data["atr"], atr_pct=regime_data["atr_pct"], regime_score=regime_data["regime_score"], vol_ratio=regime_data["vol_ratio"], volume_score=volume_data["volume_score"], regime_confidence=regime_data["regime_confidence"], vol_compressed=regime_data["vol_compressed"], consec_losses=consec_losses, equity_drawdown_pct=equity_drawdown_pct, account_equity=account_equity, ) if final_approved else {} return { "symbol": symbol, "close": float(df["close"].iloc[-1]), "trend": regime_data["trend"], "adx": regime_data["adx"], "di_plus": regime_data["di_plus"], "di_minus": regime_data["di_minus"], "vol_ratio": regime_data["vol_ratio"], "vol_compressed": regime_data["vol_compressed"], "vol_expanding_from_base": regime_data["vol_expanding_from_base"], "vol_expanding": regime_data["vol_expanding"], "dist_atr": regime_data["dist_atr"], "price_extended": regime_data["price_extended_long"] or regime_data["price_extended_short"], "regime_confidence": regime_data["regime_confidence"], "spike": volume_data["spike"], "climax": volume_data["climax"], "absorption": volume_data["absorption"], "failed_breakout": volume_data["failed_breakout"], "recent_failed": volume_data["recent_failed_count"], "breakout": volume_data["breakout"], "obv_slope": volume_data["obv_slope_norm"], "delta_sign": volume_data["delta_sign"], "direction": direction, "rule_vetoed": vetoed, "veto_reason": veto_reason, "ml_prob": ml_prob, "ml_approved": ml_approved, "ml_reject_reason": ml_reject_reason, "final_approved": final_approved, "regime_score": scores["regime_score"], "volume_score": scores["volume_score"], "structure_score": scores["structure_score"], "confidence_score": scores["confidence_score"], "total_score": scores["total_score"], "risk": risk_data, } def _ml_status(d: Dict) -> str: if _TRADE_FILTER is None: return "NO_MODEL" if d["rule_vetoed"]: return "RULE_VET" if d["ml_prob"] is None: return "ML_ERR " prob_str = f"{d['ml_prob']:.3f}" return f"✓{prob_str}" if d["ml_approved"] else f"✗{prob_str}" def build_ranked_table(ranked: list, top_n: int) -> str: hdr = ( f"{'#':>3} {'Symbol':<14} {'Score':>7} {'Tier':>4} " f"{'Regime':>6} {'Vol':>6} {'S':>5} {'C':>5} " f"{'Trend':>7} {'ADX':>5} {'VR':>5} " f"{'ML':>8} {'Status'}\n" ) sep = "─" * 105 + "\n" rows = hdr + sep for rank, (sym, d) in enumerate(ranked[:top_n], 1): icon = _TREND_ICON.get(d["trend"], "?") tier = quality_tier(d["total_score"]) ml_str = _ml_status(d) if d["rule_vetoed"]: status = "RULE_VET" elif not d["final_approved"]: status = "ML_FILT " else: status = "OK " rows += ( f"{rank:>3} {sym:<14} {d['total_score']:>7.4f} {tier:>4} " f"{d['regime_score']:>6.3f} {d['volume_score']:>6.3f} " f"{d['structure_score']:>5.3f} {d['confidence_score']:>5.3f} " f"{icon} {d['trend']:<5} {d['adx']:>5.1f} {d['vol_ratio']:>5.2f} " f"{ml_str:>8} {status}\n" ) return rows def build_best_detail(data: Dict[str, Any]) -> str: r = data.get("risk", {}) sym = data["symbol"] icon = _TREND_ICON.get(data["trend"], "?") vol_state = [] if data["vol_compressed"]: vol_state.append("COMPRESSED") if data["vol_expanding_from_base"]: vol_state.append("EXPANDING FROM BASE ✓") if data["vol_expanding"] and not data["vol_expanding_from_base"]: vol_state.append("EXPANDING (no base)") vol_state_str = " | ".join(vol_state) or "NORMAL" ml_section = "" if _TRADE_FILTER is not None: prob_str = f"{data['ml_prob']:.4f}" if data["ml_prob"] is not None else "N/A" thresh_str = f"{_TRADE_FILTER.threshold:.4f}" decision = "APPROVED ✓" if data["ml_approved"] else "FILTERED ✗" ml_section = ( f"\n ── ML PROBABILITY FILTER ─────────────────────────\n" f" P(win): {prob_str}\n" f" Threshold: {thresh_str}\n" f" ML Decision: {decision}\n" ) risk_section = "" if r: risk_section = ( f"\n ── RISK PARAMETERS ────────────────────────────────\n" f" Entry: {r.get('entry_price', 0):.8f}\n" f" ATR: {r.get('atr', 0):.8f} ({r.get('atr_pct', 0):.3f}%)\n" f" Stop Mult: {r.get('stop_mult', 0):.1f}x ATR\n" f" LONG → Stop: {r.get('stop_long', 0):.8f} Target: {r.get('target_long', 0):.8f}\n" f" SHORT → Stop: {r.get('stop_short', 0):.8f} Target: {r.get('target_short', 0):.8f}\n" f" R:R Ratio: 1 : {r.get('rr_ratio', 2):.1f}\n" f" Risk Fraction: {r.get('risk_fraction', 0):.4f}%\n" f" $ At Risk: ${r.get('dollar_at_risk', 0):.2f}\n" f" Position Size: ${r.get('position_notional', 0):.2f} notional\n" f" Leverage (est): {r.get('leverage_implied', 0):.2f}x\n" f" Consec. Losses: {r.get('consec_losses', 0)}\n" f" Sizing Halted: {'YES ⛔' if r.get('sizing_halted') else 'no'}\n" ) lines = [ "═" * 64, f" BEST APPROVED SETUP: {sym} [{_DIR_LABEL.get(data['direction'], '?')}]", "═" * 64, f" Trend: {icon} {data['trend'].upper()}", f" ADX: {data['adx']:.1f} (DI+ {data['di_plus']:.1f} / DI- {data['di_minus']:.1f})", f" Vol State: {vol_state_str}", f" Dist from Mean: {data['dist_atr']:.2f} ATR", f" Regime Confidence:{data['regime_confidence']:.3f}", "", " ── SCORES ──────────────────────────────────────────", f" Regime: {format_score_bar(data['regime_score'])}", f" Volume: {format_score_bar(data['volume_score'])}", f" Structure: {format_score_bar(data['structure_score'])}", f" Confidence: {format_score_bar(data['confidence_score'])}", f" TOTAL: {format_score_bar(data['total_score'])}", ] lines.append(ml_section) lines.append(risk_section) lines.append("═" * 64) return "\n".join(lines) def parse_symbols(raw: str) -> List[str]: out = [] for tok in raw.replace(",", " ").replace("\n", " ").split(): tok = tok.strip().upper() if tok: out.append(tok if "-" in tok else f"{tok}-USDT") return out or DEFAULT_SYMBOLS def run_analysis( symbols_input: str, equity: float, consec_losses: int, drawdown_pct: float, top_n: int, use_live: bool, use_ml: bool, progress=gr.Progress(track_tqdm=False), ) -> str: t0 = time.time() lines = [] ml_status_str = "ACTIVE" if (_TRADE_FILTER is not None and use_ml) else ( "DISABLED" if not use_ml else "NOT TRAINED (run train.py)" ) lines += [ "━" * 68, " OKX QUANTITATIVE ANALYSIS ENGINE v3", f" ML Filter: {ml_status_str}", "━" * 68, ] if _TRADE_FILTER is not None and use_ml: lines.append(f" ML threshold: {_TRADE_FILTER.threshold:.4f} | Stats: {_TRADE_FILTER.stats()}") if use_live: lines.append("⟳ Fetching live OKX instrument list...") symbols = fetch_instruments("SPOT") or DEFAULT_SYMBOLS lines.append(f"✓ {len(symbols)} live USDT instruments") else: symbols = parse_symbols(symbols_input) lines.append(f"✓ {len(symbols)} symbol(s)") lines.append( f" Equity: ${equity:,.0f} | Losses: {int(consec_losses)}" f" | DD: {drawdown_pct:.1f}% | TF: {TIMEFRAME}" ) lines.append("") total = len(symbols) def prog_cb(i, t, sym): progress(i / t, desc=f"Fetching {sym} ({i}/{t})") ohlcv_map = fetch_multiple(symbols, min_bars=50, progress_callback=prog_cb) lines.append(f"✓ Fetched {len(ohlcv_map)}/{total}") lines.append("") all_results: Dict[str, Any] = {} errors = [] for sym, df in ohlcv_map.items(): try: all_results[sym] = analyze_single( sym, df, account_equity=equity, consec_losses=int(consec_losses), equity_drawdown_pct=drawdown_pct / 100.0, use_ml=use_ml, ) except Exception as exc: logger.error(f"{sym}: {exc}", exc_info=True) errors.append(sym) if errors: lines.append(f"⚠ Errors: {', '.join(errors)}") ranked = rank_tokens(all_results) rule_vetoed_n = sum(1 for _, d in ranked if d["rule_vetoed"]) ml_filtered_n = sum(1 for _, d in ranked if not d["rule_vetoed"] and not d["final_approved"]) approved_n = sum(1 for _, d in ranked if d["final_approved"]) lines += [ f" {len(all_results)} analyzed | {approved_n} approved | " f"{rule_vetoed_n} rule-vetoed | {ml_filtered_n} ML-filtered", "", " RANKED SETUPS", "─" * 105, build_ranked_table(ranked, int(top_n)), ] final_approved = [(s, d) for s, d in ranked if d["final_approved"]] if final_approved: best_sym, best_data = final_approved[0] lines += ["", build_best_detail(best_data)] else: lines += [ "", " ⚠ No fully approved setups.", " Possible causes: market regime unfavorable, ML model not trained,", " or all signals vetoed by rule engine.", ] if _TRADE_FILTER is not None and use_ml: lines += ["", f" ML session stats: {_TRADE_FILTER.stats()}"] lines += ["", f" ✓ Complete in {time.time() - t0:.1f}s", "━" * 68] return "\n".join(lines) def build_app() -> gr.Blocks: with gr.Blocks( title="OKX Quant Engine v3", theme=gr.themes.Base( primary_hue="slate", neutral_hue="zinc", font=[gr.themes.GoogleFont("JetBrains Mono"), "monospace"], ), css=""" body, .gradio-container { background: #060a10 !important; font-family: 'JetBrains Mono', monospace !important; max-width: 1280px !important; } .gr-button-primary { background: linear-gradient(90deg, #1a6bff, #0044cc) !important; border: none !important; font-weight: 700 !important; letter-spacing: 0.06em !important; } #output_box textarea { font-family: 'JetBrains Mono', monospace !important; font-size: 12px !important; line-height: 1.55 !important; background: #0a0e18 !important; color: #b0c4de !important; border: 1px solid #182030 !important; min-height: 740px !important; } label { color: #4a6080 !important; font-size: 11px !important; text-transform: uppercase !important; letter-spacing: 0.09em !important; } h1, h2 { color: #c0d4f0 !important; font-family: 'JetBrains Mono', monospace !important; } p { color: #384858 !important; font-size: 12px !important; } .gr-panel { background: #0c1020 !important; border: 1px solid #182030 !important; } """, ) as app: gr.Markdown("# ◈ OKX QUANT ENGINE v3") gr.Markdown( "ADX · absorption detection · volatility compression · " "fake breakout filter · **LightGBM probability layer** · adaptive risk" ) with gr.Row(): with gr.Column(scale=2): symbols_box = gr.Textbox( label="Symbols (comma / newline — blank = defaults)", placeholder="BTC-USDT, ETH-USDT, SOL-USDT ...", lines=4, value="", ) with gr.Column(scale=1): equity_slider = gr.Slider( label="Account Equity ($)", minimum=100, maximum=1_000_000, step=500, value=DEFAULT_ACCOUNT_EQUITY, ) top_n_slider = gr.Slider( label="Top N to Display", minimum=5, maximum=100, step=5, value=TOP_N_DEFAULT, ) with gr.Column(scale=1): consec_loss = gr.Slider(label="Consecutive Losses", minimum=0, maximum=10, step=1, value=0) drawdown = gr.Slider(label="Drawdown from Peak (%)", minimum=0.0, maximum=30.0, step=0.5, value=0.0) live_check = gr.Checkbox(label="Fetch live OKX instruments (100+)", value=False) ml_check = gr.Checkbox( label=f"Enable ML Filter (model: {'LOADED' if _TRADE_FILTER else 'NOT TRAINED'})", value=_TRADE_FILTER is not None, ) run_btn = gr.Button("▶ RUN ANALYSIS", variant="primary", size="lg") output_box = gr.Textbox( label="Analysis Output", lines=50, max_lines=150, interactive=False, elem_id="output_box", ) run_btn.click( fn=run_analysis, inputs=[symbols_box, equity_slider, consec_loss, drawdown, top_n_slider, live_check, ml_check], outputs=output_box, ) gr.Markdown( "**Research use only. Not financial advice.** " "Train the ML filter: `python train.py --use-defaults` | " "Re-optimize threshold: `python threshold_optimizer.py`" ) return app if __name__ == "__main__": import argparse parser = argparse.ArgumentParser() parser.add_argument("--port", type=int, default=7860) parser.add_argument("--share", action="store_true") a = parser.parse_args() build_app().launch(server_name="0.0.0.0", server_port=a.port, share=a.share, show_error=True)