KarlQuant commited on
Commit
012fb5c
Β·
verified Β·
1 Parent(s): b734b4b

Update hub_dashboard_service.py

Browse files
Files changed (1) hide show
  1. hub_dashboard_service.py +172 -4
hub_dashboard_service.py CHANGED
@@ -280,14 +280,75 @@ class DashboardState:
280
 
281
  self._rankings: List[dict] = []
282
  self._snapshots: Dict[str, dict] = {}
 
283
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
284
  def get_state(self) -> dict:
285
  with self._lock:
286
  return {
287
- "rankings": list(self._rankings),
288
- "metric_history": {},
289
- "health": self._health_snapshot(),
290
- "timestamp": datetime.utcnow().isoformat() + "Z",
291
  }
292
 
293
  def _health_snapshot(self) -> dict:
@@ -302,6 +363,109 @@ class DashboardState:
302
  }
303
 
304
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
305
  # ══════════════════════════════════════════════════════════════════════════════════════
306
  # SECTION 3 β€” FLASK APP
307
  # ══════════════════════════════════════════════════════════════════════════════════════
@@ -310,6 +474,10 @@ _state = DashboardState()
310
  _trade_parser = TradeLogParser(log_dir=_LOG_DIR)
311
  _trade_parser.start_background()
312
 
 
 
 
 
313
  app = Flask(__name__)
314
  CORS(app)
315
 
 
280
 
281
  self._rankings: List[dict] = []
282
  self._snapshots: Dict[str, dict] = {}
283
+ self._history: Dict[str, deque] = {} # rolling per-space metric history
284
 
285
+ def update_from_snapshot(self, space_name: str, snapshot: dict) -> None:
286
+ """Record a metrics_update frame from the hub WebSocket."""
287
+ with self._lock:
288
+ self._snapshots[space_name] = snapshot
289
+ training = snapshot.get("training", {})
290
+
291
+ # Only append a history point when training has non-zero loss/accuracy
292
+ if any(
293
+ training.get(k, 0) != 0
294
+ for k in ("actor_loss", "critic_loss", "avn_loss", "avn_accuracy")
295
+ ):
296
+ if space_name not in self._history:
297
+ self._history[space_name] = deque(maxlen=self._history_len)
298
+ self._history[space_name].append({
299
+ "ts": snapshot.get("last_updated", time.time()),
300
+ "actor_loss": training.get("actor_loss", 0.0),
301
+ "critic_loss": training.get("critic_loss", 0.0),
302
+ "avn_loss": training.get("avn_loss", 0.0),
303
+ "avn_accuracy": training.get("avn_accuracy", 0.0),
304
+ "training_steps": training.get("training_steps", 0),
305
+ })
306
+
307
+ self.messages_rx += 1
308
+ self.last_update_ts = time.time()
309
+ self.hub_connected = True
310
+
311
+ # Recompute rankings from the latest snapshot collection
312
+ self._rankings = self._compute_rankings()
313
+
314
+ def _compute_rankings(self) -> List[dict]:
315
+ ranked: List[dict] = []
316
+ for name, snap in self._snapshots.items():
317
+ training = snap.get("training", {})
318
+ voting = snap.get("voting", {})
319
+ buy = voting.get("buy_count", 0)
320
+ sell = voting.get("sell_count", 0)
321
+ total = buy + sell
322
+ sig_conf = (max(buy, sell) / total) if total > 0 else 0.0
323
+ avn_acc = training.get("avn_accuracy", 0.0)
324
+ score = round(sig_conf - avn_acc, 6)
325
+ ranked.append({
326
+ "rank": 0,
327
+ "space_name": name,
328
+ "score": score,
329
+ "signal_confidence": round(sig_conf, 6),
330
+ "avn_accuracy": round(avn_acc, 6),
331
+ "dominant_signal": voting.get("dominant_signal", "NEUTRAL"),
332
+ "buy_count": buy,
333
+ "sell_count": sell,
334
+ "training_steps": training.get("training_steps", 0),
335
+ "actor_loss": training.get("actor_loss", 0.0),
336
+ "critic_loss": training.get("critic_loss", 0.0),
337
+ "avn_loss": training.get("avn_loss", 0.0),
338
+ "last_updated": snap.get("last_updated", 0.0),
339
+ })
340
+ ranked.sort(key=lambda r: r["score"], reverse=True)
341
+ for i, r in enumerate(ranked):
342
+ r["rank"] = i + 1
343
+ return ranked
344
+
345
  def get_state(self) -> dict:
346
  with self._lock:
347
  return {
348
+ "rankings": list(self._rankings),
349
+ "metric_history": {name: list(dq) for name, dq in self._history.items()},
350
+ "health": self._health_snapshot(),
351
+ "timestamp": datetime.utcnow().isoformat() + "Z",
352
  }
353
 
354
  def _health_snapshot(self) -> dict:
 
363
  }
