| """ |
| Vortex-Embed v2 — Retrieval-optimized LF4 static embedding model. |
| |
| Built on VTXAI/Vortex-Embed-4.7M (4-bit LF4 weights, 29 KB tokenizer). |
| All training-free upgrades: SIF IDF weighting, top-K principal component |
| removal, file-path header injection, and search-time file-extension score |
| bias. |
| |
| Key results (Webscout codebase, 5,168 chunks, 51 hand-verified queries): |
| |
| R@1 = 0.745 (baseline LF4: 0.314, +137%) |
| R@5 = 0.843 |
| R@10 = 0.882 |
| MRR = 0.779 |
| |
| Drop-in replacement for `LF4StaticEmbedding` from the v1 model. Same |
| weight format, same tokenizer, same embed dimension. New arguments are |
| optional and default to the v2 best configuration. |
| """ |
| from __future__ import annotations |
|
|
| import json |
| import math |
| import re |
| from dataclasses import dataclass, field |
| from pathlib import Path |
| from typing import List, Optional, Sequence, Tuple, Union |
|
|
| import numpy as np |
| from safetensors.numpy import load_file, save_file |
|
|
| try: |
| from tokenizers import Tokenizer |
| except Exception: |
| Tokenizer = None |
|
|
|
|
| |
| |
| |
|
|
# Word separators inside a single path component: runs of "_", "-" or ".".
_PATH_SEP_RE = re.compile(r"[_\-\.]+")


def _path_to_header_tokens(path: str) -> List[str]:
    """Split a file path into lowercase, de-duplicated semantic tokens.

    The directory components and the file stem (file name without its final
    extension) are each split on runs of ``_``, ``-`` and ``.``. Tokens are
    lowercased and duplicates are removed while keeping first-seen order.

    Only the file's own extension is stripped. A directory or stem word that
    happens to equal the extension (e.g. the ``py`` in ``src/py/tool.py``) is
    kept, and the filesystem anchor of an absolute path (``/`` or a drive
    root) never becomes a token.

    Args:
        path: Relative or absolute file path.

    Returns:
        Ordered list of unique lowercase tokens; empty for an empty path.

    Example:
        "llm4free/search/engines/duckduckgo_main.py"
            -> ["llm4free", "search", "engines", "duckduckgo", "main"]
    """
    p = Path(path)
    dirs = list(p.parent.parts)
    # A leading "."-prefixed component ("..", ".github", ...) is skipped.
    if dirs and dirs[0].startswith("."):
        dirs = dirs[1:]
    # The anchor is pure filesystem syntax, not a word worth embedding.
    components = [d for d in dirs if d != p.anchor]
    # Using the stem (not the full name) strips the extension exactly once.
    components.append(p.stem)

    tokens = [
        word.lower()
        for component in components
        for word in _PATH_SEP_RE.split(component)
        if word
    ]
    # dict preserves insertion order, giving an order-stable de-duplication.
    return list(dict.fromkeys(tokens))
|
|
|
|
| |
| |
| |
|
|
|
|
@dataclass
class VortexEmbedConfig:
    """Typed view of the model's ``config.json``.

    Unknown keys are ignored on load, and every declared field is written
    back on save.
    """

    # --- model geometry / identity ---
    vocab_size: int = 29528
    embedding_dim: int = 256
    block_size: int = 32  # values per quantization block
    num_blocks: int = 8  # quantization blocks per embedding row
    model_type: str = "vortex-embed"
    architectures: List[str] = field(default_factory=lambda: ["VortexEmbedV2"])

    # --- retrieval-time knobs ---
    sif_a: float = 1e-4  # SIF smoothing constant ``a`` in a / (a + p(t))
    sif_pc: float = 1.0  # strength of principal-component removal
    pc_k: int = 8  # number of principal components removed
    header_repeat: int = 15  # how many times the path header is repeated
    py_bonus: float = 0.05  # score bias added to ``.py`` chunks
    md_penalty: float = -0.02  # score bias added to ``.md`` chunks
    bias_top_k: int = 50  # candidate pool size for the extension bias

    # --- weight format ---
    quantization: str = "lf4"
    bits: int = 4

    @classmethod
    def from_dict(cls, d: dict) -> "VortexEmbedConfig":
        """Build a config from a mapping, silently skipping unknown keys."""
        known = cls.__dataclass_fields__
        kwargs = {}
        for key, value in d.items():
            if key in known:
                kwargs[key] = value
        return cls(**kwargs)

    def to_dict(self) -> dict:
        """Return all declared fields as a plain dict, in declaration order."""
        return {name: getattr(self, name) for name in self.__dataclass_fields__}
