"""NEXA ML dashboard: incremental LightGBM training on Solana DEX CSV data.

Every 24 hours (or on demand from the Gradio UI) the pipeline:
  1. downloads CSV files from the dataset repo that the manifest does not yet
     list as trained,
  2. builds per-second order-flow / price features and multi-horizon targets,
  3. continues training the previous LightGBM booster on the new rows,
  4. uploads the model to the model repo and appends the consumed files to the
     manifest stored in the dataset repo.
"""
import os
import json
import joblib
import shutil
import threading
import schedule
import time
import numpy as np
import pandas as pd
import gradio as gr
import lightgbm as lgb
import matplotlib

matplotlib.use("Agg")  # headless backend: figures are only rendered for the UI
import matplotlib.pyplot as plt
from datetime import datetime, timezone
from huggingface_hub import HfApi, hf_hub_download, list_repo_files
from sklearn.metrics import roc_auc_score

# ── Config ────────────────────────────────────────────────────────────────────
DATASET_REPO = "nexacore/solana-dex-data"
MODEL_REPO = "nexacore/solana-dex-model"
MODEL_FILE = "nexa_lgbm_v1.joblib"
MANIFEST_FILE = "trained_files_manifest.json"
HF_TOKEN = os.environ.get("HF_TOKEN")
DATA_DIR = "/tmp/nexa_data"
RAW_DIR = "/tmp/nexa_raw"
MODEL_PATH = f"/tmp/{MODEL_FILE}"
MANIFEST_PATH = f"/tmp/{MANIFEST_FILE}"

api = HfApi(token=HF_TOKEN)


# ── Module-level wrapper — MUST be here for joblib pickling to work ───────────
class LGBWrapper:
    """Thin pickle-friendly wrapper around a multiclass LightGBM booster.

    Classes are stored internally as 0/1/2 and mapped back to -1/0/1
    (SELL / FLAT / BUY) by ``predict``.
    """

    def __init__(self, booster, features):
        self.booster_ = booster
        self.feature_names_ = features

    def predict(self, X):
        """Return the most likely label in {-1, 0, 1} for each row of X."""
        return self.booster_.predict(X).argmax(axis=1) - 1

    def predict_proba(self, X):
        """Return the (n_rows, 3) class-probability matrix from the booster."""
        return self.booster_.predict(X)


# ── Shared state ──────────────────────────────────────────────────────────────
# Read by the Gradio UI, written by the pipeline threads.
state = {
    "status": "idle",
    "last_run": "Never",
    "last_auc": None,
    "total_rows": 0,
    "new_rows": 0,
    "trained_files": 0,
    "last_model_saved": "Never",
    "model_version": 0,
    "log": [],
    "fig_importance": None,
}

# Guards run_pipeline: it can be started by the scheduler, the startup thread
# and the "Run Now" button concurrently.
_pipeline_lock = threading.Lock()


def log(msg):
    """Print a UTC-timestamped message and keep the last 300 in ``state["log"]``."""
    ts = datetime.now(timezone.utc).strftime("%H:%M:%S")
    entry = f"[{ts}] {msg}"
    print(entry)
    state["log"].append(entry)
    if len(state["log"]) > 300:
        state["log"] = state["log"][-300:]


# ── Manifest ──────────────────────────────────────────────────────────────────
def load_manifest():
    """Download the training manifest, or return an empty one if unavailable.

    Returns:
        dict with keys ``trained_files`` (list of repo paths),
        ``model_version`` (int) and ``total_rows`` (int).
    """
    try:
        local = hf_hub_download(
            repo_id=DATASET_REPO,
            filename=MANIFEST_FILE,
            repo_type="dataset",
            token=HF_TOKEN,
            local_dir="/tmp",
        )
        with open(local) as f:
            return json.load(f)
    except Exception as e:
        # Expected on the very first run (no manifest yet). Logged because a
        # transient network error lands here too and would restart from v0.
        log(f"Manifest not loaded ({e}) — using an empty manifest")
        return {"trained_files": [], "model_version": 0, "total_rows": 0}


