File size: 5,915 Bytes
178b774 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 | """
Embedding generation using MTEB-leading models (<1B params).
Primary: Qwen/Qwen3-Embedding-0.6B (MTEB Eng v2: 70.70, Classification: 85.76)
Fallback: dunzhang/stella_en_400M_v5
"""
import logging
from pathlib import Path
from typing import Optional
import numpy as np
import pandas as pd
import torch
from tqdm import tqdm
from .config import (
EMBEDDING_BATCH_SIZE,
EMBEDDING_DIM,
EMBEDDING_FALLBACK,
EMBEDDING_MODEL,
OUTPUT_DIR,
)
log = logging.getLogger(__name__)
class TweetEmbedder:
"""
Generate dense embeddings for tweets using Qwen3-Embedding-0.6B.
Embeddings are useful for:
- SetFit few-shot classification
- Clustering / topic modeling
- Nearest-neighbor retrieval of similar tweets
- Dimensionality reduction visualization (UMAP/t-SNE)
"""
def __init__(
self,
model_name: Optional[str] = None,
device: Optional[str] = None,
embedding_dim: int = EMBEDDING_DIM,
):
self.model_name = model_name or EMBEDDING_MODEL
self.device = device or ("cuda" if torch.cuda.is_available() else "cpu")
self.embedding_dim = embedding_dim
self._model = None
self._tokenizer = None
def load(self):
"""Load the embedding model."""
try:
log.info("Loading embedding model: %s", self.model_name)
self._load_model(self.model_name)
except Exception as e:
log.warning("Failed to load %s: %s. Trying fallback...", self.model_name, e)
self.model_name = EMBEDDING_FALLBACK
self._load_model(self.model_name)
log.info("Embedding model loaded on device=%s", self.device)
def _load_model(self, model_name: str):
"""Load model using sentence-transformers or transformers."""
try:
from sentence_transformers import SentenceTransformer
self._model = SentenceTransformer(model_name, device=self.device)
self._use_st = True
log.info("Using SentenceTransformer backend")
except Exception:
from transformers import AutoModel, AutoTokenizer
self._tokenizer = AutoTokenizer.from_pretrained(model_name)
self._model = AutoModel.from_pretrained(model_name).to(self.device).eval()
self._use_st = False
log.info("Using raw transformers backend")
def embed_texts(
self,
texts: list[str],
batch_size: int = EMBEDDING_BATCH_SIZE,
show_progress: bool = True,
instruction: str = "",
) -> np.ndarray:
"""
Generate embeddings for a list of texts.
Args:
texts: List of tweet texts
batch_size: Batch size for inference
show_progress: Show tqdm progress bar
instruction: Optional instruction prefix (for instruction-aware models like Qwen3)
Returns:
numpy array of shape (n_texts, embedding_dim)
"""
if self._model is None:
self.load()
if self._use_st:
# SentenceTransformer handles batching internally
if instruction and hasattr(self._model, "encode"):
# Qwen3 supports instruction-aware embeddings
embeddings = self._model.encode(
texts,
batch_size=batch_size,
show_progress_bar=show_progress,
prompt=instruction,
normalize_embeddings=True,
)
else:
embeddings = self._model.encode(
texts,
batch_size=batch_size,
show_progress_bar=show_progress,
normalize_embeddings=True,
)
return np.array(embeddings)
# Manual batching with transformers
all_embeddings = []
iterator = range(0, len(texts), batch_size)
if show_progress:
iterator = tqdm(iterator, desc="Embedding", leave=False)
for i in iterator:
batch = texts[i : i + batch_size]
if instruction:
batch = [f"{instruction}{t}" for t in batch]
encoded = self._tokenizer(
batch,
padding=True,
truncation=True,
max_length=512,
return_tensors="pt",
).to(self.device)
with torch.no_grad():
output = self._model(**encoded)
# Mean pooling over last hidden state
attention_mask = encoded["attention_mask"]
hidden = output.last_hidden_state
mask_expanded = attention_mask.unsqueeze(-1).expand(hidden.size()).float()
sum_embeddings = torch.sum(hidden * mask_expanded, dim=1)
sum_mask = torch.clamp(mask_expanded.sum(dim=1), min=1e-9)
embeddings = (sum_embeddings / sum_mask).cpu().numpy()
all_embeddings.append(embeddings)
return np.vstack(all_embeddings)
def embed_dataframe(
self,
df: pd.DataFrame,
text_col: str = "text",
batch_size: int = EMBEDDING_BATCH_SIZE,
save_path: Optional[str] = None,
) -> np.ndarray:
"""
Generate embeddings for all tweets in a DataFrame.
Optionally save to disk as .npy file.
"""
texts = df[text_col].tolist()
instruction = "Classify the sentiment and tone of this tweet: "
embeddings = self.embed_texts(
texts,
batch_size=batch_size,
instruction=instruction,
)
if save_path:
p = Path(save_path)
p.parent.mkdir(parents=True, exist_ok=True)
np.save(str(p), embeddings)
log.info("Saved embeddings (%s) to %s", embeddings.shape, p)
return embeddings
|