KarlQuant commited on
Commit
f42d35d
Β·
verified Β·
1 Parent(s): 60e0300

Update hub_dashboard_service.py

Browse files
Files changed (1) hide show
  1. hub_dashboard_service.py +36 -34
hub_dashboard_service.py CHANGED
@@ -188,13 +188,18 @@ class DashboardState:
188
  # ══════════════════════════════════════════════════════════════════════════════════════
189
  # SECTION 1b β€” TRADE LOG PARSER
190
  # ══════════════════════════════════════════════════════════════════════════════════════
191
-
192
  class TradeLogParser:
193
  """
194
  Tails ranker log files and maintains open/closed trade state.
195
  Runs in a background thread, refreshing every 2 seconds.
196
  """
197
 
 
 
 
 
 
 
198
  def __init__(self, log_dir: str = "./ranker_logs"):
199
  self._lock = threading.RLock()
200
  self._open: dict = {} # trade_id β†’ trade dict
@@ -208,11 +213,9 @@ class TradeLogParser:
208
  "win_count": 0,
209
  "loss_count": 0,
210
  }
211
-
212
- # ── public ────────────────────────────────────────────────────────────────────────
213
 
214
  def start_background(self, interval: float = 2.0) -> None:
215
- """Launch a daemon thread that calls refresh() every `interval` seconds."""
216
  def _loop():
217
  while True:
218
  try:
@@ -226,21 +229,19 @@ class TradeLogParser:
226
  logger.info(f"[TradeLogParser] Started β€” watching {self._log_dir}")
227
 
228
  def refresh(self) -> None:
229
- """Find all log files, read new lines since last position."""
230
  pattern = os.path.join(self._log_dir, "*.log")
231
- files = sorted(glob.glob(pattern))
232
  if not files:
233
- # Also try plain text files
234
  pattern = os.path.join(self._log_dir, "*.txt")
235
- files = sorted(glob.glob(pattern))
236
 
237
  for fpath in files:
238
  self._tail_file(fpath)
239
 
240
  def get_state(self) -> dict:
241
  with self._lock:
242
- open_trades = list(self._open.values())
243
- closed_trades = list(reversed(self._closed[-100:])) # last 100, newest first
244
  stats = dict(self._stats)
245
  stats["win_rate"] = (
246
  round(stats["win_count"] / stats["total_closed"] * 100, 1)
@@ -253,10 +254,7 @@ class TradeLogParser:
253
  "timestamp": datetime.utcnow().isoformat() + "Z",
254
  }
255
 
256
- # ── private ───────────────────────────────────────────────────────────────────────
257
-
258
  def _tail_file(self, fpath: str) -> None:
259
- """Read only new bytes from fpath since last call."""
260
  try:
261
  size = os.path.getsize(fpath)
262
  except OSError:
@@ -264,7 +262,7 @@ class TradeLogParser:
264
 
265
  last = self._last_pos.get(fpath, 0)
266
  if size <= last:
267
- return # no new data
268
 
269
  try:
270
  with open(fpath, "r", encoding="utf-8", errors="replace") as f:
@@ -278,14 +276,16 @@ class TradeLogParser:
278
  self._parse_line(line)
279
 
280
  def _parse_line(self, line: str) -> None:
281
- """Extract trade events from a single log line."""
 
 
 
 
282
  # ── TRADE OPENED ─────────────────────────────────────────────────────────
283
- m = TRADE_OPEN_RE.search(line)
284
  if m:
285
- trade_id, direction, entry = m.group(1), m.group(2), float(m.group(3))
286
- # Extract asset from trade_id prefix (e.g. "V75_1774...")
287
- asset = "_".join(trade_id.split("_")[:-1]) if "_" in trade_id else trade_id
288
- ts = self._parse_ts(line)
289
  with self._lock:
290
  self._open[trade_id] = {
291
  "trade_id": trade_id,
@@ -296,52 +296,54 @@ class TradeLogParser:
296
  "status": "OPEN",
297
  }
