KarlQuant commited on
Commit
a3cb980
Β·
verified Β·
1 Parent(s): b022ced

Upload Quasar_axrvi_ranker.py

Browse files
Files changed (1) hide show
  1. Quasar_axrvi_ranker.py +272 -42
Quasar_axrvi_ranker.py CHANGED
@@ -5845,16 +5845,27 @@ class HybridTrainer:
5845
  self.loss_history.append(loss_dict)
5846
 
5847
  if self.ranker_logger:
 
 
 
 
 
 
 
 
 
 
5848
  self.ranker_logger.training_update(
5849
  step=self.train_step,
5850
  loss=loss.item(),
5851
  lr=self.optimizer.param_groups[0]["lr"],
5852
- asset_count=len(valid),
5853
  )
5854
 
5855
  logger.info(
5856
  f"🧠 [TrainingStep {self.train_step:>6d}] "
5857
  f"total={loss.item():.4f} "
 
5858
  f"rl={l_rl_raw.item():.4f}(n={l_rl.item():.4f}) "
5859
  f"ce={l_ce_raw.item():.4f}(n={l_ce.item():.4f}) "
5860
  f"rank={l_rank_raw.item():.4f} "
@@ -6926,10 +6937,41 @@ class DerivWebSocketClient:
6926
  pass # best-effort cleanup
6927
 
6928
  async def send_message(self, msg: dict) -> bool:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6929
  try:
6930
  msg["req_id"] = self._next_msg_id()
6931
- await self.ws.send(json.dumps(msg))
 
 
 
6932
  return True
 
 
 
 
 
 
 
 
 
 
 
 
 
6933
  except Exception as e:
6934
  logger.error(f"❌ Send error: {e}")
6935
  return False
@@ -6977,11 +7019,20 @@ class DerivWebSocketClient:
6977
  symbols_to_restore = list(self._subscribed_symbols)
6978
  for symbol in symbols_to_restore:
6979
  try:
6980
- await self.ws.send(json.dumps({
6981
- "ticks": symbol, "subscribe": 1,
6982
- "req_id": self._next_msg_id()
6983
- }))
 
 
 
 
 
6984
  logger.info(f"πŸ”„ Re-subscribed to {symbol} after reconnect")
 
 
 
 
6985
  except Exception as re_err:
6986
  logger.warning(f"⚠️ Re-subscription failed for {symbol}: {re_err}")
6987
  logger.info(
@@ -7061,7 +7112,7 @@ class QuasarAXRVIBridge:
7061
  hub_ws_url: str = os.environ.get("QUASAR_HUB_URL", "ws://localhost:7860/ws/subscribe"),
7062
  enable_logging: bool = True,
7063
  checkpoint_dir: str = "./Ranker10", # new folder for 10-asset build
7064
- resume: bool = False, # FIX: fresh start
7065
  hf_repo_id: Optional[str] = "KarlQuant/quasar-axrvi-v10", # new HF repo (10 assets)
7066
  ):
7067
  self.config = config or AssetRankerConfig()
@@ -7069,6 +7120,23 @@ class QuasarAXRVIBridge:
7069
  self.reward_strategy = reward_strategy
7070
  self.enable_logging = enable_logging and LOGGING_AVAILABLE
7071
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7072
  # ── Checkpoint manager (local + optional HF sync) ─────────────────────
7073
  self.checkpoint_mgr = RankerCheckpointManager(
7074
  checkpoint_dir=checkpoint_dir,
@@ -7076,6 +7144,26 @@ class QuasarAXRVIBridge:
7076
  )
7077
  self.resume = resume
7078
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7079
  # ── Structured logger (optional) ──────────────────────────────────────
7080
  self.ranker_logger: Optional[object] = None
7081
  self.log_bridge: Optional[object] = None
@@ -7176,6 +7264,23 @@ class QuasarAXRVIBridge:
7176
  self._last_final_scores: Optional[np.ndarray] = None
7177
  self._last_value_estimates: Dict[str, float] = {}
7178
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7179
  # [S2] Pending-episode store: keyed by trade_id.
7180
  # s_t is captured at trade-open time; s_{t+1} is captured at close.
7181
  # This gives a proper (s_t, a_t, r_t, s_{t+1}) tuple with s_t ∈ F_t,
@@ -8183,20 +8288,35 @@ class QuasarAXRVIBridge:
8183
  # has dropped below the 2-trade floor. If so, trigger a fresh
8184
  # rank_and_gate() immediately β€” don't wait for the next scheduled
8185
  # _rank_loop tick β€” so the minimum is restored as fast as possible.
 
 
 
 
 
 
 
8186
  if closed_any:
8187
  open_count = len(self.position_mgr.get_open_trades())
8188
  if open_count < 4:
8189
- logger.warning(
8190
- f"[monitor_positions] ⚠️ REFILL TRIGGER β€” "
8191
- f"open_count={open_count} < 4 after close. "
8192
- f"Calling rank_and_gate() immediately to restore top-4."
8193
- )
8194
- try:
8195
- await self.rank_and_gate()
8196
- except Exception as refill_err:
8197
- logger.error(
8198
- f"[monitor_positions] ❌ Refill rank_and_gate error: {refill_err}"
 
8199
  )
 
 
 
 
 
 
 
8200
 
8201
  await asyncio.sleep(2)
8202
  except Exception as e:
@@ -8307,17 +8427,22 @@ class QuasarAXRVIBridge:
8307
  """
8308
  Fire-and-forget HTTP POST of the current AXRVI‑scored ranking list to
8309
  the hub's /api/flip/rankings endpoint, including the real flip_direction.
 
 
 
 
 
 
 
 
8310
  """
8311
  if not ranked:
8312
  return
8313
- try:
8314
- import urllib.request as _urlreq
8315
 
8316
- # Build payload with flip_direction from each asset's snapshot.
8317
- # FIX: fall back to r.dominant_signal (from rank_risk_neutral, which
8318
- # reads snap.dominant_signal at ranking time) when the snapshot is
8319
- # missing or still NEUTRAL β€” prevents all non-publishing assets from
8320
- # being broadcast with flip_direction="NEUTRAL" which blocks Gate A.
8321
  rankings_payload = []
8322
  for r in ranked:
8323
  snap = self.hub_subscriber.get_snapshot(r.space_name)
@@ -8325,15 +8450,15 @@ class QuasarAXRVIBridge:
8325
  flip_dir = (
8326
  snap_signal
8327
  if snap_signal not in ("NEUTRAL", "NONE", None, "")
8328
- else r.dominant_signal # set from snap at rank_risk_neutral time
8329
  )
8330
  rankings_payload.append({
8331
  "space_name": r.space_name,
8332
  "score": r.score,
8333
  "final_priority": r.final_priority,
8334
  "rank": r.rank,
8335
- "dominant_signal": flip_dir, # real BUY/SELL/NEUTRAL
8336
- "flip_direction": flip_dir, # explicit field for executor
8337
  "avn_accuracy": r.avn_accuracy,
8338
  "signal_confidence": r.signal_confidence,
8339
  "epistemic_std": r.epistemic_std,
@@ -8342,17 +8467,63 @@ class QuasarAXRVIBridge:
8342
 
8343
  payload = json.dumps({"rankings": rankings_payload}).encode()
8344
  url = f"{self._hub_http_url}/api/flip/rankings"
8345
- req = _urlreq.Request(
8346
- url,
8347
- data=payload,
8348
- headers={"Content-Type": "application/json"},
8349
- method="POST",
8350
- )
8351
- _urlreq.urlopen(req, timeout=0.5)
8352
- except Exception as _e:
8353
- logger.debug(f"[Rankings] Hub push skipped: {_e}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8354
 
8355
  async def rank_and_gate(self) -> None:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8356
  """
8357
  v6/v7 Shreve Ranking Cycle:
8358
  1. Data readiness check
@@ -8870,6 +9041,9 @@ class QuasarAXRVIBridge:
8870
  self.ws_client.listen(),
8871
  self._rank_loop(),
8872
  self.monitor_positions(),
 
 
 
8873
  )
8874
  except asyncio.CancelledError:
8875
  pass
@@ -8899,9 +9073,62 @@ class QuasarAXRVIBridge:
8899
  try:
8900
  await self.rank_and_gate()
8901
  except Exception as e:
8902
- logger.error(f"❌ Rank loop error: {e}")
 
 
 
 
8903
  await asyncio.sleep(self.config.update_frequency_seconds)
8904
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8905
  def start_sync(self) -> None:
8906
  """Start in synchronous (threading) mode."""
8907
  def _run_loop():
@@ -10702,7 +10929,7 @@ async def run_live_trading_system(
10702
  enable_logging: bool = True,
10703
  shreve_config: Optional[ShreveConfig] = None,
10704
  checkpoint_dir: str = "./Ranker10",
10705
- resume: bool = False, # FIX: fresh start
10706
  hf_repo_id: Optional[str] = "KarlQuant/quasar-axrvi-v10", # new HF repo (10 assets)
10707
  ) -> None:
10708
  config = AssetRankerConfig(
@@ -11017,10 +11244,13 @@ def _parse_args():
11017
  help="[S7] Gate E martingale deviation threshold (default 0.05)")
11018
  parser.add_argument("--checkpoint-dir", default="./Ranker10",
11019
  help="Directory for full-state checkpoints (default ./Ranker10)")
11020
- parser.add_argument("--no-resume", dest="no_resume", action="store_true", default=True,
11021
- help="Default True β€” always fresh start.")
 
 
 
11022
  parser.add_argument("--resume", dest="no_resume", action="store_false",
11023
- help="Restore from latest Ranker10 checkpoint")
11024
  parser.add_argument("--hf-repo", default=None,
11025
  metavar="OWNER/REPO",
11026
  help="Hugging Face Dataset repo for checkpoint sync "
@@ -11077,7 +11307,7 @@ if __name__ == "__main__":
11077
  hub_ws_url = args.hub,
11078
  enable_logging = not args.no_logs,
11079
  checkpoint_dir = args.checkpoint_dir,
11080
- resume = not args.no_resume, # default False β€” always start fresh on Ranker10
11081
  hf_repo_id = args.hf_repo or "KarlQuant/quasar-axrvi-v10",
11082
  )
11083
 
 
5845
  self.loss_history.append(loss_dict)
5846
 
5847
  if self.ranker_logger:
5848
+ # [LABEL FIX] Previously passed len(valid) here β€” that's the
5849
+ # BATCH SIZE (number of episodes in this training step), not
5850
+ # the asset count. With TRAIN_BATCH=2 the field always showed
5851
+ # "asset_count=2" which looked like "only 2 of 10 assets are
5852
+ # training", but in fact every episode carries the full
5853
+ # (N=10, T, F) tensor and all 10 assets are trained per step.
5854
+ #
5855
+ # We now pass the TRUE asset count (model.num_assets) so the
5856
+ # dashboard/log reflects reality, and also log batch_size in
5857
+ # the human-readable line below so batch health stays visible.
5858
  self.ranker_logger.training_update(
5859
  step=self.train_step,
5860
  loss=loss.item(),
5861
  lr=self.optimizer.param_groups[0]["lr"],
5862
+ asset_count=self.model.num_assets,
5863
  )
5864
 
5865
  logger.info(
5866
  f"🧠 [TrainingStep {self.train_step:>6d}] "
5867
  f"total={loss.item():.4f} "
5868
+ f"assets={self.model.num_assets} batch={len(valid)}/{len(episodes)} "
5869
  f"rl={l_rl_raw.item():.4f}(n={l_rl.item():.4f}) "
5870
  f"ce={l_ce_raw.item():.4f}(n={l_ce.item():.4f}) "
5871
  f"rank={l_rank_raw.item():.4f} "
 
6937
  pass # best-effort cleanup
6938
 
6939
  async def send_message(self, msg: dict) -> bool:
6940
+ """
6941
+ Send a message to Deriv with a hard 10s timeout.
6942
+
6943
+ [HANG FIX β€” Layer 1] The previous implementation awaited self.ws.send()
6944
+ with no timeout. On a half-open TCP connection (silent proxy drop,
6945
+ NAT table flush, HF Spaces idle reap) the kernel send buffer fills
6946
+ and this await never returns, freezing every caller β€” including
6947
+ rank_and_gate() via _handle_rank_rotation β†’ _close_position, and
6948
+ via _ensure_minimum_trades β†’ process_axrvi_signal. No more rankings
6949
+ POST to the hub, its 60s TTL expires, dashboard shows 0.0000.
6950
+
6951
+ On timeout we mark the connection dead and schedule reconnect()
6952
+ asynchronously so we don't block the caller. Callers see False and
6953
+ can react (same as any other send failure).
6954
+ """
6955
  try:
6956
  msg["req_id"] = self._next_msg_id()
6957
+ await asyncio.wait_for(
6958
+ self.ws.send(json.dumps(msg)),
6959
+ timeout=10.0,
6960
+ )
6961
  return True
6962
+ except asyncio.TimeoutError:
6963
+ logger.error(
6964
+ "❌ Deriv ws.send() stalled >10s β€” connection is half-open. "
6965
+ "Scheduling reconnect."
6966
+ )
6967
+ self.connected = False
6968
+ # Fire-and-forget reconnect so we don't block the hung caller.
6969
+ try:
6970
+ asyncio.get_running_loop().create_task(self.reconnect())
6971
+ except RuntimeError:
6972
+ # No running loop (shouldn't happen here, but be safe).
6973
+ pass
6974
+ return False
6975
  except Exception as e:
6976
  logger.error(f"❌ Send error: {e}")
6977
  return False
 
7019
  symbols_to_restore = list(self._subscribed_symbols)
7020
  for symbol in symbols_to_restore:
7021
  try:
7022
+ # [HANG FIX β€” Layer 1] Same 10s cap as send_message so a
7023
+ # stalled re-subscribe can't hang the reconnect task.
7024
+ await asyncio.wait_for(
7025
+ self.ws.send(json.dumps({
7026
+ "ticks": symbol, "subscribe": 1,
7027
+ "req_id": self._next_msg_id()
7028
+ })),
7029
+ timeout=10.0,
7030
+ )
7031
  logger.info(f"πŸ”„ Re-subscribed to {symbol} after reconnect")
7032
+ except asyncio.TimeoutError:
7033
+ logger.warning(
7034
+ f"⚠️ Re-subscription to {symbol} timed out β€” will retry on next reconnect"
7035
+ )
7036
  except Exception as re_err:
7037
  logger.warning(f"⚠️ Re-subscription failed for {symbol}: {re_err}")
7038
  logger.info(
 
7112
  hub_ws_url: str = os.environ.get("QUASAR_HUB_URL", "ws://localhost:7860/ws/subscribe"),
7113
  enable_logging: bool = True,
7114
  checkpoint_dir: str = "./Ranker10", # new folder for 10-asset build
7115
+ resume: bool = True, # [RESUME FIX] default ON β€” see __init__
7116
  hf_repo_id: Optional[str] = "KarlQuant/quasar-axrvi-v10", # new HF repo (10 assets)
7117
  ):
7118
  self.config = config or AssetRankerConfig()
 
7120
  self.reward_strategy = reward_strategy
7121
  self.enable_logging = enable_logging and LOGGING_AVAILABLE
7122
 
7123
+ # ── [RESUME FIX] Environment variable override ────────────────────────
7124
+ # HF Spaces entrypoints usually can't pass CLI flags β€” they just run
7125
+ # `python Quasar_axrvi_ranker.py`. To control resume behaviour there,
7126
+ # set the QUASAR_RESUME environment variable in the Space's secrets:
7127
+ # QUASAR_RESUME=0 / false / no β†’ start fresh (overrides constructor)
7128
+ # QUASAR_RESUME=1 / true / yes β†’ resume from latest checkpoint
7129
+ # (unset) β†’ use the constructor argument
7130
+ _env_resume = os.environ.get("QUASAR_RESUME", "").strip().lower()
7131
+ if _env_resume in ("0", "false", "no", "off"):
7132
+ logger.warning(
7133
+ "[RESUME] QUASAR_RESUME env var forces fresh start "
7134
+ "(overriding resume=True constructor argument)"
7135
+ )
7136
+ resume = False
7137
+ elif _env_resume in ("1", "true", "yes", "on"):
7138
+ resume = True
7139
+
7140
  # ── Checkpoint manager (local + optional HF sync) ─────────────────────
7141
  self.checkpoint_mgr = RankerCheckpointManager(
7142
  checkpoint_dir=checkpoint_dir,
 
7144
  )
7145
  self.resume = resume
7146
 
7147
+ # ── [RESUME FIX] Startup banner ───────────────────────────────────────
7148
+ # Prints resume state + HF sync status in a single eyeballable block so
7149
+ # you can tell at a glance whether checkpoints will actually be used
7150
+ # and mirrored. The most common failure mode on Spaces is a missing
7151
+ # HF_TOKEN secret β€” that goes silent without this banner.
7152
+ _hf_enabled = self.checkpoint_mgr._hf.enabled
7153
+ _hf_token = "βœ… set" if os.environ.get("HF_TOKEN") else "❌ missing"
7154
+ _hf_repo = os.environ.get("HF_REPO_ID") or hf_repo_id or "β€”"
7155
+ logger.info(
7156
+ "\n" + "═" * 66 + "\n"
7157
+ f" πŸ“¦ CHECKPOINT CONFIG\n"
7158
+ f" resume : {self.resume} "
7159
+ f"({'will attempt to restore on start' if self.resume else 'FRESH START β€” no restore'})\n"
7160
+ f" checkpoint_dir: {checkpoint_dir}\n"
7161
+ f" hf_repo : {_hf_repo}\n"
7162
+ f" hf_token : {_hf_token}\n"
7163
+ f" hf_sync : {'βœ… enabled' if _hf_enabled else '❌ disabled (set HF_TOKEN + HF_REPO_ID)'}\n"
7164
+ + "═" * 66
7165
+ )
7166
+
7167
  # ── Structured logger (optional) ──────────────────────────────────────
7168
  self.ranker_logger: Optional[object] = None
7169
  self.log_bridge: Optional[object] = None
 
7264
  self._last_final_scores: Optional[np.ndarray] = None
7265
  self._last_value_estimates: Dict[str, float] = {}
7266
 
7267
+ # ── [HANG FIX β€” Layer 2] Re-entrancy guard for rank_and_gate ──────────
7268
+ # Both _rank_loop and monitor_positions' refill trigger call
7269
+ # rank_and_gate(). Without this lock they can enter concurrently and
7270
+ # corrupt shared state (rank_count, _last_value_estimates, the ranked
7271
+ # list, and the position manager). The lock is created lazily in
7272
+ # rank_and_gate() itself because asyncio.Lock() must be created inside
7273
+ # a running event loop on Python <3.10.
7274
+ self._rank_lock: Optional[asyncio.Lock] = None
7275
+
7276
+ # ── [HANG FIX β€” Layer 3] Watchdog timestamp ───────────────────────────
7277
+ # Updated at the END of every successful rank_and_gate() cycle.
7278
+ # _rank_watchdog() checks this and force-closes the ws (triggering
7279
+ # reconnect) if no cycle has completed within RANK_STALL_THRESHOLD_S.
7280
+ # This is the safety net that recovers from ANY cause of stall β€”
7281
+ # not just the ws.send() one we know about.
7282
+ self._last_rank_complete_ts: float = time.time()
7283
+
7284
  # [S2] Pending-episode store: keyed by trade_id.
7285
  # s_t is captured at trade-open time; s_{t+1} is captured at close.
7286
  # This gives a proper (s_t, a_t, r_t, s_{t+1}) tuple with s_t ∈ F_t,
 
8288
  # has dropped below the 2-trade floor. If so, trigger a fresh
8289
  # rank_and_gate() immediately β€” don't wait for the next scheduled
8290
  # _rank_loop tick β€” so the minimum is restored as fast as possible.
8291
+ #
8292
+ # [HANG FIX β€” Layer 2] Only trigger a refill if rank_and_gate
8293
+ # is NOT currently running. Re-entering rank_and_gate while the
8294
+ # scheduled _rank_loop call is still inside it corrupts shared
8295
+ # state and can deadlock on the Deriv WS. If it's running, the
8296
+ # next scheduled tick will pick up the refill need within
8297
+ # update_frequency_seconds β€” acceptable latency vs a deadlock.
8298
  if closed_any:
8299
  open_count = len(self.position_mgr.get_open_trades())
8300
  if open_count < 4:
8301
+ if self._rank_lock is not None and self._rank_lock.locked():
8302
+ logger.info(
8303
+ f"[monitor_positions] ⏩ REFILL SKIPPED β€” "
8304
+ f"rank_and_gate already running; next scheduled "
8305
+ f"tick will restore top-4 (open_count={open_count})"
8306
+ )
8307
+ else:
8308
+ logger.warning(
8309
+ f"[monitor_positions] ⚠️ REFILL TRIGGER β€” "
8310
+ f"open_count={open_count} < 4 after close. "
8311
+ f"Calling rank_and_gate() immediately to restore top-4."
8312
  )
8313
+ try:
8314
+ await self.rank_and_gate()
8315
+ except Exception as refill_err:
8316
+ logger.error(
8317
+ f"[monitor_positions] ❌ Refill rank_and_gate error: {refill_err}",
8318
+ exc_info=True,
8319
+ )
8320
 
8321
  await asyncio.sleep(2)
8322
  except Exception as e:
 
8427
  """
8428
  Fire-and-forget HTTP POST of the current AXRVI‑scored ranking list to
8429
  the hub's /api/flip/rankings endpoint, including the real flip_direction.
8430
+
8431
+ [HANG FIX β€” Layers 4 & 5]
8432
+ β€’ Previously the timeout was 0.5s and failures logged at DEBUG, so
8433
+ transient hub slowness silently dropped rankings with no visibility.
8434
+ β€’ Now: timeout=3.0s, failures logged at WARNING.
8435
+ β€’ The HTTP POST runs in a thread-pool executor so hub latency never
8436
+ blocks the async rank loop β€” even if the hub is completely dead,
8437
+ the ranker keeps producing rankings.
8438
  """
8439
  if not ranked:
8440
  return
 
 
8441
 
8442
+ # Capture the payload here (in the async caller's context) so the
8443
+ # snapshot read is consistent, then hand the blocking HTTP POST to
8444
+ # the executor.
8445
+ try:
 
8446
  rankings_payload = []
8447
  for r in ranked:
8448
  snap = self.hub_subscriber.get_snapshot(r.space_name)
 
8450
  flip_dir = (
8451
  snap_signal
8452
  if snap_signal not in ("NEUTRAL", "NONE", None, "")
8453
+ else r.dominant_signal
8454
  )
8455
  rankings_payload.append({
8456
  "space_name": r.space_name,
8457
  "score": r.score,
8458
  "final_priority": r.final_priority,
8459
  "rank": r.rank,
8460
+ "dominant_signal": flip_dir,
8461
+ "flip_direction": flip_dir,
8462
  "avn_accuracy": r.avn_accuracy,
8463
  "signal_confidence": r.signal_confidence,
8464
  "epistemic_std": r.epistemic_std,
 
8467
 
8468
  payload = json.dumps({"rankings": rankings_payload}).encode()
8469
  url = f"{self._hub_http_url}/api/flip/rankings"
8470
+ except Exception as build_err:
8471
+ logger.warning(f"[Rankings] Payload build failed: {build_err}")
8472
+ return
8473
+
8474
+ def _do_post() -> None:
8475
+ """Blocking HTTP POST β€” runs on a worker thread, not the event loop."""
8476
+ try:
8477
+ import urllib.request as _urlreq
8478
+ req = _urlreq.Request(
8479
+ url,
8480
+ data=payload,
8481
+ headers={"Content-Type": "application/json"},
8482
+ method="POST",
8483
+ )
8484
+ _urlreq.urlopen(req, timeout=3.0)
8485
+ except Exception as post_err:
8486
+ # Warn-level so we see repeated failures in the log and can
8487
+ # diagnose whether the hub is the problem next time.
8488
+ logger.warning(f"[Rankings] Hub push failed: {post_err}")
8489
+
8490
+ try:
8491
+ loop = asyncio.get_running_loop()
8492
+ loop.run_in_executor(None, _do_post)
8493
+ except RuntimeError:
8494
+ # No running loop (e.g. called from sync context) β€” do it inline.
8495
+ _do_post()
8496
 
8497
  async def rank_and_gate(self) -> None:
8498
+ """
8499
+ [HANG FIX β€” Layers 2 & 3] Thin wrapper around _rank_and_gate_impl
8500
+ that holds self._rank_lock for the duration of the cycle (so
8501
+ _rank_loop and the monitor_positions refill trigger cannot re-enter
8502
+ concurrently) and stamps self._last_rank_complete_ts so the watchdog
8503
+ can detect stalls.
8504
+
8505
+ Also logs cycle timing at WARNING level if a cycle exceeds 5s β€”
8506
+ slow cycles are the early symptom of a pending hang.
8507
+ """
8508
+ # asyncio.Lock must be created inside a running loop on Python <3.10
8509
+ if self._rank_lock is None:
8510
+ self._rank_lock = asyncio.Lock()
8511
+
8512
+ t0 = time.time()
8513
+ async with self._rank_lock:
8514
+ try:
8515
+ await self._rank_and_gate_impl()
8516
+ finally:
8517
+ # Stamp even on failure: a fast failure is healthier than a
8518
+ # stalled coroutine. The watchdog should only fire when the
8519
+ # rank loop is actually stuck, not when it's crashing loudly.
8520
+ self._last_rank_complete_ts = time.time()
8521
+
8522
+ elapsed = time.time() - t0
8523
+ if elapsed > 5.0:
8524
+ logger.warning(f"[RankCycle] completed slowly in {elapsed:.2f}s")
8525
+
8526
+ async def _rank_and_gate_impl(self) -> None:
8527
  """
8528
  v6/v7 Shreve Ranking Cycle:
8529
  1. Data readiness check
 
9041
  self.ws_client.listen(),
9042
  self._rank_loop(),
9043
  self.monitor_positions(),
9044
+ # [HANG FIX β€” Layer 3] Watchdog task: auto-recover if the rank
9045
+ # loop stops completing cycles (ws.send() hang, deadlock, etc.)
9046
+ self._rank_watchdog(),
9047
  )
9048
  except asyncio.CancelledError:
9049
  pass
 
9073
  try:
9074
  await self.rank_and_gate()
9075
  except Exception as e:
9076
+ # [HANG FIX β€” Layer 4] exc_info=True so we get a full traceback
9077
+ # in the ranker log the next time something breaks silently.
9078
+ # Previously errors were logged as a one-liner with no stack
9079
+ # frame, making root-cause diagnosis impossible.
9080
+ logger.error(f"❌ Rank loop error: {e}", exc_info=True)
9081
  await asyncio.sleep(self.config.update_frequency_seconds)
9082
 
9083
+ # ── [HANG FIX β€” Layer 3] Rank-loop watchdog ───────────────────────────────
9084
+ # Periodically checks self._last_rank_complete_ts. If no rank cycle has
9085
+ # completed within RANK_STALL_THRESHOLD_S, assumes the rank loop is hung
9086
+ # (almost always on a ws.send() to a half-open Deriv connection) and
9087
+ # force-closes the websocket. That raises ConnectionClosed inside any
9088
+ # pending send/recv, unblocking the await and triggering reconnect().
9089
+ #
9090
+ # This is the safety net: even if a NEW bug introduces a different hang,
9091
+ # the system auto-recovers within RANK_STALL_THRESHOLD_S instead of
9092
+ # sitting dead until a human restarts the Space.
9093
+ RANK_STALL_THRESHOLD_S: float = 120.0 # 4Γ— the expected worst-case cycle
9094
+ RANK_WATCHDOG_POLL_S: float = 30.0
9095
+
9096
+ async def _rank_watchdog(self) -> None:
9097
+ """Force-close the Deriv ws if the rank loop stops completing cycles."""
9098
+ logger.info(
9099
+ f"πŸ• [RankWatchdog] started | stall_threshold={self.RANK_STALL_THRESHOLD_S}s "
9100
+ f"| poll_interval={self.RANK_WATCHDOG_POLL_S}s"
9101
+ )
9102
+ while self.running:
9103
+ await asyncio.sleep(self.RANK_WATCHDOG_POLL_S)
9104
+ if not self.running:
9105
+ break
9106
+ since_last = time.time() - self._last_rank_complete_ts
9107
+ if since_last > self.RANK_STALL_THRESHOLD_S:
9108
+ logger.critical(
9109
+ f"🚨 [RankWatchdog] Rank loop has not completed a cycle for "
9110
+ f"{since_last:.0f}s (threshold={self.RANK_STALL_THRESHOLD_S}s). "
9111
+ f"Forcing Deriv ws close to unblock any pending send()."
9112
+ )
9113
+ # Force-close the ws. This raises ConnectionClosed inside
9114
+ # whatever coroutine is awaiting ws.send() or ws.recv(),
9115
+ # unblocking the rank loop. listen()'s except branch will
9116
+ # then drive reconnect().
9117
+ try:
9118
+ if self.ws_client and self.ws_client.ws:
9119
+ await self.ws_client.ws.close()
9120
+ except Exception as close_err:
9121
+ logger.warning(
9122
+ f"[RankWatchdog] ws.close() raised (expected on dead socket): {close_err}"
9123
+ )
9124
+ # Reset the stamp so we don't spam CRITICAL every poll interval
9125
+ # while reconnect is in progress.
9126
+ self._last_rank_complete_ts = time.time()
9127
+ else:
9128
+ logger.debug(
9129
+ f"[RankWatchdog] healthy | last_cycle={since_last:.1f}s ago"
9130
+ )
9131
+
9132
  def start_sync(self) -> None:
9133
  """Start in synchronous (threading) mode."""
9134
  def _run_loop():
 
10929
  enable_logging: bool = True,
10930
  shreve_config: Optional[ShreveConfig] = None,
10931
  checkpoint_dir: str = "./Ranker10",
10932
+ resume: bool = True, # [RESUME FIX] default ON β€” env var QUASAR_RESUME overrides
10933
  hf_repo_id: Optional[str] = "KarlQuant/quasar-axrvi-v10", # new HF repo (10 assets)
10934
  ) -> None:
10935
  config = AssetRankerConfig(
 
11244
  help="[S7] Gate E martingale deviation threshold (default 0.05)")
11245
  parser.add_argument("--checkpoint-dir", default="./Ranker10",
11246
  help="Directory for full-state checkpoints (default ./Ranker10)")
11247
+ # [RESUME FIX] Default is now --resume (load latest checkpoint).
11248
+ # Pass --no-resume to deliberately start fresh.
11249
+ # Env var QUASAR_RESUME=0|false further overrides this in the bridge.
11250
+ parser.add_argument("--no-resume", dest="no_resume", action="store_true", default=False,
11251
+ help="Start fresh, ignoring any existing checkpoint (default: resume).")
11252
  parser.add_argument("--resume", dest="no_resume", action="store_false",
11253
+ help="Restore from the latest Ranker10 checkpoint (default).")
11254
  parser.add_argument("--hf-repo", default=None,
11255
  metavar="OWNER/REPO",
11256
  help="Hugging Face Dataset repo for checkpoint sync "
 
11307
  hub_ws_url = args.hub,
11308
  enable_logging = not args.no_logs,
11309
  checkpoint_dir = args.checkpoint_dir,
11310
+ resume = not args.no_resume, # [RESUME FIX] default True β€” env var QUASAR_RESUME overrides
11311
  hf_repo_id = args.hf_repo or "KarlQuant/quasar-axrvi-v10",
11312
  )
11313