# (HF Spaces page artifact — "Spaces: Sleeping / Sleeping" — scraped UI text, not part of the source)
| #!/usr/bin/env python3 | |
| """ | |
| Auto Training Pipeline for NautilusAI | |
| Runs scheduled training for DeepLOB, TRM, and Ensemble models. | |
| Writes status to status.json for Dashboard monitoring. | |
| """ | |
| import os | |
| import json | |
| import time | |
| from datetime import datetime | |
| from pathlib import Path | |
| from huggingface_hub import HfApi | |
| import joblib | |
| import pickle | |
# --- Configuration -----------------------------------------------------------
REPO_ID = "gionuibk/hyperliquidL2Book-v2"    # HF dataset with L2 book / bar data
DATA_DIR = "./data"                          # local scratch dir for data
MODEL_DIR = "./models"                       # local dir where checkpoints are written
STATUS_FILE = "./status.json"                # dashboard polls this file
HF_MODEL_REPO = "gionuibk/NautilusModels"    # destination repo for trained models
REPO_ID_LOGS = "gionuibk/NautilusLogs"       # dataset repo for status/history logs
BEST_MODELS_FILE = "./best_models.json"      # manifest of best model per type
# Training Limits (to prevent HF Space timeout)
MAX_TRAINING_TIME = 1200  # seconds per model (20 minutes)
MAX_STEPS_PER_MODEL = 3000  # max optimizer steps before saving
# Ensure directories exist
Path(MODEL_DIR).mkdir(exist_ok=True)
Path(DATA_DIR).mkdir(exist_ok=True)
class StatusWriter:
    """Writes training status to a JSON file for the Dashboard.

    The status dict is persisted locally on every change and mirrored to the
    HuggingFace logs dataset at most once every 10 minutes, or immediately
    when the run reaches a terminal state ("completed"/"error").

    SECURITY FIX: the HF token previously had a hard-coded fallback value
    committed to source; the token now comes from the environment only.
    """

    # Dataset repo that mirrors status.json for remote monitoring.
    LOG_REPO = "gionuibk/NautilusLogs"
    # Minimum seconds between remote uploads (throttle).
    UPLOAD_INTERVAL = 600

    def __init__(self, filepath=STATUS_FILE):
        self.filepath = filepath
        self.logs = []
        # Best-effort: make sure the remote log repo exists. Requires
        # HF_TOKEN in the environment; skipped silently otherwise.
        try:
            from huggingface_hub import create_repo
            token = os.environ.get("HF_TOKEN")
            if token:
                create_repo(self.LOG_REPO, repo_type="dataset",
                            exist_ok=True, token=token, private=True)
        except Exception:
            pass  # remote repo creation is optional; keep going offline
        self.last_upload_time = 0
        self.reset()

    def reset(self):
        """Return status to idle and persist it."""
        self.status = {
            "current_model": None,
            "status": "idle",
            "epoch": 0,
            "total_epochs": 0,
            "last_loss": None,
            "last_accuracy": None,
            "started_at": None,
            "last_update": None,
            "logs": []
        }
        self._save()

    def start(self, model_name: str, total_epochs: int):
        """Begin tracking a new training run for `model_name`."""
        self.logs = []
        now = datetime.now().isoformat()
        self.status = {
            "current_model": model_name,
            "status": "training",
            "epoch": 0,
            "total_epochs": total_epochs,
            "last_loss": None,
            "last_accuracy": None,
            "started_at": now,
            "last_update": now,
            "logs": []
        }
        self.log(f"Started training {model_name}")
        self._save()

    def update(self, epoch: int, loss: float, accuracy: float = None):
        """Record per-epoch metrics and persist.

        BUG FIX: uses `accuracy is not None` so a legitimate accuracy of
        0.0 is still reported (the old truthiness check dropped it).
        """
        self.status["epoch"] = epoch
        self.status["last_loss"] = loss
        self.status["last_accuracy"] = accuracy
        self.status["last_update"] = datetime.now().isoformat()
        suffix = f", Acc={accuracy:.2f}%" if accuracy is not None else ""
        self.log(f"Epoch {epoch}: Loss={loss:.4f}{suffix}")
        self._save()

    def complete(self, model_name: str):
        """Mark the current run as completed (triggers a final upload)."""
        self.status["status"] = "completed"
        self.status["last_update"] = datetime.now().isoformat()
        self.log(f"Completed training {model_name}")
        self._save()

    def error(self, message: str):
        """Mark the current run as errored (triggers a final upload)."""
        self.status["status"] = "error"
        self.status["last_update"] = datetime.now().isoformat()
        self.log(f"ERROR: {message}")
        self._save()

    def log(self, message: str):
        """Append a timestamped line to the rolling log and echo to stdout."""
        timestamp = datetime.now().strftime("%H:%M:%S")
        log_entry = f"[{timestamp}] {message}"
        self.logs.append(log_entry)
        self.status["logs"] = self.logs[-50:]  # keep last 50 entries
        print(log_entry)

    def _save(self):
        """Persist status locally and mirror it to HF (throttled)."""
        with open(self.filepath, 'w') as f:
            json.dump(self.status, f, indent=2)
        # Upload to the HF dataset for remote monitoring: at most once per
        # UPLOAD_INTERVAL, except terminal states which upload immediately.
        current_time = time.time()
        is_final = self.status["status"] in ("completed", "error")
        if (current_time - self.last_upload_time >= self.UPLOAD_INTERVAL) or is_final:
            try:
                from huggingface_hub import HfApi
                token = os.environ.get("HF_TOKEN")
                if token:
                    HfApi().upload_file(
                        path_or_fileobj=self.filepath,
                        path_in_repo="status.json",
                        repo_id=self.LOG_REPO,
                        repo_type="dataset",
                        token=token
                    )
                    self.last_upload_time = current_time
                    print(f"π‘ Status uploaded to HF (Next update in 10 mins).")
            except Exception:
                pass  # monitoring upload is best-effort, never fatal
