Spaces:
Sleeping
Sleeping
File size: 3,397 Bytes
17e5cd7 27c947d 28dcf64 17e5cd7 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 |
from fastapi import FastAPI, UploadFile, File, Form, HTTPException
from fastapi.responses import JSONResponse
import uuid, os, asyncio, time
from typing import Dict, Any
import pathlib
# import functions from pipeline file (unchanged)
from pipeline_with_agents import process_file
# Directory where uploaded files are stored. It can be overridden with the
# UPLOAD_DIR environment variable; the default is the original /tmp/uploads.
UPLOAD_DIR = os.environ.get("UPLOAD_DIR", "/tmp/uploads")
# Created at import time so the upload endpoint can write into it immediately.
os.makedirs(UPLOAD_DIR, exist_ok=True)
app = FastAPI(title="AI Pipeline Service")
# In-memory job store: job_id (str uuid4) -> job record dict.
# It lives only as long as this process, is not shared between worker
# processes, and nothing in this module ever removes entries.
# Replace with a DB for production.
JOBS: Dict[str, Dict[str, Any]] = {}
@app.get("/health")
async def health():
    """Liveness check.

    Returns a constant ``"ok"`` status together with the server's current
    Unix time, truncated to whole seconds.
    """
    now_seconds = int(time.time())
    payload = {"status": "ok", "time": now_seconds}
    return payload
@app.post("/api/files/upload")
async def upload_file(file: UploadFile = File(...)):
    """Save an uploaded .csv/.xls/.xlsx/.txt file under UPLOAD_DIR.

    Only the final component of the client-supplied name is kept (directory
    parts are discarded). The file is stored as ``<uuid4><ext>`` so uploads
    never collide or overwrite each other.

    Returns:
        ``{"file_id": <server path of the stored file>, "filename": <sanitized
        original name>, "size": <bytes written>}``. ``file_id`` is the value
        ``POST /api/jobs`` checks for on disk.

    Raises:
        HTTPException(400): the name is missing or its extension is not allowed.
    """
    # filename may be None when the client sends no name. Treat it as "" so it
    # is rejected as an unsupported type (400) instead of crashing in
    # Path(None) with a TypeError (500).
    filename = pathlib.Path(file.filename or "").name
    ext = pathlib.Path(filename).suffix.lower()
    if ext not in {".csv", ".xls", ".xlsx", ".txt"}:
        raise HTTPException(status_code=400, detail="Unsupported file type")
    file_id = f"{uuid.uuid4()}{ext}"
    path = os.path.join(UPLOAD_DIR, file_id)
    chunk_size = 1024 * 1024
    size = 0
    completed = False
    try:
        with open(path, "wb") as f:
            # Copy in bounded chunks instead of holding the whole upload in memory.
            while True:
                chunk = await file.read(chunk_size)
                if not chunk:
                    break
                f.write(chunk)
                size += len(chunk)
        completed = True
    finally:
        # On an I/O error or a cancelled request, delete the partial file so a
        # truncated upload can never be submitted as a job.
        if not completed and os.path.exists(path):
            os.remove(path)
    return {"file_id": path, "filename": filename, "size": size}
# Strong references to in-flight background jobs. The event loop keeps only
# weak references to tasks, so an unreferenced task may be garbage-collected
# before it finishes. Each task removes itself from this set when done.
_BACKGROUND_TASKS: set = set()


def _resolve_upload_path(file_id: str) -> str:
    """Map a client-supplied file_id to an existing regular file inside UPLOAD_DIR.

    Accepts either the full server path returned by ``/api/files/upload`` or a
    bare stored file name. Anything that resolves outside UPLOAD_DIR (``..``
    segments, absolute paths elsewhere, symlinks pointing out) is rejected.
    Without this check, jobs could be pointed at arbitrary server files.

    Raises:
        HTTPException(400): file_id does not resolve to a file under UPLOAD_DIR.
    """
    upload_root = os.path.realpath(UPLOAD_DIR)
    candidate = ""
    try:
        # os.path.join keeps an absolute file_id unchanged, so the full-path ids
        # produced by the upload endpoint keep working.
        candidate = os.path.realpath(os.path.join(upload_root, file_id))
        inside = (
            candidate != upload_root
            and os.path.commonpath([upload_root, candidate]) == upload_root
        )
    except (ValueError, OSError):
        # e.g. an embedded NUL byte in file_id
        inside = False
    if not inside or not os.path.isfile(candidate):
        raise HTTPException(status_code=400, detail="file_id not found on server")
    return candidate