|
|
|
|
| class VortexEmbedV2: |
| """Vortex-Embed v2 — retrieval-optimized LF4 static embedding. |
| |
| Pipeline at encode time (per chunk text): |
| 1. Augment: prepend path-header tokens × ``header_repeat`` |
| 2. Tokenize (HuggingFace fast tokenizer, same as v1) |
| 3. SIF IDF weighting on every token |
| 4. Sum tokens per chunk via ``torch.Tensor.index_add_`` (CPU) |
| 5. Divide by SIF-weighted count |
| 6. Remove top-``pc_k`` principal components (fitted on corpus) |
| 7. L2-normalize |
| |
| Pipeline at search time (per query): |
| 1. Encode query with the same pipeline |
| 2. Cosine score against the index (``qn @ index.T``) |
| 3. Within top-``bias_top_k`` candidates, add a small per-extension |
| score bias (``+py_bonus`` for .py, ``+md_penalty`` for .md) to |
| break the ties where README.md / docs/*.md outrank code |
| |
| Args: |
| packed: ``uint8`` (vocab, dim//2) packed 4-bit weights. |
| scales: ``float16`` (vocab, num_blocks) per-block scales. |
| zeros: ``float16`` (vocab, num_blocks) per-block zero-points. |
| tokenizer_data: path to ``tokenizer.json`` or its raw JSON string. |
| config: configuration dict (or :class:`VortexEmbedConfig`). |
| precompute: if True, dequantize the full table to FP32 at load. |
| """ |
|
|
def __init__(
    self,
    packed: np.ndarray,
    scales: np.ndarray,
    zeros: np.ndarray,
    tokenizer_data: Union[str, Path],
    config: Union[dict, VortexEmbedConfig],
    *,
    precompute: bool = True,
) -> None:
    """Store the quantized weights and unpack the configuration.

    Args:
        packed: Packed 4-bit weights; coerced to ``uint8``.
        scales: Per-block scales; coerced to ``float16``.
        zeros: Per-block zero-points; coerced to ``float16``.
        tokenizer_data: Tokenizer source, stored as a string.
        config: Either a plain dict (converted through
            ``VortexEmbedConfig.from_dict``) or a ready config object.
        precompute: When true, the full FP32 table is dequantized right
            away; otherwise it is built on first use.
    """
    cfg = VortexEmbedConfig.from_dict(config) if isinstance(config, dict) else config

    # Quantized tensors, coerced to their storage dtypes.
    self.packed = np.asarray(packed, dtype=np.uint8)
    self.scales = np.asarray(scales, dtype=np.float16)
    self.zeros = np.asarray(zeros, dtype=np.float16)
    self.tokenizer_data = str(tokenizer_data)
    self.config = cfg

    # Model geometry, copied out of the config as plain ints.
    self.vocab_size = int(cfg.vocab_size)
    self.dim = int(cfg.embedding_dim)
    self.block_size = int(cfg.block_size)
    self.num_blocks = int(cfg.num_blocks)

    # Retrieval knobs.
    self.sif_a = float(cfg.sif_a)
    self.sif_pc = float(cfg.sif_pc)
    self.pc_k = int(cfg.pc_k)
    self.header_repeat = int(cfg.header_repeat)
    self.py_bonus = float(cfg.py_bonus)
    self.md_penalty = float(cfg.md_penalty)
    self.bias_top_k = int(cfg.bias_top_k)

    # Lazily-populated / fitted state.
    self._tokenizer: Optional[Tokenizer] = None
    self._sif_weights: Optional[np.ndarray] = None
    self._pc_directions: Optional[np.ndarray] = None
    self._file_paths: Optional[List[str]] = None
    self._chunk_is_py: Optional[np.ndarray] = None
    self._chunk_is_md: Optional[np.ndarray] = None
    self.cache_path: Optional[Path] = None
    self._embedding_table: Optional[np.ndarray] = (
        self._dequantize_all() if precompute else None
    )