def save_manifest(manifest):
    """Write ``manifest`` to MANIFEST_PATH and upload it to the dataset repo."""
    with open(MANIFEST_PATH, "w") as f:
        json.dump(manifest, f, indent=2)
    api.upload_file(
        path_or_fileobj=MANIFEST_PATH,
        path_in_repo=MANIFEST_FILE,
        repo_id=DATASET_REPO,
        repo_type="dataset",
        token=HF_TOKEN,
    )


# ── Step 1: Download only NEW files ───────────────────────────────────────────
def download_new_files(manifest):
    """Download every remote CSV not yet listed in ``manifest["trained_files"]``.

    Returns:
        (downloaded, already_trained, all_remote) where ``downloaded`` is a
        list of ``(remote_path, local_path)`` tuples for successful downloads.
        Failed downloads are logged and skipped, so they are retried next run.
    """
    log("Checking for new CSV files on HF...")
    os.makedirs(DATA_DIR, exist_ok=True)
    all_remote = [
        f
        for f in list_repo_files(DATASET_REPO, repo_type="dataset", token=HF_TOKEN)
        if f.endswith(".csv") and not f.startswith("refs/")
    ]
    already_trained = set(manifest["trained_files"])
    new_files = [f for f in all_remote if f not in already_trained]
    if not new_files:
        log("No new files since last training run")
        return [], already_trained, all_remote

    log(f"Found {len(new_files)} new files (skipping {len(already_trained)} already trained)")
    downloaded = []
    for remote_path in new_files:
        filename = os.path.basename(remote_path)
        # Flatten the repo path into the local name so files sharing a basename
        # in different repo folders cannot overwrite each other.
        local_path = os.path.join(DATA_DIR, remote_path.replace("/", "__"))
        try:
            # hf_hub_download returns the local path of the downloaded file.
            src = hf_hub_download(
                repo_id=DATASET_REPO,
                filename=remote_path,
                repo_type="dataset",
                token=HF_TOKEN,
                local_dir=RAW_DIR,
                force_download=True,
            )
            shutil.copy(src, local_path)
            downloaded.append((remote_path, local_path))
            log(f"  downloaded: {filename}")
        except Exception as e:
            log(f"  skipped {filename}: {e}")
    return downloaded, already_trained, all_remote


# ── Step 2: Load new CSVs ─────────────────────────────────────────────────────
def load_new_data(downloaded_files):
    """Concatenate and clean the downloaded CSVs.

    Rows missing a key column are dropped, rows are de-duplicated on
    ``signature`` and sorted by ``block_time_unix``, and the price columns are
    forward-filled.

    Returns:
        A cleaned DataFrame, or None if no file could be read.
    """
    log("Loading new CSV files...")
    dfs = []
    for _remote_path, local_path in downloaded_files:
        try:
            df = pd.read_csv(local_path)
            dfs.append(df)
            log(f"  loaded {os.path.basename(local_path)}: {len(df):,} rows")
        except Exception as e:
            log(f"  failed to load {local_path}: {e}")
    if not dfs:
        return None

    df = pd.concat(dfs, ignore_index=True)
    df = df.dropna(subset=["block_time_unix", "signature", "side", "amount_sol"])
    df = df.drop_duplicates(subset="signature")
    df = df.sort_values("block_time_unix").reset_index(drop=True)
    df["block_time_unix"] = df["block_time_unix"].astype(int)
    df["amount_sol"] = pd.to_numeric(df["amount_sol"], errors="coerce").fillna(0)
    df["binance_price"] = pd.to_numeric(df["binance_price"], errors="coerce").ffill()
    df["jupiter_price"] = pd.to_numeric(df["jupiter_price"], errors="coerce").ffill()
    log(f"New data: {len(df):,} unique rows")
    state["new_rows"] = len(df)
    return df