298
  self._stats["total_opened"] += 1
 
299
  return
300
 
301
  # ── TRADE CLOSED ─────────────────────────────────────────────────────────
302
- m_pnl = TRADE_CLOSE_RE.search(line)
303
- m_id = TRADE_ID_RE.search(line)
304
- if m_pnl and m_id:
305
  asset = m_id.group(1)
306
- pnl = float(m_pnl.group(1))
307
- ts = self._parse_ts(line)
308
- # find matching open trade by asset prefix
309
  with self._lock:
310
  matched_id = None
311
- for tid in list(self._open):
312
- if tid.startswith(asset + "_"):
313
  matched_id = tid
314
  break
 
315
  trade = self._open.pop(matched_id, None) if matched_id else None
316
  closed = {
317
  "trade_id": matched_id or asset,
318
  "asset": asset,
319
  "direction": trade["direction"] if trade else "?",
320
- "entry": trade["entry"] if trade else 0.0,
321
  "pnl": pnl,
322
  "closed_at": ts,
323
  "status": "CLOSED",
324
  }
325
  self._closed.append(closed)
326
  self._stats["total_closed"] += 1
327
- self._stats["total_pnl"] += pnl
328
  if pnl >= 0:
329
  self._stats["win_count"] += 1
330
  else:
331
  self._stats["loss_count"] += 1
 
332
  return
333
 
334
  # ── REWARD / portfolio_dd line ────────────────────────────────────────────
335
- m = REWARD_RE.search(line)
336
  if m and self._closed:
337
  reward = float(m.group(2))
338
  with self._lock:
339
- self._closed[-1]["reward"] = reward
340
- self._closed[-1]["portfolio_dd"] = float(m.group(4))
341
 
342
  @staticmethod
343
  def _parse_ts(line: str) -> str:
344
- """Extract ISO timestamp from log line prefix, fallback to now."""
345
  parts = line.split("|")
346
  if parts:
347
  ts = parts[0].strip()
 
188
  # ══════════════════════════════════════════════════════════════════════════════════════
189
  # SECTION 1b β€” TRADE LOG PARSER
190
  # ══════════════════════════════════════════════════════════════════════════════════════
 
191
  class TradeLogParser:
192
  """
193
  Tails ranker log files and maintains open/closed trade state.
194
  Runs in a background thread, refreshing every 2 seconds.
195
  """
196
 
197
+ # Updated regex to match ranker's actual log format
198
+ TRADE_OPEN_RE = re.compile(r'\[([^\]]+)\] TRADE OPENED \| ID=(\S+) \| Dir=(\w+) \| Entry=([\d.]+)')
199
+ TRADE_CLOSE_RE = re.compile(r'\[([^\]]+)\] TRADE CLOSED \| reward=[+-]?[\d.]+ \| pnl=([+-]?[\d.]+)')
200
+ TRADE_ID_RE = re.compile(r'\[([^\]]+)\] TRADE CLOSED')
201
+ REWARD_RE = re.compile(r'Closed (\w+) \| reward=([+-]?[\d.]+) \| pnl=([+-]?[\d.]+) \| portfolio_dd=([\d.]+)%')
202
+
203
  def __init__(self, log_dir: str = "./ranker_logs"):
204
  self._lock = threading.RLock()
205
  self._open: dict = {} # trade_id β†’ trade dict
 
213
  "win_count": 0,
214
  "loss_count": 0,
215
  }
216
+ self._debug_counter = 0 # for debugging
 
217
 
218
  def start_background(self, interval: float = 2.0) -> None:
 
219
  def _loop():
220
  while True:
221
  try:
 
229
  logger.info(f"[TradeLogParser] Started β€” watching {self._log_dir}")
230
 
231
  def refresh(self) -> None:
 
232
  pattern = os.path.join(self._log_dir, "*.log")
233
+ files = sorted(glob.glob(pattern))
234
  if not files:
 