class HistoryWriter:
    """Writes a permanent training history CSV and mirrors it to HF.

    One row per saved model: timestamp, model name, headline metric string,
    checkpoint filename, and its HF URL. Remote uploads are throttled to at
    most one per 10 minutes.

    BUG FIX: the filename column, the HF URL, and the console message used
    to contain the literal placeholder "(unknown)" instead of the actual
    filename passed to log_model().
    SECURITY FIX: the HF token now comes from the environment only (the
    previous hard-coded fallback was a leaked credential).
    """

    def __init__(self, filepath="training_history.csv"):
        self.filepath = filepath
        self.last_upload_time = 0  # epoch seconds of the last upload
        self._ensure_header()

    def _ensure_header(self):
        """Create the CSV with a header row if it does not exist yet."""
        if not os.path.exists(self.filepath):
            with open(self.filepath, 'w') as f:
                f.write("timestamp,model_name,metrics,filename,hf_url\n")

    def log_model(self, model_name: str, metrics: str, filename: str):
        """Append one history row for a saved model and trigger an upload."""
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        hf_url = f"https://huggingface.co/{HF_MODEL_REPO}/blob/main/{filename}"
        if not metrics:
            metrics = "N/A"
        with open(self.filepath, 'a') as f:
            f.write(f"{timestamp},{model_name},{metrics},{filename},{hf_url}\n")
        print(f"π Logged to history: {filename}")
        self.upload_history()  # auto-upload (throttled)

    def upload_history(self):
        """Upload the history CSV to the HF logs repo (throttled to 10 min)."""
        current_time = time.time()
        if current_time - self.last_upload_time < 600:
            return  # skip if too frequent
        print("π€ Uploading Training History Log...")
        try:
            token = os.environ.get("HF_TOKEN")  # never hard-code credentials
            if token:
                from huggingface_hub import HfApi
                api = HfApi(token=token)
                api.upload_file(
                    path_or_fileobj=self.filepath,
                    path_in_repo="training_history.csv",
                    repo_id=REPO_ID_LOGS,
                    repo_type="dataset"
                )
                self.last_upload_time = current_time
        except Exception as e:
            print(f"β οΈ History Upload Failed: {e}")
def update_best_models(model_type: str, pt_filename: str, accuracy: float, onnx_filename: str = None, api: HfApi = None, metric_name: str = "accuracy", metric_value: float = None):
    """Refresh the best_models.json manifest consumed by NautilusAI.

    The manifest maps each model type (deeplob, trm, lstm, ...) to its
    latest checkpoint files and headline metric. The current manifest is
    fetched from the HF model repo first so that a fresh container does
    not wipe entries written by earlier runs.
    """
    # Fall back to the legacy accuracy field when no explicit metric is given.
    if metric_value is None:
        metric_value = accuracy

    # Pull the existing manifest from HF; start from empty if unavailable.
    try:
        from huggingface_hub import hf_hub_download
        remote_path = hf_hub_download(
            repo_id=HF_MODEL_REPO,
            filename="best_models.json",
            repo_type="model",
            token=os.environ.get("HF_TOKEN")
        )
        with open(remote_path, 'r') as fh:
            manifest = json.load(fh)
    except Exception:
        manifest = {}

    # Overwrite this model type's entry with the fresh checkpoint info.
    manifest[model_type] = {
        "pt_file": pt_filename,
        "onnx_file": onnx_filename,
        "accuracy": accuracy,  # legacy compatibility
        "metric_name": metric_name,
        "metric_value": metric_value,
        "updated_at": datetime.now().isoformat(),
        "hf_repo": HF_MODEL_REPO
    }

    # Persist locally, then mirror to the HF model repo when an API is given.
    with open(BEST_MODELS_FILE, 'w') as fh:
        json.dump(manifest, fh, indent=2)
    print(f"π Updated best_models.json: {model_type} = {pt_filename}")

    if api:
        try:
            api.upload_file(
                path_or_fileobj=BEST_MODELS_FILE,
                path_in_repo="best_models.json",
                repo_id=HF_MODEL_REPO,
                repo_type="model"
            )
            print(f"β best_models.json uploaded to {HF_MODEL_REPO}")
        except Exception as e:
            print(f"β οΈ Failed to upload best_models.json: {e}")
