Spaces:
Sleeping
Sleeping
"""
upload_server.py
================
FastAPI server that:
  1. Accepts an Excel onboarding template
  2. Runs the Genesis AI pipeline (Python subprocess)
  3. Stores JSON outputs in output/
  4. Serves those JSON outputs to the React frontend via /api/data/{key}

React reads its data through this server's /api/data/{key} endpoint. After each
successful run the JSON files are also copied to react-dashboard/public/data/,
which is served at /data/ as a static fallback for when the API is not available.

Endpoints:
  POST /api/upload          - upload xlsx, run pipeline
  GET  /api/upload/status   - current job state
  GET  /api/upload/logs     - full log from last run
  POST /api/upload/reset    - reset job state
  GET  /api/data/{key}      - serve computed JSON (e.g. /api/data/models)
  GET  /api/data            - list available data keys
"""
| import json | |
| import os | |
| import shutil | |
| import subprocess | |
| import sys | |
| import threading | |
| import time | |
| from pathlib import Path | |
| from fastapi import FastAPI, File, UploadFile, HTTPException | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.responses import JSONResponse | |
| from fastapi.staticfiles import StaticFiles | |
# --- Paths ---------------------------------------------------------------------
# Everything is resolved relative to the directory that contains this file.
BASE_DIR = Path(__file__).parent
OUTPUT_DIR = BASE_DIR.joinpath("output")      # pipeline writes its JSON here
UPLOAD_DIR = BASE_DIR.joinpath("_uploads")    # uploaded workbooks are saved here
# React dashboard public/data: copies of the JSON for the browser's static fallback.
REACT_DATA = BASE_DIR.parent.joinpath("react-dashboard", "public", "data")

# Create the working folders at import time; only REACT_DATA may need parents.
for _folder in (OUTPUT_DIR, UPLOAD_DIR):
    _folder.mkdir(exist_ok=True)
del _folder
REACT_DATA.mkdir(parents=True, exist_ok=True)
# --- Job state (in-memory, single-user tool) -----------------------------------
# Shared between request handlers and the background pipeline thread;
# read and write it only while holding _job_lock.
_job = dict(
    status="idle",      # idle | running | done | error
    message="",
    logs="",
    started_at=None,
    finished_at=None,
    filename=None,
)
_job_lock = threading.Lock()
# --- App -----------------------------------------------------------------------
app = FastAPI(title="Genesis AI Upload Server")

# Fully open CORS: any origin, method and header may call the API.
app.add_middleware(
    CORSMiddleware,
    allow_headers=["*"],
    allow_methods=["*"],
    allow_origins=["*"],
)
def _find_pipeline_python(candidates=None, module: str = "pandas", timeout: float = 5) -> str:
    """Return the first Python interpreter that can import the pipeline's dependencies.

    The pipeline runs as a separate process, so the interpreter serving this app
    is not necessarily the one that has pandas installed.

    Args:
        candidates: Interpreters to probe, in order. By default: the
            ``GENESIS_PIPELINE_PYTHON`` environment variable (if set), then
            ``sys.executable``, then a few fixed Windows install paths, then
            ``python3`` / ``python`` looked up on PATH. Empty/None entries are skipped.
        module: Module whose successful import marks a usable interpreter.
        timeout: Seconds allowed for each probe.

    Returns:
        The first candidate whose ``-c "import <module>"`` probe exits with 0,
        otherwise ``sys.executable`` as a fallback.
    """
    if candidates is None:
        candidates = [
            os.environ.get("GENESIS_PIPELINE_PYTHON"),
            # Works when the server is started with the right Python.
            sys.executable,
            r"C:\Users\sohini.bera\AppData\Local\Programs\Python\Python313\python.exe",
            r"C:\Users\sohini.bera\AppData\Local\Programs\Python\Python312\python.exe",
            r"C:\Python313\python.exe",
            "python3",
            "python",
        ]
    for candidate in candidates:
        if not candidate:
            continue  # unset env var, or empty sys.executable in embedded interpreters
        try:
            result = subprocess.run(
                [candidate, "-c", f"import {module}"],
                capture_output=True,
                timeout=timeout,
            )
        except (OSError, subprocess.TimeoutExpired):
            # OSError covers FileNotFoundError as well as PermissionError /
            # exec-format errors for paths that exist but are not runnable.
            continue
        if result.returncode == 0:
            return candidate
    return sys.executable  # fallback
