KarlQuant commited on
Commit
de9e841
Β·
verified Β·
1 Parent(s): 120e492

Update websocket_hub.py

Browse files
Files changed (1) hide show
  1. websocket_hub.py +344 -8
websocket_hub.py CHANGED
@@ -71,10 +71,19 @@ _ALLOWED_TRAINING_FIELDS: frozenset = frozenset({
71
  })
72
 
73
  _ALLOWED_VOTING_FIELDS: frozenset = frozenset({
 
74
  "flip_direction",
75
  "flip_action",
76
  "last_price",
77
  "signal_source",
 
 
 
 
 
 
 
 
78
  })
79
 
80
 
@@ -130,22 +139,54 @@ def _validate_and_normalize(space_name: str, raw: dict) -> Optional[dict]:
130
 
131
  voting: dict = {}
132
  if voting_raw:
133
- raw_direction = voting_raw.get("flip_direction", "NONE")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
134
  if not isinstance(raw_direction, str):
135
  raw_direction = "NONE"
136
- raw_action = voting_raw.get("flip_action", "HOLD")
137
  if not isinstance(raw_action, str):
138
  raw_action = "HOLD"
139
- raw_source = voting_raw.get("signal_source", "LOG")
140
- if not isinstance(raw_source, str):
141
- raw_source = "LOG"
142
  _clean_direction = raw_direction.upper() if raw_direction.upper() in {"BUY", "SELL", "NONE"} else "NONE"
143
  _clean_action = raw_action.upper() if raw_action.upper() in {"ENTRY", "EXIT", "HOLD", "REBALANCE", "REDUCE", "SKIP"} else "HOLD"
 
144
  voting = {
145
  "flip_direction": _clean_direction,
146
  "flip_action": _clean_action,
147
  "last_price": _float(voting_raw.get("last_price", 0.0)),
148
  "signal_source": raw_source,
 
 
 
 
149
  }
150
 
151
  return {
@@ -177,7 +218,10 @@ class ConnectionManager:
177
  "step": "training_steps","steps": "training_steps","n_steps": "training_steps",
178
  }
179
  _VOTING_KEYS: frozenset = frozenset({
 
180
  "flip_direction", "flip_action", "last_price", "signal_source",
 
 
181
  })
182
 
183
  def __init__(self):
@@ -463,6 +507,170 @@ _axrvi_rankings: List[dict] = []
463
  _axrvi_rankings_ts: float = 0.0
464
  _AXRVI_RANKINGS_TTL: float = 60.0 # FIX: extended 30β†’60s to survive slow ranker POST cycles
465
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
466
  # ── Top-3 WebSocket client registry ───────────────────────────────────────────────────
467
  # top3_client.py connects here and receives top3_rankings broadcasts whenever the
468
  # Executo ranker POSTs new rankings via POST /api/flip/rankings.
@@ -520,8 +728,10 @@ manager = ConnectionManager()
520
 
521
  @app.on_event("startup")
522
  async def _on_startup():
523
- """Nothing to start β€” HubTradeStore is in-memory, populated by WS messages."""
 
524
  logger.info("πŸš€ HubTradeStore ready (no background scanner needed)")
 
525
 
526
 
527
  # ══════════════════════════════════════════════════════════════════════════════════════
@@ -546,12 +756,44 @@ async def ws_publisher_endpoint(websocket: WebSocket, space_name: str):
546
  manager.record_msg(space_name, msg_type)
547
 
548
  # ── Route by type ────────────────────────────────────────────────────
549
- if msg_type == "metrics":
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
550
  # Combined payload: top-level "training" and "voting" dicts
 
551
  await manager.ingest(space_name, {
552
  "training": data.get("training", {}),
553
- "voting": data.get("voting", {}),
554
  })
 
 
 
 
 
555
 
556
  elif msg_type == "training":
557
  # Bug A fix: try "data" wrapper first, then fall back to top-level fields.
@@ -576,6 +818,9 @@ async def ws_publisher_endpoint(websocket: WebSocket, space_name: str):
576
  if k in manager._VOTING_KEYS and k != "type"
577
  }
578
  await manager.ingest(space_name, {"training": {}, "voting": voting_raw})
 
 
 
579
 
580
  elif msg_type in ("heartbeat", "identify", "ping"):
581
  pass
@@ -612,6 +857,9 @@ async def ws_publisher_endpoint(websocket: WebSocket, space_name: str):
612
  "training": rescued_training,