def train_deeplob(status: StatusWriter, api: HfApi, history, epochs: int = 1):
    """Train DeepLOB on streaming L2 book data and publish checkpoints.

    Runs at most MAX_STEPS_PER_MODEL optimizer steps or MAX_TRAINING_TIME
    seconds (whichever comes first), saves a .pt checkpoint (plus an ONNX
    export when it succeeds), uploads both to HF_MODEL_REPO, and updates
    status/manifest/history.

    Returns True on success, False on failure (status records the error).
    """
    print("β³ Loading DeepLOB Dependencies (Torch)...")
    import torch
    import torch.nn as nn
    import torch.optim as optim
    from models.deeplob import DeepLOB
    from streaming_loader import StreamingDataLoader

    # In streaming mode an "epoch" is a bounded pass over the stream
    # (capped by the step/time limits below), not a full dataset pass.
    status.start("DeepLOB", epochs)
    try:
        loader = StreamingDataLoader(
            repo_id=REPO_ID,
            model_type="deeplob",
            batch_size=32,
            chunk_size=5000  # rows pulled from the stream at a time
        )
        model = DeepLOB(y_len=3)
        # Class weights are impractical to pre-compute on a stream, so
        # plain cross-entropy is used.
        criterion = nn.CrossEntropyLoss()
        optimizer = optim.Adam(model.parameters(), lr=0.0001)

        step = 0
        total_loss = 0
        correct = 0
        total_samples = 0
        start_time = time.time()
        model.train()
        print(f"π Starting Streaming Training loop (Max: {MAX_STEPS_PER_MODEL} steps / {MAX_TRAINING_TIME}s)...")

        for batch_X, batch_y in loader:
            # Stop on step or wall-clock limits (HF Space timeout protection).
            if step >= MAX_STEPS_PER_MODEL:
                print(f"β±οΈ Step limit reached ({MAX_STEPS_PER_MODEL}). Saving checkpoint...")
                break
            if time.time() - start_time >= MAX_TRAINING_TIME:
                print(f"β±οΈ Time limit reached ({MAX_TRAINING_TIME}s). Saving checkpoint...")
                break

            optimizer.zero_grad()
            outputs = model(batch_X)
            loss = criterion(outputs, batch_y)
            loss.backward()
            optimizer.step()

            # Running metrics
            total_loss += loss.item()
            _, predicted = torch.max(outputs.data, 1)
            total_samples += batch_y.size(0)
            correct += (predicted == batch_y).sum().item()
            step += 1

            if step % 10 == 0:
                print(f"Step {step}: Loss={loss.item():.4f}", flush=True)
            if step % 50 == 0:
                # step >= 50 and total_samples > 0 here, divisions are safe
                status.update(1, total_loss / step, 100 * correct / total_samples)

        # Final accuracy; guard against an empty stream.
        acc = 100 * correct / total_samples if total_samples > 0 else 0
        acc_tag = f"{acc:.1f}".replace('.', '_')  # e.g. 95_2 for 95.2%

        # Save native PyTorch checkpoint with the accuracy in the filename.
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        model_filename = f"deeplob_acc{acc_tag}_{timestamp}.pt"
        save_path = f"{MODEL_DIR}/{model_filename}"
        torch.save(model.state_dict(), save_path)
        print(f"π¦ PyTorch model saved: {model_filename} (Acc={acc:.2f}%, Steps={step})")

        # Upload checkpoint to the HF model hub.
        try:
            from huggingface_hub import create_repo
            # Token comes from the environment only; never hard-code credentials.
            token = os.environ.get("HF_TOKEN")
            create_repo(HF_MODEL_REPO, repo_type="model", exist_ok=True, token=token)
            print(f"Uploading {model_filename} to Model Repo: {HF_MODEL_REPO}...")
            api.upload_file(
                path_or_fileobj=save_path,
                path_in_repo=model_filename,
                repo_id=HF_MODEL_REPO,
                repo_type="model"
            )
            print(f"β {model_filename} uploaded to HF Models successfully.")
        except Exception as e:
            print(f"β οΈ Model Upload Failed: {e}")

        # ONNX export (optional; fall back to the .pt checkpoint on failure).
        onnx_filename = None
        try:
            print("Exporting DeepLOB to ONNX...")
            onnx_filename = f"deeplob_acc{acc_tag}_{timestamp}.onnx"
            onnx_path = f"{MODEL_DIR}/{onnx_filename}"
            # Sample input matching the training tensor layout — assumed
            # (batch, 2, 100, 40); TODO confirm against StreamingDataLoader.
            dummy = torch.randn(1, 2, 100, 40)
            torch.onnx.export(
                model, dummy, onnx_path,
                input_names=['input'], output_names=['output'],
                dynamic_axes={'input': {0: 'batch_size'}, 'output': {0: 'batch_size'}},
                opset_version=14, do_constant_folding=True
            )
            print(f"β ONNX Export Success: {onnx_filename}")
            api.upload_file(
                path_or_fileobj=onnx_path,
                path_in_repo=onnx_filename,
                repo_id=HF_MODEL_REPO,
                repo_type="model"
            )
            print(f"β {onnx_filename} uploaded to HF Models.")
        except Exception as e:
            print(f"β οΈ ONNX Export Failed: {e}. Using .pt checkpoint only.")
            onnx_filename = None

        # Update the manifest and wrap up.
        update_best_models("deeplob", model_filename, acc, onnx_filename, api)
        status.complete("DeepLOB")

        # History logging is best-effort; never fail the run over it.
        try:
            history.log_model("DeepLOB", f"Acc={acc:.2f}%", model_filename)
        except Exception:
            pass
        return True
    except Exception as e:
        import traceback
        traceback.print_exc()
        status.error(str(e))
        return False
