"""QueryMind AI Analyst: upload a CSV, then query it in natural language.

An uploaded CSV is loaded into a single-table SQLite database whose raw bytes
are kept in memory per session. Questions are sent to Gemini, which returns a
SQL statement that is then run against an in-memory copy of that database.
"""

import os
import re
import io
import json
import time
import sqlite3
import tempfile
import threading
import urllib.error
import urllib.request

import pandas as pd

try:
    import requests as _requests
    HAS_REQUESTS = True
except ImportError:
    HAS_REQUESTS = False

from fastapi import FastAPI, File, UploadFile, HTTPException, Request
from fastapi.concurrency import run_in_threadpool
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel

# ─────────────────────────────
# CONFIG
# ─────────────────────────────
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY", "")

# session_id -> {"bytes": SQLite file contents, "table": str, "cols": list}
_db_store = {}
# session_id -> CREATE TABLE statement of the uploaded table
_schema_store = {}

# DROP / DELETE as whole words only, so identifiers such as "deleted_at" or
# "dropoff" do not trip the guard.
_FORBIDDEN_SQL = re.compile(r"\b(?:drop|delete)\b", re.IGNORECASE)

app = FastAPI(title="QueryMind AI Analyst", version="6.0.0")

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)


# ─────────────────────────────
# REQUEST MODEL
# ─────────────────────────────
class QueryRequest(BaseModel):
    """Body of POST /query."""

    session_id: str
    question: str


# ─────────────────────────────
# GEMINI CALL
# retries=1 → fail fast, tell user immediately
# do NOT change retries back to 4
# ─────────────────────────────
def _call_gemini(question: str, schema: str, columns: list, table: str,
                 retries: int = 1) -> str:
    """Ask Gemini to translate ``question`` into one SQLite statement.

    Args:
        question: Natural-language question from the user.
        schema: CREATE TABLE statement placed in the prompt.
        columns: Column names placed in the prompt.
        table: Name of the only table the model is told to use.
        retries: Upper bound of the attempt loop. Every branch of the loop
            returns, so at most one HTTP request is ever sent.

    Returns:
        The cleaned SQL text (code fences stripped, cut at the first ``;``).
        Returns ``""`` when no API key is configured, on any HTTP or parsing
        failure, or when the response carries no text. If the SQL contains
        DROP or DELETE as a whole word, a fixed
        ``SELECT * FROM "<table>" LIMIT 10`` is returned instead.
    """
    if not GEMINI_API_KEY:
        return ""

    column_list = ", ".join(map(str, columns))
    prompt = f"""
You are an expert SQLite data analyst.
Return ONLY a valid SQL query. No explanation, no markdown, no text before or after.

STRICT RULES:
- Use only the table "{table}"
- Never use DROP or DELETE
- For GROUP BY queries, always include the grouped column in SELECT
- For filtering, use the exact case as in the data
- Return only one SQL statement ending without semicolon

COLUMNS AVAILABLE:
{column_list}

SCHEMA:
{schema}

QUESTION:
{question}
"""

    url = (
        "https://generativelanguage.googleapis.com"
        "/v1beta/models/gemini-2.5-flash"
        f":generateContent?key={GEMINI_API_KEY}"
    )

    payload = json.dumps({
        "contents": [{"role": "user", "parts": [{"text": prompt}]}]
    }).encode("utf-8")

    for attempt in range(1, retries + 1):
        try:
            req = urllib.request.Request(
                url,
                data=payload,
                headers={"Content-Type": "application/json"},
            )
            with urllib.request.urlopen(req, timeout=30) as resp:
                data = json.loads(resp.read().decode())

            try:
                sql = data["candidates"][0]["content"]["parts"][0]["text"]
            except (KeyError, IndexError, TypeError):
                # Response did not have the expected candidates/parts shape.
                return ""

            if not sql:
                return ""

            sql = sql.replace("```sql", "").replace("```", "").strip()
            # Keep only the first statement.
            sql = sql.split(";")[0].strip()

            if _FORBIDDEN_SQL.search(sql):
                return f'SELECT * FROM "{table}" LIMIT 10'

            return sql

        except urllib.error.HTTPError as e:
            if e.code == 429:
                print(f"⚠️ Gemini 429 — rate limited (attempt {attempt}/{retries})")
                # fail fast — do not retry, tell user immediately
                return ""
            print(f"❌ GEMINI HTTP ERROR {e.code}: {e}")
            return ""

        except Exception as e:
            print(f"❌ GEMINI ERROR: {e}")
            return ""

    return ""


# ─────────────────────────────
# EXECUTE SQL
# ─────────────────────────────
def execute_sql(sql: str, db_bytes: bytes) -> list:
    """Run ``sql`` against an in-memory copy of a serialized SQLite database.

    ``db_bytes`` is written to a temporary file, copied into a ``:memory:``
    connection with ``Connection.backup``, and the statement is executed on
    that copy, so the stored bytes are never modified.

    Returns:
        One dict per result row, keyed by column name. On any failure a
        single-element list ``[{"error": "<message>"}]`` is returned instead
        of raising.
    """
    temp_path = None
    conn = sqlite3.connect(":memory:")
    try:
        with tempfile.NamedTemporaryFile(delete=False) as f:
            # Record the path before writing so cleanup runs even if the
            # write fails.
            temp_path = f.name
            f.write(db_bytes)

        disk = sqlite3.connect(temp_path)
        try:
            disk.backup(conn)
        finally:
            # Close the on-disk connection even if backup() raises.
            disk.close()

        conn.row_factory = sqlite3.Row
        cur = conn.execute(sql)
        return [dict(r) for r in cur.fetchall()]

    except Exception as e:
        return [{"error": str(e)}]

    finally:
        conn.close()
        if temp_path and os.path.exists(temp_path):
            os.remove(temp_path)


