"""
sentiment_deploy_ensemble.py
============================

Self-contained, picklable deployment wrapper for the Route C ENSEMBLE
sentiment classifier (log-averaged BERTweet + twitter-roberta with per-class
threshold offsets), compatible with the case-manual API template.

The API loads a single ``*.model`` pickle and expects a dict::

    {"vectorizer": <fitted vectorizer>, "classifier": <fitted classifier>}

A two-model HuggingFace ensemble does not fit that interface, so this module
provides two adapters that replicate the notebook's scoring path EXACTLY:

* ``EnsembleVectorizer`` -- pass-through "vectorizer". Applies the same light
  cleaning used at training time (HTML strip + whitespace collapse) and
  returns the cleaned strings. ``fit`` learns nothing and ``fit_transform``
  is just ``transform``, so the wrapper is safe even if the API template
  calls ``fit_transform`` at inference time.

* ``EnsembleClassifier`` -- holds the fine-tuned weights, config and
  tokenizer files of BOTH members *inside the pickle* (no external paths),
  plus the per-class additive offsets fit on validation. At inference it
  reproduces the notebook's final decision rule::

      pred = argmax( mean_m logsoftmax(logits_m) + offsets )

  and maps internal class indices {0, 1, 2} -> API labels {-1, 0, 1}.
  Each member is fed text in its OWN native form (BERTweet: the raw cleaned
  text, because its tokenizer normalises @USER/HTTPURL internally;
  twitter-roberta: mentions -> @user, links -> http), so serve-time inputs
  match training-time inputs per model.

IMPORTANT (pickle/__main__ caveat): the API loads the pickle in a SEPARATE
process, so the classes referenced by the pickle must be importable there.
Defining them in THIS module (not in a notebook's __main__) is what makes the
round-trip work. Ship ``sentiment_deploy_ensemble.py`` alongside ``app.py``.
"""
from __future__ import annotations

import os
import tempfile
from typing import List, Sequence

# Internal index -> API label. Training uses 0=Negative, 1=Neutral, 2=Positive.
# The API/case-manual label space is -1=Negative, 0=Neutral, 1=Positive.
INDEX_TO_API_LABEL = {0: -1, 1: 0, 2: 1}


# --------------------------------------------------------------------------- #
# Text preprocessing (must match the training notebook bit-for-bit)
# --------------------------------------------------------------------------- #
def normalize_text(x) -> str:
    """Light, rule-based cleaning applied identically at train and serve time.

    Strips HTML (reviews contain markup) and collapses whitespace.
    Mention/URL normalisation is delegated to each model's own preprocessing
    so train and serve stay consistent.

    ``None`` maps to ``""``; any other value is converted with ``str()``.
    """
    if x is None:
        return ""
    x = str(x)
    if "<" in x and ">" in x:  # only pay BeautifulSoup cost when markup is likely
        try:
            from bs4 import BeautifulSoup
            x = BeautifulSoup(x, "html.parser").get_text(separator=" ")
        except Exception:
            # Deliberate best effort: if bs4 is missing or parsing fails, keep
            # the raw text instead of failing the request.
            # NOTE(review): a missing bs4 at serve time silently keeps markup,
            # so inputs diverge from training -- make sure bs4 is installed.
            pass
    return " ".join(x.split())  # collapse all whitespace runs


def cardiff_preprocess(text) -> str:
    """twitter-roberta was trained with mentions -> '@user' and links -> 'http'.

    Applied ONLY to the roberta member (BERTweet's tokenizer does its own
    @USER/HTTPURL normalisation), matching the notebook's ``_prep_texts``.
    Splits on single spaces exactly like the reference preprocessing; a lone
    "@" is left untouched.
    """
    out = []
    for tok in str(text).split(" "):
        if tok.startswith("@") and len(tok) > 1:
            tok = "@user"
        elif tok.startswith("http"):
            tok = "http"
        out.append(tok)
    return " ".join(out)


def _prep_for_member(texts: Sequence[str], is_bertweet: bool) -> List[str]:
    """Return *texts* in the native input form of one ensemble member."""
    if is_bertweet:
        return list(texts)  # tokenizer normalises internally
    return [cardiff_preprocess(t) for t in texts]


# --------------------------------------------------------------------------- #
# Vectorizer (pass-through, for API compatibility)
# --------------------------------------------------------------------------- #
class EnsembleVectorizer:
    """Pass-through 'vectorizer'. Tokenisation happens inside the classifier.

    ``transform`` only applies ``normalize_text``; ``fit`` learns nothing and
    ``fit_transform`` is identical to ``transform``.
    """

    def fit(self, X=None, y=None):
        return self

    def transform(self, X: Sequence[str]) -> List[str]:
        if isinstance(X, str):  # a bare string is one document, not characters
            X = [X]
        return [normalize_text(t) for t in X]

    def fit_transform(self, X: Sequence[str], y=None) -> List[str]:
        return self.transform(X)