235
  pattern = os.path.join(self._log_dir, "*.txt")
236
+ files = sorted(glob.glob(pattern))
237
 
238
  for fpath in files:
239
  self._tail_file(fpath)
240
 
241
  def get_state(self) -> dict:
242
  with self._lock:
243
+ open_trades = list(self._open.values())
244
+ closed_trades = list(reversed(self._closed[-100:]))
245
  stats = dict(self._stats)
246
  stats["win_rate"] = (
247
  round(stats["win_count"] / stats["total_closed"] * 100, 1)
 
254
  "timestamp": datetime.utcnow().isoformat() + "Z",
255
  }
256
 
 
 
257
  def _tail_file(self, fpath: str) -> None:
 
258
  try:
259
  size = os.path.getsize(fpath)
260
  except OSError:
 
262
 
263
  last = self._last_pos.get(fpath, 0)
264
  if size <= last:
265
+ return
266
 
267
  try:
268
  with open(fpath, "r", encoding="utf-8", errors="replace") as f:
 
276
  self._parse_line(line)
277
 
278
  def _parse_line(self, line: str) -> None:
279
+ # Debug first few lines
280
+ if self._debug_counter < 10:
281
+ logger.debug(f"[TradeLogParser] Line: {line.strip()}")
282
+ self._debug_counter += 1
283
+
284
  # ── TRADE OPENED ─────────────────────────────────────────────────────────
285
+ m = self.TRADE_OPEN_RE.search(line)
286
  if m:
287
+ asset, trade_id, direction, entry = m.group(1), m.group(2), m.group(3), float(m.group(4))
288
+ ts = self._parse_ts(line)
 
 
289
  with self._lock:
290
  self._open[trade_id] = {
291
  "trade_id": trade_id,
 
296
  "status": "OPEN",
297
  }
298
  self._stats["total_opened"] += 1
299
+ logger.info(f"[TradeLogParser] Opened: {trade_id} {asset} {direction}")
300
  return
301
 
302
  # ── TRADE CLOSED ─────────────────────────────────────────────────────────
303
+ m_close = self.TRADE_CLOSE_RE.search(line)
304
+ m_id = self.TRADE_ID_RE.search(line)
305
+ if m_close and m_id:
306
  asset = m_id.group(1)
307
+ pnl = float(m_close.group(2))
308
+ ts = self._parse_ts(line)
309
+
310
  with self._lock:
311
  matched_id = None
312
+ for tid, trade in self._open.items():
313
+ if trade["asset"] == asset:
314
  matched_id = tid
315
  break
316
+
317
  trade = self._open.pop(matched_id, None) if matched_id else None
318
  closed = {
319
  "trade_id": matched_id or asset,
320
  "asset": asset,
321
  "direction": trade["direction"] if trade else "?",
322
+ "entry": trade["entry"] if trade else 0.0,
323
  "pnl": pnl,
324
  "closed_at": ts,
325
  "status": "CLOSED",
326
  }
327
  self._closed.append(closed)
328
  self._stats["total_closed"] += 1
329
+ self._stats["total_pnl"] += pnl
330
  if pnl >= 0:
331
  self._stats["win_count"] += 1
332
  else:
333
  self._stats["loss_count"] += 1
334
+ logger.info(f"[TradeLogParser] Closed: {asset} pnl={pnl}")
335
  return
336
 
337
  # ── REWARD / portfolio_dd line ────────────────────────────────────────────
338
+ m = self.REWARD_RE.search(line)
339
  if m and self._closed:
340
  reward = float(m.group(2))
341
  with self._lock:
342
+ self._closed[-1]["reward"] = reward
343
+ self._closed[-1]["portfolio_dd"] = float(m.group(4))
344
 
345
  @staticmethod
346
  def _parse_ts(line: str) -> str:
 
347
  parts = line.split("|")
348
  if parts:
349
  ts = parts[0].strip()