KarlQuant commited on
Commit
b5dd7a2
Β·
verified Β·
1 Parent(s): 60e8889

Upload websocket_hub.py

Browse files
Files changed (1) hide show
  1. websocket_hub.py +277 -157
websocket_hub.py CHANGED
@@ -1,7 +1,7 @@
1
  #!/usr/bin/env python3
2
  """
3
  ╔══════════════════════════════════════════════════════════════════════════════════════╗
4
- β•‘ K1RL QUASAR β€” CENTRAL WEBSOCKET HUB β•‘
5
  β•‘ ────────────────────────────────────────────────────────────────────────────────── β•‘
6
  β•‘ β•‘
7
  β•‘ Architecture role: INGEST β†’ NORMALIZE β†’ BROADCAST β•‘
@@ -16,12 +16,19 @@
16
  β•‘ training: training_steps, actor_loss, critic_loss, avn_loss, avn_accuracy β•‘
17
  β•‘ voting: dominant_signal, buy_count, sell_count β•‘
18
  β•‘ β•‘
19
- β•‘ VERSION: v2.0-arch-strict | 2026-03-25 β•‘
 
 
 
 
 
 
20
  β•šβ•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•
21
  """
22
 
23
  import asyncio
24
  import copy
 
25
  import json
26
  import logging
27
  import os
@@ -48,7 +55,6 @@ logger = logging.getLogger("QuasarHub")
48
  # SECTION 1 β€” STRICT DATA MODEL
49
  # ══════════════════════════════════════════════════════════════════════════════════════
50
 
