Spaces:
Sleeping
Sleeping
| """ | |
| Supabase database connection and utilities for MCP servers. | |
| This module provides both: | |
| 1. Direct PostgreSQL connections (via psycopg2) for pgvector operations | |
| 2. Supabase client for REST API operations | |
| """ | |
| import os | |
| from typing import Optional, List, Dict, Any | |
| import psycopg2 | |
| import psycopg2.extras | |
| from supabase import create_client, Client | |
| from dotenv import load_dotenv | |
| # Load environment variables | |
| load_dotenv() | |
# -----------------------------------
# Environment variables
# -----------------------------------
# All three are read once at import time; None when the variable is unset.
DATABASE_URL: Optional[str] = os.getenv("POSTGRESQL_URL")  # Direct PostgreSQL connection
SUPABASE_URL: Optional[str] = os.getenv("SUPABASE_URL")
SUPABASE_KEY: Optional[str] = os.getenv("SUPABASE_SERVICE_KEY")  # MUST be service role key

# Global Supabase client instance (lazily created by get_supabase_client,
# cleared by reset_client).
_supabase_client: Optional[Client] = None
| # ----------------------------------- | |
| # PostgreSQL Connection (for pgvector) | |
| # ----------------------------------- | |
def get_connection():
    """Open and return a new direct PostgreSQL connection (for pgvector ops).

    Returns:
        A fresh ``psycopg2`` connection; the caller owns and must close it.

    Raises:
        ValueError: if the POSTGRESQL_URL environment variable is not set.
    """
    if DATABASE_URL:
        return psycopg2.connect(DATABASE_URL)
    raise ValueError(
        "PostgreSQL connection string not configured. "
        "Set POSTGRESQL_URL in your .env file."
    )
| # ----------------------------------- | |
| # Database Schema Initialization | |
| # ----------------------------------- | |
def initialize_database():
    """
    Initialize the database schema:
    - Enable pgvector extension
    - Create documents table with vector support
    - Create ivfflat + tenant_id indexes

    Errors whose message contains "already exists" are swallowed so the app
    can start against an already-initialized database; anything else is
    re-raised after being printed.

    Fix over previous version: the cursor/connection are now closed in a
    ``finally`` block, so a failing ``execute`` no longer leaks the
    connection (previously ``close()`` was only reached on full success).
    """
    conn = None
    cur = None
    try:
        conn = get_connection()
        cur = conn.cursor()

        # Enable pgvector extension
        cur.execute("CREATE EXTENSION IF NOT EXISTS vector;")
        # NOTE(review): the "β" prefix in these status prints looks like a
        # mis-encoded check-mark glyph — confirm intended character.
        print("β pgvector extension enabled")

        # Create documents table (384-dim embeddings, e.g. MiniLM-sized)
        cur.execute("""
            CREATE TABLE IF NOT EXISTS documents (
                id BIGSERIAL PRIMARY KEY,
                tenant_id TEXT NOT NULL,
                chunk_text TEXT NOT NULL,
                embedding vector(384) NOT NULL,
                created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
            );
        """)
        print("β documents table created")

        # Approximate-nearest-neighbour index for cosine similarity search
        cur.execute("""
            CREATE INDEX IF NOT EXISTS documents_embedding_idx
            ON documents
            USING ivfflat (embedding vector_cosine_ops)
            WITH (lists = 100);
        """)
        print("β vector index created")

        # B-tree index so per-tenant filtering stays fast
        cur.execute("""
            CREATE INDEX IF NOT EXISTS documents_tenant_id_idx
            ON documents (tenant_id);
        """)
        print("β tenant_id index created")

        conn.commit()
        print("β Database schema initialized successfully")
    except Exception as e:
        print(f"β Database initialization error: {e}")
        # Don't raise - allow the app to continue even if table exists
        if "already exists" not in str(e).lower():
            raise
    finally:
        # Always release DB resources, even when an execute() fails.
        if cur is not None:
            cur.close()
        if conn is not None:
            conn.close()
| # ----------------------------------- | |
| # Document + Embedding Operations | |
| # ----------------------------------- | |
def insert_document_chunks(tenant_id: str, text: str, embedding: list):
    """
    Insert one document chunk and its embedding for a tenant.

    Args:
        tenant_id: Owning tenant's identifier.
        text: The chunk's raw text.
        embedding: Embedding vector (384 floats per the documents schema;
            psycopg2 adapts the list to a SQL array — assumes pgvector's
            array→vector cast applies on insert, as the schema implies).

    Raises:
        Exception: any database error is printed, then re-raised.

    Fix over previous version: the connection is closed in a ``finally``
    block, so a failed INSERT no longer leaks it.
    """
    try:
        conn = get_connection()
        try:
            # Cursor context manager closes the cursor on exit.
            with conn.cursor() as cur:
                cur.execute(
                    """
                    INSERT INTO documents (tenant_id, chunk_text, embedding)
                    VALUES (%s, %s, %s);
                    """,
                    (tenant_id, text, embedding)
                )
            conn.commit()
        finally:
            conn.close()
    except Exception as e:
        print("DB INSERT ERROR:", e)
        raise
