Spaces:
Sleeping
Sleeping
| import httpx | |
| from typing import List | |
| import logging | |
| logger = logging.getLogger("embedding-client") | |
| class EmbeddingClient: | |
| def __init__(self, base_url: str): | |
| self.base_url = base_url.rstrip("/") | |
| async def encode(self, texts: List[str]) -> List[List[float]]: | |
| try: | |
| async with httpx.AsyncClient(timeout=30) as client: | |
| response = await client.post( | |
| f"{self.base_url}/embed", | |
| json={"texts": texts} | |
| ) | |
| response.raise_for_status() | |
| return response.json()["embeddings"] | |
| except Exception as e: | |
| logger.exception("Embedding service call failed") | |
| raise RuntimeError("EmbeddingServiceError") from e | |