Tgapi / app.py
Flix010's picture
Create app.py
2c859f7 verified
Raw
History Blame Contribute Delete
1.7 kB
import os
import duckdb
import uvicorn
from fastapi import FastAPI
from huggingface_hub import hf_hub_download
app = FastAPI()
# ๐Ÿš€ Global Path taaki baar-baar download na ho
DB_PATH = None
def get_db():
global DB_PATH
if DB_PATH is None or not os.path.exists(DB_PATH):
token = os.getenv("HF_TOKEN")
if token is None:
raise ValueError("HF_TOKEN environment variable not set")
DB_PATH = hf_hub_download(
repo_id="sheeturt/Telegram",
filename="Midnight_Master_v2.db",
repo_type="dataset",
token=token
)
return DB_PATH
@app.get("/search")
@app.post("/search")
async def search(q: str = None):
if not q:
return {"status": "error", "message": "Query 'q' missing"}
try:
path = get_db()
con = duckdb.connect(database=':memory:')
con.execute("INSTALL sqlite;")
con.execute("LOAD sqlite;")
query = f"""
SELECT user_id
FROM sqlite_scan('{path}', 'telegram_users')
WHERE user_id LIKE '{q};%'
LIMIT 1
"""
res = con.execute(query).fetchone()
results = []
if res:
p = res[0].split(';')
results.append({
"id": p[0],
"t": p[1] if len(p) > 1 else "N/A",
"u": p[2] if len(p) > 2 else "N/A",
"n": p[3] if len(p) > 3 else "N/A"
})
return {"status": "success", "results": results}
except Exception as e:
return {"status": "error", "details": str(e), "type": type(e).__name__}
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=7860)