KarlQuant commited on
Commit
1e76af5
Β·
verified Β·
1 Parent(s): 70bde30

Update websocket_hub.py

Browse files
Files changed (1) hide show
  1. websocket_hub.py +142 -144
websocket_hub.py CHANGED
@@ -313,158 +313,147 @@ class ConnectionManager:
313
 
314
 
315
  # ══════════════════════════════════════════════════════════════════════════════════════
316
- # SECTION 3 β€” TRADE LOG PARSER (native β€” no patch script needed)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
317
  # ══════════════════════════════════════════════════════════════════════════════════════
318
 
319
- class TradeLogParser:
320
  """
321
- Lightweight trade-log reader that watches ranker_logs/*.log files and
322
- exposes open/closed trade state via get_state().
323
-
324
- Imported from hub_dashboard_service if available; otherwise this built-in
325
- fallback is used so /api/trades routes always work.
326
  """
327
 
328
- def __init__(self, log_dir: str = "/app/ranker_logs"):
329
- self.log_dir = log_dir
330
- self._state = {"open": {}, "closed": [], "stats": {}}
331
- self._lock = None # set in start_background()
332
- self._running = False
333
-
334
- def start_background(self) -> None:
335
- """Schedule background log-scanning on the running event loop."""
336
- self._running = True
337
- asyncio.get_event_loop().create_task(self._scan_loop())
338
- logger.info(f"πŸ“‚ TradeLogParser started β€” watching {self.log_dir}")
339
-
340
- async def _scan_loop(self) -> None:
341
- while self._running:
342
- try:
343
- self._refresh()
344
- except Exception as e:
345
- logger.debug(f"TradeLogParser scan error: {e}")
346
- await asyncio.sleep(5)
347
-
348
- def _refresh(self) -> None:
349
- """Re-parse all log files and rebuild open/closed state."""
350
- open_trades: Dict[str, dict] = {}
351
- closed_trades: List[dict] = []
352
-
353
- log_files = sorted(glob.glob(os.path.join(self.log_dir, "*.log")))
354
- for fpath in log_files:
355
- try:
356
- with open(fpath, "r", encoding="utf-8", errors="replace") as fh:
357
- for line in fh:
358
- self._parse_line(line, open_trades, closed_trades)
359
- except Exception:
360
- pass
361
-
362
- # Sort closed newest-first
363
- closed_trades.sort(key=lambda t: t.get("closed_ts", 0), reverse=True)
364
-
365
- # Compute stats
366
- pnls = [t.get("pnl", 0.0) for t in closed_trades]
367
- wins = [p for p in pnls if p > 0]
368
- total = len(pnls)
369
- stats = {
370
- "total_closed": total,
371
- "total_open": len(open_trades),
372
- "win_rate": round(len(wins) / total, 4) if total else 0.0,
373
- "total_pnl": round(sum(pnls), 6),
374
- "avg_pnl": round(sum(pnls) / total, 6) if total else 0.0,
375
- "avg_pnl_trade": round(sum(pnls) / total, 6) if total else 0.0,
376
  }
377
-
378
- self._state = {
379
- "open": list(open_trades.values()),
380
- "closed": closed_trades,
381
- "stats": stats,
382
- }
383
-
384
- # ── Log-line parser ──────────────────────────────────────────────────────────────
385
- # Matches lines like:
386
- # [2026-03-31 10:02:07] | INFO | TRADE | V75 | TRADE OPENED | ID=V75_... | ...
387
- # [2026-03-31 10:02:07] | INFO | TRADE | V75 | TRADE CLOSED | ID=V75_... | pnl=-32.62 | ...
388
-
389
- def _parse_line(
390
- self,
391
- line: str,
392
- open_trades: Dict[str, dict],
393
- closed_trades: List[dict],
394
- ) -> None:
395
- if "| TRADE |" not in line:
396
- return
397
-
398
- parts = [p.strip() for p in line.split("|")]
399
- # parts[0]=timestamp, [1]=level, [2]="TRADE", [3]=asset, [4]=event, [5..]=fields
400
- if len(parts) < 5:
401
- return
402
-
403
- try:
404
- ts_str = parts[0].strip("[] ")
405
- ts = datetime.strptime(ts_str, "%Y-%m-%d %H:%M:%S").timestamp()
406
- except Exception:
407
- ts = 0.0
408
-
409
- asset = parts[3].strip()
410
- event = parts[4].strip().upper()
411
-
412
- # Build a quick field dict from KEY=VALUE tokens in the rest of the line
413
- fields: Dict[str, str] = {}
414
- for segment in parts[5:]:
415
- for token in segment.split():
416
- if "=" in token:
417
- k, _, v = token.partition("=")
418
- fields[k.strip()] = v.strip().rstrip(",")
419
-
420
- trade_id = fields.get("ID", f"{asset}_{int(ts)}")
421
-
422
- def _f(k: str) -> float:
423
- try: return float(fields.get(k, 0))
424
- except: return 0.0
425
-
426
- if "TRADE OPENED" in event:
427
- open_trades[trade_id] = {
428
  "trade_id": trade_id,
429
- "asset": asset,
430
- "direction": fields.get("Dir", fields.get("Direction", "?")).capitalize(),
431
- "entry": _f("Entry"),
432
- "qty": _f("Qty"),
433
- "open_pnl": 0.0,
434
- "opened_ts": ts,
435
- "opened": ts_str,
436
  }
 
 
 
 
 
