""" Genesis AI – Dabur Demand Intelligence Platform Hugging Face Spaces entry point. HF Spaces requires: - Entry file named app.py - App listening on port 7860 - SDK: docker (we use Flask, not Gradio/Streamlit) """ from flask import Flask, request, jsonify, send_from_directory, session import io, traceback, os, uuid, threading, time from dabur_baseline_forecast_v2 import run_pipeline, CONFIG BASE_DIR = os.path.dirname(os.path.abspath(__file__)) SECRET_KEY = os.environ.get("SECRET_KEY", "dabur-genesisai-hf-2025") PORT = int(os.environ.get("PORT", 7860)) # HF Spaces requires 7860 app = Flask(__name__, static_folder=BASE_DIR, static_url_path="") app.secret_key = SECRET_KEY # ============================================================================= # USERS – set via HF Space Secrets for security # ANALYST_PASSWORD / VIEWER_PASSWORD # ============================================================================= USERS = { "analyst": { "password": os.environ.get("ANALYST_PASSWORD", "dabur@analyst"), "role": "analyst", }, "viewer": { "password": os.environ.get("VIEWER_PASSWORD", "dabur@viewer"), "role": "viewer", }, } # ============================================================================= # JOB STORE – background thread per forecast run # Frontend submits → gets job_id instantly → polls /job/ every 5s # ============================================================================= _jobs: dict = {} _latest_forecast = None _jobs_lock = threading.Lock() def _run_job(job_id: str, file_bytes: bytes, cfg: dict): """Pipeline runs in daemon thread. Safe to run long without timeout.""" global _latest_forecast try: result = run_pipeline(io.BytesIO(file_bytes), cfg) with _jobs_lock: _jobs[job_id].update({ "status": "done", "result": result, "finished_at": time.time(), }) _latest_forecast = result print(f"[JOB {job_id[:8]}] Done – {result.get('n_series',0)} series") except Exception as e: tb = traceback.format_exc() print(f"[JOB {job_id[:8]}] ERROR:\n{tb}") with _jobs_lock: _jobs[job_id].update({ "status": "error", "error": str(e), "traceback": tb, "finished_at": time.time(), }) # Purge jobs older than 2 hours now = time.time() with _jobs_lock: for jid in [k for k, v in _jobs.items() if v.get("finished_at") and now - v["finished_at"] > 7200]: del _jobs[jid] # ============================================================================= # ROUTES # ============================================================================= @app.route("/") def index(): return send_from_directory(BASE_DIR, "dabur_demand_platform.html") # ── Auth ─────────────────────────────────────────────────────────────── @app.route("/login", methods=["POST"]) def login(): data = request.get_json() or {} username = data.get("username", "").strip().lower() password = data.get("password", "") user = USERS.get(username) if not user or user["password"] != password: return jsonify({"status": "error", "message": "Invalid username or password"}), 401 session["username"] = username session["role"] = user["role"] print(f"[AUTH] Login: {username} ({user['role']})") return jsonify({"status": "ok", "username": username, "role": user["role"]}) @app.route("/logout", methods=["POST"]) def logout(): print(f"[AUTH] Logout: {session.get('username','?')}") session.clear() return jsonify({"status": "ok"}) @app.route("/me", methods=["GET"]) def me(): if "username" in session: return jsonify({"status": "ok", "username": session["username"], "role": session["role"]}) return jsonify({"status": "error", "message": "Not logged in"}), 401 # ── Health ───────────────────────────────────────────────────────────── @app.route("/health", methods=["GET"]) def health(): return jsonify({"status": "ok", "message": "Genesis AI – running on HF Spaces"}) # ── Submit forecast job ──────────────────────────────────────────────── @app.route("/run", methods=["POST"]) def run(): if session.get("role") != "analyst": return jsonify({"status": "error", "message": "Access denied – analyst role required"}), 403 if "file" not in request.files: return jsonify({"status": "error", "message": "No file uploaded"}), 400 f = request.files["file"] if not f.filename.lower().endswith((".xlsx", ".xls")): return jsonify({"status": "error", "message": "Only .xlsx files are supported"}), 400 cfg = CONFIG.copy() try: if request.form.get("horizon"): cfg["forecast_horizon"] = int(request.form["horizon"]) if request.form.get("season"): cfg["season_length"] = int(request.form["season"]) if request.form.get("cv_horizon"): cfg["cv_horizon"] = int(request.form["cv_horizon"]) if request.form.get("top_n"): cfg["top_n_ensemble"] = int(request.form["top_n"]) except ValueError as e: return jsonify({"status": "error", "message": f"Invalid config value: {e}"}), 400 file_bytes = f.read() job_id = str(uuid.uuid4()) with _jobs_lock: _jobs[job_id] = { "status": "running", "result": None, "error": None, "started_at": time.time(), "finished_at": None, } threading.Thread(target=_run_job, args=(job_id, file_bytes, cfg), daemon=True).start() print(f"[JOB {job_id[:8]}] Started by {session.get('username','?')}") return jsonify({"status": "started", "job_id": job_id}) # ── Poll job status ──────────────────────────────────────────────────── @app.route("/job/", methods=["GET"]) def job_status(job_id): if "username" not in session: return jsonify({"status": "error", "message": "Not logged in"}), 401 with _jobs_lock: job = _jobs.get(job_id) if not job: return jsonify({"status": "error", "message": "Job not found"}), 404 if job["status"] == "running": return jsonify({"status": "running", "elapsed_seconds": round(time.time() - job["started_at"])}) if job["status"] == "error": return jsonify({"status": "error", "message": job["error"]}), 500 return jsonify({"status": "done", "result": job["result"]}) # ── Latest forecast for viewer ───────────────────────────────────────── @app.route("/forecast-result", methods=["GET"]) def forecast_result(): if "username" not in session: return jsonify({"status": "error", "message": "Not logged in"}), 401 if _latest_forecast is None: return jsonify({"status": "empty", "message": "No forecast has been run yet. Ask the analyst to run the pipeline."}) return jsonify(_latest_forecast) # ============================================================================= # BOOT # ============================================================================= if __name__ == "__main__": print("=" * 60) print(" Genesis AI – Dabur Demand Intelligence Platform") print(f" Running on port {PORT}") print(f" analyst / {USERS['analyst']['password']} → Full access") print(f" viewer / {USERS['viewer']['password']} → Output only") print("=" * 60) app.run(host="0.0.0.0", port=PORT, debug=False, threaded=True)