Spaces:
Sleeping
Sleeping
| import os | |
| import json | |
| import joblib | |
| import shutil | |
| import threading | |
| import schedule | |
| import time | |
| import numpy as np | |
| import pandas as pd | |
| import gradio as gr | |
| import lightgbm as lgb | |
| import matplotlib | |
| matplotlib.use("Agg") | |
| import matplotlib.pyplot as plt | |
| from datetime import datetime, timezone | |
| from huggingface_hub import HfApi, hf_hub_download, list_repo_files | |
| from sklearn.metrics import roc_auc_score | |
# ── Config ────────────────────────────────────────────────────────────────────
# Hugging Face repositories: CSV data is read from the dataset repo and the
# trained model is written to the model repo.
DATASET_REPO = "nexacore/solana-dex-data"
MODEL_REPO = "nexacore/solana-dex-model"

# File names of the two artifacts as stored in the repositories.
MODEL_FILE = "nexa_lgbm_v1.joblib"
MANIFEST_FILE = "trained_files_manifest.json"

# Access token taken from the environment; None when the variable is unset.
HF_TOKEN = os.getenv("HF_TOKEN")

# Local scratch locations under /tmp.
DATA_DIR = "/tmp/nexa_data"
MODEL_PATH = "/tmp/" + MODEL_FILE
MANIFEST_PATH = "/tmp/" + MANIFEST_FILE

# Shared Hub client used for uploads.
api = HfApi(token=HF_TOKEN)
# ── Module-level wrapper — MUST be here for joblib pickling to work ───────────
class LGBWrapper:
    """Thin adapter around a trained booster, saved to disk with joblib.

    The booster is kept in ``booster_`` and the feature list in
    ``feature_names_``; these attribute names are part of the pickled state.
    """

    def __init__(self, booster, features):
        self.booster_, self.feature_names_ = booster, features

    def predict_proba(self, X):
        """Return the wrapped booster's raw ``predict`` output for ``X``."""
        return self.booster_.predict(X)

    def predict(self, X):
        """Return the arg-max column of the booster output, shifted by -1.

        With three output columns this yields labels in {-1, 0, 1}.
        """
        scores = self.predict_proba(X)
        return scores.argmax(axis=1) - 1
# ── Shared state ──────────────────────────────────────────────────────────────
# Mutable dashboard state shared between the pipeline threads and the UI.
state = dict(
    status="idle",             # "idle" or "running"
    last_run="Never",          # timestamp string of the last completed run
    last_auc=None,             # most recent AUC, None until one is computed
    total_rows=0,              # cumulative row count taken from the manifest
    new_rows=0,                # rows loaded in the latest run
    trained_files=0,           # number of files listed in the manifest
    last_model_saved="Never",  # timestamp string of the last model upload
    model_version=0,           # version counter taken from the manifest
    log=[],                    # rolling list of log lines
    fig_importance=None,       # latest feature-importance figure
)
def log(msg):
    """Print ``msg`` with a UTC HH:MM:SS prefix and keep it in ``state["log"]``.

    Only the newest 300 entries are retained.
    """
    stamp = datetime.now(timezone.utc).strftime("%H:%M:%S")
    line = f"[{stamp}] {msg}"
    print(line)

    history = state["log"]
    history.append(line)
    if len(history) > 300:
        # Rebind to a trimmed copy so the list never grows without bound.
        state["log"] = history[-300:]
