KarlQuant commited on
Commit
7b3233f
Β·
verified Β·
1 Parent(s): a3cb980

Upload 2 files

Browse files
Files changed (2) hide show
  1. Quasar_axrvi_ranker.py +214 -129
  2. websocket_hub.py +352 -2
Quasar_axrvi_ranker.py CHANGED
@@ -795,13 +795,43 @@ class AssetSnapshot:
795
  avn_accuracy: float = 0.0
796
  training_steps: int = 0
797
 
 
 
 
 
 
 
 
 
798
  dominant_signal: str = "NEUTRAL"
799
  buy_count: int = 0
800
  sell_count: int = 0
801
 
 
 
 
 
 
 
 
 
 
 
802
  last_updated: float = 0.0
803
 
804
  def apply_update(self, snapshot: dict) -> None:
 
 
 
 
 
 
 
 
 
 
 
 
805
  training = snapshot.get("training", {})
806
  voting = snapshot.get("voting", {})
807
 
@@ -815,24 +845,51 @@ class AssetSnapshot:
815
  self.training_steps = int(training.get("training_steps", self.training_steps))
816
 
817
  if voting:
818
- # ── PREFER flip_direction (direct signal from asset engine) ──
819
- flip_dir = voting.get("flip_direction", "")
820
- if flip_dir.upper() in {"BUY", "SELL"}:
821
- self.dominant_signal = flip_dir.upper()
822
- self._latest_signal_confidence = 1.0 # direct signal, not a vote ratio
823
- else:
824
- # fallback to dominant_signal (legacy vote‑based aggregation)
825
- raw_dominant = voting.get("dominant_signal", self.dominant_signal)
826
- if raw_dominant.upper() in {"BUY", "SELL", "NEUTRAL"}:
827
- self.dominant_signal = raw_dominant.upper()
828
- # If we used dominant_signal, confidence is vote ratio
829
- self._latest_signal_confidence = None
830
-
831
  self.buy_count = int(voting.get("buy_count", self.buy_count))
832
  self.sell_count = int(voting.get("sell_count", self.sell_count))
833
 
834
  self.last_updated = snapshot.get("last_updated", time.time())
835
-
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
836
 
837
  @property
838
  def total_votes(self) -> int:
@@ -849,17 +906,22 @@ class AssetSnapshot:
849
 
850
  def to_dict(self) -> dict:
851
  return {
852
- "space_name": self.space_name,
853
- "actor_loss": self.actor_loss,
854
- "critic_loss": self.critic_loss,
855
- "avn_loss": self.avn_loss,
856
- "avn_accuracy": self.avn_accuracy,
857
- "training_steps": self.training_steps,
858
- "dominant_signal": self.dominant_signal,
859
- "buy_count": self.buy_count,
860
- "sell_count": self.sell_count,
861
- "signal_confidence": self.signal_confidence,
862
- "last_updated": self.last_updated,
 
 
 
 
 
863
  }
864
 
865
 
@@ -1023,98 +1085,95 @@ class HubSubscriber:
1023
  logger.error(f"[HubSubscriber] on_update callback error: {e}")
1024
 
1025
  # ──────────────────────────────────────────────────────────────────────────
1026
- # Public fast-path injector β€” called by FlipSubscriber when a flip arrives
1027
- # on the /ws/flips channel. Updates the SAME AssetSnapshot instance the
1028
- # ranker reads from, so rank_and_gate sees flips with ~30 ms latency
1029
- # instead of waiting for the next metrics_update on /ws/subscribe.
 
 
1030
  # ──────────────────────────────────────────────────────────────────────────
1031
- def inject_flip(self, space_name: str, flip: dict) -> None:
1032
  """
1033
- Directly merge a flip into the AssetSnapshot under the subscriber's
1034
- lock. Expects `flip` shaped as:
1035
- {"flip_direction": "BUY|SELL|NONE", "flip_action": "ENTRY|HOLD|...",
1036
- "last_price": float, "seq": int, "ts": float, ...}
1037
 
1038
- Handling of flip_direction:
1039
  β€’ BUY / SELL β†’ snap.dominant_signal is set accordingly (confidence=1.0)
1040
- β€’ NONE β†’ explicit reset to NEUTRAL (clears any prior BUY/SELL
1041
- so Gate A stops firing on stale direction)
1042
  """
1043
- if not isinstance(flip, dict):
1044
  return
1045
 
1046
- fd = flip.get("flip_direction", "NONE")
1047
- fd = fd.upper() if isinstance(fd, str) else "NONE"
1048
-
1049
- voting: dict = {
1050
- "flip_direction": fd,
1051
- "flip_action": flip.get("flip_action", "HOLD"),
1052
- "last_price": flip.get("last_price", 0.0),
1053
- "signal_source": flip.get("signal_source", "engine"),
1054
- }
1055
- # Explicit NEUTRAL reset when a NONE flip arrives β€” apply_update's
1056
- # fallback branch only overwrites dominant_signal if the incoming
1057
- # voting dict provides one, so we must include it here.
1058
- if fd == "NONE":
1059
- voting["dominant_signal"] = "NEUTRAL"
1060
 
1061
  with self._lock:
1062
  if space_name not in self._snapshots:
1063
  self._snapshots[space_name] = AssetSnapshot(space_name=space_name)
1064
  snap = self._snapshots[space_name]
1065
- snap.apply_update({
1066
- "training": {},
1067
- "voting": voting,
1068
- "last_updated": flip.get("ts", time.time()),
1069
- })
 
1070
 
1071
  if self.on_update:
1072
  try:
1073
  self.on_update(space_name, snap)
1074
  except Exception as e:
1075
- logger.error(f"[HubSubscriber] on_update (flip-path) callback error: {e}")
1076
 
1077
 
1078
  # ══════════════════════════════════════════════════════════════════════════════════════
1079
- # SECTION 4b β€” FLIP SUBSCRIBER (high-priority, low-latency side channel)
1080
  # ══════════════════════════════════════════════════════════════════════════════════════
1081
  #
1082
- # The FlipSubscriber is a dedicated WebSocket client that consumes only flip
1083
- # events from the hub's /ws/flips endpoint. It runs on its own background
1084
- # thread (matching the sketch: "DEDICATED THREAD, poll every 30ms") and
1085
- # bypasses the slower /ws/subscribe snapshot path for BUY/SELL flips.
 
1086
  #
1087
  # Why a separate subscriber:
1088
  # β€’ /ws/subscribe carries the full training-metrics firehose (losses, steps,
1089
- # avn_accuracy) β€” large payloads at irregular cadence. Flips would queue
1090
- # behind them.
1091
- # β€’ /ws/flips carries only {asset, flip_direction, flip_action, last_price,
1092
- # seq, ts} β€” tiny payloads, coalesced to 30 ms broadcasts.
 
 
 
 
1093
  # β€’ Running on its own thread means a slow rank_and_gate cycle in the main
1094
- # loop cannot stall flip ingestion.
1095
  #
1096
  # Per-asset signal consistency:
1097
  # β€’ The hub assigns a monotonic `seq` per asset. The subscriber tracks
1098
  # `last_seq` and drops replays / out-of-order deliveries.
1099
  # β€’ Updates land in the SAME AssetSnapshot dict the ranker reads from, via
1100
- # HubSubscriber.inject_flip(), so there is exactly one source of truth per
1101
- # asset at any point in time.
1102
  #
1103
- class FlipSubscriber:
1104
  """
1105
- High-priority WS client for /ws/flips. Feeds flips directly into the
1106
- shared HubSubscriber snapshot store via inject_flip().
1107
  """
