import os import sqlite3 import requests import uvicorn from fastapi import FastAPI, Query from contextlib import asynccontextmanager DB_PATH = "insta_hitech.db" # Dataset direct download link DB_URL = "https://huggingface.co/datasets/Watchhrr/Insta17m/resolve/main/insta_hitech.db" TOKEN = os.getenv("MY_HF_TOKEN") def download_db(): if not os.path.exists(DB_PATH): print("📥 Downloading DB from Dataset...") headers = {"Authorization": f"Bearer {TOKEN}"} if TOKEN else {} with requests.get(DB_URL, headers=headers, stream=True) as r: r.raise_for_status() with open(DB_PATH, 'wb') as f: for chunk in r.iter_content(chunk_size=1024*1024): f.write(chunk) print("✅ DB Downloaded!") @asynccontextmanager async def lifespan(app: FastAPI): download_db() yield app = FastAPI(lifespan=lifespan) # 1. Total Count Check karne ke liye: /count @app.get("/count") def get_count(): if not os.path.exists(DB_PATH): return {"error": "DB not ready"} conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() cursor.execute("SELECT COUNT(*) FROM users") total = cursor.fetchone()[0] conn.close() return {"total_records": total} # 2. Main Search: /search?q=XYZ @app.get("/search") async def search(q: str = Query(None)): if not q: return {"error": "Bhai, 'q' parameter missing hai!"} if not os.path.exists(DB_PATH): return {"error": "DB downloading..."} conn = sqlite3.connect(DB_PATH) conn.row_factory = sqlite3.Row cursor = conn.cursor() try: # Search query jo Username(u), ID(id), aur Phone(t) teeno check karega cursor.execute("SELECT * FROM users WHERE u=? OR id=? OR t=? LIMIT 10", (q, q, q)) rows = cursor.fetchall() if not rows: return {"status": "error", "message": "No Record Found"} return {"status": "success", "results": [dict(r) for r in rows]} finally: conn.close() if __name__ == "__main__": uvicorn.run(app, host="0.0.0.0", port=7860)