|
|
| |
|
|
@property
def tokenizer(self) -> Tokenizer:
    """Build the ``tokenizers.Tokenizer`` on first access and cache it.

    ``tokenizer_data`` may be either a path to ``tokenizer.json`` or the
    serialized JSON document itself. A value whose first non-blank
    character is ``{`` is parsed as JSON; anything else is opened as a file.

    Raises:
        RuntimeError: if the ``tokenizers`` package is not installed.
    """
    if self._tokenizer is None:
        if Tokenizer is None:
            raise RuntimeError("tokenizers is required: install via `pip install tokenizers`")
        source = self.tokenizer_data
        if source.lstrip().startswith("{"):
            # Raw JSON document: parse it directly rather than as a path.
            self._tokenizer = Tokenizer.from_str(source)
        else:
            self._tokenizer = Tokenizer.from_file(source)
    return self._tokenizer
|
|
@property
def embedding_table(self) -> np.ndarray:
    """Return the dequantized embedding table, building it on first access."""
    table = self._embedding_table
    if table is None:
        table = self._dequantize_all()
        self._embedding_table = table
    return table
|
|
@property
def model_size_mb(self) -> float:
    """Size in MB (1e6 bytes) of the weights currently held.

    Reports the FP32 table if it has been materialized, otherwise the sum
    of the packed weights, scales and zero-points.
    """
    table = self._embedding_table
    if table is None:
        quantized_bytes = self.packed.nbytes + self.scales.nbytes + self.zeros.nbytes
        return quantized_bytes / 1e6
    return table.nbytes / 1e6
|
|
| |
|
|
@classmethod
def from_pretrained(
    cls,
    path_or_id: Union[str, Path],
    *,
    precompute: bool = True,
    cache_path: Optional[Union[str, Path]] = None,
    **overrides,
) -> "VortexEmbedV2":
    """Load from a local model directory or a Hugging Face Hub id.

    Expected files in the directory:
      - ``model.safetensors``  (LF4 packed weights)
      - ``config.json``        (model + retrieval config)
      - ``tokenizer.json``

    Args:
        path_or_id: Existing local directory, or anything else, which is
            passed to ``huggingface_hub.snapshot_download``.
        precompute: Forwarded to the constructor.
        cache_path: Optional default embedding-cache prefix stored on the
            returned object.
        **overrides: Config values replacing those read from disk. Keys
            that are not ``VortexEmbedConfig`` fields are ignored.

    Returns:
        The constructed model.
    """
    path = Path(path_or_id)
    if not path.is_dir():
        # Imported lazily so purely local use does not need huggingface_hub.
        from huggingface_hub import snapshot_download
        path = Path(snapshot_download(str(path_or_id)))

    tensors = load_file(str(path / "model.safetensors"))
    # JSON is UTF-8 by specification; do not depend on the platform locale.
    config = json.loads((path / "config.json").read_text(encoding="utf-8"))
    config.update(
        {
            key: value
            for key, value in overrides.items()
            if key in VortexEmbedConfig.__dataclass_fields__
        }
    )

    obj = cls(
        packed=tensors["embedding_packed"],
        scales=tensors["embedding_scales"],
        zeros=tensors["embedding_zeros"],
        tokenizer_data=str(path / "tokenizer.json"),
        config=config,
        precompute=precompute,
    )
    if cache_path is not None:
        obj.cache_path = Path(cache_path)
    return obj
|
|
def save_pretrained(self, path: Union[str, Path]) -> None:
    """Save weights, config and tokenizer to a local directory.

    Writes ``model.safetensors`` and ``config.json`` unconditionally.
    ``tokenizer.json`` is written only if it is not already present in the
    target directory.

    Args:
        path: Output directory; created (with parents) if missing.
    """
    out = Path(path)
    out.mkdir(parents=True, exist_ok=True)
    save_file(
        {
            "embedding_packed": self.packed,
            "embedding_scales": self.scales,
            "embedding_zeros": self.zeros,
        },
        str(out / "model.safetensors"),
    )
    (out / "config.json").write_text(
        json.dumps(self.config.to_dict(), indent=2), encoding="utf-8"
    )

    tokenizer_dst = out / "tokenizer.json"
    if not tokenizer_dst.exists():
        source = self.tokenizer_data
        if source.lstrip().startswith("{"):
            # tokenizer_data holds the JSON document itself, not a path.
            tokenizer_dst.write_text(source, encoding="utf-8")
        else:
            # Byte-for-byte copy: decoding with the locale encoding can
            # corrupt (or fail on) non-ASCII vocabulary entries.
            tokenizer_dst.write_bytes(Path(source).read_bytes())