# ── Step 3: Feature Engineering ───────────────────────────────────────────────
def engineer_features(df):
    """Resample trades to a 1-second grid and build features plus targets.

    Targets ``target_30s`` / ``target_1min`` / ``target_5min`` are 1 / -1 / 0
    when the Binance price moves by more than +0.05% / less than -0.05% /
    otherwise over that horizon.

    Returns:
        A DataFrame indexed by second with no NaN/inf values and integer
        target columns. Seconds without a known future price for every
        horizon are dropped rather than labelled as flat.
    """
    log("Engineering features...")
    df["dt"] = pd.to_datetime(df["block_time_unix"], unit="s", utc=True)
    df = df.set_index("dt").sort_index()

    df["is_buy"] = (df["side"] == "BUY").astype(float)
    df["is_sell"] = (df["side"] == "SELL").astype(float)
    df["is_noise"] = (df["side"] == "NOISE").astype(float)
    df["buy_vol"] = df["amount_sol"] * df["is_buy"]
    df["sell_vol"] = df["amount_sol"] * df["is_sell"]
    df["noise_vol"] = df["amount_sol"] * df["is_noise"]

    price_1s = df["binance_price"].resample("1s").last().ffill()
    jup_1s = df["jupiter_price"].resample("1s").last().ffill()
    flows_1s = df[["buy_vol", "sell_vol", "noise_vol", "is_buy", "is_sell", "is_noise"]].resample("1s").sum()

    # Seconds before the first quote become price 0; the infinities this
    # produces in the ratios below are removed before returning.
    feat = flows_1s.join(price_1s.rename("price"), how="outer").ffill().fillna(0)
    feat = feat.join(jup_1s.rename("jup_price"), how="left").ffill().fillna(0)
    eps = 1e-9

    # Rolling windows
    for w in ["15s", "30s", "1min", "5min", "15min"]:
        bv = feat["buy_vol"].rolling(w).sum()
        sv = feat["sell_vol"].rolling(w).sum()
        nv = feat["noise_vol"].rolling(w).sum()
        bc = feat["is_buy"].rolling(w).sum()
        sc = feat["is_sell"].rolling(w).sum()
        nc = feat["is_noise"].rolling(w).sum()
        tc = bc + sc + nc
        feat[f"buy_vol_{w}"] = bv
        feat[f"sell_vol_{w}"] = sv
        feat[f"noise_vol_{w}"] = nv
        feat[f"buy_count_{w}"] = bc
        feat[f"sell_count_{w}"] = sc
        feat[f"noise_count_{w}"] = nc
        feat[f"flow_imbalance_{w}"] = bv / (bv + sv + eps)
        feat[f"noise_ratio_{w}"] = nc / (tc + eps)
        feat[f"tx_freq_{w}"] = tc

    # Price features (one row per second, so `secs` rows == `secs` seconds)
    for secs, label in [(30, "30s"), (60, "1min"), (300, "5min"), (900, "15min")]:
        feat[f"price_change_{label}"] = feat["price"].pct_change(secs)
    feat["price_momentum"] = feat["price_change_30s"].diff(10)
    feat["price_vol_5m"] = feat["price"].rolling("5min").std()
    feat["price_vol_1m"] = feat["price"].rolling("1min").std()

    # CEX/DEX spread
    feat["dex_cex_spread"] = (feat["jup_price"] - feat["price"]) / (feat["price"] + eps)

    # Divergence features (core hypothesis)
    fi = feat["flow_imbalance_30s"]
    pc = feat["price_change_30s"]
    feat["divergence_buy"] = ((fi > 0.7) & (pc < 0)).astype(float)
    feat["divergence_sell"] = ((fi < 0.3) & (pc > 0)).astype(float)
    feat["confirm_buy"] = ((fi > 0.7) & (pc > 0)).astype(float)
    feat["confirm_sell"] = ((fi < 0.3) & (pc < 0)).astype(float)

    # Targets at multiple horizons
    for secs, label in [(30, "30s"), (60, "1min"), (300, "5min")]:
        future = feat["price"].shift(-secs)
        pct = (future - feat["price"]) / (feat["price"] + eps)
        target = np.where(pct > 0.0005, 1, np.where(pct < -0.0005, -1, 0)).astype(float)
        # The last `secs` seconds have no future price. np.where would label
        # them 0 ("flat"); mark them NaN so the dropna below removes them.
        target[future.isna().to_numpy()] = np.nan
        feat[f"target_{label}"] = target

    feat = feat.replace([np.inf, -np.inf], np.nan).dropna()
    target_cols = [c for c in feat.columns if c.startswith("target_")]
    feat[target_cols] = feat[target_cols].astype(int)
    log(f"Features: {len(feat):,} rows × {len(feat.columns)} cols")
    return feat


