# NOTE: Hugging Face file-viewer header captured with this file (not part of the module).
# Uploaded by davidtran999: "Upload backend/core/embeddings.py with huggingface_hub"
# Commit 57b3892 (verified), 11.1 kB
"""
Vector embeddings utilities for semantic search.
"""
import os
from typing import List, Optional, Union, Dict
import numpy as np
from pathlib import Path
# sentence-transformers is an optional dependency; record whether it imported
# so callers can degrade gracefully instead of failing at import time.
try:
    from sentence_transformers import SentenceTransformer
except ImportError:
    # Keep the name bound so annotations that mention it still evaluate.
    SentenceTransformer = None
    SENTENCE_TRANSFORMERS_AVAILABLE = False
else:
    SENTENCE_TRANSFORMERS_AVAILABLE = True
# Short alias -> model identifier handed to SentenceTransformer.
# Entries are grouped by size/quality tier (not strictly sorted); the dict's
# insertion order is the order compare_models() walks by default.
AVAILABLE_MODELS = {
    # --- Fast tier (384 dim): good for production ---
    "paraphrase-multilingual": "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2",
    # --- High-quality tier (768 dim): better accuracy ---
    # Recommended general-purpose choice.
    "multilingual-mpnet": "sentence-transformers/paraphrase-multilingual-mpnet-base-v2",
    # Vietnamese-specific; may require authentication.
    "vietnamese-sbert": "keepitreal/vietnamese-sbert-v2",
    # --- Very-high-quality tier: best accuracy but slower ---
    # 1024 dim, large model.
    "multilingual-e5-large": "intfloat/multilingual-e5-large",
    # 768 dim, balanced.
    "multilingual-e5-base": "intfloat/multilingual-e5-base",
    # --- Vietnamese-specific models (if available) ---
    "vietnamese-embedding": "dangvantuan/vietnamese-embedding",
    "vietnamese-bi-encoder": "bkai-foundation-models/vietnamese-bi-encoder",
}
# Default embedding model for Vietnamese text.
# multilingual-mpnet is the built-in default: better quality than MiniLM while
# still a reasonable size. Override it with the EMBEDDING_MODEL env var, which
# accepts short names as well as full model paths, for example:
#   - EMBEDDING_MODEL=multilingual-mpnet (short name)
#   - EMBEDDING_MODEL=sentence-transformers/paraphrase-multilingual-mpnet-base-v2 (full path)
#   - EMBEDDING_MODEL=/path/to/local/model (local model path)
#   - EMBEDDING_MODEL=username/private-model (private HF model, requires HF_TOKEN)
# An empty or whitespace-only value is treated as "not set"; otherwise the
# blank string itself would be used as the model name. Surrounding whitespace
# is stripped so short names coming from .env files still resolve.
DEFAULT_MODEL_NAME = (
    os.environ.get("EMBEDDING_MODEL", "").strip()
    or AVAILABLE_MODELS.get(
        "multilingual-mpnet",
        "sentence-transformers/paraphrase-multilingual-mpnet-base-v2",
    )
)
# Model tried when the requested one cannot be loaded.
FALLBACK_MODEL_NAME = AVAILABLE_MODELS.get(
    "paraphrase-multilingual",
    "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2",
)
# Module-level cache managed by get_embedding_model(): the most recently
# loaded model instance and the resolved name it was loaded under.
_model_cache: Optional[SentenceTransformer] = None
_cached_model_name: Optional[str] = None
def _report_loaded(model, label: str, model_name: str) -> None:
    """Print the success line for a loaded model, adding its dimension when it can be probed."""
    try:
        dim = len(model.encode("test", show_progress_bar=False))
    except Exception:
        # The probe is informational only; a failure must not discard the model.
        print(f"✅ Successfully loaded {label}: {model_name}")
    else:
        print(f"✅ Successfully loaded {label}: {model_name} (dimension: {dim})")