1108
 
1109
  _MAX_BACKOFF = 30
1110
 
1111
  def __init__(
1112
  self,
1113
- flip_url: str,
1114
  hub_subscriber: "HubSubscriber", # snapshots are written through here
1115
  ranker_logger: Optional[object] = None,
1116
  ):
1117
- self.flip_url = flip_url
1118
  self.hub_subscriber = hub_subscriber
1119
  self.ranker_logger = ranker_logger
1120
 
@@ -1125,11 +1184,11 @@ class FlipSubscriber:
1125
  self._reconnect_count = 0
1126
 
1127
  self.stats = {
1128
- "flips_received": 0,
1129
- "flips_applied": 0,
1130
- "flips_out_of_order": 0,
1131
- "reconnect_count": 0,
1132
- "last_flip_time": 0.0,
1133
  }
1134
 
1135
  def start(self) -> None:
@@ -1137,21 +1196,21 @@ class FlipSubscriber:
1137
  return
1138
  self._running = True
1139
  self._thread = threading.Thread(
1140
- target=self._run_loop, daemon=True, name="FlipSubscriber"
1141
  )
1142
  self._thread.start()
1143
- logger.info(f"[FlipSubscriber] 🎯 Started β†’ {self.flip_url}")
1144
  if self.ranker_logger:
1145
  try:
1146
  self.ranker_logger.connection_event(
1147
- "Flip WebSocket", "connected", "FlipSubscriber started"
1148
  )
1149
  except Exception:
1150
  pass
1151
 
1152
  def stop(self) -> None:
1153
  self._running = False
1154
- logger.info("[FlipSubscriber] Stopping…")
1155
 
1156
  def _run_loop(self) -> None:
1157
  while self._running:
@@ -1162,99 +1221,99 @@ class FlipSubscriber:
1162
  loop.close()
1163
  self._reconnect_count = 0
1164
  except Exception as e:
1165
- logger.error(f"[FlipSubscriber] Session error: {e}")
1166
  self._reconnect_count += 1
1167
  self.stats["reconnect_count"] = self._reconnect_count
1168
 
1169
  if not self._running:
1170
  break
1171
  backoff = min(self._MAX_BACKOFF, 2 ** min(self._reconnect_count, 4))
1172
- logger.info(f"[FlipSubscriber] Reconnecting in {backoff}s…")
1173
  time.sleep(backoff)
1174
 
1175
  async def _ws_session(self) -> None:
1176
  if websockets is None:
1177
- logger.error("[FlipSubscriber] websockets library not installed")
1178
  await asyncio.sleep(5)
1179
  return
1180
 
1181
  from websockets.exceptions import ConnectionClosed
1182
 
1183
- async with websockets.connect(self.flip_url) as ws:
1184
  self._reconnect_count = 0
1185
- logger.info("[FlipSubscriber] βœ… Connected to flip channel")
1186
 
1187
  while self._running:
1188
  try:
1189
  raw = await ws.recv()
1190
  self._handle_message(raw)
1191
  except ConnectionClosed:
1192
- logger.info("[FlipSubscriber] Connection closed by hub")
1193
  break
1194
  except Exception as e:
1195
- logger.error(f"[FlipSubscriber] Message error: {e}")
1196
 
1197
  def _handle_message(self, raw: str) -> None:
1198
  try:
1199
  data = json.loads(raw)
1200
  except json.JSONDecodeError:
1201
- logger.warning("[FlipSubscriber] Malformed JSON")
1202
  return
1203
 
1204
  msg_type = data.get("type", "")
1205
- if msg_type not in ("flip_snapshot", "flip_delta"):
1206
  return
1207
 
1208
- flips = data.get("flips", [])
1209
- if not isinstance(flips, list):
1210
  return
1211
 
1212
- for flip in flips:
1213
- self._apply_flip(flip)
1214
 
1215
- def _apply_flip(self, flip: dict) -> None:
1216
- asset = flip.get("asset")
1217
- seq = flip.get("seq", 0)
1218
  if not asset:
1219
  return
1220
 
1221
- self.stats["flips_received"] += 1
1222
 
1223
  # Drop replays / out-of-order (per-asset signal consistency)
1224
  with self._lock:
1225
  last = self._last_seq.get(asset, 0)
1226
  if seq <= last:
1227
- self.stats["flips_out_of_order"] += 1
1228
  return
1229
  self._last_seq[asset] = seq
1230
 
1231
  # Push directly into the shared AssetSnapshot store
1232
- self.hub_subscriber.inject_flip(asset, flip)
1233
- self.stats["flips_applied"] += 1
1234
- self.stats["last_flip_time"] = time.time()
1235
 
1236
- fd = flip.get("flip_direction", "?")
1237
- if fd in ("BUY", "SELL"):
1238
  logger.info(
1239
- f"[FlipSubscriber] ⚑ {asset} flip#{seq} β†’ {fd} "
1240
- f"@ {flip.get('last_price', 0):.5f}"
1241
  )
1242
 
1243
 
1244
- def _derive_flip_url(hub_ws_url: str) -> str:
1245
- """Derive the /ws/flips URL from whatever hub URL the ranker was given.
1246
- Accepts ws://host:port/ws/subscribe β†’ ws://host:port/ws/flips."""
1247
  url = hub_ws_url
1248
  # Replace the path if present, otherwise append
1249
- for known in ("/ws/subscribe", "/ws/metrics", "/subscribe"):
1250
  if url.endswith(known):
1251
- return url[: -len(known)] + "/ws/flips"
1252
- # Fallback: strip any trailing path and append /ws/flips
1253
  if "://" in url:
1254
  scheme, rest = url.split("://", 1)
1255
  host_and_path = rest.split("/", 1)[0]
1256
- return f"{scheme}://{host_and_path}/ws/flips"
1257
- return url.rstrip("/") + "/ws/flips"
1258
 
1259
 
1260
  # ══════════════════════════════════════════════════════════════════════════════════════
@@ -1817,9 +1876,28 @@ class AssetStateBuffer:
1817
  return (time.time() - self.last_signal_ts) > self.STALE_TIMEOUT
1818
 
1819
  def on_hub_snapshot(self, snap: AssetSnapshot) -> None:
1820
- """Update from a hub AssetSnapshot β€” triggers feature extraction + buffer append."""
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1821
  self.latest_signal = {
1822
- "action": snap.dominant_signal if snap.dominant_signal != "NEUTRAL" else "HOLD",
1823
  "confidence": snap.signal_confidence,
1824
  "avn_loss": snap.avn_loss,
1825
  "avn_accuracy": snap.avn_accuracy,
@@ -7231,14 +7309,15 @@ class QuasarAXRVIBridge:
7231
  ranker_logger = self.ranker_logger,
7232
  )
7233
 
7234
- # ── Flip subscriber (high-priority side channel) ──────────────────────
7235
- # Runs on its own thread, consumes /ws/flips at ~30 ms cadence, and
7236
- # writes flips directly into the shared AssetSnapshot store via
7237
- # hub_subscriber.inject_flip(). Gives rank_and_gate sub-second
7238
- # visibility into BUY/SELL flips instead of waiting for the next
7239
- # metrics_update broadcast on /ws/subscribe.
7240
- self.flip_subscriber = FlipSubscriber(
7241
- flip_url = _derive_flip_url(hub_ws_url),
 
7242
  hub_subscriber = self.hub_subscriber,
7243
  ranker_logger = self.ranker_logger,
7244
  )
