File size: 3,397 Bytes
17e5cd7
 
 
 
 
 
 
 
 
 
27c947d
28dcf64
17e5cd7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
from fastapi import FastAPI, UploadFile, File, Form, HTTPException
from fastapi.responses import JSONResponse
import uuid, os, asyncio, time
from typing import Dict, Any
import pathlib

# import functions from pipeline file (unchanged)
from pipeline_with_agents import process_file

# Directory that uploaded files are written to; created at import time if absent.
UPLOAD_DIR = "/tmp/uploads"
pathlib.Path(UPLOAD_DIR).mkdir(parents=True, exist_ok=True)

app = FastAPI(title="AI Pipeline Service")

# Job records keyed by job_id. This is a plain module-level dict, so records are
# lost on restart and not shared across processes; use a database in production.
JOBS: Dict[str, Dict[str, Any]] = {}


@app.get("/health")
async def health():
    """Health-check endpoint: report ``"ok"`` along with the current Unix time (seconds)."""
    now_seconds = int(time.time())
    return {"status": "ok", "time": now_seconds}


@app.post("/api/files/upload")
async def upload_file(file: UploadFile = File(...)):
    """Save an uploaded file under ``UPLOAD_DIR`` with a random name.

    Only ``.csv``, ``.xls``, ``.xlsx`` and ``.txt`` extensions are accepted
    (compared case-insensitively). The client's filename is never used on disk:
    the stored name is a fresh UUID plus the original extension.

    Returns:
        ``{"file_id": <full server path>, "filename": <client basename>,
        "size": <bytes written>}``. ``file_id`` is what ``POST /api/jobs``
        expects.

    Raises:
        HTTPException(400): the filename is missing or the extension is not allowed.
    """
    allowed_exts = {".csv", ".xls", ".xlsx", ".txt"}
    chunk_size = 1024 * 1024

    # Keep only the basename; a missing filename falls through to the
    # extension check below and is rejected with 400 instead of crashing.
    filename = pathlib.Path(file.filename or "").name
    ext = pathlib.Path(filename).suffix.lower()
    if ext not in allowed_exts:
        raise HTTPException(status_code=400, detail="Unsupported file type")

    file_id = f"{uuid.uuid4()}{ext}"
    path = os.path.join(UPLOAD_DIR, file_id)
    size = 0
    try:
        with open(path, "wb") as out:
            # Copy in fixed-size chunks so a large upload is never held in memory whole.
            while True:
                chunk = await file.read(chunk_size)
                if not chunk:
                    break
                out.write(chunk)
                size += len(chunk)
    except BaseException:
        # Covers errors and request cancellation: don't leave a truncated
        # file behind that a later job could pick up.
        try:
            os.remove(path)
        except OSError:
            pass
        raise
    return {"file_id": path, "filename": filename, "size": size}


# Strong references to in-flight job tasks. The event loop keeps only weak
# references to tasks, so an otherwise unreferenced task may be garbage
# collected before it finishes.
_BACKGROUND_TASKS = set()


def _resolve_upload_path(file_id: str):
    """Map a client-supplied ``file_id`` to a path inside ``UPLOAD_DIR``.

    Accepts either the full path returned by ``POST /api/files/upload`` or a
    name relative to ``UPLOAD_DIR``. Returns the path to use, or ``None`` if it
    does not resolve (after following symlinks and ``..``) to an existing
    regular file inside ``UPLOAD_DIR``.
    """
    if not file_id:
        return None
    path = file_id if os.path.isabs(file_id) else os.path.join(UPLOAD_DIR, file_id)
    try:
        root = os.path.realpath(UPLOAD_DIR)
        real = os.path.realpath(path)
        # Reject anything outside the upload directory, e.g. "/etc/passwd"
        # or "../../etc/passwd".
        if os.path.commonpath([root, real]) != root or not os.path.isfile(real):
            return None
    except (OSError, ValueError):  # e.g. embedded NUL byte in the path
        return None
    return path


