# aashish-bindal's picture
# Genesis AI deploy
# bb004f6
"""
Genesis AI – Dabur Demand Intelligence Platform
Hugging Face Spaces entry point.
HF Spaces requires:
- Entry file named app.py
- App listening on port 7860
- SDK: docker (we use Flask, not Gradio/Streamlit)
"""
from flask import Flask, request, jsonify, send_from_directory, session
import io, traceback, os, uuid, threading, time
from dabur_baseline_forecast_v2 import run_pipeline, CONFIG
# Directory that contains this file; index() serves the front-end HTML from here.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))

# Key used by Flask to sign the session cookie. A hard-coded fallback would let
# anyone who has read this file forge a session (including role="analyst"), so
# when SECRET_KEY is unset or empty a random per-process key is generated.
# Consequence: without SECRET_KEY, sessions do not survive a restart. The job
# store in this file is in-memory and per-process as well.
SECRET_KEY = os.environ.get("SECRET_KEY") or os.urandom(32).hex()

PORT = int(os.environ.get("PORT", 7860))  # HF Spaces requires 7860

# NOTE(review): static_folder=BASE_DIR with static_url_path="" appears to expose
# every file in this directory (including the Python sources) over HTTP.
# Confirm this is intended, or move the static assets to a dedicated folder.
app = Flask(__name__, static_folder=BASE_DIR, static_url_path="")
app.secret_key = SECRET_KEY
# =============================================================================
# USERS – set via HF Space Secrets for security
# ANALYST_PASSWORD / VIEWER_PASSWORD
# =============================================================================
# Account table keyed by username. Each password is read from its environment
# variable and falls back to the literal default listed here when that
# variable is absent. The role is identical to the username.
USERS = {
    account: {"password": os.environ.get(env_var, fallback), "role": account}
    for account, env_var, fallback in (
        ("analyst", "ANALYST_PASSWORD", "dabur@analyst"),
        ("viewer", "VIEWER_PASSWORD", "dabur@viewer"),
    )
}
# =============================================================================
# JOB STORE – background thread per forecast run
# Frontend submits → gets job_id instantly → polls /job/<id> every 5s
# =============================================================================
# Maps job_id -> job record. Records are created by run() and updated in place
# by _run_job() with the final status, result or error, and finish time.
_jobs: dict = {}
# Result of the most recent successful pipeline run; None until one completes.
# Served to logged-in users by /forecast-result.
_latest_forecast = None
# Held around every read and write of _jobs, from request handlers and from
# the worker threads alike.
_jobs_lock = threading.Lock()
def _run_job(job_id: str, file_bytes: bytes, cfg: dict):
    """Run the forecast pipeline for one job and record the outcome.

    Intended as the target of a background thread. On success the job record
    is marked "done", the result is stored on it and also published as the
    latest forecast. On any exception the job record is marked "error" with
    the message and formatted traceback. Either way, finished jobs older than
    two hours are removed from the store afterwards.

    Args:
        job_id: Key of an existing record in the job store.
        file_bytes: Raw content of the uploaded workbook.
        cfg: Pipeline configuration passed through to run_pipeline.
    """
    global _latest_forecast

    tag = job_id[:8]  # short id used in log lines
    try:
        outcome = run_pipeline(io.BytesIO(file_bytes), cfg)
        with _jobs_lock:
            _jobs[job_id].update(
                status="done",
                result=outcome,
                finished_at=time.time(),
            )
            _latest_forecast = outcome
        series_count = outcome.get("n_series", 0)
        print(f"[JOB {tag}] Done – {series_count} series")
    except Exception as exc:
        trace_text = traceback.format_exc()
        print(f"[JOB {tag}] ERROR:\n{trace_text}")
        with _jobs_lock:
            _jobs[job_id].update(
                status="error",
                error=str(exc),
                traceback=trace_text,
                finished_at=time.time(),
            )

    # Housekeeping: drop finished jobs once they are more than 2 hours old.
    checked_at = time.time()
    with _jobs_lock:
        expired = [
            stale_id
            for stale_id, record in _jobs.items()
            if record.get("finished_at") and checked_at - record["finished_at"] > 7200
        ]
        for stale_id in expired:
            del _jobs[stale_id]
