# app.py - uploaded by nexacore
# Commit message: "Update app.py"
# Commit: 2531c21 (verified)
import os
import json
import joblib
import shutil
import threading
import schedule
import time
import numpy as np
import pandas as pd
import gradio as gr
import lightgbm as lgb
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
from datetime import datetime, timezone
from huggingface_hub import HfApi, hf_hub_download, list_repo_files
from sklearn.metrics import roc_auc_score
# ── Config ────────────────────────────────────────────────────────────────────
# Hugging Face repositories: the dataset repo holds the raw trade CSVs and the
# training manifest; the model repo holds the serialized model.
DATASET_REPO = "nexacore/solana-dex-data"
MODEL_REPO = "nexacore/solana-dex-model"

# File names, used both locally and as paths inside the repos.
MODEL_FILE = "nexa_lgbm_v1.joblib"
MANIFEST_FILE = "trained_files_manifest.json"

# Optional Hub token taken from the environment (None when unset).
HF_TOKEN = os.environ.get("HF_TOKEN")

# Local scratch locations under /tmp.
DATA_DIR = "/tmp/nexa_data"
MODEL_PATH = "/tmp/" + MODEL_FILE
MANIFEST_PATH = "/tmp/" + MANIFEST_FILE

# Shared Hub client used for uploads.
api = HfApi(token=HF_TOKEN)
# ── Module-level wrapper — MUST be here for joblib pickling to work ───────────
class LGBWrapper:
    """Small sklearn-style facade around a trained LightGBM ``Booster``.

    Defined at module level so joblib/pickle can locate the class again when a
    saved model is loaded. The attribute names ``booster_`` and
    ``feature_names_`` are part of the pickled state and must not be renamed.
    """

    def __init__(self, booster, features):
        # Underlying multiclass booster and the ordered feature column names
        # it was trained on.
        self.booster_ = booster
        self.feature_names_ = features

    def predict_proba(self, X):
        """Return the booster's per-class probabilities for ``X`` unchanged."""
        return self.booster_.predict(X)

    def predict(self, X):
        """Return class labels in {-1, 0, 1}.

        The booster's class indices 0/1/2 are shifted down by one so they match
        the -1/0/1 (down/flat/up) target encoding used for training.
        """
        probabilities = self.predict_proba(X)
        return probabilities.argmax(axis=1) - 1
# ── Shared state ──────────────────────────────────────────────────────────────
# Process-wide mutable status dictionary: pipeline functions write to it and
# the Gradio callbacks read it to render the dashboard.
state = dict(
    status="idle",             # "idle" or "running"
    last_run="Never",          # UTC timestamp string of last successful run
    last_auc=None,             # latest AUC (BUY class), rounded to 4 decimals
    total_rows=0,              # cumulative row count from the manifest
    new_rows=0,                # rows loaded in the most recent run
    trained_files=0,           # number of files recorded in the manifest
    last_model_saved="Never",  # UTC timestamp string of last model upload
    model_version=0,           # manifest model version
    log=[],                    # rolling buffer of log lines (trimmed by log())
    fig_importance=None,       # matplotlib Figure of top feature importances
)
def log(msg):
    """Print a UTC-timestamped line and append it to ``state["log"]``.

    The in-memory buffer is trimmed to its most recent 300 entries.
    """
    stamp = datetime.now(timezone.utc).strftime("%H:%M:%S")
    line = f"[{stamp}] {msg}"
    print(line)
    buffer = state["log"]
    buffer.append(line)
    if len(buffer) > 300:
        state["log"] = buffer[-300:]