|
|
| |
|
|
def _dequantize_all(self) -> np.ndarray:
    """Expand the whole packed LF4 table into FP32 vectors.

    Every byte holds two 4-bit codes: the low nibble comes first, the high
    nibble second. Codes are grouped into ``num_blocks`` blocks of
    ``block_size`` values, and each block is mapped with
    ``value = scale[b] * code + zero[b]``. The result is cut to the first
    ``dim`` columns.
    """
    n_rows = self.packed.shape[0]
    width = self.packed.shape[1] * 2

    # Stacking on a trailing axis interleaves low/high nibbles per byte.
    nibbles = np.stack(
        (self.packed & 0x0F, (self.packed >> 4) & 0x0F), axis=-1
    ).astype(np.float32)
    codes = nibbles.reshape(n_rows, self.num_blocks, self.block_size)

    scale = self.scales.astype(np.float32)[:, :, None]
    zero = self.zeros.astype(np.float32)[:, :, None]
    table = (codes * scale + zero).reshape(n_rows, width)
    return table[:, : self.dim]
|
|
def _dequantize_ids(self, token_ids: np.ndarray) -> np.ndarray:
    """Return FP32 embedding rows for the given token ids.

    If the full table has already been materialized it is indexed
    directly. Otherwise only the distinct ids are dequantized and the
    result is gathered back into the order of ``token_ids``. Nothing is
    cached on this path: storing a table in which only some rows are
    filled would make later lookups of other ids return uninitialized
    memory.

    Args:
        token_ids: Integer array of token ids.

    Returns:
        Array of shape ``token_ids.shape + (dim,)``.
    """
    if self._embedding_table is not None:
        return self._embedding_table[token_ids]

    # An empty input may arrive as float64; force an index-safe dtype.
    token_ids = np.asarray(token_ids).astype(np.int64, copy=False)
    unique, inverse = np.unique(token_ids, return_inverse=True)

    packed = self.packed[unique]
    n_rows = packed.shape[0]
    width = packed.shape[1] * 2
    unpacked = np.empty((n_rows, width), dtype=np.float32)
    unpacked[:, 0::2] = packed & 0x0F  # low nibble first
    unpacked[:, 1::2] = (packed >> 4) & 0x0F  # then high nibble
    blocked = unpacked.reshape(n_rows, self.num_blocks, self.block_size)

    scales = self.scales[unique].astype(np.float32)[:, :, None]
    zeros = self.zeros[unique].astype(np.float32)[:, :, None]
    deq = (blocked * scales + zeros).reshape(n_rows, width)[:, : self.dim]

    # The shape of ``inverse`` differs across NumPy versions; normalize it.
    return deq[inverse.reshape(token_ids.shape)]
|
|
| |
|
|
def fit_idf(self, corpus_token_lists: Sequence[Sequence[int]]) -> "VortexEmbedV2":
    """Compute SIF (Smoothed Inverse Frequency) weights from the corpus.

    ``weight(t) = a / (a + p(t))`` with ``p(t) = count(t) / total_tokens``
    and ``a = sif_a``. Tokens that never occur in the corpus keep weight 1.
    The weights are stored on the instance for later encodes.

    Args:
        corpus_token_lists: One token-id sequence per chunk. Empty
            sequences are allowed.

    Returns:
        ``self``, to allow chaining.
    """
    # Convert each list with an explicit integer dtype: an empty Python
    # list would otherwise become float64 and poison the concatenation.
    arrays = [np.asarray(ids, dtype=np.int64) for ids in corpus_token_lists]
    flat = np.concatenate(arrays) if arrays else np.empty(0, dtype=np.int64)

    total = max(int(flat.size), 1)
    counts = np.bincount(flat, minlength=self.vocab_size).astype(np.float64)
    p = counts / total

    weights = np.ones_like(p)
    seen = p > 0
    weights[seen] = self.sif_a / (self.sif_a + p[seen])
    self._sif_weights = weights.astype(np.float32)
    return self
|
|
def fit_pc(self, corpus_embeddings: np.ndarray, k: Optional[int] = None) -> "VortexEmbedV2":
    """Fit the top-``k`` principal directions of the corpus embeddings.

    The embeddings are mean-centered and decomposed with an SVD. The
    leading ``k`` right singular vectors (re-normalized to unit length)
    are stored and later projected out of every encoded vector. ``k``
    defaults to ``pc_k``.

    Nothing is changed when the input is empty or ``k`` is not positive.
    If the SVD fails to converge, the stored directions are cleared.

    Returns:
        ``self``, to allow chaining.
    """
    n_components = self.pc_k if k is None else k
    if corpus_embeddings.size == 0 or n_components <= 0:
        return self

    centered = corpus_embeddings.astype(np.float32)
    centered = centered - centered.mean(axis=0, keepdims=True)
    try:
        vt = np.linalg.svd(centered, full_matrices=False)[2]
    except np.linalg.LinAlgError:
        self._pc_directions = None
        return self

    top = vt[:n_components].astype(np.float32)
    lengths = np.linalg.norm(top, axis=1, keepdims=True) + 1e-12
    self._pc_directions = top / lengths
    return self