# Serialises whole lines written by _tprint (see its docstring).
_print_lock = threading.Lock()


def _tprint(msg: str):
    """Print *msg* and flush immediately so it is visible live in the terminal.

    print() emits the text and the trailing newline as two separate writes.
    The background pipeline thread and request handlers both log through this
    function, so the lock keeps their lines from interleaving mid-line.
    """
    with _print_lock:
        print(msg, flush=True)
# Step keys whose banner _annotate has already printed during the current
# pipeline run; _reset_steps() starts a fresh set for each run.
_printed_steps: set = set()


def _reset_steps():
    """Forget all printed step banners so the next run shows every banner again."""
    global _printed_steps
    fresh: set = set()
    _printed_steps = fresh
def _annotate(line: str, xlsx_name: str):
    """
    Print a descriptive step banner ahead of selected pipeline output lines.

    The rules below are tried top to bottom and only the first rule whose regex
    matches *line* is considered. Its banner is printed only if the step key is
    not yet in _printed_steps (the key is then recorded), so each banner appears
    at most once per pipeline run. The line itself is not printed here.
    """
    import re

    heavy = "=" * 65
    light = "-" * 65

    def banner(title: str, *body: str) -> list[str]:
        # Shared frame: blank line, title between '=' rules, body, '-' rule, trailer.
        return ["", heavy, title, heavy, *body, light, " Pipeline output:"]

    rules = [
        # STEP 1: data loading
        ("load", r'\[load\]', banner(
            " STEP 1 | DATA LOADING (pipeline/loader.py)",
            f" Source file : {xlsx_name}",
            " Sheet read : 1_Source_Data",
            " -> columns: Brand, Date, Channel, Region,",
            " Pack Size Group, Value, Volume, Distribution",
            " -> Price computed as Value / Volume",
            " -> Date parsed, Month column added",
            " -> rows with nulls in Brand/Month/Value/Volume dropped",
            " Sheet read : 2_Brand_Config",
            " -> Focal Brand Name, Competitor 1..N",
        )),
        # STEP 2: brand config
        ("config", r'\[config\] Focal brands', banner(
            " STEP 2 | BRAND CONFIGURATION (pipeline/run.py)",
            " Reads focal brand + competitors from 2_Brand_Config sheet.",
            " Pack order loaded from pipeline/config.py (DEFAULT_PACK_ORDER).",
            " Any pack found in data but not in config gets order rank 50+.",
        )),
        # STEP 3: wide pivot
        ("pivot", r'Wide pivot:', banner(
            " STEP 3 | WIDE PIVOT (pipeline/modelling.py -> build_wide_pivot)",
            " Builds one wide monthly table per Channel x Region combination.",
            " Columns added per table:",
            " Focal brand : Vol_F, Price_F, Dist_F, Val_F",
            " Per competitor : Price_<comp>, Vol_<comp>, Dist_<comp>",
            " Category : Cat_Vol (sum of all brands at same CH/RG/Pack/Month)",
            " Cannib. : Vol_Up, Vol_Down (adjacent pack volumes)",
            " All tables concatenated -> single DataFrame passed to modelling.",
        )),
        # STEP 4: diagnostics
        ("diag", r'\[Step 1\].*Running diagnostics', banner(
            " STEP 4 | DIAGNOSTICS (pipeline/diagnostics.py)",
            " Runs pre-modelling checks per grain (Channel x Region x Pack):",
            " Descriptive stats : mean, std, CV for price & volume",
            " Collinearity : Pearson |r| > 0.70 flagged as HIGH",
            " RPI & Distribution: relative price index vs competitors",
            " Cannibalization : cross-pack correlation within focal brand",
            " Summary sheet : pass/warn/fail per grain",
            " Output file: <brand>_Step1_Diagnostics.xlsx (in output/ folder)",
        )),
        # STEP 5: OLS modelling
        ("model", r'\[Step 2\].*Running elasticity models', banner(
            " STEP 5 | OLS ELASTICITY MODELS (pipeline/modelling.py -> run_elasticity_models)",
            " For each grain, tests all valid spec combinations:",
            " Base spec : log(Vol_F) ~ log(Price_F) + log(Dist_F)",
            " + Category : + log(Cat_Vol)",
            " + Comps : + log(Price_<comp>) for up to 3 competitors",
            " + Seasonal : + Q2/Q3/Q4 dummies (promoted if Adj-R2 >= +5pp)",
            " Selection : Best Adj-R2 with own-price coef < 0 (correct sign)",
            " Guardrails : min 6 obs per grain, elasticity clamped to [-6.0, 0]",
            " Config file : pipeline/config.py (all thresholds defined here)",
        )),
        # STEP 6: proxy assignment
        ("proxy", r'Sign OK \(pre-proxy\)', banner(
            " STEP 6 | PROXY ASSIGNMENT (pipeline/proxies.py -> assign_proxies)",
            " Grains where best spec still has wrong sign get a proxy elasticity:",
            " Priority 1 : Interpolate between adjacent packs (same CH x Region)",
            " Priority 2 : Borrow from nearest valid pack (same CH x Region)",
            " Priority 3 : Borrow from nearest valid pack (any Region, same CH)",
            " Final_OwnE clamped to [-6.0, 0] for all grains.",
            " Frequency anchors computed: dominant pack per CH x Region by vol %.",
        )),
        # STEP 7: JSON export
        ("export", r'\[export\].*Saved.*models\.json', banner(
            " STEP 7 | JSON EXPORT (pipeline/exporters/)",
            " stats.py -> models.json (elasticity results per grain)",
            " -> freq_anchors.json (dominant pack per CH x Region)",
            " -> stats.json (brand KPIs, vol/val summary)",
            " market.py -> trend.json (monthly time-series rows)",
            " -> ms.json (market share yr25 vs yr24)",
            " -> vol_salience.json (pack vol % of brand total)",
            " -> val_share.json (pack value % of brand total)",
            " -> comp_ms.json (all-brand market share)",
            " -> vtm.json (volume-to-market by grain)",
            " ppa.py -> ppa_mt.json (reads Excel sheet 3_PPA_MT)",
            " -> ppa_tt.json (reads Excel sheet 4_PPA_TT)",
            " analytics.py -> interaction.json (cross-brand Pearson corr.)",
            " -> growth_decomp.json (pp-contribution to growth)",
            " -> pgi.json (price gradient index)",
            " recommendations.py-> recs_full.json (pricing rec cards per pack)",
            " All files written to: output/",
        )),
        # STEP 8: copy to dashboard
        ("done", r'\[done\]', banner(
            " STEP 8 | COPY TO DASHBOARD (upload_server.py)",
            " Source : output/*.json (18 files)",
            " Target : react-dashboard/public/data/",
            " Extra : recs_full.json also copied as recs.json",
            " Effect : Vite serves public/data/ as static files.",
            " Browser fetches /data/<key>.json on next reload.",
        )),
    ]

    for step_key, pattern, banner_lines in rules:
        if not re.search(pattern, line):
            continue
        # First matching rule wins, whether or not its banner was already shown.
        if step_key not in _printed_steps:
            _printed_steps.add(step_key)
            for text in banner_lines:
                _tprint(text)
        break