# --------------------------------------------------------------------------- #
# Classifier (holds BOTH members + offsets; rebuilds lazily; never pickles
# live torch objects)
# --------------------------------------------------------------------------- #
class EnsembleClassifier:
    """Self-contained, picklable log-averaged ensemble classifier.

    Parameters
    ----------
    members : list of dicts, each::

            {"name": str,
             "is_bertweet": bool,
             "tokenizer_kwargs": dict,   # e.g. {"normalization": True, "use_fast": False}
             "config": transformers config,
             "state_dict": dict[str, cpu tensor],
             "tokenizer_files": dict[str, bytes]}

    offsets : sequence of 3 floats
        Per-class additive offsets (fit on validation in the notebook),
        applied to the averaged log-probabilities before argmax.
    max_length : int
        Tokenizer truncation length.
    batch_size : int
        Number of texts per forward pass.

    ``predict`` returns API labels in {-1, 0, 1}. ``predict_proba`` returns
    the post-offset ensemble distribution with columns in internal index
    order (0=Negative, 1=Neutral, 2=Positive).
    """

    def __init__(self, members=None, offsets=(0.0, 0.0, 0.0),
                 max_length: int = 128, batch_size: int = 64):
        self.max_length = int(max_length)
        self.batch_size = int(batch_size)
        self.index_to_api = dict(INDEX_TO_API_LABEL)
        self.offsets = [float(o) for o in offsets]
        # Picklable member payloads (already CPU tensors / raw bytes).
        self._members = members if members is not None else []
        # Live objects rebuilt lazily; never pickled.
        self._built = None

    # ---- (de)serialisation ------------------------------------------------ #
    def __getstate__(self):
        # Only plain payloads are pickled; live models/tokenizers are rebuilt.
        return {
            "max_length": self.max_length,
            "batch_size": self.batch_size,
            "index_to_api": self.index_to_api,
            "offsets": self.offsets,
            "_members": self._members,
        }

    def __setstate__(self, state):
        self.__dict__.update(state)
        self._built = None

    # ---- lazy rebuild ----------------------------------------------------- #
    def _ensure(self):
        """Rebuild live tokenizers and models from the pickled payloads once."""
        if self._built is not None:
            return
        import torch
        from transformers import (AutoModelForSequenceClassification,
                                  AutoTokenizer)

        self._device = "cuda" if torch.cuda.is_available() else "cpu"
        built = []
        for m in self._members:
            # Tokenizer: write the stored files to a throwaway directory, load
            # them, and let the directory be deleted on exit. The previous
            # mkdtemp() call never cleaned up, leaking one directory per member
            # per rebuild. Assumes from_pretrained reads the files eagerly;
            # they would only be needed again by save_pretrained, which this
            # wrapper never calls.
            with tempfile.TemporaryDirectory(prefix="ens_tok_") as tokdir:
                for name, data in m["tokenizer_files"].items():
                    with open(os.path.join(tokdir, name), "wb") as fh:
                        fh.write(data)
                tok = AutoTokenizer.from_pretrained(
                    tokdir, **m.get("tokenizer_kwargs", {}))
            # Model: rebuild from config + state_dict (no hub download).
            model = AutoModelForSequenceClassification.from_config(m["config"])
            model.load_state_dict(m["state_dict"])
            model.to(self._device)
            model.eval()
            built.append({"tok": tok, "model": model,
                          "is_bertweet": bool(m.get("is_bertweet", False))})
        self._built = built

    # ---- inference -------------------------------------------------------- #
    @staticmethod
    def _log_softmax(z):
        """Numerically stable row-wise log-softmax of a 2-D array."""
        import numpy as np
        z = np.asarray(z, dtype=np.float64)
        z = z - z.max(axis=1, keepdims=True)
        return z - np.log(np.exp(z).sum(axis=1, keepdims=True))

    def _member_logits(self, member, texts):
        """Forward pass for one member -> raw logits array (n, n_classes)."""
        import numpy as np
        import torch
        prepped = _prep_for_member(texts, member["is_bertweet"])
        tok, model = member["tok"], member["model"]
        chunks = []
        for i in range(0, len(prepped), self.batch_size):
            batch = prepped[i:i + self.batch_size]
            enc = tok(batch, max_length=self.max_length, truncation=True,
                      padding=True, return_tensors="pt")
            enc = {k: v.to(self._device) for k, v in enc.items()}
            with torch.no_grad():
                logits = model(**enc).logits
            chunks.append(logits.detach().cpu().numpy())
        return np.vstack(chunks).astype(np.float64)

    @staticmethod
    def _as_texts(X) -> List[str]:
        """Treat a bare string as one document and clean every input."""
        if isinstance(X, str):
            X = [X]
        return [normalize_text(t) for t in X]

    def _ensemble_scores(self, texts):
        """Return mean member log-softmax plus offsets, shape (n, n_classes).

        The mean of log-softmax equals the log geometric mean of the member
        probabilities. Empty input short-circuits without loading any model.

        Raises
        ------
        RuntimeError
            If the ensemble has no members.
        """
        import numpy as np
        offsets = np.asarray(self.offsets, dtype=np.float64)
        if not texts:
            # np.vstack([]) would raise inside _member_logits; return an
            # empty score matrix instead.
            return np.empty((0, offsets.shape[0]), dtype=np.float64)
        self._ensure()
        if not self._built:
            raise RuntimeError("EnsembleClassifier has no members to score with")
        lp_sum = None
        for member in self._built:
            lp = self._log_softmax(self._member_logits(member, texts))
            lp_sum = lp if lp_sum is None else lp_sum + lp
        return lp_sum / len(self._built) + offsets  # threshold tuning

    def predict(self, X: Sequence[str]):
        """Return a list of API labels in {-1, 0, 1} for the input texts."""
        texts = self._as_texts(X)
        if not texts:
            return []
        idx = self._ensemble_scores(texts).argmax(axis=1)
        return [int(self.index_to_api[int(j)]) for j in idx]

    def predict_proba(self, X: Sequence[str]):
        """Return post-offset ensemble probabilities, shape (n, n_classes).

        Columns follow internal index order (0=Negative, 1=Neutral,
        2=Positive). Empty input yields an empty ``(0, n_classes)`` array.
        """
        import numpy as np
        ens = self._ensemble_scores(self._as_texts(X))
        if ens.shape[0] == 0:
            return ens
        ens = ens - ens.max(axis=1, keepdims=True)  # stabilise the softmax
        p = np.exp(ens)
        return p / p.sum(axis=1, keepdims=True)