# NautilusTrainer / auto_train.py
# Deploy NautilusTrainer with Ray RLlib (commit db75a76, verified)
#!/usr/bin/env python3
"""
Auto Training Pipeline for NautilusAI
Runs scheduled training for DeepLOB, TRM, and Ensemble models.
Writes status to status.json for Dashboard monitoring.
"""
import os
import json
import time
from datetime import datetime
from pathlib import Path
from huggingface_hub import HfApi
import joblib
import pickle
# --- Configuration -----------------------------------------------------------
# Source dataset with the streamed L2 book / candle data.
REPO_ID = "gionuibk/hyperliquidL2Book-v2"
DATA_DIR = "./data"
MODEL_DIR = "./models"
STATUS_FILE = "./status.json"
HF_MODEL_REPO = "gionuibk/NautilusModels"
REPO_ID_LOGS = "gionuibk/NautilusLogs"
BEST_MODELS_FILE = "./best_models.json"

# Training limits (guard against HF Space timeouts).
MAX_TRAINING_TIME = 1200    # seconds per model (20 minutes)
MAX_STEPS_PER_MODEL = 3000  # max optimizer steps before saving

# Make sure the working directories exist before any trainer touches them.
for _dir in (MODEL_DIR, DATA_DIR):
    Path(_dir).mkdir(exist_ok=True)
class StatusWriter:
    """Writes training status to a JSON file for Dashboard monitoring.

    Every state change is persisted locally to ``filepath``. The file is
    additionally mirrored to an HF logs dataset, throttled to at most one
    upload per 10 minutes unless the run reached a final state
    ("completed" / "error").
    """

    # Remote dataset repo that mirrors status.json for the Dashboard.
    _LOG_REPO = "gionuibk/NautilusLogs"
    # Seconds between remote uploads (10 minutes).
    _UPLOAD_INTERVAL = 600

    def __init__(self, filepath=STATUS_FILE):
        self.filepath = filepath
        self.logs = []
        # Best-effort: ensure the log repo exists before the first upload.
        # SECURITY FIX: a hard-coded HF token was previously embedded here
        # as a fallback; credentials must come from the environment only.
        try:
            from huggingface_hub import create_repo
            token = os.environ.get("HF_TOKEN")
            if token:
                create_repo(self._LOG_REPO, repo_type="dataset",
                            exist_ok=True, token=token, private=True)
        except Exception:
            # Missing dependency / network / auth issues must never
            # prevent training from starting.
            pass
        self.last_upload_time = 0
        self.reset()

    def reset(self):
        """Return the status to the idle baseline and persist it."""
        self.status = {
            "current_model": None,
            "status": "idle",
            "epoch": 0,
            "total_epochs": 0,
            "last_loss": None,
            "last_accuracy": None,
            "started_at": None,
            "last_update": None,
            "logs": []
        }
        self._save()

    def start(self, model_name: str, total_epochs: int):
        """Mark the beginning of a training run for ``model_name``."""
        self.logs = []
        now = datetime.now().isoformat()
        self.status = {
            "current_model": model_name,
            "status": "training",
            "epoch": 0,
            "total_epochs": total_epochs,
            "last_loss": None,
            "last_accuracy": None,
            "started_at": now,
            "last_update": now,
            "logs": []
        }
        self.log(f"Started training {model_name}")
        self._save()

    def update(self, epoch: int, loss: float, accuracy: float = None):
        """Record per-epoch metrics; ``accuracy`` is optional (regression)."""
        self.status["epoch"] = epoch
        self.status["last_loss"] = loss
        self.status["last_accuracy"] = accuracy
        self.status["last_update"] = datetime.now().isoformat()
        # BUG FIX: compare against None (not truthiness) so a legitimate
        # 0.0% accuracy still gets logged.
        suffix = f", Acc={accuracy:.2f}%" if accuracy is not None else ""
        self.log(f"Epoch {epoch}: Loss={loss:.4f}" + suffix)
        self._save()

    def complete(self, model_name: str):
        """Mark the current run as completed (triggers a final upload)."""
        self.status["status"] = "completed"
        self.status["last_update"] = datetime.now().isoformat()
        self.log(f"Completed training {model_name}")
        self._save()

    def error(self, message: str):
        """Mark the current run as failed (triggers a final upload)."""
        self.status["status"] = "error"
        self.status["last_update"] = datetime.now().isoformat()
        self.log(f"ERROR: {message}")
        self._save()

    def log(self, message: str):
        """Append a timestamped entry to the rolling in-memory log."""
        timestamp = datetime.now().strftime("%H:%M:%S")
        log_entry = f"[{timestamp}] {message}"
        self.logs.append(log_entry)
        self.status["logs"] = self.logs[-50:]  # keep the last 50 entries
        print(log_entry)

    def _save(self):
        """Persist status locally, then mirror to HF (throttled)."""
        with open(self.filepath, 'w') as f:
            json.dump(self.status, f, indent=2)
        # Throttle: only upload every 10 minutes OR if the status is final.
        current_time = time.time()
        is_final = self.status["status"] in ("completed", "error")
        if (current_time - self.last_upload_time >= self._UPLOAD_INTERVAL) or is_final:
            try:
                from huggingface_hub import HfApi
                # SECURITY FIX: environment-provided token only.
                token = os.environ.get("HF_TOKEN")
                if token:
                    HfApi().upload_file(
                        path_or_fileobj=self.filepath,
                        path_in_repo="status.json",
                        repo_id=self._LOG_REPO,
                        repo_type="dataset",
                        token=token
                    )
                    self.last_upload_time = current_time
                    print(f"πŸ“‘ Status uploaded to HF (Next update in 10 mins).")
            except Exception:
                # Remote mirroring is best-effort; the local file is
                # authoritative for the Dashboard.
                pass