# ── Manifest ──────────────────────────────────────────────────────────────────
def load_manifest():
    """Fetch the training manifest from the dataset repo.

    Returns:
        dict: always contains ``"trained_files"`` (list), ``"model_version"``
        (int) and ``"total_rows"`` (int). When the manifest cannot be
        downloaded or parsed, an empty manifest is returned (best effort, as
        before) but the reason is now logged instead of being swallowed.
    """
    manifest = {"trained_files": [], "model_version": 0, "total_rows": 0}
    try:
        local = hf_hub_download(
            repo_id=DATASET_REPO,
            filename=MANIFEST_FILE,
            repo_type="dataset",
            token=HF_TOKEN,
            local_dir="/tmp",
        )
        with open(local, encoding="utf-8") as f:
            loaded = json.load(f)
    except Exception as e:
        # Expected on the very first run (no manifest yet). Any other failure
        # also falls back to an empty manifest, so make it visible in the log.
        log(f"Manifest unavailable ({type(e).__name__}: {e}) — using empty manifest")
        return manifest

    if not isinstance(loaded, dict):
        log("Manifest has unexpected format — using empty manifest")
        return manifest

    # Overlay onto the defaults so a partially written manifest cannot cause
    # a KeyError in callers that index the three keys directly.
    manifest.update(loaded)
    return manifest
def save_manifest(manifest):
    """Write ``manifest`` as indented JSON to MANIFEST_PATH and upload it.

    The file is uploaded to the dataset repo under MANIFEST_FILE.
    """
    serialized = json.dumps(manifest, indent=2)
    with open(MANIFEST_PATH, "w") as out:
        out.write(serialized)

    upload_kwargs = dict(
        path_or_fileobj=MANIFEST_PATH,
        path_in_repo=MANIFEST_FILE,
        repo_id=DATASET_REPO,
        repo_type="dataset",
        token=HF_TOKEN,
    )
    api.upload_file(**upload_kwargs)
# ── Step 1: Download only NEW files ───────────────────────────────────────────
def download_new_files(manifest):
    """Download every CSV in the dataset repo that the manifest has not seen.

    Args:
        manifest: dict whose ``"trained_files"`` entry lists repo paths that
            were already used for training.

    Returns:
        tuple: ``(downloaded, already_trained, all_remote)`` where
        ``downloaded`` is a list of ``(remote_path, local_path)`` pairs,
        ``already_trained`` is the set of previously trained repo paths and
        ``all_remote`` is the list of all CSV paths found in the repo.
        Files that fail to download are logged and skipped.
    """
    log("Checking for new CSV files on HF...")
    os.makedirs(DATA_DIR, exist_ok=True)

    all_remote = [
        f for f in list_repo_files(DATASET_REPO, repo_type="dataset", token=HF_TOKEN)
        if f.endswith(".csv") and not f.startswith("refs/")
    ]
    already_trained = set(manifest["trained_files"])
    new_files = [f for f in all_remote if f not in already_trained]

    if not new_files:
        log("No new files since last training run")
        return [], already_trained, all_remote

    log(f"Found {len(new_files)} new files (skipping {len(already_trained)} already trained)")

    downloaded = []
    for remote_path in new_files:
        filename = os.path.basename(remote_path)
        # Flatten the full repo path into the local name: two files with the
        # same basename in different folders must not overwrite each other,
        # otherwise one is lost while both get marked as trained.
        local_path = os.path.join(DATA_DIR, remote_path.replace("/", "__"))
        try:
            # hf_hub_download returns the path it wrote to, so there is no
            # need to guess where the file landed inside local_dir.
            src = hf_hub_download(
                repo_id=DATASET_REPO,
                filename=remote_path,
                repo_type="dataset",
                token=HF_TOKEN,
                local_dir="/tmp/nexa_raw",
                force_download=True,
            )
            shutil.copy(src, local_path)
        except Exception as e:
            log(f" skipped {filename}: {e}")
            continue
        downloaded.append((remote_path, local_path))
        log(f" downloaded: {filename}")

    return downloaded, already_trained, all_remote