def _print_run_header(xlsx_path: Path) -> None:
    """Print the banner that opens a pipeline run in the server terminal."""
    _tprint("")
    _tprint("=" * 65)
    _tprint(" GENESIS AI | PIPELINE EXECUTION LOG")
    _tprint("=" * 65)
    _tprint(f" File : {xlsx_path.name}")
    _tprint(f" Size : {xlsx_path.stat().st_size / 1024:.1f} KB")
    _tprint(f" Saved to : {xlsx_path}")
    _tprint(f" Started : {time.strftime('%Y-%m-%d %H:%M:%S')}")
    _tprint("=" * 65)


def _stream_pipeline(xlsx_path: Path, timeout_s: float) -> tuple[int, bool]:
    """Run ``python -m pipeline.run`` on *xlsx_path*, mirroring its output live.

    Each output line is echoed to the terminal (preceded by any step banner
    from _annotate) and appended to ``_job["logs"]`` for the UI log viewer.

    Returns:
        (exit code, True if the process was killed for exceeding *timeout_s*).
    """
    cmd = [
        _find_pipeline_python(), "-m", "pipeline.run",
        "--input", str(xlsx_path),
        "--output", str(OUTPUT_DIR),
    ]
    env = os.environ.copy()
    env["PYTHONIOENCODING"] = "utf-8"
    # Python block-buffers stdout when it is a pipe; unbuffered mode makes the
    # child's prints reach us as they happen, so progress really is live.
    env["PYTHONUNBUFFERED"] = "1"

    timed_out = threading.Event()
    with subprocess.Popen(
        cmd,
        cwd=str(BASE_DIR),
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,  # merge stderr into stdout
        text=True,
        encoding="utf-8",   # match PYTHONIOENCODING, not the locale's default codec
        errors="replace",   # one undecodable byte must not abort the whole run
        env=env,
    ) as proc:
        def _on_timeout():
            timed_out.set()
            proc.kill()

        # Iterating proc.stdout blocks until the child closes it, so the time
        # limit has to be enforced from a separate timer thread.
        watchdog = threading.Timer(timeout_s, _on_timeout)
        watchdog.daemon = True
        watchdog.start()
        try:
            for line in proc.stdout:
                line_stripped = line.rstrip()
                _annotate(line_stripped, xlsx_path.name)  # step banner first, if key line
                _tprint(line_stripped)                    # then the raw pipeline line
                with _job_lock:
                    _job["logs"] += line  # incremental update for the UI log viewer
            return_code = proc.wait()
        except BaseException:
            # Popen.__exit__ waits for the child; make sure it is not left running.
            proc.kill()
            raise
        finally:
            watchdog.cancel()
    return return_code, timed_out.is_set()


