Spaces:
Sleeping
Sleeping
| """ | |
| Few-shot object classification using Nomic embed-vision-v1.5 + embed-text-v1.5 via ONNX Runtime. | |
| Same treatment as current PyTorch version: | |
| - vision refs -> average image embeddings | |
| - text prompts -> average text embeddings | |
| - combine with text_weight | |
| This version uses: | |
| - nomic-ai/nomic-embed-text-v1.5 -> ONNX | |
| - nomic-ai/nomic-embed-vision-v1.5 -> ONNX | |
| Transformers is used only for preprocessing: | |
| - AutoTokenizer | |
| - AutoImageProcessor | |
| """ | |
| import time | |
| from pathlib import Path | |
| import numpy as np | |
| import onnxruntime as ort | |
| from PIL import Image | |
| from huggingface_hub import hf_hub_download | |
| from transformers import AutoImageProcessor, AutoTokenizer | |
| from jina_fewshot import CLASS_PROMPTS, IMAGE_EXTS | |
| def _l2_normalize(x: np.ndarray, axis: int = -1, eps: float = 1e-12) -> np.ndarray: | |
| x = np.nan_to_num(x, nan=0.0, posinf=0.0, neginf=0.0) | |
| norms = np.linalg.norm(x, axis=axis, keepdims=True) | |
| norms = np.maximum(norms, eps) | |
| return (x / norms).astype(np.float32) | |
| def _mean_pool(last_hidden_state: np.ndarray, attention_mask: np.ndarray) -> np.ndarray: | |
| """ | |
| last_hidden_state: [B, T, D] | |
| attention_mask: [B, T] | |
| """ | |
| mask = attention_mask.astype(np.float32)[..., None] # [B, T, 1] | |
| summed = np.sum(last_hidden_state * mask, axis=1) | |
| denom = np.clip(np.sum(mask, axis=1), 1e-9, None) | |
| return summed / denom | |
| def _pick_output(outputs: list[np.ndarray], output_names: list[str], kind: str) -> np.ndarray: | |
| """ | |
| Try to find the main embedding tensor robustly. | |
| For both text and vision Nomic ONNX exports, we expect a 3D tensor [B, T, D] | |
| or sometimes a 2D tensor [B, D]. | |
| """ | |
| # Prefer names that look like hidden states / embeddings | |
| preferred_keywords = [ | |
| "last_hidden_state", | |
| "hidden_state", | |
| "sentence_embedding", | |
| "embedding", | |
| "embeddings", | |
| ] | |
| for kw in preferred_keywords: | |
| for i, name in enumerate(output_names): | |
| if kw in name.lower(): | |
| arr = outputs[i] | |
| if arr.ndim in (2, 3): | |
| return arr | |
| # Fallback: first 3D output, then first 2D output | |
| for arr in outputs: | |
| if arr.ndim == 3: | |
| return arr | |
| for arr in outputs: | |
| if arr.ndim == 2: | |
| return arr | |
| raise RuntimeError( | |
| f"Could not identify a usable {kind} ONNX output. " | |
| f"Output names={output_names}, shapes={[getattr(o, 'shape', None) for o in outputs]}" | |
| ) | |
def _download_onnx_model(repo_id: str, filename: str = "onnx/model.onnx") -> str:
    """
    Fetch *filename* from the Hugging Face Hub repo *repo_id* (cached by
    hf_hub_download) and return the local filesystem path.
    """
    print(f" Downloading ONNX model from {repo_id} ...")
    local_path = hf_hub_download(repo_id=repo_id, filename=filename)
    print(f" Downloaded: {local_path}")
    return local_path