# ── Step 2: Load new CSVs ─────────────────────────────────────────────────────
def load_new_data(downloaded_files):
    """Read the downloaded CSVs into one cleaned, time-ordered DataFrame.

    Args:
        downloaded_files: iterable of ``(remote_path, local_path)`` pairs.

    Returns:
        pandas.DataFrame sorted by ``block_time_unix`` with duplicate
        signatures removed, or ``None`` when no file could be read.
        ``state["new_rows"]`` is set to the resulting row count.
    """
    log("Loading new CSV files...")
    dfs = []
    for _, local_path in downloaded_files:
        try:
            df = pd.read_csv(local_path)
        except Exception as e:
            log(f" failed to load {local_path}: {e}")
            continue
        dfs.append(df)
        log(f" loaded {os.path.basename(local_path)}: {len(df):,} rows")

    if not dfs:
        return None

    df = pd.concat(dfs, ignore_index=True)

    # Coerce the timestamp BEFORE dropping/sorting: a single malformed value
    # would otherwise turn the column into strings (lexical sort) and make
    # astype(int) abort the whole run. Bad rows become NaN and are dropped.
    df["block_time_unix"] = pd.to_numeric(df["block_time_unix"], errors="coerce")
    df = df.dropna(subset=["block_time_unix", "signature", "side", "amount_sol"])
    df = df.drop_duplicates(subset="signature")
    df = df.sort_values("block_time_unix").reset_index(drop=True)
    df["block_time_unix"] = df["block_time_unix"].astype(int)

    df["amount_sol"] = pd.to_numeric(df["amount_sol"], errors="coerce").fillna(0)
    # Prices are forward-filled in time order; leading gaps stay NaN.
    df["binance_price"] = pd.to_numeric(df["binance_price"], errors="coerce").ffill()
    df["jupiter_price"] = pd.to_numeric(df["jupiter_price"], errors="coerce").ffill()

    log(f"New data: {len(df):,} unique rows")
    state["new_rows"] = len(df)
    return df
# ── Step 3: Feature Engineering ───────────────────────────────────────────────
def engineer_features(df):
    """Build a 1-second feature/target table from cleaned trade rows.

    Args:
        df: DataFrame with at least ``block_time_unix``, ``side``,
            ``amount_sol``, ``binance_price`` and ``jupiter_price`` columns.
            The frame passed in is not modified.

    Returns:
        pandas.DataFrame indexed by UTC second, holding rolling flow
        features, price features, divergence flags and integer targets
        ``target_30s`` / ``target_1min`` / ``target_5min`` in {-1, 0, 1}.
        Rows with any missing or infinite value are removed, including the
        trailing rows whose future price is not yet known.
    """
    log("Engineering features...")

    # Index by timestamp without adding a helper column to the caller's frame.
    ts = pd.to_datetime(df["block_time_unix"], unit="s", utc=True).rename("dt")
    df = df.set_index(ts).sort_index()

    df["is_buy"] = (df["side"] == "BUY").astype(float)
    df["is_sell"] = (df["side"] == "SELL").astype(float)
    df["is_noise"] = (df["side"] == "NOISE").astype(float)
    df["buy_vol"] = df["amount_sol"] * df["is_buy"]
    df["sell_vol"] = df["amount_sol"] * df["is_sell"]
    df["noise_vol"] = df["amount_sol"] * df["is_noise"]

    # Resample everything onto a regular 1-second grid.
    price_1s = df["binance_price"].resample("1s").last().ffill()
    jup_1s = df["jupiter_price"].resample("1s").last().ffill()
    flow_cols = ["buy_vol", "sell_vol", "noise_vol", "is_buy", "is_sell", "is_noise"]
    flows_1s = df[flow_cols].resample("1s").sum()

    feat = flows_1s.join(price_1s.rename("price"), how="outer").ffill().fillna(0)
    feat = feat.join(jup_1s.rename("jup_price"), how="left").ffill().fillna(0)

    eps = 1e-9

    # Rolling windows
    for w in ["15s", "30s", "1min", "5min", "15min"]:
        bv = feat["buy_vol"].rolling(w).sum()
        sv = feat["sell_vol"].rolling(w).sum()
        nv = feat["noise_vol"].rolling(w).sum()
        bc = feat["is_buy"].rolling(w).sum()
        sc = feat["is_sell"].rolling(w).sum()
        nc = feat["is_noise"].rolling(w).sum()
        tc = bc + sc + nc
        feat[f"buy_vol_{w}"] = bv
        feat[f"sell_vol_{w}"] = sv
        feat[f"noise_vol_{w}"] = nv
        feat[f"buy_count_{w}"] = bc
        feat[f"sell_count_{w}"] = sc
        feat[f"noise_count_{w}"] = nc
        feat[f"flow_imbalance_{w}"] = bv / (bv + sv + eps)
        feat[f"noise_ratio_{w}"] = nc / (tc + eps)
        feat[f"tx_freq_{w}"] = tc

    # Price features (row offsets equal seconds because the grid is 1s)
    for secs, label in [(30, "30s"), (60, "1min"), (300, "5min"), (900, "15min")]:
        feat[f"price_change_{label}"] = feat["price"].pct_change(secs)
    feat["price_momentum"] = feat["price_change_30s"].diff(10)
    feat["price_vol_5m"] = feat["price"].rolling("5min").std()
    feat["price_vol_1m"] = feat["price"].rolling("1min").std()

    # CEX/DEX spread
    feat["dex_cex_spread"] = (feat["jup_price"] - feat["price"]) / (feat["price"] + eps)

    # Divergence features (core hypothesis)
    fi = feat["flow_imbalance_30s"]
    pc = feat["price_change_30s"]
    feat["divergence_buy"] = ((fi > 0.7) & (pc < 0)).astype(float)
    feat["divergence_sell"] = ((fi < 0.3) & (pc > 0)).astype(float)
    feat["confirm_buy"] = ((fi > 0.7) & (pc > 0)).astype(float)
    feat["confirm_sell"] = ((fi < 0.3) & (pc < 0)).astype(float)

    # Targets at multiple horizons
    target_cols = []
    for secs, label in [(30, "30s"), (60, "1min"), (300, "5min")]:
        future = feat["price"].shift(-secs)
        pct = (future - feat["price"]) / (feat["price"] + eps)
        labels = pd.Series(
            np.where(pct > 0.0005, 1, np.where(pct < -0.0005, -1, 0)),
            index=feat.index,
            dtype=float,
        )
        # The last `secs` rows have no future price. Comparisons with NaN are
        # False, which used to label them 0 ("flat"); mark them missing so
        # dropna() removes them instead of training on fabricated labels.
        col = f"target_{label}"
        feat[col] = labels.where(future.notna())
        target_cols.append(col)

    # pct_change across a zero-filled price gap yields +/-inf, which dropna()
    # alone would keep; treat infinities as missing as well.
    feat = feat.replace([np.inf, -np.inf], np.nan).dropna()
    feat = feat.astype({c: int for c in target_cols})

    log(f"Features: {len(feat):,} rows × {len(feat.columns)} cols")
    return feat
