| """ |
| sentiment_deploy_ensemble.py |
| ============================ |
| |
| Self-contained, picklable deployment wrapper for the Route C ENSEMBLE |
| sentiment classifier (log-averaged BERTweet + twitter-roberta with per-class |
| threshold offsets), compatible with the case-manual API template. |
| |
| The API loads a single ``*.model`` pickle and expects a dict:: |
| |
| {"vectorizer": <obj with .transform(list[str])>, |
| "classifier": <obj with .predict(X)>} |
| |
| A two-model HuggingFace ensemble does not fit that interface, so this module |
| provides two adapters that replicate the notebook's scoring path EXACTLY: |
| |
| * ``EnsembleVectorizer`` -- pass-through "vectorizer". Applies the same light |
| cleaning used at training time (HTML strip + whitespace collapse) and returns |
| the cleaned strings. ``fit``/``fit_transform`` are no-ops, so the wrapper is |
| safe even if the API template calls ``fit_transform`` at inference time. |
| |
| * ``EnsembleClassifier`` -- holds the fine-tuned weights, config and tokenizer |
| files of BOTH members *inside the pickle* (no external paths), plus the |
| per-class additive offsets fit on validation. At inference it reproduces the |
| notebook's final decision rule: |
| |
| pred = argmax( mean_m logsoftmax(logits_m) + offsets ) |
| |
| and maps internal class indices {0,1,2} -> API labels {-1, 0, 1}. |
| |
| Each member is fed text in its OWN native form (BERTweet: raw cleaned, its |
| tokenizer normalises @USER/HTTPURL internally; twitter-roberta: mentions->@user, |
| links->http) so serve-time inputs match training-time inputs per model. |
| |
| IMPORTANT (pickle/__main__ caveat): the API loads the pickle in a SEPARATE |
| process, so the classes referenced by the pickle must be importable there. |
| Defining them in THIS module (not in a notebook's __main__) is what makes the |
| round-trip work. Ship ``sentiment_deploy_ensemble.py`` alongside ``app.py``. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import os |
| import tempfile |
| from typing import List, Sequence |
|
|
| |
| |
# Maps the classifier's internal class index (the argmax position) to the
# integer label returned through the API.
# NOTE(review): presumably -1/0/1 mean negative/neutral/positive -- confirm
# against the training notebook's label encoding.
INDEX_TO_API_LABEL = {
    0: -1,
    1: 0,
    2: 1,
}
|
|
|
|
| |
| |
| |
def normalize_text(x) -> str:
    """Return *x* as a lightly cleaned string.

    ``None`` becomes the empty string; anything else is converted with
    ``str()``. When the text contains both ``<`` and ``>``, it is passed
    through BeautifulSoup's ``html.parser`` to extract the visible text
    (tags replaced by spaces). Finally, every run of whitespace is collapsed
    to a single space and leading/trailing whitespace is dropped.

    Mention/URL normalisation is intentionally NOT done here; it is left to
    each model's own preprocessing.
    """
    if x is None:
        return ""

    cleaned = str(x)

    # Only attempt HTML stripping when the text could plausibly hold a tag.
    might_have_markup = "<" in cleaned and ">" in cleaned
    if might_have_markup:
        try:
            from bs4 import BeautifulSoup
            cleaned = BeautifulSoup(cleaned, "html.parser").get_text(separator=" ")
        except Exception:
            # Best effort: if bs4 is unavailable or parsing fails, keep the
            # text unchanged and still collapse whitespace below.
            pass

    return " ".join(cleaned.split())
|
|
|
|
def cardiff_preprocess(text) -> str:
    """Mask user mentions and links the way the twitter-roberta member expects.

    The input is converted with ``str()`` and split on single spaces. Each
    token that starts with ``@`` and is longer than one character becomes
    ``'@user'``; each token that starts with ``http`` becomes ``'http'``;
    every other token (including empty tokens produced by repeated spaces)
    is kept unchanged. Tokens are then re-joined with single spaces.

    Intended for the roberta member only; see ``_prep_for_member``.
    """

    def _mask(token: str) -> str:
        # A bare "@" is not treated as a mention.
        if len(token) > 1 and token.startswith("@"):
            return "@user"
        if token.startswith("http"):
            return "http"
        return token

    return " ".join(_mask(piece) for piece in str(text).split(" "))
|
|
|
|
| def _prep_for_member(texts: Sequence[str], is_bertweet: bool) -> List[str]: |
| if is_bertweet: |
| return list(texts) |
| return [cardiff_preprocess(t) for t in texts] |
|
|
|
|
| |
| |
| |
class EnsembleVectorizer:
    """Stateless stand-in for the "vectorizer" slot of the API template.

    It produces no numeric features: ``transform`` returns the input texts
    after ``normalize_text``. Tokenisation happens inside the classifier.
    """

    def fit(self, X=None, y=None):
        """Do nothing and return ``self``; there is nothing to learn."""
        return self

    def transform(self, X: Sequence[str]) -> List[str]:
        """Return the cleaned texts as a list.

        A single ``str`` is treated as a one-element batch.
        """
        batch = [X] if isinstance(X, str) else X
        return [normalize_text(doc) for doc in batch]

    def fit_transform(self, X: Sequence[str], y=None) -> List[str]:
        """Equivalent to ``transform(X)``; *y* is ignored."""
        return self.transform(X)
|
|
|
|
| |
| |
| |
| |
class EnsembleClassifier:
    """Self-contained, picklable log-averaged ensemble classifier.

    Parameters
    ----------
    members : list of dicts, each::
        {"name": str,
         "is_bertweet": bool,
         "tokenizer_kwargs": dict,   # e.g. {"normalization": True, "use_fast": False}
         "config": transformers config,
         "state_dict": dict[str, cpu tensor],
         "tokenizer_files": dict[str, bytes]}
    offsets : sequence of 3 floats
        Per-class additive offsets (fit on validation in the notebook),
        applied to the averaged log-probabilities before argmax.
    max_length : int
        Passed to each tokenizer as ``max_length`` (with truncation enabled).
    batch_size : int
        Number of texts per forward pass.

    Notes
    -----
    Models and tokenizers are built lazily on first non-empty prediction and
    are never pickled; only the raw member payloads and settings are.
    """

    def __init__(self, members=None, offsets=(0.0, 0.0, 0.0),
                 max_length: int = 128, batch_size: int = 64):
        self.max_length = int(max_length)
        self.batch_size = int(batch_size)
        # Per-instance copy so later edits never touch the module constant.
        self.index_to_api = dict(INDEX_TO_API_LABEL)
        self.offsets = [float(o) for o in offsets]
        # Raw, picklable member payloads (weights, config, tokenizer bytes).
        self._members = members if members is not None else []
        # Runtime cache of built tokenizers/models; filled by ``_ensure``.
        self._built = None

    def __getstate__(self):
        """Return only the picklable state (no live models or device)."""
        return {
            "max_length": self.max_length,
            "batch_size": self.batch_size,
            "index_to_api": self.index_to_api,
            "offsets": self.offsets,
            "_members": self._members,
        }

    def __setstate__(self, state):
        """Restore pickled state; models are rebuilt lazily on first use."""
        self.__dict__.update(state)
        self._built = None

    def _ensure(self):
        """Build tokenizers and models from the stored payloads (once)."""
        if self._built is not None:
            return
        import torch
        from transformers import (AutoModelForSequenceClassification,
                                  AutoTokenizer)

        self._device = "cuda" if torch.cuda.is_available() else "cpu"
        built = []
        for m in self._members:
            # Tokenizer files are materialised only for the duration of
            # ``from_pretrained``; the context manager removes the directory
            # afterwards (and on failure) instead of leaking one temp
            # directory per member per process.
            # NOTE(review): assumes the tokenizer reads its files eagerly in
            # ``from_pretrained`` and does not need them again for inference
            # -- confirm if ``save_pretrained`` is ever called on it.
            with tempfile.TemporaryDirectory(prefix="ens_tok_") as tokdir:
                for name, data in m["tokenizer_files"].items():
                    with open(os.path.join(tokdir, name), "wb") as fh:
                        fh.write(data)
                tok = AutoTokenizer.from_pretrained(
                    tokdir, **m.get("tokenizer_kwargs", {}))

            # Build the architecture from config, then load the stored weights.
            model = AutoModelForSequenceClassification.from_config(m["config"])
            model.load_state_dict(m["state_dict"])
            model.to(self._device)
            model.eval()

            built.append({"tok": tok, "model": model,
                          "is_bertweet": bool(m.get("is_bertweet", False))})
        # Assigned last so a failed build leaves the instance un-built.
        self._built = built

    @staticmethod
    def _log_softmax(z):
        """Row-wise log-softmax of a 2-D array-like, computed in float64."""
        import numpy as np
        z = np.asarray(z, dtype=np.float64)
        # Subtract the row max first for numerical stability.
        z = z - z.max(axis=1, keepdims=True)
        return z - np.log(np.exp(z).sum(axis=1, keepdims=True))

    def _member_logits(self, member, texts):
        """Forward pass for one built member -> raw logits as float64.

        *texts* must be non-empty; rows follow the order of *texts*.
        """
        import numpy as np
        import torch
        prepped = _prep_for_member(texts, member["is_bertweet"])
        tok, model = member["tok"], member["model"]
        chunks = []
        for i in range(0, len(prepped), self.batch_size):
            batch = prepped[i:i + self.batch_size]
            enc = tok(batch, max_length=self.max_length, truncation=True,
                      padding=True, return_tensors="pt")
            enc = {k: v.to(self._device) for k, v in enc.items()}
            with torch.no_grad():
                logits = model(**enc).logits
            chunks.append(logits.detach().cpu().numpy())
        return np.vstack(chunks).astype(np.float64)

    def _ensemble_scores(self, texts):
        """Mean member log-probabilities plus offsets, one row per text.

        Shared by ``predict`` and ``predict_proba`` so both use the same
        decision rule. *texts* must be non-empty.

        Raises
        ------
        RuntimeError
            If the classifier holds no members.
        """
        import numpy as np
        self._ensure()
        if not self._built:
            raise RuntimeError("EnsembleClassifier has no members to score with")
        lp_sum = None
        for member in self._built:
            lp = self._log_softmax(self._member_logits(member, texts))
            lp_sum = lp if lp_sum is None else lp_sum + lp
        mean_lp = lp_sum / len(self._built)
        return mean_lp + np.asarray(self.offsets, dtype=np.float64)

    def predict(self, X: Sequence[str]):
        """Return a list of API labels in {-1, 0, 1} for the input texts.

        A single ``str`` is treated as a one-element batch. Empty input
        returns ``[]`` without loading any model.
        """
        if isinstance(X, str):
            X = [X]
        texts = [normalize_text(t) for t in X]
        if not texts:
            return []
        idx = self._ensemble_scores(texts).argmax(axis=1)
        return [int(self.index_to_api[int(j)]) for j in idx]

    def predict_proba(self, X: Sequence[str]):
        """Return a softmax over the offset-adjusted ensemble scores.

        The result is a float64 array with one row per text; columns follow
        the internal class-index order. A single ``str`` is treated as a
        one-element batch. Empty input returns an array of shape
        ``(0, len(offsets))`` without loading any model.
        """
        import numpy as np
        if isinstance(X, str):
            X = [X]
        texts = [normalize_text(t) for t in X]
        if not texts:
            # Mirrors the empty-input guard in ``predict``.
            return np.empty((0, len(self.offsets)), dtype=np.float64)
        ens = self._ensemble_scores(texts)
        ens = ens - ens.max(axis=1, keepdims=True)
        p = np.exp(ens)
        return p / p.sum(axis=1, keepdims=True)
|
|