class HistoryWriter:
    """Appends a permanent training history to a local CSV file and
    mirrors it to the HF logs dataset (throttled to one upload per 10 min)."""

    def __init__(self, filepath="training_history.csv"):
        self.filepath = filepath
        self.last_upload_time = 0
        self._ensure_header()

    def _ensure_header(self):
        # Create the CSV with its header row on first use only.
        if not os.path.exists(self.filepath):
            with open(self.filepath, 'w') as f:
                f.write("timestamp,model_name,metrics,filename,hf_url\n")

    def log_model(self, model_name: str, metrics: str, filename: str):
        """Append one row describing a saved/uploaded model artifact."""
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        # BUG FIX: the filename column, URL, and log line previously
        # contained the literal placeholder "(unknown)" instead of the
        # actual ``filename`` argument (which went unused).
        hf_url = f"https://huggingface.co/{HF_MODEL_REPO}/blob/main/{filename}"
        if not metrics:
            metrics = "N/A"
        with open(self.filepath, 'a') as f:
            f.write(f"{timestamp},{model_name},{metrics},{filename},{hf_url}\n")
        print(f"πŸ“œ Logged to history: {filename}")
        self.upload_history()  # auto-upload, throttled below

    def upload_history(self):
        """Upload the history CSV to the HF logs repo (throttled 10m)."""
        current_time = time.time()
        if current_time - self.last_upload_time < 600:
            return  # skip if too frequent
        print("πŸ“€ Uploading Training History Log...")
        try:
            # SECURITY FIX: credentials come from the environment only; the
            # old hard-coded fallback token has been removed.
            token = os.environ.get("HF_TOKEN")
            if token:
                from huggingface_hub import HfApi
                api = HfApi(token=token)
                api.upload_file(
                    path_or_fileobj=self.filepath,
                    path_in_repo="training_history.csv",
                    repo_id=REPO_ID_LOGS,
                    repo_type="dataset"
                )
                self.last_upload_time = current_time
        except Exception as e:
            print(f"⚠️ History Upload Failed: {e}")
def update_best_models(model_type: str, pt_filename: str, accuracy: float, onnx_filename: str = None, api: HfApi = None, metric_name: str = "accuracy", metric_value: float = None):
    """
    Update the best_models.json manifest so NautilusAI can discover models.

    The manifest maps each model type (deeplob, trm, lstm, ...) to its most
    recently saved artifacts plus the metric used to describe them.
    """
    # Fall back to accuracy when no explicit ranking metric is supplied.
    if metric_value is None:
        metric_value = accuracy

    # Pull the current manifest from HF first so a fresh container does not
    # overwrite entries written by previous runs. Any failure (network,
    # missing file, bad JSON) falls back to an empty manifest.
    try:
        from huggingface_hub import hf_hub_download
        remote_path = hf_hub_download(
            repo_id=HF_MODEL_REPO,
            filename="best_models.json",
            repo_type="model",
            token=os.environ.get("HF_TOKEN")
        )
        with open(remote_path, 'r') as fh:
            manifest = json.load(fh)
    except Exception:
        manifest = {}

    # Record this model type's latest artifacts and metric.
    manifest[model_type] = {
        "pt_file": pt_filename,
        "onnx_file": onnx_filename,
        "accuracy": accuracy,  # legacy compatibility
        "metric_name": metric_name,
        "metric_value": metric_value,
        "updated_at": datetime.now().isoformat(),
        "hf_repo": HF_MODEL_REPO
    }

    # Persist locally.
    with open(BEST_MODELS_FILE, 'w') as fh:
        json.dump(manifest, fh, indent=2)
    print(f"πŸ“‹ Updated best_models.json: {model_type} = {pt_filename}")

    # Push the manifest upstream when an authenticated client is available.
    if api:
        try:
            api.upload_file(
                path_or_fileobj=BEST_MODELS_FILE,
                path_in_repo="best_models.json",
                repo_id=HF_MODEL_REPO,
                repo_type="model"
            )
            print(f"βœ… best_models.json uploaded to {HF_MODEL_REPO}")
        except Exception as e:
            print(f"⚠️ Failed to upload best_models.json: {e}")