async def _run_job(job_id: str, path: str, sheet, model: str) -> None:
    """Run process_file for one job in the default executor and record the outcome in JOBS."""
    job = JOBS[job_id]
    job["status"] = "running"
    loop = asyncio.get_running_loop()
    try:
        # process_file is a plain synchronous call; running it in the executor
        # keeps the event loop free to serve other requests meanwhile.
        result = await loop.run_in_executor(None, process_file, path, sheet, model)
    except Exception as e:
        job["status"] = "failed"
        job["error"] = str(e)
        job["finished_at"] = int(time.time())
    else:
        job["result"] = result
        job["status"] = "finished"
        job["finished_at"] = int(time.time())


@app.post("/api/jobs")
async def create_job(file_id: str = Form(...), sheet: str = Form(None), model: str = Form("gemini-2.5-flash-lite"), wait: bool = Form(False)):
    """Create a pipeline job for a previously uploaded file.

    Args:
        file_id: ``file_id`` returned by the upload endpoint (a path under
            UPLOAD_DIR) or a bare stored file name.
        sheet: optional sheet name, passed through to ``process_file``.
        model: model name, passed through to ``process_file``.
        wait: if true, run the job before responding and return its result.
            Otherwise schedule it in the background and return immediately.

    Returns:
        ``{"job_id", "status": "finished", "result"}`` when ``wait`` is set,
        otherwise ``{"job_id", "status": "queued"}``. Poll
        ``GET /api/jobs/{job_id}`` for background jobs.

    Raises:
        HTTPException(400): file_id does not refer to an uploaded file.
        HTTPException(500): ``wait`` was set and processing raised. The job is
            recorded as "failed" with its error message.
    """
    path = _resolve_upload_path(file_id)
    job_id = str(uuid.uuid4())
    JOBS[job_id] = {"status": "queued", "file_id": file_id, "result": None, "created_at": int(time.time())}
    if wait:
        await _run_job(job_id, path, sheet, model)
        job = JOBS[job_id]
        if job["status"] == "failed":
            raise HTTPException(status_code=500, detail=f"job {job_id} failed: {job['error']}")
        return {"job_id": job_id, "status": "finished", "result": job["result"]}
    task = asyncio.create_task(_run_job(job_id, path, sheet, model))
    _BACKGROUND_TASKS.add(task)
    task.add_done_callback(_BACKGROUND_TASKS.discard)
    return {"job_id": job_id, "status": "queued"}
@app.get("/api/jobs/{job_id}")
async def get_job(job_id: str):
    """Return the current record for one job as JSON.

    The record always carries ``status``, ``file_id``, ``result`` and
    ``created_at``. It also carries ``finished_at`` / ``error`` once those are set.
    ``result`` stays None until the job has finished.

    Raises:
        HTTPException(404): no job with this id exists.
    """
    record = JOBS.get(job_id) or None
    if record is None:
        raise HTTPException(status_code=404, detail="job_id not found")
    response = JSONResponse(content=record)
    return response
@app.get("/api/jobs/{job_id}/charts")
async def get_job_charts(job_id: str):
    """Return a job's status together with the ``charts`` entry of its result.

    ``charts`` is None while the job has no (truthy) result yet, e.g. while it
    is queued or running, or after it failed.

    Raises:
        HTTPException(404): no job with this id exists.
    """
    record = JOBS.get(job_id)
    if not record:
        raise HTTPException(status_code=404, detail="job_id not found")
    outcome = record.get("result")
    charts = outcome.get("charts") if outcome else None
    return {"status": record["status"], "charts": charts}
@app.get("/api/jobs")
async def list_jobs():
    """List every known job in insertion order.

    Each entry is a summary only: id, status, file_id and creation time. Results
    are omitted; fetch a single job for those.
    """
    summaries = [
        {
            "job_id": job_id,
            "status": record.get("status"),
            "file_id": record.get("file_id"),
            "created_at": record.get("created_at"),
        }
        for job_id, record in JOBS.items()
    ]
    return {"jobs": summaries}
|