| """ |
| Multimodal encoder: text + images β shared vector space. |
| |
| Uses CLIP (via sentence-transformers) to encode both text and images |
| into the same 512-dim space. This means: |
| - An image neuron and a text neuron can be compared directly |
| - Convergence loop works identically on any modality |
| - "Search for images similar to this text" = same spatial query |
| |
| The encoder is a pre-trained map, not our model. Invariant #1 holds. |
| We don't train it. We use it to project different modalities into |
| a shared space where our convergence/reasoning/generation operates. |
| |
| Modality field on neurons tracks what each neuron represents: |
| - "text" β word or sentence |
| - "image" β image region or full image |
| - "audio" β audio segment (future) |
| - "video" β video frame (future) |
| """ |
|
|
| from pathlib import Path |
| from typing import Optional |
|
|
| import numpy as np |
|
|
| try: |
| from sentence_transformers import SentenceTransformer, CrossEncoder |
| HAS_SBERT = True |
| except ImportError: |
| HAS_SBERT = False |
|
|
| try: |
| from PIL import Image |
| HAS_PIL = True |
| except ImportError: |
| HAS_PIL = False |
|
|
|
|
| |
| |
| CLIP_MODEL_NAME = "clip-ViT-B-32" |
| CLIP_DIM = 512 |
|
|
| |
| |
| MULTILINGUAL_MODEL_NAME = "paraphrase-multilingual-mpnet-base-v2" |
| MULTILINGUAL_DIM = 768 |
|
|
| |
| NLI_MODEL_NAME = "cross-encoder/nli-MiniLM2-L6-H768" |
|
|
|
|
| class MultimodalEncoder: |
| """ |
| Encodes text and images into a shared vector space via CLIP. |
| |
| Text: string β 512-dim vector |
| Image: file path or PIL Image β 512-dim vector |
| |
| Both modalities land in the same space β cosine similarity |
| between a text vector and an image vector is meaningful. |
| """ |
|
|
| def __init__(self, model_name: str = CLIP_MODEL_NAME, device: str = "cpu"): |
| if not HAS_SBERT: |
| raise ImportError( |
| "sentence-transformers required for multimodal encoder. " |
| "Install: pip install sentence-transformers" |
| ) |
| self.model_name = model_name |
| self.device = device |
| self._model = None |
| self.dim = CLIP_DIM |
|
|
| @property |
| def model(self): |
| """Lazy-load the CLIP model on first use.""" |
| if self._model is None: |
| self._model = SentenceTransformer(self.model_name, device=self.device) |
| return self._model |
|
|
| def encode_text(self, text: str) -> np.ndarray: |
| """Encode text into the shared vector space.""" |
| vec = self.model.encode(text, convert_to_numpy=True).astype(np.float32) |
| norm = np.linalg.norm(vec) |
| if norm > 0: |
| vec = vec / norm |
| return vec |
|
|
| def encode_image(self, image) -> np.ndarray: |
| """ |
| Encode an image into the shared vector space. |
| |
| Args: |
| image: file path (str/Path), PIL Image, or numpy array |
| |
| Returns: |
| Normalized 512-dim vector in the same space as text. |
| """ |
| if not HAS_PIL: |
| raise ImportError("Pillow required for image encoding. Install: pip install Pillow") |
|
|
| |
| if isinstance(image, (str, Path)): |
| img = Image.open(image).convert("RGB") |
| elif isinstance(image, np.ndarray): |
| img = Image.fromarray(image).convert("RGB") |
| else: |
| img = image |
|
|
| vec = self.model.encode(img, convert_to_numpy=True).astype(np.float32) |
| norm = np.linalg.norm(vec) |
| if norm > 0: |
| vec = vec / norm |
| return vec |
|
|
| def encode_batch_text(self, texts: list) -> np.ndarray: |
| """Batch encode multiple texts. Returns (N, 512) matrix.""" |
| vecs = self.model.encode(texts, convert_to_numpy=True, |
| batch_size=32).astype(np.float32) |
| norms = np.linalg.norm(vecs, axis=1, keepdims=True) |
| norms = np.maximum(norms, 1e-10) |
| return vecs / norms |
|
|
| def encode_batch_images(self, images: list) -> np.ndarray: |
| """Batch encode multiple images. Returns (N, 512) matrix.""" |
| if not HAS_PIL: |
| raise ImportError("Pillow required") |
|
|
| pil_images = [] |
| for img in images: |
| if isinstance(img, (str, Path)): |
| pil_images.append(Image.open(img).convert("RGB")) |
| elif isinstance(img, np.ndarray): |
| pil_images.append(Image.fromarray(img).convert("RGB")) |
| else: |
| pil_images.append(img) |
|
|
| vecs = self.model.encode(pil_images, convert_to_numpy=True, |
| batch_size=32).astype(np.float32) |
| norms = np.linalg.norm(vecs, axis=1, keepdims=True) |
| norms = np.maximum(norms, 1e-10) |
| return vecs / norms |
|
|
| def encode_audio(self, audio, sr: int = 16000) -> np.ndarray: |
| """ |
| Encode audio into the shared vector space via spectrogram β CLIP. |
| |
| Converts audio to a mel spectrogram image, then encodes that |
| image through CLIP. This puts audio in the same vector space |
| as text and images β a sound can be compared to a word or a photo. |
| |
| Args: |
| audio: file path (str/Path) or numpy array of audio samples |
| sr: sample rate (default 16kHz) |
| |
| Returns: |
| Normalized 512-dim vector in the shared space. |
| """ |
| import scipy.signal |
|
|
| |
| if isinstance(audio, (str, Path)): |
| import soundfile |
| samples, sr = soundfile.read(str(audio)) |
| if len(samples.shape) > 1: |
| samples = samples.mean(axis=1) |
| samples = samples.astype(np.float32) |
| else: |
| samples = np.array(audio, dtype=np.float32) |
|
|
| |
| |
| n_fft = 512 |
| hop = 160 |
| n_mels = 64 |
|
|
| |
| _, _, Zxx = scipy.signal.stft(samples, fs=sr, nperseg=n_fft, |
| noverlap=n_fft - hop) |
| power = np.abs(Zxx) ** 2 |
|
|
| |
| mel_basis = self._mel_filterbank(sr, n_fft, n_mels) |
| mel_spec = mel_basis @ power |
|
|
| |
| mel_spec = np.log1p(mel_spec * 1000) |
|
|
| |
| if mel_spec.max() > mel_spec.min(): |
| mel_norm = (mel_spec - mel_spec.min()) / (mel_spec.max() - mel_spec.min()) |
| else: |
| mel_norm = np.zeros_like(mel_spec) |
|
|
| mel_uint8 = (mel_norm * 255).astype(np.uint8) |
|
|
| |
| if not HAS_PIL: |
| raise ImportError("Pillow required") |
| img = Image.fromarray(mel_uint8, mode='L').convert('RGB') |
| img = img.resize((224, 224), Image.BILINEAR) |
|
|
| |
| return self.encode_image(img) |
|
|
| @staticmethod |
| def _mel_filterbank(sr: int, n_fft: int, n_mels: int) -> np.ndarray: |
| """Create a mel filterbank matrix.""" |
| def hz_to_mel(hz): |
| return 2595 * np.log10(1 + hz / 700) |
| def mel_to_hz(mel): |
| return 700 * (10 ** (mel / 2595) - 1) |
|
|
| low_mel = hz_to_mel(0) |
| high_mel = hz_to_mel(sr / 2) |
| mel_points = np.linspace(low_mel, high_mel, n_mels + 2) |
| hz_points = mel_to_hz(mel_points) |
| bin_points = np.floor((n_fft + 1) * hz_points / sr).astype(int) |
|
|
| n_freqs = n_fft // 2 + 1 |
| filterbank = np.zeros((n_mels, n_freqs)) |
|
|
| for i in range(n_mels): |
| left = bin_points[i] |
| center = bin_points[i + 1] |
| right = bin_points[i + 2] |
| for j in range(left, center): |
| if center > left: |
| filterbank[i, j] = (j - left) / (center - left) |
| for j in range(center, right): |
| if right > center: |
| filterbank[i, j] = (right - j) / (right - center) |
|
|
| return filterbank |
|
|
| def cross_modal_similarity(self, text: str, image) -> float: |
| """ |
| Compute similarity between text and image. |
| This is the core of multimodal reasoning β "how related |
| is this text to this image?" answered by spatial proximity |
| in the shared vector space. |
| """ |
| text_vec = self.encode_text(text) |
| image_vec = self.encode_image(image) |
| return float(np.dot(text_vec, image_vec)) |
|
|
|
|
| class UniversalEncoder: |
| """ |
| The complete encoder: multilingual text + images + audio + polarity. |
| |
| Combines: |
| - paraphrase-multilingual-mpnet-base-v2: text in 50+ languages (768-dim) |
| - CLIP ViT-B-32: images β 512-dim (projected to 768 for unified space) |
| - NLI cross-encoder: contradiction/entailment detection for polarity |
| |
| All modalities map to 768-dim. One vector space for everything. |
| One convergence loop reasons across all modalities and languages. |
| """ |
|
|
| def __init__(self, device: str = "cpu"): |
| if not HAS_SBERT: |
| raise ImportError("sentence-transformers required") |
| self.device = device |
| self.dim = MULTILINGUAL_DIM |
|
|
| |
| self._text_model = None |
| self._clip_model = None |
| self._nli_model = None |
|
|
| |
| self._clip_projection = None |
|
|
| @property |
| def text_model(self): |
| if self._text_model is None: |
| self._text_model = SentenceTransformer( |
| MULTILINGUAL_MODEL_NAME, device=self.device |
| ) |
| return self._text_model |
|
|
| @property |
| def clip_model(self): |
| if self._clip_model is None: |
| self._clip_model = SentenceTransformer( |
| CLIP_MODEL_NAME, device=self.device |
| ) |
| return self._clip_model |
|
|
| @property |
| def nli_model(self): |
| if self._nli_model is None: |
| self._nli_model = CrossEncoder(NLI_MODEL_NAME, device=self.device) |
| return self._nli_model |
|
|
| def encode_text(self, text: str) -> np.ndarray: |
| """Encode text (any of 50+ languages) β 768-dim vector.""" |
| vec = self.text_model.encode(text, convert_to_numpy=True).astype(np.float32) |
| norm = np.linalg.norm(vec) |
| if norm > 0: |
| vec = vec / norm |
| return vec |
|
|
| def encode_image(self, image) -> np.ndarray: |
| """Encode image β 768-dim vector (CLIP β projected to 768).""" |
| if not HAS_PIL: |
| raise ImportError("Pillow required") |
| if isinstance(image, (str, Path)): |
| img = Image.open(image).convert("RGB") |
| elif isinstance(image, np.ndarray): |
| img = Image.fromarray(image).convert("RGB") |
| else: |
| img = image |
|
|
| clip_vec = self.clip_model.encode(img, convert_to_numpy=True).astype(np.float32) |
| |
| |
| vec = np.zeros(self.dim, dtype=np.float32) |
| vec[:len(clip_vec)] = clip_vec |
| norm = np.linalg.norm(vec) |
| if norm > 0: |
| vec = vec / norm |
| return vec |
|
|
| def encode_audio(self, audio, sr: int = 16000) -> np.ndarray: |
| """Encode audio β 768-dim (spectrogram β CLIP β projected).""" |
| |
| clip_enc = MultimodalEncoder.__new__(MultimodalEncoder) |
| clip_enc._model = self.clip_model |
| clip_enc.model_name = CLIP_MODEL_NAME |
| clip_enc.device = self.device |
| clip_enc.dim = CLIP_DIM |
|
|
| clip_vec = clip_enc.encode_audio(audio, sr=sr) |
| vec = np.zeros(self.dim, dtype=np.float32) |
| vec[:len(clip_vec)] = clip_vec |
| norm = np.linalg.norm(vec) |
| if norm > 0: |
| vec = vec / norm |
| return vec |
|
|
| def detect_polarity(self, text_a: str, text_b: str) -> dict: |
| """ |
| Detect if two texts contradict, entail, or are neutral. |
| |
| Uses NLI cross-encoder. This is the polarity detection |
| that GloVe can't do β distinguishes "help" from "harm" |
| even when they share embedding space. |
| |
| Returns: |
| { |
| "label": "contradiction" | "entailment" | "neutral", |
| "scores": {"contradiction": float, "entailment": float, "neutral": float}, |
| } |
| """ |
| scores = self.nli_model.predict([(text_a, text_b)])[0] |
| labels = ["contradiction", "entailment", "neutral"] |
| label = labels[int(np.argmax(scores))] |
| return { |
| "label": label, |
| "scores": {l: float(s) for l, s in zip(labels, scores)}, |
| } |
|
|
| def encode_batch_text(self, texts: list) -> np.ndarray: |
| """Batch encode texts β (N, 768) matrix.""" |
| vecs = self.text_model.encode(texts, convert_to_numpy=True, |
| batch_size=32).astype(np.float32) |
| norms = np.linalg.norm(vecs, axis=1, keepdims=True) |
| return vecs / np.maximum(norms, 1e-10) |
|
|