class NomicTextEncoderONNX:
    """
    Nomic embed-text-v1.5 ONNX:
    text -> token embeddings / hidden states -> mean pool -> L2 normalize
    """
    def __init__(self, device: str = "cuda"):
        # "cuda" is only a request: CUDA is used when onnxruntime actually
        # reports CUDAExecutionProvider; otherwise execution falls back to CPU.
        self.device = device
        self.repo_id = "nomic-ai/nomic-embed-text-v1.5"
        print("[*] Loading nomic-embed-text-v1.5 (ONNX)...")
        t0 = time.perf_counter()
        onnx_path = _download_onnx_model(self.repo_id)
        available = ort.get_available_providers()
        if "CUDAExecutionProvider" in available and device == "cuda":
            providers = ["CUDAExecutionProvider", "CPUExecutionProvider"]
        else:
            providers = ["CPUExecutionProvider"]
        print(f" ONNX providers: {providers}")
        self.session = ort.InferenceSession(onnx_path, providers=providers)
        # Transformers is used only for tokenization; inference is pure ONNX.
        self.tokenizer = AutoTokenizer.from_pretrained(self.repo_id, trust_remote_code=True)
        self.input_names = [inp.name for inp in self.session.get_inputs()]
        self.output_names = [out.name for out in self.session.get_outputs()]
        print(f" ONNX inputs: {self.input_names}")
        print(f" ONNX outputs: {self.output_names}")
        # Map the graph's actual input names onto the standard HF triplet;
        # any of the three may be absent depending on the export.
        self._ids_name: str | None = None
        self._mask_name: str | None = None
        self._token_type_name: str | None = None
        for name in self.input_names:
            nl = name.lower()
            if nl == "input_ids" or "input_ids" in nl:
                self._ids_name = name
            elif nl == "attention_mask" or "attention" in nl:
                self._mask_name = name
            elif nl == "token_type_ids" or "token_type" in nl:
                self._token_type_name = name
        print(
            f" Mapped: input_ids={self._ids_name}, "
            f"attention_mask={self._mask_name}, token_type_ids={self._token_type_name}"
        )
        # Sanity check: run one real encode so download/session/tokenizer
        # problems surface at load time rather than mid-pipeline.
        test = self.encode_texts(["a red square"])
        nrm = float(np.linalg.norm(test[0]))
        print(f" [SANITY] text embed norm={nrm:.4f}")
        print(f"[*] Loaded in {time.perf_counter() - t0:.1f}s\n")

    def encode_texts(self, texts: list[str]) -> np.ndarray:
        """
        Encode *texts* into L2-normalized embeddings, shape [len(texts), D].
        """
        # Every input gets the "classification: " task prefix (Nomic-style
        # instruction prefix applied uniformly to all texts).
        prefixed = [f"classification: {t}" for t in texts]
        tokens = self.tokenizer(
            prefixed,
            padding=True,
            truncation=True,
            return_tensors="np",
            max_length=512,
        )
        input_ids = np.asarray(tokens["input_ids"], dtype=np.int64)
        attention_mask = np.asarray(tokens["attention_mask"], dtype=np.int64)
        # Feed only the inputs the graph actually declares.
        feeds = {}
        if self._ids_name is not None:
            feeds[self._ids_name] = input_ids
        if self._mask_name is not None:
            feeds[self._mask_name] = attention_mask
        if self._token_type_name is not None:
            # Single-segment input -> all-zero token type ids.
            feeds[self._token_type_name] = np.zeros_like(input_ids, dtype=np.int64)
        outputs = self.session.run(self.output_names, feeds)
        main_out = _pick_output(outputs, self.output_names, kind="text")
        # Current PyTorch behavior: mean-pool last_hidden_state
        if main_out.ndim == 3:
            embs = _mean_pool(main_out, attention_mask)
        elif main_out.ndim == 2:
            # Export already pooled to sentence embeddings; use as-is.
            embs = main_out
        else:
            raise RuntimeError(f"Unexpected text output rank: {main_out.ndim}")
        return _l2_normalize(embs, axis=1)
class NomicVisionEncoderONNX:
    """
    Nomic embed-vision-v1.5 ONNX:
    image -> hidden states -> CLS token -> L2 normalize
    """
    def __init__(self, device: str = "cuda"):
        # "cuda" is only a request: CUDA is used when onnxruntime actually
        # reports CUDAExecutionProvider; otherwise execution falls back to CPU.
        self.device = device
        self.repo_id = "nomic-ai/nomic-embed-vision-v1.5"
        print("[*] Loading nomic-embed-vision-v1.5 (ONNX)...")
        t0 = time.perf_counter()
        onnx_path = _download_onnx_model(self.repo_id)
        available = ort.get_available_providers()
        if "CUDAExecutionProvider" in available and device == "cuda":
            providers = ["CUDAExecutionProvider", "CPUExecutionProvider"]
        else:
            providers = ["CPUExecutionProvider"]
        print(f" ONNX providers: {providers}")
        self.session = ort.InferenceSession(onnx_path, providers=providers)
        # Transformers is used only for image preprocessing; inference is ONNX.
        self.processor = AutoImageProcessor.from_pretrained(self.repo_id, trust_remote_code=True)
        self.input_names = [inp.name for inp in self.session.get_inputs()]
        self.output_names = [out.name for out in self.session.get_outputs()]
        print(f" ONNX inputs: {self.input_names}")
        print(f" ONNX outputs: {self.output_names}")
        # Find the pixel input by name; exports may not call it "pixel_values".
        self._pixel_name: str | None = None
        for name in self.input_names:
            if "pixel" in name.lower():
                self._pixel_name = name
                break
        print(f" Mapped: pixel_values={self._pixel_name}")
        # Sanity check: encode a solid-color dummy image so session/processor
        # problems surface at load time rather than mid-pipeline.
        dummy = Image.new("RGB", (224, 224), color=(255, 0, 0))
        test = self.encode_images([dummy])
        nrm = float(np.linalg.norm(test[0]))
        print(f" [SANITY] image embed norm={nrm:.4f}")
        print(f"[*] Loaded in {time.perf_counter() - t0:.1f}s\n")

    def encode_images(self, images: list[Image.Image]) -> np.ndarray:
        """
        Encode *images* into L2-normalized embeddings, shape [len(images), D].
        """
        rgb = [img.convert("RGB") for img in images]
        # Processor returns torch tensors; converted to numpy for ONNX below.
        processed = self.processor(images=rgb, return_tensors="pt")
        if "pixel_values" not in processed:
            raise RuntimeError(f"Processor did not return pixel_values. Keys={list(processed.keys())}")
        pixel_values = processed["pixel_values"].detach().cpu().numpy().astype(np.float32)
        if self._pixel_name is None:
            raise RuntimeError(f"Could not find pixel input in ONNX inputs: {self.input_names}")
        outputs = self.session.run(self.output_names, {self._pixel_name: pixel_values})
        main_out = _pick_output(outputs, self.output_names, kind="vision")
        if main_out.ndim == 3:
            # [B, T, D] hidden states -> take the CLS token (position 0).
            embs = main_out[:, 0, :]
        elif main_out.ndim == 2:
            # Export already pooled to image embeddings; use as-is.
            embs = main_out
        else:
            raise RuntimeError(f"Unexpected vision output rank: {main_out.ndim}")
        return _l2_normalize(embs, axis=1)