# ── Step 4: Train / Incrementally Update LightGBM ─────────────────────────────
def _balanced_sample_weights(labels, num_class=3):
    """Return per-row weights n_samples / (n_present_classes * class_count)."""
    labels = np.asarray(labels, dtype=int)
    counts = np.bincount(labels, minlength=num_class).astype(float)
    present = counts > 0
    class_w = np.zeros(len(counts))
    class_w[present] = len(labels) / (present.sum() * counts[present])
    return class_w[labels]


def _load_previous_booster(manifest, feature_cols):
    """Return the previously trained booster for warm-starting, or None."""
    # /tmp does not survive a restart; pull the last uploaded model so that
    # "incremental" training really continues from the previous version.
    if not os.path.exists(MODEL_PATH) and manifest.get("model_version", 0) > 0:
        try:
            hf_hub_download(
                repo_id=MODEL_REPO,
                filename=MODEL_FILE,
                repo_type="model",
                token=HF_TOKEN,
                local_dir="/tmp",
            )
        except Exception as e:
            log(f"Could not fetch previous model: {e}")

    if not os.path.exists(MODEL_PATH):
        return None

    try:
        # NOTE: joblib.load unpickles arbitrary objects; only ever point this
        # at a model file produced by this pipeline.
        existing = joblib.load(MODEL_PATH)
        if list(existing.feature_names_) != list(feature_cols):
            log("Feature set changed since last model — starting fresh model")
            return None
        log(f"Incrementally updating model v{manifest['model_version']}")
        return existing.booster_
    except Exception as e:
        log(f"Starting fresh model ({type(e).__name__}: {e})")
        return None


