nexacore committed on
Commit
00fb193
·
verified ·
1 Parent(s): 3582907

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +57 -60
app.py CHANGED
@@ -1,5 +1,4 @@
1
  import os
2
- import io
3
  import json
4
  import joblib
5
  import shutil
@@ -16,18 +15,29 @@ import matplotlib.pyplot as plt
16
 
17
  from datetime import datetime, timezone
18
  from huggingface_hub import HfApi, hf_hub_download, list_repo_files
 
19
 
20
  # ── Config ────────────────────────────────────────────────────────────────────
21
- DATASET_REPO = "nexacore/solana-dex-data"
22
- MODEL_FILE = "nexa_lgbm_v1.joblib"
23
- MANIFEST_FILE = "trained_files_manifest.json" # tracks which files already trained
24
- HF_TOKEN = os.environ.get("HF_TOKEN")
25
- DATA_DIR = "/tmp/nexa_data"
26
- MODEL_PATH = f"/tmp/{MODEL_FILE}"
27
- MANIFEST_PATH = f"/tmp/{MANIFEST_FILE}"
28
 
29
  api = HfApi(token=HF_TOKEN)
30
 
 
 
 
 
 
 
 
 
 
 
31
  # ── Shared state ──────────────────────────────────────────────────────────────
32
  state = {
33
  "status": "idle",
@@ -50,9 +60,8 @@ def log(msg):
50
  if len(state["log"]) > 300:
51
  state["log"] = state["log"][-300:]
52
 
53
- # ── Manifest: tracks which CSV files have already been trained on ──────────────
54
  def load_manifest():
55
- """Download manifest from HF, or return empty if first run."""
56
  try:
57
  local = hf_hub_download(
58
  repo_id = DATASET_REPO,
@@ -67,7 +76,6 @@ def load_manifest():
67
  return {"trained_files": [], "model_version": 0, "total_rows": 0}
68
 
69
  def save_manifest(manifest):
70
- """Upload manifest back to HF."""
71
  with open(MANIFEST_PATH, "w") as f:
72
  json.dump(manifest, f, indent=2)
73
  api.upload_file(
@@ -87,7 +95,7 @@ def download_new_files(manifest):
87
  f for f in list_repo_files(DATASET_REPO, repo_type="dataset", token=HF_TOKEN)
88
  if f.endswith(".csv") and not f.startswith("refs/")
89
  ]
90
-
91
  already_trained = set(manifest["trained_files"])
92
  new_files = [f for f in all_remote if f not in already_trained]
93
 
@@ -103,16 +111,16 @@ def download_new_files(manifest):
103
  local_path = os.path.join(DATA_DIR, filename)
104
  try:
105
  hf_hub_download(
106
- repo_id = DATASET_REPO,
107
- filename = remote_path,
108
- repo_type = "dataset",
109
- token = HF_TOKEN,
110
- local_dir = "/tmp/nexa_raw",
111
  force_download = True,
112
  )
113
  src = f"/tmp/nexa_raw/{remote_path}"
114
  if not os.path.exists(src):
115
- src = f"/tmp/nexa_raw/{os.path.basename(remote_path)}"
116
  shutil.copy(src, local_path)
117
  downloaded.append((remote_path, local_path))
118
  log(f" downloaded: {filename}")
@@ -157,21 +165,23 @@ def engineer_features(df):
157
  df["dt"] = pd.to_datetime(df["block_time_unix"], unit="s", utc=True)
158
  df = df.set_index("dt").sort_index()
159
 
160
- df["is_buy"] = (df["side"] == "BUY").astype(float)
161
- df["is_sell"] = (df["side"] == "SELL").astype(float)
162
- df["is_noise"] = (df["side"] == "NOISE").astype(float)
163
- df["buy_vol"] = df["amount_sol"] * df["is_buy"]
164
- df["sell_vol"] = df["amount_sol"] * df["is_sell"]
165
- df["noise_vol"]= df["amount_sol"] * df["is_noise"]
166
 
167
  price_1s = df["binance_price"].resample("1s").last().ffill()
168
- flows_1s = df[["buy_vol","sell_vol","noise_vol","is_buy","is_sell","is_noise"]]\
169
- .resample("1s").sum()
 
170
 
171
  feat = flows_1s.join(price_1s.rename("price"), how="outer").ffill().fillna(0)
 
172
  eps = 1e-9
173
 
174
- # Rolling windows — multiple sizes so model chooses what matters
175
  for w in ["15s", "30s", "1min", "5min", "15min"]:
176
  bv = feat["buy_vol"].rolling(w).sum()
177
  sv = feat["sell_vol"].rolling(w).sum()
@@ -190,8 +200,6 @@ def engineer_features(df):
190
  feat[f"flow_imbalance_{w}"] = bv / (bv + sv + eps)
191
  feat[f"noise_ratio_{w}"] = nc / (tc + eps)
192
  feat[f"tx_freq_{w}"] = tc
193
- feat[f"large_buy_{w}"] = ((df["buy_vol"] > 1.0).resample("1s").sum()
194
- if w == "30s" else feat.get(f"large_buy_{w}", 0))
195
 
196
  # Price features
197
  for secs, label in [(30,"30s"),(60,"1min"),(300,"5min"),(900,"15min")]:
@@ -202,10 +210,9 @@ def engineer_features(df):
202
  feat["price_vol_1m"] = feat["price"].rolling("1min").std()
203
 
204
  # CEX/DEX spread
205
- jup_1s = df["jupiter_price"].resample("1s").last().ffill()
206
- feat["dex_cex_spread"] = (jup_1s - feat["price"]) / (feat["price"] + eps)
207
 
208
- # Divergence (core hypothesis)
209
  fi = feat["flow_imbalance_30s"]
210
  pc = feat["price_change_30s"]
211
  feat["divergence_buy"] = ((fi > 0.7) & (pc < 0)).astype(float)
@@ -224,30 +231,30 @@ def engineer_features(df):
224
  log(f"Features: {len(feat):,} rows Γ— {len(feat.columns)} cols")
225
  return feat
226
 
227
- # ── Step 4: Incremental LightGBM update ───────────────────────────────────────
228
  def train_model(feat, manifest):
229
  log("Training / updating LightGBM model...")
230
 
231
- target_col = "target_30s" # primary target
232
- drop_cols = [c for c in feat.columns if c.startswith("target_") or c == "price"]
233
  feature_cols = [c for c in feat.columns if c not in drop_cols]
234
 
235
  X = feat[feature_cols].values
236
  y = feat[target_col].values
237
 
238
  # Chronological 80/20 — never shuffle
239
- split = int(len(X) * 0.8)
240
- X_train = X[:split]; X_test = X[split:]
241
- y_train = y[:split]; y_test = y[split:]
242
 
243
  log(f"Train: {len(X_train):,} | Test: {len(X_test):,}")
244
 
245
- # Load existing model for incremental update
246
  init_model = None
247
  if os.path.exists(MODEL_PATH):
248
  try:
249
- wrapped = joblib.load(MODEL_PATH)
250
- init_model = wrapped.booster_
251
  log(f"Incrementally updating model v{manifest['model_version']}")
252
  except Exception:
253
  log("Starting fresh model")
@@ -266,7 +273,7 @@ def train_model(feat, manifest):
266
  "n_jobs": -1,
267
  }
268
 
269
- # Map -1,0,1 → 0,1,2
270
  y_tr = y_train + 1
271
  y_te = y_test + 1
272
 
@@ -281,11 +288,11 @@ def train_model(feat, manifest):
281
  )