def train_deeplob(status: StatusWriter, api: HfApi, history, epochs: int = 1):
    """Train the DeepLOB model on streamed L2 order-book data.

    Saves a timestamped .pt checkpoint (and an ONNX export when possible),
    uploads both to HF_MODEL_REPO, updates the manifest, and logs the run
    to the history CSV. Returns True on success, False on failure.
    """
    print("⏳ Loading DeepLOB Dependencies (Torch)...")
    import torch
    import torch.nn as nn
    import torch.optim as optim
    from models.deeplob import DeepLOB
    from streaming_loader import StreamingDataLoader

    # In streaming mode an "epoch" is effectively one bounded pass over the
    # stream; the hard limits below stop the run before the Space times out.
    status.start("DeepLOB", epochs)
    try:
        loader = StreamingDataLoader(
            repo_id=REPO_ID,
            model_type="deeplob",
            batch_size=32,
            chunk_size=5000  # process 5000 rows at a time
        )
        model = DeepLOB(y_len=3)
        # Class weights are hard to pre-compute on a stream; plain CE loss.
        criterion = nn.CrossEntropyLoss()
        optimizer = optim.Adam(model.parameters(), lr=0.0001)

        step = 0
        total_loss = 0
        correct = 0
        total_samples = 0
        start_time = time.time()
        model.train()
        print(f"πŸš€ Starting Streaming Training loop (Max: {MAX_STEPS_PER_MODEL} steps / {MAX_TRAINING_TIME}s)...")
        for batch_X, batch_y in loader:
            # Stop on either the step or the wall-clock limit.
            elapsed = time.time() - start_time
            if step >= MAX_STEPS_PER_MODEL:
                print(f"⏱️ Step limit reached ({MAX_STEPS_PER_MODEL}). Saving checkpoint...")
                break
            if elapsed >= MAX_TRAINING_TIME:
                print(f"⏱️ Time limit reached ({MAX_TRAINING_TIME}s). Saving checkpoint...")
                break
            optimizer.zero_grad()
            outputs = model(batch_X)
            loss = criterion(outputs, batch_y)
            loss.backward()
            optimizer.step()
            # Running metrics
            total_loss += loss.item()
            _, predicted = torch.max(outputs.data, 1)
            total_samples += batch_y.size(0)
            correct += (predicted == batch_y).sum().item()
            step += 1
            if step % 10 == 0:
                print(f"Step {step}: Loss={loss.item():.4f}", flush=True)
            # Refresh the dashboard status occasionally.
            if step % 50 == 0:
                acc = 100 * correct / total_samples
                avg_loss = total_loss / step
                status.update(1, avg_loss, acc)

        # Final accuracy, encoded into the artifact filename (e.g. acc95_2).
        acc = 100 * correct / total_samples if total_samples > 0 else 0
        acc_str = f"{acc:.1f}".replace('.', '_')
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        model_filename = f"deeplob_acc{acc_str}_{timestamp}.pt"
        save_path = f"{MODEL_DIR}/{model_filename}"
        torch.save(model.state_dict(), save_path)
        print(f"πŸ“¦ PyTorch model saved: {model_filename} (Acc={acc:.2f}%, Steps={step})")

        # Upload the checkpoint to the HF Model Hub.
        try:
            from huggingface_hub import create_repo
            # SECURITY FIX: token comes from the environment only; the old
            # hard-coded fallback credential has been removed.
            token = os.environ.get("HF_TOKEN")
            create_repo(HF_MODEL_REPO, repo_type="model", exist_ok=True, token=token)
            print(f"Uploading {model_filename} to Model Repo: {HF_MODEL_REPO}...")
            api.upload_file(
                path_or_fileobj=save_path,
                path_in_repo=model_filename,
                repo_id=HF_MODEL_REPO,
                repo_type="model"
            )
            print(f"βœ… {model_filename} uploaded to HF Models successfully.")
        except Exception as e:
            print(f"⚠️ Model Upload Failed: {e}")

        # ONNX export (best effort) using the same naming convention.
        onnx_filename = None
        try:
            print("Exporting DeepLOB to ONNX...")
            onnx_filename = f"deeplob_acc{acc_str}_{timestamp}.onnx"
            onnx_path = f"{MODEL_DIR}/{onnx_filename}"
            dummy = torch.randn(1, 2, 100, 40)
            torch.onnx.export(
                model, dummy, onnx_path,
                input_names=['input'], output_names=['output'],
                dynamic_axes={'input': {0: 'batch_size'}, 'output': {0: 'batch_size'}},
                opset_version=14, do_constant_folding=True
            )
            print(f"βœ… ONNX Export Success: {onnx_filename}")
            api.upload_file(
                path_or_fileobj=onnx_path,
                path_in_repo=onnx_filename,
                repo_id=HF_MODEL_REPO,
                repo_type="model"
            )
            print(f"βœ… {onnx_filename} uploaded to HF Models.")
        except Exception as e:
            print(f"⚠️ ONNX Export Failed: {e}. Using .pt checkpoint only.")
            onnx_filename = None

        # Register the artifacts and finish.
        update_best_models("deeplob", model_filename, acc, onnx_filename, api)
        status.complete("DeepLOB")
        # History logging is best-effort bookkeeping; never fail the run.
        # (Previously this reused/shadowed `acc_str` and checked
        # `'acc' in locals()`, which is always true at this point.)
        try:
            history.log_model("DeepLOB", f"Acc={acc:.2f}%", model_filename)
        except Exception:
            pass
        return True
    except Exception as e:
        import traceback
        traceback.print_exc()
        status.error(str(e))
        return False