def train_model(feat, manifest):
    """Train (or continue training) the 3-class LightGBM model on ``feat``.

    Args:
        feat: feature table containing a ``target_30s`` column with labels in
            {-1, 0, 1}; every ``target_*`` column plus ``price`` and
            ``jup_price`` is excluded from the model inputs.
        manifest: dict providing the current ``"model_version"``.

    Returns:
        LGBWrapper around the trained booster. The wrapper is also written to
        MODEL_PATH; ``state["last_auc"]`` and ``state["fig_importance"]`` are
        updated as side effects.

    Raises:
        ValueError: if the chronological split leaves the train or test part
            empty.
    """
    log("Training / updating LightGBM model...")
    target_col = "target_30s"
    excluded = {"price", "jup_price"}
    feature_cols = [
        c for c in feat.columns
        if not c.startswith("target_") and c not in excluded
    ]
    X = feat[feature_cols].values
    y = feat[target_col].values.astype(int)

    # Chronological 80/20 — never shuffle
    split = int(len(X) * 0.8)
    if split == 0 or split == len(X):
        raise ValueError(f"Not enough feature rows to train ({len(X)})")
    X_train, X_test = X[:split], X[split:]
    y_train, y_test = y[:split], y[split:]
    log(f"Train: {len(X_train):,} | Test: {len(X_test):,}")

    # Load existing booster for incremental update
    init_model = _load_previous_booster(manifest, feature_cols)

    # "class_weight" is an argument of the scikit-learn wrapper only and is
    # ignored by lgb.train; balancing is done with explicit row weights below.
    params = {
        "objective": "multiclass",
        "num_class": 3,
        "metric": "multi_logloss",
        "num_leaves": 63,
        "learning_rate": 0.05,
        "feature_fraction": 0.8,
        "bagging_fraction": 0.8,
        "bagging_freq": 5,
        "verbose": -1,
        "n_jobs": -1,
    }

    # Map -1,0,1 → 0,1,2 for multiclass
    y_tr = y_train + 1
    y_te = y_test + 1

    train_set = lgb.Dataset(
        X_train, label=y_tr, weight=_balanced_sample_weights(y_tr)
    )
    valid_set = lgb.Dataset(X_test, label=y_te, reference=train_set)

    model = lgb.train(
        params,
        train_set,
        num_boost_round=200 if init_model else 500,
        valid_sets=[valid_set],
        callbacks=[
            lgb.early_stopping(30, verbose=False),
            lgb.log_evaluation(period=-1),
        ],
        init_model=init_model,
    )

    # Evaluate
    proba = model.predict(X_test)
    pred = proba.argmax(axis=1) - 1

    # AUC of "up" vs "down" on the rows that actually moved.
    buy_mask = y_test != 0
    try:
        auc = roc_auc_score(
            (y_test[buy_mask] == 1).astype(int),
            proba[buy_mask, 2],
        )
        log(f"AUC (BUY class): {auc:.4f}")
        state["last_auc"] = round(auc, 4)
    except Exception as e:
        log(f"AUC skipped: {e}")

    buy_sigs = pred == 1
    if buy_sigs.sum() > 0:
        wr = (y_test[buy_sigs] == 1).mean()
        log(f"BUY win rate: {wr:.1%} on {buy_sigs.sum()} signals")

    # Feature importance chart
    imp = pd.Series(
        model.feature_importance("gain"), index=feature_cols
    ).sort_values(ascending=False).head(15)
    fig, ax = plt.subplots(figsize=(8, 5))
    imp.plot(kind="barh", ax=ax, color="#2E5D8E")
    ax.set_title(f"Top 15 Features — Model v{manifest['model_version']+1}")
    ax.invert_yaxis()
    plt.tight_layout()
    state["fig_importance"] = fig
    plt.close(fig)

    # Save using module-level LGBWrapper
    wrapped = LGBWrapper(model, feature_cols)
    joblib.dump(wrapped, MODEL_PATH)
    log("Model saved locally")
    return wrapped
