# IntegraChat/backend/api/storage/rules_store.py
# Uploaded by: nothingworry
# Commit a477044 - feat: Enhance admin rules with file upload, drag-and-drop, chunk processing, and improved UI
# File size at this revision: 18.2 kB
import os
import sqlite3
import time
from contextlib import closing
from pathlib import Path
from typing import List, Optional, Dict, Any
# Supabase is an optional dependency. When the package is missing the store
# falls back to SQLite, so record availability instead of failing at import.
try:
    from supabase import create_client, Client
    SUPABASE_AVAILABLE = True
except ImportError:
    SUPABASE_AVAILABLE = False
    Client = None
class RulesStore:
    """
    Store for admin rules with support for both Supabase and SQLite.

    Uses Supabase if configured, otherwise falls back to SQLite.
    """

    # Columns added after the first schema version, as (name, SQL definition).
    # ``_init_db`` adds whichever of these an older database file is missing.
    _SQLITE_MIGRATION_COLUMNS = (
        ("pattern", "TEXT"),
        ("severity", "TEXT DEFAULT 'medium'"),
        ("description", "TEXT"),
        ("enabled", "BOOLEAN DEFAULT 1"),
        ("created_at", "INTEGER"),
    )

    def __init__(self, use_supabase: Optional[bool] = None, auto_create_table: bool = False):
        """
        Initialize RulesStore.

        Args:
            use_supabase: If True, use Supabase; if False, use SQLite.
                If None, auto-detect based on environment variables
                (SUPABASE_URL and SUPABASE_SERVICE_KEY) and on whether the
                supabase package could be imported.
            auto_create_table: If True, run the fuller table-existence check
                (which prints setup hints when the table is missing).
                Default False to avoid blocking startup; call
                create_table_if_needed() separately to actually create it.
        """
        self.use_supabase = use_supabase
        self._table_verified = False

        supabase_url = os.getenv("SUPABASE_URL")
        supabase_key = os.getenv("SUPABASE_SERVICE_KEY")

        # Auto-detect if Supabase should be used
        if self.use_supabase is None:
            self.use_supabase = bool(supabase_url and supabase_key and SUPABASE_AVAILABLE)

        if self.use_supabase:
            if not supabase_url or not supabase_key:
                # Don't raise error, fall back to SQLite instead
                print("⚠️ Supabase credentials missing. Falling back to SQLite.")
                self.use_supabase = False
            else:
                try:
                    self.supabase_client = create_client(supabase_url, supabase_key)
                    self.table_name = "admin_rules"
                    # Only verify table existence, don't create during init
                    if auto_create_table:
                        self._ensure_supabase_table()
                    else:
                        self._quick_table_check()
                except Exception as e:
                    # Any client failure (including the supabase package being
                    # absent) degrades to the local SQLite store.
                    print(f"⚠️ Failed to initialize Supabase client: {e}. Falling back to SQLite.")
                    self.use_supabase = False

        if not self.use_supabase:
            # Initialize SQLite
            root_dir = Path(__file__).resolve().parents[3]  # points to project root
            data_dir = root_dir / "data"
            data_dir.mkdir(parents=True, exist_ok=True)
            self.db_path = data_dir / "admin_rules.db"
            self._init_db()

    def _connect(self):
        """
        Return a context manager yielding a SQLite connection to ``db_path``.

        The connection is closed when the ``with`` block exits. Using the
        connection object itself as a context manager would only commit or
        roll back the transaction and leave the handle open.
        """
        return closing(sqlite3.connect(self.db_path))

    def _quick_table_check(self):
        """Probe the Supabase table once and record whether the query succeeded."""
        try:
            self.supabase_client.table(self.table_name).select("id").limit(1).execute()
            self._table_verified = True
        except Exception:
            # Table might not exist, but don't block startup
            self._table_verified = False

    def _ensure_supabase_table(self):
        """
        Check whether the Supabase table exists and print setup hints if not.

        Returns:
            True if the probe query succeeded or failed for a reason that does
            not look like a missing table; False if the table appears missing.
        """
        try:
            # Try to query the table to see if it exists
            self.supabase_client.table(self.table_name).select("id").limit(1).execute()
        except Exception as e:
            error_str = str(e).lower()
            missing_markers = ("relation", "does not exist", "not found")
            if any(marker in error_str for marker in missing_markers):
                print(f"⚠️ Table '{self.table_name}' does not exist in Supabase.")
                print(" Run 'python create_supabase_table.py' to create it, or create manually in Supabase SQL Editor.")
                # Don't block startup - just log the issue
                return False
            # Other error, assume table exists
            self._table_verified = True
            return True
        self._table_verified = True
        return True

    def create_table_if_needed(self):
        """
        Public method to create table if needed. Can be called after startup.

        Returns:
            False when running on SQLite or when creation failed; True when
            the table was already verified or was created successfully.
        """
        if not self.use_supabase:
            return False
        if self._table_verified:
            return True
        return self._create_supabase_table()

    def _create_supabase_table(self) -> bool:
        """
        Create the admin_rules table in Supabase programmatically.

        Reads ``supabase_admin_rules_table.sql`` from the project root and
        tries, in order, psql (when POSTGRESQL_URL is set) and a Supabase RPC
        endpoint. Prints manual instructions when neither succeeds.

        Returns:
            True if one of the methods reported success, otherwise False.
        """
        try:
            sql_file = Path(__file__).resolve().parents[3] / "supabase_admin_rules_table.sql"
            if not sql_file.exists():
                print(f" ⚠️ SQL file not found: {sql_file}")
                self._show_manual_instructions()
                return False

            with open(sql_file, "r", encoding="utf-8") as f:
                sql_content = f.read()

            # Method 1: psql, only when a direct Postgres URL is configured
            postgres_url = os.getenv("POSTGRESQL_URL")
            if postgres_url and self._create_table_via_psql(postgres_url, sql_content):
                return True

            # Method 2: Supabase REST API (if custom function exists)
            if self._create_table_via_rpc(sql_content):
                return True

            # Method 3: Show manual instructions (don't block)
            self._show_manual_instructions()
            return False
        except Exception as e:
            print(f" ❌ Error: {e}")
            self._show_manual_instructions()
            return False

    def _create_table_via_psql(self, postgres_url: str, sql_content: str) -> bool:
        """Run the table DDL through the ``psql`` CLI; return True on exit code 0."""
        import subprocess

        try:
            print(" 🔧 Attempting to create table via psql...")
            result = subprocess.run(
                ["psql", postgres_url, "-c", sql_content],
                capture_output=True,
                text=True,
                timeout=10,  # Shorter timeout to avoid blocking
            )
        except FileNotFoundError:
            print(" ⚠️ psql not found in PATH")
        except subprocess.TimeoutExpired:
            print(" ⚠️ psql timed out (table creation may still be in progress)")
        except Exception as e:
            print(f" ⚠️ psql method failed: {e}")
        else:
            if result.returncode == 0:
                print(" ✅ Table created successfully via psql!")
                self._table_verified = True
                return True
            print(f" ⚠️ psql returned error: {result.stderr[:200]}")
        return False

    def _create_table_via_rpc(self, sql_content: str) -> bool:
        """Post the table DDL to candidate Supabase RPC endpoints; return True on a 2xx reply."""
        try:
            import httpx
        except ImportError:
            return False

        try:
            # Try different possible RPC endpoints
            endpoints = (
                "/rest/v1/rpc/exec_sql",
                "/rest/v1/rpc/execute_sql",
                "/rest/v1/rpc/run_sql",
            )
            for endpoint in endpoints:
                try:
                    response = httpx.post(
                        f"{os.getenv('SUPABASE_URL')}{endpoint}",
                        headers={
                            "apikey": os.getenv("SUPABASE_SERVICE_KEY"),
                            "Authorization": f"Bearer {os.getenv('SUPABASE_SERVICE_KEY')}",
                            "Content-Type": "application/json",
                            "Prefer": "return=representation",
                        },
                        json={"query": sql_content, "sql": sql_content},
                        timeout=10,  # Shorter timeout
                    )
                    if response.status_code in (200, 201, 204):
                        print(" ✅ Table created successfully via API!")
                        self._table_verified = True
                        return True
                except Exception:
                    # This endpoint is unavailable; try the next candidate.
                    continue
        except Exception as e:
            print(f" ⚠️ API method failed: {e}")
        return False

    def _show_manual_instructions(self):
        """Show instructions for manual table creation."""
        sql_file = Path(__file__).resolve().parents[3] / "supabase_admin_rules_table.sql"
        print("\n 📝 Manual Setup Required:")
        print(" 1. Go to: https://app.supabase.com → Your Project → SQL Editor")
        print(" 2. Click 'New query'")
        if sql_file.exists():
            print(f" 3. Open file: {sql_file.name}")
            print(" 4. Copy all SQL and paste into SQL Editor")
        else:
            print(" 3. Copy the SQL from supabase_admin_rules_table.sql")
        print(" 5. Click 'Run' to execute")
        print("\n 💡 After creating the table, restart your application.")

    def _init_db(self):
        """Create the SQLite ``admin_rules`` table and add any columns an older file lacks."""
        with self._connect() as conn:
            # Create table with regex pattern and severity support
            conn.execute(
                """
                CREATE TABLE IF NOT EXISTS admin_rules (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    tenant_id TEXT NOT NULL,
                    rule TEXT NOT NULL,
                    pattern TEXT,
                    severity TEXT DEFAULT 'medium',
                    description TEXT,
                    enabled BOOLEAN DEFAULT 1,
                    created_at INTEGER,
                    UNIQUE(tenant_id, rule)
                )
                """
            )
            # Add new columns if they don't exist (for backward compatibility).
            # Inspecting the schema first means a genuine ALTER failure is
            # raised instead of being mistaken for "column already exists".
            existing = {row[1] for row in conn.execute("PRAGMA table_info(admin_rules)")}
            for column, definition in self._SQLITE_MIGRATION_COLUMNS:
                if column not in existing:
                    conn.execute(f"ALTER TABLE admin_rules ADD COLUMN {column} {definition}")
            conn.commit()

    @staticmethod
    def _to_unix_timestamp(value: str) -> int:
        """Parse an ISO-8601 string into a Unix timestamp, or return the current time if unparseable."""
        from datetime import datetime

        try:
            parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
            return int(parsed.timestamp())
        except (ValueError, OverflowError, OSError):
            return int(time.time())

    def get_rules(self, tenant_id: str) -> List[str]:
        """
        Get all enabled rules as a list of rule text strings (backward compatibility).

        Returns an empty list if the Supabase query fails.
        """
        if self.use_supabase:
            try:
                response = (
                    self.supabase_client.table(self.table_name)
                    .select("rule")
                    .eq("tenant_id", tenant_id)
                    .eq("enabled", True)
                    .order("id")
                    .execute()
                )
                return [row["rule"] for row in response.data]
            except Exception as e:
                print(f"Error fetching rules from Supabase: {e}")
                return []

        with self._connect() as conn:
            cursor = conn.execute(
                "SELECT rule FROM admin_rules WHERE tenant_id = ? AND enabled = 1 ORDER BY id ASC",
                (tenant_id,),
            )
            return [row[0] for row in cursor.fetchall()]

    def get_rules_detailed(self, tenant_id: str) -> List[Dict[str, Any]]:
        """
        Get all enabled rules with full metadata including pattern, severity, etc.

        For Supabase rows, a string ``created_at`` is converted to an integer
        Unix timestamp. Returns an empty list if the Supabase query fails.
        """
        if self.use_supabase:
            try:
                response = (
                    self.supabase_client.table(self.table_name)
                    .select("*")
                    .eq("tenant_id", tenant_id)
                    .eq("enabled", True)
                    .order("id")
                    .execute()
                )
                rules = []
                for row in response.data:
                    rule_dict = dict(row)
                    created_at = rule_dict.get("created_at")
                    if isinstance(created_at, str):
                        rule_dict["created_at"] = self._to_unix_timestamp(created_at)
                    rules.append(rule_dict)
                return rules
            except Exception as e:
                print(f"Error fetching detailed rules from Supabase: {e}")
                return []

        with self._connect() as conn:
            conn.row_factory = sqlite3.Row
            cursor = conn.execute(
                """SELECT id, tenant_id, rule, pattern, severity, description, enabled, created_at
                FROM admin_rules WHERE tenant_id = ? AND enabled = 1 ORDER BY id ASC""",
                (tenant_id,),
            )
            return [dict(row) for row in cursor.fetchall()]

    def add_rule(
        self,
        tenant_id: str,
        rule: str,
        pattern: Optional[str] = None,
        severity: str = "medium",
        description: Optional[str] = None,
        enabled: bool = True
    ) -> bool:
        """
        Add a rule with optional regex pattern and severity.

        If pattern is None (or empty), the rule text itself is used as the
        pattern; the same default applies to description.

        Returns:
            True if the statement ran without error (on SQLite a duplicate
            (tenant_id, rule) is silently ignored and still returns True);
            False if the backend raised an error.
        """
        # If pattern not provided, use rule text as pattern
        pattern_value = pattern or rule
        description_value = description or rule

        if self.use_supabase:
            try:
                data = {
                    "tenant_id": tenant_id,
                    "rule": rule,
                    "pattern": pattern_value,
                    "severity": severity,
                    "description": description_value,
                    "enabled": enabled,
                }
                # NOTE(review): upsert is called without on_conflict; whether it
                # resolves on (tenant_id, rule) depends on the table definition,
                # which is not visible here - confirm against the SQL file.
                self.supabase_client.table(self.table_name).upsert(data).execute()
                return True
            except Exception as e:
                print(f"Error adding rule to Supabase: {e}")
                return False

        try:
            with self._connect() as conn:
                conn.execute(
                    """INSERT OR IGNORE INTO admin_rules
                    (tenant_id, rule, pattern, severity, description, enabled, created_at)
                    VALUES (?, ?, ?, ?, ?, ?, ?)""",
                    (
                        tenant_id,
                        rule,
                        pattern_value,
                        severity,
                        description_value,
                        1 if enabled else 0,
                        int(time.time()),
                    ),
                )
                conn.commit()
            return True
        except sqlite3.Error:
            return False

    def add_rules_bulk(self, tenant_id: str, rules: List[str]) -> List[str]:
        """
        Add several rules for a tenant, each with default metadata.

        Every rule gets pattern and description equal to its own text,
        severity "medium" and enabled set, matching add_rule's defaults on
        both backends.

        Returns:
            The rules that were attempted without a backend error. Duplicates
            that the backend ignored are still included.
        """
        if not rules:
            return []

        added = []
        if self.use_supabase:
            try:
                bulk_data = [
                    {
                        "tenant_id": tenant_id,
                        "rule": rule,
                        "pattern": rule,  # Use rule text as pattern
                        "severity": "medium",
                        "description": rule,
                        "enabled": True,
                    }
                    for rule in rules
                ]
                # Upsert all rules at once
                self.supabase_client.table(self.table_name).upsert(bulk_data).execute()
                added = list(rules)  # All rules were attempted
            except Exception as e:
                print(f"Error adding bulk rules to Supabase: {e}")
                # Fallback: try one by one
                for rule in rules:
                    if self.add_rule(tenant_id, rule):
                        added.append(rule)
            return added

        created_at = int(time.time())
        with self._connect() as conn:
            for rule in rules:
                try:
                    conn.execute(
                        """INSERT OR IGNORE INTO admin_rules
                        (tenant_id, rule, pattern, severity, description, enabled, created_at)
                        VALUES (?, ?, ?, ?, ?, ?, ?)""",
                        (tenant_id, rule, rule, "medium", rule, 1, created_at),
                    )
                    added.append(rule)
                except sqlite3.Error:
                    # Skip the offending rule but keep processing the rest.
                    continue
            conn.commit()
        return added

    def delete_rule(self, tenant_id: str, rule: str) -> bool:
        """
        Delete the rule with the given text for a tenant.

        Returns:
            True if at least one row was deleted; False if nothing matched or
            the Supabase request failed.
        """
        if self.use_supabase:
            try:
                response = (
                    self.supabase_client.table(self.table_name)
                    .delete()
                    .eq("tenant_id", tenant_id)
                    .eq("rule", rule)
                    .execute()
                )
                # response.data holds the deleted rows; empty/None means no match
                return bool(response.data)
            except Exception as e:
                print(f"Error deleting rule from Supabase: {e}")
                return False

        with self._connect() as conn:
            cursor = conn.execute(
                "DELETE FROM admin_rules WHERE tenant_id = ? AND rule = ?",
                (tenant_id, rule),
            )
            conn.commit()
            return cursor.rowcount > 0