chatplotapi / app.py
triflix's picture
Update app.py
93ab69f verified
raw
history blame
4.24 kB
from fastapi import FastAPI, File, UploadFile, Form
from fastapi.responses import JSONResponse, StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
import pandas as pd
from google import genai
from google.genai import types
import os
import json
import asyncio
# -------------------------------
# 🔑 Configuration
# -------------------------------
# Gemini credentials are read from the environment only. A literal key used
# to be committed here as the fallback value; anything that has lived in
# source control must be treated as compromised and rotated, and must never
# be re-added as a default.
API_KEY = os.getenv("GEMINI_API_KEY")
if not API_KEY:
    # Fail at startup with an actionable message instead of at the first
    # request with an opaque authentication error.
    raise RuntimeError("GEMINI_API_KEY environment variable is not set")

# Model used for every insight-generation request.
MODEL = "gemini-2.5-flash-lite"

# Shared Gemini client, created once at import time and reused by all requests.
client = genai.Client(api_key=API_KEY)
# -------------------------------
# ⚑ FastAPI Setup
# -------------------------------
# ASGI application object; routes below are registered on it.
app = FastAPI()

# Permissive CORS policy: requests from any origin are accepted, with any
# HTTP method and any request header.
app.add_middleware(
    CORSMiddleware,
    allow_headers=["*"],
    allow_methods=["*"],
    allow_origins=["*"],
)
# -------------------------------
# 🛠️ Helper Functions
# -------------------------------
def get_metadata(df: pd.DataFrame):
    """Summarise a DataFrame as a JSON-serialisable metadata dict.

    All statistics (dtypes, null counts, unique counts) are computed on the
    untouched frame so that datetime columns keep their real dtype and their
    missing values are still counted as missing. Only the three-row preview
    is converted to JSON-friendly values.

    Args:
        df: The frame to describe. It is not modified.

    Returns:
        A dict with the keys ``columns``, ``dtypes``, ``num_rows``,
        ``num_cols``, ``null_counts``, ``unique_counts`` and ``sample_rows``.
        In ``sample_rows`` datetime values are rendered as strings and every
        missing value (NaN / NaT / None) is ``None``, so the result can be
        dumped as strict JSON.
    """
    # Work on a copy of the preview only; stringifying whole columns just to
    # show three rows is wasted work and would corrupt the statistics.
    sample = df.head(3).copy()

    # Record which preview cells are missing *before* any conversion:
    # astype(str) turns NaT into the literal text "NaT".
    missing_rows = sample.isna().to_dict(orient="records")

    # Timestamps are not JSON serialisable; cover naive and tz-aware columns.
    temporal_cols = sample.select_dtypes(include=["datetime", "datetimetz"]).columns
    for col in temporal_cols:
        sample[col] = sample[col].astype(str)

    # Replace missing cells with None so json.dumps emits null rather than
    # the non-standard NaN token.
    sample_rows = [
        {key: None if is_missing[key] else value for key, value in row.items()}
        for row, is_missing in zip(sample.to_dict(orient="records"), missing_rows)
    ]

    return {
        "columns": list(df.columns),
        "dtypes": df.dtypes.apply(str).to_dict(),
        "num_rows": df.shape[0],
        "num_cols": df.shape[1],
        "null_counts": df.isnull().sum().to_dict(),
        "unique_counts": df.nunique().to_dict(),
        "sample_rows": sample_rows,
    }
# System instruction sent with every Gemini request; it fixes the top-level
# keys that stream_insights reads back out of the reply.
_SYSTEM_PROMPT = """
You are a data analysis assistant.
Always return JSON with this schema:
{
"excel_info": {...},
"data_type_context": "...",
"auto_insights": {
"insights": [
{... Efficiency Analysis ...},
{... Cumulative Performance ...},
{... Process Issues ...},
{... Planning vs Projection ...},
{... Loss Analysis ...}
]
},
"query_insights": {...}
}
"""


def _ndjson_event(payload):
    """Serialise one event dict as a single newline-terminated JSON line."""
    # default=str is deliberate: a value json cannot encode natively is sent
    # as its string form instead of aborting the stream mid-response.
    return json.dumps(payload, default=str) + "\n"


def _parse_model_json(raw_text):
    """Parse the model reply into a dict, tolerating a Markdown code fence.

    Raises:
        ValueError: If the reply is not valid JSON or is not a JSON object.
    """
    text = raw_text.strip()
    if text.startswith("```"):
        # Drop the opening fence line (``` or ```json) and the closing fence.
        _, _, text = text.partition("\n")
        text = text.rsplit("```", 1)[0]
    parsed = json.loads(text)
    if not isinstance(parsed, dict):
        raise ValueError("model reply is not a JSON object")
    return parsed