def _publish_outputs(t0: float) -> tuple[int, float]:
    """Copy exported JSON into REACT_DATA and print the "data ready" report.

    Args:
        t0: ``time.time()`` at the start of the run, for the elapsed-time report.

    Returns:
        (number of ``*.json`` files currently in OUTPUT_DIR, elapsed seconds).
    """
    # Exporter module -> JSON files it writes; drives both the copy and the report.
    file_groups = {
        "stats.py": ["models.json", "freq_anchors.json", "stats.json"],
        "market.py": ["trend.json", "ms.json", "vol_salience.json",
                      "val_share.json", "comp_ms.json", "vtm.json"],
        "ppa.py": ["ppa_mt.json", "ppa_tt.json"],
        "analytics.py": ["interaction.json", "growth_decomp.json", "pgi.json"],
        "recommendations.py": ["recs_full.json"],
    }
    available = [f.name for f in OUTPUT_DIR.glob("*.json")]
    elapsed = round(time.time() - t0, 1)

    _tprint("")
    _tprint("=" * 65)
    _tprint(" STEP 8 | DATA READY")
    _tprint("=" * 65)
    _tprint(" Primary : GET /api/data/{key} (FastAPI serves from output/)")
    _tprint(" Fallback : /data/{key}.json (Vite static from public/data/)")
    _tprint("-" * 65)

    # Copy to public/data/ so the static fallback stays fresh.
    REACT_DATA.mkdir(parents=True, exist_ok=True)
    for exporter, files in file_groups.items():
        _tprint(f" pipeline/exporters/{exporter}")
        for fname in files:
            src = OUTPUT_DIR / fname
            key = fname.removesuffix(".json")
            if src.exists():
                shutil.copy2(src, REACT_DATA / fname)
                ready = "OK"
            else:
                ready = "MISS"
            _tprint(f" [{ready}] /api/data/{key} + /data/{fname}")

    # Mirror the API's "recs" alias for the static files: recs_full.json -> recs.json.
    recs_src = OUTPUT_DIR / "recs_full.json"
    if recs_src.exists():
        shutil.copy2(recs_src, REACT_DATA / "recs.json")
        _tprint(" (alias)")
        _tprint(" [OK ] /api/data/recs + /data/recs.json (<- recs_full)")

    _tprint("-" * 65)
    _tprint(f" {len(available)} files written to output/ + copied to public/data/")
    _tprint("")
    _tprint("=" * 65)
    _tprint(f" [DONE] Pipeline complete in {elapsed}s")
    _tprint(" [DONE] React fetches via API when server is up,")
    _tprint(" falls back to static files when server is down.")
    _tprint("=" * 65)
    _tprint("")
    return len(available), elapsed