def train_trm(status: StatusWriter, api: HfApi, history, epochs: int = 1):
    """Train TRM on streaming data and publish .pt / ONNX checkpoints.

    Mirrors train_deeplob: bounded by MAX_STEPS_PER_MODEL and
    MAX_TRAINING_TIME, updates `status` during training, uploads the
    artifacts, and updates the manifest.

    CONSISTENCY FIX: now also logs the result to the history CSV and
    prints a traceback on failure, matching train_deeplob/train_lstm.
    Returns True on success, False on failure.
    """
    print("β³ Loading TRM Dependencies (Torch)...")
    import torch
    import torch.nn as nn
    import torch.optim as optim
    from models.trm import TRM
    from streaming_loader import StreamingDataLoader

    status.start("TRM", epochs)
    try:
        loader = StreamingDataLoader(
            repo_id=REPO_ID,
            model_type="trm",
            batch_size=64,
            chunk_size=5000
        )
        model = TRM(input_size=6, num_classes=3)
        criterion = nn.CrossEntropyLoss()
        optimizer = optim.Adam(model.parameters(), lr=0.001)

        step = 0
        total_loss = 0
        correct = 0
        total_samples = 0
        start_time = time.time()
        model.train()
        print(f"π Starting TRM Training (Max: {MAX_STEPS_PER_MODEL} steps / {MAX_TRAINING_TIME}s)...")

        for batch_X, batch_y in loader:
            # Stop on step or wall-clock limits (HF Space timeout protection).
            if step >= MAX_STEPS_PER_MODEL:
                print(f"β±οΈ Step limit reached. Saving checkpoint...")
                break
            if time.time() - start_time >= MAX_TRAINING_TIME:
                print(f"β±οΈ Time limit reached. Saving checkpoint...")
                break

            optimizer.zero_grad()
            out = model(batch_X)
            loss = criterion(out, batch_y)
            loss.backward()
            optimizer.step()

            # Running metrics
            total_loss += loss.item()
            _, predicted = torch.max(out.data, 1)
            total_samples += batch_y.size(0)
            correct += (predicted == batch_y).sum().item()
            step += 1

            if step % 50 == 0:
                acc = 100 * correct / total_samples if total_samples > 0 else 0
                status.update(1, total_loss / step, acc)

        # Final accuracy (guard against an empty stream) for the filename tag.
        acc = 100 * correct / total_samples if total_samples > 0 else 0
        acc_tag = f"{acc:.1f}".replace('.', '_')

        # Save native PyTorch checkpoint with the accuracy in the filename.
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        model_filename = f"trm_acc{acc_tag}_{timestamp}.pt"
        save_path = f"{MODEL_DIR}/{model_filename}"
        torch.save(model.state_dict(), save_path)
        print(f"π¦ TRM model saved: {model_filename} (Acc={acc:.2f}%, Steps={step})")

        # Upload checkpoint to the HF model hub.
        try:
            print(f"Uploading {model_filename} to Model Repo: {HF_MODEL_REPO}...")
            api.upload_file(
                path_or_fileobj=save_path,
                path_in_repo=model_filename,
                repo_id=HF_MODEL_REPO,
                repo_type="model"
            )
            print(f"β {model_filename} uploaded to HF Models successfully.")
        except Exception as e:
            print(f"β οΈ Model Upload Failed: {e}")

        # ONNX export (optional; same naming convention as the .pt file).
        onnx_filename = None
        try:
            print("Exporting TRM to ONNX...")
            onnx_filename = f"trm_acc{acc_tag}_{timestamp}.onnx"
            onnx_path = f"{MODEL_DIR}/{onnx_filename}"
            # Sample input — assumed (batch, 60, 6); TODO confirm against loader.
            dummy = torch.randn(1, 60, 6)
            torch.onnx.export(
                model, dummy, onnx_path,
                input_names=['input'], output_names=['output'],
                dynamic_axes={'input': {0: 'batch_size'}, 'output': {0: 'batch_size'}},
                opset_version=14, do_constant_folding=True
            )
            print(f"β TRM ONNX Export Success: {onnx_filename}")
            api.upload_file(
                path_or_fileobj=onnx_path,
                path_in_repo=onnx_filename,
                repo_id=HF_MODEL_REPO,
                repo_type="model"
            )
        except Exception as e:
            print(f"β οΈ ONNX Export Failed: {e}")
            onnx_filename = None

        # Update the manifest and wrap up.
        update_best_models("trm", model_filename, acc, onnx_filename, api)
        status.complete("TRM")

        # History logging is best-effort; never fail the run over it.
        try:
            history.log_model("TRM", f"Acc={acc:.2f}%", model_filename)
        except Exception:
            pass
        return True
    except Exception as e:
        import traceback
        traceback.print_exc()
        status.error(str(e))
        return False
