# FastAPI service: upload an Excel file and stream Gemini-generated dataset insights as NDJSON.
import asyncio
import json
import os

import numpy as np
import pandas as pd
from fastapi import FastAPI, File, Form, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, StreamingResponse
from google import genai
from google.genai import types
# -------------------------------
# Configuration
# -------------------------------
# Gemini API key is read from the environment; the literal fallback is a
# placeholder and will fail authentication if GEMINI_API_KEY is unset.
API_KEY = os.getenv("GEMINI_API_KEY", "YOUR_GEMINI_KEY")
MODEL = "gemini-2.5-flash-lite"
# Module-level client shared by all requests.
client = genai.Client(api_key=API_KEY)
# -------------------------------
# FastAPI Setup
# -------------------------------
app = FastAPI()
# Enable CORS for all origins
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # or specify your frontend URLs
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# -------------------------------
# Helper Functions
# -------------------------------
def get_metadata(df: pd.DataFrame) -> dict:
    """Extract JSON-serializable metadata describing *df*.

    Returns a dict with column names, dtypes, shape, per-column null and
    unique counts, and up to 3 sample rows whose values are coerced to
    JSON-safe Python types (str/int/float/bool/None).
    """

    def serialize_value(x):
        """Coerce a single cell value to a JSON-serializable equivalent."""
        if isinstance(x, pd.Timestamp):
            return x.isoformat()
        if isinstance(x, np.bool_):
            return bool(x)
        if isinstance(x, np.integer):
            return int(x)
        if isinstance(x, np.floating):
            # NaN must be mapped to None here: float('nan') is not valid JSON,
            # and a np.float64 NaN would otherwise slip through this branch.
            v = float(x)
            return None if np.isnan(v) else v
        try:
            if pd.isna(x):
                return None
        except (TypeError, ValueError):
            # pd.isna raises/returns an array for non-scalar cells (e.g. lists);
            # pass such values through unchanged.
            pass
        return x

    return {
        "columns": list(df.columns),
        "dtypes": df.dtypes.astype(str).to_dict(),
        "num_rows": int(df.shape[0]),
        "num_cols": int(df.shape[1]),
        "null_counts": {k: int(v) for k, v in df.isnull().sum().items()},
        "unique_counts": {k: int(v) for k, v in df.nunique().items()},
        "sample_rows": [
            {col: serialize_value(val) for col, val in row.items()}
            for row in df.head(3).to_dict(orient="records")
        ],
    }
async def stream_insights(user_query, metadata):
    """Async generator yielding newline-delimited JSON status events.

    Event sequence: started -> metadata -> excel_info -> context ->
    one "insight" event per auto insight -> query -> completed.
    If the model's output cannot be parsed as JSON, a single "error"
    event carrying the raw output is emitted and the stream ends.
    """
    # Step 1: Start
    yield json.dumps({"status": "started", "message": "File received. Extracting metadata..."}) + "\n"
    await asyncio.sleep(0.2)
    # Step 2: Metadata
    yield json.dumps({"status": "metadata", "metadata": metadata}) + "\n"
    await asyncio.sleep(0.2)
    # Step 3: Call Gemini for structured insights
    system_prompt = """
You are a data analysis assistant.
Always return JSON with this schema:
{
"excel_info": {...},
"data_type_context": "...",
"auto_insights": {
"insights": [
{... Efficiency Analysis ...},
{... Cumulative Performance ...},
{... Process Issues ...},
{... Planning vs Projection ...},
{... Loss Analysis ...}
]
},
"query_insights": {...}
}
"""
    user_prompt = f"Dataset metadata: {metadata}\nUser request: {user_query}"
    contents = [types.Content(role="user", parts=[types.Part.from_text(text=user_prompt)])]
    config = types.GenerateContentConfig(
        temperature=0,
        max_output_tokens=2000,
        system_instruction=[types.Part.from_text(text=system_prompt)],
    )
    # NOTE(review): generate_content_stream is a synchronous iterator, so this
    # loop blocks the event loop while streaming; consider asyncio.to_thread
    # for production use.
    result = ""
    for chunk in client.models.generate_content_stream(model=MODEL, contents=contents, config=config):
        if chunk.text:
            result += chunk.text
    # Models frequently wrap JSON output in markdown ``` fences; strip them
    # before parsing so a well-formed payload is not reported as an error.
    cleaned = result.strip()
    if cleaned.startswith("```"):
        cleaned = cleaned.strip("`").strip()
        if cleaned.startswith("json"):
            cleaned = cleaned[len("json"):].lstrip()
    try:
        # json.JSONDecodeError is a subclass of ValueError.
        parsed = json.loads(cleaned)
    except ValueError:
        yield json.dumps({"status": "error", "raw_output": result}) + "\n"
        return
    # Step 4: Excel info
    yield json.dumps({"status": "excel_info", "excel_info": parsed.get("excel_info", {})}) + "\n"
    await asyncio.sleep(0.2)
    # Step 5: Data type context
    yield json.dumps({"status": "context", "data_type_context": parsed.get("data_type_context", "")}) + "\n"
    await asyncio.sleep(0.2)
    # Step 6: Stream each insight
    for insight in parsed.get("auto_insights", {}).get("insights", []):
        yield json.dumps({"status": "insight", "insight": insight}) + "\n"
        await asyncio.sleep(0.2)
    # Step 7: Query insights
    yield json.dumps({"status": "query", "query_insights": parsed.get("query_insights", {})}) + "\n"
    # Step 8: Completed
    yield json.dumps({"status": "completed", "message": "All insights generated"}) + "\n"
# -------------------------------
# API Routes
# -------------------------------
@app.post("/stream_insights")
async def stream_insight_file(file: UploadFile = File(...), query: str = Form("Analyze the dataset")):
    """Accept an Excel upload and stream structured JSON insights line by line.

    Returns a 400 JSONResponse when the upload is not a readable Excel file;
    otherwise streams newline-delimited JSON events from stream_insights.
    """
    try:
        df = pd.read_excel(file.file)
    except Exception as e:
        # An unreadable/invalid spreadsheet is a client error — return 400,
        # not a 200 with an error payload.
        return JSONResponse({"success": False, "error": f"Failed to read file: {str(e)}"}, status_code=400)
    metadata = get_metadata(df)
    # application/x-ndjson: each yielded line is an independent JSON document,
    # so clients know to parse line-by-line instead of buffering the body.
    return StreamingResponse(stream_insights(query, metadata), media_type="application/x-ndjson")