dabur-pricing-app / Modular app /upload_server.py
Bera
Deploy Genesis AI to Hugging Face Spaces
1b2f7eb
"""
upload_server.py
================
FastAPI server that:
1. Accepts an Excel onboarding template
2. Runs the Genesis AI pipeline (Python subprocess)
3. Stores JSON outputs in output/
4. Serves those JSON outputs to the React frontend via /api/data/{key}
React NEVER reads static files — all data comes through this server.
Endpoints:
POST /api/upload — upload xlsx, run pipeline
GET /api/upload/status — current job state
GET /api/upload/logs — full log from last run
POST /api/upload/reset — reset job state
GET /api/data/{key} — serve computed JSON (e.g. /api/data/models)
GET /api/data — list available data keys
"""
import json
import os
import shutil
import subprocess
import sys
import threading
import time
from pathlib import Path
from fastapi import FastAPI, File, UploadFile, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from fastapi.staticfiles import StaticFiles
# ── Paths ─────────────────────────────────────────────────────────────────────
# All locations are derived from this file's own directory, so the server can
# be started from any working directory.
BASE_DIR = Path(__file__).parent # Modular app/
# Directory handed to the pipeline via --output and read back by /api/data/*.
OUTPUT_DIR = BASE_DIR / "output"
# React dashboard public/data — where fresh JSONs land for the browser
REACT_DATA = BASE_DIR.parent / "react-dashboard" / "public" / "data"
# Uploaded workbooks are written here before the pipeline is started on them.
UPLOAD_DIR = BASE_DIR / "_uploads"
# Import-time side effect: make sure all three directories exist.
OUTPUT_DIR.mkdir(exist_ok=True)
UPLOAD_DIR.mkdir(exist_ok=True)
REACT_DATA.mkdir(parents=True, exist_ok=True)
# ── Job state (in-memory, single-user tool) ───────────────────────────────────
# A single shared record describing the most recent pipeline run. It lives only
# in this process's memory; every read or write elsewhere in the module happens
# while holding _job_lock.
_job = dict(
    status="idle",  # idle | running | done | error
    message="",
    logs="",
    started_at=None,
    finished_at=None,
    filename=None,
)
_job_lock = threading.Lock()
# ── App ───────────────────────────────────────────────────────────────────────
# The ASGI application object; every route and static mount below hangs off it.
app = FastAPI(title="Genesis AI Upload Server")
# CORS is wide open: any origin, method and header is allowed.
# NOTE(review): presumably so the Vite dev server (a different origin) can call
# the API — confirm before exposing this server beyond a trusted network.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)
def _find_pipeline_python(candidates=None, required_module: str = "pandas") -> str:
    """Return a Python executable that can import the pipeline's dependencies.

    Each candidate is probed by running ``import <required_module>`` in a
    short-lived subprocess (5 second limit). The first candidate whose probe
    exits with status 0 is returned.

    Args:
        candidates: Interpreter paths or commands to probe, in priority order.
            When omitted, the current interpreter is tried first, followed by
            a few fixed Windows install locations and the generic
            ``python3`` / ``python`` commands.
        required_module: Module whose importability marks an interpreter as
            usable. Defaults to ``pandas``.

    Returns:
        The first working candidate, or ``sys.executable`` when none works.
    """
    if candidates is None:
        candidates = [
            # Works when the server itself was started with the right Python.
            sys.executable,
            r"C:\Users\sohini.bera\AppData\Local\Programs\Python\Python313\python.exe",
            r"C:\Users\sohini.bera\AppData\Local\Programs\Python\Python312\python.exe",
            r"C:\Python313\python.exe",
            "python3",
            "python",
        ]
    for candidate in candidates:
        # sys.executable may be empty/None when Python cannot locate itself.
        if not candidate:
            continue
        try:
            result = subprocess.run(
                [candidate, "-c", f"import {required_module}"],
                capture_output=True,
                timeout=5,
            )
        except (OSError, subprocess.TimeoutExpired):
            # OSError covers "not found" as well as "exists but cannot be
            # executed" (permission denied, a directory, wrong binary format),
            # none of which should abort the search.
            continue
        if result.returncode == 0:
            return candidate
    return sys.executable  # fallback