# ── Manifest ──────────────────────────────────────────────────────────────────
def load_manifest():
    """Fetch the training manifest from the dataset repo.

    Returns:
        dict with ``trained_files`` (list of repo paths already used for
        training), ``model_version`` (int) and ``total_rows`` (int). Missing
        keys are filled with defaults. If the manifest cannot be downloaded or
        parsed, an empty manifest is returned so a first run can proceed.
    """
    default = {"trained_files": [], "model_version": 0, "total_rows": 0}
    try:
        local = hf_hub_download(
            repo_id=DATASET_REPO,
            filename=MANIFEST_FILE,
            repo_type="dataset",
            token=HF_TOKEN,
            local_dir="/tmp",
        )
        with open(local, encoding="utf-8") as f:
            manifest = json.load(f)
    except Exception as e:
        # Expected on the very first run (no manifest yet), but a transient
        # network/auth failure lands here too. Log it: an empty manifest makes
        # the pipeline retrain every file and restart the version counter.
        log(f"Manifest unavailable, using empty manifest: {e}")
        return default
    if not isinstance(manifest, dict):
        log("Manifest is not a JSON object, using empty manifest")
        return default
    # Tolerate older/partial manifests that lack some keys.
    for key, value in default.items():
        manifest.setdefault(key, value)
    return manifest
def save_manifest(manifest):
    """Write ``manifest`` to MANIFEST_PATH as indented JSON, then upload that
    file to the dataset repo at MANIFEST_FILE."""
    with open(MANIFEST_PATH, "w") as handle:
        json.dump(manifest, handle, indent=2)

    api.upload_file(
        path_or_fileobj=MANIFEST_PATH,
        path_in_repo=MANIFEST_FILE,
        repo_id=DATASET_REPO,
        repo_type="dataset",
        token=HF_TOKEN,
    )
# ── Step 1: Download only NEW files ──────────────────────────────────────────
def download_new_files(manifest):
    """Download every CSV in the dataset repo that the manifest has not seen.

    Args:
        manifest: dict whose ``trained_files`` list holds repo paths already
            used for training.

    Returns:
        ``(downloaded, already_trained, all_remote)``:
        ``downloaded`` is a list of ``(remote_path, local_path)`` tuples for
        files fetched successfully; ``already_trained`` is the set of repo
        paths from the manifest; ``all_remote`` lists every CSV path in the
        repo. Files that fail to download are logged and left out of
        ``downloaded``, so they are not recorded as trained.
    """
    log("Checking for new CSV files on HF...")
    os.makedirs(DATA_DIR, exist_ok=True)
    all_remote = [
        f for f in list_repo_files(DATASET_REPO, repo_type="dataset", token=HF_TOKEN)
        if f.endswith(".csv") and not f.startswith("refs/")
    ]
    already_trained = set(manifest["trained_files"])
    new_files = [f for f in all_remote if f not in already_trained]
    if not new_files:
        log("No new files since last training run")
        return [], already_trained, all_remote
    log(f"Found {len(new_files)} new files (skipping {len(already_trained)} already trained)")

    downloaded = []
    for remote_path in new_files:
        filename = os.path.basename(remote_path)
        # Mirror the repo's directory layout so files that share a basename in
        # different folders don't overwrite each other locally.
        local_path = os.path.join(DATA_DIR, remote_path)
        try:
            # hf_hub_download returns the path it wrote to; use it rather than
            # guessing where the file landed under local_dir.
            src = hf_hub_download(
                repo_id=DATASET_REPO,
                filename=remote_path,
                repo_type="dataset",
                token=HF_TOKEN,
                local_dir="/tmp/nexa_raw",
                force_download=True,
            )
            os.makedirs(os.path.dirname(local_path), exist_ok=True)
            shutil.copy(src, local_path)
            downloaded.append((remote_path, local_path))
            log(f" downloaded: {filename}")
        except Exception as e:
            log(f" skipped {filename}: {e}")
    return downloaded, already_trained, all_remote
