triflix committed on
Commit
17e5cd7
·
verified ·
1 Parent(s): 75c3b7d

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +94 -131
app.py CHANGED
@@ -1,133 +1,96 @@
1
- import os
2
- import uuid
3
- import json
4
- from fastapi import FastAPI, File, UploadFile, Form
5
- from fastapi.responses import HTMLResponse, JSONResponse
6
- from fastapi.staticfiles import StaticFiles
7
- from fastapi.templating import Jinja2Templates
8
- from fastapi.requests import Request
9
- import pandas as pd
10
- from google import genai
11
- from google.genai import types
12
-
13
# -----------------------------
# FastAPI setup
# -----------------------------
app = FastAPI()
app.mount("/static", StaticFiles(directory="static"), name="static")
templates = Jinja2Templates(directory="templates")

# -----------------------------
# Gemini client setup
# -----------------------------
# SECURITY: the API key used to be hard-coded on this line and was committed
# to version control. Treat that key as compromised and revoke it; the key is
# now supplied through the environment so it never lives in the source tree.
_gemini_api_key = os.environ.get("GEMINI_API_KEY")
if not _gemini_api_key:
    raise RuntimeError("GEMINI_API_KEY environment variable is not set")
client = genai.Client(api_key=_gemini_api_key)

# Uploaded workbooks are written here (relative to the working directory).
UPLOAD_DIR = "tmp/uploads"
os.makedirs(UPLOAD_DIR, exist_ok=True)
27
 
28
- # -----------------------------
29
- # Helper functions
30
- # -----------------------------
31
def get_metadata(df):
    """Summarise a DataFrame as a plain, JSON-serialisable dictionary.

    Args:
        df: The pandas DataFrame to describe.

    Returns:
        A dict with these keys:
          * "columns": list of column labels.
          * "dtypes": column label -> dtype name as a string.
          * "null_counts": column label -> number of null cells.
          * "unique_counts": column label -> number of distinct non-null values.
          * "sample_rows": the first three rows as a list of records.
    """

    def _json_safe(value):
        # NaN/NaT are not valid JSON and date-like objects are not
        # serialisable by the json module, so normalise both up front.
        if pd.api.types.is_scalar(value) and pd.isna(value):
            return None
        if hasattr(value, "isoformat"):
            return value.isoformat()
        return value

    sample_rows = [
        {key: _json_safe(value) for key, value in row.items()}
        for row in df.head(3).to_dict(orient="records")
    ]
    return {
        "columns": list(df.columns),
        "dtypes": df.dtypes.apply(str).to_dict(),
        # int() keeps the counts as built-in ints regardless of pandas version.
        "null_counts": {key: int(count) for key, count in df.isnull().sum().items()},
        "unique_counts": {key: int(count) for key, count in df.nunique().items()},
        "sample_rows": sample_rows,
    }
39
-
40
def generate_metadata_analysis(metadata):
    """Ask Gemini for a short summary and suggested analyses of a dataset.

    Args:
        metadata: Dataset description (as produced by ``get_metadata``); it is
            stringified with ``str()`` and embedded in the user prompt.

    Returns:
        The model's reply parsed from JSON. The system instruction asks for
        ``{"Summary": ..., "Suggestion": [...]}``, but the shape is whatever
        the model actually returned.

    Raises:
        json.JSONDecodeError: If the streamed reply is not valid JSON.
    """
    metadata_text = str(metadata)
    model = "gemini-2.5-flash-lite"

    contents = [
        types.Content(
            role="user",
            parts=[types.Part.from_text(
                text=f"Analyze the following structured data metadata:\n{metadata_text}"
            )],
        ),
    ]

    generate_content_config = types.GenerateContentConfig(
        thinking_config=types.ThinkingConfig(thinking_budget=0),
        response_mime_type="application/json",
        system_instruction=[types.Part.from_text(text="""You are a structured data analysis AI.
1️⃣ Summary: concise description of data, assumptions
2️⃣ Suggestions: up to 3 actionable analyses/visualizations
Output must be strict JSON: {"Summary": "<short summary>", "Suggestion": ["<analysis #1>", "<analysis #2>", "<analysis #3>"]}
""")],
    )

    stream = client.models.generate_content_stream(
        model=model,
        contents=contents,
        config=generate_content_config,
    )
    # A streamed chunk may carry no text (its ``text`` is then None), which
    # made the previous ``+=`` concatenation raise TypeError; skip those.
    output_text = "".join(chunk.text for chunk in stream if chunk.text)

    return json.loads(output_text)