# =============================================================================
# ROUTES
# =============================================================================
@app.route("/")
def index():
    """Serve the platform's HTML page from BASE_DIR."""
    landing_page = "dabur_demand_platform.html"
    return send_from_directory(BASE_DIR, landing_page)
# ── Auth ───────────────────────────────────────────────────────────────
@app.route("/login", methods=["POST"])
def login():
    """Authenticate a user from a JSON body and start a session.

    Expects ``{"username": ..., "password": ...}``. The username is matched
    case-insensitively after stripping surrounding whitespace.

    Returns:
        200 with ``status``, ``username`` and ``role`` on success; 401 with an
        error message for unknown users, wrong passwords, or a body that is
        missing, not JSON, not a JSON object, or has non-string credentials.
    """
    import hmac  # local import: only this handler needs it

    def _reject():
        return jsonify({"status": "error",
                        "message": "Invalid username or password"}), 401

    # silent=True: a missing or malformed JSON body yields None instead of
    # aborting the request with a framework-generated error page.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    username = data.get("username", "")
    password = data.get("password", "")
    if not isinstance(username, str) or not isinstance(password, str):
        return _reject()
    username = username.strip().lower()

    user = USERS.get(username)
    # Compare in constant time, and do the comparison even for unknown users.
    # Encoding to bytes lets compare_digest accept non-ASCII passwords.
    expected = user["password"] if user else ""
    password_ok = hmac.compare_digest(password.encode("utf-8"),
                                      expected.encode("utf-8"))
    if not user or not password_ok:
        return _reject()

    session["username"] = username
    session["role"] = user["role"]
    print(f"[AUTH] Login: {username} ({user['role']})")
    return jsonify({"status": "ok", "username": username, "role": user["role"]})
@app.route("/logout", methods=["POST"])
def logout():
    """Log the departing user's name, wipe the session, and acknowledge."""
    departing = session.get("username", "?")
    print(f"[AUTH] Logout: {departing}")
    session.clear()
    return jsonify(status="ok")
@app.route("/me", methods=["GET"])
def me():
    """Return the username and role held in the session, or 401 if there is none."""
    if "username" not in session:
        return jsonify({"status": "error", "message": "Not logged in"}), 401

    identity = {
        "status": "ok",
        "username": session["username"],
        "role": session["role"],
    }
    return jsonify(identity)
# ── Health ─────────────────────────────────────────────────────────────
@app.route("/health", methods=["GET"])
def health():
    """Unauthenticated liveness check; always answers with a fixed OK payload."""
    payload = {
        "status": "ok",
        "message": "Genesis AI – running on HF Spaces",
    }
    return jsonify(payload)
