Spaces:
Running
Running
| #!/usr/bin/env python3 | |
"""
================================================================================
 K1RL QUASAR — HUB DASHBOARD SERVICE (with Trade Log Parser) — FIXED v2.6
--------------------------------------------------------------------------------
 Architecture role: READ-ONLY subscriber — serves dashboard UI
 VERSION: v2.6 (REALTIME SIGNALS) | 2026-04-26

 v2.6 — Wires the dashboard to the hub's actual broadcast contracts:
   • MetricsSubscriber → /ws/subscribe — properly unwraps msg["asset"]["snapshot"]
     and reads voting.flip_direction / buy_count / sell_count / last_price plus
     training.* fields. Previously v2.5 read flat keys that the hub never emits,
     so AssetSnapshot defaulted to zero on every message.
   • SignalSubscriber → /ws/signals — NEW. Consumes the per-tick realtime channel
     (signal_snapshot + signal_delta), drops out-of-order seq, stores the latest
     {action, price, ts, seq, source} per asset.
   • DashboardState.get_state() merges both streams. flip_direction reflects the
     fresh realtime tick (≤1 s old) so the BUY-after-SELL-streak case is visible
     within ~30 ms of the V75 tick — same latency budget as the ranker. Falls back
     to the cumulative voting direction when the realtime stream is silent/stale.
   • Full per-asset payload (buy/sell counts, training_steps, actor/critic/avn
     loss, avn_accuracy, signal_confidence, score) so the HTML table populates
     instead of rendering empty cells.
   • Backward-compat alias: HubSubscriberClient = MetricsSubscriberClient.

 Carried over from v2.5:
   • Default port 8051 → 7860 (HF Spaces only exposes port 7860)
   • WebSocket ws://host:7860 → wss://host (HF Spaces TLS proxy)
   • _find_files() searches all likely HF Spaces log paths
   • /api/debug endpoint for live diagnostics
   • All /api/ranker/logs/* routes inline — no Blueprint dependency
   • Training KPI enrichment (_enrich_training) applied on /recent
   • Rotated log files (*.log, *.log.1, *.log.2, etc.) included
   • Improved regex to catch all trade close formats
   • Unrealized P&L tracking for open positions

 DEPLOYMENT: Just restart the service — routes are already inline in this file.
================================================================================
"""
| import json | |
| import logging | |
| import os | |
| import sys | |
| import re | |
| import glob | |
| import threading | |
| import time | |
| from collections import deque, defaultdict | |
| from datetime import datetime | |
| from pathlib import Path | |
| from typing import Dict, List, Optional | |
| import websocket | |
| from flask import Flask, jsonify, request, send_from_directory, send_file | |
| from flask_cors import CORS | |
| # ββ Logging βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# Root logging: INFO and above, written to stdout so the hosting platform's
# log viewer captures it.
logging.basicConfig(
    stream=sys.stdout,
    level=logging.INFO,
    datefmt="%Y-%m-%d %H:%M:%S",
    format="%(asctime)s | %(levelname)s | %(name)s | %(message)s",
)
logger = logging.getLogger("HubDashboardService")