51
- # Fields that are permitted through the hub. All others are silently dropped.
52
  _ALLOWED_TRAINING_FIELDS: frozenset = frozenset({
53
  "training_steps",
54
  "actor_loss",
@@ -61,45 +67,35 @@ _ALLOWED_VOTING_FIELDS: frozenset = frozenset({
61
  "dominant_signal",
62
  "buy_count",
63
  "sell_count",
64
- "last_price", # FIX: was silently dropped β€” needed by subscribers
65
- "signal_source", # FIX: was silently dropped β€” needed by subscribers
66
  })
67
 
68
 
69
  def _empty_snapshot(space_name: str) -> dict:
70
- """Return a clean, zeroed snapshot for a space."""
71
  return {
72
- "space_name": space_name,
73
  "last_updated": 0.0,
74
  "training": {
75
  "training_steps": 0,
76
- "actor_loss": 0.0,
77
- "critic_loss": 0.0,
78
- "avn_loss": 0.0,
79
- "avn_accuracy": 0.0,
80
  },
81
  "voting": {
82
  "dominant_signal": "NEUTRAL",
83
- "buy_count": 0,
84
- "sell_count": 0,
85
- "last_price": 0.0, # FIX: added to match allowed fields
86
- "signal_source": "LOG", # FIX: added to match allowed fields
87
  },
88
  }
89
 
90
 
91
  def _validate_and_normalize(space_name: str, raw: dict) -> Optional[dict]:
92
- """
93
- Validate incoming payload. Return a clean normalized dict or None if invalid.
94
-
95
- Strict rules:
96
- β€’ Must contain at least one of 'training' or 'voting' keys.
97
- β€’ Unknown top-level keys are dropped.
98
- β€’ Unknown sub-keys inside training/voting are dropped.
99
- β€’ Values are coerced to the correct Python types; malformed values are zeroed.
100
- """
101
  training_raw = raw.get("training", {})
102
- voting_raw = raw.get("voting", {})
103
 
104
  if not isinstance(training_raw, dict):
105
  training_raw = {}
@@ -107,32 +103,26 @@ def _validate_and_normalize(space_name: str, raw: dict) -> Optional[dict]:
107
  voting_raw = {}
108
 
109
  if not training_raw and not voting_raw:
110
- return None # Nothing useful
111
 
112
- # --- training ---
113
  def _float(v, default: float = 0.0) -> float:
114
- try:
115
- return float(v)
116
- except (TypeError, ValueError):
117
- return default
118
 
119
  def _int(v, default: int = 0) -> int:
120
- try:
121
- return int(v)
122
- except (TypeError, ValueError):
123
- return default
124
 
125
  training: dict = {}
126
  if training_raw:
127
  training = {
128
  "training_steps": _int(training_raw.get("training_steps", 0)),
129
- "actor_loss": _float(training_raw.get("actor_loss", 0.0)),
130
- "critic_loss": _float(training_raw.get("critic_loss", 0.0)),
131
- "avn_loss": _float(training_raw.get("avn_loss", 0.0)),
132
  "avn_accuracy": max(0.0, min(1.0, _float(training_raw.get("avn_accuracy", 0.0)))),
133
  }
134
 
135
- # --- voting ---
136
  voting: dict = {}
137
  if voting_raw:
138
  raw_signal = voting_raw.get("dominant_signal", "NEUTRAL")
@@ -143,16 +133,16 @@ def _validate_and_normalize(space_name: str, raw: dict) -> Optional[dict]:
143
  raw_source = "LOG"
144
  voting = {
145
  "dominant_signal": raw_signal.upper() if raw_signal.upper() in {"BUY", "SELL", "NEUTRAL"} else "NEUTRAL",
146
- "buy_count": _int(voting_raw.get("buy_count", 0)),
147
- "sell_count": _int(voting_raw.get("sell_count", 0)),
148
- "last_price": _float(voting_raw.get("last_price", 0.0)), # FIX: now passed through
149
- "signal_source": raw_source, # FIX: now passed through
150
  }
151
 
152
  return {
153
  "space_name": space_name,
154
- "training": training,
155
- "voting": voting,
156
  }
157
 
158
 
@@ -161,28 +151,12 @@ def _validate_and_normalize(space_name: str, raw: dict) -> Optional[dict]:
161
  # ══════════════════════════════════════════════════════════════════════════════════════
162
 
163
  class ConnectionManager:
164
- """
165
- Manages publisher (Asset Space) and subscriber (Ranker) WebSocket connections.
166
-
167
- Design:
168
- β€’ Publishers β†’ write-only lane (/ws/publish/{space_name})
169
- β€’ Subscribers β†’ read-only lane (/ws/subscribe)
170
- β€’ No cross-talk: subscribers never send to publishers
171
- """
172
-
173
  def __init__(self):
174
- # Keyed by space_name β†’ WebSocket
175
- self._publishers: Dict[str, WebSocket] = {}
176
- # Set of subscriber sockets
177
- self._subscribers: Set[WebSocket] = set()
178
- # Latest normalized snapshot per space
179
- self._snapshots: Dict[str, dict] = {}
180
- # Asyncio lock for thread-safe mutation
181
- self._lock = asyncio.Lock()
182
- # Message counter (read by /api/state)
183
- self._total_ingested: int = 0
184
-
185
- # ── Publisher lifecycle ──────────────────────────────────────────────────────────
186
 
187
  async def register_publisher(self, space_name: str, ws: WebSocket) -> None:
188
  await ws.accept()
@@ -190,16 +164,13 @@ class ConnectionManager:
190
  self._publishers[space_name] = ws
191
  if space_name not in self._snapshots:
192
  self._snapshots[space_name] = _empty_snapshot(space_name)
193
- logger.info(f"πŸ“‘ Publisher connected: {space_name} "
194
- f"(total={len(self._publishers)})")
195
 
196
  async def unregister_publisher(self, space_name: str) -> None:
197
  async with self._lock:
198
  self._publishers.pop(space_name, None)
199
  logger.info(f"πŸ“‘ Publisher disconnected: {space_name}")
200
 
201
- # ── Subscriber lifecycle ─────────────────────────────────────────────────────────
202
-
203
  async def register_subscriber(self, ws: WebSocket) -> None:
204
  await ws.accept()
205
  async with self._lock:
@@ -211,78 +182,54 @@ class ConnectionManager:
211
  self._subscribers.discard(ws)
212
  logger.info(f"πŸ”” Subscriber disconnected (total={len(self._subscribers)})")
213
 
214
- # ── Ingestion pipeline ───────────────────────────────────────────────────────────
215
-
216
  async def ingest(self, space_name: str, raw_payload: dict) -> None:
217
- """
218
- Validate β†’ Normalize β†’ Store β†’ Broadcast pipeline.
219
- Called for every inbound message from a publisher.
220
- """
221
  normalized = _validate_and_normalize(space_name, raw_payload)
222
  if normalized is None:
223
  logger.debug(f"[{space_name}] Payload dropped (no valid fields)")
224
  return
225
 
226
- # βœ… FIX #6: deep-copy the snapshot *inside* the lock before releasing.
227
- # Without this, snap is a live reference. Another publisher coroutine
228
- # can acquire the lock and mutate snap["training"] / snap["voting"]
229
- # in-place while json.dumps(snapshot) is still serializing it during
230
- # broadcast, producing torn / mixed data sent to subscribers.
231
  async with self._lock:
232
  snap = self._snapshots.setdefault(space_name, _empty_snapshot(space_name))
233
  snap["last_updated"] = time.time()
234
-
235
  if normalized["training"]:
236
  snap["training"].update(normalized["training"])
237
  if normalized["voting"]:
238
  snap["voting"].update(normalized["voting"])
239
-
240
  self._total_ingested += 1
241
- snap_copy = copy.deepcopy(snap) # frozen snapshot for broadcast
242
 
243
- # Broadcast outside the lock using the immutable copy
244
  await self._broadcast_update(space_name, snap_copy)
245
 
246
- # ── Broadcast engine ─────────────────────────────────────────────────────────────
247
-
248
  async def _broadcast_update(self, space_name: str, snapshot: dict) -> None:
249
- """Fan-out the updated snapshot to all subscribers (event-driven)."""
250
  if not self._subscribers:
251
  return
252
-
253
  message = json.dumps({
254
- "type": "metrics_update",
255
- "space_name": space_name,
256
- "snapshot": snapshot,
257
  "hub_timestamp": time.time(),
258
  })
259
-
260
  dead: list = []
261
  for ws in list(self._subscribers):
262
  try:
263
  await ws.send_text(message)
264
  except Exception:
265
  dead.append(ws)
266
-
267
  if dead:
268
  async with self._lock:
269
  for ws in dead:
270
  self._subscribers.discard(ws)
271
 
272
  async def send_initial_state(self, ws: WebSocket) -> None:
273
- """Send full current state to a newly connected subscriber."""
274
  async with self._lock:
275
  snapshots_copy = dict(self._snapshots)
276
-
277
  message = json.dumps({
278
- "type": "initial_state",
279
- "snapshots": snapshots_copy,
280
  "hub_timestamp": time.time(),
281
  })
282
  await ws.send_text(message)
283
 
284
- # ── Read-only accessors (REST API) ───────────────────────────────────────────────
285
-
286
  def get_snapshot(self, space_name: str) -> Optional[dict]:
287
  return self._snapshots.get(space_name)
288
 
@@ -294,7 +241,7 @@ class ConnectionManager:
294
  return {
295
  "publishers": {
296
  name: {
297
- "last_updated": self._snapshots.get(name, {}).get("last_updated", 0),
298
  "stale_seconds": round(now - self._snapshots.get(name, {}).get("last_updated", now), 1),
299
  }
300
  for name in self._publishers
@@ -304,13 +251,173 @@ class ConnectionManager:
304
 
305
 
306
  # ══════════════════════════════════════════════════════════════════════════════════════
307
- # SECTION 3 β€” FASTAPI APPLICATION
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
308
  # ══════════════════════════════════════════════════════════════════════════════════════
309
 
310
  app = FastAPI(
311
  title="K1RL QUASAR Hub",
312
  description="Central WebSocket hub β€” ingest, normalize, broadcast (one-way)",
313
- version="2.0.0",
314
  )
315
  app.add_middleware(
316
  CORSMiddleware,
@@ -322,22 +429,19 @@ app.add_middleware(
322
  manager = ConnectionManager()
323
 
324
 
 
 
 
 
 
 
 
325
  # ══════════════════════════════════════════════════════════════════════════════════════
326
- # SECTION 4 β€” WEBSOCKET ENDPOINTS
327
  # ══════════════════════════════════════════════════════════════════════════════════════
328
 
329
  @app.websocket("/ws/publish/{space_name}")
330
  async def ws_publisher_endpoint(websocket: WebSocket, space_name: str):
331
- """
332
- Publisher endpoint β€” Asset Spaces connect here.
333
- SEND ONLY: hub never writes back to this socket.
334
-
335
- Accepted message types:
336
- {"type": "metrics", "training": {...}, "voting": {...}}
337
- {"type": "training", "data": {...}}
338
- {"type": "voting", "data": {...}}
339
- {"type": "heartbeat"}
340
- """
341
  await manager.register_publisher(space_name, websocket)
342
  try:
343
  while True:
@@ -351,28 +455,22 @@ async def ws_publisher_endpoint(websocket: WebSocket, space_name: str):
351
  msg_type = data.get("type", "")
352
 
353
  if msg_type == "metrics":
354
- # Full combined payload
355
  await manager.ingest(space_name, {
356
  "training": data.get("training", {}),
357
  "voting": data.get("voting", {}),
358
  })
359
-
360
  elif msg_type == "training":
361
  await manager.ingest(space_name, {
362
  "training": data.get("data", {}),
363
  "voting": {},
364
  })
365
-
366
  elif msg_type == "voting":
367
  await manager.ingest(space_name, {
368
  "training": {},
369
  "voting": data.get("data", {}),
370
  })
371
-
372
  elif msg_type in ("heartbeat", "identify", "ping"):
373
- # Silently acknowledged β€” no reply sent back
374
  pass
375
-
376
  else:
377
  logger.debug(f"[{space_name}] Unrecognised type '{msg_type}' β€” dropped")
378
 
@@ -386,19 +484,10 @@ async def ws_publisher_endpoint(websocket: WebSocket, space_name: str):
386
 
387
  @app.websocket("/ws/subscribe")
388
  async def ws_subscriber_endpoint(websocket: WebSocket):
389
- """
390
- Subscriber endpoint β€” Ranker Space connects here.
391
- READ ONLY: subscribers must not send data; any inbound messages are discarded.
392
-
393
- Messages received by subscriber:
394
- {"type": "initial_state", "snapshots": {...}, "hub_timestamp": ...}
395
- {"type": "metrics_update", "space_name": "...", "snapshot": {...}, "hub_timestamp": ...}
396
- """
397
  await manager.register_subscriber(websocket)
398
  await manager.send_initial_state(websocket)
399
  try:
400
  while True:
401
- # Drain any inbound messages without processing them (read-only contract)
402
  await websocket.receive_text()
403
  except WebSocketDisconnect:
404
  pass
@@ -409,15 +498,11 @@ async def ws_subscriber_endpoint(websocket: WebSocket):
409
 
410
 
411
  # ══════════════════════════════════════════════════════════════════════════════════════
412
- # SECTION 5 β€” REST API (READ-ONLY)
413
  # ══════════════════════════════════════════════════════════════════════════════════════
414
 
415
  @app.get("/rankings")
416
  async def get_rankings():
417
- """
418
- Return latest snapshot for all assets.
419
- Ranker may also poll this endpoint as a fallback.
420
- """
421
  return {
422
  "snapshots": manager.get_all_snapshots(),
423
  "timestamp": datetime.utcnow().isoformat() + "Z",
@@ -426,7 +511,6 @@ async def get_rankings():
426
 
427
  @app.get("/metrics/{space_name}")
428
  async def get_space_metrics(space_name: str):
429
- """Return latest snapshot for a single asset space."""
430
  snap = manager.get_snapshot(space_name)
431
  if snap is None:
432
  return {"error": f"Unknown space: {space_name}"}
@@ -436,15 +520,56 @@ async def get_space_metrics(space_name: str):
436
  @app.get("/health")
437
  async def get_health():
438
  return {
439
- "status": "ok",
440
  "timestamp": datetime.utcnow().isoformat() + "Z",
441
  **manager.get_health(),
442
  }
443
 
444
 
445
  # ══════════════════════════════════════════════════════════════════════════════════════
446
- # SECTION 6 β€” DASHBOARD UI ROUTES
447
- # (serves hub_dashboard.html at / and provides /api/state for the polling frontend)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
448
  # ══════════════════════════════════════════════════════════════════════════════════════
449
 
450
  _HTML_PATH = Path(os.environ.get(
@@ -454,12 +579,6 @@ _HTML_PATH = Path(os.environ.get(
454
 
455
 
456
  def _compute_rankings() -> List[dict]:
457
- """
458
- Derive ranked list from manager snapshots using the AXRVI formula:
459
- signal_confidence = max(buy, sell) / (buy + sell) if total > 0 else 0
460
- score = signal_confidence - avn_accuracy
461
- Sorted descending by score.
462
- """
463
  ranked: List[dict] = []
464
  for name, snap in manager.get_all_snapshots().items():
465
  training = snap.get("training", {})
@@ -493,7 +612,6 @@ def _compute_rankings() -> List[dict]:
493
 
494
  @app.get("/")
495
  async def serve_dashboard():
496
- """Serve the dashboard HTML. Returns a helpful message if the file is missing."""
497
  if _HTML_PATH.exists():
498
  return FileResponse(str(_HTML_PATH), media_type="text/html")
499
  return JSONResponse(
@@ -503,24 +621,26 @@ async def serve_dashboard():
503
  "status": "running",
504
  "note": "hub_dashboard.html not found β€” upload it to the Space",
505
  "expected": str(_HTML_PATH),
506
- "endpoints": ["/rankings", "/health", "/api/state", "/ws/publish/{space}", "/ws/subscribe"],
 
 
 
 
 
507
  },
508
  )
509
 
510
 
511
  @app.get("/api/state")
512
  async def api_state():
513
- """
514
- Full dashboard state polled by hub_dashboard.html every 2 s.
515
- Returns rankings + metric history stub + health.
516
- """
517
- h = manager.get_health()
518
  rankings = _compute_rankings()
519
  return JSONResponse({
520
  "rankings": rankings,
521
- "metric_history": {}, # history kept client-side via snapshot deltas
522
  "health": {
523
- "hub_connected": True, # we ARE the hub
524
  "spaces_connected": len(manager.get_all_snapshots()),
525
  "messages_rx": manager._total_ingested,
526
  "last_update_ts": max(
@@ -533,8 +653,8 @@ async def api_state():
533
  default=time.time(),
534
  ), 1
535
  ),
536
- "uptime_seconds": round(time.time() - _START_TIME, 0),
537
- "reconnect_count": 0,
538
  },
539
  "timestamp": datetime.utcnow().isoformat() + "Z",
540
  })
@@ -544,10 +664,10 @@ _START_TIME = time.time()
544
 
545
 
546
  # ══════════════════════════════════════════════════════════════════════════════════════
547
- # SECTION 7 β€” ENTRY POINT
548
  # ══════════════════════════════════════════════════════════════════════════════════════
549
 
550
  if __name__ == "__main__":
551
  port = int(os.environ.get("PORT", 7860))
552
  logger.info(f"πŸš€ QUASAR Hub starting on port {port}")
553
- uvicorn.run(app, host="0.0.0.0", port=port, log_level="info")
 
1
  #!/usr/bin/env python3
2
  """
3
  ╔══════════════════════════════════════════════════════════════════════════════════════╗
4
+ β•‘ K1RL QUASAR β€” CENTRAL WEBSOCKET HUB v2.1-integrated β•‘
5
  β•‘ ────────────────────────────────────────────────────────────────────────────────── β•‘
6
  β•‘ β•‘
7
  β•‘ Architecture role: INGEST β†’ NORMALIZE β†’ BROADCAST β•‘
 
16
  β•‘ training: training_steps, actor_loss, critic_loss, avn_loss, avn_accuracy β•‘
17
  β•‘ voting: dominant_signal, buy_count, sell_count β•‘
18
  β•‘ β•‘
19
+ β•‘ TRADE API (served natively β€” no patch script needed): β•‘
20
+ β•‘ GET /api/trades β†’ full open + closed state + stats β•‘
21
+ β•‘ GET /api/trades/open β†’ open trades only β•‘
22
+ β•‘ GET /api/trades/closed β†’ recent closed trades + stats (?limit=N) β•‘
23
+ β•‘ GET /api/health β†’ service health including trade counts β•‘
24
+ β•‘ β•‘
25
+ β•‘ VERSION: v2.1-integrated | 2026-03-31 β•‘
26
  β•šβ•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•
27
  """
28
 
29
  import asyncio
30
  import copy
31
+ import glob
32
  import json
33
  import logging
34
  import os
 
55
  # SECTION 1 β€” STRICT DATA MODEL
56
  # ══════════════════════════════════════════════════════════════════════════════════════
57
 
 
58
  _ALLOWED_TRAINING_FIELDS: frozenset = frozenset({
59
  "training_steps",
60
  "actor_loss",
 
67
  "dominant_signal",
68
  "buy_count",
69
  "sell_count",
70
+ "last_price",
71
+ "signal_source",
72
  })
73
 
74
 
75
  def _empty_snapshot(space_name: str) -> dict:
 
76
  return {
77
+ "space_name": space_name,
78
  "last_updated": 0.0,
79
  "training": {
80
  "training_steps": 0,
81
+ "actor_loss": 0.0,
82
+ "critic_loss": 0.0,
83
+ "avn_loss": 0.0,
84
+ "avn_accuracy": 0.0,
85
  },
86
  "voting": {
87
  "dominant_signal": "NEUTRAL",
88
+ "buy_count": 0,
89
+ "sell_count": 0,
90
+ "last_price": 0.0,
91
+ "signal_source": "LOG",
92
  },
93
  }
94
 
95
 
96
  def _validate_and_normalize(space_name: str, raw: dict) -> Optional[dict]:
 
 
 
 
 
 
 
 
 
97
  training_raw = raw.get("training", {})
98
+ voting_raw = raw.get("voting", {})
99
 
100
  if not isinstance(training_raw, dict):
101
  training_raw = {}
 
103
  voting_raw = {}
104
 
105
  if not training_raw and not voting_raw:
106
+ return None
107
 
 
108
  def _float(v, default: float = 0.0) -> float:
109
+ try: return float(v)
110
+ except: return default
 
 
111
 
112
  def _int(v, default: int = 0) -> int:
113
+ try: return int(v)
114
+ except: return default
 
 
115
 
116
  training: dict = {}
117
  if training_raw:
118
  training = {
119
  "training_steps": _int(training_raw.get("training_steps", 0)),
120
+ "actor_loss": _float(training_raw.get("actor_loss", 0.0)),
121
+ "critic_loss": _float(training_raw.get("critic_loss", 0.0)),
122
+ "avn_loss": _float(training_raw.get("avn_loss", 0.0)),
123
  "avn_accuracy": max(0.0, min(1.0, _float(training_raw.get("avn_accuracy", 0.0)))),
124
  }
125
 
 
126
  voting: dict = {}
127
  if voting_raw:
128
  raw_signal = voting_raw.get("dominant_signal", "NEUTRAL")
 
133
  raw_source = "LOG"
134
  voting = {
135
  "dominant_signal": raw_signal.upper() if raw_signal.upper() in {"BUY", "SELL", "NEUTRAL"} else "NEUTRAL",
136
+ "buy_count": _int(voting_raw.get("buy_count", 0)),
137
+ "sell_count": _int(voting_raw.get("sell_count", 0)),
138
+ "last_price": _float(voting_raw.get("last_price", 0.0)),
139
+ "signal_source": raw_source,
140
  }
141
 
142
  return {
143
  "space_name": space_name,
144
+ "training": training,
145
+ "voting": voting,
146
  }
147
 
148
 
 
151
  # ══════════════════════════════════════════════════════════════════════════════════════
152
 
153
  class ConnectionManager:
 
 
 
 
 
 
 
 
 
154
  def __init__(self):
155
+ self._publishers: Dict[str, WebSocket] = {}
156
+ self._subscribers: Set[WebSocket] = set()
157
+ self._snapshots: Dict[str, dict] = {}
158
+ self._lock = asyncio.Lock()
159
+ self._total_ingested: int = 0
 
 
 
 
 
 
 
160
 
161
  async def register_publisher(self, space_name: str, ws: WebSocket) -> None:
162
  await ws.accept()
 
164
  self._publishers[space_name] = ws
165
  if space_name not in self._snapshots:
166
  self._snapshots[space_name] = _empty_snapshot(space_name)
167
+ logger.info(f"πŸ“‘ Publisher connected: {space_name} (total={len(self._publishers)})")
 
168
 
169
  async def unregister_publisher(self, space_name: str) -> None:
170
  async with self._lock:
171
  self._publishers.pop(space_name, None)
172
  logger.info(f"πŸ“‘ Publisher disconnected: {space_name}")
173
 
 
 
174
  async def register_subscriber(self, ws: WebSocket) -> None:
175
  await ws.accept()
176
  async with self._lock:
 
182
  self._subscribers.discard(ws)
183
  logger.info(f"πŸ”” Subscriber disconnected (total={len(self._subscribers)})")
184
 
 
 
185
  async def ingest(self, space_name: str, raw_payload: dict) -> None:
 
 
 
 
186
  normalized = _validate_and_normalize(space_name, raw_payload)
187
  if normalized is None:
188
  logger.debug(f"[{space_name}] Payload dropped (no valid fields)")
189
  return
190
 
 
 
 
 
 
191
  async with self._lock:
192
  snap = self._snapshots.setdefault(space_name, _empty_snapshot(space_name))
193
  snap["last_updated"] = time.time()
 
194
  if normalized["training"]:
195
  snap["training"].update(normalized["training"])
196
  if normalized["voting"]:
197
  snap["voting"].update(normalized["voting"])
 
198
  self._total_ingested += 1
199
+ snap_copy = copy.deepcopy(snap)
200
 
 
201
  await self._broadcast_update(space_name, snap_copy)
202
 
 
 
203
  async def _broadcast_update(self, space_name: str, snapshot: dict) -> None:
 
204
  if not self._subscribers:
205
  return
 
206
  message = json.dumps({
207
+ "type": "metrics_update",
208
+ "space_name": space_name,
209
+ "snapshot": snapshot,
210
  "hub_timestamp": time.time(),
211
  })
 
212
  dead: list = []
213
  for ws in list(self._subscribers):
214
  try:
215
  await ws.send_text(message)
216
  except Exception:
217
  dead.append(ws)
 
218
  if dead:
219
  async with self._lock:
220
  for ws in dead:
221
  self._subscribers.discard(ws)
222
 
223
  async def send_initial_state(self, ws: WebSocket) -> None:
 
224
  async with self._lock:
225
  snapshots_copy = dict(self._snapshots)
 
226
  message = json.dumps({
227
+ "type": "initial_state",
228
+ "snapshots": snapshots_copy,
229
  "hub_timestamp": time.time(),
230
  })
231
  await ws.send_text(message)
232
 
 
 
233
  def get_snapshot(self, space_name: str) -> Optional[dict]:
234
  return self._snapshots.get(space_name)
235
 
 
241
  return {
242
  "publishers": {
243
  name: {
244
+ "last_updated": self._snapshots.get(name, {}).get("last_updated", 0),
245
  "stale_seconds": round(now - self._snapshots.get(name, {}).get("last_updated", now), 1),
246
  }
247
  for name in self._publishers
 
251
 
252
 
253
  # ══════════════════════════════════════════════════════════════════════════════════════
254
+ # SECTION 3 β€” TRADE LOG PARSER (native β€” no patch script needed)
255
+ # ══════════════════════════════════════════════════════════════════════════════════════
256
+
257
+ class TradeLogParser:
258
+ """
259
+ Lightweight trade-log reader that watches ranker_logs/*.log files and
260
+ exposes open/closed trade state via get_state().
261
+
262
+ Imported from hub_dashboard_service if available; otherwise this built-in
263
+ fallback is used so /api/trades routes always work.
264
+ """
265
+
266
+ def __init__(self, log_dir: str = "/app/ranker_logs"):
267
+ self.log_dir = log_dir
268
+ self._state = {"open": {}, "closed": [], "stats": {}}
269
+ self._lock = None # set in start_background()
270
+ self._running = False
271
+
272
+ def start_background(self) -> None:
273
+ """Schedule background log-scanning on the running event loop."""
274
+ self._running = True
275
+ asyncio.get_event_loop().create_task(self._scan_loop())
276
+ logger.info(f"πŸ“‚ TradeLogParser started β€” watching {self.log_dir}")
277
+
278
+ async def _scan_loop(self) -> None:
279
+ while self._running:
280
+ try:
281
+ self._refresh()
282
+ except Exception as e:
283
+ logger.debug(f"TradeLogParser scan error: {e}")
284
+ await asyncio.sleep(5)
285
+
286
+ def _refresh(self) -> None:
287
+ """Re-parse all log files and rebuild open/closed state."""
288
+ open_trades: Dict[str, dict] = {}
289
+ closed_trades: List[dict] = []
290
+
291
+ log_files = sorted(glob.glob(os.path.join(self.log_dir, "*.log")))
292
+ for fpath in log_files:
293
+ try:
294
+ with open(fpath, "r", encoding="utf-8", errors="replace") as fh:
295
+ for line in fh:
296
+ self._parse_line(line, open_trades, closed_trades)
297
+ except Exception:
298
+ pass
299
+
300
+ # Sort closed newest-first
301
+ closed_trades.sort(key=lambda t: t.get("closed_ts", 0), reverse=True)
302
+
303
+ # Compute stats
304
+ pnls = [t.get("pnl", 0.0) for t in closed_trades]
305
+ wins = [p for p in pnls if p > 0]
306
+ total = len(pnls)
307
+ stats = {
308
+ "total_closed": total,
309
+ "total_open": len(open_trades),
310
+ "win_rate": round(len(wins) / total, 4) if total else 0.0,
311
+ "total_pnl": round(sum(pnls), 6),
312
+ "avg_pnl": round(sum(pnls) / total, 6) if total else 0.0,
313
+ "avg_pnl_trade": round(sum(pnls) / total, 6) if total else 0.0,
314
+ }
315
+
316
+ self._state = {
317
+ "open": list(open_trades.values()),
318
+ "closed": closed_trades,
319
+ "stats": stats,
320
+ }
321
+
322
+ # ── Log-line parser ──────────────────────────────────────────────────────────────
323
+ # Matches lines like:
324
+ # [2026-03-31 10:02:07] | INFO | TRADE | V75 | TRADE OPENED | ID=V75_... | ...
325
+ # [2026-03-31 10:02:07] | INFO | TRADE | V75 | TRADE CLOSED | ID=V75_... | pnl=-32.62 | ...
326
+
327
+ def _parse_line(
328
+ self,
329
+ line: str,
330
+ open_trades: Dict[str, dict],
331
+ closed_trades: List[dict],
332
+ ) -> None:
333
+ if "| TRADE |" not in line:
334
+ return
335
+
336
+ parts = [p.strip() for p in line.split("|")]
337
+ # parts[0]=timestamp, [1]=level, [2]="TRADE", [3]=asset, [4]=event, [5..]=fields
338
+ if len(parts) < 5:
339
+ return
340
+
341
+ try:
342
+ ts_str = parts[0].strip("[] ")
343
+ ts = datetime.strptime(ts_str, "%Y-%m-%d %H:%M:%S").timestamp()
344
+ except Exception:
345
+ ts = 0.0
346
+
347
+ asset = parts[3].strip()
348
+ event = parts[4].strip().upper()
349
+
350
+ # Build a quick field dict from KEY=VALUE tokens in the rest of the line
351
+ fields: Dict[str, str] = {}
352
+ for segment in parts[5:]:
353
+ for token in segment.split():
354
+ if "=" in token:
355
+ k, _, v = token.partition("=")
356
+ fields[k.strip()] = v.strip().rstrip(",")
357
+
358
+ trade_id = fields.get("ID", f"{asset}_{int(ts)}")
359
+
360
+ def _f(k: str) -> float:
361
+ try: return float(fields.get(k, 0))
362
+ except: return 0.0
363
+
364
+ if "TRADE OPENED" in event:
365
+ open_trades[trade_id] = {
366
+ "trade_id": trade_id,
367
+ "asset": asset,
368
+ "direction": fields.get("Dir", fields.get("Direction", "?")).capitalize(),
369
+ "entry": _f("Entry"),
370
+ "qty": _f("Qty"),
371
+ "open_pnl": 0.0,
372
+ "opened_ts": ts,
373
+ "opened": ts_str,
374
+ }
375
+
376
+ elif "TRADE CLOSED" in event:
377
+ # Remove from open if present
378
+ trade = open_trades.pop(trade_id, {})
379
+ pnl = _f("pnl")
380
+ ret = _f("return")
381
+ closed_trades.append({
382
+ "trade_id": trade_id,
383
+ "asset": asset,
384
+ "direction": trade.get("direction", "?"),
385
+ "entry": trade.get("entry", _f("Entry")),
386
+ "exit": _f("Exit") or _f("exit"),
387
+ "qty": trade.get("qty", _f("Qty")),
388
+ "pnl": pnl,
389
+ "return_pct": ret,
390
+ "closed_ts": ts,
391
+ "closed": ts_str,
392
+ "duration": round(ts - trade.get("opened_ts", ts), 0) if trade else 0,
393
+ })
394
+
395
+ def get_state(self) -> dict:
396
+ return self._state
397
+
398
+
399
+ # ── Bootstrap parser ─────────────────────────────────────────────────────────────────
400
+ # Try the richer version from hub_dashboard_service first; fall back to built-in above.
401
+
402
+ _LOG_DIR = os.environ.get("RANKER_LOG_DIR", "/app/ranker_logs")
403
+
404
+ try:
405
+ from hub_dashboard_service import TradeLogParser as _ExternalParser
406
+ _trade_parser = _ExternalParser(log_dir=_LOG_DIR)
407
+ logger.info("βœ… TradeLogParser loaded from hub_dashboard_service")
408
+ except Exception:
409
+ _trade_parser = TradeLogParser(log_dir=_LOG_DIR)
410
+ logger.info("βœ… TradeLogParser using built-in fallback")
411
+
412
+
413
+ # ══════════════════════════════════════════════════════════════════════════════════════
414
+ # SECTION 4 β€” FASTAPI APPLICATION
415
  # ══════════════════════════════════════════════════════════════════════════════════════
416
 
417
  app = FastAPI(
418
  title="K1RL QUASAR Hub",
419
  description="Central WebSocket hub β€” ingest, normalize, broadcast (one-way)",
420
+ version="2.1.0",
421
  )
422
  app.add_middleware(
423
  CORSMiddleware,
 
429
  manager = ConnectionManager()
430
 
431
 
432
+ @app.on_event("startup")
433
+ async def _on_startup():
434
+ """Start background trade-log scanner after the event loop is running."""
435
+ _trade_parser.start_background()
436
+ logger.info("πŸš€ Trade log scanner started")
437
+
438
+
439
  # ══════════════════════════════════════════════════════════════════════════════════════
440
+ # SECTION 5 β€” WEBSOCKET ENDPOINTS
441
  # ══════════════════════════════════════════════════════════════════════════════════════
442
 
443
  @app.websocket("/ws/publish/{space_name}")
444
  async def ws_publisher_endpoint(websocket: WebSocket, space_name: str):
 
 
 
 
 
 
 
 
 
 
445
  await manager.register_publisher(space_name, websocket)
446
  try:
447
  while True:
 
455
  msg_type = data.get("type", "")
456
 
457
  if msg_type == "metrics":
 
458
  await manager.ingest(space_name, {
459
  "training": data.get("training", {}),
460
  "voting": data.get("voting", {}),
461
  })
 
462
  elif msg_type == "training":
463
  await manager.ingest(space_name, {
464
  "training": data.get("data", {}),
465
  "voting": {},
466
  })
 
467
  elif msg_type == "voting":
468
  await manager.ingest(space_name, {
469
  "training": {},
470
  "voting": data.get("data", {}),
471
  })
 
472
  elif msg_type in ("heartbeat", "identify", "ping"):
 
473
  pass
 
474
  else:
475
  logger.debug(f"[{space_name}] Unrecognised type '{msg_type}' β€” dropped")
476
 
 
484
 
485
  @app.websocket("/ws/subscribe")
486
  async def ws_subscriber_endpoint(websocket: WebSocket):
 
 
 
 
 
 
 
 
487
  await manager.register_subscriber(websocket)
488
  await manager.send_initial_state(websocket)
489
  try:
490
  while True:
 
491
  await websocket.receive_text()
492
  except WebSocketDisconnect:
493
  pass
 
498
 
499
 
500
  # ══════════════════════════════════════════════════════════════════════════════════════
501
+ # SECTION 6 β€” REST API (READ-ONLY)
502
  # ══════════════════════════════════════════════════════════════════════════════════════
503
 
504
  @app.get("/rankings")
505
  async def get_rankings():
 
 
 
 
506
  return {
507
  "snapshots": manager.get_all_snapshots(),
508
  "timestamp": datetime.utcnow().isoformat() + "Z",
 
511
 
512
  @app.get("/metrics/{space_name}")
513
  async def get_space_metrics(space_name: str):
 
514
  snap = manager.get_snapshot(space_name)
515
  if snap is None:
516
  return {"error": f"Unknown space: {space_name}"}
 
520
  @app.get("/health")
521
  async def get_health():
522
  return {
523
+ "status": "ok",
524
  "timestamp": datetime.utcnow().isoformat() + "Z",
525
  **manager.get_health(),
526
  }
527
 
528
 
529
  # ══════════════════════════════════════════════════════════════════════════════════════
530
+ # SECTION 7 β€” TRADE API (native β€” replaces patch_websocket_hub.py)
531
+ # ══════════════════════════════════════════════════════════════════════════════════════
532
+
533
+ @app.get("/api/trades")
534
+ async def api_trades():
535
+ """Full trade state: open trades, recent closed trades, summary stats."""
536
+ return JSONResponse(_trade_parser.get_state())
537
+
538
+
539
+ @app.get("/api/trades/open")
540
+ async def api_trades_open():
541
+ """Open trades only."""
542
+ state = _trade_parser.get_state()
543
+ return JSONResponse({"open": state["open"]})
544
+
545
+
546
+ @app.get("/api/trades/closed")
547
+ async def api_trades_closed(limit: int = 50):
548
+ """Recent closed trades (newest first) + cumulative stats."""
549
+ state = _trade_parser.get_state()
550
+ return JSONResponse({
551
+ "closed": state["closed"][:limit],
552
+ "stats": state["stats"],
553
+ })
554
+
555
+
556
+ @app.get("/api/health")
557
+ async def api_health():
558
+ """Service health β€” includes live trade counts and log-file inventory."""
559
+ state = _trade_parser.get_state()
560
+ return JSONResponse({
561
+ "service": "websocket_hub",
562
+ "version": "v2.1-integrated",
563
+ "status": "running",
564
+ "log_files": len(glob.glob(os.path.join(_LOG_DIR, "*.log"))),
565
+ "trade_open": len(state["open"]),
566
+ "trade_closed": len(state["closed"]),
567
+ "uptime_seconds": round(time.time() - _START_TIME, 0),
568
+ })
569
+
570
+
571
+ # ══════════════════════════════════════════════════════════════════════════════════════
572
+ # SECTION 8 β€” DASHBOARD UI ROUTES
573
  # ══════════════════════════════════════════════════════════════════════════════════════
574
 
575
  _HTML_PATH = Path(os.environ.get(
 
579
 
580
 
581
  def _compute_rankings() -> List[dict]:
 
 
 
 
 
 
582
  ranked: List[dict] = []
583
  for name, snap in manager.get_all_snapshots().items():
584
  training = snap.get("training", {})
 
612
 
613
  @app.get("/")
614
  async def serve_dashboard():
 
615
  if _HTML_PATH.exists():
616
  return FileResponse(str(_HTML_PATH), media_type="text/html")
617
  return JSONResponse(
 
621
  "status": "running",
622
  "note": "hub_dashboard.html not found β€” upload it to the Space",
623
  "expected": str(_HTML_PATH),
624
+ "endpoints": [
625
+ "/rankings", "/health",
626
+ "/api/state", "/api/trades", "/api/trades/open",
627
+ "/api/trades/closed", "/api/health",
628
+ "/ws/publish/{space}", "/ws/subscribe",
629
+ ],
630
  },
631
  )
632
 
633
 
634
  @app.get("/api/state")
635
  async def api_state():
636
+ """Full dashboard state polled by hub_dashboard.html every 2 s."""
637
+ h = manager.get_health()
 
 
 
638
  rankings = _compute_rankings()
639
  return JSONResponse({
640
  "rankings": rankings,
641
+ "metric_history": {},
642
  "health": {
643
+ "hub_connected": True,
644
  "spaces_connected": len(manager.get_all_snapshots()),
645
  "messages_rx": manager._total_ingested,
646
  "last_update_ts": max(
 
653
  default=time.time(),
654
  ), 1
655
  ),
656
+ "uptime_seconds": round(time.time() - _START_TIME, 0),
657
+ "reconnect_count": 0,
658
  },
659
  "timestamp": datetime.utcnow().isoformat() + "Z",
660
  })
 
664
 
665
 
666
  # ══════════════════════════════════════════════════════════════════════════════════════
667
+ # SECTION 9 β€” ENTRY POINT
668
  # ══════════════════════════════════════════════════════════════════════════════════════
669
 
670
  if __name__ == "__main__":
671
  port = int(os.environ.get("PORT", 7860))
672
  logger.info(f"πŸš€ QUASAR Hub starting on port {port}")
673
+ uvicorn.run(app, host="0.0.0.0", port=port, log_level="info")