KarlQuant commited on
Commit
ec71904
Β·
verified Β·
1 Parent(s): b379e71

Update websocket_hub.py

Browse files
Files changed (1) hide show
  1. websocket_hub.py +73 -71
websocket_hub.py CHANGED
@@ -71,9 +71,8 @@ _ALLOWED_TRAINING_FIELDS: frozenset = frozenset({
71
  })
72
 
73
  _ALLOWED_VOTING_FIELDS: frozenset = frozenset({
74
- "dominant_signal",
75
- "buy_count",
76
- "sell_count",
77
  "last_price",
78
  "signal_source",
79
  })
@@ -91,12 +90,10 @@ def _empty_snapshot(space_name: str) -> dict:
91
  "avn_accuracy": 0.0,
92
  },
93
  "voting": {
94
- "dominant_signal": "NEUTRAL",
95
- "latest_signal": "NEUTRAL", # FIX: most recent signal from inference (not vote majority)
96
- "buy_count": 0,
97
- "sell_count": 0,
98
- "last_price": 0.0,
99
- "signal_source": "LOG",
100
  },
101
  }
102
 
@@ -133,20 +130,22 @@ def _validate_and_normalize(space_name: str, raw: dict) -> Optional[dict]:
133
 
134
  voting: dict = {}
135
  if voting_raw:
136
- raw_signal = voting_raw.get("dominant_signal", "NEUTRAL")
137
- if not isinstance(raw_signal, str):
138
- raw_signal = "NEUTRAL"
 
 
 
139
  raw_source = voting_raw.get("signal_source", "LOG")
140
  if not isinstance(raw_source, str):
141
  raw_source = "LOG"
142
- _clean_signal = raw_signal.upper() if raw_signal.upper() in {"BUY", "SELL", "NEUTRAL"} else "NEUTRAL"
 
143
  voting = {
144
- "dominant_signal": _clean_signal,
145
- "latest_signal": _clean_signal, # FIX: capture per-update for freshness
146
- "buy_count": _int(voting_raw.get("buy_count", 0)),
147
- "sell_count": _int(voting_raw.get("sell_count", 0)),
148
- "last_price": _float(voting_raw.get("last_price", 0.0)),
149
- "signal_source": raw_source,
150
  }
151
 
152
  return {
@@ -178,8 +177,7 @@ class ConnectionManager:
178
  "step": "training_steps","steps": "training_steps","n_steps": "training_steps",
179
  }
180
  _VOTING_KEYS: frozenset = frozenset({
181
- "dominant_signal", "latest_signal", "buy_count", "sell_count", "last_price", "signal_source",
182
- "signal", "buy", "sell",
183
  })
184
 
185
  def __init__(self):
@@ -459,7 +457,7 @@ _hub_trades = HubTradeStore()
459
  logger.info("βœ… HubTradeStore initialised β€” awaiting trade_opened/trade_closed WS messages")
460
 
461
  # ── AXRVI live rankings store ─────────────────────────────────────────────────────────
462
- # Populated by POST /api/axrvi/rankings from the Executo ranker after every
463
  # rank_and_gate() cycle (~every 5s). Falls back to hub-snapshot scoring when stale.
464
  _axrvi_rankings: List[dict] = []
465
  _axrvi_rankings_ts: float = 0.0
@@ -467,7 +465,7 @@ _AXRVI_RANKINGS_TTL: float = 60.0 # FIX: extended 30β†’60s to survive slow
467
 
468
  # ── Top-3 WebSocket client registry ───────────────────────────────────────────────────
469
  # top3_client.py connects here and receives top3_rankings broadcasts whenever the
470
- # Executo ranker POSTs new rankings via POST /api/axrvi/rankings.
471
  _top3_clients: Set[WebSocket] = set()
472
  _top3_clients_lock = asyncio.Lock()
473
 