async def _run_job(job_id: str, path: str, sheet, model: str) -> None:
    """Run ``process_file`` off the event loop and record the outcome in ``JOBS[job_id]``.

    Exceptions from ``process_file`` are caught and stored as ``status="failed"``
    plus ``error``; this coroutine itself does not raise them.
    """
    job = JOBS[job_id]
    job["status"] = "running"
    loop = asyncio.get_running_loop()
    try:
        # process_file is synchronous; running it in the default executor keeps
        # the event loop free to serve other requests in the meantime.
        result = await loop.run_in_executor(None, process_file, path, sheet, model)
    except Exception as exc:
        job["status"] = "failed"
        job["error"] = str(exc)
        job["finished_at"] = int(time.time())
        return
    job["result"] = result
    job["status"] = "finished"
    job["finished_at"] = int(time.time())


@app.post("/api/jobs")
async def create_job(file_id: str = Form(...), sheet: str = Form(None), model: str = Form("gemini-2.5-flash-lite"), wait: bool = Form(False)):
    """Create a processing job for a previously uploaded file.

    Form fields:
        file_id: the ``file_id`` returned by ``POST /api/files/upload`` (full
            server path), or a file name relative to ``UPLOAD_DIR``.
        sheet: forwarded unchanged to ``process_file`` (presumably a
            spreadsheet sheet selector; ``None`` when omitted).
        model: model name forwarded to ``process_file``.
        wait: when true, wait for the job to finish and return its result;
            otherwise return immediately with status ``"queued"``.

    Raises:
        HTTPException(400): ``file_id`` is not a file inside ``UPLOAD_DIR``.
        HTTPException(500): ``wait`` was set and ``process_file`` raised; the
            detail carries the job id and the error message.
    """
    path = _resolve_upload_path(file_id)
    if path is None:
        raise HTTPException(status_code=400, detail="file_id not found on server")
    job_id = str(uuid.uuid4())
    JOBS[job_id] = {"status": "queued", "file_id": path, "result": None, "created_at": int(time.time())}

    task = asyncio.create_task(_run_job(job_id, path, sheet, model))
    _BACKGROUND_TASKS.add(task)
    task.add_done_callback(_BACKGROUND_TASKS.discard)

    if not wait:
        return {"job_id": job_id, "status": "queued"}

    # shield(): if the client disconnects, the job still runs to completion and
    # its outcome stays available via GET /api/jobs/{job_id}.
    await asyncio.shield(task)
    job = JOBS[job_id]
    if job["status"] == "failed":
        raise HTTPException(status_code=500, detail={"job_id": job_id, "error": job.get("error")})
    return {"job_id": job_id, "status": "finished", "result": job["result"]}


@app.get("/api/jobs/{job_id}")
async def get_job(job_id: str):
    """Return the full stored record for ``job_id``.

    The record is serialized immediately into a ``JSONResponse``. It includes
    ``result`` once the job has finished.

    Raises:
        HTTPException(404): no job with this id exists.
    """
    record = JOBS.get(job_id)
    if record:
        return JSONResponse(content=record)
    raise HTTPException(status_code=404, detail="job_id not found")


@app.get("/api/jobs/{job_id}/charts")
async def get_job_charts(job_id: str):
    """Return a job's status plus the ``"charts"`` entry of its result.

    ``charts`` is ``None`` while the job has no (truthy) result yet.

    Raises:
        HTTPException(404): no job with this id exists.
    """
    record = JOBS.get(job_id)
    if not record:
        raise HTTPException(status_code=404, detail="job_id not found")
    outcome = record.get("result")
    charts = outcome.get("charts") if outcome else None
    return {"status": record["status"], "charts": charts}


@app.get("/api/jobs")
async def list_jobs():
    """List every known job as a summary (id, status, file, creation time); results are omitted."""
    summaries = [
        {
            "job_id": job_key,
            "status": record.get("status"),
            "file_id": record.get("file_id"),
            "created_at": record.get("created_at"),
        }
        for job_key, record in JOBS.items()
    ]
    return {"jobs": summaries}