chatplotapi / app.py
triflix's picture
Update app.py
17e5cd7 verified
raw
history blame
3.4 kB
from fastapi import FastAPI, UploadFile, File, Form, HTTPException
from fastapi.responses import JSONResponse
import uuid, os, asyncio, time
from typing import Dict, Any
import pathlib
# import functions from pipeline file (unchanged)
from pipeline_with_agents import process_file
# Directory that receives uploaded files; created at import time if missing.
UPLOAD_DIR = "/tmp/uploads"
os.makedirs(UPLOAD_DIR, exist_ok=True)

# Application object; the route handlers in this module register on it.
app = FastAPI(title="AI Pipeline Service")

# In-memory job store (simple), keyed by job_id. Replace with DB for production.
# Each entry is a plain dict holding "status", "file_id", "result" and
# "created_at", plus "finished_at" / "error" once the job has ended.
JOBS: Dict[str, Dict[str, Any]] = {}
@app.get("/health")
async def health():
    """Liveness probe: report a fixed "ok" status and the current Unix time."""
    now = int(time.time())  # whole seconds since the epoch
    return {"status": "ok", "time": now}
@app.post("/api/files/upload")
async def upload_file(file: UploadFile = File(...)):
    """Save an uploaded file under UPLOAD_DIR and return a handle to it.

    Only ``.csv``, ``.xls``, ``.xlsx`` and ``.txt`` uploads are accepted
    (the extension check is case-insensitive). The file is stored under a
    freshly generated UUID name that keeps the original extension.

    Returns:
        A dict with ``file_id`` (the stored path), ``filename`` (the base
        name supplied by the client) and ``size`` (bytes written).

    Raises:
        HTTPException: 400 when the extension is missing or not allowed.
    """
    # The client may send no filename at all; treat that as an empty name so
    # the request is rejected with a 400 below instead of failing in pathlib.
    filename = pathlib.Path(file.filename or "").name
    ext = pathlib.Path(filename).suffix.lower()
    if ext not in (".csv", ".xls", ".xlsx", ".txt"):
        raise HTTPException(status_code=400, detail="Unsupported file type")

    file_id = f"{uuid.uuid4()}{ext}"
    path = os.path.join(UPLOAD_DIR, file_id)

    # Copy in fixed-size chunks so a large upload is never held in memory
    # in one piece.
    chunk_size = 1024 * 1024
    size = 0
    with open(path, "wb") as f:
        while True:
            chunk = await file.read(chunk_size)
            if not chunk:
                break
            f.write(chunk)
            size += len(chunk)

    return {"file_id": path, "filename": filename, "size": size}
# Strong references to in-flight background tasks. The event loop keeps only
# weak references to tasks, so a task nobody else references can be
# garbage-collected before it finishes.
_BACKGROUND_TASKS = set()


def _resolve_upload_path(file_id: str) -> str:
    """Map a client-supplied file_id to an existing file inside UPLOAD_DIR.

    Accepts either the full path handed out by the upload endpoint or a bare
    file name relative to UPLOAD_DIR. Anything that resolves outside
    UPLOAD_DIR (``..`` segments, absolute paths elsewhere, symlinks) is
    rejected, because file_id is untrusted form input.

    Raises:
        HTTPException: 400 when the path is outside UPLOAD_DIR or is not an
            existing regular file.
    """
    try:
        root = os.path.realpath(UPLOAD_DIR)
        # os.path.join keeps an absolute file_id unchanged, so both accepted
        # forms end up as one absolute candidate path.
        candidate = os.path.realpath(os.path.join(root, file_id))
        inside = os.path.commonpath([root, candidate]) == root
    except ValueError:
        # Raised for embedded NUL bytes or paths that cannot be compared.
        inside = False
    if not inside or not os.path.isfile(candidate):
        raise HTTPException(status_code=400, detail="file_id not found on server")
    return candidate