@@ -475,7 +473,7 @@ _top3_clients_lock = asyncio.Lock()
475
  async def _broadcast_top3_rankings(rankings: List[dict]) -> None:
476
  """
477
  Broadcast a top3_rankings message to all /ws/top3 subscribers.
478
- Called immediately after /api/axrvi/rankings receives a fresh ranking list.
479
  Dead connections are pruned automatically.
480
  """
481
  if not _top3_clients:
@@ -651,7 +649,7 @@ async def ws_top3_endpoint(websocket: WebSocket):
651
  Sends a top3_rankings message immediately on connect (replay of the latest
652
  known ranking so the client does not have to wait for the next ranker cycle),
653
  then keeps the socket open to receive subsequent broadcasts triggered by
654
- POST /api/axrvi/rankings.
655
 
656
  Message format:
657
  {"type": "top3_rankings", "rankings": [...], "total_assets": N, "hub_timestamp": T}
@@ -1086,12 +1084,19 @@ def _compute_rankings() -> List[dict]:
1086
  """
1087
  Build the rankings list served by /api/state.
1088
 
1089
- Priority order:
1090
- 1. Live AXRVI rankings pushed by the Executo ranker via
1091
- POST /api/axrvi/rankings (within the last 30 s).
1092
- These contain real softmax-Shreve priorities from AXRVINet.
1093
- 2. Fallback: hub-snapshot vote-ratio scoring used before the
1094
- ranker connects or if the push is stale (e.g. ranker restart).
 
 
 
 
 
 
 
1095
  """
1096
  global _axrvi_rankings, _axrvi_rankings_ts
1097
 
@@ -1104,17 +1109,15 @@ def _compute_rankings() -> List[dict]:
1104
  snap = snapshots.get(name, {})
1105
  training = snap.get("training", {})
1106
  voting = snap.get("voting", {})
1107
- buy = voting.get("buy_count", r.get("buy_count", 0))
1108
- sell = voting.get("sell_count", r.get("sell_count", 0))
1109
  merged.append({
1110
- # Core AXRVI fields β€” these are the live ranker values
1111
  "rank": r.get("rank", 0),
1112
  "space_name": name,
1113
  "score": r.get("score", 0.0),
1114
  "final_priority": r.get("final_priority", r.get("score", 0.0)),
1115
  "signal_confidence": r.get("signal_confidence",0.0),
1116
- "dominant_signal": r.get("dominant_signal", "NEUTRAL"),
1117
- "latest_signal": snap.get("voting", {}).get("latest_signal", r.get("dominant_signal", "NEUTRAL")),
1118
  "avn_accuracy": r.get("avn_accuracy", 0.0),
1119
  "epistemic_std": r.get("epistemic_std", 0.0),
1120
  "training_steps": r.get("training_steps", training.get("training_steps", 0)),
@@ -1122,45 +1125,44 @@ def _compute_rankings() -> List[dict]:
1122
  "actor_loss": training.get("actor_loss", 0.0),
1123
  "critic_loss": training.get("critic_loss", 0.0),
1124
  "avn_loss": training.get("avn_loss", 0.0),
1125
- "buy_count": buy,
1126
- "sell_count": sell,
1127
  "last_updated": snap.get("last_updated", _axrvi_rankings_ts),
1128
  })
1129
  return merged
1130
 
1131
- # ── Path 2: fallback hub-snapshot scoring ────────────────────────────────
1132
- ranked: List[dict] = []
 
 
 
 
 
1133
  for name, snap in manager.get_all_snapshots().items():
1134
- training = snap.get("training", {})
1135
  voting = snap.get("voting", {})
