# nilotpaldhar2004's picture
# Update app.py
# fe2e2be unverified
import os
import re
import io
import json
import time
import sqlite3
import tempfile
import threading
import pandas as pd
import urllib.request
import urllib.error
try:
import requests as _requests
HAS_REQUESTS = True
except ImportError:
HAS_REQUESTS = False
from fastapi import FastAPI, File, UploadFile, HTTPException, Request
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
# ─────────────────────────────
# CONFIG
# ─────────────────────────────
# Gemini API key read from the environment; an empty string makes
# _call_gemini return "" without sending a request.
GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY", "")

# In-memory, per-process stores keyed by session id.
#   _db_store[session_id]     -> {"bytes": ..., "table": ..., "cols": ...}
#   _schema_store[session_id] -> CREATE TABLE statement text
_db_store = dict()
_schema_store = dict()

app = FastAPI(title="QueryMind AI Analyst", version="6.0.0")

# CORS is configured with wildcards for origins, methods and headers.
app.add_middleware(
    CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"]
)
# ─────────────────────────────
# REQUEST MODEL
# ─────────────────────────────
# Request body model for POST /query.
class QueryRequest(BaseModel):
    # Session identifier used to look up the uploaded dataset in _db_store.
    session_id: str
    # Natural-language question that is forwarded to _call_gemini.
    question: str
# ─────────────────────────────
# GEMINI CALL
# retries=1 → fail fast, tell user immediately
# do NOT change retries back to 4
# ─────────────────────────────
def _call_gemini(question: str, schema: str, columns: list, table: str,
                 retries: int = 1):
    """Ask Gemini to translate a natural-language question into one SQL query.

    Args:
        question: The user's question, inserted verbatim into the prompt.
        schema: Schema text (CREATE TABLE statement) shown to the model.
        columns: Column names; joined with ", " for the prompt.
        table: The only table the model is told to use. Also used to build
            the fallback query when the reply contains DROP or DELETE.
        retries: Upper bound of the attempt loop. Every branch inside the
            loop returns, so at most one request is sent (deliberate
            fail-fast); a value below 1 sends no request at all.

    Returns:
        The SQL text with Markdown code fences removed and truncated at the
        first ";". If the reply contains the keyword DROP or DELETE, a
        ``SELECT * FROM "<table>" LIMIT 10`` fallback is returned instead.
        Returns "" when no API key is configured, the request fails, or the
        reply carries no usable text.
    """
    if not GEMINI_API_KEY:
        return ""

    prompt = f"""
You are an expert SQLite data analyst.
Return ONLY a valid SQL query. No explanation, no markdown, no text before or after.
STRICT RULES:
- Use only the table "{table}"
- Never use DROP or DELETE
- For GROUP BY queries, always include the grouped column in SELECT
- For filtering, use the exact case as in the data
- Return only one SQL statement ending without semicolon
COLUMNS AVAILABLE:
{", ".join(columns)}
SCHEMA:
{schema}
QUESTION:
{question}
"""
    url = (
        "https://generativelanguage.googleapis.com"
        "/v1beta/models/gemini-2.5-flash"
        f":generateContent?key={GEMINI_API_KEY}"
    )
    payload = json.dumps({
        "contents": [{"role": "user", "parts": [{"text": prompt}]}]
    }).encode("utf-8")

    for attempt in range(1, retries + 1):
        # Only the network round-trip and JSON decoding live inside the try,
        # so a failure here is always reported as a Gemini error.
        try:
            req = urllib.request.Request(
                url,
                data=payload,
                headers={"Content-Type": "application/json"},
            )
            with urllib.request.urlopen(req, timeout=30) as resp:
                data = json.loads(resp.read().decode())
        except urllib.error.HTTPError as e:
            if e.code == 429:
                print(f"⚠️ Gemini 429 — rate limited (attempt {attempt}/{retries})")
            else:
                print(f"❌ GEMINI HTTP ERROR {e.code}: {e}")
            # fail fast — do not retry, tell user immediately
            return ""
        except Exception as e:
            print(f"❌ GEMINI ERROR: {e}")
            return ""

        # A reply without the expected candidates/content/parts shape is
        # treated as "no answer" rather than as an error.
        try:
            sql = data["candidates"][0]["content"]["parts"][0]["text"]
        except (KeyError, IndexError, TypeError):
            return ""
        if not isinstance(sql, str) or not sql:
            return ""

        # Strip Markdown fences and keep only the first statement.
        sql = sql.replace("```sql", "").replace("```", "").strip()
        sql = sql.split(";")[0].strip()

        # Match DROP/DELETE as whole keywords only. A plain substring test
        # would also reject harmless identifiers such as "deleted_at" or
        # "dropoff_time".
        if re.search(r"\b(?:drop|delete)\b", sql, re.IGNORECASE):
            return f'SELECT * FROM "{table}" LIMIT 10'
        return sql

    return ""