def train_trm(status: StatusWriter, api: HfApi, history, epochs: int = 1):
    """Train the TRM model on streamed data.

    Mirrors train_deeplob: bounded by MAX_STEPS_PER_MODEL and
    MAX_TRAINING_TIME, saves/uploads .pt and ONNX artifacts, updates the
    manifest. Returns True on success, False on failure.
    """
    print("⏳ Loading TRM Dependencies (Torch)...")
    import torch
    import torch.nn as nn
    import torch.optim as optim
    from models.trm import TRM
    from streaming_loader import StreamingDataLoader

    status.start("TRM", epochs)
    try:
        loader = StreamingDataLoader(
            repo_id=REPO_ID,
            model_type="trm",
            batch_size=64,
            chunk_size=5000
        )
        model = TRM(input_size=6, num_classes=3)
        criterion = nn.CrossEntropyLoss()
        optimizer = optim.Adam(model.parameters(), lr=0.001)

        step = 0
        total_loss = 0
        correct = 0
        total_samples = 0
        start_time = time.time()
        model.train()
        print(f"πŸš€ Starting TRM Training (Max: {MAX_STEPS_PER_MODEL} steps / {MAX_TRAINING_TIME}s)...")
        for batch_X, batch_y in loader:
            # Stop on either the step or the wall-clock limit.
            elapsed = time.time() - start_time
            if step >= MAX_STEPS_PER_MODEL:
                print(f"⏱️ Step limit reached. Saving checkpoint...")
                break
            if elapsed >= MAX_TRAINING_TIME:
                print(f"⏱️ Time limit reached. Saving checkpoint...")
                break
            optimizer.zero_grad()
            out = model(batch_X)
            loss = criterion(out, batch_y)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
            _, predicted = torch.max(out.data, 1)
            total_samples += batch_y.size(0)
            correct += (predicted == batch_y).sum().item()
            step += 1
            if step % 50 == 0:
                acc = 100 * correct / total_samples if total_samples > 0 else 0
                status.update(1, total_loss / step, acc)

        # Final accuracy, encoded into the artifact filename.
        acc = 100 * correct / total_samples if total_samples > 0 else 0
        acc_str = f"{acc:.1f}".replace('.', '_')
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        model_filename = f"trm_acc{acc_str}_{timestamp}.pt"
        save_path = f"{MODEL_DIR}/{model_filename}"
        torch.save(model.state_dict(), save_path)
        print(f"πŸ“¦ TRM model saved: {model_filename} (Acc={acc:.2f}%, Steps={step})")

        # Upload the checkpoint to the HF Model Hub.
        try:
            print(f"Uploading {model_filename} to Model Repo: {HF_MODEL_REPO}...")
            api.upload_file(
                path_or_fileobj=save_path,
                path_in_repo=model_filename,
                repo_id=HF_MODEL_REPO,
                repo_type="model"
            )
            print(f"βœ… {model_filename} uploaded to HF Models successfully.")
        except Exception as e:
            print(f"⚠️ Model Upload Failed: {e}")

        # ONNX export (best effort) using the same naming convention.
        onnx_filename = None
        try:
            print("Exporting TRM to ONNX...")
            onnx_filename = f"trm_acc{acc_str}_{timestamp}.onnx"
            onnx_path = f"{MODEL_DIR}/{onnx_filename}"
            dummy = torch.randn(1, 60, 6)
            torch.onnx.export(
                model, dummy, onnx_path,
                input_names=['input'], output_names=['output'],
                dynamic_axes={'input': {0: 'batch_size'}, 'output': {0: 'batch_size'}},
                opset_version=14, do_constant_folding=True
            )
            print(f"βœ… TRM ONNX Export Success: {onnx_filename}")
            api.upload_file(
                path_or_fileobj=onnx_path,
                path_in_repo=onnx_filename,
                repo_id=HF_MODEL_REPO,
                repo_type="model"
            )
        except Exception as e:
            print(f"⚠️ ONNX Export Failed: {e}")
            onnx_filename = None

        update_best_models("trm", model_filename, acc, onnx_filename, api)
        status.complete("TRM")
        # CONSISTENCY FIX: TRM previously skipped history logging although
        # DeepLOB and LSTM both record their runs.
        try:
            history.log_model("TRM", f"Acc={acc:.2f}%", model_filename)
        except Exception:
            pass
        return True
    except Exception as e:
        import traceback
        traceback.print_exc()  # match train_deeplob's diagnostics
        status.error(str(e))
        return False