def _tprint(msg: str):
    """Print *msg* and flush stdout right away so it shows up in the terminal immediately."""
    print(msg, file=sys.stdout, flush=True)
# Keys of the step banners already shown during the current pipeline run.
_printed_steps: set = set()


def _reset_steps():
    """Forget every banner shown so far by rebinding the tracker to a fresh, empty set."""
    global _printed_steps
    _printed_steps = set()
def _annotate(line: str, xlsx_name: str):
    """
    Print a descriptive step banner when *line* is one of the pipeline's marker lines.

    The caller prints the raw pipeline line itself afterwards, so the banner
    appears BEFORE it. Patterns are tried in a fixed order and only the first
    match is considered. Each banner is shown at most once per run, tracked
    through the module-level _printed_steps set.
    """
    import re

    heavy_rule = "=" * 65
    light_rule = "-" * 65

    def _banner_once(step_id: str, title: str, details: list[str]):
        # Show the framed banner for step_id unless this run already showed it.
        if step_id in _printed_steps:
            return
        _printed_steps.add(step_id)
        framed = ["", heavy_rule, title, heavy_rule, *details, light_rule, " Pipeline output:"]
        for text in framed:
            _tprint(text)

    # ── STEP 1: Data loading ───────────────────────────────────────────────
    if re.search(r'\[load\]', line):
        _banner_once(
            "load",
            " STEP 1 | DATA LOADING (pipeline/loader.py)",
            [
                f" Source file : {xlsx_name}",
                " Sheet read : 1_Source_Data",
                " -> columns: Brand, Date, Channel, Region,",
                " Pack Size Group, Value, Volume, Distribution",
                " -> Price computed as Value / Volume",
                " -> Date parsed, Month column added",
                " -> rows with nulls in Brand/Month/Value/Volume dropped",
                " Sheet read : 2_Brand_Config",
                " -> Focal Brand Name, Competitor 1..N",
            ],
        )
        return

    # ── STEP 2: Brand config ───────────────────────────────────────────────
    if re.search(r'\[config\] Focal brands', line):
        _banner_once(
            "config",
            " STEP 2 | BRAND CONFIGURATION (pipeline/run.py)",
            [
                " Reads focal brand + competitors from 2_Brand_Config sheet.",
                " Pack order loaded from pipeline/config.py (DEFAULT_PACK_ORDER).",
                " Any pack found in data but not in config gets order rank 50+.",
            ],
        )
        return

    # ── STEP 3: Wide pivot ─────────────────────────────────────────────────
    if re.search(r'Wide pivot:', line):
        _banner_once(
            "pivot",
            " STEP 3 | WIDE PIVOT (pipeline/modelling.py -> build_wide_pivot)",
            [
                " Builds one wide monthly table per Channel x Region combination.",
                " Columns added per table:",
                " Focal brand : Vol_F, Price_F, Dist_F, Val_F",
                " Per competitor : Price_<comp>, Vol_<comp>, Dist_<comp>",
                " Category : Cat_Vol (sum of all brands at same CH/RG/Pack/Month)",
                " Cannib. : Vol_Up, Vol_Down (adjacent pack volumes)",
                " All tables concatenated -> single DataFrame passed to modelling.",
            ],
        )
        return

    # ── STEP 4: Diagnostics ────────────────────────────────────────────────
    if re.search(r'\[Step 1\].*Running diagnostics', line):
        _banner_once(
            "diag",
            " STEP 4 | DIAGNOSTICS (pipeline/diagnostics.py)",
            [
                " Runs pre-modelling checks per grain (Channel x Region x Pack):",
                " Descriptive stats : mean, std, CV for price & volume",
                " Collinearity : Pearson |r| > 0.70 flagged as HIGH",
                " RPI & Distribution: relative price index vs competitors",
                " Cannibalization : cross-pack correlation within focal brand",
                " Summary sheet : pass/warn/fail per grain",
                " Output file: <brand>_Step1_Diagnostics.xlsx (in output/ folder)",
            ],
        )
        return

    # ── STEP 5: OLS modelling ──────────────────────────────────────────────
    if re.search(r'\[Step 2\].*Running elasticity models', line):
        _banner_once(
            "model",
            " STEP 5 | OLS ELASTICITY MODELS (pipeline/modelling.py -> run_elasticity_models)",
            [
                " For each grain, tests all valid spec combinations:",
                " Base spec : log(Vol_F) ~ log(Price_F) + log(Dist_F)",
                " + Category : + log(Cat_Vol)",
                " + Comps : + log(Price_<comp>) for up to 3 competitors",
                " + Seasonal : + Q2/Q3/Q4 dummies (promoted if Adj-R2 >= +5pp)",
                " Selection : Best Adj-R2 with own-price coef < 0 (correct sign)",
                " Guardrails : min 6 obs per grain, elasticity clamped to [-6.0, 0]",
                " Config file : pipeline/config.py (all thresholds defined here)",
            ],
        )
        return

    # ── STEP 6: Proxy assignment ───────────────────────────────────────────
    if re.search(r'Sign OK \(pre-proxy\)', line):
        _banner_once(
            "proxy",
            " STEP 6 | PROXY ASSIGNMENT (pipeline/proxies.py -> assign_proxies)",
            [
                " Grains where best spec still has wrong sign get a proxy elasticity:",
                " Priority 1 : Interpolate between adjacent packs (same CH x Region)",
                " Priority 2 : Borrow from nearest valid pack (same CH x Region)",
                " Priority 3 : Borrow from nearest valid pack (any Region, same CH)",
                " Final_OwnE clamped to [-6.0, 0] for all grains.",
                " Frequency anchors computed: dominant pack per CH x Region by vol %.",
            ],
        )
        return

    # ── STEP 7: JSON export ────────────────────────────────────────────────
    if re.search(r'\[export\].*Saved.*models\.json', line):
        _banner_once(
            "export",
            " STEP 7 | JSON EXPORT (pipeline/exporters/)",
            [
                " stats.py -> models.json (elasticity results per grain)",
                " -> freq_anchors.json (dominant pack per CH x Region)",
                " -> stats.json (brand KPIs, vol/val summary)",
                " market.py -> trend.json (monthly time-series rows)",
                " -> ms.json (market share yr25 vs yr24)",
                " -> vol_salience.json (pack vol % of brand total)",
                " -> val_share.json (pack value % of brand total)",
                " -> comp_ms.json (all-brand market share)",
                " -> vtm.json (volume-to-market by grain)",
                " ppa.py -> ppa_mt.json (reads Excel sheet 3_PPA_MT)",
                " -> ppa_tt.json (reads Excel sheet 4_PPA_TT)",
                " analytics.py -> interaction.json (cross-brand Pearson corr.)",
                " -> growth_decomp.json (pp-contribution to growth)",
                " -> pgi.json (price gradient index)",
                " recommendations.py-> recs_full.json (pricing rec cards per pack)",
                " All files written to: output/",
            ],
        )
        return

    # ── STEP 8: Copy to dashboard ──────────────────────────────────────────
    if re.search(r'\[done\]', line):
        _banner_once(
            "done",
            " STEP 8 | COPY TO DASHBOARD (upload_server.py)",
            [
                " Source : output/*.json (18 files)",
                " Target : react-dashboard/public/data/",
                " Extra : recs_full.json also copied as recs.json",
                " Effect : Vite serves public/data/ as static files.",
                " Browser fetches /data/<key>.json on next reload.",
            ],
        )