# -- Config ------------------------------------------------------------------
# Every knob below can be overridden through the environment variable named in
# the corresponding lookup; the second argument is the built-in default.
_HUB_HOST = os.getenv("QUASAR_HUB_HOST", "karlquant-quasar-executo.hf.space")
_DASHBOARD_PORT = int(os.getenv("DASHBOARD_PORT", "7860"))
# Dashboard HTML defaults to a file sitting next to this script.
_HTML_PATH = os.getenv(
    "DASHBOARD_HTML",
    str(Path(__file__).with_name("hub_dashboard.html")),
)
_LOG_DIR = os.getenv("RANKER_LOG_DIR", "/app/ranker_logs")
_METRIC_HISTORY_LEN = int(os.getenv("QUASAR_METRIC_HISTORY", "200"))
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # SECTION 1 β TRADE LOG PARSER (FIXED v2.2) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
class TradeLogParser:
    """
    Tail ranker log files and maintain open/closed trade state.

    A daemon thread (see ``start_background``) calls ``refresh()`` on a fixed
    interval. Each refresh reads only the text appended to every matching log
    file since the previous pass and feeds the new lines to ``_parse_line``.

    FIXED v2.2:
      FIX #1: Reads the *.log* pattern to include rotated files (.log.1, .log.2, etc.)
      FIX #2: Improved regex to catch all trade close formats (normal, fallback, timeout)
      FIX #3: Tracks unrealized P&L for open positions

    Expected log format from ranker_logging.py:
      [2026-03-30 16:20:40] | INFO | TRADE | CRASH500 | TRADE OPENED | ID=CRASH500_123 | Dir=long | Entry=3524.6485 | Qty=0.000284
      [2026-03-30 16:20:39] | INFO | TRADE | CRASH500 | TRADE CLOSED | ID=CRASH500_456 | pnl=-3.5246 | return=+0.01%
      [2026-03-30 16:20:45] | INFO | TRADE | V75 | Closed V75 (no-cid fallback) | reward=... | pnl=-2.0
      [2026-03-30 16:20:50] | INFO | TRADE | CRASH500 | TRADE FORCE-CLOSED (timeout) | reward=... | profit=-1.5
    """

    # Regex patterns matching the actual log format from ranker_logging.py:
    TRADE_OPEN_RE = re.compile(
        r'TRADE OPENED \| ID=(\S+) \| Dir=(\w+) \| Entry=([\d.]+) \| Qty=([\d.]+)'
    )
    TRADE_OPEN_RE_NOQTY = re.compile(
        r'TRADE OPENED \| ID=(\S+) \| Dir=(\w+) \| Entry=([\d.]+)'
    )
    # FIXED v2.2: Improved regex to catch ALL trade close formats
    # Matches: "TRADE CLOSED | ID=xxx | pnl=X"
    #          "no-cid fallback) | ... | pnl=X"
    #          "FORCE-CLOSED (timeout) | ... | pnl=X"
    #          "profit=X" (alternative field name)
    # Not used by _parse_line (the dedicated patterns below are); kept as a
    # public class attribute for any external caller.
    TRADE_CLOSE_RE = re.compile(
        r'(?:TRADE CLOSED|no-cid fallback|FORCE-CLOSED.*?timeout).*?(?:pnl|profit)=([+-]?[\d.]+)'
    )
    # FIX v2.3: Dual-format regex for trade close lines that carry exit_price.
    # Root cause (v2.2 bug): ranker_logging.trade_close() wrote exit_price ONLY
    # into the trailing JSON metadata blob, e.g.:
    #   TRADE CLOSED | ID=... | pnl=... | return=...% | {"exit_price": 4364.21}
    # but this regex looked for it as a pipe-delimited text field:
    #   TRADE CLOSED | ID=... | pnl=... | exit_price=...   <- never present
    # so TRADE_CLOSE_RE_WITH_EXIT never matched and exit_price was always None.
    #
    # Fix has TWO parts:
    #   1. ranker_logging.py now writes exit_price into the message text too.
    #   2. This regex matches BOTH formats so old log files still parse correctly:
    #        Group 3 - pipe-delimited text field (new format, post-fix)
    #        Group 4 - JSON metadata field (old format, pre-fix)
    TRADE_CLOSE_RE_WITH_EXIT = re.compile(
        r'TRADE CLOSED \| ID=(\S+) \| pnl=([+-]?[\d.]+)'
        r'.*?(?:\| exit_price=([\d.]+)|"exit_price":\s*([\d.]+))'
    )
    # Fallback for Line 1: pnl + return%, no exit_price
    TRADE_CLOSE_RE_STRICT = re.compile(
        r'TRADE CLOSED \| ID=(\S+) \| pnl=([+-]?[\d.]+)'
    )
    TRADE_CLOSE_RE_FALLBACK = re.compile(
        r'no-cid fallback.*?pnl=([+-]?[\d.]+)'
    )
    TRADE_CLOSE_RE_TIMEOUT = re.compile(
        r'FORCE-CLOSED.*?timeout.*?(?:pnl|profit)=([+-]?[\d.]+)'
    )
    # Safety-net: Rotation close lines appear BEFORE the TRADE CLOSED lines and
    # contain the underlying asset exit price. Format (from ranker log):
    #   [Rotation] 📤 Closing V75 — no longer in top-3 | price=33999.8690 | trade_id=V75_xxx
    # The price is stored on the open-trade record so it is available when the
    # TRADE CLOSED line arrives. If Line 2 (exit_price=) also arrives, it takes
    # precedence (same value, but more authoritative).
    ROTATION_CLOSE_RE = re.compile(
        r'\[Rotation\].*?Closing\b.*?\|\s*price=([\d.]+).*?\|\s*trade_id=(\S+)'
    )
    # Asset sits between the 4th and 5th pipe-separated fields:
    #   "[ts] | LEVEL | TRADE | <ASSET> | ..."
    TRADE_ASSET_RE = re.compile(r'\|\s*TRADE\s*\|\s*(\w+)\s*\|')

    def __init__(self, log_dir: str = _LOG_DIR):
        """Create the parser and make sure ``log_dir`` exists.

        Args:
            log_dir: Directory that holds the ranker log files.
        """
        self.log_dir = Path(log_dir)
        self._open: Dict[str, dict] = {}        # trade_id -> open trade record
        self._closed: List[dict] = []           # closed trade records, oldest first
        self._last_pos: Dict[str, int] = {}     # file path -> read offset
        self._lock = threading.RLock()
        self._stats = {
            "total_opened": 0,
            "total_closed": 0,
            "total_pnl": 0.0,
            "win_count": 0,
            "loss_count": 0,
            "unrealized_pnl": 0.0,  # Unrealized P&L from open positions
        }
        self._running = False
        self._thread: Optional[threading.Thread] = None
        self._interval = 2.0  # seconds between refreshes; set by start_background
        # Create log directory if it doesn't exist
        self.log_dir.mkdir(parents=True, exist_ok=True)
        logger.info(f"[TradeLogParser] Initialized | log_dir={self.log_dir}")

    def start_background(self, interval: float = 2.0) -> None:
        """Launch a daemon thread that calls refresh() every `interval` seconds.

        Calling this while the loop is already running is a no-op.
        """
        if self._running:
            return
        # Remember the interval so the loop honours it (it used to be
        # hard-coded to 2.0 s regardless of this argument).
        self._interval = interval
        self._running = True
        self._thread = threading.Thread(target=self._loop, daemon=True, name="TradeLogParser")
        self._thread.start()
        logger.info(f"[TradeLogParser] Started — watching {self.log_dir} (interval={interval}s)")

    def _loop(self) -> None:
        """Background loop: refresh, sleep, repeat until ``stop()`` is called."""
        while self._running:
            try:
                self.refresh()
            except Exception as e:
                # Thread boundary: log and keep the loop alive.
                logger.error(f"[TradeLogParser] refresh error: {e}")
            time.sleep(self._interval)

    def refresh(self) -> None:
        """
        Find all log files and read the lines added since the last pass.

        Uses the *.log* pattern so rotated files are included; if nothing
        matches, falls back to *.txt. On the first pass for each file the scan
        starts at the beginning so trades written before the service started
        are not missed.
        """
        files = sorted(glob.glob(str(self.log_dir / "*.log*")))
        if not files:
            # Also check for .txt files as fallback
            files = sorted(glob.glob(str(self.log_dir / "*.txt")))
        for fpath in files:
            self._tail_file(fpath)

    def _tail_file(self, fpath: str) -> None:
        """Read only the data appended to ``fpath`` since the last call.

        First encounter: start from offset 0 (full scan) so pre-existing trades
        are loaded. If the file is now smaller than the stored offset it was
        truncated or replaced by rotation, so reading restarts from 0.
        """
        try:
            size = os.path.getsize(fpath)
        except OSError:
            return
        last = self._last_pos.get(fpath, 0)
        if size < last:
            # Truncated / rotated: the old offset is meaningless for this file.
            last = 0
        elif size == last:
            return
        try:
            with open(fpath, "r", encoding="utf-8", errors="replace") as f:
                f.seek(last)
                new_lines = f.readlines()
                self._last_pos[fpath] = f.tell()
        except OSError:
            return
        for line in new_lines:
            try:
                self._parse_line(line)
            except ValueError as exc:
                # A malformed number (e.g. "1.2.3") must not discard the rest of
                # the batch: the offset has already advanced past these lines.
                logger.warning(
                    f"[TradeLogParser] skipped malformed line ({exc}): {line.strip()[:200]}"
                )

    def _parse_line(self, line: str) -> None:
        """Extract trade events (rotation price, open, close) from one log line."""
        asset_match = self.TRADE_ASSET_RE.search(line)
        asset = asset_match.group(1) if asset_match else None

        self._record_rotation_price(line)
        if self._record_open(line, asset):
            return
        self._record_close(line, asset)

    def _record_rotation_price(self, line: str) -> None:
        """Remember the price from a ``[Rotation] ... Closing`` line on its open trade."""
        m = self.ROTATION_CLOSE_RE.search(line)
        if not m:
            return
        price, trade_id = float(m.group(1)), m.group(2)
        with self._lock:
            trade = self._open.get(trade_id)
            if trade is not None:
                trade["rotation_exit_price"] = price

    def _record_open(self, line: str, asset: Optional[str]) -> bool:
        """Register a TRADE OPENED line; return True if the line was one."""
        m = self.TRADE_OPEN_RE.search(line)
        if m:
            qty = float(m.group(4))
        else:
            m = self.TRADE_OPEN_RE_NOQTY.search(line)
            if not m:
                return False
            qty = 0.0  # older format without a Qty field
        trade_id = m.group(1)
        direction = m.group(2).upper()
        entry = float(m.group(3))
        ts = self._parse_timestamp(line)
        with self._lock:
            self._open[trade_id] = {
                "trade_id": trade_id,
                # Fall back to the ID prefix ("CRASH500_123" -> "CRASH500").
                "asset": asset or trade_id.split('_')[0],
                "direction": direction,
                "entry": entry,
                "qty": qty,
                "opened_at": ts,
                "status": "OPEN",
            }
            self._stats["total_opened"] += 1
        logger.debug(f"[TradeLogParser] OPEN: {trade_id} | {direction} @ {entry} qty={qty}")
        return True

    def _match_close(self, line: str):
        """Return ``(trade_id, pnl, exit_price)`` for a close line, else None.

        ``trade_id`` is None for the fallback/timeout formats, which carry no ID;
        ``exit_price`` is None unless the line has it (Line-2 format).
        """
        # Line-2 format first (has exit_price; dedicated regex, no ambiguity).
        m = self.TRADE_CLOSE_RE_WITH_EXIT.search(line)
        if m:
            trade_id, pnl = m.group(1), float(m.group(2))
            # Group 3 = pipe-delimited "| exit_price=..." (new format)
            # Group 4 = JSON metadata '"exit_price": ...' (old format)
            exit_price = float(m.group(3) or m.group(4))
            logger.debug(
                f"[TradeLogParser] Matched CLOSE+EXIT: {trade_id} "
                f"pnl={pnl} exit_price={exit_price}"
            )
            return trade_id, pnl, exit_price
        # Line-1 format (pnl + return%, no exit_price).
        m = self.TRADE_CLOSE_RE_STRICT.search(line)
        if m:
            trade_id, pnl = m.group(1), float(m.group(2))
            logger.debug(f"[TradeLogParser] Matched CLOSE(no exit): {trade_id} pnl={pnl}")
            return trade_id, pnl, None
        # ID-less formats: no-cid fallback, then timeout.
        for pattern, label in (
            (self.TRADE_CLOSE_RE_FALLBACK, "FALLBACK"),
            (self.TRADE_CLOSE_RE_TIMEOUT, "TIMEOUT"),
        ):
            m = pattern.search(line)
            if m:
                pnl = float(m.group(1))
                logger.debug(f"[TradeLogParser] Matched {label} close: pnl={pnl}")
                return None, pnl, None
        return None

    def _record_close(self, line: str, asset: Optional[str]) -> None:
        """Register a close line, de-duplicating the two lines emitted per close."""
        matched = self._match_close(line)
        if matched is None:
            return
        trade_id, pnl, exit_price = matched
        ts = self._parse_timestamp(line)
        with self._lock:
            # The bot emits TWO "TRADE CLOSED" lines per close event:
            #   Line 1 - pnl + return but NO exit_price
            #   Line 2 - pnl + exit_price + status + contract_id
            # If a closed record with this trade_id already exists, patch its
            # exit_price in place and stop, so stats are not double-counted.
            if trade_id:
                existing = next(
                    (t for t in self._closed if t.get("trade_id") == trade_id),
                    None,
                )
                if existing is not None:
                    if exit_price is not None:
                        existing["exit_price"] = exit_price
                        logger.debug(
                            f"[TradeLogParser] CLOSE patch exit_price: "
                            f"{trade_id} exit_price={exit_price}"
                        )
                    return
            # ID-less closes (fallback/timeout) cannot be matched to an open trade.
            trade = self._open.pop(trade_id, None) if trade_id else None
            if exit_price is None and trade:
                # Safety net: price captured from the preceding [Rotation] line.
                exit_price = trade.get("rotation_exit_price")
            closed = {
                "trade_id": trade_id or "UNKNOWN",
                "asset": asset or (trade.get("asset") if trade else "?"),
                "pnl": pnl,
                "closed_at": ts,
                "status": "CLOSED",
                "exit_price": exit_price,
            }
            if trade:
                closed["direction"] = trade.get("direction", "?")
                closed["entry"] = trade.get("entry", 0.0)
            self._closed.append(closed)
            self._stats["total_closed"] += 1
            self._stats["total_pnl"] += pnl
            # A zero-pnl trade is counted as a win.
            if pnl >= 0:
                self._stats["win_count"] += 1
            else:
                self._stats["loss_count"] += 1
        logger.debug(f"[TradeLogParser] CLOSE: {trade_id or '?'} | pnl={pnl:+.2f}")

    @staticmethod
    def _parse_timestamp(line: str) -> str:
        """Extract an ISO timestamp from the log line prefix.

        Format: ``[2026-03-30 16:20:40] | ...`` -> ``2026-03-30T16:20:40``.
        Falls back to the current UTC time (second precision) when the line
        has no such prefix.
        """
        match = re.search(r'\[(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\]', line)
        if match:
            return match.group(1).replace(" ", "T")
        return datetime.utcnow().isoformat()[:19]

    def get_state(self) -> dict:
        """Return the current trade state.

        Returns:
            ``{"open": [...], "closed": [...], "stats": {...}}`` where ``closed``
            holds at most the 100 most recent closed trades, newest first, and
            ``stats`` includes a derived ``win_rate`` percentage. Records are
            copies, so callers never see later in-place updates.
        """
        with self._lock:
            open_trades = [dict(t) for t in self._open.values()]
            closed_trades = [dict(t) for t in reversed(self._closed[-100:])]  # newest first
            stats = dict(self._stats)
            stats["win_rate"] = (
                round(stats["win_count"] / stats["total_closed"] * 100, 1)
                if stats["total_closed"] > 0 else 0.0
            )
            return {
                "open": open_trades,
                "closed": closed_trades,
                "stats": stats,
            }

    def update_unrealized_pnl(self, unrealized_dict: Dict[str, float]) -> None:
        """
        Update unrealized P&L for open positions from an external source.

        Args:
            unrealized_dict: {trade_id: unrealized_pnl_value, ...}. The total of
                all values is stored in the stats; per-trade values are attached
                only to trades that are currently open.
        """
        with self._lock:
            self._stats["unrealized_pnl"] = sum(unrealized_dict.values())
            for trade_id, unrealized in unrealized_dict.items():
                if trade_id in self._open:
                    self._open[trade_id]["unrealized_pnl"] = unrealized

    def stop(self) -> None:
        """Ask the background loop to exit and wait up to 3 s for the thread."""
        self._running = False
        if self._thread:
            self._thread.join(timeout=3)
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # SECTION 2b β FILE-BASED LOGGER ADAPTER | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # The ranker_logs_api Blueprint expects a RankerLogger-style object with get_recent(), | |
| # get_by_asset(), get_by_level(), get_stats(), export_json(), and clear_buffer(). | |
| # This adapter satisfies that interface by reading from the same log FILES that the | |
| # TradeLogParser uses β no in-memory ranker process required in the dashboard service. | |
class FileBasedLoggerAdapter:
    """
    Implements the RankerLogger-style interface (get_recent, get_by_asset,
    get_by_level, get_stats, export_json, clear_buffer) by reading log files
    from disk instead of an in-memory buffer.

    This lets the dashboard service answer log queries without needing a live
    RankerLogger instance.
    """

    # -- Shared compiled patterns ------------------------------------------------
    _CAT_RE = re.compile(r'\|\s*(INFO|DEBUG|WARNING|ERROR|CRITICAL)\s*\|\s*([A-Z_]+)\s*\|')
    _ASSET_RE = re.compile(r'\|\s*(?:TRADE|SIGNAL)\s*\|\s*(\w+)\s*\|')
    _TS_RE = re.compile(r'\[(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\]')

    def __init__(self, log_dir: str = _LOG_DIR):
        """Remember the configured log directory; no files are touched here."""
        self._log_dir = log_dir
        self._lock = threading.RLock()

    # -- Internal helpers --------------------------------------------------------
    @staticmethod
    def _mtime(fpath: str) -> float:
        """Modification time of ``fpath``; 0.0 if the file vanished meanwhile."""
        try:
            return os.path.getmtime(fpath)
        except OSError:
            return 0.0

    def _find_files(self) -> list:
        """Return a sorted list of every ``*.log*`` file in the candidate dirs.

        The same physical file reached through two different directory
        spellings (e.g. ``./ranker_logs`` and ``/app/ranker_logs``) is listed
        only once, under the first spelling encountered.
        """
        search_dirs = [
            self._log_dir,                               # env-configured (default /app/ranker_logs)
            str(Path(__file__).parent / "ranker_logs"),  # alongside this script
            "./ranker_logs",                             # cwd
            "/app/ranker_logs",                          # HF Spaces default app dir
            "/home/user/ranker_logs",                    # HF Spaces user home
            "/tmp/ranker_logs",                          # fallback tmp
            str(Path.home() / "ranker_logs"),            # user home
            str(Path(__file__).parent),                  # script dir itself (*.log*)
            # Also any .log* directly in /app, /home/user and the cwd
            "/app",
            "/home/user",
            ".",
        ]
        seen_real = set()
        unique_files = []
        for cdir in search_dirs:
            for fpath in glob.glob(str(Path(cdir) / "*.log*")):
                if not os.path.isfile(fpath):
                    continue  # e.g. a directory whose name matches the pattern
                real = os.path.realpath(fpath)
                if real in seen_real:
                    continue
                seen_real.add(real)
                unique_files.append(fpath)
        return sorted(unique_files)

    def _read_lines(self, n_tail: int = 500) -> list:
        """Return up to n_tail most-recent lines from each of the 3 newest log files.

        "Newest" is decided by file modification time. The result is ordered
        newest line first. A non-positive ``n_tail`` yields no lines.
        """
        if n_tail <= 0:
            return []
        newest = sorted(self._find_files(), key=self._mtime)[-3:]
        raw = []
        for fpath in newest:
            try:
                with open(fpath, "r", encoding="utf-8", errors="replace") as f:
                    # Bounded deque keeps only the tail without loading the file.
                    raw.extend(deque(f, maxlen=n_tail))
            except OSError:
                # Best effort: an unreadable file simply contributes nothing.
                pass
        raw.reverse()  # newest first
        return raw

    def _line_to_entry(self, line: str) -> Optional[dict]:
        """Convert one raw log line into an entry dict; None if it has no timestamp."""
        ts_m = self._TS_RE.search(line)
        if not ts_m:
            return None
        cat_m = self._CAT_RE.search(line)
        level = cat_m.group(1) if cat_m else "INFO"
        cat = cat_m.group(2).strip() if cat_m else ""
        ast_m = self._ASSET_RE.search(line)
        asset = ast_m.group(1) if ast_m else None
        # Minimal dict in the shape the log endpoints return.
        return {
            "timestamp": ts_m.group(1),
            "level": level,
            "category": cat,
            "message": line.strip(),
            "asset": asset,
            "data": None,
        }

    # -- RankerLogger interface ----------------------------------------------------
    def get_recent(self, n: int = 50, category: Optional[str] = None) -> list:
        """Return up to ``n`` newest entries.

        When ``category`` is given, only lines containing that text anywhere
        (case-insensitive) are kept.
        """
        wanted = category.upper() if category else None
        entries = []
        for line in self._read_lines(n_tail=max(n * 3, 200)):
            e = self._line_to_entry(line)
            if e is None:
                continue
            if wanted and wanted not in line.upper():
                continue
            entries.append(e)
            if len(entries) >= n:
                break
        return entries

    def get_by_asset(self, asset: str, n: int = 30) -> list:
        """Return up to ``n`` newest entries whose line mentions ``asset`` (case-insensitive)."""
        wanted = asset.upper()
        entries = []
        for line in self._read_lines(n_tail=500):
            if wanted not in line.upper():
                continue
            e = self._line_to_entry(line)
            if e:
                entries.append(e)
                if len(entries) >= n:
                    break
        return entries

    def get_by_level(self, level: str, n: int = 50) -> list:
        """Return up to ``n`` newest entries at the given log level (case-insensitive)."""
        wanted = level.upper()
        entries = []
        for line in self._read_lines(n_tail=500):
            e = self._line_to_entry(line)
            if e and e["level"].upper() == wanted:
                entries.append(e)
                if len(entries) >= n:
                    break
        return entries

    def get_stats(self) -> dict:
        """Count recent entries by level, category and asset, plus errors by category."""
        by_category: dict = {}
        by_level: dict = {}
        by_asset: dict = {}
        errors: dict = {}
        total = 0
        for line in self._read_lines(n_tail=2000):
            e = self._line_to_entry(line)
            if not e:
                continue
            total += 1
            by_level[e["level"]] = by_level.get(e["level"], 0) + 1
            by_category[e["category"]] = by_category.get(e["category"], 0) + 1
            if e["asset"]:
                by_asset[e["asset"]] = by_asset.get(e["asset"], 0) + 1
            if e["level"] in ("ERROR", "CRITICAL"):
                errors[e["category"]] = errors.get(e["category"], 0) + 1
        return {
            "total_events": total,
            "by_level": by_level,
            "by_category": by_category,
            "by_asset": by_asset,
            "errors": errors,
            # There is no real buffer; both fields mirror the number of lines seen.
            "buffer_size": total,
            "buffer_capacity": total,
        }

    def export_json(self, filepath: str, n: int = 500):
        """Write the ``n`` newest entries to ``filepath`` as indented UTF-8 JSON."""
        entries = self.get_recent(n)
        with open(filepath, "w", encoding="utf-8") as f:
            json.dump({
                "export_time": datetime.utcnow().isoformat(),
                "count": len(entries),
                "logs": entries,
            }, f, indent=2)

    def clear_buffer(self):
        """No-op: there is no in-memory buffer, and this adapter never deletes log files."""
        pass