# ─────────────────────────────
# EXECUTE SQL
# ─────────────────────────────
def execute_sql(sql: str, db_bytes: bytes):
    """Run one SQL statement against an in-memory copy of a serialized database.

    The bytes are written to a temporary file, opened with sqlite3, copied
    into a fresh ``:memory:`` connection via ``backup`` and the statement is
    executed there, so the stored bytes are never modified.

    Args:
        sql: The SQL statement to execute.
        db_bytes: Raw contents of a SQLite database file.

    Returns:
        A list with one dict per result row (column name -> value). Any
        failure is reported as ``[{"error": "<message>"}]`` instead of being
        raised.
    """
    conn = sqlite3.connect(":memory:")
    temp_path = None
    try:
        with tempfile.NamedTemporaryFile(delete=False) as f:
            # Record the path before writing so a failed write is still
            # cleaned up in the finally clause below.
            temp_path = f.name
            f.write(db_bytes)

        disk = sqlite3.connect(temp_path)
        try:
            disk.backup(conn)
        finally:
            # Always release the file handle, even when backup() raises
            # (e.g. the bytes are not a valid database); otherwise the
            # connection leaks and the temp file may not be removable.
            disk.close()

        conn.row_factory = sqlite3.Row
        cur = conn.execute(sql)
        return [dict(r) for r in cur.fetchall()]
    except Exception as e:
        # Deliberate catch-all: errors are returned to the caller as data.
        return [{"error": str(e)}]
    finally:
        conn.close()
        if temp_path and os.path.exists(temp_path):
            os.remove(temp_path)