def _run_pipeline(xlsx_path: Path):
    """Run the pipeline on *xlsx_path*, publish its JSON outputs, and record the outcome.

    Meant to run in a background thread. Steps:
      1. Mark the shared job record as running.
      2. Launch ``python -m pipeline.run --input <xlsx> --output <OUTPUT_DIR>``
         and stream its merged stdout/stderr to the terminal and to
         ``_job["logs"]`` line by line.
      3. On success, copy the known JSON files from OUTPUT_DIR to REACT_DATA
         (plus ``recs_full.json`` under the alias ``recs.json``).
      4. Store the final status ("done" or "error"), message and finish time.

    The child process is killed if it is still running after 5 minutes.
    Every failure is recorded in the job record; nothing is raised to the caller.

    Args:
        xlsx_path: Location of the uploaded workbook to process.
    """
    timeout_s = 300  # hard limit for one pipeline run (5 minutes)
    t0 = time.time()
    with _job_lock:
        _job["status"] = "running"
        _job["logs"] = ""
        _job["message"] = "Running pipeline..."
        _job["started_at"] = t0
        # Drop the previous run's finish time so it cannot be mistaken for this run's.
        _job["finished_at"] = None
    _reset_steps()
    try:
        # ── Terminal header ───────────────────────────────────────────────────
        # Kept inside the try block: stat() fails if the upload vanished, and
        # that must end up as an "error" state instead of a job stuck in "running".
        _tprint("")
        _tprint("=" * 65)
        _tprint(" GENESIS AI | PIPELINE EXECUTION LOG")
        _tprint("=" * 65)
        _tprint(f" File : {xlsx_path.name}")
        _tprint(f" Size : {xlsx_path.stat().st_size / 1024:.1f} KB")
        _tprint(f" Saved to : {xlsx_path}")
        _tprint(f" Started : {time.strftime('%Y-%m-%d %H:%M:%S')}")
        _tprint("=" * 65)

        pipeline_python = _find_pipeline_python()
        cmd = [
            pipeline_python, "-m", "pipeline.run",
            "--input", str(xlsx_path),
            "--output", str(OUTPUT_DIR),
        ]
        env = os.environ.copy()
        env["PYTHONIOENCODING"] = "utf-8"
        # Stream output line-by-line so terminal shows live progress
        proc = subprocess.Popen(
            cmd,
            cwd=str(BASE_DIR),
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,  # merge stderr into stdout
            text=True,
            # The child is told to emit UTF-8 above, so decode it as UTF-8
            # rather than with the platform's locale encoding.
            encoding="utf-8",
            errors="replace",
            env=env,
        )

        # Reading stdout blocks until the child closes it, so a timeout applied
        # after the read loop could never fire. A timer thread enforces the
        # limit instead by killing the child, which ends the read loop.
        timed_out = threading.Event()

        def _kill_if_still_running():
            if proc.poll() is None:
                timed_out.set()
                proc.kill()

        watchdog = threading.Timer(timeout_s, _kill_if_still_running)
        watchdog.daemon = True
        watchdog.start()

        logs = ""
        try:
            with proc:  # closes the pipe and reaps the child on exit
                for line in proc.stdout:
                    line_stripped = line.rstrip()
                    _annotate(line_stripped, xlsx_path.name)  # step header for key lines
                    _tprint(line_stripped)  # then the raw pipeline line
                    logs += line
                    with _job_lock:
                        _job["logs"] = logs  # incremental update for UI log viewer
        finally:
            watchdog.cancel()

        if timed_out.is_set():
            raise subprocess.TimeoutExpired(cmd, timeout_s)

        return_code = proc.returncode
        if return_code != 0:
            _tprint("")
            _tprint(f" [ERROR] Pipeline exited with code {return_code}")
            _tprint("=" * 65)
            with _job_lock:
                _job["status"] = "error"
                _job["message"] = "Pipeline failed - see logs for details."
                _job["logs"] = logs
                _job["finished_at"] = time.time()
            return

        # ── Copy to public/data/ (static fallback) + log API routes ─────────
        file_groups = {
            "stats.py": ["models.json", "freq_anchors.json", "stats.json"],
            "market.py": ["trend.json", "ms.json", "vol_salience.json",
                          "val_share.json", "comp_ms.json", "vtm.json"],
            "ppa.py": ["ppa_mt.json", "ppa_tt.json"],
            "analytics.py": ["interaction.json", "growth_decomp.json", "pgi.json"],
            "recommendations.py": ["recs_full.json"],
        }
        # Counts every JSON file currently present in output/.
        available = [f.name for f in OUTPUT_DIR.glob("*.json")]
        elapsed = round(time.time() - t0, 1)
        _tprint("")
        _tprint("=" * 65)
        _tprint(" STEP 8 | DATA READY")
        _tprint("=" * 65)
        _tprint(" Primary : GET /api/data/{key} (FastAPI serves from output/)")
        _tprint(" Fallback : /data/{key}.json (Vite static from public/data/)")
        _tprint("-" * 65)
        # Copy to public/data/ so static fallback stays fresh
        REACT_DATA.mkdir(parents=True, exist_ok=True)
        for exporter, files in file_groups.items():
            _tprint(f" pipeline/exporters/{exporter}")
            for fname in files:
                src = OUTPUT_DIR / fname
                key = fname.replace(".json", "")
                if src.exists():
                    shutil.copy2(src, REACT_DATA / fname)
                    ready = "OK"
                else:
                    ready = "MISS"
                _tprint(f" [{ready}] /api/data/{key} + /data/{fname}")
        # recs alias
        recs_src = OUTPUT_DIR / "recs_full.json"
        if recs_src.exists():
            shutil.copy2(recs_src, REACT_DATA / "recs.json")
            _tprint(" (alias)")
            _tprint(" [OK ] /api/data/recs + /data/recs.json (<- recs_full)")
        _tprint("-" * 65)
        _tprint(f" {len(available)} files written to output/ + copied to public/data/")
        logs += f"\n[OK] {len(available)} JSON files ready. API: /api/data/ | Static: /data/"
        _tprint("")
        _tprint("=" * 65)
        _tprint(f" [DONE] Pipeline complete in {elapsed}s")
        _tprint(" [DONE] React fetches via API when server is up,")
        _tprint(" falls back to static files when server is down.")
        _tprint("=" * 65)
        _tprint("")
        with _job_lock:
            _job["status"] = "done"
            _job["message"] = f"Pipeline complete in {elapsed}s. {len(available)} JSON files updated."
            _job["logs"] = logs
            _job["finished_at"] = time.time()
    except subprocess.TimeoutExpired:
        _tprint("")
        _tprint(" [TIMEOUT] Pipeline exceeded 5-minute limit.")
        _tprint("=" * 65)
        with _job_lock:
            _job["status"] = "error"
            _job["message"] = "Pipeline timed out after 5 minutes."
            _job["logs"] += "\n[TIMEOUT] Process exceeded 5-minute limit."
            _job["finished_at"] = time.time()
    except Exception as exc:
        # Last-resort boundary of the worker thread: report instead of dying silently.
        _tprint(f" [ERROR] {exc}")
        _tprint("=" * 65)
        with _job_lock:
            _job["status"] = "error"
            _job["message"] = f"Unexpected error: {exc}"
            _job["logs"] += f"\n[ERROR] {exc}"
            _job["finished_at"] = time.time()
