Spaces:
Running
Running
| from __future__ import annotations | |
| from typing import TYPE_CHECKING, Any | |
| from abc import ABC, abstractmethod | |
| from langchain_core.runnables.config import run_in_executor | |
| from pydantic import BaseModel, Field | |
| if TYPE_CHECKING: | |
| from collections.abc import Sequence | |
class SparseVector(BaseModel, extra="forbid"):
    """Sparse vector structure.

    A vector is represented by the positions of its non-zero entries
    (``indices``) and the matching entry values (``values``); the two
    lists are parallel and must have equal length.
    """

    indices: list[int] = Field(..., description="indices must be unique")
    values: list[float] = Field(
        ..., description="values and indices must be the same length"
    )
class SparseEmbeddings(ABC):
    """An interface for sparse embedding models to use with Qdrant.

    Subclasses must implement the synchronous ``embed_documents`` and
    ``embed_query``; the async variants have default implementations that
    run the sync methods in an executor.
    """

    @abstractmethod
    def embed_documents(self, texts: list[str]) -> list[SparseVector]:
        """Embed search docs."""

    @abstractmethod
    def embed_query(self, text: str) -> SparseVector:
        """Embed query text."""

    async def aembed_documents(self, texts: list[str]) -> list[SparseVector]:
        """Asynchronous Embed search docs."""
        # Delegate to the sync implementation on a worker thread so that
        # subclasses only need to provide the synchronous API.
        return await run_in_executor(None, self.embed_documents, texts)

    async def aembed_query(self, text: str) -> SparseVector:
        """Asynchronous Embed query text."""
        return await run_in_executor(None, self.embed_query, text)
class FastEmbedSparse(SparseEmbeddings):
    """Sparse embedding implementation backed by the FastEmbed library."""

    def __init__(
        self,
        model_name: str = "Qdrant/bm25",
        batch_size: int = 256,
        cache_dir: str | None = None,
        threads: int | None = None,
        providers: Sequence[Any] | None = None,
        parallel: int | None = None,
        **kwargs: Any,
    ) -> None:
        """Sparse encoder implementation using FastEmbed.

        Uses [FastEmbed](https://qdrant.github.io/fastembed/) for sparse text
        embeddings.

        For a list of available models, see
        [the Qdrant docs](https://qdrant.github.io/fastembed/examples/Supported_Models/).

        Args:
            model_name (str): The name of the model to use.
            batch_size (int): Batch size for encoding.
            cache_dir (str, optional): The path to the model cache directory.
                Can also be set using the
                `FASTEMBED_CACHE_PATH` env variable.
            threads (int, optional): The number of threads onnxruntime session can use.
            providers (Sequence[Any], optional): List of ONNX execution providers.
            parallel (int, optional): If `>1`, data-parallel encoding will be used.
                Recommended for encoding of large datasets.
                If `0`, use all available cores.
                If `None`, don't use data-parallel processing,
                use default onnxruntime threading instead.
            kwargs: Additional options to pass to `fastembed.SparseTextEmbedding`

        Raises:
            ValueError: If the `fastembed` package is not installed, or if
                the `model_name` is not supported in `SparseTextEmbedding`.
        """
        try:
            # Imported lazily so the package remains importable without the
            # optional `fastembed` dependency installed.
            from fastembed import (  # type: ignore[import-not-found] # noqa: PLC0415
                SparseTextEmbedding,
            )
        except ImportError as err:
            msg = (
                "The 'fastembed' package is not installed. "
                "Please install it with "
                "`pip install fastembed` or `pip install fastembed-gpu`."
            )
            raise ValueError(msg) from err

        self._batch_size = batch_size
        self._parallel = parallel
        self._model = SparseTextEmbedding(
            model_name=model_name,
            cache_dir=cache_dir,
            threads=threads,
            providers=providers,
            **kwargs,
        )

    def embed_documents(self, texts: list[str]) -> list[SparseVector]:
        """Embed search docs with FastEmbed and convert to `SparseVector`s."""
        results = self._model.embed(
            texts, batch_size=self._batch_size, parallel=self._parallel
        )
        return [
            SparseVector(indices=result.indices.tolist(), values=result.values.tolist())
            for result in results
        ]

    def embed_query(self, text: str) -> SparseVector:
        """Embed query text and return a single `SparseVector`."""
        # NOTE(review): this uses `embed` for the query; fastembed also exposes
        # a query-side `query_embed` API — confirm `embed` is intended here.
        result = next(self._model.embed(text))
        return SparseVector(
            indices=result.indices.tolist(), values=result.values.tolist()
        )