| from dataclasses import dataclass, field | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # v2.6 β REWRITTEN to match the hub's actual broadcast contracts. | |
| # | |
| # What was broken in v2.5: | |
| # 1. AssetSnapshot read flat `signal` / `confidence` keys that the hub never emits. | |
| # The hub emits `voting.flip_direction`, `voting.buy_count`, `voting.sell_count` | |
| # inside the snapshot, plus a derived `metadata.scores.blended_confidence`. | |
| # 2. _on_message read `msg.get("space_name")` and `msg.get("snapshot")` flat, but | |
| # the hub wraps both under `msg["asset"]`. Same for `initial_state`, which uses | |
| # the key `assets` (dict of space_name β {metadata, snapshot}), not `snapshots`. | |
| # 3. There was no subscriber for /ws/signals at all, so the per-tick realtime | |
| # action emitted by the hub's signal broadcaster never reached the dashboard. | |
| # | |
| # What v2.6 does: | |
| # β’ MetricsSubscriberClient β /ws/subscribe β properly unwraps `msg["asset"]`, | |
| # reads `voting.flip_direction` / `voting.buy_count` / `voting.sell_count` / | |
| # `voting.last_price`, plus `training.*` for the table columns the HTML reads. | |
| # β’ SignalSubscriberClient β /ws/signals β handles signal_snapshot + signal_delta, | |
| # drops out-of-order messages by per-asset `seq`, stores the realtime action | |
| # keyed by space_name. | |
| # β’ DashboardState.get_state() merges both streams. For each asset it emits: | |
| # - flip_direction: realtime per-tick action when fresh, else cumulative | |
| # (HTML's vecOf reads this first β realtime wins display) | |
| # - latest_signal: pure realtime action (forward-compat field) | |
| # - cumulative_flip_direction: voting.flip_direction (diagnostics) | |
| # - signal_confidence: blended_confidence (HTML "Engaged" badge + Certainty) | |
| # - buy_count / sell_count / training_steps / actor_loss / critic_loss / | |
| # avn_accuracy / score / last_updated (everything the HTML table reads) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# Freshness window, in seconds, for realtime ticks.
# A realtime tick older than this is treated as stale — the cumulative direction
# wins instead. 1 s is tight: a tick has to land in the second before the /api/state
# poll for the realtime override to fire, otherwise the badge falls back to the
# cumulative voting direction. Reflects the design choice that the dashboard should
# only paint a realtime BUY/SELL when the asset is *actively* ticking.
_REALTIME_SIGNAL_FRESH_SEC = 1.0
@dataclass
class AssetSnapshot:
    """Per-asset cumulative state, derived from /ws/subscribe `metrics_update`.

    Must be a dataclass: DashboardState constructs it with
    ``AssetSnapshot(space_name=...)`` and then assigns the remaining fields.
    """

    space_name: str
    # Voting (cumulative)
    flip_direction: str = "NONE"  # "BUY" | "SELL" | "NONE"
    flip_action: str = "HOLD"
    buy_count: int = 0
    sell_count: int = 0
    last_price: float = 0.0
    # Confidence scores (from hub's signal_metadata)
    vote_confidence: float = 0.0
    train_confidence: float = 0.0
    blended_confidence: float = 0.0
    # Training/learning fields used by the HTML table & detail panel
    training_steps: int = 0
    actor_loss: float = 0.0
    critic_loss: float = 0.0
    avn_accuracy: float = 0.0
    avn_loss: float = 0.0
    # Composite score (HTML reads `r.score` for the bar chart)
    score: float = 0.0
    # Bookkeeping
    last_updated: float = 0.0
