Spaces:
Sleeping
Sleeping
| from fastapi import FastAPI, UploadFile, File, Form, HTTPException | |
| from fastapi.responses import JSONResponse | |
| import uuid, os, asyncio, time | |
| from typing import Dict, Any | |
| import pathlib | |
| # import functions from pipeline file (unchanged) | |
| from pipeline_with_agents import process_file | |
# Directory that holds uploaded files; created at import time if missing.
UPLOAD_DIR = "/tmp/uploads"
pathlib.Path(UPLOAD_DIR).mkdir(parents=True, exist_ok=True)

app = FastAPI(title="AI Pipeline Service")

# In-memory job registry keyed by job id (a uuid4 string). Each record starts
# with "status", "file_id", "result" and "created_at". Being a plain dict it
# lives in this process only: it is lost on restart and not shared between
# worker processes, and no code shown here removes entries. Replace with a
# database for production.
JOBS: Dict[str, Dict[str, Any]] = {}
async def health():
    """Liveness check.

    Returns:
        A dict with ``status`` (always ``"ok"``) and ``time``, the current
        Unix timestamp truncated to whole seconds.
    """
    now = int(time.time())
    return {"status": "ok", "time": now}
# Extensions accepted by upload_file (compared after lower-casing).
_ALLOWED_UPLOAD_EXTENSIONS = frozenset({".csv", ".xls", ".xlsx", ".txt"})


def _write_bytes(path: str, data: bytes) -> None:
    """Write ``data`` to ``path``, replacing any existing file."""
    with open(path, "wb") as f:
        f.write(data)


async def upload_file(file: UploadFile = File(...)):
    """Save an uploaded .csv/.xls/.xlsx/.txt file under UPLOAD_DIR.

    The stored file is named ``<uuid4><ext>``, so the client-supplied name is
    never used as a filesystem path.

    Returns:
        ``{"file_id": <server path of the stored file>,
        "filename": <client file name with any directories stripped>,
        "size": <number of bytes written>}``. ``file_id`` is what
        ``create_job`` expects.

    Raises:
        HTTPException: 400 if the file name is missing or its extension is
            not one of the allowed types.
    """
    # UploadFile.filename may be None; treat it as a name without an
    # extension so it is rejected with a 400 instead of crashing Path().
    filename = pathlib.Path(file.filename or "").name
    ext = pathlib.Path(filename).suffix.lower()
    if ext not in _ALLOWED_UPLOAD_EXTENSIONS:
        raise HTTPException(status_code=400, detail="Unsupported file type")
    file_id = f"{uuid.uuid4()}{ext}"
    path = os.path.join(UPLOAD_DIR, file_id)
    # NOTE(review): the whole upload is buffered in memory with no size
    # limit; consider capping it for untrusted clients.
    contents = await file.read()
    # Do the disk write in a worker thread so a large file does not stall
    # the event loop for every other request.
    await asyncio.get_running_loop().run_in_executor(None, _write_bytes, path, contents)
    return {"file_id": path, "filename": filename, "size": len(contents)}
# Strong references to in-flight background jobs. The event loop keeps only
# weak references to tasks, so a task nobody holds can be garbage-collected
# before it finishes.
_BACKGROUND_TASKS = set()


def _resolve_upload_path(file_id: str) -> str:
    """Map a client-supplied ``file_id`` to a regular file inside UPLOAD_DIR.

    Accepts both the full path returned by ``upload_file`` and the bare
    ``<uuid><ext>`` name. Only the final path component is used, so a client
    cannot point the pipeline at arbitrary files elsewhere on the server.

    Raises:
        HTTPException: 400 if no such uploaded file exists.
    """
    candidate = pathlib.Path(UPLOAD_DIR) / pathlib.Path(file_id).name
    # is_file() rather than exists(): this also rejects "", "." and "..",
    # which would otherwise resolve to UPLOAD_DIR itself or its parent.
    if not candidate.is_file():
        raise HTTPException(status_code=400, detail="file_id not found on server")
    return str(candidate)


