Spaces:
Sleeping
Sleeping
| import os | |
| import re | |
| import io | |
| import json | |
| import time | |
| import sqlite3 | |
| import tempfile | |
| import threading | |
| import pandas as pd | |
| import urllib.request | |
| import urllib.error | |
| try: | |
| import requests as _requests | |
| HAS_REQUESTS = True | |
| except ImportError: | |
| HAS_REQUESTS = False | |
| from fastapi import FastAPI, File, UploadFile, HTTPException, Request | |
| from fastapi.staticfiles import StaticFiles | |
| from fastapi.responses import FileResponse | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from pydantic import BaseModel | |
# ─────────────────────────────
# CONFIG
# ─────────────────────────────
# Gemini API key read from the environment; an empty value disables SQL
# generation entirely (_call_gemini returns "" without calling the API).
GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY", "")

# In-process session state, keyed by the hex session id created on upload.
# NOTE(review): nothing in this file evicts entries, so memory grows with every
# upload until the process restarts — consider a size cap or TTL.
_db_store: dict = {}      # session_id -> {"bytes": ..., "table": ..., "cols": [...]}
_schema_store: dict = {}  # session_id -> CREATE TABLE statement for that session

app = FastAPI(version="6.0.0", title="QueryMind AI Analyst")

# Fully permissive CORS: any origin, any method, any header.
app.add_middleware(
    CORSMiddleware,
    allow_headers=["*"],
    allow_methods=["*"],
    allow_origins=["*"],
)
# ─────────────────────────────
# REQUEST MODEL
# ─────────────────────────────
class QueryRequest(BaseModel):
    """JSON body for a natural-language query against an uploaded dataset."""

    # Session id previously returned by upload_csv.
    session_id: str
    # The user's question, to be translated into SQL.
    question: str
# ─────────────────────────────
# GEMINI CALL
# retries=1 → fail fast, tell user immediately
# do NOT change retries back to 4
# ─────────────────────────────
# The API key is sent in the x-goog-api-key header rather than the URL so it
# does not end up in proxy/access logs or exception traces.
_GEMINI_URL = (
    "https://generativelanguage.googleapis.com"
    "/v1beta/models/gemini-2.5-flash:generateContent"
)
# Matched on word boundaries so identifiers such as "dropoff_time" or
# "deleted_at" are not mistaken for DROP / DELETE statements.
_FORBIDDEN_SQL = re.compile(r"\b(?:drop|delete)\b", re.IGNORECASE)
# Markdown code fences the model sometimes wraps around its answer
# (```sql, ```SQL, or a bare ```).
_CODE_FENCE = re.compile(r"```(?:sql)?", re.IGNORECASE)


def _build_gemini_prompt(question, schema, columns, table):
    """Return the text prompt asking Gemini for a single SQLite query."""
    return f"""
You are an expert SQLite data analyst.
Return ONLY a valid SQL query. No explanation, no markdown, no text before or after.
STRICT RULES:
- Use only the table "{table}"
- Never use DROP or DELETE
- For GROUP BY queries, always include the grouped column in SELECT
- For filtering, use the exact case as in the data
- Return only one SQL statement ending without semicolon
COLUMNS AVAILABLE:
{", ".join(map(str, columns))}
SCHEMA:
{schema}
QUESTION:
{question}
"""


def _extract_sql(text, table):
    """Clean a raw model answer down to one SQL statement.

    Code fences are removed and everything from the first ``;`` on is dropped.
    If the statement contains DROP or DELETE, a harmless
    ``SELECT * ... LIMIT 10`` on ``table`` is returned instead.
    """
    sql = _CODE_FENCE.sub("", text).strip()
    sql = sql.split(";")[0].strip()
    if _FORBIDDEN_SQL.search(sql):
        return f'SELECT * FROM "{table}" LIMIT 10'
    return sql