@@ -7443,7 +7522,7 @@ class QuasarAXRVIBridge:
7443
  self.ranking_engine.asset_buffers = self.asset_buffers
7444
 
7445
  self.hub_subscriber.start()
7446
- self.flip_subscriber.start()
7447
 
7448
  # ── Full-state checkpoint resume ──────────────────────────────────
7449
  # Must run AFTER all components are initialised so _restore_checkpoint
@@ -9054,7 +9133,7 @@ class QuasarAXRVIBridge:
9054
  self.running = False
9055
  await self.ws_client.close()
9056
  self.hub_subscriber.stop()
9057
- self.flip_subscriber.stop()
9058
  # ── Full-state shutdown checkpoint ────────────────────────────────
9059
  # Saves everything: model, replay buffer, bandit, normalizers, etc.
9060
  # This is the "progress matters" save β€” nothing is lost on restart.
@@ -11004,6 +11083,9 @@ def test_components() -> None:
11004
  "training": {"avn_accuracy": 0.8, "actor_loss": 0.1, "critic_loss": 0.2, "avn_loss": 0.05},
11005
  "voting": {"buy_count": 7, "sell_count": 3, "dominant_signal": "BUY"},
11006
  })
 
 
 
11007
  buf.on_price(1500.0, 0.03, 0.0)
11008
  buf.on_hub_snapshot(snap)
11009
  seq = buf.get_sequence()
@@ -11038,6 +11120,9 @@ def test_components() -> None:
11038
  "voting": {"buy_count": buy, "sell_count": sell,
11039
  "dominant_signal": "BUY" if buy > sell else "NEUTRAL"},
11040
  })
 
 
 
11041
  snapshots[name] = s
11042
 
11043
  sig_map = {"V75": 0.60, "V100_1s": 0.70, "CRASH500": 0.40}
 
795
  avn_accuracy: float = 0.0
796
  training_steps: int = 0
797
 
798
+ # `dominant_signal` is the public action field that all downstream code
799
+ # (Gate A, _ensure_minimum_trades, ranking export, monitoring) reads from.
800
+ # In v2.3+ it is populated SOLELY by realtime per-tick signals delivered
801
+ # via /ws/signals β†’ HubSubscriber.inject_signal(). The hub-snapshot path
802
+ # (apply_update) NO LONGER writes to it β€” voting aggregates on /ws/subscribe
803
+ # are intentionally ignored as a source of truth for the action, because
804
+ # they carry the cumulative dominant of an EMA-style aggregation rather
805
+ # than the raw per-tick AVN inference.
806
  dominant_signal: str = "NEUTRAL"
807
  buy_count: int = 0
808
  sell_count: int = 0
809
 
810
+ # ── Realtime signal bookkeeping (populated by inject_signal) ──────────
811
+ # latest_action mirrors the per-tick action that arrived on /ws/signals.
812
+ # Domain: BUY | SELL | HOLD. dominant_signal is derived from it
813
+ # (HOLD β†’ NEUTRAL) so that the rest of the codebase keeps reading the
814
+ # same field name with the same {BUY, SELL, NEUTRAL} domain it always had.
815
+ latest_action: str = "HOLD"
816
+ latest_action_price: float = 0.0
817
+ latest_action_ts: float = 0.0
818
+ latest_action_seq: int = 0
819
+
820
  last_updated: float = 0.0
821
 
822
  def apply_update(self, snapshot: dict) -> None:
823
+ """
824
+ Merge a hub snapshot (from /ws/subscribe) into this AssetSnapshot.
825
+
826
+ v2.3+ scope:
827
+ β€’ Training metrics (actor_loss, critic_loss, avn_loss/accuracy, steps)
828
+ β€’ Voting counters (buy_count, sell_count) β€” kept ONLY for the legacy
829
+ vote-ratio confidence fallback. NOT used to set dominant_signal.
830
+
831
+ The per-tick action is delivered out-of-band on /ws/signals and lands
832
+ via inject_signal() / apply_signal(). Hub-snapshot voting aggregates
833
+ are deliberately ignored here.
834
+ """
835
  training = snapshot.get("training", {})
836
  voting = snapshot.get("voting", {})
837
 
 
845
  self.training_steps = int(training.get("training_steps", self.training_steps))
846
 
847
  if voting:
848
+ # Counts are still useful for the vote-ratio fallback in
849
+ # signal_confidence when no realtime signal has arrived yet.
 
 
 
 
 
 
 
 
 
 
 
850
  self.buy_count = int(voting.get("buy_count", self.buy_count))
851
  self.sell_count = int(voting.get("sell_count", self.sell_count))
852
 
853
  self.last_updated = snapshot.get("last_updated", time.time())
854
+
855
+ def apply_signal(
856
+ self,
857
+ action: str,
858
+ price: float = 0.0,
859
+ ts: Optional[float] = None,
860
+ seq: int = 0,
861
+ ) -> None:
862
+ """
863
+ Apply a realtime per-tick signal from /ws/signals.
864
+
865
+ Sets latest_action and mirrors it into dominant_signal so all existing
866
+ gate / ranking / execution code (which reads dominant_signal) sees the
867
+ per-tick action with no further changes.
868
+
869
+ BUY β†’ dominant_signal = BUY, latest_action_confidence = 1.0
870
+ SELL β†’ dominant_signal = SELL, latest_action_confidence = 1.0
871
+ HOLD β†’ dominant_signal = NEUTRAL, confidence falls back to vote ratio
872
+ """
873
+ if not isinstance(action, str):
874
+ return
875
+ action = action.upper()
876
+ if action not in {"BUY", "SELL", "HOLD"}:
877
+ action = "HOLD"
878
+
879
+ self.latest_action = action
880
+ self.latest_action_price = float(price or 0.0)
881
+ self.latest_action_ts = float(ts) if ts is not None else time.time()
882
+ self.latest_action_seq = int(seq or 0)
883
+
884
+ if action in {"BUY", "SELL"}:
885
+ self.dominant_signal = action
886
+ self._latest_signal_confidence = 1.0 # direct AVN inference
887
+ else: # HOLD
888
+ self.dominant_signal = "NEUTRAL"
889
+ self._latest_signal_confidence = None # fall back to vote ratio
890
+
891
+ self.last_updated = self.latest_action_ts
892
+
893
 
894
  @property
895
  def total_votes(self) -> int:
 
906
 
907
  def to_dict(self) -> dict:
908
  return {
909
+ "space_name": self.space_name,
910
+ "actor_loss": self.actor_loss,
911
+ "critic_loss": self.critic_loss,
912
+ "avn_loss": self.avn_loss,
913
+ "avn_accuracy": self.avn_accuracy,
914
+ "training_steps": self.training_steps,
915
+ "dominant_signal": self.dominant_signal,
916
+ "buy_count": self.buy_count,
917
+ "sell_count": self.sell_count,
918
+ "signal_confidence": self.signal_confidence,
919
+ # Realtime per-tick signal bookkeeping (v2.3+)
920
+ "latest_action": self.latest_action,
921
+ "latest_action_price": self.latest_action_price,
922
+ "latest_action_ts": self.latest_action_ts,
923
+ "latest_action_seq": self.latest_action_seq,
924
+ "last_updated": self.last_updated,
925
  }
926
 
927
 
 
1085
  logger.error(f"[HubSubscriber] on_update callback error: {e}")
1086
 
1087
  # ──────────────────────────────────────────────────────────────────────────
