from fastapi import FastAPI, File, UploadFile, Form
from fastapi.middleware.cors import CORSMiddleware
# FIX: JSONResponse / StreamingResponse are not exported by the top-level
# `fastapi` package; importing them from there raises ImportError.
from fastapi.responses import JSONResponse, StreamingResponse
import pandas as pd
from google import genai
from google.genai import types
import os
import json
import asyncio
import numpy as np

# -------------------------------
# 🔑 Configuration
# -------------------------------
API_KEY = os.getenv("GEMINI_API_KEY", "YOUR_GEMINI_KEY")
MODEL = "gemini-2.5-flash-lite"
client = genai.Client(api_key=API_KEY)

# -------------------------------
# ⚡ FastAPI Setup
# -------------------------------
app = FastAPI()

# Enable CORS for all origins
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # or specify your frontend URLs
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# -------------------------------
# 🛠️ Helper Functions
# -------------------------------
def get_metadata(df: pd.DataFrame) -> dict:
    """Extract JSON-serializable metadata from a DataFrame.

    Returns a dict with column names, dtypes, row/column counts,
    per-column null and unique counts, and up to 3 sample rows with
    values coerced to plain JSON types.
    """

    def serialize_value(x):
        # Coerce pandas/numpy scalars to JSON-native types.
        # NOTE: pd.isna must run AFTER the numeric checks so that
        # np.nan-like numerics still go through float().
        if isinstance(x, pd.Timestamp):
            return x.isoformat()
        if isinstance(x, np.integer):  # covers int64/int32/etc.
            return int(x)
        if isinstance(x, np.floating):  # covers float64/float32/etc.
            return float(x)
        if pd.isna(x):
            return None
        return x

    return {
        "columns": list(df.columns),
        "dtypes": df.dtypes.apply(lambda x: str(x)).to_dict(),
        "num_rows": df.shape[0],
        "num_cols": df.shape[1],
        # int() casts: numpy int64 is not JSON-serializable by default.
        "null_counts": {k: int(v) for k, v in df.isnull().sum().to_dict().items()},
        "unique_counts": {k: int(v) for k, v in df.nunique().to_dict().items()},
        "sample_rows": [
            {col: serialize_value(val) for col, val in row.items()}
            for row in df.head(3).to_dict(orient="records")
        ],
    }


async def stream_insights(user_query, metadata):
    """Async generator yielding newline-delimited JSON status events.

    Event sequence: started -> metadata -> (excel_info -> context ->
    insight* -> query -> completed) on success, or a single `error`
    event carrying the raw model output if it is not valid JSON.
    """
    # Step 1: Start
    yield json.dumps({"status": "started", "message": "File received. Extracting metadata..."}) + "\n"
    await asyncio.sleep(0.2)

    # Step 2: Metadata
    yield json.dumps({"status": "metadata", "metadata": metadata}) + "\n"
    await asyncio.sleep(0.2)

    # Step 3: Call Gemini for structured insights
    system_prompt = """
You are a data analysis assistant.
Always return JSON with this schema:
{
  "excel_info": {...},
  "data_type_context": "...",
  "auto_insights": {
    "insights": [
      {... Efficiency Analysis ...},
      {... Cumulative Performance ...},
      {... Process Issues ...},
      {... Planning vs Projection ...},
      {... Loss Analysis ...}
    ]
  },
  "query_insights": {...}
}
"""
    user_prompt = f"Dataset metadata: {metadata}\nUser request: {user_query}"
    contents = [types.Content(role="user", parts=[types.Part.from_text(text=user_prompt)])]
    config = types.GenerateContentConfig(
        temperature=0,
        max_output_tokens=2000,
        system_instruction=[types.Part.from_text(text=system_prompt)],
    )

    # NOTE(review): this is the synchronous client; the loop below blocks
    # the event loop while the model streams. Consider client.aio for a
    # fully async call — TODO confirm acceptable latency for this service.
    result = ""
    for chunk in client.models.generate_content_stream(model=MODEL, contents=contents, config=config):
        if chunk.text:
            result += chunk.text

    try:
        parsed = json.loads(result)
    except Exception:
        # Model did not return valid JSON — surface the raw text and stop.
        yield json.dumps({"status": "error", "raw_output": result}) + "\n"
        return

    # Step 4: Excel info
    yield json.dumps({"status": "excel_info", "excel_info": parsed.get("excel_info", {})}) + "\n"
    await asyncio.sleep(0.2)

    # Step 5: Data type context
    yield json.dumps({"status": "context", "data_type_context": parsed.get("data_type_context", "")}) + "\n"
    await asyncio.sleep(0.2)

    # Step 6: Stream each insight
    for insight in parsed.get("auto_insights", {}).get("insights", []):
        yield json.dumps({"status": "insight", "insight": insight}) + "\n"
        await asyncio.sleep(0.2)

    # Step 7: Query insights
    yield json.dumps({"status": "query", "query_insights": parsed.get("query_insights", {})}) + "\n"

    # Step 8: Completed
    yield json.dumps({"status": "completed", "message": "All insights generated"}) + "\n"


# -------------------------------
# 🌐 API Routes
# -------------------------------
@app.post("/stream_insights")
async def stream_insight_file(file: UploadFile = File(...), query: str = Form("Analyze the dataset")):
    """Stream structured JSON insights step by step.

    Accepts an uploaded Excel file and an optional query; responds with
    a newline-delimited JSON event stream (see stream_insights).
    """
    try:
        df = pd.read_excel(file.file)
    except Exception as e:
        # FIX: a failed parse previously returned HTTP 200; 400 signals
        # a client-side (bad file) error.
        return JSONResponse({"success": False, "error": f"Failed to read file: {str(e)}"}, status_code=400)

    metadata = get_metadata(df)
    # FIX: the stream is newline-delimited JSON objects, not a single
    # JSON document — advertise the NDJSON media type.
    return StreamingResponse(stream_insights(query, metadata), media_type="application/x-ndjson")