# ─────────────────────────────
# UPLOAD CSV
# ─────────────────────────────
@app.post("/upload")
async def upload_csv(file: UploadFile = File(...)):
    """Load an uploaded CSV into a one-table SQLite database for a new session.

    Rows that are entirely empty are dropped. The table name is the uploaded
    filename with every character outside ``[a-zA-Z0-9_]`` replaced by ``_``,
    prefixed with ``t_`` if it starts with a digit, and truncated to 40
    characters; ``"data"`` is used when no filename is supplied.

    Returns:
        ``session_id``, ``row_count``, ``columns`` and ``table_name``.

    Raises:
        HTTPException: 500 with the underlying error message on any failure.
    """
    temp_path = None
    try:
        content = await file.read()
        df = pd.read_csv(io.BytesIO(content)).dropna(how="all")

        session_id = os.urandom(8).hex()

        table_name = re.sub(r"[^a-zA-Z0-9_]", "_", file.filename or "")
        if not table_name:
            table_name = "data"
        if table_name[0].isdigit():
            table_name = "t_" + table_name
        table_name = table_name[:40]

        # Reserve a path, then close the handle before SQLite opens the file.
        with tempfile.NamedTemporaryFile(delete=False) as tf:
            temp_path = tf.name

        conn = sqlite3.connect(temp_path)
        try:
            df.to_sql(table_name, conn, index=False, if_exists="replace")
            schema = conn.execute(
                "SELECT sql FROM sqlite_master WHERE type='table' AND name=?",
                (table_name,),
            ).fetchone()[0]
        finally:
            conn.close()

        with open(temp_path, "rb") as fh:
            db_bytes = fh.read()

        columns = list(df.columns)
        _db_store[session_id] = {
            "bytes": db_bytes,
            "table": table_name,
            "cols": columns,
        }
        _schema_store[session_id] = schema

        return {
            "session_id": session_id,
            "row_count": len(df),
            "columns": columns,
            "table_name": table_name,
        }

    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e

    finally:
        if temp_path and os.path.exists(temp_path):
            os.remove(temp_path)


# ─────────────────────────────
# QUERY ENGINE
# ─────────────────────────────
@app.post("/query")
async def query(req: QueryRequest):
    """Answer a natural-language question about an uploaded dataset.

    Returns:
        ``{"sql": ..., "results": [...]}`` on success, where ``results`` is
        the output of ``execute_sql`` (which may itself be an error entry).
        When no SQL could be generated, ``sql`` is empty, ``results`` is
        empty and ``error`` carries a retry message.

    Raises:
        HTTPException: 404 when no dataset has been uploaded at all.
    """
    data = _db_store.get(req.session_id)

    # NOTE(review): an unknown session_id falls back to the most recently
    # uploaded dataset, which may belong to another session. Kept as-is;
    # confirm this is intended before tightening it.
    if not data and _db_store:
        data = list(_db_store.values())[-1]

    if not data:
        raise HTTPException(status_code=404, detail="No dataset loaded")

    table = data["table"]

    schema = _schema_store.get(req.session_id)
    if not schema:
        schema = list(_schema_store.values())[-1] if _schema_store else ""

    # The Gemini call blocks for up to 30 s; run it off the event loop so
    # other requests keep being served.
    sql = await run_in_threadpool(
        _call_gemini, req.question, schema, data["cols"], table
    )

    # If Gemini failed (429 or error), return clear error to Render bot
    if not sql:
        return {
            "sql": "",
            "results": [],
            "error": "⚠️ AI busy (rate limit). Wait 60s and retry."
        }

    results = await run_in_threadpool(execute_sql, sql, data["bytes"])
    return {"sql": sql, "results": results}


# ─────────────────────────────
# HEALTH
# ─────────────────────────────
@app.get("/health")
def health():
    """Return a static status payload."""
    return {
        "status": "ok",
        "model": "gemini-2.5-flash",
        "service": "AI Data Analyst"
    }


# ─────────────────────────────
# TELEGRAM WEBHOOK — DISABLED
# Telegram is now handled by Render bot (polling)
# Do not re-enable — causes conflict with Render polling
# ─────────────────────────────

# @app.on_event("startup")
# async def on_startup():
#     thread = threading.Thread(target=_background_webhook_setup, daemon=True)
#     thread.start()

# @app.get("/set-webhook")
# async def set_webhook_endpoint():
#     ...

# @app.post("/webhook/{token}")
# async def telegram_webhook(token: str, request: Request):
#     ...


# ─────────────────────────────
# FRONTEND
# ─────────────────────────────
app.mount("/static", StaticFiles(directory="static"), name="static")


@app.get("/")
def root():
    """Serve the single-page web app."""
    return FileResponse("static/webapp.html")