File size: 2,159 Bytes
c76014a
 
72e96d1
 
f55959d
c76014a
72e96d1
c76014a
72e96d1
 
 
c76014a
72e96d1
 
 
c76014a
f55959d
 
 
 
 
 
72e96d1
f55959d
c76014a
72e96d1
 
 
 
 
 
 
 
 
 
 
 
 
 
c76014a
 
 
72e96d1
c76014a
72e96d1
 
 
f55959d
 
72e96d1
f55959d
 
 
72e96d1
 
 
 
 
 
 
 
f55959d
 
 
 
72e96d1
 
 
 
 
 
c76014a
72e96d1
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
import os
import time
import logging
from pathlib import Path
from typing import Optional, TypedDict

log = logging.getLogger(__name__)

# ------------------------------
# Config
# ------------------------------

# Directory that holds uploaded database files. Taken from the DB_UPLOAD_DIR
# environment variable (e.g. populated from a .env file by whatever loads the
# environment); falls back to /tmp/nl2sql_dbs when the variable is unset.
_DB_UPLOAD_DIR = Path(os.getenv("DB_UPLOAD_DIR", "/tmp/nl2sql_dbs"))
# Created at import time, including missing parents; no error if it already exists.
_DB_UPLOAD_DIR.mkdir(parents=True, exist_ok=True)


class DBEntry(TypedDict):
    """Registry record describing one uploaded database."""
    path: str  # filesystem path of the database file, as given to register_db()
    ts: float  # time.time() value captured when the entry was registered


# In-memory registry of uploaded databases: {db_id: {"path": str, "ts": float}}.
# It lives only in this process and starts out empty each time the module loads.
DB_MAP: dict[str, DBEntry] = {}

# Age, in hours, after which cleanup_stale_dbs() considers a registered DB stale.
DB_TTL_HOURS = 6


# ------------------------------
# Helpers
# ------------------------------


def register_db(db_id: str, path: str) -> None:
    """Record an uploaded database in the in-memory registry.

    The upload directory is created first if it is missing. ``db_id`` is then
    mapped to ``path`` together with the current time as its registration
    timestamp; an existing entry under the same id is overwritten.
    """
    # Same call as at import time; a no-op when the directory is already there.
    _DB_UPLOAD_DIR.mkdir(parents=True, exist_ok=True)

    registered_at = time.time()
    DB_MAP[db_id] = {"path": path, "ts": registered_at}

    log.info(f"📦 Registered DB {db_id} -> {path}")


def cleanup_stale_dbs() -> None:
    """Delete registered DB files older than ``DB_TTL_HOURS`` and forget them.

    An entry is stale once more than ``DB_TTL_HOURS`` hours have passed since
    it was registered. For every stale entry the file at its registered path
    is removed and the entry is dropped from ``DB_MAP``. A file that is
    already gone counts as cleaned up. If a file cannot be removed for any
    other reason, a warning is logged, the entry is kept so a later run can
    retry, and the sweep continues with the remaining entries.
    """
    ttl_seconds = DB_TTL_HOURS * 3600
    now = time.time()
    # Collect the ids up front: DB_MAP is modified below and must not change
    # size while it is being iterated.
    stale_ids = [
        db_id for db_id, entry in DB_MAP.items() if now - entry["ts"] > ttl_seconds
    ]
    for db_id in stale_ids:
        path = Path(DB_MAP[db_id]["path"])
        try:
            # Unlink directly instead of exists()-then-unlink(), which can race
            # with another process removing the file in between.
            path.unlink()
        except FileNotFoundError:
            # Already removed by someone else; nothing left to delete.
            pass
        except OSError as exc:
            # E.g. permission denied or the path is a directory. One bad file
            # must not abort the cleanup of all the others.
            log.warning("⚠️ Could not delete stale DB %s: %s", path, exc)
            continue
        else:
            log.info("🧹 Deleted stale DB: %s", path)
        DB_MAP.pop(db_id, None)


def get_db_path(db_id: str) -> Optional[str]:
    """Return the path of an uploaded DB, or ``None`` if no file is found.

    Lookup order:

    1. The path registered in ``DB_MAP`` for ``db_id``, if that file exists.
    2. ``<db_id>.sqlite`` then ``<db_id>.db`` in the upload directory, followed
       by the same two names in ``data/uploads`` (relative to the current
       working directory). This lets a lookup succeed when the in-memory map
       has no usable entry for the id.

    The on-disk fallback is only attempted when ``db_id`` is a single path
    component; ids containing directory separators are rejected with a
    warning so they cannot point outside the upload directories.
    """
    entry = DB_MAP.get(db_id)
    if entry and Path(entry["path"]).exists():
        return entry["path"]

    # db_id becomes part of a file name below. Joining a directory with an
    # absolute path discards the directory, and ".." segments walk out of it,
    # so only a bare name is allowed here. str() keeps non-string ids working
    # the way they did when they were only ever formatted into a file name.
    id_text = str(db_id)
    if Path(id_text).name != id_text:
        log.warning("⚠️ Rejected DB id containing path components: %r", db_id)
        return None

    search_dirs = (_DB_UPLOAD_DIR, Path("data/uploads"))
    for directory in search_dirs:
        for suffix in (".sqlite", ".db"):
            candidate = directory / f"{id_text}{suffix}"
            if candidate.exists():
                log.info("🔍 Recovered DB path for %s: %s", db_id, candidate)
                return str(candidate)

    log.warning("⚠️ DB file not found for id=%s", db_id)
    return None