RayMelius Claude Opus 4.6 committed on
Commit
018d9a4
·
1 Parent(s): db41009

Fix AI Analyst duplicate output and CH trade attribution

Browse files

- Dashboard: remove direct SSE broadcast from _generate_and_broadcast(),
let Kafka consumer handle it (was causing every insight to appear twice)
- CH trade consumer: match matcher's field names (buy_id/sell_id) instead
of buy_order_id/sell_order_id, so trades are actually attributed to members
- Notebook: support both Kaggle (dataset) and Colab (Drive) checkpoints,
set SAVE_STEPS=10

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

clearing_house/ch_ai_trader.py CHANGED
@@ -118,8 +118,8 @@ def _trade_consumer_thread():
118
  break
119
  try:
120
  trade = msg.value
121
- buy_id = trade.get("buy_order_id", "")
122
- sell_id = trade.get("sell_order_id", "")
123
  symbol = trade.get("symbol", "")
124
  price = float(trade.get("price", 0))
125
  qty = int(trade.get("quantity", 0))
 
118
  break
119
  try:
120
  trade = msg.value
121
+ buy_id = trade.get("buy_id") or trade.get("buy_order_id") or ""
122
+ sell_id = trade.get("sell_id") or trade.get("sell_order_id") or ""
123
  symbol = trade.get("symbol", "")
124
  price = float(trade.get("price", 0))
125
  qty = int(trade.get("quantity", 0))
dashboard/dashboard.py CHANGED
@@ -209,10 +209,6 @@ def _generate_and_broadcast():
209
  text, source = _call_llm(prompt, force_provider=_active_provider, force_model=_active_model)
210
  if text:
211
  insight = {"text": text, "source": source, "timestamp": time.time()}
212
- with lock:
213
- ai_insights_cache.insert(0, insight)
214
- ai_insights_cache[:] = ai_insights_cache[:10]
215
- broadcast_event("ai_insight", insight)
216
  try:
217
  get_producer().send(Config.AI_INSIGHTS_TOPIC, insight)
218
  except Exception:
 
209
  text, source = _call_llm(prompt, force_provider=_active_provider, force_model=_active_model)
210
  if text:
211
  insight = {"text": text, "source": source, "timestamp": time.time()}
 
 
 
 
212
  try:
213
  get_producer().send(Config.AI_INSIGHTS_TOPIC, insight)
214
  except Exception:
notebooks/ch_trader_finetune.ipynb CHANGED
@@ -79,13 +79,13 @@
79
  {
80
  "cell_type": "markdown",
81
  "id": "g7cean6ejyj",
82
- "source": "## 1b. Checkpoint Setup (Resumable Training)\n\nCheckpoints are saved to **Google Drive** so they survive Colab session disconnects.\n- If a checkpoint is found, training resumes automatically from where it left off.\n- Adapters are also pushed to HF Hub every `SAVE_STEPS` steps as a backup.",
83
  "metadata": {}
84
  },
