| | import os |
| | import time |
| | import threading |
| | import numpy as np |
| | import pandas as pd |
| | import torch |
| | from flask import Flask, jsonify, render_template, request |
| |
|
| | from config import config |
| | from data_loader import load_asset_series, make_features, build_windows |
| | from trainer import TarangTrainer |
| | from storage_manager import StorageManager |
| | from live_ingest import update_stooq_daily, update_binance_klines |
| |
|
# Flask application; HTML templates are served from the local "templates" dir.
app = Flask(__name__, template_folder="templates")

# Serializes access to the shared model/optimizer across Flask request
# threads and the background bootstrap/ingest/checkpoint threads.
MODEL_LOCK = threading.Lock()
# Set once by _bootstrap_flow; the ingest loop blocks on it before first run.
BOOTSTRAP_DONE = threading.Event()

# Mutable process-wide status snapshot; exposed (mostly verbatim) by /api/status.
STATE = {
    "boot_time": time.strftime("%Y-%m-%d %H:%M:%S"),
    "training": False,   # True while trainer.fit() is running
    "trained": False,    # True once a model is available for /api/predict
    "error": None,       # last error string from any background task

    "last_train_status": None,

    "last_ingest_at": None,
    "last_ingest_changed": [],
    "last_ingest_errors": [],

    "last_checkpoint_at": None,
    "neon_enabled": bool(config.NEON_CONNECTION),

    "ingest_every_seconds": config.INGEST_EVERY_SECONDS,
    "checkpoint_every_seconds": config.CHECKPOINT_EVERY_SECONDS,

    # Liveness flags: flipped False/True by the loop threads' error handlers.
    "checkpoint_loop_alive": True,
    "ingest_loop_alive": True,
}

trainer = TarangTrainer()
# Lazily created StorageManager (see _ensure_storage); stays None w/o Neon.
storage = None
| |
|
| |
|
def _asset_csv_path(asset):
    """Return the on-disk CSV path holding this asset's candle data."""
    if asset.asset_type == "stock":
        source_dirs = ("stocks", "stooq")
    else:
        source_dirs = ("crypto", "binance")
    return os.path.join(config.DATA_DIR, *source_dirs, asset.symbol, f"{config.INTERVAL}.csv")
| |
|
| |
|
def _predict_next_for_asset(asset):
    """Run one-step inference for a single asset.

    Builds feature windows from the asset's dataframe, normalizes the most
    recent window, and feeds it through the shared model.

    Returns a dict with the predicted log return / price for the configured
    horizon, or None when there is too little history to form a window.
    Callers should hold MODEL_LOCK (the model is shared with train threads).
    """
    feats = make_features(asset.df)
    if len(feats) < config.WINDOW + 2:
        return None

    X, _, _ = build_windows(feats, window=config.WINDOW, horizon=config.HORIZON_DAYS)

    # Normalize the latest window using statistics over ALL windows.
    # Flatten (windows, time, features) -> (samples, features) once instead
    # of recomputing the reshape separately for mean and std.
    flat = X.reshape(-1, X.shape[-1])
    mu = flat.mean(axis=0, keepdims=True)
    sd = flat.std(axis=0, keepdims=True) + 1e-6  # epsilon guards div-by-zero
    x_last = (X[-1] - mu) / sd

    xb = torch.tensor(x_last, dtype=torch.float32).unsqueeze(0)
    with torch.no_grad():
        pred_lr = float(trainer.model(xb).squeeze().cpu().item())

    last_close = float(feats["close"].iloc[-1])
    last_date = pd.Timestamp(feats["date"].iloc[-1])

    # Stocks trade on business days; crypto trades every calendar day.
    if asset.asset_type == "stock":
        pred_date = last_date + pd.tseries.offsets.BDay(config.HORIZON_DAYS)
    else:
        pred_date = last_date + pd.Timedelta(days=config.HORIZON_DAYS)

    # Model output is a log return; exp() converts it back to a price level.
    pred_close = last_close * float(np.exp(pred_lr))

    return {
        "symbol": asset.symbol,
        "asset_type": asset.asset_type,
        "last_date": last_date.date().isoformat(),
        "predicted_for": pd.Timestamp(pred_date).date().isoformat(),
        "pred_log_return": pred_lr,
        "pred_pct_return": pred_lr * 100.0,
        "last_close": last_close,
        "pred_close": float(pred_close),
    }