# ── Routes ────────────────────────────────────────────────────────────────────
@app.post("/api/upload")
async def upload_excel(file: UploadFile = File(...)):
    """Accept an Excel workbook and kick off the pipeline in a background thread.

    The response returns as soon as the file is saved; progress is reported by
    the status and logs endpoints.

    Args:
        file: The uploaded workbook (multipart form field ``file``).

    Returns:
        JSON ``{"ok": true, "message": ..., "filename": ...}`` where
        ``filename`` is the name the upload was stored under.

    Raises:
        HTTPException: 409 when a run is already in progress, 400 when the
            file name does not end in .xlsx / .xlsm / .xls.
    """
    # The file name is client-controlled. Keep only its last path component
    # (treating both "/" and "\" as separators) so a name such as
    # "../../app.py" cannot be written outside UPLOAD_DIR.
    raw_name = file.filename or ""
    safe_name = os.path.basename(raw_name.replace("\\", "/"))

    # Check and claim the job under one lock acquisition so two simultaneous
    # uploads cannot both pass the "already running" test.
    with _job_lock:
        # Reject if a job is already running
        if _job["status"] == "running":
            raise HTTPException(status_code=409, detail="A pipeline run is already in progress.")
        # Validate extension
        if not safe_name.lower().endswith((".xlsx", ".xlsm", ".xls")):
            raise HTTPException(status_code=400, detail="Only .xlsx / .xlsm / .xls files are accepted.")
        _job["status"] = "running"
        _job["message"] = "Upload received. Saving file..."
        _job["filename"] = safe_name
        _job["started_at"] = time.time()
        _job["finished_at"] = None

    # Save upload
    dest = UPLOAD_DIR / safe_name
    try:
        contents = await file.read()
        dest.write_bytes(contents)
    except Exception as exc:
        # Release the claim, otherwise every later upload would get a 409.
        with _job_lock:
            _job["status"] = "error"
            _job["message"] = f"Could not save upload: {exc}"
            _job["finished_at"] = time.time()
        raise

    _tprint(f"\n[REQUEST] POST /api/upload -> {safe_name} ({len(contents)/1024:.1f} KB)")
    # Run pipeline in background so HTTP response returns immediately
    thread = threading.Thread(target=_run_pipeline, args=(dest,), daemon=True)
    thread.start()
    return JSONResponse({"ok": True, "message": "Upload received. Pipeline started.", "filename": safe_name})
