File size: 5,274 Bytes
e4abb85
c509b44
e4abb85
c509b44
e4abb85
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c509b44
e4abb85
 
 
 
 
 
c509b44
 
 
 
 
e4abb85
 
 
 
c509b44
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e4abb85
 
 
c509b44
e4abb85
 
c509b44
e4abb85
 
 
c509b44
 
 
 
 
 
 
 
 
 
 
 
e4abb85
c509b44
 
 
 
 
 
 
 
 
 
 
 
 
e4abb85
 
c509b44
 
 
 
e4abb85
c509b44
 
 
 
e4abb85
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
import sqlite3
import time
from contextlib import closing
from pathlib import Path
from typing import List, Optional, Dict, Any


class RulesStore:
    """
    Lightweight SQLite-backed store for admin rules.
    Ensures data persists across restarts without requiring external DB setup.

    Rules are scoped by ``tenant_id`` and are unique per ``(tenant_id, rule)``.
    Every operation opens its own short-lived connection and closes it when
    done, so no connection or file handle outlives a method call.
    """

    # Columns that databases created by older versions may lack, paired with
    # the column definition used when adding them via ALTER TABLE.
    _MIGRATION_COLUMNS = (
        ("pattern", "TEXT"),
        ("severity", "TEXT DEFAULT 'medium'"),
        ("description", "TEXT"),
        ("enabled", "BOOLEAN DEFAULT 1"),
        ("created_at", "INTEGER"),
    )

    def __init__(self, db_path: Optional[Path] = None):
        """
        Open (creating if necessary) the rules database.

        Args:
            db_path: Location of the SQLite file. When omitted, the file
                ``data/admin_rules.db`` is used, where ``data`` lives three
                directory levels above this module (assumed to be the
                project root). Must be a real file path: every operation
                opens a fresh connection, so ``":memory:"`` would not persist.
        """
        if db_path is None:
            root_dir = Path(__file__).resolve().parents[3]
            db_path = root_dir / "data" / "admin_rules.db"
        self.db_path = Path(db_path)
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        self._init_db()

    def _connect(self):
        """
        Return a context manager yielding a connection that is closed on exit.

        ``sqlite3.Connection`` used directly in a ``with`` statement only
        commits or rolls back; it does not close. Wrapping it in ``closing``
        guarantees the handle is released.
        """
        return closing(sqlite3.connect(self.db_path))

    def _init_db(self):
        """Create the ``admin_rules`` table and add any columns it is missing."""
        with self._connect() as conn, conn:
            # Create table with regex pattern and severity support
            conn.execute(
                """
                CREATE TABLE IF NOT EXISTS admin_rules (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    tenant_id TEXT NOT NULL,
                    rule TEXT NOT NULL,
                    pattern TEXT,
                    severity TEXT DEFAULT 'medium',
                    description TEXT,
                    enabled BOOLEAN DEFAULT 1,
                    created_at INTEGER,
                    UNIQUE(tenant_id, rule)
                )
                """
            )
            # Add new columns if they don't exist (for backward compatibility).
            # Inspect the schema instead of blindly swallowing errors, so real
            # failures (locked database, disk errors) are not hidden.
            existing = {
                row[1] for row in conn.execute("PRAGMA table_info(admin_rules)")
            }
            for name, definition in self._MIGRATION_COLUMNS:
                if name in existing:
                    continue
                try:
                    conn.execute(
                        f"ALTER TABLE admin_rules ADD COLUMN {name} {definition}"
                    )
                except sqlite3.OperationalError as exc:
                    # Another process may have added the column between the
                    # PRAGMA and the ALTER; only that case is safe to ignore.
                    if "duplicate column name" not in str(exc).lower():
                        raise

    def get_rules(self, tenant_id: str) -> List[str]:
        """Get all enabled rules as a list of rule text strings, ordered by id
        (backward compatibility)."""
        with self._connect() as conn:
            cursor = conn.execute(
                "SELECT rule FROM admin_rules WHERE tenant_id = ? AND enabled = 1 ORDER BY id ASC",
                (tenant_id,),
            )
            return [row[0] for row in cursor.fetchall()]

    def get_rules_detailed(self, tenant_id: str) -> List[Dict[str, Any]]:
        """Get all enabled rules with full metadata including pattern, severity, etc.

        Each dict has the keys ``id``, ``tenant_id``, ``rule``, ``pattern``,
        ``severity``, ``description``, ``enabled`` and ``created_at``.
        """
        with self._connect() as conn:
            conn.row_factory = sqlite3.Row
            cursor = conn.execute(
                """SELECT id, tenant_id, rule, pattern, severity, description, enabled, created_at 
                FROM admin_rules WHERE tenant_id = ? AND enabled = 1 ORDER BY id ASC""",
                (tenant_id,),
            )
            return [dict(row) for row in cursor.fetchall()]

    def add_rule(
        self,
        tenant_id: str,
        rule: str,
        pattern: Optional[str] = None,
        severity: str = "medium",
        description: Optional[str] = None,
        enabled: bool = True
    ) -> bool:
        """
        Add a rule with optional regex pattern and severity.
        If pattern is None (or empty), the rule text itself is used as the
        pattern; the same fallback applies to description.

        Returns:
            True when the statement ran without a database error. This
            includes the case where the rule already existed for the tenant
            and the insert was ignored. False on any ``sqlite3.Error``.
        """
        try:
            with self._connect() as conn, conn:
                # If pattern not provided, use rule text as pattern
                pattern_value = pattern or rule
                description_value = description or rule

                conn.execute(
                    """INSERT OR IGNORE INTO admin_rules 
                    (tenant_id, rule, pattern, severity, description, enabled, created_at) 
                    VALUES (?, ?, ?, ?, ?, ?, ?)""",
                    (
                        tenant_id,
                        rule,
                        pattern_value,
                        severity,
                        description_value,
                        1 if enabled else 0,
                        int(time.time()),
                    ),
                )
            return True
        except sqlite3.Error:
            return False

    def add_rules_bulk(self, tenant_id: str, rules: List[str]) -> List[str]:
        """
        Add several rules for a tenant in one transaction.

        Each rule is stored the same way ``add_rule`` stores a rule given only
        its text: pattern and description default to the rule text, severity
        is ``'medium'``, the rule is enabled and ``created_at`` is set.

        Returns:
            The rules that were actually inserted, in input order. Rules that
            already existed (or were repeated in ``rules``) are skipped and
            not reported, as are rules whose insert raised a database error.
        """
        added: List[str] = []
        created_at = int(time.time())
        with self._connect() as conn, conn:
            for rule in rules:
                try:
                    cursor = conn.execute(
                        """INSERT OR IGNORE INTO admin_rules 
                        (tenant_id, rule, pattern, severity, description, enabled, created_at) 
                        VALUES (?, ?, ?, ?, ?, ?, ?)""",
                        (tenant_id, rule, rule, "medium", rule, 1, created_at),
                    )
                except sqlite3.Error:
                    continue
                # rowcount is 0 when OR IGNORE skipped the row.
                if cursor.rowcount > 0:
                    added.append(rule)
        return added

    def delete_rule(self, tenant_id: str, rule: str) -> bool:
        """Delete a tenant's rule by its text; return True if a row was removed."""
        with self._connect() as conn, conn:
            cursor = conn.execute(
                "DELETE FROM admin_rules WHERE tenant_id = ? AND rule = ?",
                (tenant_id, rule),
            )
            return cursor.rowcount > 0