437
 
438
- elif "TRADE CLOSED" in event:
439
- # Remove from open if present
440
- trade = open_trades.pop(trade_id, {})
441
- pnl = _f("pnl")
442
- ret = _f("return")
443
- closed_trades.append({
444
- "trade_id": trade_id,
445
- "asset": asset,
446
- "direction": trade.get("direction", "?"),
447
- "entry": trade.get("entry", _f("Entry")),
448
- "exit": _f("Exit") or _f("exit"),
449
- "qty": trade.get("qty", _f("Qty")),
 
450
  "pnl": pnl,
451
- "return_pct": ret,
452
- "closed_ts": ts,
453
- "closed": ts_str,
454
- "duration": round(ts - trade.get("opened_ts", ts), 0) if trade else 0,
455
- })
 
 
 
 
 
 
 
 
 
 
 
456
 
457
  def get_state(self) -> dict:
458
- return self._state
459
-
 
 
 
 
 
 
 
 
 
 
 
 
460
 
461
- # ── Bootstrap parser ─────────────────────────────────────────────────────────────────
462
- # Use built-in TradeLogParser only β€” importing hub_dashboard_service causes a crash
463
- # because it instantiates Flask, starts threads, and calls app.run() at module level.
464
 
465
- _LOG_DIR = os.environ.get("RANKER_LOG_DIR", "/app/ranker_logs")
466
- _trade_parser = TradeLogParser(log_dir=_LOG_DIR)
467
- logger.info("βœ… TradeLogParser using built-in")
 
468
 
469
 
470
  # ══════════════════════════════════════════════════════════════════════════════════════
@@ -488,9 +477,8 @@ manager = ConnectionManager()
488
 
489
  @app.on_event("startup")
490
  async def _on_startup():
491
- """Start background trade-log scanner after the event loop is running."""
492
- _trade_parser.start_background()
493
- logger.info("πŸš€ Trade log scanner started")
494
 
495
 
496
  # ══════════════════════════════════════════════════════════════════════════════════════
@@ -549,6 +537,16 @@ async def ws_publisher_endpoint(websocket: WebSocket, space_name: str):
549
  elif msg_type in ("heartbeat", "identify", "ping"):
550
  pass
551
 
 
 
 
 
 
 
 
 
 
 
552
  else:
553
  # Bug B fix: don't silently swallow. Try to rescue training/voting
554
  # fields that live at the top level of an unrecognised message type.
@@ -676,20 +674,20 @@ async def api_debug_hub():
676
  @app.get("/api/trades")
677
  async def api_trades():
678
  """Full trade state: open trades, recent closed trades, summary stats."""
679
- return JSONResponse(_trade_parser.get_state())
680
 
681
 
682
  @app.get("/api/trades/open")
683
  async def api_trades_open():
684
  """Open trades only."""
685
- state = _trade_parser.get_state()
686
  return JSONResponse({"open": state["open"]})
687
 
688
 
689
  @app.get("/api/trades/closed")
690
  async def api_trades_closed(limit: int = 50):
691
  """Recent closed trades (newest first) + cumulative stats."""