@dataclass
class RealtimeSignal:
    """Per-asset realtime per-tick state, from /ws/signals.

    Must be a dataclass: DashboardState builds instances with keyword
    arguments (action, price, seq, ts, source).
    """

    action: str = "NONE"  # "BUY" | "SELL" | "HOLD" | "NONE"
    price: float = 0.0
    seq: int = 0      # per-asset sequence number; used to drop replays
    ts: float = 0.0   # tick timestamp, compared against time.time() for freshness
    source: str = ""
class DashboardState:
    """Centralized state — merges /ws/subscribe metrics with /ws/signals realtime.

    Writers (``update_from_metrics``, ``update_from_signal``) and the reader
    (``get_state``) all run under one re-entrant lock. Malformed payload values
    are coerced to defaults rather than raised.
    """

    def __init__(self):
        self._snapshots: Dict[str, AssetSnapshot] = {}   # space_name -> cumulative state
        self._signals: Dict[str, RealtimeSignal] = {}    # asset -> latest realtime tick
        self._lock = threading.RLock()

    # -- Coercion helpers ---------------------------------------------------------
    @staticmethod
    def _to_float(value, default: float = 0.0) -> float:
        """``float(value)``; ``default`` when value is falsy or not convertible."""
        try:
            return float(value or default)
        except (TypeError, ValueError):
            return default

    @staticmethod
    def _to_int(value, default: int = 0) -> int:
        """``int(value)``; ``default`` when value is falsy or not convertible."""
        try:
            return int(value or default)
        except (TypeError, ValueError):
            return default

    @staticmethod
    def _as_dict(value) -> dict:
        """Return ``value`` if it is a dict, otherwise an empty dict."""
        return value if isinstance(value, dict) else {}

    # -- Writers ------------------------------------------------------------------
    def update_from_metrics(
        self,
        space_name: str,
        snapshot: dict,
        metadata: Optional[dict] = None,
    ) -> None:
        """
        Apply a metrics_update payload from /ws/subscribe.

        Reads ``snapshot["voting"]`` and ``snapshot["training"]`` plus the
        ``metadata["scores"]`` block. Calls with an empty ``space_name`` or a
        missing / non-dict ``snapshot`` are ignored.
        """
        if not space_name or not snapshot or not isinstance(snapshot, dict):
            return
        voting = self._as_dict(snapshot.get("voting"))
        training = self._as_dict(snapshot.get("training"))
        scores = self._as_dict(self._as_dict(metadata).get("scores"))
        to_float, to_int = self._to_float, self._to_int

        with self._lock:
            snap = self._snapshots.get(space_name)
            if snap is None:
                snap = AssetSnapshot(space_name=space_name)
                self._snapshots[space_name] = snap

            # Voting / cumulative direction
            snap.flip_direction = str(voting.get("flip_direction", "NONE")).upper()
            snap.flip_action = str(voting.get("flip_action", "HOLD")).upper()
            snap.buy_count = to_int(voting.get("buy_count"))
            snap.sell_count = to_int(voting.get("sell_count"))
            snap.last_price = to_float(voting.get("last_price"))

            # Confidence scores (prefer the metadata scores; fall back to local calc)
            snap.vote_confidence = to_float(scores.get("vote_confidence"))
            snap.train_confidence = to_float(scores.get("train_confidence"))
            snap.blended_confidence = to_float(scores.get("blended_confidence"))
            if snap.blended_confidence == 0.0 and snap.vote_confidence == 0.0:
                # No usable scores in the metadata: derive them locally.
                total_votes = snap.buy_count + snap.sell_count
                if total_votes > 0:
                    snap.vote_confidence = max(snap.buy_count, snap.sell_count) / total_votes
                snap.train_confidence = to_float(training.get("avn_accuracy"))
                if snap.vote_confidence > 0 and snap.train_confidence > 0:
                    snap.blended_confidence = (snap.vote_confidence + snap.train_confidence) / 2.0
                else:
                    # Only one (or neither) is available: use whichever is non-zero.
                    snap.blended_confidence = snap.vote_confidence or snap.train_confidence

            # Training fields the HTML table & detail panel read
            snap.training_steps = to_int(training.get("training_steps"))
            snap.actor_loss = to_float(training.get("actor_loss"))
            snap.critic_loss = to_float(training.get("critic_loss"))
            snap.avn_accuracy = to_float(training.get("avn_accuracy"))
            snap.avn_loss = to_float(training.get("avn_loss"))

            # Composite score: prefer an explicit snapshot field, otherwise
            # fall back to scores.signal_strength.
            score = snapshot.get("score")
            if score is None:
                score = scores.get("signal_strength", 0.0)
            snap.score = to_float(score)

            # Missing or unparsable timestamp -> stamp with the local clock.
            snap.last_updated = to_float(snapshot.get("last_updated"), time.time())

    def update_from_signal(self, signal: dict) -> None:
        """Apply a per-tick signal from /ws/signals. Drops out-of-order seq.

        A signal whose ``seq`` is not strictly greater than the stored one for
        the same asset is ignored. Non-dict payloads and payloads without an
        ``asset`` key are ignored too.
        """
        if not isinstance(signal, dict):
            return
        asset = signal.get("asset")
        if not asset:
            return
        seq = self._to_int(signal.get("seq"))
        with self._lock:
            cur = self._signals.get(asset)
            if cur is not None and seq <= cur.seq:
                # Replay or out-of-order: ignore
                return
            self._signals[asset] = RealtimeSignal(
                action=str(signal.get("action", "NONE")).upper(),
                price=self._to_float(signal.get("price")),
                seq=seq,
                # Missing or unparsable timestamp -> stamp with the local clock.
                ts=self._to_float(signal.get("ts"), time.time()),
                source=str(signal.get("source", "") or ""),
            )

    # -- Reader -------------------------------------------------------------------
    def get_state(self) -> dict:
        """Merge cumulative + realtime views into ``{"snapshots": [...]}``.

        One entry is produced per asset that has a metrics snapshot; realtime
        signals for assets without a snapshot are not reported.
        """
        now = time.time()
        with self._lock:
            snaps_out = []
            for snap in self._snapshots.values():
                rt = self._signals.get(snap.space_name)
                # Realtime per-tick action (empty if no signal was ever stored)
                latest_signal = rt.action if rt else ""
                rt_fresh = (
                    rt is not None
                    and rt.action in ("BUY", "SELL")
                    and (now - rt.ts) <= _REALTIME_SIGNAL_FRESH_SEC
                )
                # flip_direction carries the per-tick action when it is fresh
                # and directional; otherwise the cumulative voting direction.
                display_direction = rt.action if rt_fresh else snap.flip_direction
                snaps_out.append({
                    "space_name": snap.space_name,
                    "flip_direction": display_direction,
                    "cumulative_flip_direction": snap.flip_direction,
                    "latest_signal": latest_signal,
                    "flip_action": snap.flip_action,
                    "last_price": round(snap.last_price, 6),
                    "buy_count": snap.buy_count,
                    "sell_count": snap.sell_count,
                    "signal_confidence": round(snap.blended_confidence, 4),
                    "confidence": round(snap.blended_confidence, 4),
                    "vote_confidence": round(snap.vote_confidence, 4),
                    "train_confidence": round(snap.train_confidence, 4),
                    "training_steps": snap.training_steps,
                    "actor_loss": round(snap.actor_loss, 6),
                    "critic_loss": round(snap.critic_loss, 6),
                    "avn_loss": round(snap.avn_loss, 6),
                    "avn_accuracy": round(snap.avn_accuracy, 4),
                    "score": round(snap.score, 4),
                    "last_updated": snap.last_updated,
                    # Realtime diagnostics
                    "realtime_seq": rt.seq if rt else 0,
                    "realtime_ts": rt.ts if rt else 0.0,
                    "realtime_source": rt.source if rt else "",
                })
            return {"snapshots": snaps_out}
