# Gemini-powered Excel insight streamer (FastAPI app).
from fastapi import FastAPI, Request, File, UploadFile, Form
from fastapi.responses import StreamingResponse, JSONResponse, HTMLResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
import pandas as pd
from google import genai
from google.genai import types
import os
import json
import asyncio
# -------------------------------
# Configuration
# -------------------------------
# SECURITY: the previous code shipped a real API key as an in-source fallback
# default. That key is public (revoke it) — the env var is now required.
API_KEY = os.getenv("GEMINI_API_KEY")
if not API_KEY:
    raise RuntimeError("GEMINI_API_KEY environment variable is not set")
MODEL = "gemini-2.5-flash-lite"
client = genai.Client(api_key=API_KEY)
# -------------------------------
# FastAPI Setup
# -------------------------------
app = FastAPI()
# Serve static assets and Jinja2 HTML templates for the browser UI.
app.mount("/static", StaticFiles(directory="static"), name="static")
templates = Jinja2Templates(directory="templates")
# -------------------------------
# Helper Functions
# -------------------------------
def get_metadata(df: pd.DataFrame) -> dict:
    """Summarize a DataFrame: schema, shape, null/unique counts, sample rows.

    Every value is converted to native Python types so the result can be
    passed to ``json.dumps`` (the previous version leaked numpy scalars —
    e.g. ``int64`` from ``isnull().sum()`` — which made the metadata event
    in ``stream_insights`` raise ``TypeError`` at serialization time).

    Args:
        df: The uploaded dataset.

    Returns:
        A JSON-serializable dict describing the dataset's structure.
    """
    return {
        "columns": list(df.columns),
        # .astype(str) is the idiomatic form of .apply(lambda x: str(x))
        "dtypes": df.dtypes.astype(str).to_dict(),
        "num_rows": int(df.shape[0]),
        "num_cols": int(df.shape[1]),
        # Coerce numpy int64 counts to plain int for JSON serializability.
        "null_counts": {col: int(n) for col, n in df.isnull().sum().items()},
        "unique_counts": {col: int(n) for col, n in df.nunique().items()},
        # Round-trip through to_json so cell values (numpy scalars, NaN,
        # timestamps) come back as JSON-native Python objects.
        "sample_rows": json.loads(df.head(3).to_json(orient="records")),
    }
async def stream_insights(user_query, metadata):
    """Async generator yielding newline-delimited JSON status events.

    Emits progress events (started, metadata), asks Gemini for a structured
    analysis of the dataset, then streams each section of the parsed model
    response as its own event. Every yielded item is one JSON object + "\n".

    Args:
        user_query: Free-text analysis request from the user.
        metadata: JSON-serializable dataset summary (see ``get_metadata``).
    """
    # Step 1: Start message
    yield json.dumps({"status": "started", "message": "File received. Extracting metadata..."}) + "\n"
    await asyncio.sleep(0.5)
    # Step 2: Metadata
    yield json.dumps({"status": "metadata", "metadata": metadata}) + "\n"
    await asyncio.sleep(0.5)
    # Step 3: Call Gemini for structured insights
    system_prompt = """
You are a data analysis assistant.
Always return JSON with this schema:
{
"excel_info": {...},
"data_type_context": "...",
"auto_insights": {
"insights": [
{... Efficiency Analysis ...},
{... Cumulative Performance ...},
{... Process Issues ...},
{... Planning vs Projection ...},
{... Loss Analysis ...}
]
},
"query_insights": {...}
}
"""
    user_prompt = f"Dataset metadata: {metadata}\nUser request: {user_query}"
    contents = [types.Content(role="user", parts=[types.Part.from_text(text=user_prompt)])]
    config = types.GenerateContentConfig(
        temperature=0,
        max_output_tokens=2000,
        system_instruction=[types.Part.from_text(text=system_prompt)],
    )

    def _collect() -> str:
        # Blocking SDK call: drain the sync stream into one string.
        chunks = []
        for chunk in client.models.generate_content_stream(model=MODEL, contents=contents, config=config):
            if chunk.text:
                chunks.append(chunk.text)
        return "".join(chunks)

    # Run the synchronous streaming call off the event loop — iterating it
    # inline would block every other request for the duration of generation.
    result = await asyncio.to_thread(_collect)

    # Models frequently wrap JSON in ``` fences despite instructions;
    # strip them before parsing so valid payloads aren't rejected.
    raw = result.strip()
    if raw.startswith("```"):
        raw = raw.split("\n", 1)[1] if "\n" in raw else ""
        raw = raw.rstrip()
        if raw.endswith("```"):
            raw = raw[:-3]
    try:
        parsed = json.loads(raw)
    except Exception:
        # Surface the unparseable model output instead of crashing the stream.
        yield json.dumps({"status": "error", "raw_output": result}) + "\n"
        return
    # Step 4: Excel info
    yield json.dumps({"status": "excel_info", "excel_info": parsed.get("excel_info", {})}) + "\n"
    await asyncio.sleep(0.5)
    # Step 5: Data type context
    yield json.dumps({"status": "context", "data_type_context": parsed.get("data_type_context", "")}) + "\n"
    await asyncio.sleep(0.5)
    # Step 6: Stream each insight one by one
    for insight in parsed.get("auto_insights", {}).get("insights", []):
        yield json.dumps({"status": "insight", "insight": insight}) + "\n"
        await asyncio.sleep(0.5)
    # Step 7: Query insights
    yield json.dumps({"status": "query", "query_insights": parsed.get("query_insights", {})}) + "\n"
    # Step 8: Completed
    yield json.dumps({"status": "completed", "message": "All insights generated"}) + "\n"
# -------------------------------
# Routes
# -------------------------------
@app.get("/", response_class=HTMLResponse)
async def home(request: Request):
    """Render the upload/landing page.

    NOTE(review): the original had no route decorator, so this handler was
    never registered with the app; "/" is the assumed path — confirm against
    the frontend.
    """
    return templates.TemplateResponse("index.html", {"request": request})
@app.post("/stream-insights")
async def stream_insight_file(file: UploadFile = File(...), query: str = Form("Analyze the dataset")):
    """Accept an Excel upload and stream structured JSON insights step by step.

    NOTE(review): the original had no route decorator, so this endpoint was
    never registered; "/stream-insights" is an assumed path — confirm against
    the frontend's fetch URL.
    """
    try:
        df = pd.read_excel(file.file)
    except Exception as e:
        # 400: the client sent a file pandas could not parse as Excel
        # (original returned an implicit 200 on failure).
        return JSONResponse({"success": False, "error": f"Failed to read file: {str(e)}"}, status_code=400)
    metadata = get_metadata(df)
    # The body is newline-delimited JSON (one object per line), so advertise
    # NDJSON rather than application/json, which implies a single document.
    return StreamingResponse(stream_insights(query, metadata), media_type="application/x-ndjson")