def search_vectors(tenant_id: str, vector: list, limit: int = 5) -> List[Dict[str, Any]]:
    """
    Perform semantic vector search using pgvector cosine distance.

    Args:
        tenant_id: Tenant whose documents are searched.
        vector: Query embedding (384 floats, matching the documents schema).
        limit: Maximum number of results.

    Returns:
        List of ``{"text", "similarity"}`` dicts ordered by decreasing
        cosine similarity (``1 - distance``); an empty list on any DB error
        (best-effort semantics, unchanged from the original).

    Fix over previous version: the connection is closed in a ``finally``
    block, so a failed query no longer leaks it.
    """
    try:
        conn = get_connection()
        try:
            with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
                cur.execute(
                    """
                    SELECT
                        chunk_text,
                        1 - (embedding <=> %s::vector(384)) AS similarity
                    FROM documents
                    WHERE tenant_id = %s
                    ORDER BY embedding <=> %s::vector(384)
                    LIMIT %s;
                    """,
                    (vector, tenant_id, vector, limit)
                )
                rows = cur.fetchall()
        finally:
            conn.close()
        return [
            {
                "text": row["chunk_text"],
                "similarity": float(row.get("similarity", 0.0)),
            }
            for row in rows
        ]
    except Exception as e:
        print("DB SEARCH ERROR:", e)
        return []
def list_all_documents(tenant_id: str, limit: int = 1000, offset: int = 0) -> Dict[str, Any]:
    """
    List a tenant's documents with pagination, newest first.

    Args:
        tenant_id: Tenant whose documents are listed.
        limit: Page size.
        offset: Rows to skip.

    Returns:
        ``{"documents": [...], "total": int, "limit": int, "offset": int}``
        where each document is ``{"id", "text", "created_at"}`` with
        ``created_at`` as an ISO-8601 string (or None). On any DB error an
        empty page is returned (best-effort semantics, unchanged).

    Fix over previous version: the connection is closed in a ``finally``
    block, so a failed query no longer leaks it.
    """
    try:
        conn = get_connection()
        try:
            with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
                cur.execute(
                    """
                    SELECT
                        id,
                        chunk_text,
                        created_at
                    FROM documents
                    WHERE tenant_id = %s
                    ORDER BY created_at DESC
                    LIMIT %s OFFSET %s;
                    """,
                    (tenant_id, limit, offset)
                )
                rows = cur.fetchall()

                # Total row count for this tenant (for pagination UIs).
                cur.execute(
                    """
                    SELECT COUNT(*) as total
                    FROM documents
                    WHERE tenant_id = %s;
                    """,
                    (tenant_id,)
                )
                total_row = cur.fetchone()
                total = total_row["total"] if total_row else 0
        finally:
            conn.close()

        results: List[Dict[str, Any]] = [
            {
                "id": row["id"],
                "text": row["chunk_text"],
                "created_at": row["created_at"].isoformat() if row["created_at"] else None,
            }
            for row in rows
        ]
        return {"documents": results, "total": total, "limit": limit, "offset": offset}
    except Exception as e:
        print("DB LIST ERROR:", e)
        return {"documents": [], "total": 0, "limit": limit, "offset": offset}
| # ----------------------------------- | |
| # Supabase Client (for REST operations) | |
| # ----------------------------------- | |
def get_supabase_client() -> Client:
    """Return the shared Supabase client, creating it lazily on first use.

    Returns:
        The module-level singleton ``Client``.

    Raises:
        ValueError: if SUPABASE_URL or SUPABASE_SERVICE_KEY is not configured.
    """
    global _supabase_client
    if _supabase_client is not None:
        return _supabase_client
    if not (SUPABASE_URL and SUPABASE_KEY):
        raise ValueError(
            "Supabase credentials missing. "
            "Set SUPABASE_URL and SUPABASE_SERVICE_KEY."
        )
    _supabase_client = create_client(SUPABASE_URL, SUPABASE_KEY)
    return _supabase_client
def reset_client():
    """Discard the cached Supabase client so the next call re-creates it."""
    global _supabase_client
    _supabase_client = None
# Table names: maps logical names used in code to the physical Supabase
# table names, so a rename only has to happen here.
TABLES = {
    "tenants": "tenants",
    "documents": "documents",
    "embeddings": "tenant_embeddings",
    "redflag_rules": "redflag_rules",
    "analytics": "analytics_events",
    "tool_usage": "tool_usage_stats",
}