# ══════════════════════════════════════════════════════════════════════════════
# WebSocket subscriber base — DRY shared reconnect/backoff loop
# ══════════════════════════════════════════════════════════════════════════════
| class _BaseSubscriber: | |
| """Shared reconnect loop for both metrics and signals subscribers.""" | |
| NAME = "Subscriber" | |
| MAX_BACKOFF = 30 | |
| def __init__(self, url: str): | |
| self.url = url | |
| self._ws = None | |
| self._running = False | |
| self._thread: Optional[threading.Thread] = None | |
| self._reconnect_count = 0 | |
| def start(self) -> None: | |
| if self._running: | |
| return | |
| self._running = True | |
| self._thread = threading.Thread( | |
| target=self._run_loop, daemon=True, name=self.NAME | |
| ) | |
| self._thread.start() | |
| logger.info(f"[{self.NAME}] Starting β {self.url}") | |
| def stop(self) -> None: | |
| self._running = False | |
| if self._ws: | |
| try: | |
| self._ws.close() | |
| except Exception: | |
| pass | |
| if self._thread: | |
| self._thread.join(timeout=3) | |
| def _run_loop(self) -> None: | |
| while self._running: | |
| try: | |
| self._connect_and_run() | |
| except Exception as e: | |
| logger.error(f"[{self.NAME}] error: {e}") | |
| if not self._running: | |
| break | |
| backoff = min(self.MAX_BACKOFF, 2 ** min(self._reconnect_count, 4)) | |
| logger.info( | |
| f"[{self.NAME}] reconnect in {backoff}s " | |
| f"(attempt #{self._reconnect_count + 1})" | |
| ) | |
| time.sleep(backoff) | |
| self._reconnect_count += 1 | |
| def _connect_and_run(self) -> None: | |
| self._ws = websocket.WebSocketApp( | |
| self.url, | |
| on_message = self._on_message, | |
| on_open = lambda ws: self._on_open(), | |
| on_error = lambda ws, e: logger.warning(f"[{self.NAME}] WS error: {e}"), | |
| on_close = lambda ws, c, m: logger.info(f"[{self.NAME}] closed code={c}"), | |
| ) | |
| self._ws.run_forever(ping_interval=30, ping_timeout=10, reconnect=0) | |
| def _on_open(self) -> None: | |
| self._reconnect_count = 0 | |
| logger.info(f"[{self.NAME}] β Connected") | |
| def _on_message(self, ws, raw: str) -> None: # pragma: no cover β overridden | |
| raise NotImplementedError | |
class MetricsSubscriberClient(_BaseSubscriber):
    """Feeds cumulative per-asset snapshots from /ws/subscribe into the state."""

    NAME = "MetricsSubscriber"

    def __init__(self, state: DashboardState):
        super().__init__(f"wss://{_HUB_HOST}/ws/subscribe")
        self.state = state

    def _on_message(self, ws, raw: str) -> None:
        """Decode one frame and forward its snapshot(s) to the state.

        Handles ``metrics_update`` (a single asset under ``msg["asset"]``)
        and ``initial_state`` (a mapping of assets under ``"assets"``, or
        the legacy ``"snapshots"`` key). Other types are ignored. Any
        exception is logged at debug level and swallowed.
        """
        try:
            message = json.loads(raw)
            msg_type = message.get("type", "")

            if msg_type == "metrics_update":
                # {"type":"metrics_update","asset":{"space_name","metadata","snapshot"},...}
                envelope = message.get("asset") or {}
                name = envelope.get("space_name", "")
                snapshot = envelope.get("snapshot", {}) or {}
                metadata = envelope.get("metadata", {}) or {}
                if not (name and snapshot):
                    return
                self.state.update_from_metrics(name, snapshot, metadata)
                return

            if msg_type != "initial_state":
                return

            # {"type":"initial_state","assets":{"<name>":{"metadata","snapshot"}},...}
            # The legacy key was "snapshots"; both are accepted.
            assets = message.get("assets") or message.get("snapshots") or {}
            if not isinstance(assets, dict):
                return
            for name, payload in assets.items():
                if not isinstance(payload, dict):
                    continue
                if "snapshot" not in payload:
                    # Old format: the payload is the snapshot itself.
                    self.state.update_from_metrics(name, payload, {})
                    continue
                # New format: wrapped as {metadata, snapshot}.
                self.state.update_from_metrics(
                    name,
                    payload.get("snapshot", {}) or {},
                    payload.get("metadata", {}) or {},
                )
        except Exception as e:
            logger.debug(f"[{self.NAME}] parse error: {e}")
