Spaces:
Build error
Build error
| """ | |
| Centralized embedding manager for the AI Agent system. | |
| """ | |
| import os | |
| import logging | |
| from typing import List, Optional, Dict, Any | |
| import numpy as np | |
| from functools import lru_cache | |
# Optional backends: a missing package is recorded in a flag instead of
# aborting the import of this module.
try:
    from sentence_transformers import SentenceTransformer
except ImportError:
    SENTENCE_TRANSFORMERS_AVAILABLE = False
else:
    SENTENCE_TRANSFORMERS_AVAILABLE = True

try:
    import openai
except ImportError:
    OPENAI_AVAILABLE = False
else:
    OPENAI_AVAILABLE = True

logger = logging.getLogger(__name__)

# Module-wide singleton slot; filled lazily by get_embedding_manager().
_embedding_manager = None
class EmbeddingManager:
    """Centralized embedding manager with multiple backends.

    Supported values of ``backend``:

    * ``"sentence_transformers"`` -- a local ``SentenceTransformer`` model
      named by ``model_name``.
    * ``"openai"`` -- the OpenAI embeddings endpoint; requires the
      ``OPENAI_API_KEY`` environment variable.
    * anything else, or a backend whose library/credentials are missing --
      the deterministic hash-based fallback (``self.backend`` is then set to
      ``"fallback"``).

    Backend failures during embedding are logged and reported as zero
    vectors of length ``self.dimension`` rather than raised.
    """

    # Model used for the OpenAI backend when ``model_name`` is not a known
    # OpenAI embedding model (e.g. the sentence-transformers default).
    _DEFAULT_OPENAI_MODEL = "text-embedding-ada-002"

    # Output sizes of the OpenAI embedding models this class recognises.
    _OPENAI_DIMENSIONS = {
        "text-embedding-ada-002": 1536,
        "text-embedding-3-small": 1536,
        "text-embedding-3-large": 3072,
    }

    def __init__(self, model_name: str = "all-MiniLM-L6-v2", backend: str = "sentence_transformers"):
        """Create a manager and initialise the requested backend.

        Args:
            model_name: Model identifier passed to the backend.
            backend: Requested backend name; may be replaced by
                ``"fallback"`` if initialisation fails.
        """
        self.model_name = model_name
        self.backend = backend
        self.model = None
        self.dimension = 384  # Default for all-MiniLM-L6-v2
        self._openai_client = None  # Created lazily for openai>=1.0
        # Initialize the appropriate backend
        self._initialize_backend()
        # Report the backend actually in use, which differs from the
        # requested one when initialisation fell back.
        logger.info(
            "Embedding manager initialized with %s backend using %s",
            self.backend,
            model_name,
        )

    def _initialize_backend(self):
        """Initialize the embedding backend, falling back on any failure."""
        if self.backend == "sentence_transformers" and SENTENCE_TRANSFORMERS_AVAILABLE:
            try:
                self.model = SentenceTransformer(self.model_name)
                self.dimension = self.model.get_sentence_embedding_dimension()
                logger.info("Initialized SentenceTransformer with dimension %s", self.dimension)
            except Exception as e:
                logger.error("Failed to initialize SentenceTransformer: %s", e)
                self._fallback_initialization()
        elif self.backend == "openai" and OPENAI_AVAILABLE:
            if not os.getenv("OPENAI_API_KEY"):
                logger.error(
                    "Failed to initialize OpenAI backend: %s",
                    "OPENAI_API_KEY not found in environment",
                )
                self._fallback_initialization()
                return
            # Honour model_name when it names a known OpenAI embedding model;
            # otherwise keep the historical ada-002 default.
            if self.model_name in self._OPENAI_DIMENSIONS:
                self.model = self.model_name
            else:
                self.model = self._DEFAULT_OPENAI_MODEL
            self.dimension = self._OPENAI_DIMENSIONS[self.model]
            logger.info("Initialized OpenAI embedding backend")
        else:
            self._fallback_initialization()

    def _fallback_initialization(self):
        """Switch to the simple hash-based embeddings (128 dimensions)."""
        logger.warning("Using fallback hash-based embeddings")
        self.backend = "fallback"
        self.model = None  # Drop any partially initialised model
        self.dimension = 128

    def _zero_vector(self) -> List[float]:
        """Return a fresh all-zero vector of the current dimension."""
        return [0.0] * self.dimension

    def embed(self, text: str) -> List[float]:
        """Generate an embedding for a text string.

        Args:
            text: The text to embed.

        Returns:
            A list of ``self.dimension`` floats. Empty or whitespace-only
            text, and any backend failure, yield an all-zero vector.
        """
        if not text or not text.strip():
            return self._zero_vector()
        try:
            if self.backend == "sentence_transformers" and self.model is not None:
                return self.model.encode(text).tolist()
            if self.backend == "openai":
                return self._openai_embed(text)
            return self._fallback_embed(text)
        except Exception as e:
            logger.error("Embedding failed for text: %s... Error: %s", text[:100], e)
            return self._zero_vector()

    def _openai_request(self, inputs) -> List[List[float]]:
        """Call the OpenAI embeddings endpoint and return one vector per input.

        Uses the client interface of openai>=1.0 when the installed library
        provides it, and the legacy module-level ``Embedding.create`` call
        otherwise. Exceptions propagate to the caller.
        """
        if hasattr(openai, "OpenAI"):
            client = getattr(self, "_openai_client", None)
            if client is None:
                client = openai.OpenAI()
                self._openai_client = client
            response = client.embeddings.create(input=inputs, model=self.model)
            return [item.embedding for item in response.data]
        response = openai.Embedding.create(input=inputs, model=self.model)
        return [item["embedding"] for item in response["data"]]

    def _openai_embed(self, text: str) -> List[float]:
        """Generate an embedding using the OpenAI API (zeros on failure)."""
        try:
            return self._openai_request(text)[0]
        except Exception as e:
            logger.error("OpenAI embedding failed: %s", e)
            return self._zero_vector()

    def _fallback_embed(self, text: str) -> List[float]:
        """Generate a simple, deterministic hash-based embedding.

        The SHA-256 digest of the UTF-8 encoded text is split into eight
        big-endian 4-byte words, each mapped into [0, 1) via
        ``(word % 10000) / 10000``. Only the first ``min(8, dimension)``
        components carry hash data; the remainder are zero padding.
        """
        import hashlib

        digest = hashlib.sha256(text.encode()).digest()
        size = max(self.dimension, 0)
        values = [
            (int.from_bytes(digest[i:i + 4], byteorder='big') % 10000) / 10000.0
            for i in range(0, len(digest), 4)
        ][:size]
        # Pad with zeros up to the required dimension
        values.extend([0.0] * (size - len(values)))
        return values

    def _embed_non_blank(self, texts: List[str]) -> List[List[float]]:
        """Embed texts already known to be non-blank with the active backend."""
        if self.backend == "sentence_transformers" and self.model is not None:
            return self.model.encode(texts).tolist()
        if self.backend == "openai":
            return self._openai_embed_batch(texts)
        return [self._fallback_embed(text) for text in texts]

    def embed_batch(self, texts: List[str]) -> List[List[float]]:
        """Generate embeddings for a batch of texts.

        Blank entries (empty, whitespace-only or ``None``) get an all-zero
        vector, matching :meth:`embed`, and are not sent to the backend.

        Args:
            texts: The texts to embed.

        Returns:
            One vector per input, in input order. An empty input gives an
            empty list; a backend failure gives all-zero vectors.
        """
        if not texts:
            return []
        try:
            kept = [i for i, text in enumerate(texts) if text and text.strip()]
            results = [self._zero_vector() for _ in texts]
            if kept:
                vectors = self._embed_non_blank([texts[i] for i in kept])
                for i, vector in zip(kept, vectors):
                    results[i] = vector
            return results
        except Exception as e:
            logger.error("Batch embedding failed: %s", e)
            return [self._zero_vector() for _ in texts]

    def _openai_embed_batch(self, texts: List[str]) -> List[List[float]]:
        """Generate batch embeddings using the OpenAI API (zeros on failure)."""
        try:
            return self._openai_request(texts)
        except Exception as e:
            logger.error("OpenAI batch embedding failed: %s", e)
            return [self._zero_vector() for _ in texts]

    def similarity(self, embedding1: List[float], embedding2: List[float]) -> float:
        """Calculate cosine similarity between two embeddings.

        Returns:
            The cosine similarity as a float, or ``0.0`` when either vector
            has zero norm or the computation fails (for example on
            mismatched lengths).
        """
        try:
            vec1 = np.array(embedding1)
            vec2 = np.array(embedding2)
            norm1 = np.linalg.norm(vec1)
            norm2 = np.linalg.norm(vec2)
            # Cosine similarity is undefined for a zero vector; report 0.0.
            if norm1 == 0 or norm2 == 0:
                return 0.0
            return float(np.dot(vec1, vec2) / (norm1 * norm2))
        except Exception as e:
            logger.error("Similarity calculation failed: %s", e)
            return 0.0

    def get_dimension(self) -> int:
        """Get the embedding dimension."""
        return self.dimension

    def get_backend_info(self) -> Dict[str, Any]:
        """Get information about the current backend and library availability."""
        return {
            "backend": self.backend,
            "model_name": self.model_name,
            "dimension": self.dimension,
            "sentence_transformers_available": SENTENCE_TRANSFORMERS_AVAILABLE,
            "openai_available": OPENAI_AVAILABLE,
        }
def get_embedding_manager(model_name: Optional[str] = None, backend: Optional[str] = None) -> EmbeddingManager:
    """Return the shared embedding manager, building it on first use.

    The arguments only matter when no manager exists yet; once one has been
    created it is returned as-is and the arguments are ignored. Missing
    arguments are taken from the ``EMBEDDING_MODEL`` and
    ``EMBEDDING_BACKEND`` environment variables, then from built-in defaults.
    """
    global _embedding_manager
    if _embedding_manager is not None:
        return _embedding_manager

    chosen_model = model_name if model_name else os.getenv("EMBEDDING_MODEL", "all-MiniLM-L6-v2")
    chosen_backend = backend if backend else os.getenv("EMBEDDING_BACKEND", "sentence_transformers")
    _embedding_manager = EmbeddingManager(model_name=chosen_model, backend=chosen_backend)
    return _embedding_manager
def reset_embedding_manager():
    """Discard the shared embedding manager so the next lookup rebuilds it.

    Mainly intended for tests that need a fresh manager.
    """
    global _embedding_manager
    _embedding_manager = None
| # Convenience functions | |
def embed_text(text: str) -> List[float]:
    """Embed one text with the shared embedding manager."""
    return get_embedding_manager().embed(text)
def embed_batch(texts: List[str]) -> List[List[float]]:
    """Embed several texts with the shared embedding manager."""
    return get_embedding_manager().embed_batch(texts)
def calculate_similarity(embedding1: List[float], embedding2: List[float]) -> float:
    """Return the similarity of two embeddings via the shared embedding manager."""
    return get_embedding_manager().similarity(embedding1, embedding2)