def get_embedding_model(model_name: Optional[str] = None, force_reload: bool = False) -> Optional[SentenceTransformer]:
    """
    Get or load embedding model instance.

    The most recently loaded model is cached at module level and reused when
    the same resolved name is requested again. If the requested model cannot
    be loaded, FALLBACK_MODEL_NAME is used instead; an already cached fallback
    instance is reused rather than loaded again.

    Args:
        model_name: Name of the model to load. Can be:
            - Full model name (e.g., "keepitreal/vietnamese-sbert-v2")
            - Short name (e.g., "vietnamese-sbert")
            - Path to a local model directory
            - None (uses DEFAULT_MODEL_NAME from env or default)
        force_reload: Force reload model even if cached.

    Returns:
        SentenceTransformer instance, or None if sentence-transformers is not
        installed or neither the requested nor the fallback model could be
        loaded.
    """
    global _model_cache, _cached_model_name

    if not SENTENCE_TRANSFORMERS_AVAILABLE:
        print("Warning: sentence-transformers not installed. Install with: pip install sentence-transformers")
        return None

    # Resolve short aliases to full model identifiers; other values pass through.
    resolved_model_name = model_name or DEFAULT_MODEL_NAME
    resolved_model_name = AVAILABLE_MODELS.get(resolved_model_name, resolved_model_name)

    # Return cached model if same model and not forcing reload.
    if _model_cache is not None and _cached_model_name == resolved_model_name and not force_reload:
        return _model_cache

    try:
        print(f"Loading embedding model: {resolved_model_name}")
        model_path = Path(resolved_model_name)
        if model_path.exists() and model_path.is_dir():
            # Local model directory.
            print(f"Loading local model from: {resolved_model_name}")
            loaded = SentenceTransformer(str(model_path))
        else:
            # Hugging Face model (public, or private when a token is provided).
            hf_token = os.environ.get("HF_TOKEN") or os.environ.get("HUGGINGFACE_TOKEN")
            model_kwargs = {}
            if hf_token:
                print(f"Using Hugging Face token for model: {resolved_model_name}")
                model_kwargs["token"] = hf_token
            loaded = SentenceTransformer(resolved_model_name, **model_kwargs)
    except Exception as e:
        print(f"❌ Error loading model {resolved_model_name}: {e}")
    else:
        _model_cache = loaded
        _cached_model_name = resolved_model_name
        _report_loaded(loaded, "model", resolved_model_name)
        return loaded

    # The requested model failed; nothing else to try if it was the fallback itself.
    if resolved_model_name == FALLBACK_MODEL_NAME:
        return None

    # The cache is untouched by a failed load, so a fallback instance from an
    # earlier call can be reused instead of being instantiated on every call.
    if _model_cache is not None and _cached_model_name == FALLBACK_MODEL_NAME and not force_reload:
        print(f"Using already loaded fallback model: {FALLBACK_MODEL_NAME}")
        return _model_cache

    print(f"Trying fallback model: {FALLBACK_MODEL_NAME}")
    try:
        fallback = SentenceTransformer(FALLBACK_MODEL_NAME)
    except Exception as e2:
        print(f"❌ Error loading fallback model: {e2}")
        return None

    _model_cache = fallback
    _cached_model_name = FALLBACK_MODEL_NAME
    _report_loaded(fallback, "fallback model", FALLBACK_MODEL_NAME)
    return fallback
def list_available_models() -> Dict[str, str]:
    """
    Return the registry of known embedding models.

    Returns:
        A new dictionary mapping short names to full model names; mutating it
        does not affect AVAILABLE_MODELS.
    """
    return dict(AVAILABLE_MODELS)
def compare_models(texts: List[str], model_names: Optional[List[str]] = None) -> Dict[str, Dict[str, Union[str, int, float]]]:
    """
    Compare different embedding models on sample texts.

    Each model is loaded with force_reload=True, so the module-level model
    cache ends up holding the last model that was compared.

    Args:
        texts: List of sample texts to test.
        model_names: Short names (keys of AVAILABLE_MODELS) to compare. If
            None, compares all available models. Unknown names are skipped.

    Returns:
        Dictionary keyed by short model name. Each value is either
        {"error": message} or a dictionary with:
            - model_name: Full model name
            - dimension: Embedding dimension (0 if it could not be probed)
            - encoding_time: Time to encode texts (seconds)
            - avg_similarity: Average pairwise similarity between texts
        Models that could not be loaded at all are omitted.
    """
    import time

    if model_names is None:
        model_names = list(AVAILABLE_MODELS)

    results = {}
    for model_key in model_names:
        if model_key not in AVAILABLE_MODELS:
            continue
        model_name = AVAILABLE_MODELS[model_key]
        try:
            model = get_embedding_model(model_name, force_reload=True)
            if model is None:
                continue

            # Probe the instance we already hold; looking the model up by name
            # again could trigger a second load attempt.
            try:
                dim = len(model.encode("test", show_progress_bar=False))
            except Exception:
                dim = 0

            # perf_counter is monotonic, unlike wall-clock time.time().
            start = time.perf_counter()
            embeddings = generate_embeddings_batch(texts, model=model)
            encoding_time = time.perf_counter() - start

            # Average similarity over every unordered pair of usable embeddings.
            usable = [emb for emb in embeddings if emb is not None]
            similarities = [
                cosine_similarity(first, second)
                for idx, first in enumerate(usable)
                for second in usable[idx + 1:]
            ]
            avg_similarity = sum(similarities) / len(similarities) if similarities else 0.0

            results[model_key] = {
                "model_name": model_name,
                "dimension": dim,
                "encoding_time": encoding_time,
                "avg_similarity": avg_similarity,
            }
        except Exception as e:
            print(f"Error comparing model {model_key}: {e}")
            results[model_key] = {"error": str(e)}
    return results