def _call_gemini(question: str, schema: str, columns: list, table: str,
                 retries: int = 1):
    """Ask Gemini to translate ``question`` into one SQLite query on ``table``.

    Args:
        question: natural-language question from the user.
        schema: CREATE TABLE statement shown to the model.
        columns: column names of ``table``.
        table: the only table the model is told to use.
        retries: total number of attempts. A 429 (rate limit) or any other 4xx
            response fails immediately. 5xx responses and network/decoding
            errors are retried while attempts remain, with a short linear
            backoff. With the default of 1 there is exactly one attempt.

    Returns:
        The cleaned SQL string. If the model produced DROP/DELETE, a
        ``SELECT * ... LIMIT 10`` fallback is returned instead. Returns ``""``
        when no API key is configured or the call fails for any reason.
    """
    if not GEMINI_API_KEY:
        return ""

    payload = json.dumps({
        "contents": [{
            "role": "user",
            "parts": [{"text": _build_gemini_prompt(question, schema, columns, table)}],
        }]
    }).encode("utf-8")
    headers = {
        "Content-Type": "application/json",
        "x-goog-api-key": GEMINI_API_KEY,
    }

    for attempt in range(1, retries + 1):
        try:
            req = urllib.request.Request(_GEMINI_URL, data=payload, headers=headers)
            with urllib.request.urlopen(req, timeout=30) as resp:
                data = json.loads(resp.read().decode("utf-8"))
        except urllib.error.HTTPError as e:
            if e.code == 429:
                print(f"⚠️ Gemini 429 — rate limited (attempt {attempt}/{retries})")
                # fail fast — do not retry, tell user immediately
                return ""
            print(f"❌ GEMINI HTTP ERROR {e.code}: {e}")
            # 4xx means the request itself is wrong; retrying cannot help.
            if e.code < 500 or attempt == retries:
                return ""
        except Exception as e:  # network error, timeout, undecodable body
            print(f"❌ GEMINI ERROR: {e}")
            if attempt == retries:
                return ""
        else:
            try:
                text = data["candidates"][0]["content"]["parts"][0]["text"]
            except (KeyError, IndexError, TypeError):
                # Unexpected response shape (e.g. blocked by safety filters).
                return ""
            if not isinstance(text, str) or not text:
                return ""
            return _extract_sql(text, table)
        # Only reached for retryable failures with attempts remaining.
        time.sleep(attempt)
    return ""
# ─────────────────────────────
# EXECUTE SQL
# ─────────────────────────────
# sqlite3 authorizer action codes a plain read query needs. Everything else
# (INSERT/UPDATE/DELETE, DDL, ATTACH, PRAGMA, transactions, ...) is denied.
# getattr fallbacks are the numeric codes from sqlite3.h, for Python builds
# that do not export those constants.
_READ_ONLY_ACTIONS = frozenset({
    sqlite3.SQLITE_READ,
    sqlite3.SQLITE_SELECT,
    getattr(sqlite3, "SQLITE_FUNCTION", 31),
    getattr(sqlite3, "SQLITE_RECURSIVE", 33),
})
# The statement must start with SELECT or WITH, after optional whitespace or
# SQL comments. This rejects e.g. VACUUM INTO '<path>', which could otherwise
# write files on the server.
_READ_QUERY = re.compile(
    r"(?:\s|--[^\n]*|/\*.*?\*/)*(?:select|with)\b",
    re.IGNORECASE | re.DOTALL,
)


def _read_only_authorizer(action, arg1, arg2, db_name, source):
    """sqlite3 authorizer callback permitting only read actions."""
    if action in _READ_ONLY_ACTIONS:
        return sqlite3.SQLITE_OK
    return sqlite3.SQLITE_DENY


def execute_sql(sql: str, db_bytes: bytes):
    """Run one read-only query against a private in-memory copy of a database.

    ``db_bytes`` (the raw contents of a SQLite file) is written to a temporary
    file, backed up into an in-memory connection, and the temp file is then
    deleted. The query runs on the in-memory copy under an authorizer that
    denies anything but reads.

    Args:
        sql: a single SELECT (or WITH ... SELECT) statement.
        db_bytes: bytes of a SQLite database file.

    Returns:
        One dict per result row, mapping column name to value. On any failure
        (rejected statement, SQL error, I/O error) returns
        ``[{"error": message}]``.
    """
    if not isinstance(sql, str) or not _READ_QUERY.match(sql):
        return [{"error": "Only SELECT queries are allowed"}]

    conn = sqlite3.connect(":memory:")
    temp_path = None
    try:
        with tempfile.NamedTemporaryFile(delete=False) as f:
            # Record the path first so the file is removed even if write fails.
            temp_path = f.name
            f.write(db_bytes)
        disk = sqlite3.connect(temp_path)
        try:
            disk.backup(conn)
        finally:
            disk.close()
        conn.set_authorizer(_read_only_authorizer)
        conn.row_factory = sqlite3.Row
        cur = conn.execute(sql)
        return [dict(row) for row in cur.fetchall()]
    except Exception as e:
        return [{"error": str(e)}]
    finally:
        conn.close()
        if temp_path and os.path.exists(temp_path):
            os.remove(temp_path)
# ─────────────────────────────
# UPLOAD CSV
# ─────────────────────────────
def _sanitize_table_name(filename):
    """Map an uploaded file name to a SQLite table name of at most 40 chars.

    Characters outside ``[A-Za-z0-9_]`` become ``_``. Names that start with a
    digit, or with SQLite's reserved ``sqlite_`` prefix, get a ``t_`` prefix.
    A missing or empty file name falls back to ``data``.
    """
    name = re.sub(r"[^a-zA-Z0-9_]", "_", filename or "")
    if not name:
        return "data"
    if name[0].isdigit() or name.lower().startswith("sqlite_"):
        name = "t_" + name
    return name[:40]