72
-
73
def generate_visualization(command, file_path):
    """Ask Gemini for a visualization recipe for the workbook at *file_path*.

    Args:
        command: The user's natural-language request, sent as the user turn.
        file_path: Path to the workbook; it is embedded in the system prompt
            so the generated code knows what to pass to ``pd.read_excel``.

    Returns:
        The model's reply parsed from JSON. The prompt asks for the keys
        "type", "code" and "explanation".

    Raises:
        json.JSONDecodeError: If the streamed reply is not valid JSON.

    NOTE(review): the "code" value is model-generated Python. Nothing in this
    function runs it; any caller that does must sandbox it.
    """
    system_prompt_text = f"""
You are a Python assistant that MUST return output strictly in JSON format and NOTHING else.
The top-level JSON MUST contain exactly three keys in this order: "type", "code", "explanation".
Requirements:
- "type": visualization type ("bar", "pie", "line", etc.)
- "code": Python code as a string that prints numeric JSON to stdout. Use this for data access: df = pd.read_excel(r"{file_path}")
- "explanation": one-sentence description
"""
    MODEL = "gemini-2.5-flash-lite"
    contents = [types.Content(role="user", parts=[types.Part.from_text(text=command)])]

    generate_content_config = types.GenerateContentConfig(
        thinking_config=types.ThinkingConfig(thinking_budget=0),
        response_mime_type="application/json",
        system_instruction=[types.Part.from_text(text=system_prompt_text)],
    )

    stream = client.models.generate_content_stream(
        model=MODEL,
        contents=contents,
        config=generate_content_config,
    )
    # Chunks without a text part expose ``text`` as None; concatenating that
    # raised TypeError before, so only non-empty pieces are joined.
    output = "".join(chunk.text for chunk in stream if chunk.text)

    return json.loads(output)
100
-
101
- # -----------------------------
102
- # Routes
103
- # -----------------------------
104
@app.get("/", response_class=HTMLResponse)
def home(request: Request):
    """Render the ``index.html`` template for the landing page."""
    context = {"request": request}
    return templates.TemplateResponse("index.html", context)
107
-
108
@app.post("/upload", response_class=JSONResponse)
async def upload_excel(file: UploadFile = File(...)):
    """Store an uploaded workbook, describe it, and return an AI analysis.

    The upload is saved under ``UPLOAD_DIR`` with a random UUID name that
    keeps the original extension, then parsed with ``pd.read_excel``.

    Args:
        file: The uploaded workbook.

    Returns:
        JSON with "file_path" (where the upload was stored), "metadata"
        (from ``get_metadata``) and "analysis" (from
        ``generate_metadata_analysis``).
    """
    import asyncio  # local import: not part of this module's top-level imports

    # ``filename`` is optional on UploadFile; splitext(None) would raise.
    file_ext = os.path.splitext(file.filename or "")[1]
    file_id = str(uuid.uuid4())
    file_path = os.path.join(UPLOAD_DIR, f"{file_id}{file_ext}")

    with open(file_path, "wb") as f:
        f.write(await file.read())

    # Parsing the workbook and calling Gemini are blocking operations; run
    # them in the default thread pool so other requests keep being served.
    loop = asyncio.get_running_loop()
    df = await loop.run_in_executor(None, pd.read_excel, file_path)
    metadata = get_metadata(df)
    analysis = await loop.run_in_executor(None, generate_metadata_analysis, metadata)

    # Everything the client needs for follow-up requests is returned to it;
    # nothing is kept server-side apart from the saved file.
    session_data = {
        "file_path": file_path,
        "metadata": metadata,
        "analysis": analysis
    }

    return JSONResponse(session_data)
