import sqlite3 import time import os from pathlib import Path from typing import List, Optional, Dict, Any # Try to import Supabase client try: from supabase import create_client, Client SUPABASE_AVAILABLE = True except ImportError: SUPABASE_AVAILABLE = False Client = None class RulesStore: """ Store for admin rules with support for both Supabase and SQLite. Uses Supabase if configured, otherwise falls back to SQLite. """ def __init__(self, use_supabase: Optional[bool] = None, auto_create_table: bool = False): """ Initialize RulesStore. Args: use_supabase: If True, use Supabase; if False, use SQLite. If None, auto-detect based on environment variables. auto_create_table: If True, attempt to create Supabase table if it doesn't exist. Default False to avoid blocking startup. Use create_table() method separately. """ self.use_supabase = use_supabase self._table_verified = False # Auto-detect if Supabase should be used if self.use_supabase is None: supabase_url = os.getenv("SUPABASE_URL") supabase_key = os.getenv("SUPABASE_SERVICE_KEY") self.use_supabase = bool(supabase_url and supabase_key and SUPABASE_AVAILABLE) if self.use_supabase: # Initialize Supabase client supabase_url = os.getenv("SUPABASE_URL") supabase_key = os.getenv("SUPABASE_SERVICE_KEY") if not supabase_url or not supabase_key: # Don't raise error, fall back to SQLite instead print("⚠️ Supabase credentials missing. Falling back to SQLite.") self.use_supabase = False else: try: self.supabase_client = create_client(supabase_url, supabase_key) self.table_name = "admin_rules" # Only verify table existence, don't create during init if auto_create_table: self._ensure_supabase_table() else: # Quick check without blocking self._quick_table_check() except Exception as e: print(f"⚠️ Failed to initialize Supabase client: {e}. Falling back to SQLite.") self.use_supabase = False if not self.use_supabase: # Initialize SQLite root_dir = Path(__file__).resolve().parents[3] # points to project root data_dir = root_dir / "data" data_dir.mkdir(parents=True, exist_ok=True) self.db_path = data_dir / "admin_rules.db" self._init_db() def _quick_table_check(self): """Quick non-blocking check if table exists.""" try: self.supabase_client.table(self.table_name).select("id").limit(1).execute() self._table_verified = True except Exception: # Table might not exist, but don't block startup self._table_verified = False def _ensure_supabase_table(self): """Ensure the Supabase table exists, create it if needed.""" try: # Try to query the table to see if it exists self.supabase_client.table(self.table_name).select("id").limit(1).execute() # If we get here, table exists self._table_verified = True return True except Exception as e: error_str = str(e).lower() if "relation" in error_str or "does not exist" in error_str or "not found" in error_str: # Table doesn't exist, try to create it (non-blocking) print(f"⚠️ Table '{self.table_name}' does not exist in Supabase.") print(" Run 'python create_supabase_table.py' to create it, or create manually in Supabase SQL Editor.") # Don't block startup - just log the issue return False else: # Other error, assume table exists self._table_verified = True return True def create_table_if_needed(self): """Public method to create table if needed. Can be called after startup.""" if not self.use_supabase: return False if self._table_verified: return True return self._create_supabase_table() def _create_supabase_table(self) -> bool: """Create the admin_rules table in Supabase programmatically.""" try: # Read SQL file sql_file = Path(__file__).resolve().parents[3] / "supabase_admin_rules_table.sql" if not sql_file.exists(): print(f" ⚠️ SQL file not found: {sql_file}") self._show_manual_instructions() return False with open(sql_file, "r", encoding="utf-8") as f: sql_content = f.read() # Method 1: Try using psql if POSTGRESQL_URL is available (non-blocking with timeout) postgres_url = os.getenv("POSTGRESQL_URL") if postgres_url: try: import subprocess print(" 🔧 Attempting to create table via psql...") # Execute SQL using psql with timeout result = subprocess.run( ["psql", postgres_url, "-c", sql_content], capture_output=True, text=True, timeout=10 # Shorter timeout to avoid blocking ) if result.returncode == 0: print(" ✅ Table created successfully via psql!") self._table_verified = True return True else: print(f" ⚠️ psql returned error: {result.stderr[:200]}") except FileNotFoundError: print(" ⚠️ psql not found in PATH") except subprocess.TimeoutExpired: print(" ⚠️ psql timed out (table creation may still be in progress)") except Exception as e: print(f" ⚠️ psql method failed: {e}") # Method 2: Try using Supabase REST API (if custom function exists) try: import httpx # Try different possible RPC endpoints endpoints = [ "/rest/v1/rpc/exec_sql", "/rest/v1/rpc/execute_sql", "/rest/v1/rpc/run_sql" ] for endpoint in endpoints: try: response = httpx.post( f"{os.getenv('SUPABASE_URL')}{endpoint}", headers={ "apikey": os.getenv("SUPABASE_SERVICE_KEY"), "Authorization": f"Bearer {os.getenv('SUPABASE_SERVICE_KEY')}", "Content-Type": "application/json", "Prefer": "return=representation" }, json={"query": sql_content, "sql": sql_content}, timeout=10 # Shorter timeout ) if response.status_code in [200, 201, 204]: print(" ✅ Table created successfully via API!") self._table_verified = True return True except Exception: continue except ImportError: pass except Exception as e: print(f" ⚠️ API method failed: {e}") # Method 3: Show manual instructions (don't block) self._show_manual_instructions() return False except Exception as e: print(f" ❌ Error: {e}") self._show_manual_instructions() return False def _show_manual_instructions(self): """Show instructions for manual table creation.""" sql_file = Path(__file__).resolve().parents[3] / "supabase_admin_rules_table.sql" print("\n 📝 Manual Setup Required:") print(" 1. Go to: https://app.supabase.com → Your Project → SQL Editor") print(" 2. Click 'New query'") if sql_file.exists(): print(f" 3. Open file: {sql_file.name}") print(" 4. Copy all SQL and paste into SQL Editor") else: print(" 3. Copy the SQL from supabase_admin_rules_table.sql") print(" 5. Click 'Run' to execute") print("\n 💡 After creating the table, restart your application.") def _init_db(self): with sqlite3.connect(self.db_path) as conn: # Create table with regex pattern and severity support conn.execute( """ CREATE TABLE IF NOT EXISTS admin_rules ( id INTEGER PRIMARY KEY AUTOINCREMENT, tenant_id TEXT NOT NULL, rule TEXT NOT NULL, pattern TEXT, severity TEXT DEFAULT 'medium', description TEXT, enabled BOOLEAN DEFAULT 1, created_at INTEGER, UNIQUE(tenant_id, rule) ) """ ) # Add new columns if they don't exist (for backward compatibility) try: conn.execute("ALTER TABLE admin_rules ADD COLUMN pattern TEXT") except sqlite3.OperationalError: pass # Column already exists try: conn.execute("ALTER TABLE admin_rules ADD COLUMN severity TEXT DEFAULT 'medium'") except sqlite3.OperationalError: pass try: conn.execute("ALTER TABLE admin_rules ADD COLUMN description TEXT") except sqlite3.OperationalError: pass try: conn.execute("ALTER TABLE admin_rules ADD COLUMN enabled BOOLEAN DEFAULT 1") except sqlite3.OperationalError: pass try: conn.execute("ALTER TABLE admin_rules ADD COLUMN created_at INTEGER") except sqlite3.OperationalError: pass conn.commit() def get_rules(self, tenant_id: str) -> List[str]: """Get all rules as a list of rule text strings (backward compatibility).""" if self.use_supabase: try: response = self.supabase_client.table(self.table_name)\ .select("rule")\ .eq("tenant_id", tenant_id)\ .eq("enabled", True)\ .order("id")\ .execute() return [row["rule"] for row in response.data] except Exception as e: print(f"Error fetching rules from Supabase: {e}") return [] else: with sqlite3.connect(self.db_path) as conn: cursor = conn.execute( "SELECT rule FROM admin_rules WHERE tenant_id = ? AND enabled = 1 ORDER BY id ASC", (tenant_id,), ) return [row[0] for row in cursor.fetchall()] def get_rules_detailed(self, tenant_id: str) -> List[Dict[str, Any]]: """Get all rules with full metadata including pattern, severity, etc.""" if self.use_supabase: try: response = self.supabase_client.table(self.table_name)\ .select("*")\ .eq("tenant_id", tenant_id)\ .eq("enabled", True)\ .order("id")\ .execute() # Convert to list of dicts and ensure created_at is a timestamp rules = [] for row in response.data: rule_dict = dict(row) # Convert created_at to Unix timestamp if it's a string if "created_at" in rule_dict and isinstance(rule_dict["created_at"], str): try: from datetime import datetime dt = datetime.fromisoformat(rule_dict["created_at"].replace("Z", "+00:00")) rule_dict["created_at"] = int(dt.timestamp()) except: rule_dict["created_at"] = int(time.time()) rules.append(rule_dict) return rules except Exception as e: print(f"Error fetching detailed rules from Supabase: {e}") return [] else: with sqlite3.connect(self.db_path) as conn: conn.row_factory = sqlite3.Row cursor = conn.execute( """SELECT id, tenant_id, rule, pattern, severity, description, enabled, created_at FROM admin_rules WHERE tenant_id = ? AND enabled = 1 ORDER BY id ASC""", (tenant_id,), ) rows = cursor.fetchall() return [dict(row) for row in rows] def add_rule( self, tenant_id: str, rule: str, pattern: Optional[str] = None, severity: str = "medium", description: Optional[str] = None, enabled: bool = True ) -> bool: """ Add a rule with optional regex pattern and severity. If pattern is None, the rule text itself is used as the pattern. """ # If pattern not provided, use rule text as pattern pattern_value = pattern or rule description_value = description or rule if self.use_supabase: try: # Use upsert to handle unique constraint data = { "tenant_id": tenant_id, "rule": rule, "pattern": pattern_value, "severity": severity, "description": description_value, "enabled": enabled } # Supabase upsert - will insert or update based on unique constraint response = self.supabase_client.table(self.table_name)\ .upsert(data)\ .execute() return True except Exception as e: print(f"Error adding rule to Supabase: {e}") return False else: try: with sqlite3.connect(self.db_path) as conn: conn.execute( """INSERT OR IGNORE INTO admin_rules (tenant_id, rule, pattern, severity, description, enabled, created_at) VALUES (?, ?, ?, ?, ?, ?, ?)""", (tenant_id, rule, pattern_value, severity, description_value, 1 if enabled else 0, int(time.time())), ) conn.commit() return True except sqlite3.Error: return False def add_rules_bulk(self, tenant_id: str, rules: List[str]) -> List[str]: added = [] if self.use_supabase: try: # Prepare bulk data bulk_data = [ { "tenant_id": tenant_id, "rule": rule, "pattern": rule, # Use rule text as pattern "severity": "medium", "description": rule, "enabled": True } for rule in rules ] # Upsert all rules at once response = self.supabase_client.table(self.table_name)\ .upsert(bulk_data)\ .execute() added = rules # All rules were attempted except Exception as e: print(f"Error adding bulk rules to Supabase: {e}") # Fallback: try one by one for rule in rules: if self.add_rule(tenant_id, rule): added.append(rule) else: with sqlite3.connect(self.db_path) as conn: for rule in rules: try: conn.execute( "INSERT OR IGNORE INTO admin_rules (tenant_id, rule) VALUES (?, ?)", (tenant_id, rule), ) added.append(rule) except sqlite3.Error: continue conn.commit() return added def delete_rule(self, tenant_id: str, rule: str) -> bool: if self.use_supabase: try: response = self.supabase_client.table(self.table_name)\ .delete()\ .eq("tenant_id", tenant_id)\ .eq("rule", rule)\ .execute() # Check if any rows were deleted return len(response.data) > 0 if response.data else False except Exception as e: print(f"Error deleting rule from Supabase: {e}") return False else: with sqlite3.connect(self.db_path) as conn: cursor = conn.execute( "DELETE FROM admin_rules WHERE tenant_id = ? AND rule = ?", (tenant_id, rule), ) conn.commit() return cursor.rowcount > 0