# ── Step 5: Upload model to model repo + update manifest in dataset repo ──────
def upload_and_update(manifest, newly_trained_files):
    """Upload the saved model, then record the run in the manifest.

    Args:
        manifest: manifest dict; updated in place (trained files appended,
            ``model_version`` incremented, ``total_rows`` increased by
            ``state["new_rows"]``) and uploaded via ``save_manifest``.
        newly_trained_files: repo paths of the files used in this run.

    The dashboard counters in ``state`` are refreshed from the manifest.
    """
    log(f"Uploading model to {MODEL_REPO}...")
    api.upload_file(
        path_or_fileobj=MODEL_PATH,
        path_in_repo=MODEL_FILE,
        repo_id=MODEL_REPO,
        repo_type="model",
        token=HF_TOKEN,
    )
    log(f"Model uploaded to {MODEL_REPO}/{MODEL_FILE}")

    # Skip paths already recorded so the manifest never lists a file twice.
    known = set(manifest["trained_files"])
    manifest["trained_files"].extend(
        f for f in newly_trained_files if f not in known
    )
    manifest["model_version"] += 1
    manifest["total_rows"] += state["new_rows"]
    save_manifest(manifest)

    state["last_model_saved"] = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")
    state["model_version"] = manifest["model_version"]
    state["total_rows"] = manifest["total_rows"]
    state["trained_files"] = len(manifest["trained_files"])
    log(f"Done — model v{manifest['model_version']} | total rows: {manifest['total_rows']:,}")
# ── Full pipeline ─────────────────────────────────────────────────────────────
# Guards run_pipeline against concurrent execution (UI button, scheduler and
# startup each call it from their own thread).
_pipeline_lock = threading.Lock()


def run_pipeline():
    """Run one download → load → features → train → upload cycle.

    Only one run may be active at a time; a call made while another run is
    in progress logs a message and returns immediately. Any exception is
    logged with its traceback rather than propagated, and
    ``state["status"]`` is always reset to ``"idle"`` afterwards.
    """
    # A non-blocking acquire makes the "already running" check atomic; testing
    # and then setting state["status"] let two threads slip through together.
    if not _pipeline_lock.acquire(blocking=False):
        log("Already running — skipped")
        return

    try:
        state["status"] = "running"
        log("=" * 50)
        log("Pipeline started")

        manifest = load_manifest()
        downloaded, _, _ = download_new_files(manifest)
        if not downloaded:
            log("Nothing new to train on — pipeline skipped")
            return

        df = load_new_data(downloaded)
        if df is None or len(df) < 1000:
            log(f"Not enough new data ({len(df) if df is not None else 0} rows) — skipping")
            return

        feat = engineer_features(df)
        train_model(feat, manifest)

        newly_trained = [remote for remote, _ in downloaded]
        upload_and_update(manifest, newly_trained)

        state["last_run"] = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")
        log("Pipeline complete ✓")
    except Exception as e:
        # Top-level boundary of a background thread: record and carry on.
        import traceback
        log(f"Pipeline ERROR: {e}")
        log(traceback.format_exc())
    finally:
        state["status"] = "idle"
        _pipeline_lock.release()
# ── Scheduler: every 24 hours ─────────────────────────────────────────────────
def start_scheduler():
    """Register the 24-hour pipeline job and poll the scheduler forever.

    Pending jobs are checked once every 60 seconds; this function never
    returns, so it is meant to run in its own thread.
    """
    every_24h = schedule.every(24).hours
    every_24h.do(run_pipeline)
    while True:
        schedule.run_pending()
        time.sleep(60)