@app.get("/api/upload/status")
def get_status():
    """Return a snapshot of the current job; poll this to track pipeline progress.

    Returns:
        A dict with ``status``, ``message``, ``filename``, ``started_at``,
        ``finished_at`` and ``elapsed_s``. ``elapsed_s`` is ``None`` before any
        run has started, counts up while a run is in progress, and stays fixed
        at the run's duration once it has finished.
    """
    with _job_lock:
        started_at = _job["started_at"]
        finished_at = _job["finished_at"]
        if started_at:
            # Only trust finished_at when it belongs to the run that started at
            # started_at; a value left over from an earlier run is older.
            run_is_over = (
                _job["status"] != "running"
                and finished_at is not None
                and finished_at >= started_at
            )
            end = finished_at if run_is_over else time.time()
            elapsed_s = round(end - started_at, 1)
        else:
            elapsed_s = None
        return {
            "status": _job["status"],
            "message": _job["message"],
            "filename": _job["filename"],
            "started_at": started_at,
            "finished_at": finished_at,
            "elapsed_s": elapsed_s,
        }
@app.get("/api/upload/logs")
def get_logs():
    """Return the log text currently stored in the job record, wrapped as {"logs": ...}."""
    with _job_lock:
        captured = _job["logs"]
    return {"logs": captured}