| |
|
| |
|
def _train_epochs(epochs: int, reason: str):
    """Train the shared model for `epochs` epochs, recording outcome in STATE.

    No-op for non-positive `epochs`. Holds MODEL_LOCK for the duration of
    trainer.fit(); the `training` flag is always cleared on exit.
    """
    if epochs <= 0:
        return

    STATE["training"] = True
    STATE["error"] = None
    try:
        with MODEL_LOCK:
            status = trainer.fit(epochs)
        summary = dict(status)
        summary["reason"] = reason
        summary["epochs_ran"] = int(epochs)
        STATE["last_train_status"] = summary
        STATE["trained"] = True
        print(f"[train] ✓ Completed: {reason}", flush=True)
    except Exception as e:
        STATE["error"] = f"train_error: {e}"
        print(f"[train] ERROR: {e}", flush=True)
    finally:
        STATE["training"] = False
| |
|
| |
|
def _ensure_storage():
    """Lazily create the module-level StorageManager (Neon-enabled runs only)."""
    global storage
    if storage is None and config.NEON_CONNECTION:
        storage = StorageManager()
| |
|
| |
|
def _load_checkpoint_from_neon():
    """Try to restore model/optimizer state from the latest Neon checkpoint.

    Returns True on success; False when Neon is disabled, no checkpoint
    exists, the model lock cannot be acquired, or loading raises.
    """
    if not config.NEON_CONNECTION:
        return False

    try:
        _ensure_storage()
        ckpt = storage.load_latest()
        if not ckpt:
            return False

        # Bounded wait so a wedged training thread cannot stall bootstrap forever.
        acquired = MODEL_LOCK.acquire(timeout=30)
        if not acquired:
            print("[bootstrap] WARNING: Could not acquire lock for checkpoint load", flush=True)
            return False

        try:
            trainer.model.load_state_dict(ckpt["model_state"])
            trainer.opt.load_state_dict(ckpt["optim_state"])
            trainer.model.eval()  # inference mode until the next fit()
        finally:
            MODEL_LOCK.release()

        STATE["trained"] = True
        STATE["last_checkpoint_at"] = ckpt.get("created_at")
        print(f"[bootstrap] ✓ Loaded checkpoint from {ckpt.get('created_at')}", flush=True)
        return True
    except Exception as e:
        STATE["error"] = f"neon_load_error: {e}"
        print(f"[bootstrap] ERROR loading checkpoint: {e}", flush=True)
        return False
| |
|
| |
|
def _checkpoint_once():
    """Save the current model/optimizer state to Neon, if enabled and trained.

    Best-effort: failures are recorded in STATE["error"] and logged, never
    raised to the caller (this runs inside loop threads).
    """
    if not config.NEON_CONNECTION:
        return
    if not STATE["trained"]:
        return  # nothing worth persisting yet

    try:
        _ensure_storage()
        meta = {
            "saved_at": time.strftime("%Y-%m-%d %H:%M:%S"),
            "train_status": STATE.get("last_train_status"),
            "last_ingest_at": STATE.get("last_ingest_at"),
            "last_ingest_changed": STATE.get("last_ingest_changed", []),
            "last_ingest_errors": STATE.get("last_ingest_errors", []),
        }

        # Generous timeout: a long fit() may legitimately hold the lock.
        acquired = MODEL_LOCK.acquire(timeout=120)
        if not acquired:
            print(f"[checkpoint] WARNING: Could not acquire lock after 120s, skipping", flush=True)
            STATE["error"] = "checkpoint_timeout: could not acquire MODEL_LOCK"
            return

        try:
            storage.save_checkpoint(trainer.model.state_dict(), trainer.opt.state_dict(), meta)
        finally:
            MODEL_LOCK.release()

        STATE["last_checkpoint_at"] = meta["saved_at"]
        print(f"[checkpoint] ✓ Saved to Neon at {meta['saved_at']}", flush=True)
    except Exception as e:
        STATE["error"] = f"checkpoint_error: {e}"
        print(f"[checkpoint] ERROR: {e}", flush=True)