def _dataframe_to_sqlite(df, table_name):
    """Write ``df`` to a throw-away SQLite file; return ``(db_bytes, create_sql)``.

    The temporary file is always removed, even when writing fails.
    """
    fd, path = tempfile.mkstemp(suffix=".db")
    os.close(fd)  # sqlite3 opens the path itself
    try:
        conn = sqlite3.connect(path)
        try:
            df.to_sql(table_name, conn, index=False, if_exists="replace")
            schema = conn.execute(
                "SELECT sql FROM sqlite_master WHERE type='table' AND name=?",
                (table_name,),
            ).fetchone()[0]
        finally:
            conn.close()
        with open(path, "rb") as fh:
            db_bytes = fh.read()
    finally:
        os.remove(path)
    return db_bytes, schema


async def upload_csv(file: UploadFile = File(...)):
    """Load an uploaded CSV into a new session-scoped SQLite database.

    Rows that are entirely empty are dropped. The database bytes, table name
    and column list go into ``_db_store``, and the CREATE TABLE statement into
    ``_schema_store``, both under a fresh random session id.

    Returns:
        ``{"session_id", "row_count", "columns", "table_name"}``.

    Raises:
        HTTPException: 400 if the upload is empty or not parseable as CSV,
            500 for any other failure.
    """
    try:
        content = await file.read()
        try:
            df = pd.read_csv(io.BytesIO(content)).dropna(how="all")
        except (pd.errors.EmptyDataError, pd.errors.ParserError,
                UnicodeDecodeError) as e:
            # The client sent unusable data — not a server fault.
            raise HTTPException(status_code=400, detail=str(e)) from e

        session_id = os.urandom(8).hex()
        table_name = _sanitize_table_name(file.filename)
        db_bytes, schema = _dataframe_to_sqlite(df, table_name)
        columns = list(df.columns)

        _db_store[session_id] = {
            "bytes": db_bytes,
            "table": table_name,
            "cols": columns,
        }
        _schema_store[session_id] = schema
        return {
            "session_id": session_id,
            "row_count": len(df),
            "columns": list(columns),
            "table_name": table_name,
        }
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
# ─────────────────────────────
# QUERY ENGINE
# ─────────────────────────────
async def query(req: QueryRequest):
    """Answer ``req.question`` with Gemini-generated SQL over a session's dataset.

    Returns:
        On success, ``{"sql": ..., "results": [...]}``. ``results`` holds a
        single ``{"error": ...}`` row if execution failed. If no SQL was
        generated, ``{"sql": "", "results": [], "error": ...}``.

    Raises:
        HTTPException: 404 when no dataset has been uploaded at all.
    """
    import asyncio

    session_id = req.session_id
    if session_id not in _db_store:
        # NOTE(review): unknown session ids fall back to the most recently
        # uploaded dataset, which may belong to a different client. Kept for
        # compatibility with callers that do not track session ids — confirm
        # this is intended.
        session_id = list(_db_store)[-1] if _db_store else None
    if session_id is None:
        raise HTTPException(status_code=404, detail="No dataset loaded")

    data = _db_store[session_id]
    table = data["table"]
    # Resolve the schema under the same key as the data so the two always match.
    schema = _schema_store.get(session_id, "")

    # _call_gemini (blocking HTTP with a 30 s timeout) and execute_sql (SQLite
    # work) are synchronous. Run them in the default executor so this coroutine
    # does not stall the event loop for every other request.
    loop = asyncio.get_running_loop()
    sql = await loop.run_in_executor(
        None, _call_gemini, req.question, schema, data["cols"], table
    )

    # If Gemini failed (429 or error), return clear error to Render bot.
    # Note: _call_gemini returns "" for every failure (missing key, rate limit,
    # HTTP or parse error), not only for rate limiting.
    if not sql:
        return {
            "sql": "",
            "results": [],
            "error": "⚠️ AI busy (rate limit). Wait 60s and retry."
        }

    results = await loop.run_in_executor(None, execute_sql, sql, data["bytes"])
    return {"sql": sql, "results": results}
# ─────────────────────────────
# HEALTH
# ─────────────────────────────
def health():
    """Liveness check: report status, the hard-coded model name and service label."""
    status_report = dict(
        status="ok",
        model="gemini-2.5-flash",
        service="AI Data Analyst",
    )
    return status_report
| # ───────────────────────────── | |
| # TELEGRAM WEBHOOK — DISABLED | |
| # Telegram is now handled by Render bot (polling) | |
| # Do not re-enable — causes conflict with Render polling | |
| # ───────────────────────────── | |
| # @app.on_event("startup") | |
| # async def on_startup(): | |
| # thread = threading.Thread(target=_background_webhook_setup, daemon=True) | |
| # thread.start() | |
| # @app.get("/set-webhook") | |
| # async def set_webhook_endpoint(): | |
| # ... | |
| # @app.post("/webhook/{token}") | |
| # async def telegram_webhook(token: str, request: Request): | |
| # ... | |
# ─────────────────────────────
# FRONTEND
# ─────────────────────────────
# Serve files from the local "static" directory under the /static URL prefix.
app.mount("/static", StaticFiles(directory="static"), name="static")


def root():
    """Return the single-page web UI (static/webapp.html) as a file response."""
    page_path = "static/webapp.html"
    return FileResponse(page_path)