# ── Step 4: Train / Incrementally Update LightGBM ─────────────────────────────
def _balanced_weights(labels):
    """Return per-sample weights n_samples / (n_classes * class_count) ("balanced")."""
    labels = np.asarray(labels)
    if labels.size == 0:
        return np.ones(0, dtype=float)
    classes, counts = np.unique(labels, return_counts=True)
    per_class = labels.size / (len(classes) * counts)
    return per_class[np.searchsorted(classes, labels)]


def _load_init_booster(manifest):
    """Return the previously trained booster to continue training, or None."""
    if not os.path.exists(MODEL_PATH) and manifest["model_version"] > 0:
        # /tmp is empty after a restart; without this the published model would
        # be replaced by one trained only on the newest files.
        try:
            hf_hub_download(
                repo_id=MODEL_REPO,
                filename=MODEL_FILE,
                repo_type="model",
                token=HF_TOKEN,
                local_dir="/tmp",
            )
            log(f"Fetched existing model from {MODEL_REPO}")
        except Exception as e:
            log(f"Could not fetch existing model: {e}")
    if not os.path.exists(MODEL_PATH):
        log("Starting fresh model")
        return None
    try:
        # NOTE: joblib.load unpickles arbitrary objects — only load models from
        # a repository you control.
        existing = joblib.load(MODEL_PATH)
        log(f"Incrementally updating model v{manifest['model_version']}")
        return existing.booster_
    except Exception as e:
        log(f"Starting fresh model (could not load existing: {e})")
        return None