1088
+ # Public fast-path injector β€” called by SignalSubscriber when a per-tick
1089
+ # realtime signal arrives on the /ws/signals channel. Updates the SAME
1090
+ # AssetSnapshot instance the ranker reads from, so rank_and_gate sees the
1091
+ # latest action with ~30 ms latency instead of waiting for the next
1092
+ # metrics_update on /ws/subscribe (which carries cumulative aggregates,
1093
+ # not the per-tick action we actually want).
1094
  # ──────────────────────────────────────────────────────────────────────────
1095
+ def inject_signal(self, space_name: str, signal: dict) -> None:
1096
  """
1097
+ Apply a realtime per-tick signal directly to the AssetSnapshot under
1098
+ the subscriber's lock. Expects `signal` shaped as:
1099
+ {"action": "BUY|SELL|HOLD", "price": float, "seq": int,
1100
+ "ts": float, "source": str, ...}
1101
 
1102
+ Handling:
1103
  β€’ BUY / SELL β†’ snap.dominant_signal is set accordingly (confidence=1.0)
1104
+ β€’ HOLD β†’ snap.dominant_signal reset to NEUTRAL so Gate A
1105
+ stops firing on stale direction
1106
  """
1107
+ if not isinstance(signal, dict):
1108
  return
1109
 
1110
+ action = signal.get("action", "HOLD")
1111
+ if not isinstance(action, str):
1112
+ action = "HOLD"
 
 
 
 
 
 
 
 
 
 
 
1113
 
1114
  with self._lock:
1115
  if space_name not in self._snapshots:
1116
  self._snapshots[space_name] = AssetSnapshot(space_name=space_name)
1117
  snap = self._snapshots[space_name]
1118
+ snap.apply_signal(
1119
+ action = action,
1120
+ price = signal.get("price", 0.0),
1121
+ ts = signal.get("ts"),
1122
+ seq = signal.get("seq", 0),
1123
+ )
1124
 
1125
  if self.on_update:
1126
  try:
1127
  self.on_update(space_name, snap)
1128
  except Exception as e:
1129
+ logger.error(f"[HubSubscriber] on_update (signal-path) callback error: {e}")
1130
 
1131
 
1132
  # ══════════════════════════════════════════════════════════════════════════════════════
1133
+ # SECTION 4b β€” SIGNAL SUBSCRIBER (high-priority, low-latency side channel)
1134
  # ══════════════════════════════════════════════════════════════════════════════════════
1135
  #
1136
+ # The SignalSubscriber is a dedicated WebSocket client that consumes only the
1137
+ # realtime per-tick AVN signal stream from the hub's /ws/signals endpoint. It
1138
+ # runs on its own background thread (matching the original sketch:
1139
+ # "DEDICATED THREAD, poll every 30ms") and bypasses the slower /ws/subscribe
1140
+ # snapshot path for per-tick action delivery.
1141
  #
1142
  # Why a separate subscriber:
1143
  # β€’ /ws/subscribe carries the full training-metrics firehose (losses, steps,
1144
+ # avn_accuracy) β€” large payloads at irregular cadence. Per-tick signals
1145
+ # would queue behind them.
1146
+ # β€’ /ws/subscribe also only carries CUMULATIVE aggregates (the EMA-style
1147
+ # dominant_signal). The per-tick AVN action β€” what we actually want to
1148
+ # drive Gate A and the asset-buffer features off of β€” is not in those
1149
+ # payloads, only on /ws/signals.
1150
+ # β€’ /ws/signals carries only {asset, action, price, source, seq, ts} β€”
1151
+ # tiny payloads, coalesced to 30 ms broadcasts.
1152
  # β€’ Running on its own thread means a slow rank_and_gate cycle in the main
1153
+ # loop cannot stall realtime-signal ingestion.
1154
  #
1155
  # Per-asset signal consistency:
1156
  # β€’ The hub assigns a monotonic `seq` per asset. The subscriber tracks
1157
  # `last_seq` and drops replays / out-of-order deliveries.
1158
  # β€’ Updates land in the SAME AssetSnapshot dict the ranker reads from, via
1159
+ # HubSubscriber.inject_signal(), so there is exactly one source of truth
1160
+ # per asset at any point in time.
1161
  #
1162
+ class SignalSubscriber:
1163
  """
1164
+ High-priority WS client for /ws/signals. Feeds per-tick realtime signals
1165
+ directly into the shared HubSubscriber snapshot store via inject_signal().
1166
  """
1167
 
1168
  _MAX_BACKOFF = 30
1169
 
1170
  def __init__(
1171
  self,
1172
+ signal_url: str,
1173
  hub_subscriber: "HubSubscriber", # snapshots are written through here
1174
  ranker_logger: Optional[object] = None,
1175
  ):
1176
+ self.signal_url = signal_url
1177
  self.hub_subscriber = hub_subscriber
1178
  self.ranker_logger = ranker_logger
1179
 
 
1184
  self._reconnect_count = 0
1185
 
1186
  self.stats = {
1187
+ "signals_received": 0,
1188
+ "signals_applied": 0,
1189
+ "signals_out_of_order": 0,
1190
+ "reconnect_count": 0,
1191
+ "last_signal_time": 0.0,
1192
  }
1193
 
1194
  def start(self) -> None:
 
1196
  return
1197
  self._running = True
1198
  self._thread = threading.Thread(
1199
+ target=self._run_loop, daemon=True, name="SignalSubscriber"
1200
  )
1201
  self._thread.start()
1202
+ logger.info(f"[SignalSubscriber] πŸ“‘ Started β†’ {self.signal_url}")
1203
  if self.ranker_logger:
1204
  try:
1205
  self.ranker_logger.connection_event(
1206
+ "Signal WebSocket", "connected", "SignalSubscriber started"
1207
  )
1208
  except Exception:
1209
  pass
1210
 
1211
  def stop(self) -> None:
1212
  self._running = False
1213
+ logger.info("[SignalSubscriber] Stopping…")
1214
 
1215
  def _run_loop(self) -> None:
1216
  while self._running:
 
1221
  loop.close()
1222
  self._reconnect_count = 0
1223
  except Exception as e:
1224
+ logger.error(f"[SignalSubscriber] Session error: {e}")
1225
  self._reconnect_count += 1
1226
  self.stats["reconnect_count"] = self._reconnect_count
1227
 
1228
  if not self._running:
1229
  break
1230
  backoff = min(self._MAX_BACKOFF, 2 ** min(self._reconnect_count, 4))
1231
+ logger.info(f"[SignalSubscriber] Reconnecting in {backoff}s…")
1232
  time.sleep(backoff)
1233
 
1234
  async def _ws_session(self) -> None:
1235
  if websockets is None:
1236
+ logger.error("[SignalSubscriber] websockets library not installed")
1237
  await asyncio.sleep(5)
1238
  return
1239
 
1240
  from websockets.exceptions import ConnectionClosed
1241
 
1242
+ async with websockets.connect(self.signal_url) as ws:
1243
  self._reconnect_count = 0
1244
+ logger.info("[SignalSubscriber] βœ… Connected to signal channel")
1245
 
1246
  while self._running:
1247
  try:
1248
  raw = await ws.recv()
1249
  self._handle_message(raw)
1250
  except ConnectionClosed:
1251
+ logger.info("[SignalSubscriber] Connection closed by hub")
1252
  break
1253
  except Exception as e:
1254
+ logger.error(f"[SignalSubscriber] Message error: {e}")
1255
 