# ── Step 2: Load new CSVs ─────────────────────────────────────────────────────
def load_new_data(downloaded_files):
    """Read, clean and merge the freshly downloaded CSV files.

    Args:
        downloaded_files: iterable of ``(remote_path, local_path)`` tuples.

    Returns:
        One DataFrame with duplicate ``signature`` rows removed, sorted by
        ``block_time_unix`` (int64 seconds), with numeric ``amount_sol``
        (unparseable -> 0) and forward-filled ``binance_price`` /
        ``jupiter_price``; or ``None`` if no file could be read. Also stores
        the row count in ``state["new_rows"]``.
    """
    log("Loading new CSV files...")
    dfs = []
    for _remote_path, local_path in downloaded_files:
        try:
            df = pd.read_csv(local_path)
        except Exception as e:
            log(f" failed to load {local_path}: {e}")
        else:
            dfs.append(df)
            log(f" loaded {os.path.basename(local_path)}: {len(df):,} rows")
    if not dfs:
        return None

    df = pd.concat(dfs, ignore_index=True)
    # Parse timestamps before sorting: text timestamps would otherwise sort
    # lexicographically, and one malformed value would make the int cast
    # below raise and abort the whole run. Unparseable values become NaN and
    # are dropped with the other incomplete rows.
    df["block_time_unix"] = pd.to_numeric(df["block_time_unix"], errors="coerce")
    df = df.dropna(subset=["block_time_unix", "signature", "side", "amount_sol"])
    df = df.drop_duplicates(subset="signature")
    df = df.sort_values("block_time_unix").reset_index(drop=True)
    df["block_time_unix"] = df["block_time_unix"].astype("int64")
    df["amount_sol"] = pd.to_numeric(df["amount_sol"], errors="coerce").fillna(0)
    df["binance_price"] = pd.to_numeric(df["binance_price"], errors="coerce").ffill()
    df["jupiter_price"] = pd.to_numeric(df["jupiter_price"], errors="coerce").ffill()
    log(f"New data: {len(df):,} unique rows")
    state["new_rows"] = len(df)
    return df
