Spaces:
Sleeping
Sleeping
File size: 2,159 Bytes
c76014a 72e96d1 f55959d c76014a 72e96d1 c76014a 72e96d1 c76014a 72e96d1 c76014a f55959d 72e96d1 f55959d c76014a 72e96d1 c76014a 72e96d1 c76014a 72e96d1 f55959d 72e96d1 f55959d 72e96d1 f55959d 72e96d1 c76014a 72e96d1 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 |
import os
import time
import logging
from pathlib import Path
from typing import Optional, TypedDict
log = logging.getLogger(__name__)
# ------------------------------
# Config
# ------------------------------
# default upload directory (can override via .env)
_DB_UPLOAD_DIR = Path(os.getenv("DB_UPLOAD_DIR", "/tmp/nl2sql_dbs"))
_DB_UPLOAD_DIR.mkdir(parents=True, exist_ok=True)
class DBEntry(TypedDict):
path: str
ts: float
# in-memory map: {db_id: {"path": str, "ts": float}}
DB_MAP: dict[str, DBEntry] = {}
# cleanup threshold (hours)
DB_TTL_HOURS = 6
# ------------------------------
# Helpers
# ------------------------------
def register_db(db_id: str, path: str) -> None:
"""Register new DB in memory (and ensure dir exists)."""
_DB_UPLOAD_DIR.mkdir(parents=True, exist_ok=True)
DB_MAP[db_id] = {"path": path, "ts": time.time()}
log.info(f"📦 Registered DB {db_id} -> {path}")
def cleanup_stale_dbs() -> None:
"""Remove expired DBs from /tmp/nl2sql_dbs and memory map."""
now = time.time()
cutoff = DB_TTL_HOURS * 3600
stale_ids = [db_id for db_id, entry in DB_MAP.items() if now - entry["ts"] > cutoff]
for db_id in stale_ids:
path_str = DB_MAP[db_id]["path"]
path = Path(path_str)
try:
if path.exists():
path.unlink()
log.info(f"🧹 Deleted stale DB: {path}")
except FileNotFoundError:
pass
DB_MAP.pop(db_id, None)
def get_db_path(db_id: str) -> Optional[str]:
"""Return full path of an uploaded DB (persistent lookup)."""
entry = DB_MAP.get(db_id)
if entry:
path_str = entry["path"]
if Path(path_str).exists():
return path_str
candidates = [
_DB_UPLOAD_DIR / f"{db_id}.sqlite",
_DB_UPLOAD_DIR / f"{db_id}.db",
Path("data/uploads") / f"{db_id}.sqlite",
Path("data/uploads") / f"{db_id}.db",
]
for p in candidates:
if p.exists():
log.info(f"🔍 Recovered DB path for {db_id}: {p}")
return str(p)
log.warning(f"⚠️ DB file not found for id={db_id}")
return None
|