class SignalSubscriberClient(_BaseSubscriber):
    """Feeds realtime per-tick BUY/SELL/HOLD actions from /ws/signals into the state."""

    NAME = "SignalSubscriber"

    def __init__(self, state: DashboardState):
        super().__init__(f"wss://{_HUB_HOST}/ws/signals")
        self.state = state

    def _on_message(self, ws, raw: str) -> None:
        """Decode one frame and apply each signal dict it carries.

        Only ``signal_snapshot`` and ``signal_delta`` frames are processed;
        their ``signals`` value must be a list, and non-dict items in it
        are skipped. Any exception is logged at debug level and swallowed.
        """
        try:
            message = json.loads(raw)
            if message.get("type", "") not in ("signal_snapshot", "signal_delta"):
                return
            batch = message.get("signals") or []
            if not isinstance(batch, list):
                return
            for item in batch:
                if not isinstance(item, dict):
                    continue
                self.state.update_from_signal(item)
        except Exception as e:
            logger.debug(f"[{self.NAME}] parse error: {e}")
# Backward-compat alias — anything that imported HubSubscriberClient from earlier
# revisions of this file keeps working without touching its imports.
HubSubscriberClient = MetricsSubscriberClient

# ══════════════════════════════════════════════════════════════════════════════
# SECTION 3 — FLASK APP
# ══════════════════════════════════════════════════════════════════════════════
# NOTE: everything below runs at import time — importing this module starts the
# trade-log parser's background work and both subscriber threads.
_state = DashboardState()
_trade_parser = TradeLogParser(log_dir=_LOG_DIR)
_trade_parser.start_background()