def _finish_job(status: str, message: str, log_suffix: str = "") -> None:
    """Record the terminal state of the current run in ``_job`` (under _job_lock)."""
    with _job_lock:
        _job["status"] = status
        _job["message"] = message
        _job["logs"] += log_suffix
        _job["finished_at"] = time.time()


def _run_pipeline(xlsx_path: Path, timeout_s: float = 300):
    """Run the pipeline on one uploaded workbook; entry point of the background thread.

    Marks the job as running, launches ``python -m pipeline.run`` on
    *xlsx_path*, streams its merged stdout/stderr to the terminal and into
    ``_job["logs"]``, then copies the exported JSON into REACT_DATA and records
    the outcome ("done" or "error") in ``_job``.

    Args:
        xlsx_path: Saved upload, passed to the pipeline as ``--input``.
        timeout_s: Wall-clock limit for the pipeline subprocess, which is killed
            once it is exceeded. Defaults to 300 s (5 minutes).

    Failures (non-zero exit, timeout, unexpected exceptions) are not raised;
    they are written into ``_job`` so clients polling the status endpoint see
    a terminal state instead of "running" forever.
    """
    t0 = time.time()
    with _job_lock:
        _job["status"] = "running"
        _job["logs"] = ""
        _job["message"] = "Running pipeline..."
        _job["started_at"] = t0
        _job["finished_at"] = None  # drop the previous run's end time
    _reset_steps()
    limit = f"{timeout_s / 60:g}-minute"

    try:
        # Inside the try: stat() in the header can fail, and an uncaught error
        # there would leave the job stuck in "running".
        _print_run_header(xlsx_path)
        return_code, timed_out = _stream_pipeline(xlsx_path, timeout_s)

        if timed_out:
            _tprint("")
            _tprint(f" [TIMEOUT] Pipeline exceeded {limit} limit.")
            _tprint("=" * 65)
            _finish_job(
                "error",
                f"Pipeline timed out after {timeout_s / 60:g} minutes.",
                f"\n[TIMEOUT] Process exceeded {limit} limit.",
            )
            return

        if return_code != 0:
            _tprint("")
            _tprint(f" [ERROR] Pipeline exited with code {return_code}")
            _tprint("=" * 65)
            _finish_job("error", "Pipeline failed - see logs for details.")
            return

        count, elapsed = _publish_outputs(t0)
        _finish_job(
            "done",
            f"Pipeline complete in {elapsed}s. {count} JSON files updated.",
            f"\n[OK] {count} JSON files ready. API: /api/data/ | Static: /data/",
        )
    except Exception as exc:  # top-level boundary of a worker thread
        _tprint(f" [ERROR] {exc}")
        _tprint("=" * 65)
        _finish_job("error", f"Unexpected error: {exc}", f"\n[ERROR] {exc}")