|
|
def _apply_pc(self, x: np.ndarray) -> np.ndarray:
    """Subtract each fitted principal direction from the rows of ``x``.

    Directions are removed one after another, each scaled by ``sif_pc``.
    ``x`` itself is returned untouched when no directions are fitted or
    ``sif_pc`` is not positive.
    """
    directions = self._pc_directions
    if self.sif_pc <= 0 or directions is None:
        return x

    result = x
    for direction in directions:
        coeff = result @ direction  # per-row projection length
        result = result - self.sif_pc * (coeff[:, None] * direction[None, :])
    return result
|
|
| |
|
|
def set_file_paths(self, file_paths: Optional[Sequence[str]]) -> "VortexEmbedV2":
    """Bind (or clear) the corpus file paths used for header injection.

    Besides storing the paths, this precomputes two boolean arrays marking
    which chunks end in ``.py`` and which end in ``.md``, so the
    search-time bias does not re-classify paths on every query.

    Args:
        file_paths: One path per chunk, in index order. Any iterable is
            accepted. ``None`` clears the binding and both flag arrays.

    Returns:
        ``self``, to allow chaining.
    """
    # Handle None before touching the argument: list(None) would raise.
    if file_paths is None:
        self._file_paths = None
        self._chunk_is_py = None
        self._chunk_is_md = None
        return self

    # Materialize once so generators work and len() is always available.
    paths = list(file_paths)
    self._file_paths = paths
    self._chunk_is_py = np.fromiter(
        (p.endswith(".py") for p in paths), dtype=bool, count=len(paths)
    )
    self._chunk_is_md = np.fromiter(
        (p.endswith(".md") for p in paths), dtype=bool, count=len(paths)
    )
    return self
|
|
def _augment_texts(self, texts: Sequence[str]) -> List[str]:
    """Prefix each text with its repeated path-header line.

    Texts are returned unchanged (as a new list) when no file paths are
    bound or when their count differs from the number of texts. A text is
    also left as-is when its path yields no tokens or ``header_repeat`` is
    not positive.
    """
    paths = self._file_paths
    if paths is None or len(paths) != len(texts):
        return list(texts)

    augmented: List[str] = []
    for text, path in zip(texts, paths):
        tokens = _path_to_header_tokens(path)
        if tokens and self.header_repeat > 0:
            header = " ".join(tokens * self.header_repeat)
            augmented.append(f"{header}\n{text}")
        else:
            augmented.append(text)
    return augmented
|
|
| |
|
|
# Default limits used by the encode pipeline.
DEFAULT_MAX_CHARS_PER_TEXT = 50000  # characters kept per text before tokenizing
DEFAULT_MAX_TOKENS_PER_TEXT = 4 * 1024  # tokens kept per text after tokenizing
DEFAULT_MAX_TOKENS_PER_BATCH = 256 * 1024  # token budget of one sub-batch
|
|
def _tokenize_batch(self, texts: Sequence[str]) -> List[List[int]]:
    """Tokenize ``texts`` in one call and drop ids outside the vocabulary.

    Returns one id list per input text, containing only ids in
    ``[0, vocab_size)``.
    """
    kept: List[List[int]] = []
    for encoding in self.tokenizer.encode_batch(list(texts)):
        kept.append(
            [tid for tid in encoding.ids if 0 <= int(tid) < self.vocab_size]
        )
    return kept
|
|
def _cap_inputs(self, texts: Sequence[str]) -> List[str]:
    """Shorten over-long texts to ``DEFAULT_MAX_CHARS_PER_TEXT`` characters.

    A text above the limit keeps its first ``limit // 2`` characters and
    its last ``limit - limit // 2`` characters; the middle is dropped. A
    non-positive limit disables the cap.
    """
    limit = self.DEFAULT_MAX_CHARS_PER_TEXT
    if limit <= 0:
        return list(texts)

    head = limit // 2
    tail = limit - head
    return [
        text if len(text) <= limit else text[:head] + text[-tail:]
        for text in texts
    ]