364
 
365
 
366
+ # ══════════════════════════════════════════════════════════════════════════════════════
367
+ # SECTION 2B β€” HUB SUBSCRIBER CLIENT
368
+ #
369
+ # Connects to websocket_hub.py via /ws/subscribe and feeds incoming
370
+ # metrics_update / initial_state frames into DashboardState.
371
+ # Runs in a daemon thread with exponential-back-off reconnection.
372
+ # ══════════════════════════════════════════════════════════════════════════════════════
373
+
374
+ _HUB_WS_URL = os.environ.get(
375
+ "QUASAR_HUB_WS",
376
+ "ws://127.0.0.1:7860/ws/subscribe", # local default; override via env var
377
+ )
378
+
379
+ class HubSubscriberClient:
380
+ """
381
+ READ-ONLY WebSocket subscriber that keeps DashboardState in sync with
382
+ whatever the hub knows, including live training metrics.
383
+
384
+ Messages handled:
385
+ β€’ initial_state β€” {snapshots: {space: snapshot}}
386
+ β€’ metrics_update β€” {space_name: str, snapshot: dict}
387
+ """
388
+
389
+ _MAX_BACKOFF = 30
390
+
391
+ def __init__(self, state: "DashboardState", hub_url: str = _HUB_WS_URL):
392
+ self.state = state
393
+ self.hub_url = hub_url
394
+ self._ws: Optional[websocket.WebSocketApp] = None
395
+ self._running = False
396
+ self._thread: Optional[threading.Thread] = None
397
+ self._reconnect_count = 0
398
+
399
+ def start(self) -> None:
400
+ if self._running:
401
+ return
402
+ self._running = True
403
+ self._thread = threading.Thread(
404
+ target=self._run_loop, daemon=True, name="HubSubscriberClient"
405
+ )
406
+ self._thread.start()
407
+ logger.info(f"[HubSubscriberClient] Starting β†’ {self.hub_url}")
408
+
409
+ def stop(self) -> None:
410
+ self._running = False
411
+ if self._ws:
412
+ try:
413
+ self._ws.close()
414
+ except Exception:
415
+ pass
416
+ if self._thread:
417
+ self._thread.join(timeout=3)
418
+
419
+ def _run_loop(self) -> None:
420
+ while self._running:
421
+ try:
422
+ self._connect_and_run()
423
+ except Exception as e:
424
+ logger.error(f"[HubSubscriberClient] error: {e}")
425
+ if not self._running:
426
+ break
427
+ backoff = min(self._MAX_BACKOFF, 2 ** min(self._reconnect_count, 4))
428
+ logger.info(
429
+ f"[HubSubscriberClient] reconnect in {backoff}s "
430
+ f"(attempt #{self._reconnect_count + 1})"
431
+ )
432
+ time.sleep(backoff)
433
+ self._reconnect_count += 1
434
+
435
+ def _connect_and_run(self) -> None:
436
+ self._ws = websocket.WebSocketApp(
437
+ self.hub_url,
438
+ on_message = self._on_message,
439
+ on_open = lambda ws: logger.info("[HubSubscriberClient] βœ… Connected"),
440
+ on_error = lambda ws, e: logger.warning(f"[HubSubscriberClient] WS error: {e}"),
441
+ on_close = lambda ws, c, m: logger.info(f"[HubSubscriberClient] closed code={c}"),
442
+ )
443
+ self._ws.run_forever(
444
+ ping_interval = 30,
445
+ ping_timeout = 10,
446
+ reconnect = 0,
447
+ )
448
+
449
+ def _on_message(self, ws, raw: str) -> None:
450
+ try:
451
+ msg = json.loads(raw)
452
+ kind = msg.get("type", "")
453
+
454
+ if kind == "metrics_update":
455
+ space = msg.get("space_name", "")
456
+ snapshot = msg.get("snapshot", {})
457
+ if space and snapshot:
458
+ self.state.update_from_snapshot(space, snapshot)
459
+
460
+ elif kind == "initial_state":
461
+ for space, snapshot in msg.get("snapshots", {}).items():
462
+ if space and snapshot:
463
+ self.state.update_from_snapshot(space, snapshot)
464
+
465
+ except Exception as e:
466
+ logger.debug(f"[HubSubscriberClient] parse error: {e}")
467
+
468
+
469
  # ══════════════════════════════════════════════════════════════════════════════════════
470
  # SECTION 3 β€” FLASK APP
471
  # ══════════════════════════════════════════════════════════════════════════════════════
 
474
  _trade_parser = TradeLogParser(log_dir=_LOG_DIR)
475
  _trade_parser.start_background()
476
 
477
+ # Start hub subscriber so _state stays in sync with live metrics
478
+ _hub_subscriber = HubSubscriberClient(state=_state)
479
+ _hub_subscriber.start()
480
+
481
  app = Flask(__name__)
482
  CORS(app)
483