# Run the scheduler loop in a daemon thread.
threading.Thread(
    target=start_scheduler,
    daemon=True,
).start()
# ── Gradio UI ─────────────────────────────────────────────────────────────────
def get_status():
    """Render the dashboard status panel as a Markdown string.

    Returns:
        str: one bold-labelled line per ``state`` field, followed by the
        model repo name. The AUC is shown as "N/A" only when no AUC has been
        recorded yet.
    """
    last_auc = state["last_auc"]
    # Compare against None explicitly: a legitimate AUC of 0.0 is falsy and
    # was previously rendered as "N/A".
    auc = "N/A" if last_auc is None else f"{last_auc:.4f}"
    return (
        f"**Status:** {state['status']}\n\n"
        f"**Last Run:** {state['last_run']}\n\n"
        f"**Model Version:** v{state['model_version']}\n\n"
        f"**Files Trained On:** {state['trained_files']}\n\n"
        f"**Total Rows:** {state['total_rows']:,}\n\n"
        f"**New Rows (last run):** {state['new_rows']:,}\n\n"
        f"**Last AUC:** {auc}\n\n"
        f"**Model Saved:** {state['last_model_saved']}\n\n"
        f"**Model Repo:** `{MODEL_REPO}`"
    )
def get_logs():
    """Return the 60 most recent log lines as one newline-separated string."""
    recent = state["log"][-60:]
    return "\n".join(recent)
def trigger_pipeline():
    """Start ``run_pipeline`` in a daemon thread and return a UI message.

    Returns:
        str: confirmation text shown next to the "Run Now" button.
    """
    threading.Thread(target=run_pipeline, daemon=True).start()
    # The separator was a mis-encoded character; restored as an em dash.
    return "Pipeline started — check logs tab"
# Dashboard layout: status panel and run button on the left, tabs with live
# logs and the feature-importance chart on the right.
# NOTE(review): the nesting below is reconstructed from the order of the
# context managers because the original indentation was lost; confirm it.
# User-facing strings had mis-encoded characters, restored here as "—", "·"
# and "▶".
with gr.Blocks(title="NEXA ML Dashboard") as demo:
    gr.Markdown("# NEXA — Solana DEX Pattern Recognition")
    gr.Markdown(
        f"Data: `{DATASET_REPO}` · Model: `{MODEL_REPO}` · "
        f"Auto-trains every 24h on **new files only** · Incremental LightGBM"
    )
    with gr.Row():
        with gr.Column(scale=1):
            status_md = gr.Markdown(get_status)
            run_btn = gr.Button("▶ Run Now", variant="primary")
            run_out = gr.Textbox(label="", lines=1, interactive=False)
            run_btn.click(trigger_pipeline, outputs=run_out)
        with gr.Column(scale=2):
            with gr.Tabs():
                with gr.Tab("Logs"):
                    # Callable value re-evaluated every 5 seconds.
                    gr.Textbox(
                        value=get_logs,
                        lines=25,
                        max_lines=25,
                        label="Live Logs",
                        every=5,
                    )
                with gr.Tab("Feature Importance"):
                    imp_plot = gr.Plot(label="Top 15 Features by Gain")
                    gr.Button("Refresh").click(
                        lambda: state["fig_importance"], outputs=imp_plot
                    )
    # Refresh the status panel every 10 seconds.
    gr.Timer(10).tick(get_status, outputs=status_md)
# ── Startup ───────────────────────────────────────────────────────────────────
def startup():
    """Initialise the dashboard counters and run the first training if needed.

    Waits 8 seconds, copies the manifest counters into ``state`` and, when
    the manifest reports model version 0, runs the pipeline immediately;
    otherwise training is left to the scheduler.
    """
    time.sleep(8)
    manifest = load_manifest()
    state["model_version"] = manifest["model_version"]
    state["total_rows"] = manifest["total_rows"]
    state["trained_files"] = len(manifest["trained_files"])
    # Log separators were mis-encoded characters; restored as em dashes.
    if manifest["model_version"] == 0:
        log("First run — starting initial pipeline")
        run_pipeline()
    else:
        log(f"Model v{manifest['model_version']} exists — waiting for scheduler")


# Run startup in the background so it does not delay the UI launch below.
threading.Thread(target=startup, daemon=True).start()
demo.launch()