""" Supabase/PostgreSQL database utilities shared by all MCP tools. This module provides: 1. Direct PostgreSQL connections (via psycopg2) for pgvector operations 2. A Supabase client for REST-style administrative needs """ from __future__ import annotations import os from typing import Optional, List, Dict, Any import psycopg2 import psycopg2.extras from dotenv import load_dotenv try: # Supabase client is optional; in environments without Supabase (e.g. HF Spaces demo), # we don't want import errors (including websockets.asyncio) to crash the MCP server. from supabase import Client, create_client # type: ignore SUPABASE_AVAILABLE = True except Exception: Client = None # type: ignore create_client = None # type: ignore SUPABASE_AVAILABLE = False # Load environment variables load_dotenv() # ----------------------------------- # Environment variables # ----------------------------------- DATABASE_URL = os.getenv("POSTGRESQL_URL") # Direct PostgreSQL connection SUPABASE_URL = os.getenv("SUPABASE_URL") SUPABASE_KEY = os.getenv("SUPABASE_SERVICE_KEY") # MUST be service role key # Global Supabase client instance _supabase_client: Optional["Client"] = None # ----------------------------------- # PostgreSQL Connection (for pgvector) # ----------------------------------- def get_connection(): """ Establish a direct PostgreSQL connection for pgvector operations. """ if not DATABASE_URL: raise ValueError( "PostgreSQL connection string not configured. " "Set POSTGRESQL_URL in your .env file." ) return psycopg2.connect(DATABASE_URL) # ----------------------------------- # Database Schema Initialization # ----------------------------------- def initialize_database(): """ Initialize the database schema: - Enable pgvector extension - Create documents table with vector support """ try: conn = get_connection() cur = conn.cursor() # Enable pgvector extension cur.execute("CREATE EXTENSION IF NOT EXISTS vector;") print("✅ pgvector extension enabled") # Create documents table cur.execute( """ CREATE TABLE IF NOT EXISTS documents ( id BIGSERIAL PRIMARY KEY, tenant_id TEXT NOT NULL, chunk_text TEXT NOT NULL, embedding vector(384) NOT NULL, metadata JSONB, doc_id TEXT, created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() ); """ ) print("✅ documents table created") # Add metadata column if it doesn't exist (for existing tables) try: cur.execute("ALTER TABLE documents ADD COLUMN IF NOT EXISTS metadata JSONB;") cur.execute("ALTER TABLE documents ADD COLUMN IF NOT EXISTS doc_id TEXT;") conn.commit() except Exception: pass # Column might already exist # Create index for vector similarity search cur.execute( """ CREATE INDEX IF NOT EXISTS documents_embedding_idx ON documents USING ivfflat (embedding vector_cosine_ops) WITH (lists = 100); """ ) print("✅ vector index created") # Create index for tenant_id for faster filtering cur.execute( """ CREATE INDEX IF NOT EXISTS documents_tenant_id_idx ON documents (tenant_id); """ ) print("✅ tenant_id index created") conn.commit() cur.close() conn.close() print("✅ Database schema initialized successfully") except Exception as e: print(f"❌ Database initialization error: {e}") # Don't raise - allow the app to continue even if table exists if "already exists" not in str(e).lower(): raise # ----------------------------------- # Document + Embedding Operations # ----------------------------------- def insert_document_chunks(tenant_id: str, text: str, embedding: list, metadata: Optional[Dict[str, Any]] = None, doc_id: Optional[str] = None): """ Insert document chunk + embedding with optional metadata. Args: tenant_id: Tenant identifier text: Chunk text content embedding: Vector embedding (384 dimensions) metadata: Optional JSON metadata (title, summary, tags, topics, etc.) doc_id: Optional document ID to group chunks from the same document """ import json import traceback # Normalize tenant_id to ensure consistency tenant_id = tenant_id.strip() if not tenant_id: raise ValueError("tenant_id cannot be empty") if not text or not text.strip(): raise ValueError("text cannot be empty") if not embedding or len(embedding) != 384: raise ValueError(f"embedding must be a 384-dimensional vector, got {len(embedding) if embedding else 0} dimensions") try: conn = get_connection() cur = conn.cursor() # Convert metadata dict to JSON string for JSONB column metadata_json = json.dumps(metadata) if metadata else None cur.execute( """ INSERT INTO documents (tenant_id, chunk_text, embedding, metadata, doc_id) VALUES (%s, %s, %s, %s::jsonb, %s); """, (tenant_id, text, embedding, metadata_json, doc_id), ) conn.commit() cur.close() conn.close() print(f"✅ DB INSERT: Successfully inserted chunk for tenant '{tenant_id}' (doc_id: {doc_id or 'N/A'})") except ValueError as ve: # Re-raise ValueError as-is (validation errors) print(f"❌ DB INSERT VALIDATION ERROR: {ve}") raise except Exception as e: error_msg = f"DB INSERT ERROR (tenant_id='{tenant_id}'): {str(e)}" print(f"❌ {error_msg}") print(traceback.format_exc()) # Wrap in a more descriptive error raise RuntimeError( f"Failed to insert document into database: {str(e)}\n" f"Please check:\n" f"1. POSTGRESQL_URL is set correctly in .env\n" f"2. Database is accessible and pgvector extension is installed\n" f"3. Documents table exists (run initialize_database() if needed)" ) from e def search_vectors(tenant_id: str, vector: list, limit: int = 5) -> List[Dict[str, Any]]: """ Perform semantic vector search using pgvector. Results are filtered by tenant_id to ensure data isolation. """ try: # Validate tenant_id if not tenant_id or not tenant_id.strip(): print("DB SEARCH ERROR: tenant_id is empty") return [] tenant_id_normalized = tenant_id.strip() conn = get_connection() cur = conn.cursor(cursor_factory=psycopg2.extras.DictCursor) # Query with normalized tenant_id filtering cur.execute( """ SELECT chunk_text, tenant_id, 1 - (embedding <=> %s::vector(384)) AS similarity FROM documents WHERE TRIM(tenant_id) = %s ORDER BY embedding <=> %s::vector(384) LIMIT %s; """, (vector, tenant_id_normalized, vector, limit), ) rows = cur.fetchall() # Verify all results belong to the requested tenant (safety check) results: List[Dict[str, Any]] = [] for row in rows: row_tenant_id = row.get("tenant_id", "") if row_tenant_id and row_tenant_id.strip() != tenant_id_normalized: print( f"WARNING: Found document with tenant_id '{row_tenant_id}' when searching for '{tenant_id_normalized}' - skipping" ) continue results.append( { "text": row["chunk_text"], "similarity": float(row.get("similarity", 0.0)), } ) cur.close() conn.close() return results except Exception as e: print(f"DB SEARCH ERROR (tenant_id={tenant_id}): {e}") import traceback traceback.print_exc() return [] def list_all_documents( tenant_id: str, limit: int = 1000, offset: int = 0 ) -> Dict[str, Any]: """ List all documents for a tenant with pagination. tenant_id comparison is normalized via TRIM() to handle historical data. """ try: tenant_id_normalized = tenant_id.strip() conn = get_connection() cur = conn.cursor(cursor_factory=psycopg2.extras.DictCursor) cur.execute( """ SELECT id, chunk_text, created_at FROM documents WHERE TRIM(tenant_id) = %s ORDER BY created_at DESC LIMIT %s OFFSET %s; """, (tenant_id_normalized, limit, offset), ) rows = cur.fetchall() cur.execute( """ SELECT COUNT(*) as total FROM documents WHERE TRIM(tenant_id) = %s; """, (tenant_id_normalized,), ) total_row = cur.fetchone() total = total_row["total"] if total_row else 0 cur.close() conn.close() results: List[Dict[str, Any]] = [] for row in rows: results.append( { "id": row["id"], "text": row["chunk_text"], "created_at": row["created_at"].isoformat() if row["created_at"] else None, } ) return { "documents": results, "total": total, "limit": limit, "offset": offset, } except Exception as e: print("DB LIST ERROR:", e) return {"documents": [], "total": 0, "limit": limit, "offset": offset} def delete_document(tenant_id: str, document_id: int) -> bool: """ Delete a specific document by ID for a tenant. Returns True if document was deleted, False otherwise. """ try: tenant_id_normalized = tenant_id.strip() conn = get_connection() cur = conn.cursor() cur.execute( """ DELETE FROM documents WHERE id = %s AND TRIM(tenant_id) = %s; """, (document_id, tenant_id_normalized), ) deleted = cur.rowcount > 0 if deleted: print(f"DB DELETE: Deleted document {document_id} for tenant '{tenant_id_normalized}'") else: print(f"DB DELETE: Document {document_id} not found for tenant '{tenant_id_normalized}'") conn.commit() cur.close() conn.close() return deleted except Exception as e: print(f"DB DELETE ERROR (document_id={document_id}, tenant_id={tenant_id}): {e}") import traceback traceback.print_exc() return False def delete_all_documents(tenant_id: str) -> int: """ Delete all documents for a tenant. Returns the number of documents deleted. Handles tenant_id normalization to match documents stored with different formatting. """ try: tenant_id_normalized = tenant_id.strip() conn = get_connection() cur = conn.cursor() cur.execute( """ DELETE FROM documents WHERE TRIM(tenant_id) = %s; """, (tenant_id_normalized,), ) deleted_count = cur.rowcount print(f"DB DELETE ALL: Deleted {deleted_count} document(s) for tenant '{tenant_id_normalized}'") conn.commit() cur.close() conn.close() return deleted_count except Exception as e: print(f"DB DELETE ALL ERROR (tenant_id={tenant_id}): {e}") import traceback traceback.print_exc() return 0 # ----------------------------------- # Supabase Client (for REST operations) # ----------------------------------- def get_supabase_client() -> Client: """ Get or create Supabase client. """ global _supabase_client if not SUPABASE_AVAILABLE: raise RuntimeError( "Supabase client is not available in this deployment. " "Install and configure Supabase to use MCP Supabase features." ) if _supabase_client is None: if not SUPABASE_URL or not SUPABASE_KEY: raise ValueError( "Supabase credentials missing. " "Set SUPABASE_URL and SUPABASE_SERVICE_KEY." ) _supabase_client = create_client(SUPABASE_URL, SUPABASE_KEY) return _supabase_client def reset_client(): global _supabase_client _supabase_client = None # Table names TABLES = { "tenants": "tenants", "documents": "documents", "embeddings": "tenant_embeddings", "redflag_rules": "redflag_rules", "analytics": "analytics_events", "tool_usage": "tool_usage_stats", }