613
  "voting": rescued_voting,
614
  })
 
 
 
615
  else:
616
  logger.warning(
617
  f"[{space_name}] ⚠ Unknown type='{msg_type}' with no "
@@ -684,6 +932,67 @@ async def ws_top3_endpoint(websocket: WebSocket):
684
  logger.info(f"πŸ“‰ top3 client disconnected (remaining={len(_top3_clients)})")
685
 
686
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
687
  # ══════════════════════════════════════════════════════════════════════════════════════
688
  # SECTION 6 β€” REST API (READ-ONLY)
689
  # ══════════════════════════════════════════════════════════════════════════════════════
@@ -1219,6 +1528,33 @@ async def receive_axrvi_rankings(request: Request):
1219
  return JSONResponse({"ok": True, "count": len(rankings), "ts": _axrvi_rankings_ts})
1220
 
1221
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1222
  @app.get("/api/state")
1223
  async def api_state():
1224
  """Full dashboard state polled by hub_dashboard.html every 2 s."""
 
71
  })
72
 
73
  _ALLOWED_VOTING_FIELDS: frozenset = frozenset({
74
+ # Canonical (post-v2 schema) ────────────────────────────────────────────
75
  "flip_direction",
76
  "flip_action",
77
  "last_price",
78
  "signal_source",
79
+ # Legacy aliases accepted on the wire β€” translated inside the hub ───────
80
+ # Executor spaces still emit dominant_signal / buy_count / sell_count
81
+ # because their publisher code pre-dates the flip_* rename. We accept
82
+ # them at ingestion and map them to flip_direction so downstream code
83
+ # (ranker, dashboard) can stay on the unified flip_* schema.
84
+ "dominant_signal",
85
+ "buy_count",
86
+ "sell_count",
87
  })
88
 
89
 
 
139
 
140
  voting: dict = {}
141
  if voting_raw:
142
+ # ── Canonical fields (new schema) ──────────────────────────────────
143
+ raw_direction = voting_raw.get("flip_direction", None)
144
+ raw_action = voting_raw.get("flip_action", None)
145
+ raw_source = voting_raw.get("signal_source", "LOG")
146
+ if not isinstance(raw_source, str):
147
+ raw_source = "LOG"
148
+
149
+ # ── Legacy fallback: translate dominant_signal β†’ flip_direction ────
150
+ # The asset/executor spaces still publish the old schema
151
+ # ({dominant_signal, buy_count, sell_count}) because their publisher
152
+ # code was never rewritten. We map it to the new flip_* schema here
153
+ # so everything downstream stays on one naming convention.
154
+ #
155
+ # dominant_signal=BUY β†’ flip_direction=BUY, flip_action=ENTRY
156
+ # dominant_signal=SELL β†’ flip_direction=SELL, flip_action=ENTRY
157
+ # dominant_signal=NEUTRAL β†’ flip_direction=NONE, flip_action=HOLD
158
+ #
159
+ # If a payload carries BOTH flip_direction and dominant_signal,
160
+ # flip_direction wins (forward-compat with newer publishers).
161
+ if not isinstance(raw_direction, str) or raw_direction.upper() not in {"BUY", "SELL", "NONE"}:
162
+ legacy_signal = voting_raw.get("dominant_signal", None)
163
+ if isinstance(legacy_signal, str):
164
+ legacy_upper = legacy_signal.upper()
165
+ if legacy_upper in {"BUY", "SELL"}:
166
+ raw_direction = legacy_upper
167
+ if not isinstance(raw_action, str) or raw_action.upper() == "HOLD":
168
+ raw_action = "ENTRY"
169
+ elif legacy_upper == "NEUTRAL":
170
+ raw_direction = "NONE"
171
+ raw_action = "HOLD"
172
+
173
  if not isinstance(raw_direction, str):
174
  raw_direction = "NONE"
 
175
  if not isinstance(raw_action, str):
176
  raw_action = "HOLD"
177
+
 
 
178
  _clean_direction = raw_direction.upper() if raw_direction.upper() in {"BUY", "SELL", "NONE"} else "NONE"
179
  _clean_action = raw_action.upper() if raw_action.upper() in {"ENTRY", "EXIT", "HOLD", "REBALANCE", "REDUCE", "SKIP"} else "HOLD"