# Start hub subscribers — _state stays in sync with both cumulative metrics
# (/ws/subscribe) and realtime per-tick signals (/ws/signals). Each runs in
# its own daemon thread with independent reconnect/backoff.
_metrics_subscriber = MetricsSubscriberClient(state=_state)
_metrics_subscriber.start()
_signal_subscriber = SignalSubscriberClient(state=_state)
_signal_subscriber.start()

# Backward-compat name in case anything else in the process imports this.
_hub_subscriber = _metrics_subscriber

app = Flask(__name__)
CORS(app)

# ── Instantiate the file-based log adapter (used by all /api/ranker/logs/* routes) ──
_log_adapter = FileBasedLoggerAdapter(log_dir=_LOG_DIR)
# ══════════════════════════════════════════════════════════════════════════════
# SECTION 4 — RANKER LOG ROUTES (self-contained — no Blueprint dependency)
# ══════════════════════════════════════════════════════════════════════════════
#
# FIX v2.4: These routes were previously delegated to the ranker_logs_api.py Blueprint.
# That Blueprint was never registered, so every /api/ranker/logs/* call returned 404.
# Routes are now defined inline so hub_dashboard_service.py is fully self-contained.
# FileBasedLoggerAdapter (above) satisfies the full RankerLogger interface by reading
# the ranker's on-disk log files — no in-process ranker instance is required.
| _TRAINING_RE_INLINE = re.compile( | |
| r'step=(\d+)\s*\|\s*loss=([\d.]+)\s*\|\s*lr=([\d.eE+\-]+)\s*\|\s*assets=(\d+)' | |
| ) | |
| _JSON_BLOB_RE_INLINE = re.compile(r'(\{.*\})\s*$') | |
| def _enrich_training(entry: dict) -> dict: | |
| """Attach parsed `data` dict to TRAINING entries so dashboard KPI cards populate.""" | |
| if entry.get("category", "").upper() != "TRAINING": | |
| return entry | |
| if entry.get("data"): | |
| return entry | |
| msg = entry.get("message", "") | |
| m = _TRAINING_RE_INLINE.search(msg) | |
| if m: | |
| entry["data"] = { | |
| "step": int(m.group(1)), | |
| "loss": float(m.group(2)), | |
| "lr": float(m.group(3)), | |
| "asset_count": int(m.group(4)), | |
| } | |
| return entry | |
| jm = _JSON_BLOB_RE_INLINE.search(msg) | |
| if jm: | |
| try: | |
| blob = json.loads(jm.group(1)) | |
| if "step" in blob: | |
| entry["data"] = { | |
| "step": blob.get("step", 0), | |
| "loss": blob.get("loss", 0.0), | |
| "lr": blob.get("lr", 0.0), | |
| "asset_count": blob.get("asset_count", blob.get("assets", 0)), | |
| } | |
| except (ValueError, KeyError): | |
| pass | |
| return entry | |
def api_logs_recent():
    """GET /api/ranker/logs/recent?limit=50&category=TRAINING

    Returns the most recent log entries (TRAINING ones enriched with parsed
    ``data``) plus adapter stats. On any failure the error is logged and an
    empty result carrying an ``error`` field is returned with HTTP 200.
    """
    try:
        params = request.args
        fetched = _log_adapter.get_recent(
            n=int(params.get("limit", 50)),
            category=params.get("category"),
        )
        enriched = list(map(_enrich_training, fetched))
        body = {
            "logs": enriched,
            "count": len(enriched),
            "stats": _log_adapter.get_stats(),
        }
        return jsonify(body)
    except Exception as exc:
        logger.exception(f"[api_logs_recent] error: {exc}")
        fallback = {"logs": [], "count": 0, "error": str(exc)}
        return jsonify(fallback), 200
def api_logs_stats():
    """GET /api/ranker/logs/stats

    Returns the adapter's stats dict; on failure logs the error and returns
    a zeroed stats skeleton with an ``error`` field and HTTP 200.
    """
    try:
        stats = _log_adapter.get_stats()
        return jsonify(stats)
    except Exception as exc:
        logger.exception(f"[api_logs_stats] error: {exc}")
        empty_stats = {
            "total_events": 0,
            "by_level": {},
            "by_category": {},
            "by_asset": {},
            "errors": {},
            "error": str(exc),
        }
        return jsonify(empty_stats), 200
def api_logs_asset(asset: str):
    """GET /api/ranker/logs/asset/V75?limit=30

    Returns up to ``limit`` log entries for ``asset``; on failure logs the
    error and returns an empty list with an ``error`` field and HTTP 200.
    """
    try:
        max_rows = int(request.args.get("limit", 30))
        rows = _log_adapter.get_by_asset(asset, n=max_rows)
        body = {"asset": asset, "logs": rows, "count": len(rows)}
        return jsonify(body)
    except Exception as exc:
        logger.exception(f"[api_logs_asset] error: {exc}")
        fallback = {"asset": asset, "logs": [], "count": 0, "error": str(exc)}
        return jsonify(fallback), 200
