nexacore commited on
Commit
e42e883
Β·
verified Β·
1 Parent(s): f6881a6

Create app.py

Browse files
Files changed (1) hide show
  1. app.py +467 -0
app.py ADDED
@@ -0,0 +1,467 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ import io
3
+ import json
4
+ import joblib
5
+ import shutil
6
+ import threading
7
+ import schedule
8
+ import time
9
+ import numpy as np
10
+ import pandas as pd
11
+ import gradio as gr
12
+ import lightgbm as lgb
13
+ import matplotlib
14
+ matplotlib.use("Agg")
15
+ import matplotlib.pyplot as plt
16
+
17
+ from datetime import datetime, timezone
18
+ from huggingface_hub import HfApi, hf_hub_download, list_repo_files
19
+
20
+ # ── Config ────────────────────────────────────────────────────────────────────
21
+ DATASET_REPO = "nexacore/solana-dex-data"
22
+ MODEL_FILE = "nexa_lgbm_v1.joblib"
23
+ MANIFEST_FILE = "trained_files_manifest.json" # tracks which files already trained
24
+ HF_TOKEN = os.environ.get("HF_TOKEN")
25
+ DATA_DIR = "/tmp/nexa_data"
26
+ MODEL_PATH = f"/tmp/{MODEL_FILE}"
27
+ MANIFEST_PATH = f"/tmp/{MANIFEST_FILE}"
28
+
29
+ api = HfApi(token=HF_TOKEN)
30
+
31
+ # ── Shared state ──────────────────────────────────────────────────────────────
32
+ state = {
33
+ "status": "idle",
34
+ "last_run": "Never",
35
+ "last_auc": None,
36
+ "total_rows": 0,
37
+ "new_rows": 0,
38
+ "trained_files": 0,
39
+ "last_model_saved": "Never",
40
+ "model_version": 0,
41
+ "log": [],
42
+ "fig_importance": None,
43
+ }
44
+
45
+ def log(msg):
46
+ ts = datetime.now(timezone.utc).strftime("%H:%M:%S")
47
+ entry = f"[{ts}] {msg}"
48
+ print(entry)
49
+ state["log"].append(entry)
50
+ if len(state["log"]) > 300:
51
+ state["log"] = state["log"][-300:]
52
+
53
+ # ── Manifest: tracks which CSV files have already been trained on ──────────────
54
+ def load_manifest():
55
+ """Download manifest from HF, or return empty if first run."""
56
+ try:
57
+ local = hf_hub_download(
58
+ repo_id = DATASET_REPO,
59
+ filename = MANIFEST_FILE,
60
+ repo_type = "dataset",
61
+ token = HF_TOKEN,
62
+ local_dir = "/tmp",
63
+ )
64
+ with open(local) as f:
65
+ return json.load(f)
66
+ except Exception:
67
+ return {"trained_files": [], "model_version": 0, "total_rows": 0}
68
+
69
+ def save_manifest(manifest):
70
+ """Upload manifest back to HF."""
71
+ with open(MANIFEST_PATH, "w") as f:
72
+ json.dump(manifest, f, indent=2)
73
+ api.upload_file(
74
+ path_or_fileobj = MANIFEST_PATH,
75
+ path_in_repo = MANIFEST_FILE,
76
+ repo_id = DATASET_REPO,
77
+ repo_type = "dataset",
78
+ token = HF_TOKEN,
79
+ )
80
+
81
+ # ── Step 1: Download only NEW files ──────────────────────────────────────────
82
+ def download_new_files(manifest):
83
+ log("Checking for new CSV files on HF...")
84
+ os.makedirs(DATA_DIR, exist_ok=True)
85
+
86
+ all_remote = [
87
+ f for f in list_repo_files(DATASET_REPO, repo_type="dataset", token=HF_TOKEN)
88
+ if f.startswith("data/") and f.endswith(".csv")
89
+ ]
90
+
91
+ already_trained = set(manifest["trained_files"])
92
+ new_files = [f for f in all_remote if f not in already_trained]
93
+
94
+ if not new_files:
95
+ log("No new files since last training run")
96
+ return [], already_trained, all_remote
97
+
98
+ log(f"Found {len(new_files)} new files (skipping {len(already_trained)} already trained)")
99
+
100
+ downloaded = []
101
+ for remote_path in new_files:
102
+ filename = os.path.basename(remote_path)
103
+ local_path = os.path.join(DATA_DIR, filename)
104
+ try:
105
+ hf_hub_download(
106
+ repo_id = DATASET_REPO,
107
+ filename = remote_path,
108
+ repo_type = "dataset",
109
+ token = HF_TOKEN,
110
+ local_dir = "/tmp/nexa_raw",
111
+ force_download = True,
112
+ )
113
+ src = f"/tmp/nexa_raw/{remote_path}"
114
+ shutil.copy(src, local_path)
115
+ downloaded.append((remote_path, local_path))
116
+ log(f" downloaded: {filename}")
117
+ except Exception as e:
118
+ log(f" skipped {filename}: {e}")
119
+
120
+ return downloaded, already_trained, all_remote
121
+
122
+ # ── Step 2: Load new CSVs ─────────────────────────────────────────────────────
123
+ def load_new_data(downloaded_files):
124
+ log("Loading new CSV files...")
125
+ dfs = []
126
+ for remote_path, local_path in downloaded_files:
127
+ try:
128
+ df = pd.read_csv(local_path)
129
+ dfs.append(df)
130
+ log(f" loaded {os.path.basename(local_path)}: {len(df):,} rows")
131
+ except Exception as e:
132
+ log(f" failed to load {local_path}: {e}")
133
+
134
+ if not dfs:
135
+ return None
136
+
137
+ df = pd.concat(dfs, ignore_index=True)
138
+ df = df.dropna(subset=["block_time_unix", "signature", "side", "amount_sol"])
139
+ df = df.drop_duplicates(subset="signature")
140
+ df = df.sort_values("block_time_unix").reset_index(drop=True)
141
+
142
+ df["block_time_unix"] = df["block_time_unix"].astype(int)
143
+ df["amount_sol"] = pd.to_numeric(df["amount_sol"], errors="coerce").fillna(0)
144
+ df["binance_price"] = pd.to_numeric(df["binance_price"], errors="coerce").ffill()
145
+ df["jupiter_price"] = pd.to_numeric(df["jupiter_price"], errors="coerce").ffill()
146
+
147
+ log(f"New data: {len(df):,} unique rows")
148
+ state["new_rows"] = len(df)
149
+ return df
150
+
151
+ # ── Step 3: Feature Engineering ──────────────────────────────────────────────
152
+ def engineer_features(df):
153
+ log("Engineering features...")
154
+
155
+ df["dt"] = pd.to_datetime(df["block_time_unix"], unit="s", utc=True)
156
+ df = df.set_index("dt").sort_index()
157
+
158
+ df["is_buy"] = (df["side"] == "BUY").astype(float)
159
+ df["is_sell"] = (df["side"] == "SELL").astype(float)
160
+ df["is_noise"] = (df["side"] == "NOISE").astype(float)
161
+ df["buy_vol"] = df["amount_sol"] * df["is_buy"]
162
+ df["sell_vol"] = df["amount_sol"] * df["is_sell"]
163
+ df["noise_vol"]= df["amount_sol"] * df["is_noise"]
164
+
165
+ price_1s = df["binance_price"].resample("1s").last().ffill()
166
+ flows_1s = df[["buy_vol","sell_vol","noise_vol","is_buy","is_sell","is_noise"]]\
167
+ .resample("1s").sum()
168
+
169
+ feat = flows_1s.join(price_1s.rename("price"), how="outer").ffill().fillna(0)
170
+ eps = 1e-9
171
+
172
+ # Rolling windows β€” multiple sizes so model chooses what matters
173
+ for w in ["15s", "30s", "1min", "5min", "15min"]:
174
+ bv = feat["buy_vol"].rolling(w).sum()
175
+ sv = feat["sell_vol"].rolling(w).sum()
176
+ nv = feat["noise_vol"].rolling(w).sum()
177
+ bc = feat["is_buy"].rolling(w).sum()
178
+ sc = feat["is_sell"].rolling(w).sum()
179
+ nc = feat["is_noise"].rolling(w).sum()
180
+ tc = bc + sc + nc
181
+
182
+ feat[f"buy_vol_{w}"] = bv
183
+ feat[f"sell_vol_{w}"] = sv
184
+ feat[f"noise_vol_{w}"] = nv
185
+ feat[f"buy_count_{w}"] = bc
186
+ feat[f"sell_count_{w}"] = sc
187
+ feat[f"noise_count_{w}"] = nc
188
+ feat[f"flow_imbalance_{w}"] = bv / (bv + sv + eps)
189
+ feat[f"noise_ratio_{w}"] = nc / (tc + eps)
190
+ feat[f"tx_freq_{w}"] = tc
191
+ feat[f"large_buy_{w}"] = ((df["buy_vol"] > 1.0).resample("1s").sum()
192
+ if w == "30s" else feat.get(f"large_buy_{w}", 0))
193
+
194
+ # Price features
195
+ for secs, label in [(30,"30s"),(60,"1min"),(300,"5min"),(900,"15min")]:
196
+ feat[f"price_change_{label}"] = feat["price"].pct_change(secs)
197
+
198
+ feat["price_momentum"] = feat["price_change_30s"].diff(10)
199
+ feat["price_vol_5m"] = feat["price"].rolling("5min").std()
200
+ feat["price_vol_1m"] = feat["price"].rolling("1min").std()
201
+
202
+ # CEX/DEX spread
203
+ jup_1s = df["jupiter_price"].resample("1s").last().ffill()
204
+ feat["dex_cex_spread"] = (jup_1s - feat["price"]) / (feat["price"] + eps)
205
+
206
+ # Divergence (core hypothesis)
207
+ fi = feat["flow_imbalance_30s"]
208
+ pc = feat["price_change_30s"]
209
+ feat["divergence_buy"] = ((fi > 0.7) & (pc < 0)).astype(float)
210
+ feat["divergence_sell"] = ((fi < 0.3) & (pc > 0)).astype(float)
211
+ feat["confirm_buy"] = ((fi > 0.7) & (pc > 0)).astype(float)
212
+ feat["confirm_sell"] = ((fi < 0.3) & (pc < 0)).astype(float)
213
+
214
+ # Targets at multiple horizons
215
+ for secs, label in [(30,"30s"),(60,"1min"),(300,"5min")]:
216
+ future = feat["price"].shift(-secs)
217
+ pct = (future - feat["price"]) / (feat["price"] + eps)
218
+ feat[f"target_{label}"] = np.where(pct > 0.0005, 1,
219
+ np.where(pct < -0.0005, -1, 0))
220
+
221
+ feat = feat.dropna()
222
+ log(f"Features: {len(feat):,} rows Γ— {len(feat.columns)} cols")
223
+ return feat
224
+
225
+ # ── Step 4: Incremental LightGBM update ───────────────────────────────────────
226
+ def train_model(feat, manifest):
227
+ log("Training / updating LightGBM model...")
228
+
229
+ target_col = "target_30s" # primary target
230
+ drop_cols = [c for c in feat.columns if c.startswith("target_") or c == "price"]
231
+ feature_cols = [c for c in feat.columns if c not in drop_cols]
232
+
233
+ X = feat[feature_cols].values
234
+ y = feat[target_col].values
235
+
236
+ # Chronological 80/20 β€” never shuffle
237
+ split = int(len(X) * 0.8)
238
+ X_train = X[:split]; X_test = X[split:]
239
+ y_train = y[:split]; y_test = y[split:]
240
+
241
+ log(f"Train: {len(X_train):,} | Test: {len(X_test):,}")
242
+
243
+ # Load existing model for incremental update
244
+ init_model = None
245
+ if os.path.exists(MODEL_PATH):
246
+ try:
247
+ wrapped = joblib.load(MODEL_PATH)
248
+ init_model = wrapped.booster_
249
+ log(f"Incrementally updating model v{manifest['model_version']}")
250
+ except Exception:
251
+ log("Starting fresh model")
252
+
253
+ params = {
254
+ "objective": "multiclass",
255
+ "num_class": 3,
256
+ "metric": "multi_logloss",
257
+ "num_leaves": 63,
258
+ "learning_rate": 0.05,
259
+ "feature_fraction": 0.8,
260
+ "bagging_fraction": 0.8,
261
+ "bagging_freq": 5,
262
+ "class_weight": "balanced",
263
+ "verbose": -1,
264
+ "n_jobs": -1,
265
+ }
266
+
267
+ # Map -1,0,1 β†’ 0,1,2
268
+ y_tr = y_train + 1
269
+ y_te = y_test + 1
270
+
271
+ model = lgb.train(
272
+ params,
273
+ lgb.Dataset(X_train, label=y_tr),
274
+ num_boost_round = 200 if init_model else 500,
275
+ valid_sets = [lgb.Dataset(X_test, label=y_te)],
276
+ callbacks = [lgb.early_stopping(30, verbose=False),
277
+ lgb.log_evaluation(period=-1)],
278
+ init_model = init_model,
279
+ )
280
+
281
+ # Evaluate
282
+ proba = model.predict(X_test) # (n, 3)
283
+ pred = proba.argmax(axis=1) - 1 # back to -1,0,1
284
+ buy_mask = y_test != 0
285
+ try:
286
+ from sklearn.metrics import roc_auc_score
287
+ auc = roc_auc_score(
288
+ (y_test[buy_mask] == 1).astype(int),
289
+ proba[buy_mask, 2]
290
+ )
291
+ log(f"AUC (BUY class): {auc:.4f}")
292
+ state["last_auc"] = round(auc, 4)
293
+ except Exception as e:
294
+ log(f"AUC skipped: {e}")
295
+
296
+ buy_sigs = pred == 1
297
+ if buy_sigs.sum() > 0:
298
+ wr = (y_test[buy_sigs] == 1).mean()
299
+ log(f"BUY win rate: {wr:.1%} on {buy_sigs.sum()} signals")
300
+
301
+ # Feature importance chart
302
+ imp = pd.Series(
303
+ model.feature_importance("gain"), index=feature_cols
304
+ ).sort_values(ascending=False).head(15)
305
+
306
+ fig, ax = plt.subplots(figsize=(8, 5))
307
+ imp.plot(kind="barh", ax=ax, color="#2E5D8E")
308
+ ax.set_title(f"Top 15 Features β€” Model v{manifest['model_version']+1}")
309
+ ax.invert_yaxis()
310
+ plt.tight_layout()
311
+ state["fig_importance"] = fig
312
+
313
+ # Save wrapped model
314
+ class LGBWrapper:
315
+ def __init__(self, booster, features):
316
+ self.booster_ = booster
317
+ self.feature_names_ = features
318
+ def predict(self, X):
319
+ return self.booster_.predict(X).argmax(axis=1) - 1
320
+ def predict_proba(self, X):
321
+ return self.booster_.predict(X)
322
+
323
+ wrapped = LGBWrapper(model, feature_cols)
324
+ joblib.dump(wrapped, MODEL_PATH)
325
+ log("Model saved locally")
326
+ return wrapped
327
+
328
+ # ── Step 5: Upload model + update manifest ────────────────────────────────────
329
+ def upload_and_update(manifest, newly_trained_files):
330
+ log("Uploading model to HF...")
331
+ api.upload_file(
332
+ path_or_fileobj = MODEL_PATH,
333
+ path_in_repo = MODEL_FILE,
334
+ repo_id = DATASET_REPO,
335
+ repo_type = "dataset",
336
+ token = HF_TOKEN,
337
+ )
338
+
339
+ # Update manifest
340
+ manifest["trained_files"].extend(newly_trained_files)
341
+ manifest["model_version"] += 1
342
+ manifest["total_rows"] += state["new_rows"]
343
+ save_manifest(manifest)
344
+
345
+ state["last_model_saved"] = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")
346
+ state["model_version"] = manifest["model_version"]
347
+ state["total_rows"] = manifest["total_rows"]
348
+ state["trained_files"] = len(manifest["trained_files"])
349
+ log(f"Done β€” model v{manifest['model_version']} | total rows: {manifest['total_rows']:,}")
350
+
351
+ # ── Full pipeline ─────────────────────────────────────────────────────────────
352
+ def run_pipeline():
353
+ if state["status"] == "running":
354
+ log("Already running β€” skipped")
355
+ return
356
+
357
+ state["status"] = "running"
358
+ log("=" * 50)
359
+ log("Pipeline started")
360
+
361
+ try:
362
+ manifest = load_manifest()
363
+ downloaded, _, _ = download_new_files(manifest)
364
+
365
+ if not downloaded:
366
+ log("Nothing new to train on β€” pipeline skipped")
367
+ state["status"] = "idle"
368
+ return
369
+
370
+ df = load_new_data(downloaded)
371
+ if df is None or len(df) < 1000:
372
+ log(f"Not enough new data ({len(df) if df is not None else 0} rows) β€” skipping")
373
+ state["status"] = "idle"
374
+ return
375
+
376
+ feat = engineer_features(df)
377
+ train_model(feat, manifest)
378
+ newly_trained = [r for r, _ in downloaded]
379
+ upload_and_update(manifest, newly_trained)
380
+ state["last_run"] = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")
381
+ log("Pipeline complete βœ“")
382
+
383
+ except Exception as e:
384
+ log(f"Pipeline ERROR: {e}")
385
+ import traceback
386
+ log(traceback.format_exc())
387
+ finally:
388
+ state["status"] = "idle"
389
+
390
+ # ── Scheduler ─────────────────────────────────────────────────────────────────
391
+ def start_scheduler():
392
+ schedule.every(24).hours.do(run_pipeline)
393
+ while True:
394
+ schedule.run_pending()
395
+ time.sleep(60)
396
+
397
+ threading.Thread(target=start_scheduler, daemon=True).start()
398
+
399
+ # ── Gradio UI ─────────────────────────────────────────────────────────────────
400
+ def get_status():
401
+ auc = f"{state['last_auc']:.4f}" if state["last_auc"] else "N/A"
402
+ return (
403
+ f"**Status:** {state['status']}\n\n"
404
+ f"**Last Run:** {state['last_run']}\n\n"
405
+ f"**Model Version:** v{state['model_version']}\n\n"
406
+ f"**Files Trained On:** {state['trained_files']}\n\n"
407
+ f"**Total Rows:** {state['total_rows']:,}\n\n"
408
+ f"**New Rows (last run):** {state['new_rows']:,}\n\n"
409
+ f"**Last AUC:** {auc}\n\n"
410
+ f"**Model Saved:** {state['last_model_saved']}"
411
+ )
412
+
413
+ def get_logs():
414
+ return "\n".join(state["log"][-60:])
415
+
416
+ def trigger_pipeline():
417
+ threading.Thread(target=run_pipeline, daemon=True).start()
418
+ return "Pipeline started β€” check logs tab"
419
+
420
+ with gr.Blocks(title="NEXA ML Dashboard") as demo:
421
+ gr.Markdown("# NEXA β€” Solana DEX Pattern Recognition")
422
+ gr.Markdown(
423
+ f"Dataset: `{DATASET_REPO}` Β· Auto-trains every 24h on **new files only** Β· "
424
+ f"Incremental LightGBM updates"
425
+ )
426
+
427
+ with gr.Row():
428
+ with gr.Column(scale=1):
429
+ status_md = gr.Markdown(get_status)
430
+ run_btn = gr.Button("β–Ά Run Now", variant="primary")
431
+ run_out = gr.Textbox(label="", lines=1, interactive=False)
432
+ run_btn.click(trigger_pipeline, outputs=run_out)
433
+
434
+ with gr.Column(scale=2):
435
+ with gr.Tabs():
436
+ with gr.Tab("Logs"):
437
+ gr.Textbox(
438
+ value = get_logs,
439
+ lines = 25,
440
+ max_lines = 25,
441
+ label = "Live Logs",
442
+ every = 5,
443
+ )
444
+ with gr.Tab("Feature Importance"):
445
+ imp_plot = gr.Plot(label="Top 15 Features by Gain")
446
+ gr.Button("Refresh").click(
447
+ lambda: state["fig_importance"], outputs=imp_plot
448
+ )
449
+
450
+ # Refresh status every 10s
451
+ gr.Timer(10).tick(get_status, outputs=status_md)
452
+
453
+ # Startup: run pipeline if no model yet
454
+ def startup():
455
+ time.sleep(8)
456
+ manifest = load_manifest()
457
+ state["model_version"] = manifest["model_version"]
458
+ state["total_rows"] = manifest["total_rows"]
459
+ state["trained_files"] = len(manifest["trained_files"])
460
+ if manifest["model_version"] == 0:
461
+ log("First run β€” starting initial pipeline")
462
+ run_pipeline()
463
+ else:
464
+ log(f"Model v{manifest['model_version']} exists β€” waiting for scheduler")
465
+
466
+ threading.Thread(target=startup, daemon=True).start()
467
+ demo.launch()