180
+
181
  voting = {
182
  "flip_direction": _clean_direction,
183
  "flip_action": _clean_action,
184
  "last_price": _float(voting_raw.get("last_price", 0.0)),
185
  "signal_source": raw_source,
186
+ # Preserve vote counts (legacy format) so the ranker can compute
187
+ # vote-ratio confidence when the publisher emits them.
188
+ "buy_count": _int(voting_raw.get("buy_count", 0)),
189
+ "sell_count": _int(voting_raw.get("sell_count", 0)),
190
  }
191
 
192
  return {
 
218
  "step": "training_steps","steps": "training_steps","n_steps": "training_steps",
219
  }
220
  _VOTING_KEYS: frozenset = frozenset({
221
+ # Canonical
222
  "flip_direction", "flip_action", "last_price", "signal_source",
223
+ # Legacy (translated inside _validate_and_normalize)
224
+ "dominant_signal", "buy_count", "sell_count",
225
  })
226
 
227
  def __init__(self):
 
507
  _axrvi_rankings_ts: float = 0.0
508
  _AXRVI_RANKINGS_TTL: float = 60.0 # FIX: extended 30β†’60s to survive slow ranker POST cycles
509
 
510
+ # ══════════════════════════════════════════════════════════════════════════════════════
511
+ # FLIP FAST-PATH CHANNEL
512
+ # ═══════════════════════════════════════���══════════════════════════════════════════════
513
+ # A dedicated, high-priority, per-asset latest-wins cache + broadcaster for BUY/SELL
514
+ # flip events. Flips are time-sensitive (change every few hundred ms) and must not
515
+ # be delayed by the slower training/snapshot machinery.
516
+ #
517
+ # Architecture:
518
+ # Engine ──► {"type":"flip", "data": {"flip_direction":"BUY"|"SELL"|"NONE", ...}}
519
+ # β”‚
520
+ # β–Ό (short-circuit handler in publisher_handler β€” skips _validate_and_normalize)
521
+ # _store_flip(asset, payload) ── atomic write under _flip_lock, seq++
522
+ # β”‚
523
+ # β–Ό sets _flip_dirty event
524
+ # _flip_broadcaster_loop() ── dedicated asyncio task, runs every 30 ms
525
+ # β”‚ (or immediately when dirty, whichever is first)
526
+ # β–Ό
527
+ # /ws/flips subscribers (ranker, executor) ── push JSON delta
528
+ #
529
+ # Per-asset signal consistency:
530
+ # β€’ Latest-wins: newer flip replaces older one atomically under the lock.
531
+ # β€’ Monotonic seq per asset so consumers can detect replays / out-of-order.
532
+ # β€’ On subscribe, a full replay of the current cache is sent first so clients
533
+ # start with a consistent view.
534
+ #
535
+ # Backward-compat auto-bridge:
536
+ # Any inbound voting message whose normalized flip_direction ∈ {BUY, SELL} is
537
+ # AUTOMATICALLY promoted into the flip cache as well. This means existing
538
+ # publishers that send {"type":"voting","dominant_signal":"BUY"} or metrics with
539
+ # voting nested still benefit from the fast path without any code change on
540
+ # their end.
541
+ # ──────────────────────────────────────────────────────────────────────────────────────
542
+
543
+ _FLIP_BROADCAST_INTERVAL_SEC: float = 0.030 # 30 ms cadence per sketch
544
+ _flip_cache: Dict[str, dict] = {} # {asset: {flip_direction, flip_action, last_price, ts, seq, ...}}
545
+ _flip_seq: Dict[str, int] = {} # monotonic per-asset sequence
546
+ _flip_lock: asyncio.Lock = asyncio.Lock()
547
+ _flip_dirty: asyncio.Event = asyncio.Event()
548
+ _flip_subscribers: Set[WebSocket] = set()
549
+ _flip_subscribers_lock: asyncio.Lock = asyncio.Lock()
550
+
551
+
552
+ async def _store_flip(asset: str, raw: dict) -> Optional[dict]:
553
+ """
554
+ Write one flip into the per-asset cache with signal consistency.
555
+ Returns the normalized flip dict (or None if the payload was rejected).
556
+
557
+ Contract:
558
+ β€’ Accepts either {flip_direction: BUY|SELL|NONE|NEUTRAL} or legacy
559
+ {dominant_signal: BUY|SELL|NEUTRAL}. NEUTRAL is coerced to NONE.
560
+ β€’ Write is atomic under _flip_lock (latest-wins).
561
+ β€’ Sequence number is incremented monotonically per asset.
562
+ β€’ Sets _flip_dirty so the broadcaster picks it up within 30 ms.
563
+ """
564
+ if not isinstance(raw, dict):
565
+ return None
566
+
567
+ # Normalize direction β€” accept both new and legacy naming
568
+ raw_dir = raw.get("flip_direction")
569
+ if not isinstance(raw_dir, str) or raw_dir.upper() not in {"BUY", "SELL", "NONE", "NEUTRAL"}:
570
+ raw_dir = raw.get("dominant_signal")
571
+ if isinstance(raw_dir, str):
572
+ raw_dir = raw_dir.upper()
573
+ if raw_dir == "NEUTRAL":
574
+ raw_dir = "NONE"
575
+ else:
576
+ raw_dir = "NONE"
577
+ if raw_dir not in {"BUY", "SELL", "NONE"}:
578
+ raw_dir = "NONE"
579
+
580
+ # Normalize action
581
+ raw_act = raw.get("flip_action")
582
+ if isinstance(raw_act, str) and raw_act.upper() in {"ENTRY", "EXIT", "HOLD", "REBALANCE", "REDUCE", "SKIP"}:
583
+ raw_act = raw_act.upper()
584
+ else:
585
+ raw_act = "ENTRY" if raw_dir in {"BUY", "SELL"} else "HOLD"
586
+
587
+ try:
588
+ price = float(raw.get("last_price", 0.0) or 0.0)
589
+ except Exception:
590
+ price = 0.0
591
+ source = raw.get("signal_source", "engine")
592
+ if not isinstance(source, str):
593
+ source = "engine"
594
+
595
+ async with _flip_lock:
596
+ prev = _flip_cache.get(asset)
597
+ # De-dup: if direction AND action AND price are unchanged, don't bump seq.
598
+ # (Signal consistency: the ranker only reacts to real flips.)
599
+ if prev and prev["flip_direction"] == raw_dir and prev["flip_action"] == raw_act and prev["last_price"] == price:
600
+ # Refresh timestamp only β€” no seq bump, no dirty flag
601
+ prev["ts"] = time.time()
602
+ return prev
603
+
604
+ seq = _flip_seq.get(asset, 0) + 1
605
+ _flip_seq[asset] = seq
606
+ flip = {
607
+ "asset": asset,
608
+ "flip_direction": raw_dir,
609
+ "flip_action": raw_act,
610
+ "last_price": price,
611
+ "signal_source": source,
612
+ "ts": time.time(),
613
+ "seq": seq,
614
+ }
615
+ _flip_cache[asset] = flip
616
+
617
+ _flip_dirty.set()
618
+ return flip
619
+
620
+
621
+ async def _flip_broadcaster_loop():
622
+ """
623
+ Dedicated task: every 30 ms (or immediately on dirty) push the current
624
+ flip cache to all /ws/flips subscribers. Dead connections are pruned.
625
+ """
626
+ logger.info(f"🎯 Flip broadcaster started | interval={_FLIP_BROADCAST_INTERVAL_SEC*1000:.0f}ms")
627
+ while True:
628
+ try:
629
+ # Wait for either a dirty signal or the 30 ms timeout
630
+ try:
631
+ await asyncio.wait_for(_flip_dirty.wait(), timeout=_FLIP_BROADCAST_INTERVAL_SEC)
632
+ except asyncio.TimeoutError:
633
+ pass
634
+ _flip_dirty.clear()
635
+
636
+ async with _flip_subscribers_lock:
637
+ if not _flip_subscribers:
638
+ continue
639
+ subscribers_snapshot = list(_flip_subscribers)
640
+
641
+ async with _flip_lock:
642
+ if not _flip_cache:
643
+ continue
644
+ flips_snapshot = list(_flip_cache.values())
645
+
646
+ msg = {
647
+ "type": "flip_delta",
648
+ "flips": flips_snapshot,
649
+ "total_assets": len(flips_snapshot),
650
+ "hub_timestamp": time.time(),
651
+ }
652
+
653
+ dead: List[WebSocket] = []
654
+ for ws in subscribers_snapshot:
655
+ try:
656
+ await ws.send_json(msg)
657
+ except Exception:
658
+ dead.append(ws)
659
+
660
+ if dead:
661
+ async with _flip_subscribers_lock:
662
+ for ws in dead:
663
+ _flip_subscribers.discard(ws)
664
+ logger.info(f"🎯 Pruned {len(dead)} dead flip subscriber(s)")
665
+
666
+ except asyncio.CancelledError:
667
+ logger.info("🎯 Flip broadcaster cancelled")
668
+ raise
669
+ except Exception as e:
670
+ logger.error(f"🎯 Flip broadcaster error: {e}")
671
+ await asyncio.sleep(0.1)
672
+
673
+
674
  # ── Top-3 WebSocket client registry ───────────────────────────────────────────────────