async def _execute_job(job_id: str, path: str, sheet, model: str):
    """Run process_file in the default executor and record the outcome in JOBS.

    Returns the pipeline result. On failure the job is marked "failed" with
    the error text and the exception is re-raised for the caller to handle.
    """
    job = JOBS[job_id]
    job["status"] = "running"
    loop = asyncio.get_running_loop()
    try:
        # process_file is a blocking call; running it on a worker thread
        # keeps the event loop free to serve other requests.
        result = await loop.run_in_executor(None, process_file, path, sheet, model)
    except Exception as exc:
        job["status"] = "failed"
        job["error"] = str(exc)
        job["finished_at"] = int(time.time())
        raise
    job["result"] = result
    job["status"] = "finished"
    job["finished_at"] = int(time.time())
    return result


async def _execute_job_in_background(job_id: str, path: str, sheet, model: str) -> None:
    """Fire-and-forget wrapper around _execute_job for queued jobs."""
    try:
        await _execute_job(job_id, path, sheet, model)
    except Exception:
        # Already recorded on the job entry; nothing awaits this task, so
        # re-raising would only produce a "never retrieved" warning.
        pass


@app.post("/api/jobs")
async def create_job(
    file_id: str = Form(...),
    sheet: str = Form(None),
    model: str = Form("gemini-2.5-flash-lite"),
    wait: bool = Form(False),
):
    """Create a processing job for a previously uploaded file.

    Args:
        file_id: Handle returned by the upload endpoint (a path inside
            UPLOAD_DIR, or a bare file name within it).
        sheet: Optional sheet selector, passed through to process_file.
        model: Model name, passed through to process_file.
        wait: When true, run the job before responding and include the
            result; otherwise schedule it and respond immediately.

    Returns:
        ``{"job_id", "status": "finished", "result"}`` when ``wait`` is true,
        else ``{"job_id", "status": "queued"}``.

    Raises:
        HTTPException: 400 when file_id does not name an uploaded file.
        Exception: with ``wait`` true, whatever process_file raised, after
            the job has been marked "failed".
    """
    path = _resolve_upload_path(file_id)

    job_id = str(uuid.uuid4())
    JOBS[job_id] = {"status": "queued", "file_id": file_id, "result": None, "created_at": int(time.time())}

    if wait:
        # Synchronous from the client's point of view, but still off-loop.
        result = await _execute_job(job_id, path, sheet, model)
        return {"job_id": job_id, "status": "finished", "result": result}

    # Asynchronous run: hold a reference until the task completes.
    task = asyncio.create_task(_execute_job_in_background(job_id, path, sheet, model))
    _BACKGROUND_TASKS.add(task)
    task.add_done_callback(_BACKGROUND_TASKS.discard)
    return {"job_id": job_id, "status": "queued"}
@app.get("/api/jobs/{job_id}")
async def get_job(job_id: str):
    """Return the stored record for one job, or 404 if the id is unknown.

    The response is a snapshot of the job entry as it is right now, so it
    carries the result only once the job has produced one.
    """
    record = JOBS.get(job_id)
    if record:
        return JSONResponse(content=record)
    raise HTTPException(status_code=404, detail="job_id not found")
@app.get("/api/jobs/{job_id}/charts")
async def get_job_charts(job_id: str):
    """Return the job's status together with the "charts" part of its result.

    ``charts`` is ``None`` while the job has no (or an empty) result.

    Raises:
        HTTPException: 404 when the job id is unknown.
    """
    entry = JOBS.get(job_id)
    if not entry:
        raise HTTPException(status_code=404, detail="job_id not found")

    status = entry["status"]
    outcome = entry.get("result")
    charts = outcome.get("charts") if outcome else None
    return {"status": status, "charts": charts}
@app.get("/api/jobs")
async def list_jobs():
    """List every known job as a short summary (results are not included)."""
    summary_fields = ("status", "file_id", "created_at")
    return {
        "jobs": [
            {"job_id": jid, **{name: info.get(name) for name in summary_fields}}
            for jid, info in JOBS.items()
        ]
    }