def train_lstm(status: StatusWriter, api: HfApi, history, epochs: int = 1):
    """Train the AlphaLSTM regressor on streamed bar data.

    Uses MSE loss (regression target), saves a timestamped .pt checkpoint,
    uploads it to HF_MODEL_REPO, and records the average loss in the
    manifest. Returns True on success, False on failure.
    """
    print("⏳ Loading LSTM Dependencies (Torch)...")
    import torch
    import torch.nn as nn
    import torch.optim as optim
    from models.lstm import AlphaLSTM
    from streaming_loader import StreamingDataLoader

    status.start("LSTM", epochs)
    try:
        loader = StreamingDataLoader(
            repo_id=REPO_ID,
            model_type="lstm",
            batch_size=64,
            chunk_size=5000
        )
        # Input size = 5 features (log_ret, log_vol, hl_range, co_range, vol).
        model = AlphaLSTM(input_size=5, hidden_size=64)
        # Regression objective: mean squared error.
        criterion = nn.MSELoss()
        optimizer = optim.Adam(model.parameters(), lr=0.001)

        step = 0
        total_loss = 0
        start_time = time.time()
        model.train()
        print(f"πŸš€ Starting LSTM Training (Max: {MAX_STEPS_PER_MODEL} steps / {MAX_TRAINING_TIME}s)...")
        for batch_X, batch_y in loader:
            # Stop on either the step or the wall-clock limit.
            elapsed = time.time() - start_time
            if step >= MAX_STEPS_PER_MODEL:
                print(f"⏱️ Step limit reached. Saving checkpoint...")
                break
            if elapsed >= MAX_TRAINING_TIME:
                print(f"⏱️ Time limit reached. Saving checkpoint...")
                break
            optimizer.zero_grad()
            out = model(batch_X)
            loss = criterion(out, batch_y)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
            step += 1
            if step % 50 == 0:
                print(f"Step {step}: Loss={loss.item():.6f}", flush=True)
                status.update(1, total_loss / step)

        # Save the native PyTorch checkpoint.
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        model_filename = f"lstm_{timestamp}.pt"
        save_path = f"{MODEL_DIR}/{model_filename}"
        torch.save(model.state_dict(), save_path)
        print(f"Reference PyTorch model saved: {save_path} (Trained on {step} batches)")

        # Upload the checkpoint to the HF Model Hub.
        try:
            print(f"Uploading {model_filename} to Model Repo: {HF_MODEL_REPO}...")
            api.upload_file(
                path_or_fileobj=save_path,
                path_in_repo=model_filename,
                repo_id=HF_MODEL_REPO,
                repo_type="model"
            )
            print(f"βœ… {model_filename} uploaded to HF Models successfully.")
        except Exception as e:
            print(f"⚠️ Model Upload Failed: {e}")

        # Register with average loss (guard the empty-stream case).
        avg_loss = total_loss / step if step > 0 else 0.0
        update_best_models("lstm", model_filename, 0, None, api, metric_name="MSE Loss", metric_value=avg_loss)
        status.complete("LSTM")
        # History logging is best-effort bookkeeping; never fail the run.
        try:
            history.log_model("LSTM", f"Loss={avg_loss:.4f}", model_filename)
        except Exception:
            pass
        return True
    except Exception as e:
        status.error(str(e))
        print(f"❌ LSTM Training Error: {e}")
        return False