def train_model(feat, manifest):
    """Train (or continue training) the 3-class model on ``target_30s``.

    The split is chronological (first 80% train, last 20% test). The model is
    saved to MODEL_PATH wrapped in LGBWrapper, and the feature-importance
    figure is stored in ``state``.

    Returns:
        The saved LGBWrapper.

    Raises:
        ValueError: if there are too few feature rows for a train/test split.
    """
    log("Training / updating LightGBM model...")
    target_col = "target_30s"
    drop_cols = [c for c in feat.columns if c.startswith("target_") or c in ("price", "jup_price")]
    feature_cols = [c for c in feat.columns if c not in drop_cols]

    X = feat[feature_cols].values
    y = feat[target_col].values.astype(int)

    # Chronological 80/20 — never shuffle
    split = int(len(X) * 0.8)
    X_train, X_test = X[:split], X[split:]
    y_train, y_test = y[:split], y[split:]
    log(f"Train: {len(X_train):,} | Test: {len(X_test):,}")
    if len(X_train) == 0 or len(X_test) == 0:
        raise ValueError(f"Not enough feature rows to train ({len(X):,})")

    # Load existing booster for incremental update
    init_model = _load_init_booster(manifest)

    params = {
        "objective": "multiclass",
        "num_class": 3,
        "metric": "multi_logloss",
        "num_leaves": 63,
        "learning_rate": 0.05,
        "feature_fraction": 0.8,
        "bagging_fraction": 0.8,
        "bagging_freq": 5,
        # class balancing is applied through Dataset weights below:
        # "class_weight" is a scikit-learn-API-only argument that lgb.train ignores.
        "verbose": -1,
        "n_jobs": -1,
    }

    # Map -1,0,1 → 0,1,2 for multiclass
    y_tr = y_train + 1
    y_te = y_test + 1
    # NOTE(review): the test set doubles as the early-stopping set, so the
    # metrics below are somewhat optimistic.
    model = lgb.train(
        params,
        lgb.Dataset(X_train, label=y_tr, weight=_balanced_weights(y_tr)),
        num_boost_round=200 if init_model is not None else 500,
        valid_sets=[lgb.Dataset(X_test, label=y_te)],
        callbacks=[lgb.early_stopping(30, verbose=False), lgb.log_evaluation(period=-1)],
        init_model=init_model,
    )

    # Evaluate
    proba = model.predict(X_test)
    pred = proba.argmax(axis=1) - 1
    directional = y_test != 0  # BUY-vs-SELL rows only
    try:
        auc = roc_auc_score((y_test[directional] == 1).astype(int), proba[directional, 2])
        log(f"AUC (BUY class): {auc:.4f}")
        state["last_auc"] = round(auc, 4)
    except Exception as e:
        log(f"AUC skipped: {e}")

    buy_sigs = pred == 1
    if buy_sigs.sum() > 0:
        wr = (y_test[buy_sigs] == 1).mean()
        log(f"BUY win rate: {wr:.1%} on {buy_sigs.sum()} signals")

    # Feature importance chart
    imp = (
        pd.Series(model.feature_importance("gain"), index=feature_cols)
        .sort_values(ascending=False)
        .head(15)
    )
    fig, ax = plt.subplots(figsize=(8, 5))
    imp.plot(kind="barh", ax=ax, color="#2E5D8E")
    ax.set_title(f"Top 15 Features — Model v{manifest['model_version']+1}")
    ax.invert_yaxis()
    plt.tight_layout()
    state["fig_importance"] = fig
    plt.close(fig)  # release pyplot's reference; the Figure object stays usable

    # Save using module-level LGBWrapper
    wrapped = LGBWrapper(model, feature_cols)
    joblib.dump(wrapped, MODEL_PATH)
    log("Model saved locally")
    return wrapped


# ── Step 5: Upload model to model repo + update manifest in dataset repo ──────
def upload_and_update(manifest, newly_trained_files):
    """Upload the model, bump the manifest version and record trained files."""
    log(f"Uploading model to {MODEL_REPO}...")
    api.upload_file(
        path_or_fileobj=MODEL_PATH,
        path_in_repo=MODEL_FILE,
        repo_id=MODEL_REPO,
        repo_type="model",
        token=HF_TOKEN,
    )
    log(f"Model uploaded to {MODEL_REPO}/{MODEL_FILE}")

    manifest["trained_files"].extend(newly_trained_files)
    manifest["model_version"] += 1
    manifest["total_rows"] += state["new_rows"]
    save_manifest(manifest)

    state["last_model_saved"] = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")
    state["model_version"] = manifest["model_version"]
    state["total_rows"] = manifest["total_rows"]
    state["trained_files"] = len(manifest["trained_files"])
    log(f"Done — model v{manifest['model_version']} | total rows: {manifest['total_rows']:,}")


# ── Full pipeline ─────────────────────────────────────────────────────────────
def run_pipeline():
    """Run download → load → features → train → upload once.

    A concurrent call is skipped rather than queued. Errors are logged, never
    raised.
    """
    if not _pipeline_lock.acquire(blocking=False):
        log("Already running — skipped")
        return
    try:
        state["status"] = "running"
        log("=" * 50)
        log("Pipeline started")

        manifest = load_manifest()
        downloaded, _, _ = download_new_files(manifest)
        if not downloaded:
            log("Nothing new to train on — pipeline skipped")
            return

        df = load_new_data(downloaded)
        if df is None or len(df) < 1000:
            log(f"Not enough new data ({len(df) if df is not None else 0} rows) — skipping")
            return

        feat = engineer_features(df)
        train_model(feat, manifest)
        newly_trained = [remote for remote, _ in downloaded]
        upload_and_update(manifest, newly_trained)
        state["last_run"] = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")
        log("Pipeline complete ✓")
    except Exception as e:
        log(f"Pipeline ERROR: {e}")
        import traceback

        log(traceback.format_exc())
    finally:
        state["status"] = "idle"
        _pipeline_lock.release()