# ── Step 3: Feature Engineering ──────────────────────────────────────────────
def engineer_features(df):
    """Turn trade-level rows into a 1-second feature/target table.

    Args:
        df: trades with ``block_time_unix`` (seconds), ``side``
            ("BUY"/"SELL"/"NOISE"), ``amount_sol``, ``binance_price`` and
            ``jupiter_price`` columns. NOTE: a ``dt`` column is added to this
            DataFrame in place.

    Returns:
        DataFrame indexed by 1-second UTC timestamps with rolling order-flow
        features, price-change/volatility features, the DEX/CEX spread,
        divergence/confirmation flags and int64 ``target_30s`` /
        ``target_1min`` / ``target_5min`` labels in {-1, 0, 1}. Rows with a
        missing or infinite value, or whose future price lies beyond the
        data, are dropped.
    """
    log("Engineering features...")
    df["dt"] = pd.to_datetime(df["block_time_unix"], unit="s", utc=True)
    df = df.set_index("dt").sort_index()

    # One-hot trade side and per-side SOL volume.
    df["is_buy"] = (df["side"] == "BUY").astype(float)
    df["is_sell"] = (df["side"] == "SELL").astype(float)
    df["is_noise"] = (df["side"] == "NOISE").astype(float)
    df["buy_vol"] = df["amount_sol"] * df["is_buy"]
    df["sell_vol"] = df["amount_sol"] * df["is_sell"]
    df["noise_vol"] = df["amount_sol"] * df["is_noise"]

    # Regular 1-second grid: last (forward-filled) price and summed flows.
    price_1s = df["binance_price"].resample("1s").last().ffill()
    jup_1s = df["jupiter_price"].resample("1s").last().ffill()
    flows_1s = df[
        ["buy_vol", "sell_vol", "noise_vol", "is_buy", "is_sell", "is_noise"]
    ].resample("1s").sum()
    feat = flows_1s.join(price_1s.rename("price"), how="outer").ffill().fillna(0)
    feat = feat.join(jup_1s.rename("jup_price"), how="left").ffill().fillna(0)
    eps = 1e-9

    # Rolling order-flow windows
    for w in ["15s", "30s", "1min", "5min", "15min"]:
        bv = feat["buy_vol"].rolling(w).sum()
        sv = feat["sell_vol"].rolling(w).sum()
        nv = feat["noise_vol"].rolling(w).sum()
        bc = feat["is_buy"].rolling(w).sum()
        sc = feat["is_sell"].rolling(w).sum()
        nc = feat["is_noise"].rolling(w).sum()
        tc = bc + sc + nc
        feat[f"buy_vol_{w}"] = bv
        feat[f"sell_vol_{w}"] = sv
        feat[f"noise_vol_{w}"] = nv
        feat[f"buy_count_{w}"] = bc
        feat[f"sell_count_{w}"] = sc
        feat[f"noise_count_{w}"] = nc
        feat[f"flow_imbalance_{w}"] = bv / (bv + sv + eps)
        feat[f"noise_ratio_{w}"] = nc / (tc + eps)
        feat[f"tx_freq_{w}"] = tc

    # Price features (one row per second, so a lag of N rows == N seconds)
    for secs, label in [(30, "30s"), (60, "1min"), (300, "5min"), (900, "15min")]:
        feat[f"price_change_{label}"] = feat["price"].pct_change(secs)
    feat["price_momentum"] = feat["price_change_30s"].diff(10)
    feat["price_vol_5m"] = feat["price"].rolling("5min").std()
    feat["price_vol_1m"] = feat["price"].rolling("1min").std()

    # CEX/DEX spread
    feat["dex_cex_spread"] = (feat["jup_price"] - feat["price"]) / (feat["price"] + eps)

    # Divergence features (core hypothesis)
    fi = feat["flow_imbalance_30s"]
    pc = feat["price_change_30s"]
    feat["divergence_buy"] = ((fi > 0.7) & (pc < 0)).astype(float)
    feat["divergence_sell"] = ((fi < 0.3) & (pc > 0)).astype(float)
    feat["confirm_buy"] = ((fi > 0.7) & (pc > 0)).astype(float)
    feat["confirm_sell"] = ((fi < 0.3) & (pc < 0)).astype(float)

    # Targets at multiple horizons: 1 if the price rises by more than 0.05%,
    # -1 if it falls by more than 0.05%, else 0. Rows whose future price lies
    # beyond the end of the data get NaN (not a fabricated 0 "flat" label) so
    # dropna() below removes them.
    target_cols = []
    for secs, label in [(30, "30s"), (60, "1min"), (300, "5min")]:
        future = feat["price"].shift(-secs)
        pct = (future - feat["price"]) / (feat["price"] + eps)
        labels = np.where(pct > 0.0005, 1.0, np.where(pct < -0.0005, -1.0, 0.0))
        labels[pct.isna().to_numpy()] = np.nan
        col = f"target_{label}"
        feat[col] = labels
        target_cols.append(col)

    # pct_change() against a leading zero price (from fillna(0)) yields +/-inf,
    # which dropna() alone would keep.
    feat = feat.replace([np.inf, -np.inf], np.nan).dropna()
    feat = feat.astype({col: "int64" for col in target_cols})
    log(f"Features: {len(feat):,} rows × {len(feat.columns)} cols")
    return feat
# ── Step 4: Train / Incrementally Update LightGBM ────────────────────────────
def _load_base_booster(manifest):
    """Return the booster of the current model to continue training from, or None.

    The local MODEL_PATH copy may be missing (e.g. /tmp was wiped by a
    restart). In that case, if the manifest says a model was already
    published, fetch it from MODEL_REPO. Otherwise a fresh model trained only
    on the newest files would overwrite the accumulated one.
    """
    model_path = MODEL_PATH
    if not os.path.exists(model_path) and manifest.get("model_version", 0) > 0:
        try:
            model_path = hf_hub_download(
                repo_id=MODEL_REPO,
                filename=MODEL_FILE,
                repo_type="model",
                token=HF_TOKEN,
                local_dir="/tmp",
            )
            log(f"Fetched model v{manifest['model_version']} from {MODEL_REPO}")
        except Exception as e:
            log(f"Could not fetch existing model from {MODEL_REPO}: {e}")
    if not os.path.exists(model_path):
        log("Starting fresh model")
        return None
    try:
        # NOTE: joblib.load unpickles arbitrary objects; this path must only
        # ever hold files this pipeline wrote to its own model repo.
        booster = joblib.load(model_path).booster_
    except Exception:
        log("Starting fresh model")
        return None
    log(f"Incrementally updating model v{manifest['model_version']}")
    return booster


