File size: 4,327 Bytes
8812f42 dbb6988 dfd32d8 b249c92 dfd32d8 dbb6988 dfd32d8 8812f42 44013a5 dbb6988 3c6d6b3 aa8cb73 dbb6988 dfd32d8 dbb6988 8812f42 dfd32d8 8812f42 44013a5 aa8cb73 44013a5 dfd32d8 44013a5 dfd32d8 44013a5 dfd32d8 44013a5 dfd32d8 44013a5 b249c92 dbb6988 b249c92 44013a5 b249c92 44013a5 dbb6988 44013a5 dbb6988 44013a5 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 |
import asyncio
from typing import List, Optional

import httpx
import numpy as np
from loguru import logger

from .config import get_settings
from .gemini_client import GeminiClient
from .llm import LLMClient
from .utils import timing_decorator_async, timing_decorator_sync, call_endpoint_with_retry
class EmbeddingClient:
    """Client for creating text embeddings via Gemini or a HuggingFace fallback endpoint."""

    def __init__(self):
        """
        Initialize the EmbeddingClient.

        Reads the embedding provider and model from settings; a GeminiClient
        is instantiated only when the configured provider is 'gemini'.
        """
        self._client = httpx.AsyncClient()
        settings = get_settings()
        self.provider = getattr(settings, 'embedding_provider', 'default')
        self.model = getattr(settings, 'embedding_model', 'models/embedding-001')
        self.gemini_client: Optional[GeminiClient] = GeminiClient() if self.provider == 'gemini' else None
        logger.info(f"[EMBEDDING] Initialized with provider={self.provider}, model={self.model}")

    @timing_decorator_async
    async def create_embedding(self, text: str, task_type: str = "retrieval_query") -> List[float]:
        """
        Create an embedding vector for *text*.

        Uses the Gemini client when the configured provider is 'gemini';
        otherwise falls back to a HuggingFace embedding endpoint.

        Args:
            text: Input text to embed.
            task_type: Gemini embedding task type (ignored by the fallback).

        Returns:
            The embedding vector as a list of floats.

        Raises:
            RuntimeError: If the Gemini client is missing, the embedding has
                an unexpected type, or the fallback endpoint returns None.
        """
        if self.provider == 'gemini':
            if not self.gemini_client:
                raise RuntimeError("GeminiClient is not initialized")
            try:
                # GeminiClient.create_embedding is synchronous; run it in an
                # executor so it does not block the event loop.
                # get_running_loop() replaces get_event_loop(), which is
                # deprecated inside coroutines since Python 3.10.
                loop = asyncio.get_running_loop()
                gemini_client = self.gemini_client  # type: ignore
                # Always use the model from config, independent of any
                # key/model chosen by RequestLimitManager.
                logger.info(f"[EMBEDDING] Creating embedding with model={self.model}, task_type={task_type}")
                embedding = await loop.run_in_executor(
                    None,
                    lambda: gemini_client.create_embedding(text, model=self.model, task_type=task_type),
                )
                # Validate the returned type before handing it to callers.
                if isinstance(embedding, list):
                    preview = f"{embedding[:10]}...{embedding[-10:]}" if len(embedding) > 20 else str(embedding)
                    logger.info(f"[EMBEDDING] API response: {preview}")
                    return embedding
                logger.error(f"[EMBEDDING] Unknown embedding type: {type(embedding)} - value: {embedding}")
                raise RuntimeError(f"Embedding returned unexpected type: {type(embedding)}")
            except Exception as e:
                logger.error(f"[EMBEDDING] Error creating embedding with Gemini: {e}")
                raise
        # Fallback to HuggingFace embedding
        url = "https://vietcat-vietnameseembeddingv2.hf.space/embed"
        payload = {"text": text}
        try:
            response = await call_endpoint_with_retry(self._client, url, payload)
            if response is None:
                logger.error("[EMBEDDING] HuggingFace API response is None")
                raise RuntimeError("HuggingFace API response is None")
            data = response.json()
            emb = data["embedding"]
            # Guard the preview slice so short vectors do not produce an
            # overlapping/misleading "head...tail" log line.
            preview = f"{emb[:10]}...{emb[-10:]}" if len(emb) > 20 else str(emb)
            logger.info(f"[EMBEDDING] HuggingFace API response: {preview}")
            return emb
        except Exception as e:
            logger.error(f"[EMBEDDING] Error creating embedding with HuggingFace: {e}")
            raise

    def cosine_similarity(self, embedding1: List[float], embedding2: List[float]) -> float:
        """
        Compute the cosine similarity between two embeddings.

        Args:
            embedding1: First embedding vector.
            embedding2: Second embedding vector.

        Returns:
            The cosine similarity, or 0.0 when either vector has zero norm
            or an error occurs.
        """
        try:
            a = np.array(embedding1)
            b = np.array(embedding2)
            denom = np.linalg.norm(a) * np.linalg.norm(b)
            # NumPy division by zero yields NaN with a warning, not an
            # exception, so the except-clause would never catch it; guard
            # zero-norm (empty or all-zero) vectors explicitly.
            if denom == 0:
                return 0.0
            return float(np.dot(a, b) / denom)
        except Exception as e:
            logger.error(f"[EMBEDDING] Error calculating similarity: {e}")
            return 0.0

    def get_embedding_model(self) -> str:
        """
        Return the embedding model configured for this client.

        Useful for verifying that the intended model is being used.
        """
        return self.model