KarlQuant commited on
Commit
2655646
Β·
verified Β·
1 Parent(s): 7b3233f

Update hub_dashboard_service.py

Browse files
Files changed (1) hide show
  1. hub_dashboard_service.py +366 -75
hub_dashboard_service.py CHANGED
@@ -1,21 +1,38 @@
1
  #!/usr/bin/env python3
2
  """
3
  ╔══════════════════════════════════════════════════════════════════════════════════════╗
4
- β•‘ K1RL QUASAR β€” HUB DASHBOARD SERVICE (with Trade Log Parser) β€” FIXED v2.5 β•‘
5
  β•‘ ────────────────────────────────────────────────────────────────────────────────── β•‘
6
  β•‘ Architecture role: READ-ONLY subscriber β†’ serves dashboard UI β•‘
7
- β•‘ VERSION: v2.5 (PORT + LOG PATH FIX) | 2026-04-04 β•‘
8
  β•‘ β•‘
9
- β•‘ FIXES APPLIED: β•‘
10
- β•‘ βœ… FIX v2.5: Default port 8051β†’7860 (HF Spaces only exposes port 7860) β•‘
11
- β•‘ βœ… FIX v2.5: WebSocket ws://host:7860 β†’ wss://host (HF Spaces TLS proxy) β•‘
12
- β•‘ βœ… FIX v2.5: _find_files() searches all likely HF Spaces log paths β•‘
13
- β•‘ βœ… FIX v2.5: Added /api/debug endpoint for live diagnostics β•‘
14
- β•‘ βœ… FIX v2.4: All /api/ranker/logs/* routes inline β€” no Blueprint dependency β•‘
15
- β•‘ βœ… FIX v2.4: Training KPI enrichment (_enrich_training) applied on /recent β•‘
16
- β•‘ βœ… FIX #1: Include rotated log files (*.log, *.log.1, *.log.2, etc.) β•‘
17
- β•‘ βœ… FIX #2: Improved regex to catch all trade close formats β•‘
18
- β•‘ βœ… FIX #3: Added unrealized P&L tracking for open positions β•‘
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
19
  β•‘ β•‘
20
  β•‘ DEPLOYMENT: Just restart the service β€” routes are already inline in this file. β•‘
21
  β•šβ•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•
@@ -594,62 +611,267 @@ class FileBasedLoggerAdapter:
594
 
595
  from dataclasses import dataclass, field
596
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
597
  @dataclass
598
  class AssetSnapshot:
 
599
  space_name: str
600
- signal: float = 0.0
601
- confidence: float = 0.0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
602
  last_updated: float = 0.0
603
 
 
 
 
 
 
 
 
 
 
 
 
604
  class DashboardState:
605
- """Centralized state for the dashboard β€” collects snapshots from hub."""
606
-
607
  def __init__(self):
608
- self._snapshots: Dict[str, AssetSnapshot] = {}
 
609
  self._lock = threading.RLock()
610
 
611
- def update_from_snapshot(self, space_name: str, snap_dict: dict) -> None:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
612
  with self._lock:
613
- if space_name not in self._snapshots:
614
- self._snapshots[space_name] = AssetSnapshot(space_name=space_name)
615
- snap = self._snapshots[space_name]
616
- snap.signal = snap_dict.get("signal", 0.0)
617
- snap.confidence = snap_dict.get("confidence", 0.0)
618
- snap.last_updated = snap_dict.get("last_updated", time.time())
 
 
 
 
 
 
 
 
 
 
 
619
 
620
  def get_state(self) -> dict:
 
 
621
  with self._lock:
622
- snaps = [
623
- {
624
- "space_name": s.space_name,
625
- "signal": round(s.signal, 4),
626
- "confidence": round(s.confidence, 4),
627
- }
628
- for s in self._snapshots.values()
629
- ]
630
- return {"snapshots": snaps}
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
631
 
632
- class HubSubscriberClient:
633
- """Subscribes to the central hub for live metric updates."""
634
-
635
- def __init__(self, state: DashboardState):
636
- self.state = state
637
- self.hub_url = f"wss://{_HUB_HOST}/ws/subscribe"
 
 
 
 
 
 
 
638
  self._ws = None
639
  self._running = False
640
- self._thread = None
641
  self._reconnect_count = 0
642
- self._MAX_BACKOFF = 30
643
 
644
  def start(self) -> None:
645
  if self._running:
646
  return
647
  self._running = True
648
- self._thread = threading.Thread(
649
- target=self._run_loop, daemon=True, name="HubSubscriberClient"
650
  )
651
  self._thread.start()
652
- logger.info(f"[HubSubscriberClient] Starting β†’ {self.hub_url}")
653
 
654
  def stop(self) -> None:
655
  self._running = False
@@ -666,12 +888,12 @@ class HubSubscriberClient:
666
  try:
667
  self._connect_and_run()
668
  except Exception as e:
669
- logger.error(f"[HubSubscriberClient] error: {e}")
670
  if not self._running:
671
  break
672
- backoff = min(self._MAX_BACKOFF, 2 ** min(self._reconnect_count, 4))
673
  logger.info(
674
- f"[HubSubscriberClient] reconnect in {backoff}s "
675
  f"(attempt #{self._reconnect_count + 1})"
676
  )
677
  time.sleep(backoff)
@@ -679,17 +901,30 @@ class HubSubscriberClient:
679
 
680
  def _connect_and_run(self) -> None:
681
  self._ws = websocket.WebSocketApp(
682
- self.hub_url,
683
  on_message = self._on_message,
684
- on_open = lambda ws: logger.info("[HubSubscriberClient] βœ… Connected"),
685
- on_error = lambda ws, e: logger.warning(f"[HubSubscriberClient] WS error: {e}"),
686
- on_close = lambda ws, c, m: logger.info(f"[HubSubscriberClient] closed code={c}"),
687
- )
688
- self._ws.run_forever(
689
- ping_interval = 30,
690
- ping_timeout = 10,
691
- reconnect = 0,
692
  )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
693
 
694
  def _on_message(self, ws, raw: str) -> None:
695
  try:
@@ -697,18 +932,64 @@ class HubSubscriberClient:
697
  kind = msg.get("type", "")
698
 
699
  if kind == "metrics_update":
700
- space = msg.get("space_name", "")
701
- snapshot = msg.get("snapshot", {})
702
- if space and snapshot:
703
- self.state.update_from_snapshot(space, snapshot)
 
 
 
704
 
705
  elif kind == "initial_state":
706
- for space, snapshot in msg.get("snapshots", {}).items():
707
- if space and snapshot:
708
- self.state.update_from_snapshot(space, snapshot)
 
 
 
 
 
 
 
 
 
 
 
 
 
709
 
710
  except Exception as e:
711
- logger.debug(f"[HubSubscriberClient] parse error: {e}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
712
 
713
 
714
  # ══════════════════════════════════════════════════════════════════════════════════════
@@ -719,9 +1000,17 @@ _state = DashboardState()
719
  _trade_parser = TradeLogParser(log_dir=_LOG_DIR)
720
  _trade_parser.start_background()
721
 
722
- # Start hub subscriber so _state stays in sync with live metrics
723
- _hub_subscriber = HubSubscriberClient(state=_state)
724
- _hub_subscriber.start()
 
 
 
 
 
 
 
 
725
 
726
  app = Flask(__name__)
727
  CORS(app)
@@ -913,7 +1202,7 @@ def api_trades_closed():
913
 
914
  @app.route("/api/health")
915
  def health():
916
- return jsonify({"status": "ok", "version": "v2.5-port-fix"})
917
 
918
 
919
  @app.route("/api/debug")
@@ -953,15 +1242,17 @@ def api_debug():
953
 
954
 
955
  if __name__ == "__main__":
956
- logger.info("=== K1RL QUASAR HUB DASHBOARD SERVICE v2.5 (PORT + LOG PATH FIX) ===")
957
  logger.info(f"Dashboard port: {_DASHBOARD_PORT} (HF Spaces public port)")
958
  logger.info(f"Log directory: {_LOG_DIR}")
959
- logger.info("Fixes applied:")
960
- logger.info(" βœ… FIX v2.5: Default port 8051β†’7860 (HF Spaces only exposes 7860)")
961
- logger.info(" βœ… FIX v2.5: WebSocket wss:// with no explicit port")
962
- logger.info(" βœ… FIX v2.5: _find_files() scans all likely HF Spaces log paths")
963
- logger.info(" βœ… FIX v2.5: /api/debug endpoint for live diagnostics")
964
- logger.info(" βœ… FIX v2.4: All /api/ranker/logs/* routes inline")
 
 
965
  logger.info(f" β†’ Visit /api/debug to inspect log file discovery live")
966
 
967
  app.run(host="0.0.0.0", port=_DASHBOARD_PORT, debug=False, use_reloader=False)
 
1
  #!/usr/bin/env python3
2
  """
