import sqlite3
import time
from contextlib import contextmanager
from pathlib import Path
from typing import Any, Dict, Iterator, List, Optional, Union


class RulesStore:
    """
    Lightweight SQLite-backed store for admin rules.

    Rules are kept in a single ``admin_rules`` table, unique per
    ``(tenant_id, rule)``, so data persists across restarts without
    requiring external DB setup. Every operation opens its own short-lived
    connection and closes it when done.
    """

    # Columns added after the first schema version, with the DDL used to add
    # them to a table created by an older version of this store.
    _MIGRATION_COLUMNS = (
        ("pattern", "TEXT"),
        ("severity", "TEXT DEFAULT 'medium'"),
        ("description", "TEXT"),
        ("enabled", "BOOLEAN DEFAULT 1"),
        ("created_at", "INTEGER"),
    )

    # Single insert statement shared by add_rule() and add_rules_bulk() so
    # both paths populate the same set of columns.
    _INSERT_SQL = (
        "INSERT OR IGNORE INTO admin_rules "
        "(tenant_id, rule, pattern, severity, description, enabled, created_at) "
        "VALUES (?, ?, ?, ?, ?, ?, ?)"
    )

    def __init__(self, db_path: Optional[Union[str, Path]] = None):
        """
        Open (creating if necessary) the rules database.

        Args:
            db_path: Optional path of the SQLite file. When omitted, the
                database lives at ``<root>/data/admin_rules.db`` where
                ``<root>`` is three directories above this file's directory.
                Must be a real file path: each operation opens a fresh
                connection, so an in-memory database would not persist
                between calls.
        """
        if db_path is None:
            # NOTE(review): assumes parents[3] is the project root -- this
            # depends on where the module sits in the source tree.
            root_dir = Path(__file__).resolve().parents[3]
            db_path = root_dir / "data" / "admin_rules.db"
        self.db_path = Path(db_path)
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        self._init_db()

    @contextmanager
    def _connect(self) -> Iterator[sqlite3.Connection]:
        """
        Yield a connection that is committed (or rolled back) and then closed.

        ``with sqlite3.connect(...)`` on its own only manages the
        transaction; it leaves the connection open. This wrapper adds the
        missing ``close()`` so connections do not leak.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            with conn:  # commit on success, roll back on exception
                yield conn
        finally:
            conn.close()

    def _init_db(self) -> None:
        """Create the ``admin_rules`` table and add any missing columns."""
        with self._connect() as conn:
            # Create table with regex pattern and severity support
            conn.execute(
                """
                CREATE TABLE IF NOT EXISTS admin_rules (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    tenant_id TEXT NOT NULL,
                    rule TEXT NOT NULL,
                    pattern TEXT,
                    severity TEXT DEFAULT 'medium',
                    description TEXT,
                    enabled BOOLEAN DEFAULT 1,
                    created_at INTEGER,
                    UNIQUE(tenant_id, rule)
                )
                """
            )

            # Backward compatibility: a table created by an older version may
            # lack some columns. Inspect the schema and add only what is
            # missing instead of blindly swallowing every ALTER TABLE error.
            existing = {
                row[1] for row in conn.execute("PRAGMA table_info(admin_rules)")
            }
            for name, ddl in self._MIGRATION_COLUMNS:
                if name in existing:
                    continue
                try:
                    conn.execute(
                        f"ALTER TABLE admin_rules ADD COLUMN {name} {ddl}"
                    )
                except sqlite3.OperationalError as exc:
                    # Another process may have added the column between the
                    # PRAGMA and the ALTER; anything else is a real failure.
                    if "duplicate column name" not in str(exc).lower():
                        raise

    def _insert(
        self,
        conn: sqlite3.Connection,
        tenant_id: str,
        rule: str,
        pattern: Optional[str],
        severity: str,
        description: Optional[str],
        enabled: bool,
    ) -> None:
        """Insert one rule, ignoring it if ``(tenant_id, rule)`` already exists."""
        conn.execute(
            self._INSERT_SQL,
            (
                tenant_id,
                rule,
                pattern or rule,  # rule text doubles as the pattern
                severity,
                description or rule,  # ...and as the description
                1 if enabled else 0,
                int(time.time()),
            ),
        )

    def get_rules(self, tenant_id: str) -> List[str]:
        """Get all enabled rules as a list of rule text strings (backward compatibility)."""
        with self._connect() as conn:
            cursor = conn.execute(
                "SELECT rule FROM admin_rules "
                "WHERE tenant_id = ? AND enabled = 1 ORDER BY id ASC",
                (tenant_id,),
            )
            return [row[0] for row in cursor.fetchall()]

    def get_rules_detailed(self, tenant_id: str) -> List[Dict[str, Any]]:
        """
        Get all enabled rules with full metadata.

        Returns:
            One dict per rule, ordered by ``id``, with the keys ``id``,
            ``tenant_id``, ``rule``, ``pattern``, ``severity``,
            ``description``, ``enabled`` and ``created_at``.
        """
        with self._connect() as conn:
            conn.row_factory = sqlite3.Row
            cursor = conn.execute(
                """SELECT id, tenant_id, rule, pattern, severity, description, enabled, created_at
                   FROM admin_rules
                   WHERE tenant_id = ? AND enabled = 1
                   ORDER BY id ASC""",
                (tenant_id,),
            )
            return [dict(row) for row in cursor.fetchall()]

    def add_rule(
        self,
        tenant_id: str,
        rule: str,
        pattern: Optional[str] = None,
        severity: str = "medium",
        description: Optional[str] = None,
        enabled: bool = True,
    ) -> bool:
        """
        Add a rule with optional regex pattern and severity.

        If ``pattern`` or ``description`` is empty or None, the rule text
        itself is stored in its place. A rule that already exists for the
        tenant is left untouched.

        Returns:
            True when the statement ran without a database error (including
            the case where an existing duplicate was ignored), False when a
            ``sqlite3.Error`` occurred.
        """
        try:
            with self._connect() as conn:
                self._insert(
                    conn, tenant_id, rule, pattern, severity, description, enabled
                )
            return True
        except sqlite3.Error:
            return False

    def add_rules_bulk(self, tenant_id: str, rules: List[str]) -> List[str]:
        """
        Add several plain-text rules for a tenant in one transaction.

        Each rule is stored exactly as ``add_rule(tenant_id, rule)`` would
        store it: the rule text is used as pattern and description, severity
        is ``'medium'``, the rule is enabled and ``created_at`` is set.

        Returns:
            The rules whose insert ran without a database error, in input
            order. Rules ignored as duplicates are included.
        """
        added: List[str] = []
        with self._connect() as conn:
            for rule in rules:
                try:
                    self._insert(conn, tenant_id, rule, None, "medium", None, True)
                except sqlite3.Error:
                    # Best effort: skip the failing rule, keep the rest.
                    continue
                added.append(rule)
        return added

    def delete_rule(self, tenant_id: str, rule: str) -> bool:
        """Delete a tenant's rule; return True if a row was actually removed."""
        with self._connect() as conn:
            cursor = conn.execute(
                "DELETE FROM admin_rules WHERE tenant_id = ? AND rule = ?",
                (tenant_id, rule),
            )
            return cursor.rowcount > 0