@app.post("/api/upload/reset")
def reset_job():
    """Put every field of the job record back to its idle value (handy after an error)."""
    idle_state = {
        "status": "idle",
        "message": "",
        "logs": "",
        "started_at": None,
        "finished_at": None,
        "filename": None,
    }
    with _job_lock:
        _job.update(idle_state)
    return {"ok": True}
# ── Data serving routes ───────────────────────────────────────────────────────
# React fetches all dashboard data through these endpoints.
# Data is read from output/ (written by the pipeline after each upload).
# Maps frontend key -> actual filename in output/
# Frontend key -> file name inside output/. Most keys map to "<key>.json".
DATA_KEY_MAP = {
    key: f"{key}.json"
    for key in (
        "models",
        "stats",
        "freq_anchors",
        "trend",
        "ms",
        "vol_salience",
        "val_share",
        "comp_ms",
        "vtm",
        "ppa_mt",
        "ppa_tt",
        "interaction",
        "growth_decomp",
        "pgi",
    )
}
# "recs" is an alias: both keys are served from recs_full.json.
DATA_KEY_MAP["recs"] = "recs_full.json"
DATA_KEY_MAP["recs_full"] = "recs_full.json"
@app.get("/api/data")
def list_data_keys():
    """List every known data key and whether its file currently exists in output/.

    Returns:
        A dict keyed by data key; each value holds ``file`` (the file name),
        ``available`` (bool) and ``size_kb`` (size rounded to 0.1 KB, or
        ``None`` when the file is missing).
    """
    available = {}
    for key, fname in DATA_KEY_MAP.items():
        # One stat() call answers both "does it exist" and "how big is it",
        # so a file disappearing between two separate checks cannot turn this
        # listing into a server error.
        try:
            size_kb = round((OUTPUT_DIR / fname).stat().st_size / 1024, 1)
        except (FileNotFoundError, NotADirectoryError):
            size_kb = None
        available[key] = {
            "file": fname,
            "available": size_kb is not None,
            "size_kb": size_kb,
        }
    _tprint(f"[DATA] GET /api/data -> listed {len(available)} keys")
    return available
