chatplotapi / app.py
triflix's picture
Update app.py
78ded87 verified
raw
history blame
4.42 kB
from fastapi import FastAPI, Request, File, UploadFile, Form
from fastapi.responses import StreamingResponse, JSONResponse, HTMLResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
import pandas as pd
from google import genai
from google.genai import types
import os
import json
import asyncio
# -------------------------------
# 🔑 Configuration
# -------------------------------
# The Gemini credential is taken from the environment only. A literal fallback
# in source publishes the key to everyone who can read the repository, so none
# is provided here.
# NOTE(review): the key that used to be committed as the fallback should be
# treated as compromised and revoked.
API_KEY = os.getenv("GEMINI_API_KEY")
if not API_KEY:
    # Fail at startup with an actionable message rather than on first request.
    raise RuntimeError(
        "GEMINI_API_KEY environment variable is not set; "
        "export it before starting the application."
    )

# Model identifier passed to every generate-content request.
MODEL = "gemini-2.5-flash-lite"

# Shared Gemini client used by the request handlers.
client = genai.Client(api_key=API_KEY)
# -------------------------------
# ⚡ FastAPI Setup
# -------------------------------
# ASGI application object that the route decorators below attach to.
app = FastAPI()

# Expose the local "static" directory under the /static URL prefix.
app.mount(path="/static", app=StaticFiles(directory="static"), name="static")

# Template engine that loads pages from the local "templates" directory.
templates = Jinja2Templates(directory="templates")
# -------------------------------
# πŸ› οΈ Helper Functions
# -------------------------------
def get_metadata(df: pd.DataFrame):
    """Summarise a DataFrame as a dict that ``json.dumps`` can encode.

    Args:
        df: The dataset to describe.

    Returns:
        A dict with these keys:

        * ``columns``: list of column labels.
        * ``dtypes``: column label -> dtype name as a string.
        * ``num_rows`` / ``num_cols``: the frame's shape.
        * ``null_counts``: column label -> number of null cells.
        * ``unique_counts``: column label -> number of distinct values
          (as counted by ``DataFrame.nunique``).
        * ``sample_rows``: up to the first three rows as a list of dicts.
          Datetime values are ISO-8601 strings and missing values are
          ``None``.
    """
    # Round-trip the sample through pandas' own JSON encoder. Plain
    # to_dict(orient="records") leaves Timestamp objects (which json.dumps
    # rejects with TypeError) and float NaN (which json.dumps writes as the
    # non-standard token NaN) in the rows.
    # NOTE(review): to_json(orient="records") requires unique column labels;
    # confirm that holds for every caller.
    sample_rows = json.loads(
        df.head(3).to_json(orient="records", date_format="iso", double_precision=15)
    )

    return {
        "columns": list(df.columns),
        "dtypes": {col: str(dtype) for col, dtype in df.dtypes.items()},
        "num_rows": df.shape[0],
        "num_cols": df.shape[1],
        # int() guarantees plain Python integers regardless of how the
        # installed pandas version boxes the reduced values.
        "null_counts": {col: int(n) for col, n in df.isnull().sum().items()},
        "unique_counts": {col: int(n) for col, n in df.nunique().items()},
        "sample_rows": sample_rows,
    }
def _ndjson_line(payload):
    """Encode *payload* as one newline-terminated JSON document."""
    # default=str keeps the stream alive when the payload carries values the
    # json module cannot encode natively (for example timestamps in metadata).
    return json.dumps(payload, default=str) + "\n"