def train_lstm(status: StatusWriter, api: HfApi, history, epochs: int = 1):
    """Train LSTM model using Streaming Data (Bar Data).

    Regression model trained with MSE loss, so (unlike the classifier
    trainers) no accuracy is tracked — the manifest records average loss.
    Bounded by MAX_STEPS_PER_MODEL / MAX_TRAINING_TIME.
    Returns True on success, False on failure.
    """
    print("β³ Loading LSTM Dependencies (Torch)...")
    import torch
    import torch.nn as nn
    import torch.optim as optim
    from models.lstm import AlphaLSTM
    from streaming_loader import StreamingDataLoader
    status.start("LSTM", epochs)
    try:
        loader = StreamingDataLoader(
            repo_id=REPO_ID,
            model_type="lstm",
            batch_size=64,
            chunk_size=5000
        )
        # Input Size = 5 (log_ret, log_vol, hl_range, co_range, vol)
        model = AlphaLSTM(input_size=5, hidden_size=64)
        # Regression Loss (Mean Squared Error)
        criterion = nn.MSELoss()
        optimizer = optim.Adam(model.parameters(), lr=0.001)
        step = 0
        total_loss = 0
        start_time = time.time()
        model.train()
        print(f"π Starting LSTM Training (Max: {MAX_STEPS_PER_MODEL} steps / {MAX_TRAINING_TIME}s)...")
        for batch_X, batch_y in loader:
            # Stop on step or wall-clock limits (HF Space timeout protection).
            elapsed = time.time() - start_time
            if step >= MAX_STEPS_PER_MODEL:
                print(f"β±οΈ Step limit reached. Saving checkpoint...")
                break
            if elapsed >= MAX_TRAINING_TIME:
                print(f"β±οΈ Time limit reached. Saving checkpoint...")
                break
            optimizer.zero_grad()
            out = model(batch_X)
            loss = criterion(out, batch_y)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
            step += 1
            if step % 50 == 0:
                # step >= 50 here, so total_loss / step is safe
                print(f"Step {step}: Loss={loss.item():.6f}", flush=True)
                status.update(1, total_loss / step)
        # Save Native PyTorch Model (no accuracy tag — regression model)
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        model_filename = f"lstm_{timestamp}.pt"
        save_path = f"{MODEL_DIR}/{model_filename}"
        torch.save(model.state_dict(), save_path)
        print(f"Reference PyTorch model saved: {save_path} (Trained on {step} batches)")
        # Upload model (best-effort; failure does not abort the run)
        try:
            print(f"Uploading {model_filename} to Model Repo: {HF_MODEL_REPO}...")
            api.upload_file(
                path_or_fileobj=save_path,
                path_in_repo=model_filename,
                repo_id=HF_MODEL_REPO,
                repo_type="model"
            )
            print(f"β {model_filename} uploaded to HF Models successfully.")
        except Exception as e:
            print(f"β οΈ Model Upload Failed: {e}")
        # Update manifest: record average MSE loss instead of accuracy
        avg_loss = total_loss / step if step > 0 else 0.0
        update_best_models("lstm", model_filename, 0, None, api, metric_name="MSE Loss", metric_value=avg_loss)
        status.complete("LSTM")
        # History logging is best-effort
        try: history.log_model("LSTM", f"Loss={avg_loss:.4f}", model_filename)
        except: pass
        return True
    except Exception as e:
        status.error(str(e))
        print(f"β LSTM Training Error: {e}")
        return False