1136
- buy = voting.get("buy_count", 0)
1137
- sell = voting.get("sell_count", 0)
1138
- total = buy + sell
1139
- sig_conf = (max(buy, sell) / total) if total > 0 else 0.0
1140
- avn_acc = training.get("avn_accuracy", 0.0)
1141
- score = round(sig_conf - avn_acc, 6)
1142
- ranked.append({
1143
- "rank": 0,
1144
- "space_name": name,
1145
- "score": score,
1146
- "final_priority": score,
1147
- "signal_confidence": round(sig_conf, 6),
1148
- "avn_accuracy": round(avn_acc, 6),
1149
- "dominant_signal": voting.get("dominant_signal", "NEUTRAL"),
1150
- "latest_signal": voting.get("latest_signal", voting.get("dominant_signal", "NEUTRAL")),
1151
- "buy_count": buy,
1152
- "sell_count": sell,
1153
- "training_steps": training.get("training_steps", 0),
1154
- "actor_loss": training.get("actor_loss", 0.0),
1155
- "critic_loss": training.get("critic_loss", 0.0),
1156
- "avn_loss": training.get("avn_loss", 0.0),
1157
- "last_updated": snap.get("last_updated", 0.0),
1158
- "epistemic_std": 0.0,
1159
  })
1160
- ranked.sort(key=lambda r: r["score"], reverse=True)
1161
- for i, r in enumerate(ranked):
1162
  r["rank"] = i + 1
1163
- return ranked
1164
 
1165
 
1166
  @app.get("/")
@@ -1184,12 +1186,12 @@ async def serve_dashboard():
1184
  )
1185
 
1186
 
1187
- @app.post("/api/axrvi/rankings")
1188
  async def receive_axrvi_rankings(request: Request):
1189
  """
1190
  Called by the Executo ranker after every rank_and_gate() cycle (~5 s).
1191
  Stores the live AXRVI-scored ranking list so _compute_rankings() can serve
1192
- it from /api/state instead of the stale hub-snapshot vote-ratio fallback.
1193
 
1194
  Expected body:
1195
  {"rankings": [{"space_name": "V75", "score": 0.24, "rank": 1, ...}, ...]}
@@ -1207,9 +1209,9 @@ async def receive_axrvi_rankings(request: Request):
1207
  _axrvi_rankings = rankings
1208
  _axrvi_rankings_ts = time.time()
1209
  logger.debug(
1210
- f"[AXRVI Rankings] Received {len(rankings)} assets | "
1211
  f"top={rankings[0].get('space_name','?')} score={rankings[0].get('score',0):.4f}"
1212
- if rankings else "[AXRVI Rankings] Received empty list"
1213
  )
1214
  # Broadcast to all connected top3_client.py instances immediately
1215
  if rankings:
 
71
  })
72
 
73
  _ALLOWED_VOTING_FIELDS: frozenset = frozenset({
74
+ "flip_direction",
75
+ "flip_action",
 
76
  "last_price",
77
  "signal_source",
78
  })
 
90
  "avn_accuracy": 0.0,
91
  },
92
  "voting": {
93
+ "flip_direction": "NONE",
94
+ "flip_action": "HOLD",
95
+ "last_price": 0.0,
96
+ "signal_source": "LOG",
 
 
97
  },
98
  }
99
 
 
130
 
131
  voting: dict = {}
132
  if voting_raw:
133
+ raw_direction = voting_raw.get("flip_direction", "NONE")
134
+ if not isinstance(raw_direction, str):
135
+ raw_direction = "NONE"
136
+ raw_action = voting_raw.get("flip_action", "HOLD")
137
+ if not isinstance(raw_action, str):
138
+ raw_action = "HOLD"
139
  raw_source = voting_raw.get("signal_source", "LOG")
140
  if not isinstance(raw_source, str):
141
  raw_source = "LOG"
142
+ _clean_direction = raw_direction.upper() if raw_direction.upper() in {"BUY", "SELL", "NONE"} else "NONE"
143
+ _clean_action = raw_action.upper() if raw_action.upper() in {"ENTRY", "EXIT", "HOLD", "REBALANCE", "REDUCE", "SKIP"} else "HOLD"
144
  voting = {
145
+ "flip_direction": _clean_direction,
146
+ "flip_action": _clean_action,
147
+ "last_price": _float(voting_raw.get("last_price", 0.0)),
148
+ "signal_source": raw_source,
 
 
149
  }
150
 
151
  return {
 
177
  "step": "training_steps","steps": "training_steps","n_steps": "training_steps",
178
  }
179
  _VOTING_KEYS: frozenset = frozenset({
180
+ "flip_direction", "flip_action", "last_price", "signal_source",
 
181
  })
182
 
183
  def __init__(self):
 
457
  logger.info("βœ… HubTradeStore initialised β€” awaiting trade_opened/trade_closed WS messages")
458
 
459
  # ── AXRVI live rankings store ─────────────────────────────────────────────────────────
460
+ # Populated by POST /api/flip/rankings from the Executo ranker after every
461
  # rank_and_gate() cycle (~every 5s). Falls back to hub-snapshot scoring when stale.
462
  _axrvi_rankings: List[dict] = []
463
  _axrvi_rankings_ts: float = 0.0
 
465
 
466
  # ── Top-3 WebSocket client registry ───────────────────────────────────────────────────
467
  # top3_client.py connects here and receives top3_rankings broadcasts whenever the
468
+ # Executo ranker POSTs new rankings via POST /api/flip/rankings.
469
  _top3_clients: Set[WebSocket] = set()
470
  _top3_clients_lock = asyncio.Lock()
471
 
 
473
  async def _broadcast_top3_rankings(rankings: List[dict]) -> None:
474
  """