def train_classic_and_causal(status: StatusWriter, api: HfApi, history):
    """Train Classic ML (Random Forest) and Causal Discovery on bar data.

    Aggregates parquet bar files from the dataset repo until ~2000 rows are
    available, fits both models, uploads the artifacts, and updates the
    manifest. Individual model failures are logged but do not abort the
    step; returns True when the step finished, False on a fatal error.
    """
    print("⏳ Loading Classic/Causal Dependencies (Pandas/Sklearn)...")
    import pandas as pd
    import numpy as np
    from models.classic_ml import get_hmm_pipeline, get_rf_pipeline
    from models.causal_discovery import get_causal_model
    from streaming_loader import StreamingDataLoader

    status.start("ClassicML & Causal", 1)
    try:
        from huggingface_hub import hf_hub_download
        # SECURITY FIX: the hard-coded fallback token has been removed;
        # use the environment-provided credential only.
        token = os.environ.get("HF_TOKEN")
        api_hf = HfApi(token=token)
        files = api_hf.list_repo_files(repo_id=REPO_ID, repo_type="dataset")
        # Support both the V2 'data/candles/' and V1 'data/bar/' layouts.
        bar_files = [f for f in files if (f.startswith("data/candles/") or f.startswith("data/bar/")) and f.endswith(".parquet")]
        if not bar_files:
            print("❌ No bar files found for ClassicML.")
            return False

        # Aggregate files until we have enough rows for a meaningful fit.
        target_rows = 2000
        aggregated_dfs = []
        total_rows = 0
        print(f"πŸ“₯ Aggregating bar files (Target: {target_rows} rows)...")
        for file_info in bar_files:
            if total_rows >= target_rows:
                break
            try:
                local_path = hf_hub_download(repo_id=REPO_ID, filename=file_info, repo_type="dataset", token=token)
                chunk_df = pd.read_parquet(local_path)
                if not chunk_df.empty:
                    aggregated_dfs.append(chunk_df)
                    total_rows += len(chunk_df)
                    print(f" + Added {len(chunk_df)} rows from {file_info} (Total: {total_rows})")
            except Exception as e:
                print(f" ⚠️ Failed to load {file_info}: {e}")
        if not aggregated_dfs:
            print("❌ Failed to load any valid bar data.")
            return False
        df = pd.concat(aggregated_dfs, ignore_index=True)
        print(f"βœ… Final Dataset Size: {len(df)} rows")

        # Preprocess: log returns, numeric columns only, capped at 10k rows.
        df['log_ret'] = np.log(df['close'] / df['close'].shift(1)).fillna(0)
        clean_df = df.dropna().select_dtypes(include=[np.number]).iloc[:10000]
        X = clean_df.values

        # 1. Classic ML (Random Forest)
        try:
            print("🧠 Training ClassicML (Random Forest)...")
            rf_model = get_rf_pipeline()
            # BUG FIX: build the label from clean_df so X and y cover the
            # exact same rows. Deriving y from the un-filtered df could
            # misalign/mismatch the two whenever dropna() removed rows.
            y = (clean_df['log_ret'].shift(-1).fillna(0) > 0).astype(int)
            rf_model.fit(X, y)
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            rf_filename = f"classic_ml_{timestamp}.joblib"
            joblib.dump(rf_model, f"{MODEL_DIR}/{rf_filename}")
            api.upload_file(
                path_or_fileobj=f"{MODEL_DIR}/{rf_filename}",
                path_in_repo=rf_filename,
                repo_id=HF_MODEL_REPO,
                repo_type="model"
            )
            print(f"βœ… {rf_filename} uploaded.")
            # NOTE: in-sample accuracy (no holdout split) — optimistic.
            rf_acc = rf_model.score(X, y) * 100
            update_best_models("classic_ml", rf_filename, rf_acc, None, api, metric_name="Accuracy", metric_value=rf_acc)
            try:
                history.log_model("ClassicML", f"{rf_acc:.2f}%", rf_filename)
            except Exception:
                pass
        except Exception as e:
            print(f"❌ ClassicML Failed: {e}")
            status.error(f"ClassicML Fail: {e}")

        # 2. Causal Discovery
        try:
            print("πŸ•ΈοΈ Running Causal Discovery...")
            causal_model = get_causal_model()
            causal_model.fit(clean_df)
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")  # refresh timestamp
            causal_filename = f"causal_discovery_{timestamp}.pkl"
            with open(f"{MODEL_DIR}/{causal_filename}", 'wb') as f:
                pickle.dump(causal_model, f)
            api.upload_file(
                path_or_fileobj=f"{MODEL_DIR}/{causal_filename}",
                path_in_repo=causal_filename,
                repo_id=HF_MODEL_REPO,
                repo_type="model"
            )
            print(f"βœ… {causal_filename} uploaded.")
            # Number of discovered graph edges is the reported metric.
            num_edges = int(np.sum(causal_model.graph))
            update_best_models("causal_discovery", causal_filename, 0, None, api, metric_name="Edges", metric_value=num_edges)
            try:
                history.log_model("CausalDiscovery", f"{num_edges} Edges", causal_filename)
            except Exception:
                pass
        except Exception as e:
            # Causal discovery is optional; log but do not fail the pipeline.
            print(f"❌ Causal Discovery Failed: {e}")

        status.complete("ClassicML & Causal")
        return True
    except Exception as e:
        status.error(f"Classic/Causal Fail: {e}")
        print(f"❌ Classic/Causal Error: {e}")
        return False