85
  {
86
  "cell_type": "code",
87
  "id": "egq0dp9csuo",
88
- "source": "import shutil, math\nfrom transformers.trainer_utils import get_last_checkpoint\n\n# ── Mount Google Drive for persistent checkpoint storage ──────────────────────\ntry:\n from google.colab import drive\n drive.mount(\"/content/drive\", force_remount=False)\n DRIVE_CKPT_DIR = \"/content/drive/MyDrive/stockex-ch-checkpoints\"\n USE_DRIVE = True\n print(f\"Google Drive mounted. Checkpoints β†’ {DRIVE_CKPT_DIR}\")\nexcept Exception:\n DRIVE_CKPT_DIR = None\n USE_DRIVE = False\n print(\"Google Drive not available β€” checkpoints saved locally only.\")\n\nos.makedirs(OUTPUT_DIR, exist_ok=True)\nif USE_DRIVE:\n os.makedirs(DRIVE_CKPT_DIR, exist_ok=True)\n drive_ckpt = get_last_checkpoint(DRIVE_CKPT_DIR)\n if drive_ckpt:\n local_ckpt_name = os.path.basename(drive_ckpt)\n local_ckpt_path = os.path.join(OUTPUT_DIR, local_ckpt_name)\n if not os.path.exists(local_ckpt_path):\n print(f\"Restoring checkpoint from Drive: {drive_ckpt}\")\n shutil.copytree(drive_ckpt, local_ckpt_path)\n\n# ── Detect latest local checkpoint ────────────────────────────────────────────\nRESUME_FROM = get_last_checkpoint(OUTPUT_DIR)\n\nSAVE_STEPS = 50\n\n# ── Resume summary ─────────────────────────────────────────────────────────────\nif RESUME_FROM:\n # Parse completed steps from checkpoint folder name (e.g. 
\"checkpoint-350\")\n completed_steps = int(os.path.basename(RESUME_FROM).split(\"-\")[-1])\n\n # Total steps = ceil(train_size / (batch * grad_accum)) * epochs\n train_size = int(DATASET_SIZE * 0.9)\n steps_per_epoch = math.ceil(train_size / (BATCH_SIZE * GRAD_ACCUM))\n total_steps = steps_per_epoch * NUM_EPOCHS\n remaining = max(0, total_steps - completed_steps)\n pct_done = 100 * completed_steps / total_steps\n epoch_done = completed_steps / steps_per_epoch\n\n print(\"\\n\" + \"=\" * 55)\n print(\" RESUMING FROM CHECKPOINT\")\n print(\"=\" * 55)\n print(f\" Checkpoint : {os.path.basename(RESUME_FROM)}\")\n print(f\" Steps done : {completed_steps:,} / {total_steps:,} ({pct_done:.1f}%)\")\n print(f\" Steps left : {remaining:,}\")\n print(f\" Epoch : {epoch_done:.2f} / {NUM_EPOCHS}\")\n print(f\" Epochs left : {NUM_EPOCHS - epoch_done:.2f}\")\n print(f\" Steps/epoch : {steps_per_epoch:,}\")\n print(f\" Save every : {SAVE_STEPS} steps\")\n print(\"=\" * 55 + \"\\n\")\nelse:\n train_size = int(DATASET_SIZE * 0.9)\n steps_per_epoch = math.ceil(train_size / (BATCH_SIZE * GRAD_ACCUM))\n total_steps = steps_per_epoch * NUM_EPOCHS\n print(\"\\n\" + \"=\" * 55)\n print(\" STARTING FRESH\")\n print(\"=\" * 55)\n print(f\" Total steps : {total_steps:,}\")\n print(f\" Steps/epoch : {steps_per_epoch:,}\")\n print(f\" Epochs : {NUM_EPOCHS}\")\n print(f\" Save every : {SAVE_STEPS} steps\")\n print(\"=\" * 55 + \"\\n\")",
89
  "metadata": {},
90
  "execution_count": null,
91
  "outputs": []
@@ -243,7 +243,7 @@
243
  "id": "train",
244
  "metadata": {},
245
  "outputs": [],
246
- "source": "from transformers import TrainerCallback\n\nclass CheckpointSyncCallback(TrainerCallback):\n \"\"\"After every checkpoint: copy to Google Drive and push adapter to HF Hub.\"\"\"\n\n def on_save(self, args, state, control, **kwargs):\n ckpt_dir = os.path.join(args.output_dir, f\"checkpoint-{state.global_step}\")\n if not os.path.isdir(ckpt_dir):\n return\n\n # 1. Sync to Google Drive\n if USE_DRIVE and DRIVE_CKPT_DIR:\n dest = os.path.join(DRIVE_CKPT_DIR, f\"checkpoint-{state.global_step}\")\n if not os.path.exists(dest):\n shutil.copytree(ckpt_dir, dest)\n print(f\"[Checkpoint] Saved to Drive: {dest}\")\n\n # 2. Push adapter to HF Hub (lightweight β€” only LoRA weights)\n try:\n kwargs[\"model\"].push_to_hub(\n OUTPUT_REPO,\n commit_message=f\"Checkpoint step {state.global_step} \"\n f\"(epoch {state.epoch:.2f})\",\n token=HF_TOKEN,\n )\n print(f\"[Checkpoint] Pushed adapter step {state.global_step} β†’ HF Hub\")\n except Exception as e:\n print(f\"[Checkpoint] HF Hub push failed (non-fatal): {e}\")\n\n\nsft_config = SFTConfig(\n output_dir=OUTPUT_DIR,\n num_train_epochs=NUM_EPOCHS,\n per_device_train_batch_size=BATCH_SIZE,\n per_device_eval_batch_size=BATCH_SIZE,\n gradient_accumulation_steps=GRAD_ACCUM,\n gradient_checkpointing=True,\n optim=\"paged_adamw_32bit\",\n learning_rate=LR,\n lr_scheduler_type=\"cosine\",\n warmup_ratio=0.05,\n fp16=not torch.cuda.is_bf16_supported(),\n bf16=torch.cuda.is_bf16_supported(),\n logging_steps=10,\n eval_strategy=\"steps\",\n eval_steps=SAVE_STEPS,\n save_strategy=\"steps\",\n save_steps=SAVE_STEPS,\n save_total_limit=3, # keep only 3 latest local checkpoints\n load_best_model_at_end=True,\n metric_for_best_model=\"eval_loss\",\n greater_is_better=False,\n report_to=\"none\",\n dataset_text_field=\"text\",\n packing=False,\n)\n\ntrainer = SFTTrainer(\n model=model,\n args=sft_config,\n train_dataset=train_dataset,\n eval_dataset=val_dataset,\n peft_config=lora_config,\n processing_class=tokenizer,\n 
callbacks=[CheckpointSyncCallback()],\n)\n\nif RESUME_FROM:\n print(f\"β–Ά Resuming from checkpoint: {RESUME_FROM}\")\nelse:\n print(\"β–Ά Starting training from scratch\")\n\ntrainer.train(resume_from_checkpoint=RESUME_FROM)\nprint(\"βœ“ Training complete.\")"
247
  },
