from supabase import create_client, Client
from contextlib import asynccontextmanager
import asyncio
from typing import Optional, List, Dict, Any
import aiohttp
from dataclasses import dataclass
from langchain.schema import Document
import numpy as np
import logging
import os
from functools import lru_cache

try:
    from .config.integrations import integration_config
except ImportError:
    try:
        from config.integrations import integration_config
    except ImportError:
        # Fallback for when running as standalone script
        integration_config = None
        logging.warning("Could not import integration_config - using defaults")

# Import centralized embedding manager
from .embedding_manager import get_embedding_manager

logger = logging.getLogger(__name__)


@dataclass
class SearchResult:
    """Structured search result"""
    content: str
    metadata: Dict[str, Any]
    score: float
    source: str


class SupabaseConnectionPool:
    """Enhanced Supabase client with connection pooling"""

    def __init__(self, url: str, key: str, pool_size: int = 10):
        self.url = url
        self.key = key
        self.pool_size = pool_size
        self._pool = asyncio.Queue(maxsize=pool_size)
        self._session = None
        self._initialized = False

    async def initialize(self):
        """Initialize connection pool"""
        if self._initialized:
            return

        # Create custom aiohttp session with connection pooling
        connector = aiohttp.TCPConnector(
            limit=self.pool_size,
            limit_per_host=self.pool_size,
            ttl_dns_cache=300,
            keepalive_timeout=30
        )
        self._session = aiohttp.ClientSession(connector=connector)

        # Pre-create clients
        for _ in range(self.pool_size):
            client = create_client(self.url, self.key)
            await self._pool.put(client)

        self._initialized = True
        logger.info(f"Supabase connection pool initialized with {self.pool_size} connections")

    @asynccontextmanager
    async def get_client(self):
        """Get client from pool"""
        if not self._initialized:
            await self.initialize()

        client = await self._pool.get()
        try:
            yield client
        finally:
            await self._pool.put(client)

    async def close(self):
        """Close all connections"""
        if self._session:
            await self._session.close()
        self._initialized = False


class OptimizedVectorStore:
    """Optimized vector store with batch operations and caching"""

    def __init__(self, pool: SupabaseConnectionPool):
        self.pool = pool
        self.config = integration_config
        # Use centralized embedding manager instead of local initialization
        self.embedding_manager = get_embedding_manager()
        # Use functools.lru_cache for proper caching
        self._embedding_cache = lru_cache(maxsize=1000)(self._compute_embedding)
        self._batch_size = self.config.supabase.batch_size if self.config else 100

    def _compute_embedding(self, text: str) -> np.ndarray:
        """Compute actual embeddings using centralized manager"""
        embedding = self.embedding_manager.embed(text)
        return np.array(embedding)

    async def _get_cached_embedding(self, text: str) -> np.ndarray:
        """Get embedding with caching"""
        # Use the LRU cached method
        return self._embedding_cache(text)

    async def batch_insert_embeddings(
        self,
        documents: List[Document],
        batch_size: Optional[int] = None
    ):
        """Batch insert for better performance"""
        if batch_size is None:
            batch_size = self._batch_size

        async with self.pool.get_client() as client:
            for i in range(0, len(documents), batch_size):
                batch = documents[i:i + batch_size]

                # Prepare batch data
                batch_data = []
                for doc in batch:
                    embedding = await self._get_cached_embedding(doc.page_content)
                    batch_data.append({
                        "node_id": doc.metadata.get("id", str(hash(doc.page_content))),
                        "embedding": embedding.tolist(),
                        "text": doc.page_content,
                        "metadata_": doc.metadata
                    })

                # Use upsert for conflict resolution
                try:
                    await client.table("knowledge_base").upsert(batch_data).execute()
                    logger.info(f"Inserted {len(batch_data)} documents")
                except Exception as e:
                    logger.error(f"Batch insert failed: {e}")
                    raise
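

# --- Illustrative usage sketch (not part of the original API) ----------------
# Shows one way OptimizedVectorStore might be used to batch-insert LangChain
# documents. The SUPABASE_URL / SUPABASE_KEY environment variable names and the
# sample documents are assumptions; the "knowledge_base" table with an
# embedding column is expected to already exist.
async def _example_batch_insert():
    pool = SupabaseConnectionPool(
        os.environ["SUPABASE_URL"],  # assumed env var name
        os.environ["SUPABASE_KEY"],  # assumed env var name
    )
    store = OptimizedVectorStore(pool)
    docs = [
        Document(page_content="Supabase is a Postgres-based backend.", metadata={"id": "doc-1"}),
        Document(page_content="pgvector enables similarity search in Postgres.", metadata={"id": "doc-2"}),
    ]
    # Embeddings are computed (and LRU-cached) per document, then upserted in batches
    await store.batch_insert_embeddings(docs, batch_size=50)
    await pool.close()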


class HybridVectorSearch:
    """Combine vector similarity with metadata filtering and BM25"""

    def __init__(self, pool: SupabaseConnectionPool):
        self.pool = pool
        # Use centralized embedding manager
        self.embedding_manager = get_embedding_manager()

    async def get_embedding(self, text: str) -> np.ndarray:
        """Get embedding for query using centralized manager"""
        # FIXED: Use real embeddings instead of random
        embedding = self.embedding_manager.embed(text)
        return np.array(embedding)

    async def hybrid_search(
        self,
        query: str,
        top_k: int = 5,
        metadata_filter: Optional[Dict] = None,
        rerank: bool = True
    ) -> List[SearchResult]:
        """Enhanced search with multiple ranking strategies"""
        # 1. Vector similarity search
        query_embedding = await self.get_embedding(query)

        async with self.pool.get_client() as client:
            try:
                # Use RPC for complex queries
                results = await client.rpc(
                    'hybrid_match_documents',
                    {
                        'query_embedding': query_embedding.tolist(),
                        'match_count': top_k * 3,  # Get more for reranking
                        'metadata_filter': metadata_filter or {},
                        'query_text': query  # For BM25
                    }
                ).execute()

                # Convert to SearchResult objects
                search_results = []
                for result in results.data:
                    search_results.append(SearchResult(
                        content=result.get('text', ''),
                        metadata=result.get('metadata_', {}),
                        score=result.get('similarity', 0.0),
                        source=result.get('source', 'unknown')
                    ))

                if rerank:
                    search_results = await self._rerank_results(query, search_results)

                return search_results[:top_k]

            except Exception as e:
                logger.error(f"Hybrid search failed: {e}")
                # Fallback to simple vector search
                return await self._fallback_search(client, query_embedding, top_k)

    async def _rerank_results(self, query: str, results: List[SearchResult]) -> List[SearchResult]:
        """Rerank results using additional signals"""
        # Simple reranking based on content length and metadata
        for result in results:
            # Boost results with more metadata
            metadata_boost = len(result.metadata) * 0.1
            result.score += metadata_boost

        # Sort by score
        results.sort(key=lambda x: x.score, reverse=True)
        return results

    async def _fallback_search(self, client, query_embedding: np.ndarray, top_k: int) -> List[SearchResult]:
        """Fallback to simple vector similarity search"""
        try:
            # Simple vector similarity search
            results = await client.rpc(
                'match_documents',
                {
                    'query_embedding': query_embedding.tolist(),
                    'match_count': top_k
                }
            ).execute()

            search_results = []
            for result in results.data:
                search_results.append(SearchResult(
                    content=result.get('text', ''),
                    metadata=result.get('metadata_', {}),
                    score=result.get('similarity', 0.0),
                    source=result.get('source', 'unknown')
                ))

            return search_results

        except Exception as e:
            logger.error(f"Fallback search also failed: {e}")
            return []
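

# --- Illustrative usage sketch (not part of the original API) ----------------
# Shows how HybridVectorSearch might be queried against an existing pool. It
# assumes the database exposes the `hybrid_match_documents` RPC referenced
# above; the query text and the "source" filter key are hypothetical.
async def _example_hybrid_query(pool: SupabaseConnectionPool) -> List[SearchResult]:
    search = HybridVectorSearch(pool)
    results = await search.hybrid_search(
        query="How does connection pooling work?",
        top_k=5,
        metadata_filter={"source": "docs"},  # hypothetical metadata filter
        rerank=True,
    )
    for r in results:
        logger.info("score=%.3f source=%s preview=%r", r.score, r.source, r.content[:80])
    return results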


class SupabaseRealtimeManager:
    """Manage realtime subscriptions"""

    def __init__(self, client: Client):
        self.client = client
        self.subscriptions = {}

    async def subscribe_to_tool_metrics(self, callback):
        """Subscribe to tool execution metrics"""
        try:
            subscription = self.client.table('tool_metrics').on('INSERT', callback).subscribe()
            self.subscriptions['tool_metrics'] = subscription
            logger.info("Subscribed to tool metrics")
        except Exception as e:
            logger.error(f"Failed to subscribe to tool metrics: {e}")

    async def subscribe_to_knowledge_updates(self, callback):
        """Subscribe to knowledge base updates"""
        try:
            subscription = self.client.table('knowledge_base').on('INSERT', callback).subscribe()
            self.subscriptions['knowledge_updates'] = subscription
            logger.info("Subscribed to knowledge updates")
        except Exception as e:
            logger.error(f"Failed to subscribe to knowledge updates: {e}")

    async def unsubscribe_all(self):
        """Unsubscribe from all subscriptions"""
        for name, subscription in self.subscriptions.items():
            try:
                await subscription.unsubscribe()
                logger.info(f"Unsubscribed from {name}")
            except Exception as e:
                logger.error(f"Failed to unsubscribe from {name}: {e}")
        self.subscriptions.clear()


async def initialize_supabase_enhanced(url: Optional[str] = None, key: Optional[str] = None):
    """Initialize enhanced Supabase components"""
    # Use provided values or get from config
    if url is None or key is None:
        if integration_config and integration_config.supabase.is_configured():
            url = integration_config.supabase.url
            key = integration_config.supabase.key
        else:
            raise ValueError("Supabase URL and key must be provided or configured")

    try:
        # Initialize connection pool
        pool = SupabaseConnectionPool(url, key)
        await pool.initialize()

        # Initialize vector store
        vector_store = OptimizedVectorStore(pool)

        # Initialize search
        hybrid_search = HybridVectorSearch(pool)

        # Initialize realtime manager
        client = create_client(url, key)
        realtime_manager = SupabaseRealtimeManager(client)

        logger.info("Enhanced Supabase components initialized successfully")

        return {
            'connection_pool': pool,
            'vector_store': vector_store,
            'hybrid_search': hybrid_search,
            'realtime_manager': realtime_manager,
            'client': client
        }

    except Exception as e:
        logger.error(f"Failed to initialize Supabase: {e}")
        raise


# Global instances for backward compatibility
vector_store = None
hybrid_search = None
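

# --- Illustrative bootstrap sketch (assumption, not part of the original API) --
# One way the module-level instances above could be populated at application
# startup. The SUPABASE_URL / SUPABASE_KEY environment variable names are
# assumptions; initialize_supabase_enhanced() falls back to integration_config
# when no arguments are provided.
async def _example_bootstrap():
    global vector_store, hybrid_search
    components = await initialize_supabase_enhanced(
        url=os.getenv("SUPABASE_URL"),
        key=os.getenv("SUPABASE_KEY"),
    )
    vector_store = components['vector_store']
    hybrid_search = components['hybrid_search']
    return components


if __name__ == "__main__":
    # Running the module directly performs a one-off initialization check
    asyncio.run(_example_bootstrap())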