675
  # top3_client.py connects here and receives top3_rankings broadcasts whenever the
676
  # Executo ranker POSTs new rankings via POST /api/flip/rankings.
 
728
 
729
  @app.on_event("startup")
730
  async def _on_startup():
731
+ """Start background tasks β€” flip broadcaster runs on the event loop as a
732
+ dedicated task (Python's equivalent of a dedicated thread for asyncio)."""
733
  logger.info("πŸš€ HubTradeStore ready (no background scanner needed)")
734
+ asyncio.create_task(_flip_broadcaster_loop())
735
 
736
 
737
  # ══════════════════════════════════════════════════════════════════════════════════════
 
756
  manager.record_msg(space_name, msg_type)
757
 
758
  # ── Route by type ────────────────────────────────────────────────────
759
+ if msg_type == "flip":
760
+ # β˜… HIGH-PRIORITY FLIP FAST PATH β˜…
761
+ # Bypasses the heavier _validate_and_normalize / snapshot path
762
+ # and writes directly into the flip cache. The broadcaster task
763
+ # picks it up within 30 ms and pushes to all /ws/flips subscribers.
764
+ flip_raw = data.get("data") or {
765
+ k: v for k, v in data.items() if k != "type"
766
+ }
767
+ stored = await _store_flip(space_name, flip_raw)
768
+ if stored and stored["flip_direction"] in {"BUY", "SELL"}:
769
+ logger.info(
770
+ f"[{space_name}] ⚑ flip#{stored['seq']} | "
771
+ f"{stored['flip_direction']}/{stored['flip_action']} @ {stored['last_price']:.5f}"
772
+ )
773
+ # Also mirror into the main snapshot so clients that only use
774
+ # /ws/subscribe (dashboard, legacy) stay in sync.
775
+ await manager.ingest(space_name, {
776
+ "training": {},
777
+ "voting": {
778
+ "flip_direction": flip_raw.get("flip_direction") or flip_raw.get("dominant_signal", "NONE"),
779
+ "flip_action": flip_raw.get("flip_action", "ENTRY"),
780
+ "last_price": flip_raw.get("last_price", 0.0),
781
+ "signal_source": flip_raw.get("signal_source", "engine"),
782
+ },
783
+ })
784
+
785
+ elif msg_type == "metrics":
786
  # Combined payload: top-level "training" and "voting" dicts
