File size: 2,575 Bytes
e4abb85
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
import sqlite3
from pathlib import Path
from typing import List


class RulesStore:
    """
    Lightweight SQLite-backed store for admin rules.
    Ensures data persists across restarts without requiring external DB setup.
    """

    def __init__(self):
        root_dir = Path(__file__).resolve().parents[3]  # points to project root
        data_dir = root_dir / "data"
        data_dir.mkdir(parents=True, exist_ok=True)
        self.db_path = data_dir / "admin_rules.db"
        self._init_db()

    def _init_db(self):
        with sqlite3.connect(self.db_path) as conn:
            conn.execute(
                """
                CREATE TABLE IF NOT EXISTS admin_rules (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    tenant_id TEXT NOT NULL,
                    rule TEXT NOT NULL,
                    UNIQUE(tenant_id, rule)
                )
                """
            )
            conn.commit()

    def get_rules(self, tenant_id: str) -> List[str]:
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.execute(
                "SELECT rule FROM admin_rules WHERE tenant_id = ? ORDER BY id ASC",
                (tenant_id,),
            )
            return [row[0] for row in cursor.fetchall()]

    def add_rule(self, tenant_id: str, rule: str) -> bool:
        try:
            with sqlite3.connect(self.db_path) as conn:
                conn.execute(
                    "INSERT OR IGNORE INTO admin_rules (tenant_id, rule) VALUES (?, ?)",
                    (tenant_id, rule),
                )
                conn.commit()
            return True
        except sqlite3.Error:
            return False

    def add_rules_bulk(self, tenant_id: str, rules: List[str]) -> List[str]:
        added = []
        with sqlite3.connect(self.db_path) as conn:
            for rule in rules:
                try:
                    conn.execute(
                        "INSERT OR IGNORE INTO admin_rules (tenant_id, rule) VALUES (?, ?)",
                        (tenant_id, rule),
                    )
                    added.append(rule)
                except sqlite3.Error:
                    continue
            conn.commit()
        return added

    def delete_rule(self, tenant_id: str, rule: str) -> bool:
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.execute(
                "DELETE FROM admin_rules WHERE tenant_id = ? AND rule = ?",
                (tenant_id, rule),
            )
            conn.commit()
            return cursor.rowcount > 0