3
  ╔══════════════════════════════════════════════════════════════════════════════════════╗
4
+ β•‘ K1RL QUASAR β€” HUB DASHBOARD SERVICE (with Trade Log Parser) β€” FIXED v2.6 β•‘
5
  β•‘ ────────────────────────────────────────────────────────────────────────────────── β•‘
6
  β•‘ Architecture role: READ-ONLY subscriber β†’ serves dashboard UI β•‘
7
+ β•‘ VERSION: v2.6 (REALTIME SIGNALS) | 2026-04-26 β•‘
8
  β•‘ β•‘
9
+ β•‘ v2.6 β€” Wires the dashboard to the hub's actual broadcast contracts: β•‘
10
+ β•‘ βœ… MetricsSubscriber β†’ /ws/subscribe β€” properly unwraps msg["asset"]["snapshot"] β•‘
11
+ β•‘ and reads voting.flip_direction / buy_count / sell_count / last_price plus β•‘
12
+ β•‘ training.* fields. Previously v2.5 read flat keys that the hub never emits, β•‘
13
+ β•‘ so AssetSnapshot defaulted to zero on every message. β•‘
14
+ β•‘ βœ… SignalSubscriber β†’ /ws/signals β€” NEW. Consumes the per-tick realtime channel β•‘
15
+ β•‘ (signal_snapshot + signal_delta), drops out-of-order seq, stores the latest β•‘
16
+ β•‘ {action, price, ts, seq, source} per asset. β•‘
17
+ β•‘ βœ… DashboardState.get_state() merges both streams. flip_direction reflects the β•‘
18
+ β•‘ fresh realtime tick (≀1 s old) so the BUY-after-SELL-streak case is visible β•‘
19
+ β•‘ within ~30 ms of the V75 tick β€” same latency budget as the ranker. Falls back β•‘
20
+ β•‘ to the cumulative voting direction when the realtime stream is silent/stale. β•‘
21
+ β•‘ βœ… Full per-asset payload (buy/sell counts, training_steps, actor/critic/avn β•‘
22
+ β•‘ loss, avn_accuracy, signal_confidence, score) so the HTML table populates β•‘
23
+ β•‘ instead of rendering empty cells. β•‘
24
+ β•‘ βœ… Backward-compat alias: HubSubscriberClient = MetricsSubscriberClient. β•‘
25
+ β•‘ β•‘
26
+ β•‘ Carried over from v2.5: β•‘
27
+ β•‘ βœ… Default port 8051β†’7860 (HF Spaces only exposes port 7860) β•‘
28
+ β•‘ βœ… WebSocket ws://host:7860 β†’ wss://host (HF Spaces TLS proxy) β•‘
29
+ β•‘ βœ… _find_files() searches all likely HF Spaces log paths β•‘
30
+ β•‘ βœ… /api/debug endpoint for live diagnostics β•‘
31
+ β•‘ βœ… All /api/ranker/logs/* routes inline β€” no Blueprint dependency β•‘
32
+ β•‘ βœ… Training KPI enrichment (_enrich_training) applied on /recent β•‘
33
+ β•‘ βœ… Rotated log files (*.log, *.log.1, *.log.2, etc.) included β•‘
34
+ β•‘ βœ… Improved regex to catch all trade close formats β•‘
35
+ β•‘ βœ… Unrealized P&L tracking for open positions β•‘
36
  β•‘ β•‘
37
  β•‘ DEPLOYMENT: Just restart the service β€” routes are already inline in this file. β•‘
38
  β•šβ•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•
 
611
 
612
  from dataclasses import dataclass, field
613
 
614
+ # ══════════════════════════════════════════════════════════════════════════════════════
615
+ # v2.6 β€” REWRITTEN to match the hub's actual broadcast contracts.
616
+ #
617
+ # What was broken in v2.5:
618
+ # 1. AssetSnapshot read flat `signal` / `confidence` keys that the hub never emits.
619
+ # The hub emits `voting.flip_direction`, `voting.buy_count`, `voting.sell_count`
620
+ # inside the snapshot, plus a derived `metadata.scores.blended_confidence`.
621
+ # 2. _on_message read `msg.get("space_name")` and `msg.get("snapshot")` flat, but
622
+ # the hub wraps both under `msg["asset"]`. Same for `initial_state`, which uses
623
+ # the key `assets` (dict of space_name β†’ {metadata, snapshot}), not `snapshots`.
624
+ # 3. There was no subscriber for /ws/signals at all, so the per-tick realtime
625
+ # action emitted by the hub's signal broadcaster never reached the dashboard.
626
+ #
627
+ # What v2.6 does:
628
+ # β€’ MetricsSubscriberClient β†’ /ws/subscribe β€” properly unwraps `msg["asset"]`,
629
+ # reads `voting.flip_direction` / `voting.buy_count` / `voting.sell_count` /
630
+ # `voting.last_price`, plus `training.*` for the table columns the HTML reads.
631
+ # β€’ SignalSubscriberClient β†’ /ws/signals β€” handles signal_snapshot + signal_delta,
632
+ # drops out-of-order messages by per-asset `seq`, stores the realtime action
633
+ # keyed by space_name.
634
+ # β€’ DashboardState.get_state() merges both streams. For each asset it emits:
635
+ # - flip_direction: realtime per-tick action when fresh, else cumulative
636
+ # (HTML's vecOf reads this first β†’ realtime wins display)
637
+ # - latest_signal: pure realtime action (forward-compat field)
638
+ # - cumulative_flip_direction: voting.flip_direction (diagnostics)
639
+ # - signal_confidence: blended_confidence (HTML "Engaged" badge + Certainty)
640
+ # - buy_count / sell_count / training_steps / actor_loss / critic_loss /
641
+ # avn_accuracy / score / last_updated (everything the HTML table reads)
642
+ # ══════════════════════════════════════════════════════════════════════════════════════
643
+
644
+ # A realtime tick older than this is treated as stale β€” the cumulative direction
645
+ # wins instead. 1 s is tight: a tick has to land in the second before the /api/state
646
+ # poll for the realtime override to fire, otherwise the badge falls back to the
647
+ # cumulative voting direction. Reflects the design choice that the dashboard should
648
+ # only paint a realtime BUY/SELL when the asset is *actively* ticking.
649
+ _REALTIME_SIGNAL_FRESH_SEC = 1.0
650
+
651
+
652
  @dataclass
653
  class AssetSnapshot:
654
+ """Per-asset cumulative state, derived from /ws/subscribe `metrics_update`."""
655
  space_name: str
656
+ # Voting (cumulative)
657
+ flip_direction: str = "NONE" # "BUY" | "SELL" | "NONE"
658
+ flip_action: str = "HOLD"
659
+ buy_count: int = 0
660
+ sell_count: int = 0
661
+ last_price: float = 0.0
662
+ # Confidence scores (from hub's signal_metadata)
663
+ vote_confidence: float = 0.0
664
+ train_confidence: float = 0.0
665
+ blended_confidence: float = 0.0
666
+ # Training/learning fields used by the HTML table & detail panel
667
+ training_steps: int = 0
668
+ actor_loss: float = 0.0
669
+ critic_loss: float = 0.0
670
+ avn_accuracy: float = 0.0
671
+ avn_loss: float = 0.0
672
+ # Composite score (HTML reads `r.score` for the bar chart)
673
+ score: float = 0.0
674
+ # Bookkeeping
675
  last_updated: float = 0.0
676
 
677
+
678
+ @dataclass
679
+ class RealtimeSignal:
680
+ """Per-asset realtime per-tick state, from /ws/signals."""
681
+ action: str = "NONE" # "BUY" | "SELL" | "HOLD" | "NONE"
682
+ price: float = 0.0
683
+ seq: int = 0
684
+ ts: float = 0.0
685
+ source: str = ""
686
+
687
+
688
  class DashboardState:
689
+ """Centralized state β€” merges /ws/subscribe metrics with /ws/signals realtime."""
690
+
691
  def __init__(self):
692
+ self._snapshots: Dict[str, AssetSnapshot] = {}
693
+ self._signals: Dict[str, RealtimeSignal] = {}
694
  self._lock = threading.RLock()
695
 
696
+ # ── Writers ────────────────────────────────────────────────────────────────
697
+
698
+ def update_from_metrics(
699
+ self,
700
+ space_name: str,
701
+ snapshot: dict,
702
+ metadata: Optional[dict] = None,
703
+ ) -> None:
704
+ """
705
+ Apply a metrics_update payload from /ws/subscribe.
706
+
707
+ Reads from the snapshot's actual structure (voting.*, training.*) plus
708
+ the pre-computed `metadata.scores` block the hub attaches.
709
+ """
710
+ if not space_name or not snapshot:
711
+ return
712
+ metadata = metadata or {}
713
+ voting = snapshot.get("voting", {}) or {}
714
+ training = snapshot.get("training", {}) or {}
715
+ scores = (metadata.get("scores") or {})
716
+
717
+ with self._lock:
718
+ snap = self._snapshots.get(space_name)
719
+ if snap is None:
720
+ snap = AssetSnapshot(space_name=space_name)
721
+ self._snapshots[space_name] = snap
722
+
723
+ # Voting / cumulative direction
724
+ snap.flip_direction = str(voting.get("flip_direction", "NONE")).upper()
725
+ snap.flip_action = str(voting.get("flip_action", "HOLD")).upper()
726
+ snap.buy_count = int(voting.get("buy_count", 0) or 0)
727
+ snap.sell_count = int(voting.get("sell_count", 0) or 0)
728
+ try:
729
+ snap.last_price = float(voting.get("last_price", 0.0) or 0.0)
730
+ except (TypeError, ValueError):
731
+ snap.last_price = 0.0
732
+
733
+ # Confidence scores (prefer hub-derived metadata; fall back to local calc)
734
+ snap.vote_confidence = float(scores.get("vote_confidence", 0.0) or 0.0)
735
+ snap.train_confidence = float(scores.get("train_confidence", 0.0) or 0.0)
736
+ snap.blended_confidence = float(scores.get("blended_confidence", 0.0) or 0.0)
737
+ if snap.blended_confidence == 0.0 and snap.vote_confidence == 0.0:
738
+ # Hub didn't attach metadata for some reason β€” derive locally.
739
+ total_votes = snap.buy_count + snap.sell_count
740
+ if total_votes > 0:
741
+ snap.vote_confidence = max(snap.buy_count, snap.sell_count) / total_votes
742
+ snap.train_confidence = float(training.get("avn_accuracy", 0.0) or 0.0)
743
+ if snap.vote_confidence > 0 and snap.train_confidence > 0:
744
+ snap.blended_confidence = (snap.vote_confidence + snap.train_confidence) / 2.0
745
+ else:
746
+ snap.blended_confidence = snap.vote_confidence or snap.train_confidence
747
+
748
+ # Training fields the HTML table & detail panel read
749
+ snap.training_steps = int(training.get("training_steps", 0) or 0)
750
+ snap.actor_loss = float(training.get("actor_loss", 0.0) or 0.0)
751
+ snap.critic_loss = float(training.get("critic_loss", 0.0) or 0.0)
752
+ snap.avn_accuracy = float(training.get("avn_accuracy", 0.0) or 0.0)
753
+ snap.avn_loss = float(training.get("avn_loss", 0.0) or 0.0)
754
+
755
+ # Composite score: prefer an explicit field if the hub ever supplies one,
756
+ # otherwise fall back to signal_strength (already a [0,1] confidence-weighted
757
+ # directional score β€” close enough for the bar chart's relative ranking).
758
+ score = snapshot.get("score")
759
+ if score is None:
760
+ score = scores.get("signal_strength", 0.0)
761
+ try:
762
+ snap.score = float(score or 0.0)
763
+ except (TypeError, ValueError):
764
+ snap.score = 0.0
765
+
766
+ snap.last_updated = float(
767
+ snapshot.get("last_updated") or time.time()
768
+ )
769
+
770
+ def update_from_signal(self, signal: dict) -> None:
771
+ """Apply a per-tick signal from /ws/signals. Drops out-of-order seq."""
772
+ asset = signal.get("asset")
773
+ if not asset:
774
+ return
775
+ try:
776
+ seq = int(signal.get("seq", 0) or 0)
777
+ except (TypeError, ValueError):
778
+ seq = 0
779
+
780
  with self._lock:
781
+ cur = self._signals.get(asset)
782
+ if cur is not None and seq <= cur.seq:
783
+ # Replay or out-of-order β€” ignore (matches ranker SignalSubscriber semantics)
784
+ return
785
+ try:
786
+ price = float(signal.get("price", 0.0) or 0.0)
787
+ except (TypeError, ValueError):
788
+ price = 0.0
789
+ self._signals[asset] = RealtimeSignal(
790
+ action = str(signal.get("action", "NONE")).upper(),
791
+ price = price,
792
+ seq = seq,
793
+ ts = float(signal.get("ts") or time.time()),
794
+ source = str(signal.get("source", "") or ""),
795
+ )
796
+
797
+ # ── Reader ─────────────────────────────────────────────────────────────────
798
 
799
  def get_state(self) -> dict:
800
+ """Merge cumulative + realtime views into the payload `/api/state` returns."""
801
+ now = time.time()
802
  with self._lock:
803
+ snaps_out = []
804
+ for snap in self._snapshots.values():
805
+ rt = self._signals.get(snap.space_name)
806
+
807
+ # Realtime per-tick action (may be empty if /ws/signals never fired)
808
+ latest_signal = rt.action if rt else ""
809
+ rt_fresh = (
810
+ rt is not None
811
+ and rt.action in ("BUY", "SELL")
812
+ and (now - rt.ts) <= _REALTIME_SIGNAL_FRESH_SEC
813
+ )
814
+
815
+ # vecOf in the HTML reads flip_direction FIRST. Put the per-tick
816
+ # action there when it's fresh and directional, so the dashboard
817
+ # reflects the most recent signal within ~30 ms of the tick.
818
+ # Otherwise fall back to the cumulative voting direction.
819
+ display_direction = rt.action if rt_fresh else snap.flip_direction
820
+
821
+ snaps_out.append({
822
+ "space_name": snap.space_name,
823
+ "flip_direction": display_direction,
824
+ "cumulative_flip_direction": snap.flip_direction,
825
+ "latest_signal": latest_signal,
826
+ "flip_action": snap.flip_action,
827
+ "last_price": round(snap.last_price, 6),
828
+ "buy_count": snap.buy_count,
829
+ "sell_count": snap.sell_count,
830
+ "signal_confidence": round(snap.blended_confidence, 4),
831
+ "confidence": round(snap.blended_confidence, 4),
832
+ "vote_confidence": round(snap.vote_confidence, 4),
833
+ "train_confidence": round(snap.train_confidence, 4),
834
+ "training_steps": snap.training_steps,
835
+ "actor_loss": round(snap.actor_loss, 6),
836
+ "critic_loss": round(snap.critic_loss, 6),
837
+ "avn_loss": round(snap.avn_loss, 6),
838
+ "avn_accuracy": round(snap.avn_accuracy, 4),
839
+ "score": round(snap.score, 4),
840
+ "last_updated": snap.last_updated,
841
+ # Realtime diagnostics
842
+ "realtime_seq": rt.seq if rt else 0,
843
+ "realtime_ts": rt.ts if rt else 0.0,
844
+ "realtime_source": rt.source if rt else "",
845
+ })
846
+ return {"snapshots": snaps_out}
847
 
848
+
849
+ # ══════════════════════════════════════════════════════════════════════════════════════
850
+ # WebSocket subscriber base β€” DRY shared reconnect/backoff loop
851
+ # ══════════════════════════════════════════════════════════════════════════════════════
852
+
853
+ class _BaseSubscriber:
854
+ """Shared reconnect loop for both metrics and signals subscribers."""
855
+
856
+ NAME = "Subscriber"
857
+ MAX_BACKOFF = 30
858
+
859
+ def __init__(self, url: str):
860
+ self.url = url
861
  self._ws = None
862
  self._running = False
863
+ self._thread: Optional[threading.Thread] = None
864
  self._reconnect_count = 0
 
865
 
866
  def start(self) -> None:
867
  if self._running:
868
  return
869
  self._running = True
870
+ self._thread = threading.Thread(
871
+ target=self._run_loop, daemon=True, name=self.NAME
872
  )
873
  self._thread.start()
874
+ logger.info(f"[{self.NAME}] Starting β†’ {self.url}")
875
 
876
  def stop(self) -> None:
877
  self._running = False
 
888
  try:
889
  self._connect_and_run()
890
  except Exception as e:
891
+ logger.error(f"[{self.NAME}] error: {e}")
892
  if not self._running:
893
  break
894
+ backoff = min(self.MAX_BACKOFF, 2 ** min(self._reconnect_count, 4))
895
  logger.info(
896
+ f"[{self.NAME}] reconnect in {backoff}s "
897
  f"(attempt #{self._reconnect_count + 1})"
898
  )
899
  time.sleep(backoff)
 
901
 
902
  def _connect_and_run(self) -> None:
903
  self._ws = websocket.WebSocketApp(
904
+ self.url,
905
  on_message = self._on_message,
906
+ on_open = lambda ws: self._on_open(),
907
+ on_error = lambda ws, e: logger.warning(f"[{self.NAME}] WS error: {e}"),
908
+ on_close = lambda ws, c, m: logger.info(f"[{self.NAME}] closed code={c}"),
 
 
 
 
 
909
  )
910
+ self._ws.run_forever(ping_interval=30, ping_timeout=10, reconnect=0)
911
+
912
+ def _on_open(self) -> None:
913
+ self._reconnect_count = 0
914
+ logger.info(f"[{self.NAME}] βœ… Connected")
915
+
916
+ def _on_message(self, ws, raw: str) -> None: # pragma: no cover β€” overridden
917
+ raise NotImplementedError
918
+
919
+
920
+ class MetricsSubscriberClient(_BaseSubscriber):
921
+ """Subscribes to /ws/subscribe for cumulative per-asset snapshots."""
922
+
923
+ NAME = "MetricsSubscriber"
924
+
925
+ def __init__(self, state: DashboardState):
926
+ super().__init__(f"wss://{_HUB_HOST}/ws/subscribe")
927
+ self.state = state
928
 
929
  def _on_message(self, ws, raw: str) -> None:
930
  try:
 
932
  kind = msg.get("type", "")
933
 
934
  if kind == "metrics_update":
935
+ # Hub format: {"type":"metrics_update","asset":{"space_name","metadata","snapshot"},...}
936
+ asset = msg.get("asset") or {}
937
+ space_name = asset.get("space_name", "")
938
+ snapshot = asset.get("snapshot", {}) or {}
939
+ metadata = asset.get("metadata", {}) or {}
940
+ if space_name and snapshot:
941
+ self.state.update_from_metrics(space_name, snapshot, metadata)
942
 
943
  elif kind == "initial_state":
944
+ # Hub format: {"type":"initial_state","assets":{ "<name>":{"metadata","snapshot"} },...}
945
+ # NOTE: legacy key was "snapshots" β€” check both for safety across versions.
946
+ assets = msg.get("assets") or msg.get("snapshots") or {}
947
+ if isinstance(assets, dict):
948
+ for space_name, payload in assets.items():
949
+ if not isinstance(payload, dict):
950
+ continue
951
+ # New format wraps under {metadata, snapshot}; old format was the snapshot itself.
952
+ if "snapshot" in payload:
953
+ self.state.update_from_metrics(
954
+ space_name,
955
+ payload.get("snapshot", {}) or {},
956
+ payload.get("metadata", {}) or {},
957
+ )
958
+ else:
959
+ self.state.update_from_metrics(space_name, payload, {})
960
 
961
  except Exception as e:
962
+ logger.debug(f"[{self.NAME}] parse error: {e}")
963
+
964
+
965
+ class SignalSubscriberClient(_BaseSubscriber):
966
+ """Subscribes to /ws/signals for realtime per-tick BUY/SELL/HOLD actions."""
967
+
968
+ NAME = "SignalSubscriber"
969
+
970
+ def __init__(self, state: DashboardState):
971
+ super().__init__(f"wss://{_HUB_HOST}/ws/signals")
972
+ self.state = state
973
+
974
+ def _on_message(self, ws, raw: str) -> None:
975
+ try:
976
+ msg = json.loads(raw)
977
+ kind = msg.get("type", "")
978
+ if kind not in ("signal_snapshot", "signal_delta"):
979
+ return
980
+ signals = msg.get("signals") or []
981
+ if not isinstance(signals, list):
982
+ return
983
+ for sig in signals:
984
+ if isinstance(sig, dict):
985
+ self.state.update_from_signal(sig)
986
+ except Exception as e:
987
+ logger.debug(f"[{self.NAME}] parse error: {e}")
988
+
989
+
990
+ # Backward-compat alias β€” anything that imported HubSubscriberClient from earlier
991
+ # revisions of this file keeps working without touching its imports.
992
+ HubSubscriberClient = MetricsSubscriberClient
993
 
994
 
995
  # ══════════════════════════════════════════════════════════════════════════════════════
 
1000
  _trade_parser = TradeLogParser(log_dir=_LOG_DIR)
1001
  _trade_parser.start_background()
1002
 
1003
+ # Start hub subscribers β€” _state stays in sync with both cumulative metrics
1004
+ # (/ws/subscribe) and realtime per-tick signals (/ws/signals). Each runs in
1005
+ # its own daemon thread with independent reconnect/backoff.
1006
+ _metrics_subscriber = MetricsSubscriberClient(state=_state)
1007
+ _metrics_subscriber.start()
1008
+
1009
+ _signal_subscriber = SignalSubscriberClient(state=_state)
1010
+ _signal_subscriber.start()
1011
+
1012
+ # Backward-compat name in case anything else in the process imports this.
1013
+ _hub_subscriber = _metrics_subscriber
1014
 
1015
  app = Flask(__name__)
1016
  CORS(app)
 
1202
 
1203
  @app.route("/api/health")
1204
  def health():
1205
+ return jsonify({"status": "ok", "version": "v2.6-realtime-signals"})
1206
 
1207
 
1208
  @app.route("/api/debug")
 
1242
 
1243
 
1244
  if __name__ == "__main__":
1245
+ logger.info("=== K1RL QUASAR HUB DASHBOARD SERVICE v2.6 (REALTIME SIGNALS) ===")
1246
  logger.info(f"Dashboard port: {_DASHBOARD_PORT} (HF Spaces public port)")
1247
  logger.info(f"Log directory: {_LOG_DIR}")
1248
+ logger.info(f"Hub host: {_HUB_HOST}")
1249
+ logger.info("v2.6 fixes:")
1250
+ logger.info(" βœ… MetricsSubscriber β†’ /ws/subscribe (unwraps msg.asset, reads voting.*)")
1251
+ logger.info(" βœ… SignalSubscriber β†’ /ws/signals (per-tick BUY/SELL within ~30 ms)")
1252
+ logger.info(" βœ… get_state() merges both streams; flip_direction reflects realtime tick")
1253
+ logger.info(" βœ… Full snapshot fields exposed (buy/sell counts, training, confidence)")
1254
+ logger.info("Carried over from v2.5:")
1255
+ logger.info(" βœ… port 7860 / wss:// no explicit port / log path scan / /api/debug")
1256
  logger.info(f" β†’ Visit /api/debug to inspect log file discovery live")
1257
 
1258
  app.run(host="0.0.0.0", port=_DASHBOARD_PORT, debug=False, use_reloader=False)