787
+ voting_payload = data.get("voting", {})
788
  await manager.ingest(space_name, {
789
  "training": data.get("training", {}),
790
+ "voting": voting_payload,
791
  })
792
+ # Auto-bridge: if the voting carries a directional signal,
793
+ # promote it to the flip cache so subscribers on /ws/flips see
794
+ # it immediately even though the publisher used the legacy type.
795
+ if voting_payload:
796
+ await _store_flip(space_name, voting_payload)
797
 
798
  elif msg_type == "training":
799
  # Bug A fix: try "data" wrapper first, then fall back to top-level fields.
 
818
  if k in manager._VOTING_KEYS and k != "type"
819
  }
820
  await manager.ingest(space_name, {"training": {}, "voting": voting_raw})
821
+ # Auto-bridge into fast path (see comment under "metrics").
822
+ if voting_raw:
823
+ await _store_flip(space_name, voting_raw)
824
 
825
  elif msg_type in ("heartbeat", "identify", "ping"):
826
  pass
 
857
  "training": rescued_training,
858
  "voting": rescued_voting,
859
  })
860
+ # Auto-bridge rescued voting into fast path too
861
+ if rescued_voting:
862
+ await _store_flip(space_name, rescued_voting)
863
  else:
864
  logger.warning(
865
  f"[{space_name}] ⚠ Unknown type='{msg_type}' with no "
 
932
  logger.info(f"πŸ“‰ top3 client disconnected (remaining={len(_top3_clients)})")
933
 
934
 
935
+ @app.websocket("/ws/flips")
936
+ async def ws_flips_endpoint(websocket: WebSocket):
937
+ """
938
+ /ws/flips β€” HIGH-PRIORITY flip-only channel.
939
+
940
+ Consumed by:
941
+ β€’ The ranker's FlipSubscriber (fast path into AssetSnapshot, bypassing
942
+ the slower /ws/subscribe snapshot broadcast).
943
+ β€’ The executor (to send flips to MT5 the moment they arrive).
944
+
945
+ Protocol:
946
+ β€’ On connect, the hub replays the full current flip cache as
947
+ {"type": "flip_snapshot", "flips": [...], "hub_timestamp": T}
948
+ so the subscriber starts with a consistent per-asset view.
949
+ β€’ Thereafter, every flip update (either from a type="flip" publisher
950
+ message, or auto-bridged from voting/metrics) is pushed as
951
+ {"type": "flip_delta", "flips": [...], "hub_timestamp": T}
952
+ at a coalesced cadence of _FLIP_BROADCAST_INTERVAL_SEC (30 ms).
953
+ β€’ Each flip carries a monotonically increasing `seq` per asset so
954
+ the consumer can detect replays / out-of-order.
955
+
956
+ Signal-consistency guarantee:
957
+ β€’ Writes into _flip_cache are serialized through _flip_lock.
958
+ β€’ Readers take a consistent snapshot under the same lock.
959
+ β€’ No torn reads, no lost updates β€” the ranker always sees exactly one
960
+ authoritative flip per asset.
961
+ """
962
+ await websocket.accept()
963
+ async with _flip_subscribers_lock:
964
+ _flip_subscribers.add(websocket)
965
+ logger.info(f"🎯 Flip subscriber connected (total={len(_flip_subscribers)})")
966
+
967
+ # ── Replay full cache on connect (so the client has state immediately) ──
968
+ async with _flip_lock:
969
+ replay = list(_flip_cache.values())
970
+ if replay:
971
+ try:
972
+ await websocket.send_text(json.dumps({
973
+ "type": "flip_snapshot",
974
+ "flips": replay,
975
+ "total_assets": len(replay),
976
+ "hub_timestamp": time.time(),
977
+ }))
978
+ except Exception:
979
+ pass
980
+
981
+ try:
982
+ while True:
983
+ # Keep-alive β€” the subscriber doesn't send messages, only receives.
984
+ # If it does send something (e.g. ping), we just discard it.
985
+ await websocket.receive_text()
986
+ except WebSocketDisconnect:
987
+ pass
988
+ except Exception as e:
989
+ logger.error(f"[flips] Subscriber error: {e}")
990
+ finally:
991
+ async with _flip_subscribers_lock:
992
+ _flip_subscribers.discard(websocket)
993
+ logger.info(f"🎯 Flip subscriber disconnected (remaining={len(_flip_subscribers)})")
994
+
995
+
996
  # ══════════════════════════════════════════════════════════════════════════════════════
997
  # SECTION 6 β€” REST API (READ-ONLY)
998
  # ══════════════════════════════════════════════════════════════════════════════════════
 
1528
  return JSONResponse({"ok": True, "count": len(rankings), "ts": _axrvi_rankings_ts})
1529
 
1530
 
1531
+ @app.get("/api/flips")
1532
+ async def api_flips():
1533
+ """Current flip cache β€” snapshot of the latest per-asset flip state.
1534
+
1535
+ Diagnostic + executor fallback: if MT5 executor can't maintain a WS
1536
+ connection, it can poll this endpoint. Response is consistent under the
1537
+ same lock as /ws/flips, so readers never see a torn write.
1538
+ """
1539
+ async with _flip_lock:
1540
+ flips_now = list(_flip_cache.values())
1541
+ return JSONResponse({
1542
+ "flips": flips_now,
1543
+ "total_assets": len(flips_now),
1544
+ "hub_timestamp": time.time(),
1545
+ })
1546
+
1547
+
1548
+ @app.get("/api/flips/{asset}")
1549
+ async def api_flips_asset(asset: str):
1550
+ """Single-asset flip lookup."""
1551
+ async with _flip_lock:
1552
+ flip = _flip_cache.get(asset)
1553
+ if not flip:
1554
+ return JSONResponse({"ok": False, "error": f"No flip for {asset}"}, status_code=404)
1555
+ return JSONResponse({"ok": True, "flip": flip, "hub_timestamp": time.time()})
1556
+
1557
+
1558
  @app.get("/api/state")
1559
  async def api_state():
1560
  """Full dashboard state polled by hub_dashboard.html every 2 s."""