| """ | |
| FinBERT Embedding Pipeline. | |
| Extracts 768-dimensional CLS token vectors from ProsusAI/finbert and reduces | |
| them to compact representations via IncrementalPCA for downstream consumption | |
| by the Temporal Fusion Transformer. | |
| Usage (standalone backfill): | |
| python -m deep_learning.data.embeddings --backfill --days 180 | |
| """ | |
from __future__ import annotations

import argparse
import logging
from datetime import datetime, timedelta, timezone
from functools import lru_cache
from pathlib import Path
from typing import Optional, Sequence

import numpy as np

logger = logging.getLogger(__name__)

_EMBEDDING_DIM = 768

# ---------------------------------------------------------------------------
# Lazy model loading
# ---------------------------------------------------------------------------

@lru_cache(maxsize=1)
def _load_finbert_encoder():
    """Load ProsusAI/finbert as a bare encoder (no classification head).

    Cached via lru_cache so the tokenizer/model pair is loaded only once
    per process instead of on every extraction call.
    """
    import os

    os.environ.setdefault("TOKENIZERS_PARALLELISM", "false")
    from transformers import AutoModel, AutoTokenizer

    model_name = "ProsusAI/finbert"
    logger.info("Loading FinBERT encoder: %s", model_name)
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModel.from_pretrained(model_name)
    model.eval()
    logger.info(
        "FinBERT encoder loaded (%d parameters)",
        sum(p.numel() for p in model.parameters()),
    )
    return tokenizer, model

# ---------------------------------------------------------------------------
# Single / batch embedding extraction
# ---------------------------------------------------------------------------

def extract_embedding(text: str, *, max_length: int = 512) -> np.ndarray:
    """Return the [CLS] hidden-state vector (768-d, float32) for *text*."""
    import torch

    tokenizer, model = _load_finbert_encoder()
    inputs = tokenizer(
        text,
        return_tensors="pt",
        truncation=True,
        max_length=max_length,
        padding=True,
    )
    with torch.no_grad():
        outputs = model(**inputs)
    cls_vec: np.ndarray = outputs.last_hidden_state[:, 0, :].squeeze(0).cpu().numpy()
    return cls_vec.astype(np.float32)
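
# Example usage (a minimal sketch; the headline below is made up, and torch and
# transformers are assumed to be installed):
#
#     vec = extract_embedding("Fed holds rates steady, signals cuts later this year")
#     vec.shape   # (768,)
#     vec.dtype   # float32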

def extract_embeddings_batch(
    texts: Sequence[str],
    *,
    max_length: int = 512,
    batch_size: int = 32,
) -> np.ndarray:
    """
    Vectorised extraction for a list of texts.

    Returns:
        ndarray of shape (N, 768) with float32 CLS vectors.
    """
    import torch

    if not texts:
        # Guard against an empty input; np.concatenate on an empty list raises.
        return np.empty((0, _EMBEDDING_DIM), dtype=np.float32)

    tokenizer, model = _load_finbert_encoder()
    all_vecs: list[np.ndarray] = []
    for start in range(0, len(texts), batch_size):
        chunk = list(texts[start: start + batch_size])
        inputs = tokenizer(
            chunk,
            return_tensors="pt",
            truncation=True,
            max_length=max_length,
            padding=True,
        )
        with torch.no_grad():
            outputs = model(**inputs)
        cls_vecs = outputs.last_hidden_state[:, 0, :].cpu().numpy().astype(np.float32)
        all_vecs.append(cls_vecs)
    return np.concatenate(all_vecs, axis=0)
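
# Batched extraction example (hypothetical inputs; same assumptions as above):
#
#     mat = extract_embeddings_batch(["headline one", "headline two"], batch_size=16)
#     mat.shape   # (2, 768)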

# ---------------------------------------------------------------------------
# PCA dimensionality reduction
# ---------------------------------------------------------------------------

def fit_pca(
    embeddings: np.ndarray,
    n_components: int = 32,
    save_path: Optional[str] = None,
) -> "IncrementalPCA":
    """Fit IncrementalPCA on a matrix of (N, 768) embeddings."""
    import joblib
    from sklearn.decomposition import IncrementalPCA

    logger.info(
        "Fitting PCA: %s -> %d components on %d samples",
        embeddings.shape, n_components, len(embeddings),
    )
    pca = IncrementalPCA(n_components=n_components)
    pca.fit(embeddings)
    explained = pca.explained_variance_ratio_.sum()
    logger.info("PCA explained variance: %.4f", explained)
    if save_path:
        path = Path(save_path)
        path.parent.mkdir(parents=True, exist_ok=True)
        joblib.dump(pca, path)
        logger.info("PCA model saved to %s", path)
    return pca

def load_pca(path: str) -> "IncrementalPCA":
    """Load a previously fitted PCA model from disk."""
    import joblib

    return joblib.load(path)


def reduce_embeddings(
    embeddings: np.ndarray,
    pca: "IncrementalPCA",
) -> np.ndarray:
    """Project (N, 768) embeddings down to (N, pca.n_components) via PCA."""
    return pca.transform(embeddings).astype(np.float32)
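
# End-to-end reduction example (a sketch; the 200x768 matrix is synthetic and
# "models/pca_32.joblib" is an illustrative path, not the configured one):
#
#     full = np.random.rand(200, 768).astype(np.float32)
#     pca = fit_pca(full, n_components=32, save_path="models/pca_32.joblib")
#     reduced = reduce_embeddings(full, pca)   # shape (200, 32)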

# ---------------------------------------------------------------------------
# Serialisation helpers (DB storage as bytes)
# ---------------------------------------------------------------------------

def embedding_to_bytes(vec: np.ndarray) -> bytes:
    """Pack a 1-D float32 array into a compact binary blob."""
    return vec.astype(np.float32).tobytes()


def bytes_to_embedding(data: bytes, dim: int = _EMBEDDING_DIM) -> np.ndarray:
    """Unpack a binary blob back to a float32 array, validating its length against *dim*."""
    vec = np.frombuffer(data, dtype=np.float32).copy()
    if dim and vec.shape[0] != dim:
        raise ValueError(f"Expected a {dim}-d embedding, got {vec.shape[0]} values")
    return vec
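
# Round-trip example (a sketch; pca_blob is a hypothetical 32-d blob read back
# from the news_embeddings table, so dim=32 is passed explicitly):
#
#     blob = embedding_to_bytes(np.ones(768, dtype=np.float32))
#     vec = bytes_to_embedding(blob)                 # dim defaults to 768
#     small = bytes_to_embedding(pca_blob, dim=32)   # PCA-reduced vector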

# ---------------------------------------------------------------------------
# DB backfill logic
# ---------------------------------------------------------------------------

def backfill_embeddings(
    *,
    days: int = 180,
    pca_dim: int = 32,
    batch_size: int = 64,
) -> dict[str, int | bool]:
    """
    Compute and store FinBERT embeddings for news articles that lack them.

    Steps:
        1. Query news_processed rows from the last *days* days (articles that
           already have a news_embeddings row are skipped at write time).
        2. Extract CLS embeddings in batches.
        3. Fit or load the PCA model.
        4. Store full and reduced embeddings in the news_embeddings table.
    """
    from app.db import SessionLocal
    from app.models import NewsProcessed, NewsRaw
    from deep_learning.config import get_tft_config

    cfg = get_tft_config()
    pca_path = Path(cfg.embedding.pca_model_path)
    stats: dict[str, int | bool] = {"embedded": 0, "skipped": 0, "pca_fitted": False}

    with SessionLocal() as session:
        cutoff = datetime.now(timezone.utc) - timedelta(days=days)
        rows = (
            session.query(
                NewsProcessed.id,
                NewsProcessed.cleaned_text,
                NewsRaw.title,
                NewsRaw.description,
            )
            .join(NewsRaw, NewsProcessed.raw_id == NewsRaw.id)
            .filter(NewsRaw.published_at >= cutoff)
            .order_by(NewsProcessed.id.asc())
            .all()
        )
        if not rows:
            logger.info("No articles to embed")
            return stats

        logger.info("Found %d articles for embedding extraction", len(rows))
        ids: list[int] = []
        texts: list[str] = []
        for row in rows:
            text = str(row.cleaned_text or f"{row.title} {row.description or ''}")[:1200]
            if len(text.strip()) < 10:
                stats["skipped"] += 1
                continue
            ids.append(int(row.id))
            texts.append(text)

        if not texts:
            return stats

        full_embeddings = extract_embeddings_batch(texts, batch_size=batch_size)
        logger.info(
            "Extracted %d embeddings, shape=%s", len(full_embeddings), full_embeddings.shape
        )

        if pca_path.exists():
            pca = load_pca(str(pca_path))
            logger.info("Loaded existing PCA model from %s", pca_path)
        else:
            pca = fit_pca(full_embeddings, n_components=pca_dim, save_path=str(pca_path))
            stats["pca_fitted"] = True

        reduced = reduce_embeddings(full_embeddings, pca)

        try:
            from app.models import NewsEmbedding
        except ImportError:
            logger.error("NewsEmbedding model not found - run DB migration first")
            return stats

        for idx, article_id in enumerate(ids):
            existing = (
                session.query(NewsEmbedding)
                .filter(NewsEmbedding.news_processed_id == article_id)
                .first()
            )
            if existing:
                stats["skipped"] += 1
                continue
            emb = NewsEmbedding(
                news_processed_id=article_id,
                embedding_full=embedding_to_bytes(full_embeddings[idx]),
                embedding_pca=embedding_to_bytes(reduced[idx]),
                pca_version=f"pca{pca_dim}_v1",
            )
            session.add(emb)
            stats["embedded"] += 1
            if stats["embedded"] % 200 == 0:
                session.commit()
                logger.info("Committed %d embeddings so far", stats["embedded"])

        session.commit()

        logger.info("Embedding backfill complete: %s", stats)
        return stats

# ---------------------------------------------------------------------------
# Daily aggregation helper
# ---------------------------------------------------------------------------

def aggregate_daily_embeddings(
    article_embeddings: np.ndarray,
    weights: Optional[np.ndarray] = None,
) -> np.ndarray:
    """
    Aggregate multiple article embeddings into a single daily vector.

    Uses a confidence-weighted mean when weights are provided,
    otherwise a simple mean.
    """
    if article_embeddings.ndim == 1:
        return article_embeddings
    if weights is not None:
        w = np.asarray(weights, dtype=np.float32).reshape(-1, 1)
        w = w / (w.sum() + 1e-9)
        return (article_embeddings * w).sum(axis=0).astype(np.float32)
    return article_embeddings.mean(axis=0).astype(np.float32)
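
# Aggregation example (a sketch with hypothetical per-article vectors vec_a/b/c;
# weights could be, e.g., per-article sentiment confidence scores):
#
#     day_mat = np.stack([vec_a, vec_b, vec_c])   # (3, 32) PCA-reduced vectors
#     daily = aggregate_daily_embeddings(day_mat, weights=np.array([0.9, 0.5, 0.7]))
#     daily.shape   # (32,)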

# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
    parser = argparse.ArgumentParser(description="FinBERT embedding backfill")
    parser.add_argument("--backfill", action="store_true")
    parser.add_argument("--days", type=int, default=180)
    parser.add_argument("--pca-dim", type=int, default=32)
    parser.add_argument("--batch-size", type=int, default=64)
    args = parser.parse_args()

    if args.backfill:
        from app.db import init_db

        init_db()
        backfill_embeddings(days=args.days, pca_dim=args.pca_dim, batch_size=args.batch_size)