def train_agents(status: StatusWriter, api: HfApi, history):
    """Initialize, smoke-test, and publish RL & rule-based agents.

    The agents are freshly initialized (no training loop here): each is run
    once on a dummy input as a sanity check, serialized, uploaded to
    HF_MODEL_REPO, and registered in the manifest with an initial metric.
    Returns True on success, False on failure.
    """
    print("⏳ Loading Agent Dependencies (Torch/RL)...")
    import torch
    from models.execution_agent import PPOActorCritic
    from models.meta_controller import DQN
    from models.risk_agent import RiskAgent
    from models.arbitrage_agent import ArbitrageAgent

    status.start("Agents (RL & Rule)", 1)
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    try:
        def _upload(path, name):
            # Push one artifact to the HF model repo (raises on failure,
            # caught by the outer handler).
            api.upload_file(
                path_or_fileobj=path,
                path_in_repo=name,
                repo_id=HF_MODEL_REPO,
                repo_type="model"
            )
            print(f"βœ… {name} uploaded.")

        # 1. Execution Agent (PPO)
        print("πŸ€– Initializing Execution Agent (PPO)...")
        exec_agent = PPOActorCritic(input_dim=5, action_dim=3)
        dummy_in = torch.randn(1, 5)
        exec_agent(dummy_in)  # forward-pass smoke test
        exec_filename = f"execution_agent_{timestamp}.pt"
        torch.save(exec_agent.state_dict(), f"{MODEL_DIR}/{exec_filename}")
        _upload(f"{MODEL_DIR}/{exec_filename}", exec_filename)

        # 2. Meta Controller (DQN)
        print("🧠 Initializing Meta Controller (DQN)...")
        meta_agent = DQN(input_dim=5, output_dim=3)
        meta_agent(dummy_in)  # forward-pass smoke test
        meta_filename = f"meta_controller_{timestamp}.pt"
        torch.save(meta_agent.state_dict(), f"{MODEL_DIR}/{meta_filename}")
        _upload(f"{MODEL_DIR}/{meta_filename}", meta_filename)

        # 3. Risk Agent (rule-based, pickled)
        print("πŸ›‘οΈ Initializing Risk Agent...")
        risk_agent = RiskAgent(max_dd=0.15)
        risk_filename = f"risk_agent_{timestamp}.pkl"
        with open(f"{MODEL_DIR}/{risk_filename}", 'wb') as f:
            pickle.dump(risk_agent, f)
        _upload(f"{MODEL_DIR}/{risk_filename}", risk_filename)

        # 4. Arbitrage Agent (rule-based, pickled)
        print("βš–οΈ Initializing Arbitrage Agent...")
        arb_agent = ArbitrageAgent(threshold=0.005)
        arb_agent.analyze(100, 101, 0.001)  # synthetic smoke test
        arb_filename = f"arbitrage_agent_{timestamp}.pkl"
        with open(f"{MODEL_DIR}/{arb_filename}", 'wb') as f:
            pickle.dump(arb_agent, f)
        _upload(f"{MODEL_DIR}/{arb_filename}", arb_filename)

        # Register all four agents in the manifest with initial metrics.
        update_best_models("execution_agent", exec_filename, 0, None, api, metric_name="Init Reward", metric_value=0)
        update_best_models("meta_controller", meta_filename, 0, None, api, metric_name="Init Reward", metric_value=0)
        update_best_models("risk_agent", risk_filename, 0, None, api, metric_name="Init Score", metric_value=1.0)
        update_best_models("arbitrage_agent", arb_filename, 0, None, api, metric_name="Init Score", metric_value=1.0)
        status.complete("Agents Completed")
        # History logging is best-effort bookkeeping only.
        try:
            history.log_model("ExecutionAgent", "Init", exec_filename)
            history.log_model("MetaController", "Init", meta_filename)
            history.log_model("RiskAgent", "Init", risk_filename)
            history.log_model("ArbitrageAgent", "Init", arb_filename)
        except Exception:
            pass
        return True
    except Exception as e:
        status.error(f"Agents Fail: {e}")
        print(f"❌ Agents Error: {e}")
        return False