def train_classic_and_causal(status: StatusWriter, api: HfApi, history):
    """Train Classic ML (Random Forest) and Causal Discovery on bar data.

    Aggregates parquet bar files from the dataset repo until ~2000 rows are
    available, fits both models on the numeric columns, uploads the
    artifacts, and updates the manifest. Causal-discovery failure is logged
    but does not fail the pipeline.

    BUG FIX: the RF label vector is now aligned to the rows that survive
    dropna() — previously y was sliced from the full frame, so X and y
    could have different lengths/rows whenever dropna() removed anything.
    SECURITY FIX: the HF token comes from the environment only.

    Returns True on success, False if no usable data was found or a fatal
    error occurred.
    """
    print("β³ Loading Classic/Causal Dependencies (Pandas/Sklearn)...")
    import pandas as pd
    import numpy as np
    from models.classic_ml import get_rf_pipeline
    from models.causal_discovery import get_causal_model

    status.start("ClassicML & Causal", 1)
    try:
        from huggingface_hub import hf_hub_download
        token = os.environ.get("HF_TOKEN")  # never hard-code credentials
        api_hf = HfApi(token=token)
        files = api_hf.list_repo_files(repo_id=REPO_ID, repo_type="dataset")
        # Support V2 'data/candles/' and V1 'data/bar/' layouts.
        bar_files = [f for f in files if (f.startswith("data/candles/") or f.startswith("data/bar/")) and f.endswith(".parquet")]
        if not bar_files:
            print("β No bar files found for ClassicML.")
            return False

        # Aggregate multiple files until we have enough rows.
        target_rows = 2000
        aggregated_dfs = []
        total_rows = 0
        print(f"π₯ Aggregating bar files (Target: {target_rows} rows)...")
        for file_info in bar_files:
            if total_rows >= target_rows:
                break
            try:
                local_path = hf_hub_download(repo_id=REPO_ID, filename=file_info, repo_type="dataset", token=token)
                chunk_df = pd.read_parquet(local_path)
                if not chunk_df.empty:
                    aggregated_dfs.append(chunk_df)
                    total_rows += len(chunk_df)
                    print(f" + Added {len(chunk_df)} rows from {file_info} (Total: {total_rows})")
            except Exception as e:
                print(f" β οΈ Failed to load {file_info}: {e}")
        if not aggregated_dfs:
            print("β Failed to load any valid bar data.")
            return False
        df = pd.concat(aggregated_dfs, ignore_index=True)
        print(f"β Final Dataset Size: {len(df)} rows")

        # Feature prep: log returns + all numeric columns, capped at 10k rows.
        df['log_ret'] = np.log(df['close'] / df['close'].shift(1)).fillna(0)
        clean_df = df.dropna().select_dtypes(include=[np.number]).iloc[:10000]
        X = clean_df.values

        # 1. Classic ML (Random Forest)
        try:
            print("π§ Training ClassicML (Random Forest)...")
            rf_model = get_rf_pipeline()
            # Label: next-bar direction (1 = up). Align to the exact rows
            # kept in X so lengths always match.
            target = (df['log_ret'].shift(-1).fillna(0) > 0).astype(int)
            y = target.loc[clean_df.index]
            rf_model.fit(X, y)
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            rf_filename = f"classic_ml_{timestamp}.joblib"
            joblib.dump(rf_model, f"{MODEL_DIR}/{rf_filename}")
            api.upload_file(
                path_or_fileobj=f"{MODEL_DIR}/{rf_filename}",
                path_in_repo=rf_filename,
                repo_id=HF_MODEL_REPO,
                repo_type="model"
            )
            print(f"β {rf_filename} uploaded.")
            # In-sample accuracy (optimistic — evaluated on training data).
            rf_acc = rf_model.score(X, y) * 100
            update_best_models("classic_ml", rf_filename, rf_acc, None, api, metric_name="Accuracy", metric_value=rf_acc)
            try:
                history.log_model("ClassicML", f"{rf_acc:.2f}%", rf_filename)
            except Exception:
                pass
        except Exception as e:
            print(f"β ClassicML Failed: {e}")
            status.error(f"ClassicML Fail: {e}")

        # 2. Causal Discovery (failure here does NOT fail the pipeline)
        try:
            print("πΈοΈ Running Causal Discovery...")
            causal_model = get_causal_model()
            causal_model.fit(clean_df)
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")  # refresh timestamp
            causal_filename = f"causal_discovery_{timestamp}.pkl"
            with open(f"{MODEL_DIR}/{causal_filename}", 'wb') as f:
                pickle.dump(causal_model, f)
            api.upload_file(
                path_or_fileobj=f"{MODEL_DIR}/{causal_filename}",
                path_in_repo=causal_filename,
                repo_id=HF_MODEL_REPO,
                repo_type="model"
            )
            print(f"β {causal_filename} uploaded.")
            # Headline metric: number of discovered graph edges.
            num_edges = int(np.sum(causal_model.graph))
            update_best_models("causal_discovery", causal_filename, 0, None, api, metric_name="Edges", metric_value=num_edges)
            try:
                history.log_model("CausalDiscovery", f"{num_edges} Edges", causal_filename)
            except Exception:
                pass
        except Exception as e:
            print(f"β Causal Discovery Failed: {e}")

        status.complete("ClassicML & Causal")
        return True
    except Exception as e:
        status.error(f"Classic/Causal Fail: {e}")
        print(f"β Classic/Causal Error: {e}")
        return False