129
-
130
@app.post("/generate_plot", response_class=JSONResponse)
async def generate_plot(command: str = Form(...), file_path: str = Form(...)):
    """Return a model-generated visualization recipe for an uploaded workbook.

    Args:
        command: Natural-language description of the desired plot.
        file_path: Path of a previously uploaded file, as returned by the
            upload endpoint.

    Returns:
        The JSON produced by ``generate_visualization``, or a 400 JSON error
        when *file_path* is not an existing file inside ``UPLOAD_DIR``.
    """
    import asyncio  # local import: not part of this module's top-level imports

    # ``file_path`` comes straight from the client and ends up inside the
    # code-generation prompt, so only accept files that live in UPLOAD_DIR.
    upload_root = os.path.realpath(UPLOAD_DIR)
    resolved = os.path.realpath(file_path)
    if not resolved.startswith(upload_root + os.sep) or not os.path.isfile(resolved):
        return JSONResponse(
            status_code=400,
            content={"error": "file_path must refer to a previously uploaded file"},
        )

    # The Gemini call blocks; keep it off the event loop.
    loop = asyncio.get_running_loop()
    visualization_json = await loop.run_in_executor(
        None, generate_visualization, command, file_path
    )
    return JSONResponse(visualization_json)
 
1
+ from fastapi import FastAPI, UploadFile, File, Form, HTTPException
2
+ from fastapi.responses import JSONResponse
3
+ import uuid, os, asyncio, time
4
+ from typing import Dict, Any
5
+ import pathlib
6
+
7
+ # import functions from pipeline file (unchanged)
8
+ from pipeline_with_agents import process_file
9
+
10
# Directory that receives uploaded files; created at import time if missing.
UPLOAD_DIR = "/tmp/uploads"
pathlib.Path(UPLOAD_DIR).mkdir(parents=True, exist_ok=True)

# Application object the route decorators below attach to.
app = FastAPI(title="AI Pipeline Service")

# In-memory job store keyed by job_id (simple; contents are lost on restart).
# Replace with a database for production.
JOBS: Dict[str, Dict[str, Any]] = dict()
17
+
18
+
19
@app.get("/health")
async def health():
    """Liveness probe: report "ok" and the current Unix time in whole seconds."""
    now = int(time.time())
    return {"status": "ok", "time": now}
22
+
23
+
24
@app.post("/api/files/upload")
async def upload_file(file: UploadFile = File(...)):
    """Save an uploaded data file under ``UPLOAD_DIR``.

    Only ``.csv``, ``.xls``, ``.xlsx`` and ``.txt`` uploads are accepted. The
    file is stored under a random UUID name that keeps the lower-cased
    extension.

    Args:
        file: The uploaded file.

    Returns:
        A dict with "file_id" (the server-side path of the stored file),
        "filename" (base name of the client's file name) and "size" (bytes
        written).

    Raises:
        HTTPException: 400 when the extension is not supported (this includes
            uploads with no file name).
    """
    # ``filename`` is optional on UploadFile; Path(None) would raise TypeError,
    # so a missing name is treated as "" and rejected by the extension check.
    filename = pathlib.Path(file.filename or "").name
    ext = pathlib.Path(filename).suffix.lower()
    if ext not in {".csv", ".xls", ".xlsx", ".txt"}:
        raise HTTPException(status_code=400, detail="Unsupported file type")
    file_id = f"{uuid.uuid4()}{ext}"
    path = os.path.join(UPLOAD_DIR, file_id)

    # Copy in 1 MiB chunks instead of buffering the whole upload in memory.
    size = 0
    with open(path, "wb") as f:
        while True:
            chunk = await file.read(1024 * 1024)
            if not chunk:
                break
            f.write(chunk)
            size += len(chunk)
    return {"file_id": path, "filename": filename, "size": size}