1256
  def _handle_message(self, raw: str) -> None:
1257
  try:
1258
  data = json.loads(raw)
1259
  except json.JSONDecodeError:
1260
+ logger.warning("[SignalSubscriber] Malformed JSON")
1261
  return
1262
 
1263
  msg_type = data.get("type", "")
1264
+ if msg_type not in ("signal_snapshot", "signal_delta"):
1265
  return
1266
 
1267
+ signals = data.get("signals", [])
1268
+ if not isinstance(signals, list):
1269
  return
1270
 
1271
+ for sig in signals:
1272
+ self._apply_signal(sig)
1273
 
1274
+ def _apply_signal(self, signal: dict) -> None:
1275
+ asset = signal.get("asset")
1276
+ seq = signal.get("seq", 0)
1277
  if not asset:
1278
  return
1279
 
1280
+ self.stats["signals_received"] += 1
1281
 
1282
  # Drop replays / out-of-order (per-asset signal consistency)
1283
  with self._lock:
1284
  last = self._last_seq.get(asset, 0)
1285
  if seq <= last:
1286
+ self.stats["signals_out_of_order"] += 1
1287
  return
1288
  self._last_seq[asset] = seq
1289
 
1290
  # Push directly into the shared AssetSnapshot store
1291
+ self.hub_subscriber.inject_signal(asset, signal)
1292
+ self.stats["signals_applied"] += 1
1293
+ self.stats["last_signal_time"] = time.time()
1294
 
1295
+ action = signal.get("action", "?")
1296
+ if action in ("BUY", "SELL"):
1297
  logger.info(
1298
+ f"[SignalSubscriber] ⚑ {asset} signal#{seq} β†’ {action} "
1299
+ f"@ {signal.get('price', 0):.5f}"
1300
  )
1301
 
1302
 
1303
+ def _derive_signal_url(hub_ws_url: str) -> str:
1304
+ """Derive the /ws/signals URL from whatever hub URL the ranker was given.
1305
+ Accepts ws://host:port/ws/subscribe β†’ ws://host:port/ws/signals."""
1306
  url = hub_ws_url
1307
  # Replace the path if present, otherwise append
1308
+ for known in ("/ws/subscribe", "/ws/metrics", "/subscribe", "/ws/flips"):
1309
  if url.endswith(known):
1310
+ return url[: -len(known)] + "/ws/signals"
1311
+ # Fallback: strip any trailing path and append /ws/signals
1312
  if "://" in url:
1313
  scheme, rest = url.split("://", 1)
1314
  host_and_path = rest.split("/", 1)[0]
1315
+ return f"{scheme}://{host_and_path}/ws/signals"
1316
+ return url.rstrip("/") + "/ws/signals"
1317
 
1318
 
1319
  # ══════════════════════════════════════════════════════════════════════════════════════
 
1876
  return (time.time() - self.last_signal_ts) > self.STALE_TIMEOUT
1877
 
1878
  def on_hub_snapshot(self, snap: AssetSnapshot) -> None:
1879
+ """
1880
+ Update from a hub AssetSnapshot β€” triggers feature extraction +
1881
+ buffer append.
1882
+
1883
+ v2.3+ action sourcing:
1884
+ The per-tick action is taken from snap.latest_action (populated by
1885
+ SignalSubscriber β†’ HubSubscriber.inject_signal from /ws/signals).
1886
+ snap.dominant_signal is now a mirror of latest_action (BUY/SELL
1887
+ pass-through, HOLD β†’ NEUTRAL) so it stays usable as a fallback if
1888
+ a snapshot arrives via the cumulative path before any realtime
1889
+ signal has landed.
1890
+ """
1891
+ # Prefer the realtime per-tick action from /ws/signals.
1892
+ action = snap.latest_action if snap.latest_action in {"BUY", "SELL", "HOLD"} else "HOLD"
1893
+ # Backward-compat fallback: if no realtime signal has landed yet but
1894
+ # dominant_signal is set (e.g. legacy code path or snapshot replay),
1895
+ # still use it. Domain mapping: NEUTRAL β†’ HOLD.
1896
+ if action == "HOLD" and snap.dominant_signal in {"BUY", "SELL"}:
1897
+ action = snap.dominant_signal
1898
+
1899
  self.latest_signal = {
1900
+ "action": action,
1901
  "confidence": snap.signal_confidence,
1902
  "avn_loss": snap.avn_loss,
1903
  "avn_accuracy": snap.avn_accuracy,
 
7309
  ranker_logger = self.ranker_logger,
7310
  )
7311
 
7312
+ # ── Signal subscriber (high-priority side channel, v2.3+) ─────────────
7313
+ # Runs on its own thread, consumes /ws/signals at ~30 ms cadence, and
7314
+ # writes per-tick realtime signals directly into the shared
7315
+ # AssetSnapshot store via hub_subscriber.inject_signal(). Gives
7316
+ # rank_and_gate sub-second visibility into BUY/SELL actions instead of
7317
+ # waiting for the next metrics_update broadcast on /ws/subscribe
7318
+ # (which only carries cumulative aggregates, not per-tick data).
7319
+ self.signal_subscriber = SignalSubscriber(
7320
+ signal_url = _derive_signal_url(hub_ws_url),
7321
  hub_subscriber = self.hub_subscriber,
7322
  ranker_logger = self.ranker_logger,
7323
  )
 
7522
  self.ranking_engine.asset_buffers = self.asset_buffers
7523
 
7524
  self.hub_subscriber.start()
7525
+ self.signal_subscriber.start()
7526
 
7527
  # ── Full-state checkpoint resume ──────────────────────────────────
7528
  # Must run AFTER all components are initialised so _restore_checkpoint
 
9133
  self.running = False
9134
  await self.ws_client.close()
9135
  self.hub_subscriber.stop()
9136
+ self.signal_subscriber.stop()
9137
  # ── Full-state shutdown checkpoint ────────────────────────────────
9138
  # Saves everything: model, replay buffer, bandit, normalizers, etc.
9139
  # This is the "progress matters" save β€” nothing is lost on restart.
 
11083
  "training": {"avn_accuracy": 0.8, "actor_loss": 0.1, "critic_loss": 0.2, "avn_loss": 0.05},
11084
  "voting": {"buy_count": 7, "sell_count": 3, "dominant_signal": "BUY"},
11085
  })
11086
+ # v2.3+: per-tick action arrives via /ws/signals β†’ apply_signal, not via
11087
+ # the snapshot's voting dict. Simulate that here.
11088
+ snap.apply_signal(action="BUY", price=1500.0)
11089
  buf.on_price(1500.0, 0.03, 0.0)
11090
  buf.on_hub_snapshot(snap)
11091
  seq = buf.get_sequence()
 
11120
  "voting": {"buy_count": buy, "sell_count": sell,
11121
  "dominant_signal": "BUY" if buy > sell else "NEUTRAL"},
11122
  })
11123
+ # v2.3+: realtime per-tick action lives in latest_action / dominant_signal,
11124
+ # populated by apply_signal (which mirrors the /ws/signals path).
11125
+ s.apply_signal(action="BUY" if buy > sell else "HOLD")
11126
  snapshots[name] = s
11127
 
11128
  sig_map = {"V75": 0.60, "V100_1s": 0.70, "CRASH500": 0.40}
websocket_hub.py CHANGED
@@ -1,7 +1,7 @@
1
  #!/usr/bin/env python3