def _balanced_sample_weights(labels):
    """Per-sample weights matching scikit-learn's ``class_weight="balanced"``.

    Each sample of class c gets ``n_samples / (n_present_classes * count(c))``.
    """
    _classes, inverse, counts = np.unique(
        labels, return_inverse=True, return_counts=True
    )
    per_class = len(labels) / (len(counts) * counts)
    return per_class[np.ravel(inverse)]


def _importance_figure(model, feature_cols, version):
    """Build a horizontal bar chart of the 15 features with the highest gain."""
    imp = pd.Series(
        model.feature_importance("gain"), index=feature_cols
    ).sort_values(ascending=False).head(15)
    fig, ax = plt.subplots(figsize=(8, 5))
    imp.plot(kind="barh", ax=ax, color="#2E5D8E")
    ax.set_title(f"Top 15 Features — Model v{version}")
    ax.invert_yaxis()
    fig.tight_layout()
    # Detach from pyplot's figure registry; the Figure object itself is still
    # returned and can be rendered later.
    plt.close(fig)
    return fig


def train_model(feat, manifest):
    """Train (or continue training) the 3-class LightGBM model on ``feat``.

    Args:
        feat: output of ``engineer_features``. ``target_30s`` in {-1, 0, 1} is
            the label; every non-target column except ``price`` and
            ``jup_price`` is a feature.
        manifest: current manifest; ``model_version`` drives logging, the
            chart title and whether a published model is fetched.

    Returns:
        The trained model wrapped in ``LGBWrapper`` (also saved to
        MODEL_PATH). Updates ``state["last_auc"]`` and
        ``state["fig_importance"]``.
    """
    log("Training / updating LightGBM model...")
    target_col = "target_30s"
    drop_cols = {
        c for c in feat.columns
        if c.startswith("target_") or c in ("price", "jup_price")
    }
    feature_cols = [c for c in feat.columns if c not in drop_cols]
    X = feat[feature_cols].values
    y = feat[target_col].values

    # Chronological 80/20 — never shuffle
    split = int(len(X) * 0.8)
    X_train, X_test = X[:split], X[split:]
    y_train, y_test = y[:split], y[split:]
    log(f"Train: {len(X_train):,} | Test: {len(X_test):,}")

    init_model = _load_base_booster(manifest)

    # "class_weight" is a scikit-learn-wrapper (LGBMClassifier) option that
    # lgb.train does not understand, so classes are balanced via per-sample
    # weights on the training Dataset instead.
    params = {
        "objective": "multiclass",
        "num_class": 3,
        "metric": "multi_logloss",
        "num_leaves": 63,
        "learning_rate": 0.05,
        "feature_fraction": 0.8,
        "bagging_fraction": 0.8,
        "bagging_freq": 5,
        "verbose": -1,
        "n_jobs": -1,
    }
    # Map -1,0,1 → 0,1,2 for multiclass
    y_tr = y_train + 1
    y_te = y_test + 1
    # NOTE(review): the test split doubles as the early-stopping validation
    # set, so the AUC / win rate reported below are somewhat optimistic.
    model = lgb.train(
        params,
        lgb.Dataset(X_train, label=y_tr, weight=_balanced_sample_weights(y_tr)),
        num_boost_round=200 if init_model is not None else 500,
        valid_sets=[lgb.Dataset(X_test, label=y_te)],
        callbacks=[lgb.early_stopping(30, verbose=False),
                   lgb.log_evaluation(period=-1)],
        init_model=init_model,
    )

    # Evaluate: AUC of "up" (class 1) vs "down" (class -1) on directional rows.
    proba = model.predict(X_test)
    pred = proba.argmax(axis=1) - 1
    directional = y_test != 0
    try:
        auc = roc_auc_score(
            (y_test[directional] == 1).astype(int),
            proba[directional, 2],
        )
        log(f"AUC (BUY class): {auc:.4f}")
        state["last_auc"] = round(auc, 4)
    except Exception as e:
        log(f"AUC skipped: {e}")

    buy_sigs = pred == 1
    if buy_sigs.any():
        wr = (y_test[buy_sigs] == 1).mean()
        log(f"BUY win rate: {wr:.1%} on {buy_sigs.sum()} signals")

    state["fig_importance"] = _importance_figure(
        model, feature_cols, manifest["model_version"] + 1
    )

    # Save using module-level LGBWrapper
    wrapped = LGBWrapper(model, feature_cols)
    joblib.dump(wrapped, MODEL_PATH)
    log("Model saved locally")
    return wrapped