282
 
283
  # Evaluate
284
- proba = model.predict(X_test) # (n, 3)
285
- pred = proba.argmax(axis=1) - 1 # back to -1,0,1
286
  buy_mask = y_test != 0
 
287
  try:
288
- from sklearn.metrics import roc_auc_score
289
  auc = roc_auc_score(
290
  (y_test[buy_mask] == 1).astype(int),
291
  proba[buy_mask, 2]
@@ -311,17 +318,9 @@ def train_model(feat, manifest):
311
  ax.invert_yaxis()
312
  plt.tight_layout()
313
  state["fig_importance"] = fig
 
314
 
315
- # Save wrapped model
316
- class LGBWrapper:
317
- def __init__(self, booster, features):
318
- self.booster_ = booster
319
- self.feature_names_ = features
320
- def predict(self, X):
321
- return self.booster_.predict(X).argmax(axis=1) - 1
322
- def predict_proba(self, X):
323
- return self.booster_.predict(X)
324
-
325
  wrapped = LGBWrapper(model, feature_cols)
326
  joblib.dump(wrapped, MODEL_PATH)
327
  log("Model saved locally")
@@ -338,7 +337,6 @@ def upload_and_update(manifest, newly_trained_files):
338
  token = HF_TOKEN,
339
  )
340
 
341
- # Update manifest
342
  manifest["trained_files"].extend(newly_trained_files)
343
  manifest["model_version"] += 1
344
  manifest["total_rows"] += state["new_rows"]
@@ -369,7 +367,7 @@ def run_pipeline():
369
  state["status"] = "idle"
370
  return
371
 
372
- df = load_new_data(downloaded)
373
  if df is None or len(df) < 1000:
374
  log(f"Not enough new data ({len(df) if df is not None else 0} rows) β€” skipping")
375
  state["status"] = "idle"
@@ -389,7 +387,7 @@ def run_pipeline():
389
  finally:
390
  state["status"] = "idle"
391
 