# ── Submit forecast job ────────────────────────────────────────────────
@app.route("/run", methods=["POST"])
def run():
    """Accept an Excel upload from an analyst and start a background forecast job.

    Form fields:
        file: the workbook (.xlsx or .xls), required.
        horizon, season, cv_horizon, top_n: optional integer overrides for the
            ``forecast_horizon``, ``season_length``, ``cv_horizon`` and
            ``top_n_ensemble`` entries of a copy of CONFIG.

    Returns:
        200 ``{"status": "started", "job_id": ...}`` once the worker thread has
        been launched; 403 if the session role is not "analyst"; 400 for a
        missing file, wrong extension, empty file, or non-integer override.
    """
    if session.get("role") != "analyst":
        return jsonify({"status": "error",
                        "message": "Access denied – analyst role required"}), 403

    upload = request.files.get("file")
    if upload is None:
        return jsonify({"status": "error", "message": "No file uploaded"}), 400

    # The filename may be missing or empty; treat that as an unsupported type
    # instead of failing on None.lower().
    filename = (upload.filename or "").lower()
    if not filename.endswith((".xlsx", ".xls")):
        return jsonify({"status": "error",
                        "message": "Only .xlsx and .xls files are supported"}), 400

    # (form field, CONFIG key) pairs for the optional integer overrides.
    # NOTE(review): values are not range-checked here (0 or negative pass
    # through to the pipeline) — confirm whether the pipeline validates them.
    override_fields = (
        ("horizon", "forecast_horizon"),
        ("season", "season_length"),
        ("cv_horizon", "cv_horizon"),
        ("top_n", "top_n_ensemble"),
    )
    cfg = CONFIG.copy()
    try:
        for form_field, cfg_key in override_fields:
            raw_value = request.form.get(form_field)
            if raw_value:
                cfg[cfg_key] = int(raw_value)
    except ValueError as e:
        return jsonify({"status": "error", "message": f"Invalid config value: {e}"}), 400

    # Read the upload now: the request's file stream is not available to the
    # worker thread after this handler returns.
    file_bytes = upload.read()
    if not file_bytes:
        return jsonify({"status": "error", "message": "Uploaded file is empty"}), 400

    job_id = str(uuid.uuid4())
    with _jobs_lock:
        _jobs[job_id] = {
            "status": "running",
            "result": None,
            "error": None,
            "started_at": time.time(),
            "finished_at": None,
        }
    threading.Thread(target=_run_job, args=(job_id, file_bytes, cfg),
                     daemon=True).start()
    print(f"[JOB {job_id[:8]}] Started by {session.get('username','?')}")
    return jsonify({"status": "started", "job_id": job_id})
# ── Poll job status ────────────────────────────────────────────────────
@app.route("/job/<job_id>", methods=["GET"])
def job_status(job_id):
    """Report the state of a forecast job to any logged-in user.

    Returns 401 without a session and 404 for an unknown job id. Otherwise the
    response depends on the job's status: "running" includes the elapsed
    seconds, "error" returns the stored message with HTTP 500, and anything
    else returns the stored result as "done".
    """
    if "username" not in session:
        return jsonify({"status": "error", "message": "Not logged in"}), 401

    with _jobs_lock:
        record = _jobs.get(job_id)
    if not record:
        return jsonify({"status": "error", "message": "Job not found"}), 404

    state = record["status"]
    if state == "error":
        return jsonify({"status": "error", "message": record["error"]}), 500
    if state != "running":
        return jsonify({"status": "done", "result": record["result"]})

    elapsed = round(time.time() - record["started_at"])
    return jsonify({"status": "running", "elapsed_seconds": elapsed})
# ── Latest forecast for viewer ─────────────────────────────────────────
@app.route("/forecast-result", methods=["GET"])
def forecast_result():
    """Return the most recent successful forecast to any logged-in user.

    Responds 401 without a session, and with an "empty" status payload when no
    run has completed yet.
    """
    if "username" not in session:
        return jsonify({"status": "error", "message": "Not logged in"}), 401

    latest = _latest_forecast
    if latest is not None:
        return jsonify(latest)

    placeholder = {
        "status": "empty",
        "message": "No forecast has been run yet. Ask the analyst to run the pipeline.",
    }
    return jsonify(placeholder)
# =============================================================================
# BOOT
# =============================================================================
if __name__ == "__main__":
    # Startup banner. Passwords are deliberately NOT printed: stdout ends up
    # in the container log, and the values may come from deployment secrets.
    # Only the source of each password is shown.
    banner_rule = "=" * 60
    print(banner_rule)
    print(" Genesis AI – Dabur Demand Intelligence Platform")
    print(f" Running on port {PORT}")
    for account, env_var, access in (
        ("analyst", "ANALYST_PASSWORD", "Full access"),
        ("viewer", "VIEWER_PASSWORD", "Output only"),
    ):
        if env_var in os.environ:
            source = f"password from {env_var}"
        else:
            source = "built-in default password"
        print(f" {account} ({source}) → {access}")
    print(banner_rule)
    # threaded=True lets status polls be answered while other requests are in flight.
    app.run(host="0.0.0.0", port=PORT, debug=False, threaded=True)