# ── Step 5: Upload model to model repo + update manifest in dataset repo ───────
def upload_and_update(manifest, newly_trained_files):
    """Publish the locally saved model and record this run in the manifest.

    Uploads MODEL_PATH to MODEL_REPO. It then appends
    ``newly_trained_files`` to ``manifest["trained_files"]``, bumps
    ``model_version`` and adds ``state["new_rows"]`` to ``total_rows``.
    Finally it saves/uploads the manifest and mirrors the counters into
    ``state``. Mutates ``manifest`` in place.
    """
    log(f"Uploading model to {MODEL_REPO}...")
    api.upload_file(
        path_or_fileobj=MODEL_PATH,
        path_in_repo=MODEL_FILE,
        repo_id=MODEL_REPO,
        repo_type="model",
        token=HF_TOKEN,
    )
    log(f"Model uploaded to {MODEL_REPO}/{MODEL_FILE}")

    # The manifest only changes after the model upload succeeded; if the
    # upload raises, these files stay unrecorded and are picked up again on
    # the next run.
    manifest["trained_files"].extend(newly_trained_files)
    manifest["model_version"] += 1
    manifest["total_rows"] += state["new_rows"]
    save_manifest(manifest)

    state["last_model_saved"] = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")
    state["model_version"] = manifest["model_version"]
    state["total_rows"] = manifest["total_rows"]
    state["trained_files"] = len(manifest["trained_files"])
    log(f"Done — model v{manifest['model_version']} | total rows: {manifest['total_rows']:,}")
# ── Full pipeline ─────────────────────────────────────────────────────────────
# Prevents overlapping runs. run_pipeline() is started from the scheduler
# thread, the "Run Now" button thread and the startup thread, so a plain
# check-then-set on state["status"] could let two runs through.
_pipeline_lock = threading.Lock()


def run_pipeline():
    """Run one end-to-end pass: manifest -> download -> load -> features ->
    train -> upload.

    Returns immediately (after logging) if another run is in progress. Runs
    are skipped when there are no new files, fewer than 1000 new rows, or no
    usable feature rows. Skipped files are not added to the manifest, so they
    are retried next time. Any exception is logged with its traceback.
    ``state["status"]`` is "running" for the duration and reset to "idle"
    afterwards.
    """
    if not _pipeline_lock.acquire(blocking=False):
        log("Already running — skipped")
        return
    state["status"] = "running"
    log("=" * 50)
    log("Pipeline started")
    try:
        manifest = load_manifest()
        downloaded, _, _ = download_new_files(manifest)
        if not downloaded:
            log("Nothing new to train on — pipeline skipped")
            return
        df = load_new_data(downloaded)
        if df is None or len(df) < 1000:
            log(f"Not enough new data ({0 if df is None else len(df)} rows) — skipping")
            return
        feat = engineer_features(df)
        if feat.empty:
            # e.g. the new data spans less than the longest lookback window.
            log("No usable feature rows after feature engineering — skipping")
            return
        train_model(feat, manifest)
        newly_trained = [remote for remote, _ in downloaded]
        upload_and_update(manifest, newly_trained)
        state["last_run"] = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")
        log("Pipeline complete ✓")
    except Exception as e:
        import traceback
        log(f"Pipeline ERROR: {e}")
        log(traceback.format_exc())
    finally:
        state["status"] = "idle"
        _pipeline_lock.release()