392
- # ── Scheduler ─────────────────────────────────────────────────────────────────
393
  def start_scheduler():
394
  schedule.every(24).hours.do(run_pipeline)
395
  while True:
@@ -449,10 +447,9 @@ with gr.Blocks(title="NEXA ML Dashboard") as demo:
449
  lambda: state["fig_importance"], outputs=imp_plot
450
  )
451
 
452
- # Refresh status every 10s
453
  gr.Timer(10).tick(get_status, outputs=status_md)
454
 
455
- # Startup: run pipeline if no model yet
456
  def startup():
457
  time.sleep(8)
458
  manifest = load_manifest()
 
1
  import os
 
2
  import json
3
  import joblib
4
  import shutil
 
15
 
16
  from datetime import datetime, timezone
17
  from huggingface_hub import HfApi, hf_hub_download, list_repo_files
18
+ from sklearn.metrics import roc_auc_score
19
 
20
  # ── Config ────────────────────────────────────────────────────────────────────
21
+ DATASET_REPO = "nexacore/solana-dex-data"
22
+ MODEL_FILE = "nexa_lgbm_v1.joblib"
23
+ MANIFEST_FILE = "trained_files_manifest.json"
24
+ HF_TOKEN = os.environ.get("HF_TOKEN")
25
+ DATA_DIR = "/tmp/nexa_data"
26
+ MODEL_PATH = f"/tmp/{MODEL_FILE}"
27
+ MANIFEST_PATH = f"/tmp/{MANIFEST_FILE}"
28
 
29
  api = HfApi(token=HF_TOKEN)
30
 
31
+ # ── Module-level wrapper — MUST be here for joblib pickling to work ───────────
32
+ class LGBWrapper:
33
+ def __init__(self, booster, features):
34
+ self.booster_ = booster
35
+ self.feature_names_ = features
36
+ def predict(self, X):
37
+ return self.booster_.predict(X).argmax(axis=1) - 1
38
+ def predict_proba(self, X):
39
+ return self.booster_.predict(X)
40
+
41
  # ── Shared state ──────────────────────────────────────────────────────────────