| |
|
| |
|
def _checkpoint_loop():
    """Background thread: checkpoint every CHECKPOINT_EVERY_SECONDS.

    The try/except keeps the loop alive across failures; the liveness flag
    in STATE is cleared during the 60s error back-off so /api/status shows it.
    """
    while True:
        try:
            print(f"[checkpoint] Sleeping for {config.CHECKPOINT_EVERY_SECONDS}s...", flush=True)
            time.sleep(config.CHECKPOINT_EVERY_SECONDS)

            print(f"[checkpoint] Woke up at {time.strftime('%Y-%m-%d %H:%M:%S')}, starting checkpoint...", flush=True)
            _checkpoint_once()

        except Exception as e:
            print(f"[checkpoint] LOOP ERROR: {e}", flush=True)
            STATE["error"] = f"checkpoint_loop_error: {e}"
            STATE["checkpoint_loop_alive"] = False
            time.sleep(60)  # back off before retrying
            STATE["checkpoint_loop_alive"] = True
| |
|
| |
|
def _ingest_once():
    """Refresh every asset's CSV from its remote source.

    Records the outcome in STATE and returns the list of symbols whose
    data actually changed. Per-asset failures are collected, not raised.
    """
    changed = []
    errors = []

    for asset in load_asset_series():
        path = _asset_csv_path(asset)
        try:
            if asset.asset_type == "stock":
                updated = update_stooq_daily(asset.symbol, path)
            else:
                updated = update_binance_klines(asset.symbol, config.INTERVAL, path)
            if updated:
                changed.append(asset.symbol)
        except Exception as e:
            errors.append({"symbol": asset.symbol, "asset_type": asset.asset_type, "error": str(e)[:200]})

    STATE["last_ingest_at"] = time.strftime("%Y-%m-%d %H:%M:%S")
    STATE["last_ingest_changed"] = changed
    STATE["last_ingest_errors"] = errors

    if changed:
        print(f"[ingest] ✓ Updated {len(changed)} assets: {changed}", flush=True)
    if errors:
        print(f"[ingest] Errors for {len(errors)} assets", flush=True)

    return changed
| |
|
| |
|
def _ingest_loop():
    """Background thread: ingest every INGEST_EVERY_SECONDS after bootstrap.

    Waits for BOOTSTRAP_DONE first. On each pass: skip while training, pull
    fresh data, then opportunistically update-train and checkpoint when any
    asset changed. The try/except keeps the loop alive across failures.
    """
    print("[ingest] Waiting for bootstrap to complete...", flush=True)
    BOOTSTRAP_DONE.wait()
    print("[ingest] Bootstrap done, starting ingestion loop", flush=True)

    while True:
        try:
            # Don't ingest concurrently with a fit() in progress.
            if STATE["training"]:
                print("[ingest] Training in progress, waiting 10s...", flush=True)
                time.sleep(10)
                continue

            changed = _ingest_once()

            # Re-check training: bootstrap may have started a fit() meanwhile.
            if changed and not STATE["training"]:
                print(f"[ingest] Triggering update-train for {len(changed)} assets", flush=True)
                _train_epochs(config.UPDATE_TRAIN_EPOCHS, reason=f"ingest_update({len(changed)} assets)")

                # Persist immediately rather than waiting for the hourly loop.
                print(f"[ingest] Update-train done, triggering immediate checkpoint", flush=True)
                _checkpoint_once()

            print(f"[ingest] Sleeping for {config.INGEST_EVERY_SECONDS}s...", flush=True)
            time.sleep(config.INGEST_EVERY_SECONDS)

        except Exception as e:
            print(f"[ingest] LOOP ERROR: {e}", flush=True)
            STATE["error"] = f"ingest_loop_error: {e}"
            STATE["ingest_loop_alive"] = False
            time.sleep(60)  # back off before retrying
            STATE["ingest_loop_alive"] = True