692
- state = _trade_parser.get_state()
693
  return JSONResponse({
694
  "closed": state["closed"][:limit],
695
  "stats": state["stats"],
 
313
 
314
 
315
  # ══════════════════════════════════════════════════════════════════════════════════════
316
+ # SECTION 3 β€” HUB TRADE STORE (in-memory, fed by WebSocket messages)
317
+ # ══════════════════════════════════════════════════════════════════════════════════════
318
+ #
319
+ # ROOT CAUSE FIX: The previous TradeLogParser read from *.log files on THIS container
320
+ # (/app/ranker_logs). Those files NEVER exist on the Executo Hub space β€” they are
321
+ # written by ranker processes running in the individual executor spaces (V75, V50, …),
322
+ # each in their own separate container with their own filesystem.
323
+ #
324
+ # Fix: replace file-based parsing with an in-memory store that is populated when
325
+ # executor spaces send WebSocket trade events to this hub.
326
+ #
327
+ # Executor spaces must send:
328
+ # {"type": "trade_opened", "data": {trade_id, asset, direction, entry, qty, opened_at}}
329
+ # {"type": "trade_closed", "data": {trade_id, asset, pnl, exit_price, closed_at}}
330
+ #
331
+ # See ranker_logging.py β€” the on_event callback already fires for every log entry.
332
+ # Wire it in your executor space's ranker like this:
333
+ #
334
+ # def _trade_ws_hook(entry: dict):
335
+ # cat = entry.get("category", "")
336
+ # msg = entry.get("message", "")
337
+ # if cat != "TRADE":
338
+ # return
339
+ # meta = entry.get("metadata") or {}
340
+ # if "TRADE OPENED" in msg:
341
+ # asyncio.create_task(ws.send_text(json.dumps({
342
+ # "type": "trade_opened",
343
+ # "data": {
344
+ # "trade_id": meta.get("trade_id"),
345
+ # "asset": entry.get("asset"),
346
+ # "direction": meta.get("direction", "?"),
347
+ # "entry": meta.get("price"),
348
+ # "qty": meta.get("qty", 0.0),
349
+ # "opened_at": entry.get("timestamp", ""),
350
+ # },
351
+ # })))
352
+ # elif "TRADE CLOSED" in msg:
353
+ # asyncio.create_task(ws.send_text(json.dumps({
354
+ # "type": "trade_closed",
355
+ # "data": {
356
+ # "trade_id": meta.get("trade_id"),
357
+ # "asset": entry.get("asset"),
358
+ # "pnl": meta.get("pnl", 0.0),
359
+ # "exit_price": meta.get("exit_price"),
360
+ # "closed_at": entry.get("timestamp", ""),
361
+ # },
362
+ # })))
363
+ #
364
+ # ranker_logger = RankerLogger(..., on_event=_trade_ws_hook)
365
  # ══════════════════════════════════════════════════════════════════════════════════════
366
 
367
+ class HubTradeStore:
368
  """
369
+ In-memory trade store populated by WebSocket trade-event messages from
370
+ executor spaces. Thread-safe. Replaces the broken file-based TradeLogParser.
 
 
 
371
  """
372
 
373
+ def __init__(self) -> None:
374
+ self._open: Dict[str, dict] = {} # trade_id β†’ record
375
+ self._closed: List[dict] = [] # newest-first, capped at 500
376
+ self._stats = {
377
+ "total_opened": 0,
378
+ "total_closed": 0,
379
+ "total_pnl": 0.0,
380
+ "win_count": 0,
381
+ "loss_count": 0,
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
382
  }
383
+ self._lock = threading.Lock()
384
+
385
+ def open_trade(self, space_name: str, data: dict) -> None:
386
+ trade_id = data.get("trade_id") or f"{space_name}_{int(time.time())}"
387
+ direction = str(data.get("direction", "?")).upper()
388
+ entry_px = data.get("entry") or data.get("price") or 0.0
389
+ with self._lock:
390
+ self._open[trade_id] = {
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
391
  "trade_id": trade_id,
392
+ "asset": data.get("asset", space_name),
393
+ "direction": direction,
394
+ "entry": float(entry_px),
395
+ "qty": float(data.get("qty", 0.0)),
396
+ "opened_at": data.get("opened_at", datetime.utcnow().isoformat()[:19]),
397
+ "status": "OPEN",
 
398
  }
399
+ self._stats["total_opened"] += 1
400
+ logger.info(
401
+ f"[HubTradeStore] OPEN {trade_id} | {direction} @ {entry_px} "
402
+ f"(from {space_name})"
403
+ )
404
 
405
+ def close_trade(self, space_name: str, data: dict) -> None:
406
+ trade_id = data.get("trade_id")
407
+ pnl = float(data.get("pnl", 0.0))
408
+ exit_price = data.get("exit_price")
409
+ with self._lock:
410
+ trade = self._open.pop(trade_id, {}) if trade_id else {}
411
+ closed_rec = {
412
+ "trade_id": trade_id or "UNKNOWN",
413
+ "asset": data.get("asset") or trade.get("asset", space_name),
414
+ "direction": str(data.get("direction") or trade.get("direction", "?")).upper(),
415
+ "entry": data.get("entry") or trade.get("entry", 0.0),
416
+ "exit_price": float(exit_price) if exit_price is not None else None,
417
+ "qty": data.get("qty") or trade.get("qty", 0.0),
418
  "pnl": pnl,
419
+ "closed_at": data.get("closed_at", datetime.utcnow().isoformat()[:19]),
420
+ "status": "CLOSED",
421
+ }
422
+ self._closed.insert(0, closed_rec) # newest-first
423
+ if len(self._closed) > 500:
424
+ self._closed = self._closed[:500]
425
+ self._stats["total_closed"] += 1
426
+ self._stats["total_pnl"] += pnl
427
+ if pnl >= 0:
428
+ self._stats["win_count"] += 1
429
+ else:
430
+ self._stats["loss_count"] += 1
431
+ logger.info(
432
+ f"[HubTradeStore] CLOSE {trade_id} | pnl={pnl:+.4f} "
433
+ f"(from {space_name})"
434
+ )
435
 
436
  def get_state(self) -> dict:
437
+ with self._lock:
438
+ open_list = list(self._open.values())
439
+ closed_list = list(self._closed[:100]) # newest 100 for dashboard
440
+ stats = dict(self._stats)
441
+ total_closed = stats["total_closed"]
442
+ stats["win_rate"] = (
443
+ round(stats["win_count"] / total_closed * 100, 1)
444
+ if total_closed > 0 else 0.0
445
+ )
446
+ return {
447
+ "open": open_list,
448
+ "closed": closed_list,
449
+ "stats": stats,
450
+ }
451
 
 
 
 
452
 
453
+ # ── Bootstrap ─────────────────────────────────────────────────────────────────────────
454
+ _LOG_DIR = os.environ.get("RANKER_LOG_DIR", "/app/ranker_logs")
455
+ _hub_trades = HubTradeStore()
456
+ logger.info("βœ… HubTradeStore initialised β€” awaiting trade_opened/trade_closed WS messages")
457
 
458
 
459
  # ══════════════════════════════════════════════════════════════════════════════════════
 
477
 
478
  @app.on_event("startup")
479
  async def _on_startup():
480
+ """Nothing to start β€” HubTradeStore is in-memory, populated by WS messages."""
481
+ logger.info("πŸš€ HubTradeStore ready (no background scanner needed)")
 
482
 
483
 
484
  # ══════════════════════════════════════════════════════════════════════════════════════
 
537
  elif msg_type in ("heartbeat", "identify", "ping"):
538
  pass
539
 
540
+ elif msg_type == "trade_opened":
541
+ # Executor space opened a trade β€” add to the hub's in-memory store.
542
+ # data = {trade_id, asset, direction, entry, qty, opened_at}
543
+ _hub_trades.open_trade(space_name, data.get("data", data))
544
+
545
+ elif msg_type == "trade_closed":
546
+ # Executor space closed a trade β€” update the hub's in-memory store.
547
+ # data = {trade_id, asset, pnl, exit_price, closed_at}
548
+ _hub_trades.close_trade(space_name, data.get("data", data))
549
+
550
  else:
551
  # Bug B fix: don't silently swallow. Try to rescue training/voting
552
  # fields that live at the top level of an unrecognised message type.
 
674
  @app.get("/api/trades")
675
  async def api_trades():
676
  """Full trade state: open trades, recent closed trades, summary stats."""
677
+ return JSONResponse(_hub_trades.get_state())
678
 
679
 
680
  @app.get("/api/trades/open")
681
  async def api_trades_open():
682
  """Open trades only."""
683
+ state = _hub_trades.get_state()
684
  return JSONResponse({"open": state["open"]})
685
 
686
 
687
  @app.get("/api/trades/closed")
688
  async def api_trades_closed(limit: int = 50):
689
  """Recent closed trades (newest first) + cumulative stats."""
690
+ state = _hub_trades.get_state()
691
  return JSONResponse({
692
  "closed": state["closed"][:limit],
693
  "stats": state["stats"],