42
  state = {
43
  "status": "idle",
 
60
  if len(state["log"]) > 300:
61
  state["log"] = state["log"][-300:]
62
 
63
+ # ── Manifest ──────────────────────────────────────────────────────────────────
64
  def load_manifest():
 
65
  try:
66
  local = hf_hub_download(
67
  repo_id = DATASET_REPO,
 
76
  return {"trained_files": [], "model_version": 0, "total_rows": 0}
77
 
78
  def save_manifest(manifest):
 
79
  with open(MANIFEST_PATH, "w") as f:
80
  json.dump(manifest, f, indent=2)
81
  api.upload_file(
 
95
  f for f in list_repo_files(DATASET_REPO, repo_type="dataset", token=HF_TOKEN)
96
  if f.endswith(".csv") and not f.startswith("refs/")
97
  ]
98
+
99
  already_trained = set(manifest["trained_files"])
100
  new_files = [f for f in all_remote if f not in already_trained]
101
 
 
111
  local_path = os.path.join(DATA_DIR, filename)
112
  try:
113
  hf_hub_download(
114
+ repo_id = DATASET_REPO,
115
+ filename = remote_path,
116
+ repo_type = "dataset",
117
+ token = HF_TOKEN,
118
+ local_dir = "/tmp/nexa_raw",
119
  force_download = True,
120
  )
121
  src = f"/tmp/nexa_raw/{remote_path}"
122
  if not os.path.exists(src):
123
+ src = f"/tmp/nexa_raw/{filename}"
124
  shutil.copy(src, local_path)
125
  downloaded.append((remote_path, local_path))
126
  log(f" downloaded: {filename}")
 
165
  df["dt"] = pd.to_datetime(df["block_time_unix"], unit="s", utc=True)
166
  df = df.set_index("dt").sort_index()
167
 
168
+ df["is_buy"] = (df["side"] == "BUY").astype(float)
169
+ df["is_sell"] = (df["side"] == "SELL").astype(float)
170
+ df["is_noise"] = (df["side"] == "NOISE").astype(float)
171
+ df["buy_vol"] = df["amount_sol"] * df["is_buy"]
172
+ df["sell_vol"] = df["amount_sol"] * df["is_sell"]
173
+ df["noise_vol"] = df["amount_sol"] * df["is_noise"]
174
 
175
  price_1s = df["binance_price"].resample("1s").last().ffill()
176
+ jup_1s = df["jupiter_price"].resample("1s").last().ffill()
177
+ flows_1s = df[["buy_vol","sell_vol","noise_vol","is_buy","is_sell","is_noise"]]\
178
+ .resample("1s").sum()
179
 
180
  feat = flows_1s.join(price_1s.rename("price"), how="outer").ffill().fillna(0)
181
+ feat = feat.join(jup_1s.rename("jup_price"), how="left").ffill().fillna(0)
182
  eps = 1e-9
183
 
184
+ # Rolling windows
185
  for w in ["15s", "30s", "1min", "5min", "15min"]:
186
  bv = feat["buy_vol"].rolling(w).sum()
187
  sv = feat["sell_vol"].rolling(w).sum()
 
200
  feat[f"flow_imbalance_{w}"] = bv / (bv + sv + eps)
201
  feat[f"noise_ratio_{w}"] = nc / (tc + eps)
202
  feat[f"tx_freq_{w}"] = tc
 
 
203
 
204
  # Price features
205
  for secs, label in [(30,"30s"),(60,"1min"),(300,"5min"),(900,"15min")]:
 
210
  feat["price_vol_1m"] = feat["price"].rolling("1min").std()
211
 
212
  # CEX/DEX spread
213
+ feat["dex_cex_spread"] = (feat["jup_price"] - feat["price"]) / (feat["price"] + eps)
 
214
 
215
+ # Divergence features (core hypothesis)
216
  fi = feat["flow_imbalance_30s"]
217
  pc = feat["price_change_30s"]
218
  feat["divergence_buy"] = ((fi > 0.7) & (pc < 0)).astype(float)
 
231
  log(f"Features: {len(feat):,} rows Γ— {len(feat.columns)} cols")
232
  return feat
233
 
234
+ # ── Step 4: Train / Incrementally Update LightGBM ────────────────────────────
235
  def train_model(feat, manifest):
236
  log("Training / updating LightGBM model...")
237
 
238
+ target_col = "target_30s"
239
+ drop_cols = [c for c in feat.columns if c.startswith("target_") or c in ("price","jup_price")]
240
  feature_cols = [c for c in feat.columns if c not in drop_cols]
241
 
242
  X = feat[feature_cols].values
243
  y = feat[target_col].values
244
 
245
  # Chronological 80/20 — never shuffle
246
+ split = int(len(X) * 0.8)
247
+ X_train = X[:split]; X_test = X[split:]
248
+ y_train = y[:split]; y_test = y[split:]
249
 
250
  log(f"Train: {len(X_train):,} | Test: {len(X_test):,}")
251
 
252
+ # Load existing booster for incremental update
253
  init_model = None
254
  if os.path.exists(MODEL_PATH):
255
  try:
256
+ existing = joblib.load(MODEL_PATH)
257
+ init_model = existing.booster_
258
  log(f"Incrementally updating model v{manifest['model_version']}")
259
  except Exception:
260
  log("Starting fresh model")
 
273
  "n_jobs": -1,
274
  }
275
 
276
+ # Map -1,0,1 → 0,1,2 for multiclass
277
  y_tr = y_train + 1
278
  y_te = y_test + 1
279
 
 
288
  )
289
 
290
  # Evaluate
291
+ proba = model.predict(X_test)
292
+ pred = proba.argmax(axis=1) - 1
293
  buy_mask = y_test != 0
294
+
295
  try:
 
296
  auc = roc_auc_score(
297
  (y_test[buy_mask] == 1).astype(int),
298
  proba[buy_mask, 2]
 
318
  ax.invert_yaxis()
319
  plt.tight_layout()
320
  state["fig_importance"] = fig
321
+ plt.close(fig)
322
 
323
+ # Save using module-level LGBWrapper — joblib can pickle it correctly
 
 
 
 
 
 
 
 
 
324
  wrapped = LGBWrapper(model, feature_cols)
325
  joblib.dump(wrapped, MODEL_PATH)
326
  log("Model saved locally")
 
337
  token = HF_TOKEN,
338
  )
339
 
 
340
  manifest["trained_files"].extend(newly_trained_files)
341
  manifest["model_version"] += 1
342
  manifest["total_rows"] += state["new_rows"]
 
367
  state["status"] = "idle"
368
  return
369
 
370
+ df = load_new_data(downloaded)
371
  if df is None or len(df) < 1000:
372
  log(f"Not enough new data ({len(df) if df is not None else 0} rows) β€” skipping")
373
  state["status"] = "idle"
 
387
  finally:
388
  state["status"] = "idle"
389
 
390
+ # ── Scheduler: every 24 hours ─────────────────────────────────────────────────
391
  def start_scheduler():
392
  schedule.every(24).hours.do(run_pipeline)
393
  while True:
 
447
  lambda: state["fig_importance"], outputs=imp_plot
448
  )
449
 
 
450
  gr.Timer(10).tick(get_status, outputs=status_md)
451
 
452
+ # ── Startup ───────────────────────────────────────────────────────────────────
453
  def startup():
454
  time.sleep(8)
455
  manifest = load_manifest()