# --- Routes --------------------------------------------------------------------
@app.post("/api/upload")
async def upload_excel(file: UploadFile = File(...)):
    """Save an uploaded Excel workbook and start the pipeline on it in a background thread.

    Returns immediately; poll GET /api/upload/status for progress.

    Raises:
        HTTPException(409): a pipeline run is already in progress.
        HTTPException(400): missing filename or not an .xlsx / .xlsm / .xls file.
        HTTPException(500): the upload could not be written to disk.
    """
    # Fast path: reject before reading the body if a run is already active.
    with _job_lock:
        if _job["status"] == "running":
            raise HTTPException(status_code=409, detail="A pipeline run is already in progress.")

    # The client controls the filename. Keep only its last path component so
    # "../x" or an absolute path cannot place the file outside UPLOAD_DIR.
    filename = (file.filename or "").replace("\\", "/").rsplit("/", 1)[-1]
    if not filename.lower().endswith((".xlsx", ".xlsm", ".xls")):
        raise HTTPException(status_code=400, detail="Only .xlsx / .xlsm / .xls files are accepted.")

    contents = await file.read()

    # Re-check and claim the slot atomically: other requests can run during the
    # await above, and the worker thread only marks the job "running" once it starts.
    with _job_lock:
        if _job["status"] == "running":
            raise HTTPException(status_code=409, detail="A pipeline run is already in progress.")
        _job["status"] = "running"
        _job["message"] = "Running pipeline..."
        _job["started_at"] = time.time()
        _job["finished_at"] = None
        _job["filename"] = filename

    dest = UPLOAD_DIR / filename
    try:
        dest.write_bytes(contents)
    except OSError as exc:
        # Release the claimed slot so the user can retry.
        with _job_lock:
            _job["status"] = "error"
            _job["message"] = f"Could not save upload: {exc}"
            _job["finished_at"] = time.time()
        raise HTTPException(status_code=500, detail="Could not save the uploaded file.") from exc

    _tprint(f"\n[REQUEST] POST /api/upload -> {filename} ({len(contents)/1024:.1f} KB)")
    # Run the pipeline in the background so the HTTP response returns immediately.
    threading.Thread(target=_run_pipeline, args=(dest,), daemon=True).start()
    return JSONResponse({"ok": True, "message": "Upload received. Pipeline started.", "filename": filename})
@app.get("/api/upload/status")
def get_status():
    """Report the current job state; poll this endpoint to track pipeline progress.

    ``elapsed_s`` is measured from ``started_at`` to now while a run is active,
    and to ``finished_at`` once it has ended, so it stops growing after the run
    completes. It is None before any run has started.
    """
    with _job_lock:
        started = _job["started_at"]
        finished = _job["finished_at"]
        elapsed = None
        if started:
            # Only trust finished_at if it belongs to this run (not a stale value
            # left over from an earlier one) and the job is no longer running.
            ended = (
                finished is not None
                and finished >= started
                and _job["status"] != "running"
            )
            elapsed = round((finished if ended else time.time()) - started, 1)
        return {
            "status": _job["status"],
            "message": _job["message"],
            "filename": _job["filename"],
            "started_at": started,
            "finished_at": finished,
            "elapsed_s": elapsed,
        }
@app.get("/api/upload/logs")
def get_logs():
    """Return the captured pipeline output of the current or most recent run.

    Returns:
        ``{"logs": str}``; the text grows while a run is in progress.
    """
    with _job_lock:
        return {"logs": _job["logs"]}
@app.post("/api/upload/reset")
def reset_job(force: bool = False):
    """Reset the job state back to idle (useful after an error).

    While a run is in progress this refuses with 409: clearing "running" would
    let a new upload start a second pipeline writing to the same output/ folder
    concurrently. Pass ``?force=true`` to reset anyway, e.g. to recover from a
    run that is stuck.
    """
    with _job_lock:
        if _job["status"] == "running" and not force:
            raise HTTPException(
                status_code=409,
                detail="A pipeline run is in progress; pass force=true to reset anyway.",
            )
        _job["status"] = "idle"
        _job["message"] = ""
        _job["logs"] = ""
        _job["started_at"] = None
        _job["finished_at"] = None
        _job["filename"] = None
    return {"ok": True}
# --- Data serving routes -------------------------------------------------------
# React fetches all dashboard data through the /api/data endpoints, which read
# from output/ (written by the pipeline after each upload).
#
# Frontend key -> JSON filename in output/. Every key maps to "<key>.json",
# except "recs", which is an alias for recs_full.json.
DATA_KEY_MAP = {
    data_key: f"{data_key}.json"
    for data_key in (
        "models", "stats", "freq_anchors", "trend", "ms", "vol_salience",
        "val_share", "comp_ms", "vtm", "ppa_mt", "ppa_tt", "interaction",
        "growth_decomp", "pgi",
    )
}
DATA_KEY_MAP["recs"] = "recs_full.json"  # alias: recs -> recs_full
DATA_KEY_MAP["recs_full"] = "recs_full.json"
@app.get("/api/data")
def list_data_keys():
    """List every data key with its backing file in output/ and whether it exists.

    Returns:
        ``{key: {"file": str, "available": bool, "size_kb": float | None}}``.
    """
    listing = {}
    for key, fname in DATA_KEY_MAP.items():
        # A single stat() instead of exists() + stat(): the pipeline may replace
        # files in output/ between two separate calls.
        try:
            size_kb = round((OUTPUT_DIR / fname).stat().st_size / 1024, 1)
        except OSError:
            size_kb = None  # missing (pipeline not run yet) or unreadable
        listing[key] = {
            "file": fname,
            "available": size_kb is not None,
            "size_kb": size_kb,
        }
    _tprint(f"[DATA] GET /api/data -> listed {len(listing)} keys")
    return listing