def train_agents(status: StatusWriter, api: HfApi, history):
    """Initialize, smoke-test, and publish the RL & rule-based agents.

    No actual RL training happens here: each agent is constructed, run once
    on a dummy input as a sanity check, serialized, uploaded, and registered
    in the manifest with an initial metric value.

    CLEANUP: the four identical save-and-upload stanzas now share one
    private helper instead of duplicated boilerplate.
    Returns True on success, False on failure.
    """
    print("β³ Loading Agent Dependencies (Torch/RL)...")
    import torch
    from models.execution_agent import PPOActorCritic
    from models.meta_controller import DQN
    from models.risk_agent import RiskAgent
    from models.arbitrage_agent import ArbitrageAgent

    status.start("Agents (RL & Rule)", 1)
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

    def _upload(filename):
        # Shared upload path for all four agent artifacts.
        api.upload_file(
            path_or_fileobj=f"{MODEL_DIR}/{filename}",
            path_in_repo=filename,
            repo_id=HF_MODEL_REPO,
            repo_type="model"
        )
        print(f"β {filename} uploaded.")

    try:
        # 1. Execution Agent (PPO)
        print("π€ Initializing Execution Agent (PPO)...")
        exec_agent = PPOActorCritic(input_dim=5, action_dim=3)
        dummy_in = torch.randn(1, 5)
        exec_agent(dummy_in)  # forward-pass smoke test
        exec_filename = f"execution_agent_{timestamp}.pt"
        torch.save(exec_agent.state_dict(), f"{MODEL_DIR}/{exec_filename}")
        _upload(exec_filename)

        # 2. Meta Controller (DQN)
        print("π§ Initializing Meta Controller (DQN)...")
        meta_agent = DQN(input_dim=5, output_dim=3)
        meta_agent(dummy_in)  # forward-pass smoke test
        meta_filename = f"meta_controller_{timestamp}.pt"
        torch.save(meta_agent.state_dict(), f"{MODEL_DIR}/{meta_filename}")
        _upload(meta_filename)

        # 3. Risk Agent (rule-based; pickled)
        print("π‘οΈ Initializing Risk Agent...")
        risk_agent = RiskAgent(max_dd=0.15)
        risk_filename = f"risk_agent_{timestamp}.pkl"
        with open(f"{MODEL_DIR}/{risk_filename}", 'wb') as f:
            pickle.dump(risk_agent, f)
        _upload(risk_filename)

        # 4. Arbitrage Agent (rule-based; pickled)
        print("βοΈ Initializing Arbitrage Agent...")
        arb_agent = ArbitrageAgent(threshold=0.005)
        arb_agent.analyze(100, 101, 0.001)  # synthetic smoke test
        arb_filename = f"arbitrage_agent_{timestamp}.pkl"
        with open(f"{MODEL_DIR}/{arb_filename}", 'wb') as f:
            pickle.dump(arb_agent, f)
        _upload(arb_filename)

        # Register all four agents in the manifest.
        update_best_models("execution_agent", exec_filename, 0, None, api, metric_name="Init Reward", metric_value=0)
        update_best_models("meta_controller", meta_filename, 0, None, api, metric_name="Init Reward", metric_value=0)
        update_best_models("risk_agent", risk_filename, 0, None, api, metric_name="Init Score", metric_value=1.0)
        update_best_models("arbitrage_agent", arb_filename, 0, None, api, metric_name="Init Score", metric_value=1.0)
        status.complete("Agents Completed")

        # History logging is best-effort; never fail the run over it.
        try:
            history.log_model("ExecutionAgent", "Init", exec_filename)
            history.log_model("MetaController", "Init", meta_filename)
            history.log_model("RiskAgent", "Init", risk_filename)
            history.log_model("ArbitrageAgent", "Init", arb_filename)
        except Exception:
            pass
        return True
    except Exception as e:
        status.error(f"Agents Fail: {e}")
        print(f"β Agents Error: {e}")
        return False