248
  {
249
  "cell_type": "markdown",
 
79
  {
80
  "cell_type": "markdown",
81
  "id": "g7cean6ejyj",
82
+ "source": "## 1b. Checkpoint Setup (Resumable Training)\n\nCheckpoints persist across session restarts for both **Kaggle** and **Colab**:\n\n| Platform | Restore from | Save to |\n|----------|-------------|---------|\n| **Kaggle** | `/kaggle/input/stockex-ch-checkpoints/` (dataset) | `/kaggle/working/stockex-ch-checkpoints/` (output) |\n| **Colab** | Google Drive `stockex-ch-checkpoints/` | Google Drive `stockex-ch-checkpoints/` |\n\nAdapters are also pushed to HF Hub every `SAVE_STEPS` steps as a backup.",
83
  "metadata": {}
84
  },
85
  {
86
  "cell_type": "code",
87
  "id": "egq0dp9csuo",
88
+ "source": "import shutil, math\nfrom transformers.trainer_utils import get_last_checkpoint\n\n# ── Detect platform ───────────────────────────────────────────────────────────\nON_KAGGLE = os.path.isdir(\"/kaggle\")\nON_COLAB = not ON_KAGGLE\n\n# ── Platform-specific checkpoint dirs ─────────────────────────────────────────\nDRIVE_CKPT_DIR = None\nKAGGLE_INPUT_CKPT_DIR = \"/kaggle/input/stockex-ch-checkpoints\"\nKAGGLE_OUTPUT_CKPT_DIR = \"/kaggle/working/stockex-ch-checkpoints\"\n\nif ON_COLAB:\n try:\n from google.colab import drive\n drive.mount(\"/content/drive\", force_remount=False)\n DRIVE_CKPT_DIR = \"/content/drive/MyDrive/stockex-ch-checkpoints\"\n os.makedirs(DRIVE_CKPT_DIR, exist_ok=True)\n print(f\"[Colab] Google Drive mounted. Checkpoints β†’ {DRIVE_CKPT_DIR}\")\n except Exception:\n print(\"[Colab] Google Drive not available β€” checkpoints saved locally only.\")\nelif ON_KAGGLE:\n os.makedirs(KAGGLE_OUTPUT_CKPT_DIR, exist_ok=True)\n print(f\"[Kaggle] Checkpoints restore ← {KAGGLE_INPUT_CKPT_DIR}\")\n print(f\"[Kaggle] Checkpoints save β†’ {KAGGLE_OUTPUT_CKPT_DIR}\")\n\n# ── Restore checkpoint to local OUTPUT_DIR ────────────────────────────────────\nos.makedirs(OUTPUT_DIR, exist_ok=True)\n\n# Pick the right source directory for restoring\nrestore_dir = None\nif ON_KAGGLE and os.path.isdir(KAGGLE_INPUT_CKPT_DIR):\n restore_dir = KAGGLE_INPUT_CKPT_DIR\nelif ON_COLAB and DRIVE_CKPT_DIR and os.path.isdir(DRIVE_CKPT_DIR):\n restore_dir = DRIVE_CKPT_DIR\n\nif restore_dir:\n remote_ckpt = get_last_checkpoint(restore_dir)\n if remote_ckpt:\n local_ckpt_name = os.path.basename(remote_ckpt)\n local_ckpt_path = os.path.join(OUTPUT_DIR, local_ckpt_name)\n if not os.path.exists(local_ckpt_path):\n print(f\"Restoring checkpoint: {remote_ckpt}\")\n shutil.copytree(remote_ckpt, local_ckpt_path)\n else:\n print(f\"No checkpoint found in {restore_dir}\")\n\n# ── Detect latest local checkpoint ────────────────────────────────────────────\nRESUME_FROM = 
get_last_checkpoint(OUTPUT_DIR)\n\nSAVE_STEPS = 10\n\n# ── Resume summary ─────────────────────────────────────────────────────────────\nif RESUME_FROM:\n completed_steps = int(os.path.basename(RESUME_FROM).split(\"-\")[-1])\n train_size = int(DATASET_SIZE * 0.9)\n steps_per_epoch = math.ceil(train_size / (BATCH_SIZE * GRAD_ACCUM))\n total_steps = steps_per_epoch * NUM_EPOCHS\n remaining = max(0, total_steps - completed_steps)\n pct_done = 100 * completed_steps / total_steps\n epoch_done = completed_steps / steps_per_epoch\n\n print(\"\\n\" + \"=\" * 55)\n print(\" RESUMING FROM CHECKPOINT\")\n print(\"=\" * 55)\n print(f\" Checkpoint : {os.path.basename(RESUME_FROM)}\")\n print(f\" Steps done : {completed_steps:,} / {total_steps:,} ({pct_done:.1f}%)\")\n print(f\" Steps left : {remaining:,}\")\n print(f\" Epoch : {epoch_done:.2f} / {NUM_EPOCHS}\")\n print(f\" Epochs left : {NUM_EPOCHS - epoch_done:.2f}\")\n print(f\" Steps/epoch : {steps_per_epoch:,}\")\n print(f\" Save every : {SAVE_STEPS} steps\")\n print(\"=\" * 55 + \"\\n\")\nelse:\n train_size = int(DATASET_SIZE * 0.9)\n steps_per_epoch = math.ceil(train_size / (BATCH_SIZE * GRAD_ACCUM))\n total_steps = steps_per_epoch * NUM_EPOCHS\n print(\"\\n\" + \"=\" * 55)\n print(\" STARTING FRESH\")\n print(\"=\" * 55)\n print(f\" Total steps : {total_steps:,}\")\n print(f\" Steps/epoch : {steps_per_epoch:,}\")\n print(f\" Epochs : {NUM_EPOCHS}\")\n print(f\" Save every : {SAVE_STEPS} steps\")\n print(\"=\" * 55 + \"\\n\")",
89
  "metadata": {},
90
  "execution_count": null,
91
  "outputs": []
 
243
  "id": "train",
244
  "metadata": {},
245
  "outputs": [],
246
+ "source": "from transformers import TrainerCallback\n\n\nclass CheckpointSyncCallback(TrainerCallback):\n \"\"\"After every checkpoint: copy to persistent storage and push adapter to HF Hub.\"\"\"\n\n def on_save(self, args, state, control, **kwargs):\n ckpt_dir = os.path.join(args.output_dir, f\"checkpoint-{state.global_step}\")\n if not os.path.isdir(ckpt_dir):\n return\n\n # 1. Sync to persistent storage (platform-specific)\n if ON_COLAB and DRIVE_CKPT_DIR:\n dest = os.path.join(DRIVE_CKPT_DIR, f\"checkpoint-{state.global_step}\")\n if not os.path.exists(dest):\n shutil.copytree(ckpt_dir, dest)\n print(f\"[Checkpoint] Saved to Drive: {dest}\")\n elif ON_KAGGLE:\n dest = os.path.join(KAGGLE_OUTPUT_CKPT_DIR, f\"checkpoint-{state.global_step}\")\n if not os.path.exists(dest):\n shutil.copytree(ckpt_dir, dest)\n print(f\"[Checkpoint] Saved to output: {dest}\")\n\n # 2. Push adapter to HF Hub (lightweight β€” only LoRA weights)\n try:\n kwargs[\"model\"].push_to_hub(\n OUTPUT_REPO,\n commit_message=f\"Checkpoint step {state.global_step} \"\n f\"(epoch {state.epoch:.2f})\",\n token=HF_TOKEN,\n )\n print(f\"[Checkpoint] Pushed adapter step {state.global_step} β†’ HF Hub\")\n except Exception as e:\n print(f\"[Checkpoint] HF Hub push failed (non-fatal): {e}\")\n\n\nsft_config = SFTConfig(\n output_dir=OUTPUT_DIR,\n num_train_epochs=NUM_EPOCHS,\n per_device_train_batch_size=BATCH_SIZE,\n per_device_eval_batch_size=BATCH_SIZE,\n gradient_accumulation_steps=GRAD_ACCUM,\n gradient_checkpointing=True,\n optim=\"paged_adamw_32bit\",\n learning_rate=LR,\n lr_scheduler_type=\"cosine\",\n warmup_ratio=0.05,\n fp16=not torch.cuda.is_bf16_supported(),\n bf16=torch.cuda.is_bf16_supported(),\n logging_steps=10,\n eval_strategy=\"steps\",\n eval_steps=SAVE_STEPS,\n save_strategy=\"steps\",\n save_steps=SAVE_STEPS,\n save_total_limit=3, # keep only 3 latest local checkpoints\n load_best_model_at_end=True,\n metric_for_best_model=\"eval_loss\",\n greater_is_better=False,\n 
report_to=\"none\",\n dataset_text_field=\"text\",\n packing=False,\n)\n\ntrainer = SFTTrainer(\n model=model,\n args=sft_config,\n train_dataset=train_dataset,\n eval_dataset=val_dataset,\n peft_config=lora_config,\n processing_class=tokenizer,\n callbacks=[CheckpointSyncCallback()],\n)\n\nif RESUME_FROM:\n print(f\"β–Ά Resuming from checkpoint: {RESUME_FROM}\")\nelse:\n print(\"β–Ά Starting training from scratch\")\n\ntrainer.train(resume_from_checkpoint=RESUME_FROM)\nprint(\"βœ“ Training complete.\")"
247
  },
248
  {
249
  "cell_type": "markdown",