@app.get("/api/data/{key}")
def get_data(key: str):
    """
    Serve one computed pipeline JSON file to the React frontend.

    Args:
        key: A DATA_KEY_MAP key (e.g. "models"; "recs" is an alias of recs_full).

    Raises:
        HTTPException(404): unknown key, or the file does not exist yet.
        HTTPException(503): the file exists but cannot be decoded as JSON right now.
    """
    fname = DATA_KEY_MAP.get(key)
    if not fname:
        _tprint(f"[DATA] GET /api/data/{key} -> 404 unknown key")
        raise HTTPException(status_code=404, detail=f"Unknown data key: '{key}'")

    json_file = OUTPUT_DIR / fname
    # Stat and read inside one try instead of exists()-then-open: the pipeline
    # writes into output/ and can change the file between separate calls.
    try:
        size_kb = round(json_file.stat().st_size / 1024, 1)
        with open(json_file, encoding="utf-8") as fh:
            data = json.load(fh)
    except FileNotFoundError:
        _tprint(f"[DATA] GET /api/data/{key} -> 404 not found (pipeline not run yet?)")
        raise HTTPException(
            status_code=404,
            detail=f"Data for '{key}' not found. Upload an Excel file first to run the pipeline.",
        ) from None
    except (json.JSONDecodeError, UnicodeDecodeError) as exc:
        # NOTE(review): most likely a file caught mid-write during a run; confirm.
        _tprint(f"[DATA] GET /api/data/{key} -> 503 unreadable JSON ({exc})")
        raise HTTPException(
            status_code=503,
            detail=f"Data for '{key}' is temporarily unreadable. Try again once the pipeline run finishes.",
        ) from exc

    _tprint(f"[DATA] GET /api/data/{key:<16} -> output/{fname} ({size_kb} KB) -> React")
    return JSONResponse(content=data)
# --- Static file serving (production / Hugging Face) ---------------------------
# Keep these mounts after every /api/* route definition: the "/" mount matches
# all paths, so the API routes must be registered first to take precedence.
#
#   /data/<key>.json -> react-dashboard/public/data/ (pre-seeded + refreshed after each upload)
#   /                -> react-dashboard/dist/         (compiled React SPA)
#
# The /data folder is REACT_DATA, which is created at import time and receives
# the pipeline's JSON copies, so that mount is always registered. The SPA mount
# is skipped when dist/ has not been built (e.g. local dev with Vite).
_REACT_DATA_DIR = REACT_DATA
_REACT_DIST_DIR = BASE_DIR.parent / "react-dashboard" / "dist"
if _REACT_DATA_DIR.exists():
    app.mount("/data", StaticFiles(directory=str(_REACT_DATA_DIR)), name="data_static")
if _REACT_DIST_DIR.exists():
    app.mount("/", StaticFiles(directory=str(_REACT_DIST_DIR), html=True), name="spa")
if __name__ == "__main__":
    import uvicorn

    # PORT lets the hosting environment override the default port.
    port = int(os.environ.get("PORT", "8000"))
    print(f"Starting Genesis AI Upload Server on http://localhost:{port}")
    print(f"Using Python: {sys.executable}")
    # Pass the app object rather than "upload_server:app": uvicorn would otherwise
    # import this file a second time under another module name, and the string
    # form breaks if the file is renamed. Passing the object is fine with reload off.
    uvicorn.run(app, host="0.0.0.0", port=port, reload=False)