|
|
def _cap_token_lists(self, token_lists: List[List[int]]) -> List[List[int]]:
    """Shorten over-long id lists to ``DEFAULT_MAX_TOKENS_PER_TEXT`` ids.

    A list above the limit keeps its first ``limit // 2`` ids and its last
    ``limit - limit // 2`` ids. With a non-positive limit the input object
    is returned as-is.
    """
    limit = self.DEFAULT_MAX_TOKENS_PER_TEXT
    if limit <= 0:
        return token_lists

    head = limit // 2
    tail = limit - head
    return [
        ids if len(ids) <= limit else ids[:head] + ids[-tail:]
        for ids in token_lists
    ]
|
|
@staticmethod
def _normalize_inplace(x: np.ndarray) -> None:
    """Scale every row of ``x`` to unit L2 norm, modifying ``x`` itself.

    Norms are floored at 1e-12, so all-zero rows stay zero instead of
    producing NaNs.
    """
    lengths = np.linalg.norm(x, axis=1, keepdims=True)
    x /= np.maximum(lengths, 1e-12)
|
|
| |
|
|
def _encode_subbatch(
    self, token_lists: Sequence[Sequence[int]], *, normalize: bool
) -> np.ndarray:
    """Pool one batch of token-id lists into ``(n, dim)`` embeddings.

    Each row is the (SIF-weighted, if weights are fitted) mean of its
    token embeddings, followed by principal-component removal and optional
    L2 normalization. Rows with no tokens come out as all zeros.

    Args:
        token_lists: One id sequence per text. Empty sequences are allowed.
        normalize: Whether to L2-normalize the rows before returning.

    Returns:
        ``float32`` array of shape ``(len(token_lists), dim)``.
    """
    n = len(token_lists)
    lengths = [len(ids) for ids in token_lists]
    if sum(lengths) == 0:
        return np.zeros((n, self.dim), dtype=np.float32)

    # Give every piece an explicit integer dtype: an empty Python list
    # would otherwise become float64 and make the result an invalid index.
    flat = np.concatenate(
        [np.asarray(ids, dtype=np.int64) for ids in token_lists]
    )

    token_embs = self._dequantize_ids(flat)
    weights = None
    if self._sif_weights is not None:
        weights = self._sif_weights[flat].astype(np.float32)
        token_embs = token_embs * weights[:, None]

    import torch

    # row_of[i] is the output row that token i belongs to.
    row_of = torch.from_numpy(np.repeat(np.arange(n, dtype=np.int64), lengths))
    sums = torch.zeros((n, self.dim), dtype=torch.float32)
    sums.index_add_(0, row_of, torch.from_numpy(np.ascontiguousarray(token_embs)))

    # Denominator: total weight per row (or token count when unweighted),
    # floored so that empty rows divide safely.
    if weights is not None:
        denom = row_of.bincount(
            minlength=n, weights=torch.from_numpy(weights)
        ).clamp(min=1e-12)
    else:
        denom = row_of.bincount(minlength=n).clamp(min=1).to(torch.float32)

    embeddings = (sums / denom.unsqueeze(1)).numpy()
    embeddings = self._apply_pc(embeddings)
    if normalize:
        self._normalize_inplace(embeddings)
    return embeddings