# ── Scheduler: every 24 hours ─────────────────────────────────────────────────
def start_scheduler():
    """Block forever, running the pipeline every 24 hours (polled each minute)."""
    schedule.every(24).hours.do(run_pipeline)
    while True:
        schedule.run_pending()
        time.sleep(60)


threading.Thread(target=start_scheduler, daemon=True).start()


# ── Gradio UI ─────────────────────────────────────────────────────────────────
def get_status():
    """Render the current ``state`` as Markdown for the status panel."""
    auc = f"{state['last_auc']:.4f}" if state["last_auc"] is not None else "N/A"
    return (
        f"**Status:** {state['status']}\n\n"
        f"**Last Run:** {state['last_run']}\n\n"
        f"**Model Version:** v{state['model_version']}\n\n"
        f"**Files Trained On:** {state['trained_files']}\n\n"
        f"**Total Rows:** {state['total_rows']:,}\n\n"
        f"**New Rows (last run):** {state['new_rows']:,}\n\n"
        f"**Last AUC:** {auc}\n\n"
        f"**Model Saved:** {state['last_model_saved']}\n\n"
        f"**Model Repo:** `{MODEL_REPO}`"
    )


def get_logs():
    """Return the 60 most recent log lines joined by newlines."""
    return "\n".join(state["log"][-60:])


def trigger_pipeline():
    """Start the pipeline in a background thread (UI "Run Now" handler)."""
    threading.Thread(target=run_pipeline, daemon=True).start()
    return "Pipeline started — check logs tab"


with gr.Blocks(title="NEXA ML Dashboard") as demo:
    gr.Markdown("# NEXA — Solana DEX Pattern Recognition")
    gr.Markdown(
        f"Data: `{DATASET_REPO}` · Model: `{MODEL_REPO}` · "
        f"Auto-trains every 24h on **new files only** · Incremental LightGBM"
    )
    with gr.Row():
        with gr.Column(scale=1):
            status_md = gr.Markdown(get_status)
            run_btn = gr.Button("▶ Run Now", variant="primary")
            run_out = gr.Textbox(label="", lines=1, interactive=False)
            run_btn.click(trigger_pipeline, outputs=run_out)
        with gr.Column(scale=2):
            with gr.Tabs():
                with gr.Tab("Logs"):
                    gr.Textbox(
                        value=get_logs,
                        lines=25,
                        max_lines=25,
                        label="Live Logs",
                        every=5,
                    )
                with gr.Tab("Feature Importance"):
                    imp_plot = gr.Plot(label="Top 15 Features by Gain")
                    gr.Button("Refresh").click(lambda: state["fig_importance"], outputs=imp_plot)
    gr.Timer(10).tick(get_status, outputs=status_md)


# ── Startup ───────────────────────────────────────────────────────────────────
def startup():
    """Seed ``state`` from the manifest; run the pipeline at once if no model exists."""
    time.sleep(8)  # give the Gradio server a moment to come up first
    manifest = load_manifest()
    state["model_version"] = manifest["model_version"]
    state["total_rows"] = manifest["total_rows"]
    state["trained_files"] = len(manifest["trained_files"])
    if manifest["model_version"] == 0:
        log("First run — starting initial pipeline")
        run_pipeline()
    else:
        log(f"Model v{manifest['model_version']} exists — waiting for scheduler")


threading.Thread(target=startup, daemon=True).start()
demo.launch()