def upload_models():
    """Upload any ONNX artifacts found in MODEL_DIR to HuggingFace."""
    from huggingface_hub import HfApi
    api = HfApi()
    for onnx_path in Path(MODEL_DIR).glob("*.onnx"):
        try:
            api.upload_file(
                path_or_fileobj=str(onnx_path),
                path_in_repo=onnx_path.name,
                repo_id=HF_MODEL_REPO,
                repo_type="model"
            )
        except Exception as exc:
            print(f"Upload failed for {onnx_path.name}: {exc}")
        else:
            print(f"Uploaded: {onnx_path.name}")
def main():
    """Run the full auto-training pipeline end to end.

    Trains DeepLOB (hard dependency), then TRM, LSTM, Classic ML / causal
    discovery, and the agents; uploads artifacts and history; resets the
    dashboard status when done.
    """
    print("πŸš€ Auto-Training Pipeline Started...")
    # SECURITY FIX: the previous revision fell back to a hard-coded HF token
    # (a leaked credential); tokens must come exclusively from the
    # environment. Without one, uploads run unauthenticated and will fail.
    token = os.environ.get("HF_TOKEN")
    if not token:
        print("⚠️ HF_TOKEN not found in environment! Uploads will likely fail.")
    api = HfApi(token=token)

    # Init status & history writers for the Dashboard.
    status = StatusWriter()
    history = HistoryWriter()
    print("=" * 50)
    print("NautilusAI Auto Training Pipeline")
    print(f"Started at: {datetime.now().isoformat()}")
    print("=" * 50)

    # 1. Data download is skipped: the loaders stream straight from the hub.
    print("🌊 Using Streaming Mode (No Download Required)")
    # Force the legacy (non-dynamo) ONNX exporter for compatibility.
    os.environ["TORCH_ONNX_USE_DYNAMO"] = "0"

    # 2. DeepLOB is the only hard dependency; abort the run if it fails.
    if not train_deeplob(status, api, history, epochs=1):
        print("DeepLOB training failed!")
        return
    # 3.-4. TRM and LSTM failures are logged but do not stop the pipeline.
    if not train_trm(status, api, history, epochs=1):
        print("TRM training failed!")
    if not train_lstm(status, api, history, epochs=1):
        print("LSTM training failed!")
    # 5. Classic ML & causal discovery.
    train_classic_and_causal(status, api, history)
    # 6. RL & rule-based agents.
    train_agents(status, api, history)
    # 7. Legacy sweep of any leftover ONNX files (mostly redundant now).
    upload_models()
    # 8. Final history flush and status reset.
    history.upload_history()
    status.reset()
    print("=" * 50)
    print("Training Pipeline Complete!")
    print("=" * 50)
if __name__ == "__main__":
    try:
        print("πŸ”§ Auto-Train Script Initializing...", flush=True)
        main()
    except Exception as e:
        import traceback
        print(f"❌ CRITICAL ERROR IN AUTO-TRAIN: {e}", flush=True)
        traceback.print_exc()
        # Best-effort: surface the crash in the dashboard status file too.
        try:
            with open(STATUS_FILE, 'w') as f:
                json.dump({"status": "error", "logs": [f"CRITICAL: {str(e)}"]}, f)
        except OSError:
            # Status file unwritable; the traceback above is the record.
            pass