#!/usr/bin/env python3
"""
One-shot migration script that copies existing SQLite data (admin rules +
analytics) into Supabase tables. Run it after creating the Supabase schemas:

    1. supabase_admin_rules_table.sql
    2. supabase_analytics_tables.sql

Usage:
    python migrate_sqlite_to_supabase.py [--force]

Connection methods (choose one):
    - POSTGRESQL_URL (recommended): direct PostgreSQL connection.
      Format: postgresql://user:password@host:port/database
    - SUPABASE_URL + SUPABASE_SERVICE_KEY: Supabase REST API.
      Requires SUPABASE_URL and SUPABASE_SERVICE_KEY in your .env.

Notes:
    - Does not delete local SQLite data.
    - Re-running without --force skips tables that already contain Supabase rows.
    - The POSTGRESQL_URL method is faster and does not require the service_role key.
"""
from __future__ import annotations

import argparse
import json
import os
import socket
import sqlite3
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Iterable, List

from dotenv import load_dotenv

# Try to import both connection methods; each one is optional.
try:
    from supabase import Client, create_client

    SUPABASE_AVAILABLE = True
except ImportError:
    SUPABASE_AVAILABLE = False
    Client = None

try:
    import psycopg2
    from psycopg2.extras import execute_batch

    PSYCOPG2_AVAILABLE = True
except ImportError:
    PSYCOPG2_AVAILABLE = False

BATCH_SIZE = 500

def chunked(items: List[Dict[str, Any]], size: int) -> Iterable[List[Dict[str, Any]]]:
    """Yield successive batches of at most `size` items."""
    for i in range(0, len(items), size):
        yield items[i : i + size]
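# A quick sanity example (illustrative values):
#   list(chunked([{"a": 1}, {"a": 2}, {"a": 3}], 2)) -> [[{"a": 1}, {"a": 2}], [{"a": 3}]]
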
def get_connection_method():
    """Determine which connection method to use: direct PostgreSQL or the Supabase API."""
    load_dotenv()
    postgres_url = os.getenv("POSTGRESQL_URL")
    supabase_url = os.getenv("SUPABASE_URL")
    supabase_key = os.getenv("SUPABASE_SERVICE_KEY")
    # Prefer the direct PostgreSQL connection when available.
    if postgres_url and PSYCOPG2_AVAILABLE:
        return "postgresql"
    elif supabase_url and supabase_key and SUPABASE_AVAILABLE:
        return "supabase"
    else:
        return None

def get_postgres_connection():
    """Open a direct PostgreSQL connection using POSTGRESQL_URL."""
    load_dotenv()
    postgres_url = os.getenv("POSTGRESQL_URL")
    if not postgres_url:
        raise RuntimeError(
            "POSTGRESQL_URL not set in .env file.\n"
            "   Format: postgresql://user:password@host:port/database"
        )
    if not PSYCOPG2_AVAILABLE:
        raise RuntimeError(
            "psycopg2 not installed. Install it with: pip install psycopg2-binary"
        )
    try:
        return psycopg2.connect(postgres_url)
    except Exception as e:
        raise RuntimeError(
            f"Failed to connect to PostgreSQL: {e}\n"
            "   Check that POSTGRESQL_URL is correct and the database is accessible."
        ) from e

def load_supabase_client() -> Client:
    """Create and validate a Supabase client from .env credentials."""
    load_dotenv()
    url = os.getenv("SUPABASE_URL")
    key = os.getenv("SUPABASE_SERVICE_KEY")
    if not url or not key:
        raise RuntimeError(
            "Supabase credentials missing. Set SUPABASE_URL and SUPABASE_SERVICE_KEY in your .env file.\n"
            f"   SUPABASE_URL: {'✅ Set' if url else '❌ Missing'}\n"
            f"   SUPABASE_SERVICE_KEY: {'✅ Set' if key else '❌ Missing'}"
        )

    # Validate the URL format.
    if not url.startswith("https://"):
        raise RuntimeError(
            f"Invalid SUPABASE_URL format. Expected https://... but got: {url[:50]}...\n"
            "   Example: https://your-project-id.supabase.co"
        )
    if ".supabase.co" not in url:
        print(f"⚠️ Warning: SUPABASE_URL doesn't contain '.supabase.co': {url[:50]}...")
        print("   Make sure this is the correct Supabase project URL.")

    # Validate and clean the API key.
    key_trimmed = key.strip()
    if key_trimmed != key:
        print("⚠️ Warning: SUPABASE_SERVICE_KEY has leading/trailing whitespace. Trimming...")
        key = key_trimmed
    if not key.startswith("eyJ"):
        print("⚠️ Warning: SUPABASE_SERVICE_KEY doesn't start with 'eyJ' (expected JWT format).")
        print("   Make sure you're using the service_role key, not the anon key.")
    if len(key) < 100:
        print("⚠️ Warning: SUPABASE_SERVICE_KEY seems too short (should be ~200+ characters).")
        print("   Make sure you copied the entire key from the Supabase Dashboard.")

    # Mask the URL and key for display.
    masked_url = url[:20] + "..." + url[-15:] if len(url) > 35 else url[:20] + "..."
    masked_key = key[:10] + "..." + key[-10:] if len(key) > 20 else key[:10] + "..."
    print(f"🔌 Connecting to Supabase: {masked_url}")
    print(f"🔑 Using API key: {masked_key} ({len(key)} chars)")

    # Test DNS resolution first.
    try:
        hostname = url.replace("https://", "").replace("http://", "").split("/")[0]
        print(f"   Resolving DNS for: {hostname}...")
        socket.gethostbyname(hostname)
        print("   ✅ DNS resolution successful")
    except socket.gaierror as dns_err:
        raise RuntimeError(
            f"❌ Cannot resolve DNS for Supabase URL: {url}\n"
            "   This usually means:\n"
            "   1. The Supabase project doesn't exist or was deleted\n"
            "   2. The project is paused (check the Supabase Dashboard)\n"
            "   3. The URL is incorrect\n"
            "   4. Network/DNS connectivity issue\n\n"
            "   To fix:\n"
            "   1. Go to https://app.supabase.com and check your project status\n"
            "   2. If paused, resume it from the dashboard\n"
            "   3. Copy the correct URL from Settings → API\n"
            f"   DNS Error: {dns_err}"
        ) from dns_err

    try:
        client = create_client(url, key)
        # Test the connection with a simple query.
        print("   Testing connection...")
        client.table("admin_rules").select("id").limit(0).execute()
        print("   ✅ Connection successful!")
        return client
    except Exception as e:
        error_msg = str(e)
        if "getaddrinfo failed" in error_msg or "ConnectError" in str(type(e)):
            raise RuntimeError(
                f"❌ Cannot connect to Supabase URL: {url}\n"
                "   Possible issues:\n"
                "   1. URL is incomplete or incorrect (should be: https://xxxxx.supabase.co)\n"
                "   2. Network connectivity problem\n"
                "   3. Supabase project doesn't exist or is paused\n"
                "   4. Firewall/proxy blocking the connection\n\n"
                "   Check your Supabase project at: https://app.supabase.com\n"
                f"   Error: {error_msg}"
            ) from e
        elif "Invalid API key" in error_msg or "401" in error_msg:
            raise RuntimeError(
                "❌ Invalid API Key Error\n"
                "   The SUPABASE_SERVICE_KEY in your .env file is incorrect.\n\n"
                "   To fix:\n"
                "   1. Go to https://app.supabase.com → Your Project → Settings → API\n"
                "   2. Find the 'service_role' key (NOT the 'anon' key)\n"
                "   3. Click 'Reveal' to show the full key\n"
                "   4. Copy the ENTIRE key (it's very long, ~200+ characters)\n"
                "   5. Update SUPABASE_SERVICE_KEY in your .env file\n"
                "   6. Make sure there are NO quotes, spaces, or line breaks\n\n"
                f"   Current key length: {len(key)} characters\n"
                "   Expected: ~200+ characters (a JWT starting with 'eyJ')\n\n"
                f"   Error details: {error_msg}"
            ) from e
        else:
            raise RuntimeError(
                f"❌ Failed to connect to Supabase: {error_msg}\n"
                "   Check that:\n"
                "   1. SUPABASE_URL is correct and complete\n"
                "   2. SUPABASE_SERVICE_KEY is the service_role key (not the anon key)\n"
                "   3. Your Supabase project is active (not paused)\n"
                "   4. The tables exist (run supabase_admin_rules_table.sql and supabase_analytics_tables.sql)"
            ) from e

def sqlite_rows(db_path: Path, query: str) -> List[Dict[str, Any]]:
    """Run a query against a SQLite database and return the rows as dicts."""
    conn = sqlite3.connect(db_path)
    conn.row_factory = sqlite3.Row
    try:
        rows = conn.execute(query).fetchall()
    finally:
        # Close the connection even if the query fails.
        conn.close()
    return [dict(row) for row in rows]

def warn_if_table_has_rows(client: Client, table: str) -> bool:
    response = client.table(table).select("id", count="exact").limit(1).execute()
    count = getattr(response, "count", None)
    return bool(count and count > 0)

def iso_from_unix(ts: Any) -> str | None:
    if ts is None:
        return None
    try:
        return datetime.fromtimestamp(int(ts), tz=timezone.utc).isoformat()
    except (ValueError, TypeError):
        return None
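# A quick sanity check (illustrative values):
#   iso_from_unix(1700000000)   -> "2023-11-14T22:13:20+00:00"
#   iso_from_unix("not a time") -> None
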
def migrate_rules(client: Client, db_path: Path, force: bool):
    """Migrate admin rules using the Supabase REST API."""
    table = "admin_rules"
    if not force and warn_if_table_has_rows(client, table):
        print(f"⚠️ Supabase table '{table}' already has rows. Skipping (use --force to override).")
        return
    if not db_path.exists():
        print(f"ℹ️ No local rules database found at {db_path}, skipping rules migration.")
        return
    rows = sqlite_rows(
        db_path,
        """
        SELECT tenant_id, rule, pattern, severity, description, enabled, created_at
        FROM admin_rules
        """,
    )
    if not rows:
        print("ℹ️ No rules to migrate.")
        return
    payload = []
    for row in rows:
        payload.append(
            {
                "tenant_id": row["tenant_id"],
                "rule": row["rule"],
                "pattern": row["pattern"] or row["rule"],
                "severity": row.get("severity") or "medium",
                "description": row.get("description") or row["rule"],
                "enabled": bool(row.get("enabled", 1)),
                "created_at": iso_from_unix(row.get("created_at")),
            }
        )
    for batch in chunked(payload, BATCH_SIZE):
        client.table(table).upsert(batch, on_conflict="tenant_id,rule").execute()
    print(f"✅ Migrated {len(payload)} admin rule(s) to Supabase.")

def migrate_tool_usage(client: Client, db_path: Path, force: bool):
    """Migrate tool usage events using the Supabase REST API."""
    table = "tool_usage_events"
    if not force and warn_if_table_has_rows(client, table):
        print(f"⚠️ Supabase table '{table}' already has rows. Skipping (use --force to override).")
        return
    rows = sqlite_rows(db_path, "SELECT * FROM tool_usage_events")
    if not rows:
        print("ℹ️ No tool usage events to migrate.")
        return
    payload = []
    for row in rows:
        metadata = row.get("metadata")
        payload.append(
            {
                "tenant_id": row["tenant_id"],
                "user_id": row.get("user_id"),
                "tool_name": row["tool_name"],
                "timestamp": row["timestamp"],
                "latency_ms": row.get("latency_ms"),
                "tokens_used": row.get("tokens_used"),
                "success": bool(row.get("success", 1)),
                "error_message": row.get("error_message"),
                "metadata": json.loads(metadata) if metadata else None,
            }
        )
    for batch in chunked(payload, BATCH_SIZE):
        client.table(table).insert(batch).execute()
    print(f"✅ Migrated {len(payload)} tool usage event(s).")

def migrate_redflags(client: Client, db_path: Path, force: bool):
    """Migrate red-flag violations using the Supabase REST API."""
    table = "redflag_violations"
    if not force and warn_if_table_has_rows(client, table):
        print(f"⚠️ Supabase table '{table}' already has rows. Skipping (use --force to override).")
        return
    rows = sqlite_rows(db_path, "SELECT * FROM redflag_violations")
    if not rows:
        print("ℹ️ No red-flag violations to migrate.")
        return
    payload = []
    for row in rows:
        payload.append(
            {
                "tenant_id": row["tenant_id"],
                "user_id": row.get("user_id"),
                "rule_id": row["rule_id"],
                "rule_pattern": row.get("rule_pattern"),
                "severity": row["severity"],
                "matched_text": row.get("matched_text"),
                "confidence": row.get("confidence"),
                "message_preview": row.get("message_preview"),
                "timestamp": row["timestamp"],
            }
        )
    for batch in chunked(payload, BATCH_SIZE):
        client.table(table).insert(batch).execute()
    print(f"✅ Migrated {len(payload)} red-flag violation(s).")

def migrate_rag_searches(client: Client, db_path: Path, force: bool):
    """Migrate RAG search events using the Supabase REST API."""
    table = "rag_search_events"
    if not force and warn_if_table_has_rows(client, table):
        print(f"⚠️ Supabase table '{table}' already has rows. Skipping (use --force to override).")
        return
    rows = sqlite_rows(db_path, "SELECT * FROM rag_search_events")
    if not rows:
        print("ℹ️ No RAG search events to migrate.")
        return
    payload = []
    for row in rows:
        payload.append(
            {
                "tenant_id": row["tenant_id"],
                "query": row["query"],
                "hits_count": row.get("hits_count"),
                "avg_score": row.get("avg_score"),
                "top_score": row.get("top_score"),
                "timestamp": row["timestamp"],
                "latency_ms": row.get("latency_ms"),
            }
        )
    for batch in chunked(payload, BATCH_SIZE):
        client.table(table).insert(batch).execute()
    print(f"✅ Migrated {len(payload)} RAG search event(s).")

def migrate_agent_queries(client: Client, db_path: Path, force: bool):
    """Migrate agent query events using the Supabase REST API."""
    table = "agent_query_events"
    if not force and warn_if_table_has_rows(client, table):
        print(f"⚠️ Supabase table '{table}' already has rows. Skipping (use --force to override).")
        return
    rows = sqlite_rows(db_path, "SELECT * FROM agent_query_events")
    if not rows:
        print("ℹ️ No agent query events to migrate.")
        return
    payload = []
    for row in rows:
        tools = row.get("tools_used")
        payload.append(
            {
                "tenant_id": row["tenant_id"],
                "user_id": row.get("user_id"),
                "message_preview": row.get("message_preview"),
                "intent": row.get("intent"),
                "tools_used": json.loads(tools) if tools else None,
                "total_tokens": row.get("total_tokens"),
                "total_latency_ms": row.get("total_latency_ms"),
                "success": bool(row.get("success", 1)),
                "timestamp": row["timestamp"],
            }
        )
    for batch in chunked(payload, BATCH_SIZE):
        client.table(table).insert(batch).execute()
    print(f"✅ Migrated {len(payload)} agent query event(s).")

def migrate_rules_postgres(conn, db_path: Path, force: bool, check_table_func):
    """Migrate admin rules using a direct PostgreSQL connection."""
    table = "admin_rules"
    # Check that the table exists before migrating.
    if not table_exists_postgres(conn, table):
        print(f"❌ Table '{table}' does not exist in PostgreSQL!")
        print("   Please create it first by running 'supabase_admin_rules_table.sql' in the Supabase SQL Editor.")
        print("   Skipping rules migration.")
        return
    if not force and check_table_func(table):
        print(f"⚠️ Supabase table '{table}' already has rows. Skipping (use --force to override).")
        return
    if not db_path.exists():
        print(f"ℹ️ No local rules database found at {db_path}, skipping rules migration.")
        return
    rows = sqlite_rows(
        db_path,
        """
        SELECT tenant_id, rule, pattern, severity, description, enabled, created_at
        FROM admin_rules
        """,
    )
    if not rows:
        print("ℹ️ No rules to migrate.")
        return
    payload = []
    for row in rows:
        payload.append({
            "tenant_id": row["tenant_id"],
            "rule": row["rule"],
            "pattern": row["pattern"] or row["rule"],
            "severity": row.get("severity") or "medium",
            "description": row.get("description") or row["rule"],
            "enabled": bool(row.get("enabled", 1)),
            "created_at": iso_from_unix(row.get("created_at")),
        })
    columns = ["tenant_id", "rule", "pattern", "severity", "description", "enabled", "created_at"]
    for batch in chunked(payload, BATCH_SIZE):
        insert_batch_postgres(conn, table, columns, batch, on_conflict="tenant_id,rule")
    print(f"✅ Migrated {len(payload)} admin rule(s) to Supabase.")

def migrate_tool_usage_postgres(conn, db_path: Path, force: bool, check_table_func):
    """Migrate tool usage events using a direct PostgreSQL connection."""
    table = "tool_usage_events"
    if not table_exists_postgres(conn, table):
        print(f"❌ Table '{table}' does not exist in PostgreSQL!")
        print("   Please create it first by running 'supabase_analytics_tables.sql' in the Supabase SQL Editor.")
        print("   Skipping tool usage migration.")
        return
    if not force and check_table_func(table):
        print(f"⚠️ Supabase table '{table}' already has rows. Skipping (use --force to override).")
        return
    rows = sqlite_rows(db_path, "SELECT * FROM tool_usage_events")
    if not rows:
        print("ℹ️ No tool usage events to migrate.")
        return
    payload = []
    for row in rows:
        metadata = row.get("metadata")
        payload.append({
            "tenant_id": row["tenant_id"],
            "user_id": row.get("user_id"),
            "tool_name": row["tool_name"],
            "timestamp": row["timestamp"],
            "latency_ms": row.get("latency_ms"),
            "tokens_used": row.get("tokens_used"),
            "success": bool(row.get("success", 1)),
            "error_message": row.get("error_message"),
            "metadata": json.loads(metadata) if metadata else None,
        })
    columns = ["tenant_id", "user_id", "tool_name", "timestamp", "latency_ms", "tokens_used", "success", "error_message", "metadata"]
    for batch in chunked(payload, BATCH_SIZE):
        insert_batch_postgres(conn, table, columns, batch)
    print(f"✅ Migrated {len(payload)} tool usage event(s).")

def migrate_redflags_postgres(conn, db_path: Path, force: bool, check_table_func):
    """Migrate red-flag violations using a direct PostgreSQL connection."""
    table = "redflag_violations"
    if not table_exists_postgres(conn, table):
        print(f"❌ Table '{table}' does not exist in PostgreSQL!")
        print("   Please create it first by running 'supabase_analytics_tables.sql' in the Supabase SQL Editor.")
        print("   Skipping red-flag migration.")
        return
    if not force and check_table_func(table):
        print(f"⚠️ Supabase table '{table}' already has rows. Skipping (use --force to override).")
        return
    rows = sqlite_rows(db_path, "SELECT * FROM redflag_violations")
    if not rows:
        print("ℹ️ No red-flag violations to migrate.")
        return
    payload = []
    for row in rows:
        payload.append({
            "tenant_id": row["tenant_id"],
            "user_id": row.get("user_id"),
            "rule_id": row["rule_id"],
            "rule_pattern": row.get("rule_pattern"),
            "severity": row["severity"],
            "matched_text": row.get("matched_text"),
            "confidence": row.get("confidence"),
            "message_preview": row.get("message_preview"),
            "timestamp": row["timestamp"],
        })
    columns = ["tenant_id", "user_id", "rule_id", "rule_pattern", "severity", "matched_text", "confidence", "message_preview", "timestamp"]
    for batch in chunked(payload, BATCH_SIZE):
        insert_batch_postgres(conn, table, columns, batch)
    print(f"✅ Migrated {len(payload)} red-flag violation(s).")

def migrate_rag_searches_postgres(conn, db_path: Path, force: bool, check_table_func):
    """Migrate RAG search events using a direct PostgreSQL connection."""
    table = "rag_search_events"
    if not table_exists_postgres(conn, table):
        print(f"❌ Table '{table}' does not exist in PostgreSQL!")
        print("   Please create it first by running 'supabase_analytics_tables.sql' in the Supabase SQL Editor.")
        print("   Skipping RAG search migration.")
        return
    if not force and check_table_func(table):
        print(f"⚠️ Supabase table '{table}' already has rows. Skipping (use --force to override).")
        return
    rows = sqlite_rows(db_path, "SELECT * FROM rag_search_events")
    if not rows:
        print("ℹ️ No RAG search events to migrate.")
        return
    payload = []
    for row in rows:
        payload.append({
            "tenant_id": row["tenant_id"],
            "query": row["query"],
            "hits_count": row.get("hits_count"),
            "avg_score": row.get("avg_score"),
            "top_score": row.get("top_score"),
            "timestamp": row["timestamp"],
            "latency_ms": row.get("latency_ms"),
        })
    columns = ["tenant_id", "query", "hits_count", "avg_score", "top_score", "timestamp", "latency_ms"]
    for batch in chunked(payload, BATCH_SIZE):
        insert_batch_postgres(conn, table, columns, batch)
    print(f"✅ Migrated {len(payload)} RAG search event(s).")

def migrate_agent_queries_postgres(conn, db_path: Path, force: bool, check_table_func):
    """Migrate agent query events using a direct PostgreSQL connection."""
    table = "agent_query_events"
    if not table_exists_postgres(conn, table):
        print(f"❌ Table '{table}' does not exist in PostgreSQL!")
        print("   Please create it first by running 'supabase_analytics_tables.sql' in the Supabase SQL Editor.")
        print("   Skipping agent query migration.")
        return
    if not force and check_table_func(table):
        print(f"⚠️ Supabase table '{table}' already has rows. Skipping (use --force to override).")
        return
    rows = sqlite_rows(db_path, "SELECT * FROM agent_query_events")
    if not rows:
        print("ℹ️ No agent query events to migrate.")
        return
    payload = []
    for row in rows:
        tools = row.get("tools_used")
        payload.append({
            "tenant_id": row["tenant_id"],
            "user_id": row.get("user_id"),
            "message_preview": row.get("message_preview"),
            "intent": row.get("intent"),
            "tools_used": json.loads(tools) if tools else None,
            "total_tokens": row.get("total_tokens"),
            "total_latency_ms": row.get("total_latency_ms"),
            "success": bool(row.get("success", 1)),
            "timestamp": row["timestamp"],
        })
    columns = ["tenant_id", "user_id", "message_preview", "intent", "tools_used", "total_tokens", "total_latency_ms", "success", "timestamp"]
    for batch in chunked(payload, BATCH_SIZE):
        insert_batch_postgres(conn, table, columns, batch)
    print(f"✅ Migrated {len(payload)} agent query event(s).")

def table_exists_postgres(conn, table: str) -> bool:
    """Check whether a table exists in the public schema."""
    with conn.cursor() as cur:
        cur.execute(
            """
            SELECT EXISTS (
                SELECT FROM information_schema.tables
                WHERE table_schema = 'public'
                  AND table_name = %s
            )
            """,
            (table,),
        )
        return cur.fetchone()[0]

def check_table_has_rows_postgres(conn, table: str) -> bool:
    """Check whether a PostgreSQL table has any rows."""
    if not table_exists_postgres(conn, table):
        return False
    try:
        with conn.cursor() as cur:
            cur.execute(f"SELECT COUNT(*) FROM {table}")
            count = cur.fetchone()[0]
            return count > 0
    except Exception as e:
        error_str = str(e)
        # Reset the aborted transaction so later statements on this connection work.
        conn.rollback()
        if "does not exist" in error_str or "relation" in error_str.lower():
            return False
        raise

def check_table_has_rows_supabase(client: Client, table: str) -> bool:
    """Check whether a Supabase table has any rows."""
    try:
        response = client.table(table).select("id", count="exact").limit(1).execute()
        count = getattr(response, "count", None)
        return bool(count and count > 0)
    except Exception:
        return False

def insert_batch_postgres(conn, table: str, columns: List[str], batch: List[Dict[str, Any]], on_conflict: str | None = None):
    """Insert a batch of rows into a PostgreSQL table."""
    if not batch:
        return
    placeholders = ", ".join(["%s"] * len(columns))
    cols = ", ".join(columns)
    if on_conflict:
        # Upsert path, used for admin_rules and its unique constraint.
        update_cols = ", ".join(f"{col} = EXCLUDED.{col}" for col in columns if col != "id")
        query = f"INSERT INTO {table} ({cols}) VALUES ({placeholders}) ON CONFLICT ({on_conflict}) DO UPDATE SET {update_cols}"
    else:
        query = f"INSERT INTO {table} ({cols}) VALUES ({placeholders})"
    # Prepare values, serializing dicts/lists to JSON strings for JSONB columns.
    values = []
    for row in batch:
        row_values = []
        for col in columns:
            val = row.get(col)
            if col in ("metadata", "tools_used") and isinstance(val, (dict, list)):
                val = json.dumps(val)
            row_values.append(val)
        values.append(row_values)
    with conn.cursor() as cur:
        execute_batch(cur, query, values)
    conn.commit()
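# For reference, the statement generated above for admin_rules looks roughly like
# this sketch (column order follows the `columns` argument passed by the caller):
#
#   INSERT INTO admin_rules (tenant_id, rule, pattern, severity, description, enabled, created_at)
#   VALUES (%s, %s, %s, %s, %s, %s, %s)
#   ON CONFLICT (tenant_id, rule) DO UPDATE SET tenant_id = EXCLUDED.tenant_id, ...
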
def insert_batch_supabase(client: Client, table: str, batch: List[Dict[str, Any]], on_conflict: str | None = None):
    """Insert a batch of rows into a Supabase table via the REST API."""
    if not batch:
        return
    # Chunk both paths so upserts and plain inserts stay within BATCH_SIZE.
    for chunk in chunked(batch, BATCH_SIZE):
        if on_conflict:
            client.table(table).upsert(chunk, on_conflict=on_conflict).execute()
        else:
            client.table(table).insert(chunk).execute()

def main():
    parser = argparse.ArgumentParser(description="Migrate SQLite analytics/rules data into Supabase.")
    parser.add_argument("--force", action="store_true", help="Insert even if Supabase tables already contain rows.")
    args = parser.parse_args()

    print("=" * 70)
    print("SQLite to Supabase Migration Tool")
    print("=" * 70)
    print()

    # Check for the local SQLite databases first.
    root = Path(__file__).resolve().parent
    data_dir = root / "data"
    rules_db = data_dir / "admin_rules.db"
    analytics_db = data_dir / "analytics.db"
    print("📂 Checking for local SQLite databases:")
    print(f"   Rules DB:     {rules_db} {'✅' if rules_db.exists() else '❌ Not found'}")
    print(f"   Analytics DB: {analytics_db} {'✅' if analytics_db.exists() else '❌ Not found'}")
    print()
    if not rules_db.exists() and not analytics_db.exists():
        print("⚠️ No SQLite databases found. Nothing to migrate.")
        return

    # Determine the connection method.
    print("🔍 Checking connection method...")
    method = get_connection_method()
    if method == "postgresql":
        print("   ✅ Using PostgreSQL direct connection (POSTGRESQL_URL)")
        conn = get_postgres_connection()
        print("   ✅ Connected to PostgreSQL")
        client = None
        check_table = lambda t: check_table_has_rows_postgres(conn, t)

        # Check that the required tables exist.
        print()
        print("🔍 Checking if required tables exist...")
        required_tables = {
            "admin_rules": "supabase_admin_rules_table.sql",
            "tool_usage_events": "supabase_analytics_tables.sql",
            "redflag_violations": "supabase_analytics_tables.sql",
            "rag_search_events": "supabase_analytics_tables.sql",
            "agent_query_events": "supabase_analytics_tables.sql",
        }
        missing_tables = {}
        for table, sql_file in required_tables.items():
            if table_exists_postgres(conn, table):
                print(f"   ✅ {table}")
            else:
                print(f"   ❌ {table} (missing)")
                missing_tables.setdefault(sql_file, []).append(table)
        if missing_tables:
            print()
            print("⚠️ Some tables are missing! Please create them first:")
            print()
            for sql_file, tables in missing_tables.items():
                print(f"   Run '{sql_file}' in the Supabase SQL Editor to create:")
                for table in tables:
                    print(f"     - {table}")
            print()
            print("   Steps:")
            print("   1. Go to https://app.supabase.com → Your Project → SQL Editor")
            print("   2. Click 'New query'")
            for sql_file in missing_tables.keys():
                print(f"   3. Open and copy the contents of '{sql_file}' from your project")
            print("   4. Paste into the SQL Editor and click 'Run'")
            print("   5. Run this migration script again")
            print()
            conn.close()
            return
    elif method == "supabase":
        print("   ✅ Using Supabase API (SUPABASE_URL + SUPABASE_SERVICE_KEY)")
        client = load_supabase_client()
        conn = None
        check_table = lambda t: check_table_has_rows_supabase(client, t)
    else:
        print("   ❌ No connection method available!")
        print()
        print("   Please set one of the following in your .env file:")
        print("   - POSTGRESQL_URL=postgresql://user:password@host:port/database")
        print("   OR")
        print("   - SUPABASE_URL=https://xxxxx.supabase.co")
        print("   - SUPABASE_SERVICE_KEY=your_service_role_key")
        return

    print()
    print("🚀 Starting migration...")
    print()

    # Migrate using the selected method.
    if method == "postgresql":
        migrate_rules_postgres(conn, rules_db, args.force, check_table)
        migrate_tool_usage_postgres(conn, analytics_db, args.force, check_table)
        migrate_redflags_postgres(conn, analytics_db, args.force, check_table)
        migrate_rag_searches_postgres(conn, analytics_db, args.force, check_table)
        migrate_agent_queries_postgres(conn, analytics_db, args.force, check_table)
        conn.close()
    else:
        migrate_rules(client, rules_db, args.force)
        migrate_tool_usage(client, analytics_db, args.force)
        migrate_redflags(client, analytics_db, args.force)
        migrate_rag_searches(client, analytics_db, args.force)
        migrate_agent_queries(client, analytics_db, args.force)

    print()
    print("=" * 70)
    print("🎉 Migration completed!")
    print("=" * 70)
    print()
    print("💡 Next steps:")
    print("   1. Verify the data in Supabase Dashboard → Table Editor")
    print("   2. Restart your FastAPI/MCP services to use the Supabase backend")
    print("   3. (Optional) Back up the SQLite files before deleting them")


if __name__ == "__main__":
    main()