def generate_embedding(text: str, model: Optional[SentenceTransformer] = None) -> Optional[np.ndarray]:
    """
    Embed one piece of text as a normalized vector.

    Args:
        text: Input text to embed. Empty or whitespace-only text is rejected.
        model: SentenceTransformer instance. If None, uses default model.

    Returns:
        Numpy array of embedding vector, or None when the text is blank, no
        model is available, or encoding raises.
    """
    # Blank input has nothing to embed.
    if not (text and text.strip()):
        return None

    encoder = get_embedding_model() if model is None else model
    if encoder is None:
        return None

    try:
        return encoder.encode(text, normalize_embeddings=True, show_progress_bar=False)
    except Exception as e:
        print(f"Error generating embedding: {e}")
        return None
def generate_embeddings_batch(texts: List[str], model: Optional[SentenceTransformer] = None, batch_size: int = 32) -> List[Optional[np.ndarray]]:
    """
    Generate embeddings for a batch of texts.

    The result always has one entry per input, in input order. Entries that
    are not strings (for example None) get None in their position and are not
    sent to the model, so a single bad item no longer risks failing the whole
    batch.

    Args:
        texts: List of input texts. A bare string is treated as a one-item batch.
        model: SentenceTransformer instance. If None, uses default model.
        batch_size: Batch size for processing.

    Returns:
        List of numpy arrays (normalized embeddings), with None for entries
        that could not be embedded. All entries are None when no model is
        available or encoding raises.
    """
    if not texts:
        return []
    # A bare string would otherwise be iterated character by character.
    if isinstance(texts, str):
        texts = [texts]

    if model is None:
        model = get_embedding_model()
    if model is None:
        return [None] * len(texts)

    results: List[Optional[np.ndarray]] = [None] * len(texts)
    # Remember original positions so outputs can be written back in place.
    encodable = [(pos, item) for pos, item in enumerate(texts) if isinstance(item, str)]
    if not encodable:
        return results

    try:
        vectors = model.encode(
            [item for _, item in encodable],
            batch_size=batch_size,
            normalize_embeddings=True,
            show_progress_bar=True,
            convert_to_numpy=True,
        )
    except Exception as e:
        print(f"Error generating batch embeddings: {e}")
        return results

    for (pos, _), vector in zip(encodable, vectors):
        results[pos] = vector
    return results
def cosine_similarity(vec1: np.ndarray, vec2: np.ndarray) -> float:
    """
    Calculate cosine similarity between two vectors.

    Args:
        vec1: First vector.
        vec2: Second vector.

    Returns:
        Cosine similarity in the range [-1, 1]. Returns 0.0 when either
        vector is None or the product of the norms is zero.
    """
    if vec1 is None or vec2 is None:
        return 0.0

    denominator = np.linalg.norm(vec1) * np.linalg.norm(vec2)
    # Covers zero vectors and a norm product that underflows to zero; both
    # would otherwise divide by zero.
    if denominator == 0:
        return 0.0

    similarity = np.dot(vec1, vec2) / denominator
    # Rounding can push the ratio marginally past +/-1 (e.g. 1.0000001 for
    # float32 inputs); clamp it to the mathematically valid range.
    return float(np.clip(similarity, -1.0, 1.0))
def get_embedding_dimension(model_name: Optional[str] = None) -> int:
    """
    Report the length of the vectors a model produces.

    Args:
        model_name: Model name. If None, uses default.

    Returns:
        Embedding dimension, or 0 when the model is unavailable or the probe
        encoding fails.
    """
    encoder = get_embedding_model(model_name)
    if encoder is None:
        return 0

    # The size is measured by embedding a throwaway string.
    try:
        probe = encoder.encode("test", show_progress_bar=False)
        size = len(probe)
    except Exception:
        size = 0
    return size