def build_refs_nomic(
    encoder: NomicVisionEncoderONNX,
    refs_dir: Path,
    batch_size: int = 16,
    text_encoder: NomicTextEncoderONNX | None = None,
    text_weight: float = 0.3,
):
    """
    Build one ref embedding per class.

    Same treatment as Jina:
    - average reference image embeddings
    - average class prompt text embeddings
    - combine with text_weight

    Each subfolder of *refs_dir* is one class; its images (filtered by
    IMAGE_EXTS) are encoded in batches of *batch_size*. Classes with no
    images are skipped. When *text_encoder* is given, the class prompts
    from CLASS_PROMPTS (with a generic fallback) are blended in with
    weight *text_weight*.

    Returns:
        (labels, embeddings): labels is a list of class names; embeddings
        is a float32 array [num_classes, D] with L2-normalized rows.

    Raises:
        ValueError: if refs_dir has no subfolders, or no class yielded
        any usable image.
    """
    class_dirs = sorted(d for d in refs_dir.iterdir() if d.is_dir())
    if not class_dirs:
        raise ValueError(f"No subfolders in {refs_dir}")
    labels = []
    embeddings = []
    if text_encoder is not None:
        print(f" Text weight: {text_weight:.1f} | Image weight: {1 - text_weight:.1f}\n")
    for d in class_dirs:
        name = d.name
        paths = sorted(str(p) for p in d.iterdir() if p.suffix.lower() in IMAGE_EXTS)
        if not paths:
            # Best-effort: silently skip classes without reference images.
            continue
        all_embs = []
        for i in range(0, len(paths), batch_size):
            # FIX: open images via context manager so file handles are
            # released promptly -- bare Image.open() leaks the handle until
            # GC. convert("RGB") forces the pixel load before close.
            batch = []
            for p in paths[i:i + batch_size]:
                with Image.open(p) as img:
                    batch.append(img.convert("RGB"))
            all_embs.append(encoder.encode_images(batch))
        img_embs = np.concatenate(all_embs, axis=0)
        img_avg = np.nan_to_num(img_embs.mean(axis=0), nan=0.0, posinf=0.0, neginf=0.0)
        img_avg = img_avg / (np.linalg.norm(img_avg) + 1e-12)
        if text_encoder is not None:
            prompts = CLASS_PROMPTS.get(name, [f"a {name}", f"a person holding a {name}"])
            text_embs = text_encoder.encode_texts(prompts)
            text_avg = np.nan_to_num(text_embs.mean(axis=0), nan=0.0, posinf=0.0, neginf=0.0)
            text_avg = text_avg / (np.linalg.norm(text_avg) + 1e-12)
            # Weighted blend of the two modality centroids, renormalized.
            combined = (1.0 - text_weight) * img_avg + text_weight * text_avg
            combined = np.nan_to_num(combined, nan=0.0, posinf=0.0, neginf=0.0)
            combined = combined / (np.linalg.norm(combined) + 1e-12)
            labels.append(name)
            embeddings.append(combined)
            sim = float(np.dot(img_avg, text_avg))
            print(
                f" {name:<14}: {len(paths)} imgs + {len(prompts)} prompts | "
                f"img-text sim: {sim:.4f}"
            )
        else:
            labels.append(name)
            embeddings.append(img_avg)
            print(f" {name:<14}: {len(paths)} imgs")
    if not embeddings:
        # FIX: np.stack([]) raises an opaque error; fail with a clear message
        # when every class directory turned out to be empty of images.
        raise ValueError(f"No class in {refs_dir} contained any usable images")
    return labels, np.stack(embeddings).astype(np.float32)
# NOTE(review): aliases presumably kept for drop-in compatibility with the
# non-ONNX (PyTorch) encoder class names -- confirm against callers.
NomicTextEncoder = NomicTextEncoderONNX
NomicVisionEncoder = NomicVisionEncoderONNX