2
  """
3
  ╔══════════════════════════════════════════════════════════════════════════════════════╗
4
- β•‘ K1RL QUASAR β€” CENTRAL WEBSOCKET HUB v2.2-ranker-logs β•‘
5
  β•‘ ────────────────────────────────────────────────────────────────────────────────── β•‘
6
  β•‘ β•‘
7
  β•‘ Architecture role: INGEST β†’ NORMALIZE β†’ BROADCAST β•‘
@@ -26,7 +26,12 @@
26
  β•‘ GET /api/trades/closed β†’ recent closed trades + stats (?limit=N) β•‘
27
  β•‘ GET /api/health β†’ service health including trade counts β•‘
28
  β•‘ β•‘
29
- β•‘ VERSION: v2.2-ranker-logs | 2026-04-04 β•‘
 
 
 
 
 
30
  β•šβ•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•
31
  """
32
 
@@ -747,6 +752,264 @@ async def _flip_broadcaster_loop():
747
  await asyncio.sleep(0.1)
748
 
749
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
750
  # ── Top-3 WebSocket client registry ───────────────────────────────────────────────────
751
  # top3_client.py connects here and receives top3_rankings broadcasts whenever the
752
  # Executo ranker POSTs new rankings via POST /api/flip/rankings.
@@ -808,6 +1071,10 @@ async def _on_startup():
808
  dedicated task (Python's equivalent of a dedicated thread for asyncio)."""
809
  logger.info("πŸš€ HubTradeStore ready (no background scanner needed)")
810
  asyncio.create_task(_flip_broadcaster_loop())
 
 
 
 
811
 
812
 
813
  # ══════════════════════════════════════════════════════════════════════════════════════
@@ -870,6 +1137,10 @@ async def ws_publisher_endpoint(websocket: WebSocket, space_name: str):
870
  # it immediately even though the publisher used the legacy type.
871
  if voting_payload:
872
  await _store_flip(space_name, voting_payload)
 
 
 
 
873
 
874
  elif msg_type == "training":
875
  # Bug A fix: try "data" wrapper first, then fall back to top-level fields.
@@ -897,6 +1168,8 @@ async def ws_publisher_endpoint(websocket: WebSocket, space_name: str):
897
  # Auto-bridge into fast path (see comment under "metrics").
898
  if voting_raw:
899
  await _store_flip(space_name, voting_raw)
 
 
900
 
901
  elif msg_type in ("heartbeat", "identify", "ping"):
902
  pass
@@ -936,6 +1209,8 @@ async def ws_publisher_endpoint(websocket: WebSocket, space_name: str):
936
  # Auto-bridge rescued voting into fast path too
937
  if rescued_voting:
938
  await _store_flip(space_name, rescued_voting)
 
 
939
  else:
940
  logger.warning(
941
  f"[{space_name}] ⚠ Unknown type='{msg_type}' with no "
@@ -1069,6 +1344,81 @@ async def ws_flips_endpoint(websocket: WebSocket):
1069
  logger.info(f"🎯 Flip subscriber disconnected (remaining={len(_flip_subscribers)})")
1070
 
1071
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1072
  # ══════════════════════════════════════════════════════════════════════════════════════
1073
  # SECTION 6 β€” REST API (READ-ONLY)
1074
  # ══════════════════════════════════════════════════════════════════════════════════════
 
1
  #!/usr/bin/env python3
2
  """
3
  ╔══════════════════════════════════════════════════════════════════════════════════════╗
4
+ β•‘ K1RL QUASAR β€” CENTRAL WEBSOCKET HUB v2.3-realtime-signals β•‘
5
  β•‘ ────────────────────────────────────────────────────────────────────────────────── β•‘
6
  β•‘ β•‘
7
  β•‘ Architecture role: INGEST β†’ NORMALIZE β†’ BROADCAST β•‘
 
26
  β•‘ GET /api/trades/closed β†’ recent closed trades + stats (?limit=N) β•‘
27
  β•‘ GET /api/health β†’ service health including trade counts β•‘
28
  β•‘ β•‘
29
+ β•‘ REALTIME SIGNAL CHANNEL (NEW v2.3 β€” per-tick AVN inferenced signals): β•‘
30
+ β•‘ WS /ws/signals β†’ fan-out of {asset, action, price, ts, seq} β•‘
31
+ β•‘ derived from buy/sell count deltas. No engine β•‘
32
+ β•‘ changes required. Consumed by the AXRVI ranker. β•‘
33
+ β•‘ β•‘
34
+ β•‘ VERSION: v2.3-realtime-signals | 2026-04-26 β•‘
35
  β•šβ•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•
36
  """
37
 
 
752
  await asyncio.sleep(0.1)
753
 
754
 
755
+ # ══════════════════════════════════════════════════════════════════════════════════════
756
+ # REALTIME SIGNAL FAST-PATH CHANNEL (v2.3 β€” pure per-tick AVN inferenced signals)
757
+ # ══════════════════════════════════════════════════════════════════════════════════════
758
+ # Purpose
759
+ # ───────
760
+ # Stream the *per-tick* AVN action (BUY / SELL / HOLD) β€” i.e. the realtime
761
+ # inferenced signal emitted by each asset engine on every Redis tick β€” to
762
+ # downstream consumers (the AXRVI ranker today, executor tomorrow) with no
763
+ # flip / dominant-signal translation layer in between.
764
+ #
765
+ # Why this exists
766
+ # ───────────────
767
+ # The pre-existing /ws/flips channel rebroadcasts the *cumulative* dominant
768
+ # voting signal aggregated by _best_voting_unlocked() in V75 β€” i.e. an EMA of
769
+ # the per-tick action. It loses the per-tick information by design. The ranker
770
+ # wants raw per-tick activity, not aggregates.
771
+ #
772
+ # Wire format (no V75 changes required)
773
+ # ─────────────────────────────────────
774
+ # The V75 publisher already emits {buy_count, sell_count, dominant_signal} on
775
+ # every tick via the voting payload. update_from_redis(action) increments
776
+ # either buy_count or sell_count by 1 *before* the publish, so a delta of +1
777
+ # in one of those counters between two consecutive frames *is* the per-tick
778
+ # action. This is what we extract and rebroadcast as {action, price, ts, seq}.
779
+ # Engine-side code is untouched.
780
+ #
781
+ # V75 tick (action=BUY) ──► buy_count: 4 β†’ 5
782
+ # ──► publish_voting({buy_count:5, sell_count:3, …})
783
+ # β”‚
784
+ # Hub _validate_and_normalize() runs ──► normalized voting carries the new counts
785
+ # β”‚
786
+ # _detect_realtime_signal(asset, normalized_voting):
787
+ # prev = _prev_counts.get(asset, {buy:0, sell:0})
788
+ # db = new.buy - prev.buy β†’ +1 (BUY tick happened)
789
+ # ds = new.sell - prev.sell β†’ 0
790
+ # _emit_signal(asset, "BUY", price, source="voting_delta")
791
+ # _prev_counts[asset] = new counts
792
+ #
793
+ # Per-asset signal consistency
794
+ # ────────────────────────────
795
+ # β€’ Latest-wins with monotonic seq per asset (consumer can detect replays).
796
+ # β€’ Atomic write under _signal_lock (no torn reads).
797
+ # β€’ On subscribe, full cache replay first β†’ consistent starting view.
798
+ # β€’ If multiple ticks batch into one frame (db>1 or ds>1), we emit the
799
+ # dominant-side action once (matching the dominant of the batch). This is
800
+ # a graceful degradation β€” at typical ~10 Hz tick rates batching is rare.
801
+ # β€’ Counter resets (buy/sell counts go down or to zero) are detected and
802
+ # reset _prev_counts without emitting a phantom signal.
803
+ # ──────────────────────────────────────────────────────────────────────────────────────
804
+
805
+ _SIGNAL_BROADCAST_INTERVAL_SEC: float = 0.030 # 30 ms, matching flip cadence
806
+ _signal_cache: Dict[str, dict] = {} # {asset: {action, price, source, ts, seq, ...}}
807
+ _signal_seq: Dict[str, int] = {} # monotonic per-asset
808
+ _signal_lock: asyncio.Lock = asyncio.Lock()
809
+ _signal_dirty: asyncio.Event = asyncio.Event()
810
+ _signal_subscribers: Set[WebSocket] = set()
811
+ _signal_subscribers_lock: asyncio.Lock = asyncio.Lock()
812
+
813
+ # Per-asset previous (buy_count, sell_count) used to compute per-tick deltas.
814
+ # Keyed by space_name. Reset to current values whenever counts go DOWN
815
+ # (publisher restart / counter wraparound) without emitting a phantom signal.
816
+ _prev_counts: Dict[str, Dict[str, int]] = {}
817
+ _prev_counts_lock: threading.Lock = threading.Lock()
818
+
819
+
820
+ async def _emit_signal(
821
+ asset: str,
822
+ action: str,
823
+ price: float,
824
+ source: str = "voting_delta",
825
+ ) -> Optional[dict]:
826
+ """
827
+ Publish one realtime signal event into the per-asset cache.
828
+
829
+ Contract:
830
+ β€’ action ∈ {BUY, SELL, HOLD} (anything else coerced to HOLD).
831
+ β€’ Atomic write under _signal_lock (latest-wins).
832
+ β€’ Monotonic seq per asset.
833
+ β€’ Sets _signal_dirty so the broadcaster picks it up within 30 ms.
834
+ β€’ Returns the normalized signal dict, or None if rejected.
835
+ """
836
+ if not isinstance(action, str):
837
+ return None
838
+ action = action.upper()
839
+ if action not in {"BUY", "SELL", "HOLD"}:
840
+ action = "HOLD"
841
+
842
+ try:
843
+ price = float(price or 0.0)
844
+ except Exception:
845
+ price = 0.0
846
+
847
+ if not isinstance(source, str):
848
+ source = "voting_delta"
849
+
850
+ async with _signal_lock:
851
+ seq = _signal_seq.get(asset, 0) + 1
852
+ _signal_seq[asset] = seq
853
+ sig = {
854
+ "asset": asset,
855
+ "action": action,
856
+ "price": price,
857
+ "source": source,
858
+ "ts": time.time(),
859
+ "seq": seq,
860
+ }
861
+ _signal_cache[asset] = sig
862
+
863
+ _signal_dirty.set()
864
+ return sig
865
+
866
+
867
+ def _detect_realtime_signal(asset: str, voting_normalized: dict) -> Optional[str]:
868
+ """
869
+ Compare current (buy_count, sell_count) against the previously-seen pair
870
+ for this asset and infer which per-tick action just occurred.
871
+
872
+ Returns the inferred action (BUY / SELL / HOLD) when a delta is detected,
873
+ or None if nothing actionable changed (e.g. first frame, equal counts,
874
+ counter reset).
875
+
876
+ NOTE: This is a *synchronous* helper β€” it only mutates _prev_counts under
877
+ a thin threading.Lock. Emission is done by the caller via _emit_signal()
878
+ inside the asyncio publisher_handler.
879
+ """
880
+ if not isinstance(voting_normalized, dict):
881
+ return None
882
+
883
+ try:
884
+ new_buy = int(voting_normalized.get("buy_count", 0) or 0)
885
+ new_sell = int(voting_normalized.get("sell_count", 0) or 0)
886
+ except Exception:
887
+ return None
888
+
889
+ with _prev_counts_lock:
890
+ prev = _prev_counts.get(asset)
891
+ if prev is None:
892
+ # First time we see this asset β€” record baseline, no emission.
893
+ _prev_counts[asset] = {"buy": new_buy, "sell": new_sell}
894
+ return None
895
+
896
+ prev_buy = prev.get("buy", 0)
897
+ prev_sell = prev.get("sell", 0)
898
+
899
+ # Counter regression (publisher restart / wraparound) β€” re-baseline,
900
+ # no phantom emission.
901
+ if new_buy < prev_buy or new_sell < prev_sell:
902
+ _prev_counts[asset] = {"buy": new_buy, "sell": new_sell}
903
+ return None
904
+
905
+ db = new_buy - prev_buy
906
+ ds = new_sell - prev_sell
907
+
908
+ # Always update the baseline before returning.
909
+ _prev_counts[asset] = {"buy": new_buy, "sell": new_sell}
910
+
911
+ if db == 0 and ds == 0:
912
+ # No new ticks since last frame. Could still be a HOLD heartbeat;
913
+ # we treat lack-of-delta as silence (no signal).
914
+ return None
915
+
916
+ # Both counters incremented in the same frame: very rare batching case
917
+ # (multiple ticks coalesced). Emit the dominant side β€” if equal, BUY wins
918
+ # (arbitrary but deterministic). This is a graceful degradation, not the
919
+ # common path.
920
+ if db > ds:
921
+ return "BUY"
922
+ if ds > db:
923
+ return "SELL"
924
+ return "BUY" # tie-break β€” extremely rare
925
+
926
+
927
+ async def _signal_broadcaster_loop():
928
+ """
929
+ Dedicated task: every 30 ms (or immediately on dirty) push the current
930
+ signal cache to all /ws/signals subscribers. Dead connections are pruned.
931
+ """
932
+ logger.info(
933
+ f"πŸ“‘ Signal broadcaster started | "
934
+ f"interval={_SIGNAL_BROADCAST_INTERVAL_SEC * 1000:.0f}ms"
935
+ )
936
+ while True:
937
+ try:
938
+ try:
939
+ await asyncio.wait_for(
940
+ _signal_dirty.wait(),
941
+ timeout=_SIGNAL_BROADCAST_INTERVAL_SEC,
942
+ )
943
+ except asyncio.TimeoutError:
944
+ pass
945
+ _signal_dirty.clear()
946
+
947
+ async with _signal_subscribers_lock:
948
+ if not _signal_subscribers:
949
+ continue
950
+ subscribers_snapshot = list(_signal_subscribers)
951
+
952
+ async with _signal_lock:
953
+ if not _signal_cache:
954
+ continue
955
+ signals_snapshot = list(_signal_cache.values())
956
+
957
+ msg = {
958
+ "type": "signal_delta",
959
+ "signals": signals_snapshot,
960
+ "total_assets": len(signals_snapshot),
961
+ "hub_timestamp": time.time(),
962
+ }
963
+
964
+ dead: List[WebSocket] = []
965
+ for ws in subscribers_snapshot:
966
+ try:
967
+ await ws.send_json(msg)
968
+ except Exception:
969
+ dead.append(ws)
970
+
971
+ if dead:
972
+ async with _signal_subscribers_lock:
973
+ for ws in dead:
974
+ _signal_subscribers.discard(ws)
975
+ logger.info(f"πŸ“‘ Pruned {len(dead)} dead signal subscriber(s)")
976
+
977
+ except asyncio.CancelledError:
978
+ logger.info("πŸ“‘ Signal broadcaster cancelled")
979
+ raise
980
+ except Exception as e:
981
+ logger.error(f"πŸ“‘ Signal broadcaster error: {e}")
982
+ await asyncio.sleep(0.1)
983
+
984
+
985
+ async def _maybe_emit_realtime_signal(space_name: str, voting_payload: dict) -> None:
986
+ """
987
+ Convenience wrapper called from the publisher_handler ingestion paths.
988
+ Detects a per-tick delta and emits a signal event when one is found.
989
+
990
+ `voting_payload` is the *raw* voting dict received on the wire (still
991
+ carries buy_count/sell_count plus last_price). We pass it through
992
+ directly β€” no need to wait for the full snapshot normalization pass.
993
+ """
994
+ if not isinstance(voting_payload, dict):
995
+ return
996
+
997
+ action = _detect_realtime_signal(space_name, voting_payload)
998
+ if action is None:
999
+ return
1000
+
1001
+ try:
1002
+ price = float(voting_payload.get("last_price", 0.0) or 0.0)
1003
+ except Exception:
1004
+ price = 0.0
1005
+
1006
+ src = voting_payload.get("signal_source", "voting_delta")
1007
+ if not isinstance(src, str):
1008
+ src = "voting_delta"
1009
+
1010
+ await _emit_signal(space_name, action, price, source=src)
1011
+
1012
+
1013
  # ── Top-3 WebSocket client registry ───────────────────────────────────────────────────
1014
  # top3_client.py connects here and receives top3_rankings broadcasts whenever the
1015
  # Executo ranker POSTs new rankings via POST /api/flip/rankings.
 
1071
  dedicated task (Python's equivalent of a dedicated thread for asyncio)."""
1072
  logger.info("πŸš€ HubTradeStore ready (no background scanner needed)")
1073
  asyncio.create_task(_flip_broadcaster_loop())
1074
+ # New realtime per-tick signal channel (parallel to the flip channel,
1075
+ # consumed by the AXRVI ranker). See REALTIME SIGNAL FAST-PATH CHANNEL
1076
+ # comment block above for protocol details.
1077
+ asyncio.create_task(_signal_broadcaster_loop())
1078
 
1079
 
1080
  # ══════════════════════════════════════════════════════════════════════════════════════
 
1137
  # it immediately even though the publisher used the legacy type.
1138
  if voting_payload:
1139
  await _store_flip(space_name, voting_payload)
1140
+ # Realtime per-tick signal channel β€” detect delta in
1141
+ # buy_count/sell_count and emit a {action, price, ts} event
1142
+ # on /ws/signals. Independent of the cumulative flip cache.
1143
+ await _maybe_emit_realtime_signal(space_name, voting_payload)
1144
 
1145
  elif msg_type == "training":
1146
  # Bug A fix: try "data" wrapper first, then fall back to top-level fields.
 
1168
  # Auto-bridge into fast path (see comment under "metrics").
1169
  if voting_raw:
1170
  await _store_flip(space_name, voting_raw)
1171
+ # Realtime per-tick signal β€” see /ws/signals docstring.
1172
+ await _maybe_emit_realtime_signal(space_name, voting_raw)
1173
 
1174
  elif msg_type in ("heartbeat", "identify", "ping"):
1175
  pass
 
1209
  # Auto-bridge rescued voting into fast path too
1210
  if rescued_voting:
1211
  await _store_flip(space_name, rescued_voting)
1212
+ # Realtime per-tick signal β€” see /ws/signals docstring.
1213
+ await _maybe_emit_realtime_signal(space_name, rescued_voting)
1214
  else:
1215
  logger.warning(
1216
  f"[{space_name}] ⚠ Unknown type='{msg_type}' with no "
 
1344
  logger.info(f"🎯 Flip subscriber disconnected (remaining={len(_flip_subscribers)})")
1345
 
1346
 
1347
+ @app.websocket("/ws/signals")
1348
+ async def ws_signals_endpoint(websocket: WebSocket):
1349
+ """
1350
+ /ws/signals β€” REALTIME PER-TICK signal channel (v2.3+).
1351
+
1352
+ Consumed by:
1353
+ β€’ The AXRVI ranker's SignalSubscriber (one source of truth for the
1354
+ per-tick AVN action that drives Gate A and asset-buffer features).
1355
+ β€’ Any future consumer that wants raw per-tick signals rather than
1356
+ aggregated flips.
1357
+
1358
+ Protocol:
1359
+ β€’ On connect, the hub replays the full current signal cache as
1360
+ {"type": "signal_snapshot", "signals": [...], "hub_timestamp": T}
1361
+ so the subscriber starts with a consistent per-asset view.
1362
+ β€’ Thereafter, every realtime signal event (derived from a delta in
1363
+ buy_count/sell_count between two consecutive voting frames) is
1364
+ pushed as
1365
+ {"type": "signal_delta", "signals": [...], "hub_timestamp": T}
1366
+ at a coalesced cadence of _SIGNAL_BROADCAST_INTERVAL_SEC (30 ms).
1367
+ β€’ Each signal carries a monotonic per-asset `seq` so consumers can
1368
+ detect replays / out-of-order deliveries.
1369
+
1370
+ Per-signal payload:
1371
+ {
1372
+ "asset": "<space_name>",
1373
+ "action": "BUY" | "SELL" | "HOLD",
1374
+ "price": <last_price>,
1375
+ "source": "voting_delta" | <publisher_supplied>,
1376
+ "ts": <unix_seconds>,
1377
+ "seq": <monotonic_int>
1378
+ }
1379
+
1380
+ Signal-consistency guarantee:
1381
+ β€’ Writes into _signal_cache are serialized through _signal_lock.
1382
+ β€’ Readers take a consistent snapshot under the same lock.
1383
+ β€’ No torn reads, no lost updates β€” the ranker always sees exactly one
1384
+ authoritative realtime signal per asset.
1385
+ """
1386
+ await websocket.accept()
1387
+ async with _signal_subscribers_lock:
1388
+ _signal_subscribers.add(websocket)
1389
+ logger.info(f"πŸ“‘ Signal subscriber connected (total={len(_signal_subscribers)})")
1390
+
1391
+ # ── Replay full cache on connect (so the client has state immediately) ──
1392
+ async with _signal_lock:
1393
+ replay = list(_signal_cache.values())
1394
+ if replay:
1395
+ try:
1396
+ await websocket.send_text(json.dumps({
1397
+ "type": "signal_snapshot",
1398
+ "signals": replay,
1399
+ "total_assets": len(replay),
1400
+ "hub_timestamp": time.time(),
1401
+ }))
1402
+ except Exception:
1403
+ pass
1404
+
1405
+ try:
1406
+ while True:
1407
+ # Keep-alive β€” the subscriber doesn't send messages, only receives.
1408
+ # If it does send something (e.g. ping), we just discard it.
1409
+ await websocket.receive_text()
1410
+ except WebSocketDisconnect:
1411
+ pass
1412
+ except Exception as e:
1413
+ logger.error(f"[signals] Subscriber error: {e}")
1414
+ finally:
1415
+ async with _signal_subscribers_lock:
1416
+ _signal_subscribers.discard(websocket)
1417
+ logger.info(
1418
+ f"πŸ“‘ Signal subscriber disconnected (remaining={len(_signal_subscribers)})"
1419
+ )
1420
+
1421
+
1422
  # ══════════════════════════════════════════════════════════════════════════════════════
1423
  # SECTION 6 β€” REST API (READ-ONLY)
1424
  # ══════════════════════════════════════════════════════════════════════════════════════