@app.get("/api/data/{key}")
def get_data(key: str):
    """
    Serve computed pipeline JSON to the React frontend.
    React calls this for every dashboard tab on load and after each upload.

    Args:
        key: A data key listed in DATA_KEY_MAP (e.g. ``models``).

    Returns:
        The parsed content of the matching file in output/ as a JSON response.

    Raises:
        HTTPException: 404 for an unknown key or a file that does not exist
            yet; 503 when the file exists but cannot be parsed at the moment
            (for example while a pipeline run is rewriting it).
    """
    fname = DATA_KEY_MAP.get(key)
    if not fname:
        _tprint(f"[DATA] GET /api/data/{key} -> 404 unknown key")
        raise HTTPException(status_code=404, detail=f"Unknown data key: '{key}'")
    json_file = OUTPUT_DIR / fname
    try:
        # Open directly instead of checking exists() first: the file may be
        # replaced or removed between a check and the read.
        size_kb = round(json_file.stat().st_size / 1024, 1)
        _tprint(f"[DATA] GET /api/data/{key:<16} -> output/{fname} ({size_kb} KB) -> React")
        with open(json_file, encoding="utf-8") as fh:
            data = json.load(fh)
    except FileNotFoundError:
        _tprint(f"[DATA] GET /api/data/{key} -> 404 not found (pipeline not run yet?)")
        raise HTTPException(
            status_code=404,
            detail=f"Data for '{key}' not found. Upload an Excel file first to run the pipeline."
        ) from None
    except ValueError as exc:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueErrors; a
        # truncated or half-written file lands here.
        _tprint(f"[DATA] GET /api/data/{key} -> 503 unreadable JSON ({exc})")
        raise HTTPException(
            status_code=503,
            detail=f"Data for '{key}' could not be read right now. Retry once the pipeline has finished."
        ) from exc
    # NOTE(review): JSONResponse serializes `data` again; presumably it rejects
    # NaN/Infinity values that json.load accepts — confirm the exporters never emit them.
    return JSONResponse(content=data)
# ── Static file serving (production / Hugging Face) ──────────────────────────
# These mounts are registered AFTER all /api/* routes so the API always wins.
#
# /data/<key>.json → react-dashboard/public/data/ (pre-seeded + updated on upload)
# / → react-dashboard/dist/ (compiled React SPA)
#
# In local dev, Vite serves both; these mounts do nothing when dist/ doesn't exist.
# Same directory as REACT_DATA, which is created near the top of this module.
_REACT_DATA_DIR = BASE_DIR.parent / "react-dashboard" / "public" / "data"
_REACT_DIST_DIR = BASE_DIR.parent / "react-dashboard" / "dist"
if _REACT_DATA_DIR.exists():
    app.mount("/data", StaticFiles(directory=str(_REACT_DATA_DIR)), name="data_static")
# The catch-all "/" mount is only added when a compiled frontend is present;
# html=True asks StaticFiles to serve the HTML entry page for directory requests.
if _REACT_DIST_DIR.exists():
    app.mount("/", StaticFiles(directory=str(_REACT_DIST_DIR), html=True), name="spa")
# Script entry point: start the server directly with `python upload_server.py`.
if __name__ == "__main__":
    # Imported here so uvicorn is only needed when the file is run as a script.
    import uvicorn
    print("Starting Genesis AI Upload Server on http://localhost:8000")
    print(f"Using Python: {sys.executable}")
    # Binds to all interfaces on port 8000 with auto-reload turned off.
    uvicorn.run("upload_server:app", host="0.0.0.0", port=8000, reload=False)