Spaces:
Sleeping
Sleeping
| import sqlite3 | |
| import time | |
| from pathlib import Path | |
| from typing import List, Optional, Dict, Any | |
| class RulesStore: | |
| """ | |
| Lightweight SQLite-backed store for admin rules. | |
| Ensures data persists across restarts without requiring external DB setup. | |
| """ | |
| def __init__(self): | |
| root_dir = Path(__file__).resolve().parents[3] # points to project root | |
| data_dir = root_dir / "data" | |
| data_dir.mkdir(parents=True, exist_ok=True) | |
| self.db_path = data_dir / "admin_rules.db" | |
| self._init_db() | |
| def _init_db(self): | |
| with sqlite3.connect(self.db_path) as conn: | |
| # Create table with regex pattern and severity support | |
| conn.execute( | |
| """ | |
| CREATE TABLE IF NOT EXISTS admin_rules ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| tenant_id TEXT NOT NULL, | |
| rule TEXT NOT NULL, | |
| pattern TEXT, | |
| severity TEXT DEFAULT 'medium', | |
| description TEXT, | |
| enabled BOOLEAN DEFAULT 1, | |
| created_at INTEGER, | |
| UNIQUE(tenant_id, rule) | |
| ) | |
| """ | |
| ) | |
| # Add new columns if they don't exist (for backward compatibility) | |
| try: | |
| conn.execute("ALTER TABLE admin_rules ADD COLUMN pattern TEXT") | |
| except sqlite3.OperationalError: | |
| pass # Column already exists | |
| try: | |
| conn.execute("ALTER TABLE admin_rules ADD COLUMN severity TEXT DEFAULT 'medium'") | |
| except sqlite3.OperationalError: | |
| pass | |
| try: | |
| conn.execute("ALTER TABLE admin_rules ADD COLUMN description TEXT") | |
| except sqlite3.OperationalError: | |
| pass | |
| try: | |
| conn.execute("ALTER TABLE admin_rules ADD COLUMN enabled BOOLEAN DEFAULT 1") | |
| except sqlite3.OperationalError: | |
| pass | |
| try: | |
| conn.execute("ALTER TABLE admin_rules ADD COLUMN created_at INTEGER") | |
| except sqlite3.OperationalError: | |
| pass | |
| conn.commit() | |
| def get_rules(self, tenant_id: str) -> List[str]: | |
| """Get all rules as a list of rule text strings (backward compatibility).""" | |
| with sqlite3.connect(self.db_path) as conn: | |
| cursor = conn.execute( | |
| "SELECT rule FROM admin_rules WHERE tenant_id = ? AND enabled = 1 ORDER BY id ASC", | |
| (tenant_id,), | |
| ) | |
| return [row[0] for row in cursor.fetchall()] | |
| def get_rules_detailed(self, tenant_id: str) -> List[Dict[str, Any]]: | |
| """Get all rules with full metadata including pattern, severity, etc.""" | |
| with sqlite3.connect(self.db_path) as conn: | |
| conn.row_factory = sqlite3.Row | |
| cursor = conn.execute( | |
| """SELECT id, tenant_id, rule, pattern, severity, description, enabled, created_at | |
| FROM admin_rules WHERE tenant_id = ? AND enabled = 1 ORDER BY id ASC""", | |
| (tenant_id,), | |
| ) | |
| rows = cursor.fetchall() | |
| return [dict(row) for row in rows] | |
| def add_rule( | |
| self, | |
| tenant_id: str, | |
| rule: str, | |
| pattern: Optional[str] = None, | |
| severity: str = "medium", | |
| description: Optional[str] = None, | |
| enabled: bool = True | |
| ) -> bool: | |
| """ | |
| Add a rule with optional regex pattern and severity. | |
| If pattern is None, the rule text itself is used as the pattern. | |
| """ | |
| try: | |
| with sqlite3.connect(self.db_path) as conn: | |
| # If pattern not provided, use rule text as pattern | |
| pattern_value = pattern or rule | |
| description_value = description or rule | |
| conn.execute( | |
| """INSERT OR IGNORE INTO admin_rules | |
| (tenant_id, rule, pattern, severity, description, enabled, created_at) | |
| VALUES (?, ?, ?, ?, ?, ?, ?)""", | |
| (tenant_id, rule, pattern_value, severity, description_value, 1 if enabled else 0, int(time.time())), | |
| ) | |
| conn.commit() | |
| return True | |
| except sqlite3.Error: | |
| return False | |
| def add_rules_bulk(self, tenant_id: str, rules: List[str]) -> List[str]: | |
| added = [] | |
| with sqlite3.connect(self.db_path) as conn: | |
| for rule in rules: | |
| try: | |
| conn.execute( | |
| "INSERT OR IGNORE INTO admin_rules (tenant_id, rule) VALUES (?, ?)", | |
| (tenant_id, rule), | |
| ) | |
| added.append(rule) | |
| except sqlite3.Error: | |
| continue | |
| conn.commit() | |
| return added | |
| def delete_rule(self, tenant_id: str, rule: str) -> bool: | |
| with sqlite3.connect(self.db_path) as conn: | |
| cursor = conn.execute( | |
| "DELETE FROM admin_rules WHERE tenant_id = ? AND rule = ?", | |
| (tenant_id, rule), | |
| ) | |
| conn.commit() | |
| return cursor.rowcount > 0 | |