from fastapi import FastAPI, Request, File, UploadFile, Form
from fastapi.responses import StreamingResponse, JSONResponse, HTMLResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
import pandas as pd
from google import genai
from google.genai import types
import os
import json
import asyncio

# -------------------------------
# 🔑 Configuration
# -------------------------------
# SECURITY: never hard-code API keys in source. A previous revision shipped a
# literal key as the getenv() fallback -- that key is compromised and must be
# revoked. The key now comes exclusively from the environment.
API_KEY = os.getenv("GEMINI_API_KEY")
MODEL = "gemini-2.5-flash-lite"

client = genai.Client(api_key=API_KEY)

# -------------------------------
# ⚡ FastAPI Setup
# -------------------------------
app = FastAPI()
app.mount("/static", StaticFiles(directory="static"), name="static")
templates = Jinja2Templates(directory="templates")


# -------------------------------
# 🛠️ Helper Functions
# -------------------------------
def get_metadata(df: pd.DataFrame) -> dict:
    """Summarize a DataFrame into a JSON-serializable metadata dict.

    Counts coming out of pandas are numpy scalars (e.g. ``numpy.int64`` from
    ``.sum()`` / ``.nunique()``), which ``json.dumps`` rejects with a
    ``TypeError`` -- convert them to plain ``int`` here so the streaming
    endpoint can serialize the metadata without a custom encoder.
    """
    return {
        "columns": list(df.columns),
        "dtypes": {col: str(dtype) for col, dtype in df.dtypes.items()},
        "num_rows": int(df.shape[0]),
        "num_cols": int(df.shape[1]),
        "null_counts": {col: int(n) for col, n in df.isnull().sum().items()},
        "unique_counts": {col: int(n) for col, n in df.nunique().items()},
        # NOTE(review): sample rows may still contain non-JSON-native cell
        # values (e.g. pandas Timestamps) depending on the sheet -- confirm
        # against real uploads; only 3 rows are exposed.
        "sample_rows": df.head(3).to_dict(orient="records"),
    }


def _generate_insights_text(user_query: str, metadata: dict) -> str:
    """Blocking Gemini call; returns the accumulated raw model text.

    Kept synchronous on purpose: it is executed via ``asyncio.to_thread`` so
    the long-running model call never blocks the event loop.
    """
    system_prompt = """
You are a data analysis assistant.
Always return JSON with this schema:
{
  "excel_info": {...},
  "data_type_context": "...",
  "auto_insights": {
    "insights": [
      {... Efficiency Analysis ...},
      {... Cumulative Performance ...},
      {... Process Issues ...},
      {... Planning vs Projection ...},
      {... Loss Analysis ...}
    ]
  },
  "query_insights": {...}
}
"""
    user_prompt = f"Dataset metadata: {metadata}\nUser request: {user_query}"
    contents = [types.Content(role="user", parts=[types.Part.from_text(text=user_prompt)])]
    config = types.GenerateContentConfig(
        temperature=0,
        max_output_tokens=2000,
        # Constrain the model to raw JSON so json.loads() below does not
        # choke on markdown code fences around the payload.
        response_mime_type="application/json",
        system_instruction=[types.Part.from_text(text=system_prompt)],
    )
    # Accumulate chunks with join() instead of quadratic string +=.
    pieces = []
    for chunk in client.models.generate_content_stream(model=MODEL, contents=contents, config=config):
        if chunk.text:
            pieces.append(chunk.text)
    return "".join(pieces)


async def stream_insights(user_query: str, metadata: dict):
    """Async generator yielding NDJSON lines, one analysis step at a time.

    Each yielded line is a standalone JSON object with a ``status`` field:
    started -> metadata -> excel_info -> context -> insight (xN) -> query ->
    completed, or a single ``error`` line when the model output is not JSON.
    The short sleeps pace the stream so the client renders steps visibly.
    """
    # Step 1: Start message
    yield json.dumps({"status": "started", "message": "File received. Extracting metadata..."}) + "\n"
    await asyncio.sleep(0.5)

    # Step 2: Metadata
    yield json.dumps({"status": "metadata", "metadata": metadata}) + "\n"
    await asyncio.sleep(0.5)

    # Step 3: Call Gemini off the event loop -- the SDK streaming call is
    # synchronous and would otherwise stall every other request.
    result = await asyncio.to_thread(_generate_insights_text, user_query, metadata)

    try:
        parsed = json.loads(result)
    except Exception:
        # Surface the raw model output for debugging rather than crashing.
        yield json.dumps({"status": "error", "raw_output": result}) + "\n"
        return

    # Step 4: Excel info
    yield json.dumps({"status": "excel_info", "excel_info": parsed.get("excel_info", {})}) + "\n"
    await asyncio.sleep(0.5)

    # Step 5: Data type context
    yield json.dumps({"status": "context", "data_type_context": parsed.get("data_type_context", "")}) + "\n"
    await asyncio.sleep(0.5)

    # Step 6: Stream each insight one by one
    for insight in parsed.get("auto_insights", {}).get("insights", []):
        yield json.dumps({"status": "insight", "insight": insight}) + "\n"
        await asyncio.sleep(0.5)

    # Step 7: Query insights
    yield json.dumps({"status": "query", "query_insights": parsed.get("query_insights", {})}) + "\n"

    # Step 8: Completed
    yield json.dumps({"status": "completed", "message": "All insights generated"}) + "\n"


# -------------------------------
# 🌐 Routes
# -------------------------------
@app.get("/", response_class=HTMLResponse)
async def home(request: Request):
    """Render the upload/analysis page."""
    return templates.TemplateResponse("index.html", {"request": request})


@app.post("/stream_insights")
async def stream_insight_file(file: UploadFile = File(...), query: str = Form("Analyze the dataset")):
    """Accept an Excel upload and stream structured JSON insights step by step.

    Returns HTTP 400 (previously an implicit 200) with a JSON error body when
    the upload cannot be parsed as a spreadsheet.
    """
    try:
        df = pd.read_excel(file.file)
    except Exception as e:
        return JSONResponse(
            {"success": False, "error": f"Failed to read file: {str(e)}"},
            status_code=400,
        )
    metadata = get_metadata(df)
    # Newline-delimited JSON stream: advertise the NDJSON media type so
    # clients know to parse line-by-line rather than as one JSON document.
    return StreamingResponse(stream_insights(query, metadata), media_type="application/x-ndjson")