def api_logs_level(level: str):
    """GET /api/ranker/logs/level/ERROR?limit=50

    Returns up to ``limit`` log entries at ``level`` (echoed upper-cased);
    on failure logs the error and returns an empty list with an ``error``
    field and HTTP 200.
    """
    try:
        max_rows = int(request.args.get("limit", 50))
        rows = _log_adapter.get_by_level(level, n=max_rows)
        body = {"level": level.upper(), "logs": rows, "count": len(rows)}
        return jsonify(body)
    except Exception as exc:
        logger.exception(f"[api_logs_level] error: {exc}")
        fallback = {"level": level.upper(), "logs": [], "count": 0, "error": str(exc)}
        return jsonify(fallback), 200
def api_logs_export():
    """GET /api/ranker/logs/export?limit=500 — download JSON

    Asks the log adapter to export up to ``limit`` entries to a file and
    returns that file as an attachment named ``ranker_logs_export.json``.
    On failure the error is logged and ``{"error": ...}`` is returned with
    HTTP 500.
    """
    import io
    import tempfile

    try:
        limit = int(request.args.get("limit", 500))
        # A per-request temp directory replaces the fixed
        # /tmp/ranker_logs_export.json, which concurrent requests overwrote
        # while another response was still streaming it.
        with tempfile.TemporaryDirectory(prefix="ranker_logs_") as tmp_dir:
            export_path = Path(tmp_dir) / "ranker_logs_export.json"
            _log_adapter.export_json(str(export_path), n=limit)
            payload = export_path.read_bytes()
        # Serve from memory so the temp directory can be removed up front.
        return send_file(
            io.BytesIO(payload),
            mimetype="application/json",
            as_attachment=True,
            download_name="ranker_logs_export.json",
        )
    except Exception as exc:
        logger.exception(f"[api_logs_export] error: {exc}")
        return jsonify({"error": str(exc)}), 500
def api_logs_clear():
    """POST /api/ranker/logs/clear — no-op for file-based adapter

    Calls the adapter's ``clear_buffer()`` and reports ``{"status": "cleared"}``.
    On failure the error is logged and ``{"error": ...}`` is returned with
    HTTP 500.
    """
    try:
        _log_adapter.clear_buffer()
        return jsonify({"status": "cleared"})
    except Exception as exc:
        # Log like every other /api/ranker/logs/* route; previously this
        # failure was returned to the client but never recorded.
        logger.exception(f"[api_logs_clear] error: {exc}")
        return jsonify({"error": str(exc)}), 500
def index():
    """Serve the dashboard HTML, or a 404 page naming the expected path."""
    page = Path(_HTML_PATH)
    if not page.exists():
        not_found = (
            "<h1>hub_dashboard.html not found</h1>"
            f"<p>Expected: <code>{_HTML_PATH}</code></p>"
        )
        return not_found, 404
    directory, filename = str(page.parent), page.name
    return send_from_directory(directory, filename)
def api_state():
    """Return the merged dashboard state as JSON (hub_dashboard.html polls this every 2 s)."""
    merged = _state.get_state()
    return jsonify(merged)
def api_rankings():
    """Return the current per-asset snapshots under the ``rankings`` key."""
    rows = _state.get_state()["snapshots"]
    return jsonify({"rankings": rows})
def api_trades():
    """Return the trade parser's full state (open trades, closed trades, stats) as JSON."""
    trade_state = _trade_parser.get_state()
    return jsonify(trade_state)
def api_trades_open():
    """Return only the open trades from the trade parser's state."""
    open_trades = _trade_parser.get_state()["open"]
    return jsonify({"open": open_trades})
def api_trades_closed():
    """Return the first ``limit`` closed trades plus summary stats.

    Query params:
        limit: maximum number of closed trades to return (default 50). A
            non-integer value falls back to the default and a negative
            value is treated as 0.
    """
    # type=int makes the lookup return the default when conversion fails,
    # where the bare int() call raised and produced an unhandled 500.
    limit = request.args.get("limit", default=50, type=int)
    # A negative slice bound would drop trades from the end, not cap the count.
    limit = max(0, limit)
    state = _trade_parser.get_state()
    return jsonify({
        "closed": state["closed"][:limit],
        "stats": state["stats"],
    })
def health():
    """Liveness probe: report status and the service version string."""
    payload = {"status": "ok", "version": "v2.6-realtime-signals"}
    return jsonify(payload)
def api_debug():
    """GET /api/debug — diagnostics: log files found, paths searched, env vars.

    Scans a fixed list of candidate directories for ``*.log*`` files and
    reports, alongside that scan, the files the log adapter itself
    discovered, the adapter's stats, and the relevant environment variables.
    """
    import glob as _glob

    candidate_dirs = [
        _LOG_DIR,
        str(Path(__file__).parent / "ranker_logs"),
        "./ranker_logs",
        "/app/ranker_logs",
        "/home/user/ranker_logs",
        "/tmp/ranker_logs",
        str(Path.home() / "ranker_logs"),
        str(Path(__file__).parent),
        "/app", "/home/user", ".",
    ]
    dir_scan = {}
    for d in candidate_dirs:
        files = sorted(_glob.glob(str(Path(d) / "*.log*")))
        dir_scan[d] = {"exists": Path(d).exists(), "log_files": files}
    adapter_files = _log_adapter._find_files()
    return jsonify({
        # Same version string as /health; this route still reported the
        # stale "v2.5-port-fix" label.
        "version": "v2.6-realtime-signals",
        "env": {
            "DASHBOARD_PORT": os.environ.get("DASHBOARD_PORT", "(not set, using 7860)"),
            "RANKER_LOG_DIR": os.environ.get("RANKER_LOG_DIR", "(not set, using /app/ranker_logs)"),
            "QUASAR_HUB_HOST": os.environ.get("QUASAR_HUB_HOST", "(not set)"),
            "cwd": str(Path.cwd()),
            "script_dir": str(Path(__file__).parent),
        },
        "adapter_files_found": adapter_files,
        "directory_scan": dir_scan,
        "adapter_stats": _log_adapter.get_stats(),
    })
# Script entry point: log the startup banner and effective configuration,
# then serve the Flask app on all interfaces.
if __name__ == "__main__":
    logger.info("=== K1RL QUASAR HUB DASHBOARD SERVICE v2.6 (REALTIME SIGNALS) ===")
    logger.info(f"Dashboard port: {_DASHBOARD_PORT} (HF Spaces public port)")
    logger.info(f"Log directory: {_LOG_DIR}")
    logger.info(f"Hub host: {_HUB_HOST}")
    logger.info("v2.6 fixes:")
    logger.info(" β MetricsSubscriber β /ws/subscribe (unwraps msg.asset, reads voting.*)")
    logger.info(" β SignalSubscriber β /ws/signals (per-tick BUY/SELL within ~30 ms)")
    logger.info(" β get_state() merges both streams; flip_direction reflects realtime tick")
    logger.info(" β Full snapshot fields exposed (buy/sell counts, training, confidence)")
    logger.info("Carried over from v2.5:")
    logger.info(" β port 7860 / wss:// no explicit port / log path scan / /api/debug")
    logger.info(f" β Visit /api/debug to inspect log file discovery live")
    # debug and the reloader are both disabled here.
    app.run(host="0.0.0.0", port=_DASHBOARD_PORT, debug=False, use_reloader=False)