async def _run_job(job_id: str, path: str, sheet, model: str, *, reraise: bool = False) -> None:
    """Run ``process_file`` in the default executor and record the outcome in JOBS.

    On success the job gets ``result``, status ``"finished"`` and
    ``finished_at``. On failure it gets status ``"failed"``, ``error`` (the
    exception text) and ``finished_at``. The exception is re-raised only when
    ``reraise`` is true.
    """
    job = JOBS[job_id]
    job["status"] = "running"
    loop = asyncio.get_running_loop()
    try:
        # process_file is a plain synchronous call; running it in a worker
        # thread keeps the event loop free to serve other requests.
        result = await loop.run_in_executor(None, process_file, path, sheet, model)
    except Exception as e:
        job["status"] = "failed"
        job["error"] = str(e)
        job["finished_at"] = int(time.time())
        if reraise:
            raise
        return
    job["result"] = result
    job["status"] = "finished"
    job["finished_at"] = int(time.time())


async def create_job(file_id: str = Form(...), sheet: str = Form(None), model: str = Form("gemini-2.5-flash-lite"), wait: bool = Form(False)):
    """Create a pipeline job for a previously uploaded file.

    Args:
        file_id: The ``file_id`` returned by ``upload_file`` (the stored
            file's server path) or just its file name.
        sheet: Optional value forwarded to ``process_file`` (presumably the
            spreadsheet sheet to read; confirm against pipeline_with_agents).
        model: Model name forwarded to ``process_file``.
        wait: If true, run the job to completion before responding and
            include its result. Otherwise start it in the background and
            return immediately.

    Returns:
        ``{"job_id", "status": "finished", "result"}`` when ``wait`` is true,
        otherwise ``{"job_id", "status": "queued"}``. In the latter case the
        job can be polled with ``get_job``.

    Raises:
        HTTPException: 400 if ``file_id`` does not name an uploaded file.
        Exception: With ``wait`` true, whatever ``process_file`` raised (the
            job is marked ``"failed"`` first).
    """
    path = _resolve_upload_path(file_id)
    job_id = str(uuid.uuid4())
    JOBS[job_id] = {"status": "queued", "file_id": path, "result": None, "created_at": int(time.time())}

    if wait:
        await _run_job(job_id, path, sheet, model, reraise=True)
        return {"job_id": job_id, "status": "finished", "result": JOBS[job_id]["result"]}

    task = asyncio.create_task(_run_job(job_id, path, sheet, model))
    _BACKGROUND_TASKS.add(task)
    task.add_done_callback(_BACKGROUND_TASKS.discard)
    return {"job_id": job_id, "status": "queued"}
async def get_job(job_id: str):
    """Return the stored record for ``job_id`` as JSON.

    The record is serialized when this runs, so the response is a
    point-in-time view. ``result`` stays ``None`` until the job has finished.

    Raises:
        HTTPException: 404 if ``job_id`` is unknown.
    """
    record = JOBS.get(job_id)
    if record:
        return JSONResponse(content=record)
    raise HTTPException(status_code=404, detail="job_id not found")
async def get_job_charts(job_id: str):
    """Return a job's status together with the ``charts`` entry of its result.

    ``charts`` is ``None`` whenever the job has no truthy result yet (for
    example while queued, running, or after a failure).
    NOTE(review): assumes a finished ``result`` is a dict; confirm against
    process_file.

    Raises:
        HTTPException: 404 if ``job_id`` is unknown.
    """
    job = JOBS.get(job_id)
    if not job:
        raise HTTPException(status_code=404, detail="job_id not found")
    status = job["status"]
    result = job.get("result")
    charts = result.get("charts") if result else None
    return {"status": status, "charts": charts}
async def list_jobs():
    """Summarize every job in the in-memory store.

    Each entry carries ``job_id``, ``status``, ``file_id`` and ``created_at``.
    Results and errors are omitted to keep the listing small.
    """
    summaries = [
        {
            "job_id": job_id,
            "status": record.get("status"),
            "file_id": record.get("file_id"),
            "created_at": record.get("created_at"),
        }
        for job_id, record in JOBS.items()
    ]
    return {"jobs": summaries}