# ── Scheduler: every 24 hours ─────────────────────────────────────────────────
def start_scheduler():
    """Register run_pipeline as a 24-hour job, then poll for due jobs forever.

    Pending jobs are checked once every 60 seconds.
    """
    interval_hours = 24
    poll_seconds = 60
    schedule.every(interval_hours).hours.do(run_pipeline)
    while True:
        schedule.run_pending()
        time.sleep(poll_seconds)


# Launch the scheduler loop on a background daemon thread at import time.
threading.Thread(target=start_scheduler, daemon=True).start()
# ── Gradio UI ─────────────────────────────────────────────────────────────────
def get_status():
    """Render the dashboard status panel as Markdown from ``state``."""
    # Compare against None explicitly: a (degenerate) AUC of 0.0 is a real
    # value and must not be shown as "N/A".
    auc = f"{state['last_auc']:.4f}" if state["last_auc"] is not None else "N/A"
    return (
        f"**Status:** {state['status']}\n\n"
        f"**Last Run:** {state['last_run']}\n\n"
        f"**Model Version:** v{state['model_version']}\n\n"
        f"**Files Trained On:** {state['trained_files']}\n\n"
        f"**Total Rows:** {state['total_rows']:,}\n\n"
        f"**New Rows (last run):** {state['new_rows']:,}\n\n"
        f"**Last AUC:** {auc}\n\n"
        f"**Model Saved:** {state['last_model_saved']}\n\n"
        f"**Model Repo:** `{MODEL_REPO}`"
    )
def get_logs():
    """Return the 60 most recent log lines, newline-separated."""
    recent = state["log"][-60:]
    return "\n".join(recent)
def trigger_pipeline():
    """Start run_pipeline on a background daemon thread and return a short
    confirmation message for the UI (shown even if a run is already active;
    run_pipeline itself logs and skips in that case)."""
    threading.Thread(target=run_pipeline, daemon=True).start()
    return "Pipeline started — check logs tab"
# Dashboard layout: status panel + "Run Now" button on the left; live logs and
# the feature-importance chart in tabs on the right.
with gr.Blocks(title="NEXA ML Dashboard") as demo:
    gr.Markdown("# NEXA — Solana DEX Pattern Recognition")
    gr.Markdown(
        f"Data: `{DATASET_REPO}` · Model: `{MODEL_REPO}` · "
        "Auto-trains every 24h on **new files only** · Incremental LightGBM"
    )
    with gr.Row():
        with gr.Column(scale=1):
            status_md = gr.Markdown(get_status)
            run_btn = gr.Button("▶ Run Now", variant="primary")
            run_out = gr.Textbox(label="", lines=1, interactive=False)
            run_btn.click(trigger_pipeline, outputs=run_out)
        with gr.Column(scale=2):
            with gr.Tabs():
                with gr.Tab("Logs"):
                    # Re-evaluates get_logs every 5 seconds.
                    gr.Textbox(
                        value=get_logs,
                        lines=25,
                        max_lines=25,
                        label="Live Logs",
                        every=5,
                    )
                with gr.Tab("Feature Importance"):
                    imp_plot = gr.Plot(label="Top 15 Features by Gain")
                    gr.Button("Refresh").click(
                        lambda: state["fig_importance"], outputs=imp_plot
                    )
    # Refresh the status panel every 10 seconds.
    gr.Timer(10).tick(get_status, outputs=status_md)
# ── Startup ───────────────────────────────────────────────────────────────────
def startup():
    """Seed the dashboard counters from the manifest and, if no model has been
    produced yet (model_version == 0), run the pipeline immediately; otherwise
    leave training to the 24-hour scheduler."""
    # NOTE(review): presumably gives demo.launch() time to come up before the
    # Hub is contacted — confirm the 8 s delay is still needed.
    time.sleep(8)
    manifest = load_manifest()
    version = manifest["model_version"]
    state["model_version"] = version
    state["total_rows"] = manifest["total_rows"]
    state["trained_files"] = len(manifest["trained_files"])
    if version == 0:
        log("First run — starting initial pipeline")
        run_pipeline()
    else:
        log(f"Model v{version} exists — waiting for scheduler")


threading.Thread(target=startup, daemon=True).start()
demo.launch()