36
+
37
+
38
# Strong references to in-flight background tasks. The event loop only keeps
# weak references to tasks, so a task nobody else references can be
# garbage-collected before it finishes.
_BACKGROUND_TASKS: set = set()


@app.post("/api/jobs")
async def create_job(file_id: str = Form(...), sheet: str = Form(None), model: str = Form("gemini-2.5-flash-lite"), wait: bool = Form(False)):
    """Create a processing job for a previously uploaded file.

    Args:
        file_id: Server-side path returned by the upload endpoint.
        sheet: Optional sheet selector forwarded to ``process_file``.
        model: Model name forwarded to ``process_file``.
        wait: When true, run the job before responding and include its
            result; otherwise schedule it in the background.

    Returns:
        ``{"job_id", "status": "finished", "result"}`` when *wait* is true,
        else ``{"job_id", "status": "queued"}``.

    Raises:
        HTTPException: 400 when *file_id* is not an existing file inside
            ``UPLOAD_DIR``.
        Exception: With *wait* true, whatever ``process_file`` raises is
            propagated after the job has been marked "failed".
    """
    # ``file_id`` is client-controlled: a bare existence check would let a
    # caller point the pipeline at any file on the server.
    upload_root = os.path.realpath(UPLOAD_DIR)
    resolved = os.path.realpath(file_id)
    if not resolved.startswith(upload_root + os.sep) or not os.path.isfile(resolved):
        raise HTTPException(status_code=400, detail="file_id not found on server")

    job_id = str(uuid.uuid4())
    job = {"status": "queued", "file_id": file_id, "result": None, "created_at": int(time.time())}
    JOBS[job_id] = job

    loop = asyncio.get_running_loop()

    async def run_and_store():
        job["status"] = "running"
        try:
            # process_file is blocking; run it in the default thread pool so
            # the event loop stays responsive in both modes.
            result = await loop.run_in_executor(None, process_file, file_id, sheet, model)
        except Exception as e:
            job["status"] = "failed"
            job["error"] = str(e)
            if wait:
                # Synchronous callers still get the error, but the job no
                # longer stays stuck in "queued".
                raise
            return None
        job["result"] = result
        job["status"] = "finished"
        job["finished_at"] = int(time.time())
        return result

    if wait:
        # synchronous run, return result
        result = await run_and_store()
        return {"job_id": job_id, "status": "finished", "result": result}

    # asynchronous run
    task = asyncio.create_task(run_and_store())
    _BACKGROUND_TASKS.add(task)
    task.add_done_callback(_BACKGROUND_TASKS.discard)
    return {"job_id": job_id, "status": "queued"}
69
+
70
+
71
@app.get("/api/jobs/{job_id}")
async def get_job(job_id: str):
    """Return the stored record for *job_id* as JSON.

    The record includes the result once the job has one.

    Raises:
        HTTPException: 404 when no record exists for *job_id*.
    """
    record = JOBS.get(job_id)
    if record:
        return JSONResponse(content=record)
    raise HTTPException(status_code=404, detail="job_id not found")
78
+
79
+
80
@app.get("/api/jobs/{job_id}/charts")
async def get_job_charts(job_id: str):
    """Return the job's status together with the "charts" entry of its result.

    "charts" is None while the job has no (truthy) result yet.

    Raises:
        HTTPException: 404 when no record exists for *job_id*.
    """
    record = JOBS.get(job_id)
    if not record:
        raise HTTPException(status_code=404, detail="job_id not found")
    outcome = record.get("result")
    # presumably the result is a dict exposing "charts" — verify against process_file
    charts = outcome.get("charts") if outcome else None
    return {"status": record["status"], "charts": charts}
89
+
90
+
91
@app.get("/api/jobs")
async def list_jobs():
    """List every known job with its id, status, file and creation time."""
    summaries = [
        {
            "job_id": jid,
            "status": info.get("status"),
            "file_id": info.get("file_id"),
            "created_at": info.get("created_at"),
        }
        for jid, info in JOBS.items()
    ]
    return {"jobs": summaries}