# ─────────────────────────────
# UPLOAD CSV
# ─────────────────────────────
@app.post("/upload")
async def upload_csv(file: UploadFile = File(...)):
    """Load an uploaded CSV into a SQLite database and register a session.

    The CSV is parsed with pandas (rows that are entirely empty are
    dropped), written to a single table in a temporary SQLite file, and the
    file's bytes plus the table's CREATE statement are kept in the
    in-memory stores under a freshly generated session id.

    Args:
        file: The uploaded CSV file.

    Returns:
        A dict with ``session_id``, ``row_count``, ``columns`` and
        ``table_name``.

    Raises:
        HTTPException: status 500 with the error text when any step fails.
    """
    db_path = None
    try:
        content = await file.read()
        df = pd.read_csv(io.BytesIO(content)).dropna(how="all")
        session_id = os.urandom(8).hex()

        # Derive a SQL-safe table name from the file name. The upload may
        # carry no file name at all, so fall back to a fixed name instead
        # of failing on None / an empty identifier.
        table_name = re.sub(r"[^a-zA-Z0-9_]", "_", file.filename or "")
        if not table_name:
            table_name = "data"
        if table_name[0].isdigit():
            table_name = "t_" + table_name
        table_name = table_name[:40]

        # Reserve a temp path, then let sqlite3 open it once our own handle
        # is closed.
        with tempfile.NamedTemporaryFile(delete=False) as tf:
            db_path = tf.name

        conn = sqlite3.connect(db_path)
        try:
            df.to_sql(table_name, conn, index=False, if_exists="replace")
            schema = conn.execute(
                "SELECT sql FROM sqlite_master WHERE type='table'"
            ).fetchone()[0]
        finally:
            conn.close()

        with open(db_path, "rb") as fh:
            db_bytes = fh.read()

        columns = list(df.columns)
        _db_store[session_id] = {
            "bytes": db_bytes,
            "table": table_name,
            "cols": columns,
        }
        _schema_store[session_id] = schema
        return {
            "session_id": session_id,
            "row_count": len(df),
            "columns": list(df.columns),
            "table_name": table_name,
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        # Remove the temp database on success and on every failure path.
        if db_path and os.path.exists(db_path):
            os.remove(db_path)
# ─────────────────────────────
# QUERY ENGINE
# ─────────────────────────────
@app.post("/query")
async def query(req: QueryRequest):
    """Answer a natural-language question about an uploaded dataset.

    Looks up the dataset for ``req.session_id``, asks ``_call_gemini`` for a
    SQL statement and runs it with ``execute_sql``.

    Args:
        req: Session id and question.

    Returns:
        ``{"sql": ..., "results": [...]}`` on success, or
        ``{"sql": "", "results": [], "error": ...}`` when no SQL could be
        obtained from Gemini.

    Raises:
        HTTPException: status 404 when no dataset has been uploaded.
    """
    # Local import keeps this block self-contained; asyncio is stdlib.
    import asyncio

    data = _db_store.get(req.session_id)
    if not data and _db_store:
        # Unknown session id: fall back to the most recently stored dataset.
        # NOTE(review): this serves one upload to any caller regardless of
        # session — confirm this is intended for multi-user deployments.
        data = list(_db_store.values())[-1]
    if not data:
        raise HTTPException(status_code=404, detail="No dataset loaded")

    table = data["table"]
    schema = (
        _schema_store.get(req.session_id)
        or list(_schema_store.values())[-1]
    )

    # Both helpers are blocking (HTTP request with a 30 s timeout, then file
    # and SQLite work). Run them in the default executor so the event loop
    # keeps serving other requests meanwhile.
    loop = asyncio.get_running_loop()
    sql = await loop.run_in_executor(
        None, _call_gemini, req.question, schema, data["cols"], table
    )

    # If Gemini failed (429 or error), return clear error to Render bot
    if not sql:
        return {
            "sql": "",
            "results": [],
            "error": "⚠️ AI busy (rate limit). Wait 60s and retry."
        }

    results = await loop.run_in_executor(None, execute_sql, sql, data["bytes"])
    return {"sql": sql, "results": results}
# ─────────────────────────────
# HEALTH
# ─────────────────────────────
@app.get("/health")
def health():
    # Static status payload: service state, model name and service label.
    payload = dict(
        status="ok",
        model="gemini-2.5-flash",
        service="AI Data Analyst",
    )
    return payload
# ─────────────────────────────
# TELEGRAM WEBHOOK — DISABLED
# Telegram is now handled by Render bot (polling)
# Do not re-enable — causes conflict with Render polling
# ─────────────────────────────
# @app.on_event("startup")
# async def on_startup():
# thread = threading.Thread(target=_background_webhook_setup, daemon=True)
# thread.start()
# @app.get("/set-webhook")
# async def set_webhook_endpoint():
# ...
# @app.post("/webhook/{token}")
# async def telegram_webhook(token: str, request: Request):
# ...
# ─────────────────────────────
# FRONTEND
# ─────────────────────────────
# Expose the contents of the local "static" directory under the /static URL prefix.
app.mount("/static", StaticFiles(directory="static"), name="static")
@app.get("/")
def root():
    # The site root answers with the web app's HTML page from ./static.
    landing_page = "static/webapp.html"
    return FileResponse(landing_page)