|
|
def encode_batch(
    self,
    texts: Sequence[str],
    *,
    normalize: bool = True,
    max_tokens_per_text: Optional[int] = None,
    max_tokens_per_batch: Optional[int] = None,
    max_chars_per_text: Optional[int] = None,
) -> np.ndarray:
    """Encode a list of texts into ``(len, dim)`` embeddings.

    Steps: path-header augmentation, character cap, tokenization, token
    cap, then pooling. If the total token count exceeds the batch budget,
    the texts are pooled in consecutive sub-batches to bound peak memory.

    Args:
        texts: Texts to encode.
        normalize: L2-normalize the output rows.
        max_tokens_per_text: Per-text token cap; defaults to
            ``DEFAULT_MAX_TOKENS_PER_TEXT``. Non-positive disables it.
        max_tokens_per_batch: Token budget per sub-batch; defaults to
            ``DEFAULT_MAX_TOKENS_PER_BATCH``.
        max_chars_per_text: Per-text character cap applied before
            tokenizing; defaults to ``DEFAULT_MAX_CHARS_PER_TEXT``.
            Non-positive disables it.

    Returns:
        ``float32`` array of shape ``(len(texts), dim)``.
    """
    n_texts = len(texts)
    if n_texts == 0:
        return np.zeros((0, self.dim), dtype=np.float32)

    def resolve(value: Optional[int], default: int) -> int:
        return default if value is None else int(value)

    char_cap = resolve(max_chars_per_text, self.DEFAULT_MAX_CHARS_PER_TEXT)
    token_cap = resolve(max_tokens_per_text, self.DEFAULT_MAX_TOKENS_PER_TEXT)
    batch_cap = resolve(max_tokens_per_batch, self.DEFAULT_MAX_TOKENS_PER_BATCH)

    def keep_head_tail(seq, cap: int):
        # Keep the first cap//2 and the last cap - cap//2 items.
        if cap <= 0 or len(seq) <= cap:
            return seq
        head = cap // 2
        return seq[:head] + seq[len(seq) - (cap - head):]

    # Caps are applied here so the per-call overrides are actually honored.
    augmented = self._augment_texts(texts)
    capped = [keep_head_tail(text, char_cap) for text in augmented]
    token_lists = [
        keep_head_tail(ids, token_cap) for ids in self._tokenize_batch(capped)
    ]

    lengths = [len(ids) for ids in token_lists]
    total_tokens = sum(lengths)
    if total_tokens == 0:
        return np.zeros((n_texts, self.dim), dtype=np.float32)

    # Fast path: everything fits in one pooling call.
    if total_tokens <= batch_cap or n_texts <= 1:
        return self._encode_subbatch(token_lists, normalize=normalize)

    # Slow path: greedily pack consecutive texts into sub-batches. A text
    # larger than the budget still gets a sub-batch of its own.
    out = np.zeros((n_texts, self.dim), dtype=np.float32)
    start = 0
    pending = 0
    for i, n_tok in enumerate(lengths):
        if i > start and pending + n_tok > batch_cap:
            out[start:i] = self._encode_subbatch(
                token_lists[start:i], normalize=False
            )
            start, pending = i, 0
        pending += n_tok
    out[start:] = self._encode_subbatch(token_lists[start:], normalize=False)

    if normalize:
        self._normalize_inplace(out)
    return out
|
|
def encode_batch_cached(
    self,
    texts: Sequence[str],
    *,
    normalize: bool = True,
    cache_path: Optional[Union[str, Path]] = None,
    **encode_kwargs,
) -> np.ndarray:
    """Encode with a SHA-1-keyed on-disk cache for fast re-indexing.

    The fingerprint covers everything visible here that changes the
    output: the texts, ``dim``, ``normalize``, the extra encode keyword
    arguments, ``header_repeat``, ``sif_pc``, the bound file paths and
    the fitted SIF weights and principal directions. On a fingerprint
    match the stored array is returned without re-encoding.

    ``cache_path`` (or, if omitted, ``self.cache_path``) is a path prefix
    whose suffix is replaced: embeddings go to ``<prefix>.npy`` and
    metadata to ``<prefix>.json``. With no cache path at all this is a
    plain ``encode_batch`` call.

    An unreadable or mismatching cache is never an error; the embeddings
    are recomputed and the cache rewritten.
    """
    if cache_path is None:
        cache_path = self.cache_path
    if cache_path is None:
        return self.encode_batch(texts, normalize=normalize, **encode_kwargs)

    import hashlib

    cache_path = Path(cache_path)
    cache_path.parent.mkdir(parents=True, exist_ok=True)
    emb_path = cache_path.with_suffix(".npy")
    meta_path = cache_path.with_suffix(".json")

    h = hashlib.sha1()
    h.update(f"{self.dim}|v2|{len(texts)}|".encode())
    for t in texts:
        h.update(t.encode("utf-8", errors="replace"))
        h.update(b"\x00")
    # Encode-time settings: a change in any of them invalidates the cache.
    settings = (
        bool(normalize),
        sorted(encode_kwargs.items()),
        self.header_repeat,
        self.sif_pc,
    )
    h.update(repr(settings).encode())
    if self._file_paths is not None:
        for p in self._file_paths:
            h.update(p.encode("utf-8", errors="replace"))
            h.update(b"\x00")
    for fitted in (self._sif_weights, self._pc_directions):
        h.update(b"\x01")
        if fitted is not None:
            h.update(np.ascontiguousarray(fitted).tobytes())
    fp = h.hexdigest()

    if meta_path.exists() and emb_path.exists():
        try:
            meta = json.loads(meta_path.read_text(encoding="utf-8"))
            if (
                isinstance(meta, dict)
                and meta.get("fingerprint") == fp
                and meta.get("dim") == self.dim
            ):
                cached = np.load(emb_path, mmap_mode=None)
                if cached.shape == (len(texts), self.dim):
                    return cached
        except (OSError, ValueError, EOFError):
            # Corrupt or truncated cache files: fall through and rebuild.
            pass

    emb = self.encode_batch(texts, normalize=normalize, **encode_kwargs)
    np.save(emb_path, emb.astype(np.float32))
    meta_path.write_text(
        json.dumps({"fingerprint": fp, "dim": self.dim, "n": len(texts)}),
        encoding="utf-8",
    )
    return emb