| |
|
| |
|
def _bootstrap_flow():
    """One-shot startup sequence, run in its own thread.

    Order: try to load a Neon checkpoint; if none, optionally train on
    historical data, then save an initial checkpoint. Always finishes by
    setting BOOTSTRAP_DONE so the ingestion loop may start.
    """
    print("[bootstrap] Starting bootstrap flow...", flush=True)

    if _load_checkpoint_from_neon():
        print("[bootstrap] ✓ Loaded from Neon, bootstrap complete", flush=True)
        BOOTSTRAP_DONE.set()
        return

    if config.RETRAIN_ON_START:
        print("[bootstrap] No checkpoint found, training on historical data...", flush=True)
        _train_epochs(config.INITIAL_TRAIN_EPOCHS, reason="boot_initial_from_historical")

    print("[bootstrap] Saving initial checkpoint...", flush=True)
    _checkpoint_once()

    print("[bootstrap] ✓ Bootstrap complete", flush=True)
    BOOTSTRAP_DONE.set()
| |
|
| |
|
@app.route("/")
def index():
    """Serve the dashboard landing page."""
    page = render_template("dashboard.html")
    return page
| |
|
| |
|
@app.route("/api/status")
def api_status():
    """Report process health and training/ingest/checkpoint status as JSON."""
    payload = {
        "ok": True,
        "boot_time": STATE["boot_time"],
        "data_dir_exists": os.path.exists(config.DATA_DIR),
    }

    # Most fields come straight from STATE; copy them in the documented order.
    for key in (
        "training", "trained", "error",
        "last_train_status",
        "last_ingest_at", "last_ingest_changed", "last_ingest_errors",
        "last_checkpoint_at", "neon_enabled",
        "ingest_every_seconds", "checkpoint_every_seconds",
    ):
        payload[key] = STATE[key]

    payload["bootstrap_done"] = BOOTSTRAP_DONE.is_set()
    payload["checkpoint_loop_alive"] = STATE["checkpoint_loop_alive"]
    payload["ingest_loop_alive"] = STATE["ingest_loop_alive"]
    return jsonify(payload)
| |
|
| |
|
@app.route("/api/predict")
def api_predict():
    """Predict the next move for one symbol (?symbol=...) or for all assets.

    Returns 503 while training or before the first model is available.
    Predictions are sorted by absolute expected move, largest first.
    """
    if STATE["training"]:
        return jsonify({"ok": False, "error": "Training in progress. Try again soon."}), 503
    if not STATE["trained"]:
        return jsonify({"ok": False, "error": "Model not trained yet."}), 503

    wanted = request.args.get("symbol", "").strip().lower()
    assets = load_asset_series()
    if wanted:
        assets = [a for a in assets if a.symbol.lower() == wanted]

    # Hold the model lock for the whole batch so training can't swap weights mid-run.
    with MODEL_LOCK:
        preds = [p for p in (_predict_next_for_asset(a) for a in assets) if p]

    preds.sort(key=lambda entry: abs(entry["pred_pct_return"]), reverse=True)
    return jsonify({"ok": True, "count": len(preds), "predictions": preds})
| |
|
| |
|
if __name__ == "__main__":
    print("="*60, flush=True)
    print(f"Tarang v2 starting at {STATE['boot_time']}", flush=True)
    print("="*60, flush=True)

    # Daemon threads: they die automatically when the main process exits.
    threading.Thread(target=_bootstrap_flow, daemon=True, name="bootstrap").start()

    # Safe to start immediately: the loop blocks on BOOTSTRAP_DONE first.
    threading.Thread(target=_ingest_loop, daemon=True, name="ingest").start()

    # Periodic checkpointing only applies when Neon persistence is configured.
    if config.NEON_CONNECTION:
        threading.Thread(target=_checkpoint_loop, daemon=True, name="checkpoint").start()

    # NOTE(review): Flask's built-in server; use a production WSGI server
    # (gunicorn/uwsgi) for deployment.
    app.run(host=config.HOST, port=config.PORT, debug=False)
| |
|