# FastAPI service: upload an Excel file and stream Gemini-generated insights
# as newline-delimited JSON events.
import asyncio
import json
import os

import numpy as np
import pandas as pd
from fastapi import FastAPI, File, Form, UploadFile
from fastapi.middleware.cors import CORSMiddleware
# JSONResponse and StreamingResponse are not re-exported by the top-level
# fastapi package; importing them from `fastapi` raises ImportError and the
# app never starts. They live in fastapi.responses (starlette re-exports).
from fastapi.responses import JSONResponse, StreamingResponse
from google import genai
from google.genai import types
# -------------------------------
# Configuration
# -------------------------------
# NOTE(review): falls back to a placeholder literal when GEMINI_API_KEY is
# unset — the client is then built with an invalid key and every generate
# call fails at request time rather than at startup; confirm this is intended.
API_KEY = os.getenv("GEMINI_API_KEY", "YOUR_GEMINI_KEY")
# Model identifier passed to generate_content_stream below.
MODEL = "gemini-2.5-flash-lite"
# Module-level Gemini client shared by all requests.
client = genai.Client(api_key=API_KEY)
# -------------------------------
# FastAPI Setup
# -------------------------------
app = FastAPI()
# Enable CORS for all origins.
# NOTE(review): browsers reject the wildcard origin when combined with
# allow_credentials=True (the CORS spec forbids "*" for credentialed
# requests) — confirm whether credentials are actually needed here.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # or specify your frontend URLs
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
| # ------------------------------- | |
| # ๐ ๏ธ Helper Functions | |
| # ------------------------------- | |
def get_metadata(df: pd.DataFrame) -> dict:
    """Extract JSON-serializable metadata describing *df*.

    Returns a dict with the column names, dtype strings, shape, per-column
    null and unique counts, and up to three sample rows with every cell
    converted to a JSON-safe Python value.
    """

    def serialize_value(x):
        # Missing values FIRST: pd.isna catches None, float NaN (including
        # numpy floats) and NaT.  The previous ordering checked the numeric
        # branches before isna, so a numpy NaN was converted to float('nan'),
        # which json.dumps emits as non-standard `NaN`.
        # (Assumes scalar cells — pd.isna on a list/array cell raises, as the
        # original ordering also would have.)
        if pd.isna(x):
            return None
        if isinstance(x, pd.Timestamp):
            return x.isoformat()
        # np.integer / np.floating cover all width-specific subclasses
        # (int32/int64, float32/float64, ...).
        if isinstance(x, np.integer):
            return int(x)
        if isinstance(x, np.floating):
            return float(x)
        return x

    return {
        "columns": list(df.columns),
        "dtypes": {col: str(dtype) for col, dtype in df.dtypes.items()},
        "num_rows": df.shape[0],
        "num_cols": df.shape[1],
        "null_counts": {k: int(v) for k, v in df.isnull().sum().items()},
        "unique_counts": {k: int(v) for k, v in df.nunique().items()},
        "sample_rows": [
            {col: serialize_value(val) for col, val in row.items()}
            for row in df.head(3).to_dict(orient="records")
        ],
    }
async def stream_insights(user_query, metadata):
    """Async generator yielding newline-delimited JSON progress events.

    Event order: started -> metadata -> excel_info -> context -> one event
    per insight -> query -> completed.  If the model output cannot be parsed
    as JSON, a single "error" event carrying the raw text is emitted and the
    stream ends.
    """
    # Step 1: Start
    yield json.dumps({"status": "started", "message": "File received. Extracting metadata..."}) + "\n"
    await asyncio.sleep(0.2)
    # Step 2: Metadata
    yield json.dumps({"status": "metadata", "metadata": metadata}) + "\n"
    await asyncio.sleep(0.2)
    # Step 3: Call Gemini for structured insights
    system_prompt = """
    You are a data analysis assistant.
    Always return JSON with this schema:
    {
      "excel_info": {...},
      "data_type_context": "...",
      "auto_insights": {
        "insights": [
          {... Efficiency Analysis ...},
          {... Cumulative Performance ...},
          {... Process Issues ...},
          {... Planning vs Projection ...},
          {... Loss Analysis ...}
        ]
      },
      "query_insights": {...}
    }
    """
    user_prompt = f"Dataset metadata: {metadata}\nUser request: {user_query}"
    contents = [types.Content(role="user", parts=[types.Part.from_text(text=user_prompt)])]
    config = types.GenerateContentConfig(
        temperature=0,
        max_output_tokens=2000,
        # Ask the model for raw JSON so json.loads below does not have to
        # fight markdown code fences.
        response_mime_type="application/json",
        system_instruction=[types.Part.from_text(text=system_prompt)],
    )

    def _collect_model_output() -> str:
        # generate_content_stream is a *blocking* synchronous iterator; it is
        # run on a worker thread (asyncio.to_thread below) so it does not
        # stall the event loop while this async generator is streaming.
        parts = []
        for chunk in client.models.generate_content_stream(model=MODEL, contents=contents, config=config):
            if chunk.text:
                parts.append(chunk.text)
        return "".join(parts)

    result = await asyncio.to_thread(_collect_model_output)
    # Defensive: strip a ```json ... ``` fence if the model added one anyway.
    cleaned = result.strip()
    if cleaned.startswith("```"):
        cleaned = cleaned.removeprefix("```").removesuffix("```")
        cleaned = cleaned.removeprefix("json").strip()
    try:
        parsed = json.loads(cleaned)
    except ValueError:  # json.JSONDecodeError is a ValueError subclass
        yield json.dumps({"status": "error", "raw_output": result}) + "\n"
        return
    # Step 4: Excel info
    yield json.dumps({"status": "excel_info", "excel_info": parsed.get("excel_info", {})}) + "\n"
    await asyncio.sleep(0.2)
    # Step 5: Data type context
    yield json.dumps({"status": "context", "data_type_context": parsed.get("data_type_context", "")}) + "\n"
    await asyncio.sleep(0.2)
    # Step 6: Stream each insight
    for insight in parsed.get("auto_insights", {}).get("insights", []):
        yield json.dumps({"status": "insight", "insight": insight}) + "\n"
        await asyncio.sleep(0.2)
    # Step 7: Query insights
    yield json.dumps({"status": "query", "query_insights": parsed.get("query_insights", {})}) + "\n"
    # Step 8: Completed
    yield json.dumps({"status": "completed", "message": "All insights generated"}) + "\n"
# -------------------------------
# API Routes
# -------------------------------
# The original function carried no route decorator, so it was never
# registered with the app and the endpoint was unreachable.
@app.post("/analyze")
async def stream_insight_file(file: UploadFile = File(...), query: str = Form("Analyze the dataset")):
    """Stream structured JSON insights step by step.

    Accepts an uploaded Excel file plus an optional natural-language query
    and streams the newline-delimited JSON events produced by
    stream_insights.  Returns a 400 JSON error if the upload cannot be
    parsed as an Excel workbook.
    """
    try:
        df = pd.read_excel(file.file)
    except Exception as e:
        # 400 (was implicit 200): the client sent something pandas could not
        # read as Excel — the status code must say so.
        return JSONResponse({"success": False, "error": f"Failed to read file: {str(e)}"}, status_code=400)
    metadata = get_metadata(df)
    # The body is newline-delimited JSON (one document per line), not a
    # single JSON document, so advertise it as NDJSON.
    return StreamingResponse(stream_insights(query, metadata), media_type="application/x-ndjson")