async def stream_insights(user_query, metadata):
    """Stream insights step by step as newline-delimited JSON.

    Args:
        user_query: Free-text request from the user, forwarded to the model.
        metadata: Dataset metadata dict; echoed to the client and embedded in
            the model prompt.

    Yields:
        Newline-terminated JSON strings, each with a ``status`` field, in
        this order: ``started``, ``metadata``, then either a single ``error``
        event (model call failed or reply was not a JSON object) or
        ``excel_info``, ``context``, one ``insight`` per generated insight,
        ``query`` and finally ``completed``.
    """
    yield _ndjson_event({"status": "started", "message": "File received. Extracting metadata..."})
    await asyncio.sleep(0.5)
    yield _ndjson_event({"status": "metadata", "metadata": metadata})
    await asyncio.sleep(0.5)

    user_prompt = f"Dataset metadata: {metadata}\nUser request: {user_query}"
    contents = [types.Content(role="user", parts=[types.Part.from_text(text=user_prompt)])]
    config = types.GenerateContentConfig(
        temperature=0,
        max_output_tokens=2000,
        system_instruction=[types.Part.from_text(text=_SYSTEM_PROMPT)],
        # Ask for raw JSON so the reply is not wrapped in Markdown.
        response_mime_type="application/json",
    )

    pieces = []
    try:
        # Use the async client: the synchronous stream would block the event
        # loop (and every other request) for the whole generation.
        stream = await client.aio.models.generate_content_stream(
            model=MODEL, contents=contents, config=config
        )
        async for chunk in stream:
            if chunk.text:
                pieces.append(chunk.text)
    except Exception as exc:
        # Response boundary: headers are already sent, so report the failure
        # as an event rather than letting the connection drop silently.
        yield _ndjson_event({
            "status": "error",
            "message": f"Model request failed: {exc}",
            "raw_output": "".join(pieces),
        })
        return

    result = "".join(pieces)
    try:
        parsed = _parse_model_json(result)
    except ValueError:
        yield _ndjson_event({"status": "error", "raw_output": result})
        return

    yield _ndjson_event({"status": "excel_info", "excel_info": parsed.get("excel_info", {})})
    await asyncio.sleep(0.5)
    yield _ndjson_event({"status": "context", "data_type_context": parsed.get("data_type_context", "")})
    await asyncio.sleep(0.5)

    # The model may omit or mis-shape this section; treat anything that is
    # not {"insights": [...]} as "no insights" instead of crashing.
    auto_insights = parsed.get("auto_insights")
    insights = auto_insights.get("insights") if isinstance(auto_insights, dict) else None
    if not isinstance(insights, list):
        insights = []
    for insight in insights:
        yield _ndjson_event({"status": "insight", "insight": insight})
        await asyncio.sleep(0.5)

    yield _ndjson_event({"status": "query", "query_insights": parsed.get("query_insights", {})})
    yield _ndjson_event({"status": "completed", "message": "All insights generated"})
# -------------------------------
# 🌐 Routes
# -------------------------------
@app.post("/stream_insights")
async def stream_insight_file(file: UploadFile = File(...), query: str = Form("Analyze the dataset")):
    """Accept an uploaded Excel file and stream insights about it.

    Args:
        file: Uploaded workbook; it is parsed with ``pandas.read_excel``.
        query: Optional free-text request forwarded to the model.

    Returns:
        On success, a streaming response of newline-delimited JSON events
        produced by ``stream_insights``. If the upload cannot be parsed, a
        JSON body ``{"success": False, "error": ...}`` with HTTP status 400.
    """
    try:
        # Workbook parsing is blocking and CPU-heavy; run it in a worker
        # thread so other requests keep being served meanwhile.
        df = await asyncio.to_thread(pd.read_excel, file.file)
    except Exception as e:
        # An unreadable upload is a client error, so signal it in the status
        # code as well as in the body.
        return JSONResponse(
            {"success": False, "error": f"Failed to read file: {str(e)}"},
            status_code=400,
        )
    metadata = get_metadata(df)
    # The body is one JSON document per line, not a single JSON document,
    # so advertise it as NDJSON.
    return StreamingResponse(stream_insights(query, metadata), media_type="application/x-ndjson")