async def stream_insights(user_query, metadata):
    """Yield analysis progress as newline-delimited JSON strings.

    Messages are produced in this order: ``started``, ``metadata``, then,
    once the model reply has been collected and parsed, ``excel_info``,
    ``context``, one ``insight`` per entry, ``query`` and ``completed``.
    If the model request fails, or its reply is not a JSON object, a single
    ``error`` message is yielded and the stream ends.

    Args:
        user_query: Free-text request forwarded to the model.
        metadata: Dataset summary; echoed to the client and embedded in the
            prompt.

    Yields:
        str: One JSON document followed by ``"\\n"``.
    """
    # Step 1: Start message
    yield _ndjson_line({"status": "started", "message": "File received. Extracting metadata..."})
    # The short pauses between messages are kept from the original flow;
    # presumably they pace the client-side display.
    await asyncio.sleep(0.5)

    # Step 2: Metadata
    yield _ndjson_line({"status": "metadata", "metadata": metadata})
    await asyncio.sleep(0.5)

    # Step 3: Call Gemini for structured insights
    system_prompt = """
You are a data analysis assistant.
Always return JSON with this schema:
{
"excel_info": {...},
"data_type_context": "...",
"auto_insights": {
"insights": [
{... Efficiency Analysis ...},
{... Cumulative Performance ...},
{... Process Issues ...},
{... Planning vs Projection ...},
{... Loss Analysis ...}
]
},
"query_insights": {...}
}
"""
    user_prompt = f"Dataset metadata: {metadata}\nUser request: {user_query}"
    contents = [types.Content(role="user", parts=[types.Part.from_text(text=user_prompt)])]
    config = types.GenerateContentConfig(
        temperature=0,
        max_output_tokens=2000,
        system_instruction=[types.Part.from_text(text=system_prompt)],
        # Ask for a bare JSON body so the reply is not wrapped in Markdown
        # code fences that json.loads would reject.
        response_mime_type="application/json",
    )

    pieces = []
    try:
        # Use the async client: the synchronous stream would block the event
        # loop (and every other request) for the whole model call.
        stream = await client.aio.models.generate_content_stream(
            model=MODEL, contents=contents, config=config
        )
        async for chunk in stream:
            if chunk.text:
                pieces.append(chunk.text)
    except Exception as exc:
        # Boundary handler: report the failure to the client instead of
        # silently cutting the response short.
        yield _ndjson_line({"status": "error", "message": f"Model request failed: {exc}"})
        return
    result = "".join(pieces)

    try:
        parsed = json.loads(result)
    except (ValueError, RecursionError):
        parsed = None
    # A reply that parses to a list/string/number cannot be queried with
    # .get(), so treat it like an unparseable reply.
    if not isinstance(parsed, dict):
        yield _ndjson_line({"status": "error", "raw_output": result})
        return

    # Step 4: Excel info
    yield _ndjson_line({"status": "excel_info", "excel_info": parsed.get("excel_info", {})})
    await asyncio.sleep(0.5)

    # Step 5: Data type context
    yield _ndjson_line({"status": "context", "data_type_context": parsed.get("data_type_context", "")})
    await asyncio.sleep(0.5)

    # Step 6: Stream each insight one by one
    auto_insights = parsed.get("auto_insights")
    insights = auto_insights.get("insights") if isinstance(auto_insights, dict) else None
    if not isinstance(insights, list):
        # Model omitted the section or returned it in an unexpected shape.
        insights = []
    for insight in insights:
        yield _ndjson_line({"status": "insight", "insight": insight})
        await asyncio.sleep(0.5)

    # Step 7: Query insights
    yield _ndjson_line({"status": "query", "query_insights": parsed.get("query_insights", {})})

    # Step 8: Completed
    yield _ndjson_line({"status": "completed", "message": "All insights generated"})
# -------------------------------
# 🌐 Routes
# -------------------------------
@app.get("/", response_class=HTMLResponse)
async def home(request: Request):
    """Serve the landing page rendered from the ``index.html`` template."""
    # The template context carries the incoming request under "request".
    context = {"request": request}
    page = templates.TemplateResponse("index.html", context)
    return page
@app.post("/stream_insights")
async def stream_insight_file(file: UploadFile = File(...), query: str = Form("Analyze the dataset")):
    """Stream structured JSON insights step by step.

    Args:
        file: Uploaded spreadsheet, parsed with ``pandas.read_excel``.
        query: Free-text analysis request; defaults to a generic prompt.

    Returns:
        A ``StreamingResponse`` that emits one JSON document per line, or a
        ``JSONResponse`` with HTTP status 400 and ``{"success": False,
        "error": ...}`` when the upload cannot be read.
    """
    try:
        # Spreadsheet parsing is blocking work; run it in a worker thread so
        # the event loop stays free for other requests.
        df = await asyncio.to_thread(pd.read_excel, file.file)
    except Exception as e:
        # Signal the failure through the status code as well as the body, so
        # clients that check the HTTP status do not treat it as success.
        return JSONResponse(
            {"success": False, "error": f"Failed to read file: {str(e)}"},
            status_code=400,
        )

    metadata = get_metadata(df)
    # NOTE(review): the body is newline-delimited JSON rather than a single
    # JSON document; the media type is left unchanged for existing clients.
    return StreamingResponse(stream_insights(query, metadata), media_type="application/json")