475
  Broadcast a top3_rankings message to all /ws/top3 subscribers.
476
+ Called immediately after /api/flip/rankings receives a fresh ranking list.
477
  Dead connections are pruned automatically.
478
  """
479
  if not _top3_clients:
 
649
  Sends a top3_rankings message immediately on connect (replay of the latest
650
  known ranking so the client does not have to wait for the next ranker cycle),
651
  then keeps the socket open to receive subsequent broadcasts triggered by
652
+ POST /api/flip/rankings.
653
 
654
  Message format:
655
  {"type": "top3_rankings", "rankings": [...], "total_assets": N, "hub_timestamp": T}
 
1084
  """
1085
  Build the rankings list served by /api/state.
1086
 
1087
+ Path 1 β€” live ranker push (preferred):
1088
+ Rankings received via POST /api/flip/rankings within the last
1089
+ _AXRVI_RANKINGS_TTL seconds. Contains real Shreve priorities,
1090
+ signal_confidence, and epistemic_std from the AXRVI ranker.
1091
+ Hub snapshot fields (training, last_price, flip_direction) are
1092
+ merged in on top to give the freshest available state.
1093
+
1094
+ Path 2 β€” hub-snapshot pass-through (ranker absent or stale):
1095
+ Emits only what the hub snapshot actually contains: flip_direction,
1096
+ flip_action, last_price, signal_source, and training metrics.
1097
+ score / final_priority / signal_confidence / epistemic_std are
1098
+ set to None β€” they are NOT synthesised or approximated.
1099
+ Assets are ordered by last_updated (most recently active first).
1100
  """
1101
  global _axrvi_rankings, _axrvi_rankings_ts
1102
 
 
1109
  snap = snapshots.get(name, {})
1110
  training = snap.get("training", {})
1111
  voting = snap.get("voting", {})
 
 
1112
  merged.append({
1113
+ # Core flip-channel fields β€” live ranker values
1114
  "rank": r.get("rank", 0),
1115
  "space_name": name,
1116
  "score": r.get("score", 0.0),
1117
  "final_priority": r.get("final_priority", r.get("score", 0.0)),
1118
  "signal_confidence": r.get("signal_confidence",0.0),
1119
+ "flip_direction": voting.get("flip_direction", r.get("flip_direction", "NONE")),
1120
+ "flip_action": voting.get("flip_action", r.get("flip_action", "HOLD")),
1121
  "avn_accuracy": r.get("avn_accuracy", 0.0),
1122
  "epistemic_std": r.get("epistemic_std", 0.0),
1123
  "training_steps": r.get("training_steps", training.get("training_steps", 0)),
 
1125
  "actor_loss": training.get("actor_loss", 0.0),
1126
  "critic_loss": training.get("critic_loss", 0.0),
1127
  "avn_loss": training.get("avn_loss", 0.0),
 
 
1128
  "last_updated": snap.get("last_updated", _axrvi_rankings_ts),
1129
  })
1130
  return merged
1131
 
1132
+ # ── Path 2: hub-snapshot pass-through (ranker not yet connected or stale) ──
1133
+ # Only fields that are genuinely present in the hub snapshot are emitted.
1134
+ # No score, signal_confidence, or priority is synthesised β€” those values
1135
+ # belong to the Shreve/AXRVI ranker and must not be approximated here.
1136
+ # Assets are ordered by last_updated so the most recently active asset
1137
+ # appears first; rank is assigned on that basis alone.
1138
+ raw_snaps: List[dict] = []
1139
  for name, snap in manager.get_all_snapshots().items():
 
1140
  voting = snap.get("voting", {})
1141
+ training = snap.get("training", {})
1142
+ raw_snaps.append({
1143
+ "rank": 0, # assigned below after sort
1144
+ "space_name": name,
1145
+ "score": None, # not available without ranker
1146
+ "final_priority": None, # not available without ranker
1147
+ "signal_confidence": None, # not available without ranker
1148
+ "epistemic_std": None, # not available without ranker
1149
+ # ── real flip-channel fields from hub snapshot ──────────────────
1150
+ "flip_direction": voting.get("flip_direction", "NONE"),
1151
+ "flip_action": voting.get("flip_action", "HOLD"),
1152
+ "last_price": voting.get("last_price", 0.0),
1153
+ "signal_source": voting.get("signal_source", "LOG"),
1154
+ # ── real training fields from hub snapshot ───────────────────────
1155
+ "avn_accuracy": training.get("avn_accuracy", 0.0),
1156
+ "training_steps": training.get("training_steps", 0),
1157
+ "actor_loss": training.get("actor_loss", 0.0),
1158
+ "critic_loss": training.get("critic_loss", 0.0),
1159
+ "avn_loss": training.get("avn_loss", 0.0),
1160
+ "last_updated": snap.get("last_updated", 0.0),
 
 
 
1161
  })
1162
+ raw_snaps.sort(key=lambda r: r["last_updated"], reverse=True)
1163
+ for i, r in enumerate(raw_snaps):
1164
  r["rank"] = i + 1
1165
+ return raw_snaps
1166
 
1167
 
1168
  @app.get("/")
 
1186
  )
1187
 
1188
 
1189
+ @app.post("/api/flip/rankings")
1190
  async def receive_axrvi_rankings(request: Request):
1191
  """
1192
  Called by the Executo ranker after every rank_and_gate() cycle (~5 s).
1193
  Stores the live AXRVI-scored ranking list so _compute_rankings() can serve
1194
+ it from /api/state instead of the hub-snapshot pass-through (Path 2).
1195
 
1196
  Expected body:
1197
  {"rankings": [{"space_name": "V75", "score": 0.24, "rank": 1, ...}, ...]}
 
1209
  _axrvi_rankings = rankings
1210
  _axrvi_rankings_ts = time.time()
1211
  logger.debug(
1212
+ f"[Flip Rankings] Received {len(rankings)} assets | "
1213
  f"top={rankings[0].get('space_name','?')} score={rankings[0].get('score',0):.4f}"
1214
+ if rankings else "[Flip Rankings] Received empty list"
1215
  )
1216
  # Broadcast to all connected top3_client.py instances immediately
1217
  if rankings: