# NOTE: the three lines above in the original capture ("Spaces: / Sleeping / Sleeping")
# are Hugging Face Spaces page residue from scraping, not part of this module.
"""
Vector embeddings utilities for semantic search.
"""
| import os | |
| from typing import List, Optional, Union, Dict | |
| import numpy as np | |
| from pathlib import Path | |
# sentence-transformers is an optional dependency: keep this module importable
# without it so callers can degrade gracefully (helpers then return None).
try:
    from sentence_transformers import SentenceTransformer
    SENTENCE_TRANSFORMERS_AVAILABLE = True
except ImportError:
    SENTENCE_TRANSFORMERS_AVAILABLE = False
    SentenceTransformer = None  # placeholder so annotations below still evaluate

# Available embedding models (ordered by preference for Vietnamese)
# Models are ordered from fastest to best quality
AVAILABLE_MODELS = {
    # Fast models (384 dim) - Good for production
    "paraphrase-multilingual": "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2",  # Fast, 384 dim
    # High quality models (768 dim) - Better accuracy
    "multilingual-mpnet": "sentence-transformers/paraphrase-multilingual-mpnet-base-v2",  # High quality, 768 dim, recommended
    "vietnamese-sbert": "keepitreal/vietnamese-sbert-v2",  # Vietnamese-specific (may require auth)
    # Very high quality models (1024+ dim) - Best accuracy but slower
    "multilingual-e5-large": "intfloat/multilingual-e5-large",  # Very high quality, 1024 dim, large model
    "multilingual-e5-base": "intfloat/multilingual-e5-base",  # High quality, 768 dim, balanced
    # Vietnamese-specific models (if available)
    "vietnamese-embedding": "dangvantuan/vietnamese-embedding",  # Vietnamese-specific (if available)
    "vietnamese-bi-encoder": "bkai-foundation-models/vietnamese-bi-encoder",  # Vietnamese bi-encoder (if available)
}

# Default embedding model for Vietnamese (can be overridden via env var)
# Use multilingual-mpnet as default - better quality than MiniLM, still reasonable size
# Can be set via EMBEDDING_MODEL env var (supports both short names and full model paths)
# Examples:
#   - EMBEDDING_MODEL=multilingual-mpnet (uses short name)
#   - EMBEDDING_MODEL=sentence-transformers/paraphrase-multilingual-mpnet-base-v2 (full path)
#   - EMBEDDING_MODEL=/path/to/local/model (local model path)
#   - EMBEDDING_MODEL=username/private-model (private HF model, requires HF_TOKEN)
DEFAULT_MODEL_NAME = os.environ.get(
    "EMBEDDING_MODEL",
    AVAILABLE_MODELS.get("multilingual-mpnet", "sentence-transformers/paraphrase-multilingual-mpnet-base-v2")
)
FALLBACK_MODEL_NAME = AVAILABLE_MODELS.get("paraphrase-multilingual", "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2")

# Process-wide cache of the most recently loaded model instance
# (see get_embedding_model); one model is kept at a time.
_model_cache: Optional[SentenceTransformer] = None
_cached_model_name: Optional[str] = None
def get_embedding_model(model_name: Optional[str] = None, force_reload: bool = False) -> Optional[SentenceTransformer]:
    """
    Get or load the (cached) embedding model instance.

    Args:
        model_name: Name of the model to load. Can be:
            - Full model name (e.g., "keepitreal/vietnamese-sbert-v2")
            - Short name (e.g., "vietnamese-sbert")
            - Local model directory path
            - None (uses DEFAULT_MODEL_NAME from env or default)
        force_reload: Force reload model even if cached.

    Returns:
        SentenceTransformer instance, or None if the library is not installed
        or neither the requested model nor the fallback could be loaded.
    """
    global _model_cache, _cached_model_name
    if not SENTENCE_TRANSFORMERS_AVAILABLE:
        print("Warning: sentence-transformers not installed. Install with: pip install sentence-transformers")
        return None

    # Resolve short names (e.g. "multilingual-mpnet") to full model ids.
    resolved_model_name = model_name or DEFAULT_MODEL_NAME
    resolved_model_name = AVAILABLE_MODELS.get(resolved_model_name, resolved_model_name)

    # Serve from cache when the same model is requested again.
    if _model_cache is not None and _cached_model_name == resolved_model_name and not force_reload:
        return _model_cache

    try:
        print(f"Loading embedding model: {resolved_model_name}")
        # A path that exists on disk is treated as a local model directory.
        model_path = Path(resolved_model_name)
        if model_path.exists() and model_path.is_dir():
            print(f"Loading local model from: {resolved_model_name}")
            _model_cache = SentenceTransformer(str(model_path))
        else:
            # Hugging Face model (public or private); pass a token when available
            # so private repos can be accessed.
            hf_token = os.environ.get("HF_TOKEN") or os.environ.get("HUGGINGFACE_TOKEN")
            model_kwargs = {}
            if hf_token:
                print(f"Using Hugging Face token for model: {resolved_model_name}")
                model_kwargs["token"] = hf_token
            _model_cache = SentenceTransformer(resolved_model_name, **model_kwargs)
        _cached_model_name = resolved_model_name
        _report_loaded_model(_model_cache, resolved_model_name, "model")
        return _model_cache
    except Exception as e:
        print(f"❌ Error loading model {resolved_model_name}: {e}")
        if resolved_model_name != FALLBACK_MODEL_NAME:
            print(f"Trying fallback model: {FALLBACK_MODEL_NAME}")
            try:
                _model_cache = SentenceTransformer(FALLBACK_MODEL_NAME)
                _cached_model_name = FALLBACK_MODEL_NAME
            except Exception as e2:
                print(f"❌ Error loading fallback model: {e2}")
                return None
            # Fix: the dimension probe is best-effort here too — previously a
            # failure in this diagnostic encode discarded a successfully
            # loaded fallback model and returned None.
            _report_loaded_model(_model_cache, FALLBACK_MODEL_NAME, "fallback model")
            return _model_cache
        return None


def _report_loaded_model(model: SentenceTransformer, name: str, label: str) -> None:
    """Print a success line, including the embedding dimension when probing succeeds."""
    try:
        dim = len(model.encode("test", show_progress_bar=False))
        print(f"✅ Successfully loaded {label}: {name} (dimension: {dim})")
    except Exception:
        # Dimension probe is purely informational; never fail the load for it.
        print(f"✅ Successfully loaded {label}: {name}")
def list_available_models() -> Dict[str, str]:
    """
    List all available embedding models.

    Returns:
        A fresh dict mapping short names to full model ids; mutating the
        returned dict does not affect the module's registry.
    """
    return dict(AVAILABLE_MODELS)
def compare_models(texts: List[str], model_names: Optional[List[str]] = None) -> Dict[str, Dict[str, float]]:
    """
    Compare different embedding models on sample texts.

    Args:
        texts: List of sample texts to test.
        model_names: List of short model names (keys of AVAILABLE_MODELS) to
            compare. If None, compares all available models. Unknown names
            are silently skipped.

    Returns:
        Dictionary keyed by short name. Each entry contains:
            - model_name: Full model id.
            - dimension: Embedding dimension.
            - encoding_time: Time to encode texts (seconds).
            - avg_similarity: Mean pairwise cosine similarity between texts.
        On failure an entry is {"error": <message>}.
    """
    import time
    from itertools import combinations

    if model_names is None:
        model_names = list(AVAILABLE_MODELS.keys())

    results: Dict[str, Dict[str, float]] = {}
    for model_key in model_names:
        if model_key not in AVAILABLE_MODELS:
            continue
        model_name = AVAILABLE_MODELS[model_key]
        try:
            model = get_embedding_model(model_name, force_reload=True)
            if model is None:
                continue

            # Measure encoding time for the whole batch.
            start_time = time.time()
            embeddings = generate_embeddings_batch(texts, model=model)
            encoding_time = time.time() - start_time

            # Derive the dimension from embeddings we already computed instead
            # of forcing an extra dummy encode; fall back only when every
            # text failed to embed.
            valid = [emb for emb in embeddings if emb is not None]
            dim = len(valid[0]) if valid else get_embedding_dimension(model_name)

            # Average similarity over all pairs of successful embeddings.
            similarities = [cosine_similarity(a, b) for a, b in combinations(valid, 2)]
            avg_similarity = sum(similarities) / len(similarities) if similarities else 0.0

            results[model_key] = {
                "model_name": model_name,
                "dimension": dim,
                "encoding_time": encoding_time,
                "avg_similarity": avg_similarity,
            }
        except Exception as e:
            print(f"Error comparing model {model_key}: {e}")
            results[model_key] = {"error": str(e)}
    return results
def generate_embedding(text: str, model: Optional[SentenceTransformer] = None) -> Optional[np.ndarray]:
    """
    Embed a single text into a normalized vector.

    Args:
        text: Text to embed; empty or whitespace-only input yields None.
        model: Pre-loaded SentenceTransformer. When omitted, the default
            model is loaded via get_embedding_model().

    Returns:
        L2-normalized embedding as a numpy array, or None on failure.
    """
    if not text or not text.strip():
        return None

    active_model = model if model is not None else get_embedding_model()
    if active_model is None:
        return None

    try:
        return active_model.encode(text, normalize_embeddings=True, show_progress_bar=False)
    except Exception as e:
        print(f"Error generating embedding: {e}")
        return None
def generate_embeddings_batch(texts: List[str], model: Optional[SentenceTransformer] = None, batch_size: int = 32) -> List[Optional[np.ndarray]]:
    """
    Embed a batch of texts in one encoder call.

    Args:
        texts: Input texts; an empty list returns an empty list.
        model: Pre-loaded SentenceTransformer. When omitted, the default
            model is loaded via get_embedding_model().
        batch_size: Number of texts encoded per forward pass.

    Returns:
        One L2-normalized numpy array per input text, or a list of None
        placeholders (same length as `texts`) when no model is available
        or encoding fails.
    """
    if not texts:
        return []

    if model is None:
        model = get_embedding_model()
        if model is None:
            return [None] * len(texts)

    try:
        matrix = model.encode(
            texts,
            batch_size=batch_size,
            normalize_embeddings=True,
            show_progress_bar=True,
            convert_to_numpy=True,
        )
        # Split the (n, dim) result into one row vector per input text.
        return list(matrix)
    except Exception as e:
        print(f"Error generating batch embeddings: {e}")
        return [None] * len(texts)
def cosine_similarity(vec1: np.ndarray, vec2: np.ndarray) -> float:
    """
    Calculate cosine similarity between two vectors.

    Args:
        vec1: First vector.
        vec2: Second vector.

    Returns:
        Cosine similarity in [-1, 1] (the previous docstring's "0-1" claim
        only holds for non-negative vectors, which is not enforced here).
        Returns 0.0 when either vector is None or has zero norm.
    """
    if vec1 is None or vec2 is None:
        return 0.0
    norm1 = np.linalg.norm(vec1)
    norm2 = np.linalg.norm(vec2)
    # Guard against division by zero for degenerate (all-zero) vectors.
    if norm1 == 0 or norm2 == 0:
        return 0.0
    return float(np.dot(vec1, vec2) / (norm1 * norm2))
def get_embedding_dimension(model_name: Optional[str] = None) -> int:
    """
    Get the embedding dimension for a model.

    Args:
        model_name: Model name (short or full). If None, uses the default.

    Returns:
        Embedding dimension, or 0 if the model cannot be loaded or probed.
    """
    model = get_embedding_model(model_name)
    if model is None:
        return 0
    # Prefer the model's declared dimension over encoding a dummy text —
    # SentenceTransformer exposes it via get_sentence_embedding_dimension().
    try:
        dim = model.get_sentence_embedding_dimension()
        if dim:
            return int(dim)
    except Exception:
        pass
    # Fall back to probing with a dummy encode for models that do not
    # report their dimension.
    try:
        return len(model.encode("test", show_progress_bar=False))
    except Exception:
        return 0