import asyncio
import logging
import os
import pathlib
import time
import uuid
from typing import Any, Dict, Optional, Set

from fastapi import FastAPI, UploadFile, File, Form, HTTPException
from fastapi.encoders import jsonable_encoder
from fastapi.responses import JSONResponse

# import functions from pipeline file (unchanged)
from pipeline_with_agents import process_file

logger = logging.getLogger(__name__)

UPLOAD_DIR = "/tmp/uploads"
os.makedirs(UPLOAD_DIR, exist_ok=True)

# File extensions accepted by the upload endpoint (compared lower-cased).
ALLOWED_EXTENSIONS = (".csv", ".xls", ".xlsx", ".txt")

app = FastAPI(title="AI Pipeline Service")

# In-memory job store (simple). Replace with DB for production: it is
# per-process and is lost on restart.
JOBS: Dict[str, Dict[str, Any]] = {}

# Strong references to in-flight background jobs. The event loop only keeps
# weak references to tasks, so a task nobody holds can be garbage-collected
# before it finishes.
_BACKGROUND_TASKS: Set[asyncio.Task] = set()


def _resolve_upload_path(file_id: str) -> str:
    """Map a client-supplied ``file_id`` to a file inside UPLOAD_DIR.

    Accepts either the bare stored name (``<uuid><ext>``) or the full path
    returned by ``upload_file``. Only the final path component is used, so
    values such as ``../../etc/passwd`` cannot escape UPLOAD_DIR.

    Raises:
        HTTPException: 400 if no regular file with that name exists.
    """
    name = pathlib.Path(file_id).name
    path = os.path.join(UPLOAD_DIR, name)
    # isfile (not exists): an empty name would otherwise resolve to the
    # UPLOAD_DIR directory itself and pass the check.
    if not name or not os.path.isfile(path):
        raise HTTPException(status_code=400, detail="file_id not found on server")
    return path


async def _run_job(job_id: str, file_id: str, sheet: Optional[str], model: str) -> Any:
    """Run ``process_file`` off the event loop and record the outcome in JOBS.

    On success, sets status ``finished``, stores ``result`` and
    ``finished_at`` (Unix seconds), and returns the result. On failure, sets
    status ``failed`` plus ``error`` (the exception text) and re-raises.
    """
    job = JOBS[job_id]
    job["status"] = "running"
    loop = asyncio.get_running_loop()
    try:
        # process_file is synchronous; run it in the default executor so
        # other requests keep being served while it works.
        result = await loop.run_in_executor(None, process_file, file_id, sheet, model)
    except Exception as e:
        job["status"] = "failed"
        job["error"] = str(e)
        raise
    job["result"] = result
    job["status"] = "finished"
    job["finished_at"] = int(time.time())
    return result


async def _run_job_in_background(
    job_id: str, file_id: str, sheet: Optional[str], model: str
) -> None:
    """Background wrapper around ``_run_job`` that logs failures instead of raising."""
    try:
        await _run_job(job_id, file_id, sheet, model)
    except Exception:
        # The failure is already recorded on the job; log it so it is not
        # only discoverable by polling.
        logger.exception("Job %s failed", job_id)


@app.get("/health")
async def health():
    """Liveness probe: report ``ok`` and the current Unix time in seconds."""
    return {"status": "ok", "time": int(time.time())}


@app.post("/api/files/upload")
async def upload_file(file: UploadFile = File(...)):
    """Store an uploaded CSV/XLS/XLSX/TXT file under a random name.

    Returns a dict with ``file_id`` (the server-side path of the stored
    file, to be passed to ``POST /api/jobs``), the client's base
    ``filename`` and the ``size`` in bytes.

    Raises:
        HTTPException: 400 if the extension is not in ALLOWED_EXTENSIONS.
    """
    # UploadFile.filename may be None; "" yields an empty suffix, which is
    # rejected below with a 400 instead of crashing with a TypeError.
    filename = pathlib.Path(file.filename or "").name
    ext = pathlib.Path(filename).suffix.lower()
    if ext not in ALLOWED_EXTENSIONS:
        raise HTTPException(status_code=400, detail="Unsupported file type")
    file_id = f"{uuid.uuid4()}{ext}"
    path = os.path.join(UPLOAD_DIR, file_id)
    contents = await file.read()
    with open(path, "wb") as f:
        f.write(contents)
    # "file_id" stays the full path for backward compatibility; create_job
    # accepts it (only the final component is used).
    return {"file_id": path, "filename": filename, "size": len(contents)}


@app.post("/api/jobs")
async def create_job(
    file_id: str = Form(...),
    sheet: Optional[str] = Form(None),
    model: str = Form("gemini-2.5-flash-lite"),
    wait: bool = Form(False),
):
    """Create a pipeline job for a previously uploaded file.

    With ``wait=True`` the pipeline runs to completion before responding and
    the result is returned inline. Otherwise the job is scheduled in the
    background, and ``{"job_id", "status": "queued"}`` is returned
    immediately for polling via ``GET /api/jobs/{job_id}``.

    Raises:
        HTTPException: 400 if ``file_id`` does not name an uploaded file.
        Exception: with ``wait=True``, whatever ``process_file`` raised; the
            job is marked ``failed`` first.
    """
    path = _resolve_upload_path(file_id)
    job_id = str(uuid.uuid4())
    JOBS[job_id] = {"status": "queued", "file_id": path, "result": None, "created_at": int(time.time())}

    if wait:
        try:
            result = await _run_job(job_id, path, sheet, model)
        except Exception:
            logger.exception("Job %s failed", job_id)
            raise
        return {"job_id": job_id, "status": "finished", "result": result}

    task = asyncio.create_task(_run_job_in_background(job_id, path, sheet, model))
    # Keep a strong reference until the task completes, then drop it.
    _BACKGROUND_TASKS.add(task)
    task.add_done_callback(_BACKGROUND_TASKS.discard)
    return {"job_id": job_id, "status": "queued"}


@app.get("/api/jobs/{job_id}")
async def get_job(job_id: str):
    """Return a snapshot of the job record (includes ``result`` once finished).

    Raises:
        HTTPException: 404 if the job id is unknown.
    """
    job = JOBS.get(job_id)
    if job is None:
        raise HTTPException(status_code=404, detail="job_id not found")
    # jsonable_encoder handles values plain json.dumps cannot (e.g.
    # datetimes, pydantic models), matching how FastAPI encodes the dict
    # returned by create_job(wait=True).
    return JSONResponse(content=jsonable_encoder(job))


@app.get("/api/jobs/{job_id}/charts")
async def get_job_charts(job_id: str):
    """Return the job status and the ``charts`` entry of its result, if any.

    ``charts`` is None until a non-empty result is available.

    Raises:
        HTTPException: 404 if the job id is unknown.
    """
    job = JOBS.get(job_id)
    if job is None:
        raise HTTPException(status_code=404, detail="job_id not found")
    result = job.get("result")
    if not result:
        return {"status": job["status"], "charts": None}
    # NOTE(review): assumes process_file returns a dict-like result with a
    # "charts" key — confirm against pipeline_with_agents.
    return {"status": job["status"], "charts": result.get("charts")}


@app.get("/api/jobs")
async def list_jobs():
    """List every job's id, status, file_id and created_at (results omitted)."""
    jobs = [
        {
            "job_id": jid,
            "status": info.get("status"),
            "file_id": info.get("file_id"),
            "created_at": info.get("created_at"),
        }
        for jid, info in JOBS.items()
    ]
    return {"jobs": jobs}