|
|
def encode(self, texts: Union[str, Sequence[str]], *, normalize: bool = True) -> np.ndarray:
    """Encode a single string or a sequence of strings.

    A lone string yields a 1-D vector of shape ``(dim,)``; a sequence
    yields a 2-D array of shape ``(len, dim)``.
    """
    single = isinstance(texts, str)
    batch = [texts] if single else list(texts)
    vectors = self.encode_batch(batch, normalize=normalize)
    return vectors[0] if single else vectors
|
|
| |
|
|
def search(
    self,
    queries: np.ndarray,
    index: np.ndarray,
    top_k: int = 10,
    *,
    index_normalized: bool = True,
) -> Tuple[np.ndarray, np.ndarray]:
    """Cosine search with an optional per-query file-extension score bias.

    For every query, the ``bias_top_k`` best cosine candidates of that
    query (and only those) get ``py_bonus`` added if the chunk is a
    ``.py`` file and ``md_penalty`` added if it is a ``.md`` file. The
    final ranking is taken over the adjusted scores. A query's result
    therefore does not depend on which other queries share its batch. The
    bias is applied only when file paths were bound with
    ``set_file_paths``.

    Args:
        queries: ``(Q, dim)`` array, or a single ``(dim,)`` vector.
        index: ``(N, dim)`` array of chunk embeddings.
        top_k: Number of hits per query (clipped to ``N``).
        index_normalized: If false, an L2-normalized copy of ``index`` is
            used; the caller's array is not modified.

    Returns:
        ``(scores, indices)``, both of shape ``(Q, k)``, best hit first.
        ``indices`` are row numbers into ``index``.

    Raises:
        ValueError: if the bound file paths do not match the index size.
    """
    queries = np.asarray(queries, dtype=np.float32)
    index = np.asarray(index, dtype=np.float32)
    if queries.ndim == 1:
        queries = queries[None, :]
    if not index_normalized:
        index = index.copy()
        self._normalize_inplace(index)
    qn = queries.copy()
    self._normalize_inplace(qn)

    scores = qn @ index.T  # fresh array, safe to modify below
    n_queries, n_docs = scores.shape
    k = min(int(top_k), n_docs)
    if k <= 0:
        return (np.empty((n_queries, 0), dtype=np.float32),
                np.empty((n_queries, 0), dtype=np.int64))

    # Per-chunk bias from the pre-classified extensions (None if unbound).
    chunk_bias = None
    for flags, delta in (
        (self._chunk_is_py, self.py_bonus),
        (self._chunk_is_md, self.md_penalty),
    ):
        if flags is None:
            continue
        if len(flags) != n_docs:
            raise ValueError(
                f"bound file paths cover {len(flags)} chunks but the index has {n_docs} rows"
            )
        if chunk_bias is None:
            chunk_bias = np.zeros(n_docs, dtype=np.float32)
        chunk_bias += np.where(flags, delta, 0.0)

    pool = min(max(int(self.bias_top_k), 0), n_docs)
    if chunk_bias is not None and pool > 0:
        if pool >= n_docs:
            # Every chunk is a candidate for every query.
            scores = scores + chunk_bias[None, :]
        else:
            # Each query's own top-`pool` candidates, in no particular order.
            cand = np.argpartition(-scores, pool, axis=1)[:, :pool]
            rows = np.arange(n_queries)[:, None]
            scores[rows, cand] += chunk_bias[cand]

    if k == n_docs:
        idx = np.argsort(-scores, axis=1)[:, :k]
    else:
        part = np.argpartition(-scores, kth=k, axis=1)[:, :k]
        part_scores = np.take_along_axis(scores, part, axis=1)
        idx = np.take_along_axis(part, np.argsort(-part_scores, axis=1), axis=1)
    ordered_scores = np.take_along_axis(scores, idx, axis=1)
    return (ordered_scores.astype(np.float32, copy=False),
            idx.astype(np.int64, copy=False))
|
|