def upload_models():
    """Upload every ONNX artifact found in MODEL_DIR to the HF model repo.

    Failures are reported per file and do not stop the remaining uploads.
    """
    from huggingface_hub import HfApi
    hub = HfApi()
    for onnx_path in Path(MODEL_DIR).glob("*.onnx"):
        try:
            hub.upload_file(
                path_or_fileobj=str(onnx_path),
                path_in_repo=onnx_path.name,
                repo_id=HF_MODEL_REPO,
                repo_type="model"
            )
            print(f"Uploaded: {onnx_path.name}")
        except Exception as e:
            print(f"Upload failed for {onnx_path.name}: {e}")
def main():
    """Run the full auto-training pipeline end to end.

    Order: DeepLOB (gating — abort on failure), TRM, LSTM, Classic/Causal,
    Agents, then legacy ONNX bulk upload and a final history upload.

    SECURITY FIX: the HF token is read from the environment only; the old
    hard-coded fallback was a leaked credential and also made the
    `if not token` warning branch unreachable dead code.
    """
    print("π Auto-Training Pipeline Started...")
    token = os.environ.get("HF_TOKEN")
    if not token:
        print("β οΈ HF_TOKEN not found in environment! Uploads may fail.")
    api = HfApi(token=token)

    # Init status & history writers for dashboard monitoring.
    status = StatusWriter()
    history = HistoryWriter()

    print("=" * 50)
    print("NautilusAI Auto Training Pipeline")
    print(f"Started at: {datetime.now().isoformat()}")
    print("=" * 50)

    # Data download is skipped: the loaders stream directly from the HF dataset.
    print("π Using Streaming Mode (No Download Required)")

    # Force the legacy (non-dynamo) ONNX exporter.
    os.environ["TORCH_ONNX_USE_DYNAMO"] = "0"

    # DeepLOB is the gating model: abort the whole pipeline if it fails.
    if not train_deeplob(status, api, history, epochs=1):
        print("DeepLOB training failed!")
        return
    # The remaining trainers are independent; continue past failures.
    if not train_trm(status, api, history, epochs=1):
        print("TRM training failed!")
    if not train_lstm(status, api, history, epochs=1):
        print("LSTM training failed!")
    train_classic_and_causal(status, api, history)
    train_agents(status, api, history)

    # Legacy bulk ONNX upload (mostly redundant now, but harmless).
    upload_models()
    # Final batched history upload.
    history.upload_history()

    # Reset status to idle for the dashboard.
    status.reset()
    print("=" * 50)
    print("Training Pipeline Complete!")
    print("=" * 50)
if __name__ == "__main__":
    try:
        print("π§ Auto-Train Script Initializing...", flush=True)
        main()
    except Exception as e:
        # Top-level boundary: log, dump the traceback, and record the error
        # in the status file so the dashboard can see the crash.
        import traceback
        print(f"β CRITICAL ERROR IN AUTO-TRAIN: {e}", flush=True)
        traceback.print_exc()
        try:
            with open(STATUS_FILE, 'w') as f:
                json.dump({"status": "error", "logs": [f"CRITICAL: {str(e)}"]}, f)
        except Exception:
            # BUG FIX: was a bare `except:` which also swallowed
            # KeyboardInterrupt/SystemExit; status file is best-effort.
            pass