Ensemble_threshold_MG / sentiment_deploy_ensemble.py
Vo Nhu Tu Anh
Upload 6 files
d76c0bc verified
Raw
History Blame Contribute Delete
10.1 kB
"""
sentiment_deploy_ensemble.py
============================
Self-contained, picklable deployment wrapper for the Route C ENSEMBLE
sentiment classifier (log-averaged BERTweet + twitter-roberta with per-class
threshold offsets), compatible with the case-manual API template.
The API loads a single ``*.model`` pickle and expects a dict::
{"vectorizer": <obj with .transform(list[str])>,
"classifier": <obj with .predict(X)>}
A two-model HuggingFace ensemble does not fit that interface, so this module
provides two adapters that replicate the notebook's scoring path EXACTLY:
* ``EnsembleVectorizer`` -- pass-through "vectorizer". Applies the same light
cleaning used at training time (HTML strip + whitespace collapse) and returns
the cleaned strings. ``fit`` learns nothing and ``fit_transform`` simply
returns ``transform(X)``, so the wrapper is safe even if the API template
calls ``fit_transform`` at inference time.
* ``EnsembleClassifier`` -- holds the fine-tuned weights, config and tokenizer
files of BOTH members *inside the pickle* (no external paths), plus the
per-class additive offsets fit on validation. At inference it reproduces the
notebook's final decision rule:
pred = argmax( mean_m logsoftmax(logits_m) + offsets )
and maps internal class indices {0,1,2} -> API labels {-1, 0, 1}.
Each member is fed text in its OWN native form (BERTweet: raw cleaned, its
tokenizer normalises @USER/HTTPURL internally; twitter-roberta: mentions->@user,
links->http) so serve-time inputs match training-time inputs per model.
IMPORTANT (pickle/__main__ caveat): the API loads the pickle in a SEPARATE
process, so the classes referenced by the pickle must be importable there.
Defining them in THIS module (not in a notebook's __main__) is what makes the
round-trip work. Ship ``sentiment_deploy_ensemble.py`` alongside ``app.py``.
Only load ``*.model`` pickles from trusted sources: unpickling can execute
arbitrary code.
"""
from __future__ import annotations
import os
import tempfile
from typing import List, Sequence
# Mapping from the model's internal class index to the label the API returns.
# Training order:   0=Negative, 1=Neutral, 2=Positive.
# API/case-manual:  -1=Negative, 0=Neutral, 1=Positive.
INDEX_TO_API_LABEL = {
    0: -1,  # Negative
    1: 0,   # Neutral
    2: 1,   # Positive
}
# --------------------------------------------------------------------------- #
# Text preprocessing (must match the training notebook bit-for-bit)
# --------------------------------------------------------------------------- #
def normalize_text(x) -> str:
    """Apply the light, rule-based cleaning shared by training and serving.

    ``None`` becomes ``""``; any other value is converted with ``str``. If the
    text contains both ``<`` and ``>``, markup is stripped with BeautifulSoup's
    ``html.parser`` (tag boundaries become spaces). If bs4 is not installed,
    or parsing raises, the text is kept as-is (best effort). Finally, every
    run of whitespace is collapsed to one space and the ends are trimmed.

    Mention/URL normalisation is deliberately NOT done here; each ensemble
    member applies its own.
    """
    if x is None:
        return ""
    text = str(x)

    # Only pay the BeautifulSoup cost when markup is plausible.
    looks_like_markup = "<" in text and ">" in text
    if looks_like_markup:
        try:
            from bs4 import BeautifulSoup
            text = BeautifulSoup(text, "html.parser").get_text(separator=" ")
        except Exception:
            # Best effort: missing bs4 or a parser failure leaves text intact.
            pass

    return " ".join(text.split())
def cardiff_preprocess(text) -> str:
    """Rewrite text into the form twitter-roberta saw during training.

    The input is converted with ``str`` and split on single spaces. Each token
    that starts with ``@`` and is longer than one character becomes ``@user``.
    Each token starting with ``http`` becomes ``http``. Everything else,
    including empty tokens from repeated spaces, is kept verbatim.

    Used ONLY for the roberta member; BERTweet's tokenizer performs its own
    @USER/HTTPURL normalisation.
    """
    def _rewrite(token: str) -> str:
        if token.startswith("@") and len(token) > 1:
            return "@user"
        if token.startswith("http"):
            return "http"
        return token

    return " ".join([_rewrite(token) for token in str(text).split(" ")])
def _prep_for_member(texts: Sequence[str], is_bertweet: bool) -> List[str]:
if is_bertweet:
return list(texts) # tokenizer normalises internally
return [cardiff_preprocess(t) for t in texts]
# --------------------------------------------------------------------------- #
# Vectorizer (pass-through, for API compatibility)
# --------------------------------------------------------------------------- #
class EnsembleVectorizer:
"""Pass-through 'vectorizer'. Tokenisation happens inside the classifier."""
def fit(self, X=None, y=None):
return self
def transform(self, X: Sequence[str]) -> List[str]:
if isinstance(X, str):
X = [X]
return [normalize_text(t) for t in X]
def fit_transform(self, X: Sequence[str], y=None) -> List[str]:
return self.transform(X)
# --------------------------------------------------------------------------- #
# Classifier (holds BOTH members + offsets; rebuilds lazily; never pickles
# live torch objects)
# --------------------------------------------------------------------------- #
class EnsembleClassifier:
    """Self-contained, picklable log-averaged ensemble classifier.

    Decision rule (reproduces the training notebook)::

        pred = argmax( mean_m logsoftmax(logits_m) + offsets )

    Internal class indices are mapped to API labels via ``index_to_api``.

    Parameters
    ----------
    members : list of dicts, each::

        {"name": str,
         "is_bertweet": bool,
         "tokenizer_kwargs": dict,  # e.g. {"normalization": True, "use_fast": False}
         "config": transformers config,
         "state_dict": dict[str, cpu tensor],
         "tokenizer_files": dict[str, bytes]}

    offsets : sequence of 3 floats
        Per-class additive offsets (fit on validation in the notebook),
        applied to the averaged log-probabilities before argmax.
    max_length : int
        Token limit per input; longer inputs are truncated.
    batch_size : int
        Number of texts per forward pass.

    Raises
    ------
    ValueError
        If ``offsets`` does not contain exactly one value per class.
    """

    def __init__(self, members=None, offsets=(0.0, 0.0, 0.0),
                 max_length: int = 128, batch_size: int = 64):
        self.max_length = int(max_length)
        self.batch_size = int(batch_size)
        self.index_to_api = dict(INDEX_TO_API_LABEL)
        self.offsets = [float(o) for o in offsets]
        # A wrong-length offset vector would otherwise only fail (or silently
        # broadcast) at prediction time, inside the API process.
        if len(self.offsets) != len(self.index_to_api):
            raise ValueError(
                f"expected {len(self.index_to_api)} offsets (one per class), "
                f"got {len(self.offsets)}")
        # Picklable member payloads (already CPU tensors / raw bytes).
        self._members = members if members is not None else []
        # Live tokenizers/models, rebuilt lazily by _ensure; never pickled.
        self._built = None

    # ---- (de)serialisation ------------------------------------------------ #
    def __getstate__(self):
        # Persist only plain payloads; live torch objects are excluded.
        return {
            "max_length": self.max_length,
            "batch_size": self.batch_size,
            "index_to_api": self.index_to_api,
            "offsets": self.offsets,
            "_members": self._members,
        }

    def __setstate__(self, state):
        self.__dict__.update(state)
        self._built = None  # force a rebuild in the loading process

    # ---- lazy rebuild ----------------------------------------------------- #
    def _ensure(self):
        """Build tokenizers and models from the stored payloads, once.

        Picks CUDA when available, else CPU. Models are rebuilt from their
        config plus state_dict, so there is no hub download.
        """
        if self._built is not None:
            return
        import torch
        from transformers import (AutoModelForSequenceClassification,
                                  AutoTokenizer)

        self._device = "cuda" if torch.cuda.is_available() else "cpu"
        built = []
        for m in self._members:
            # Write the stored tokenizer files to a temp dir, load them, then
            # let the context manager delete the dir so repeated loads do not
            # accumulate directories in the system temp folder.
            # NOTE(review): assumes the tokenizer keeps what it needs in memory
            # after from_pretrained (the file paths are then only used for
            # save_pretrained). Confirm this for any new tokenizer type.
            with tempfile.TemporaryDirectory(prefix="ens_tok_") as tokdir:
                for name, data in m["tokenizer_files"].items():
                    with open(os.path.join(tokdir, name), "wb") as fh:
                        fh.write(data)
                tok = AutoTokenizer.from_pretrained(
                    tokdir, **m.get("tokenizer_kwargs", {}))
            # Model: rebuild from config + state_dict (no hub download).
            model = AutoModelForSequenceClassification.from_config(m["config"])
            model.load_state_dict(m["state_dict"])
            model.to(self._device)
            model.eval()
            built.append({"tok": tok, "model": model,
                          "is_bertweet": bool(m.get("is_bertweet", False))})
        self._built = built

    # ---- inference -------------------------------------------------------- #
    @staticmethod
    def _log_softmax(z):
        """Numerically stable row-wise log-softmax in float64."""
        import numpy as np
        z = np.asarray(z, dtype=np.float64)
        z = z - z.max(axis=1, keepdims=True)
        return z - np.log(np.exp(z).sum(axis=1, keepdims=True))

    @staticmethod
    def _clean_inputs(X):
        """Normalise *X* (a string or an iterable of texts) to cleaned strings."""
        if isinstance(X, str):
            X = [X]
        return [normalize_text(t) for t in X]

    def _member_logits(self, member, texts):
        """Run a forward pass for one member and return raw float64 logits (n, C)."""
        import numpy as np
        import torch

        prepped = _prep_for_member(texts, member["is_bertweet"])
        if not prepped:
            # np.vstack([]) would raise; return a well-shaped empty result.
            return np.empty((0, len(self.offsets)), dtype=np.float64)
        tok, model = member["tok"], member["model"]
        chunks = []
        for start in range(0, len(prepped), self.batch_size):
            batch = prepped[start:start + self.batch_size]
            enc = tok(batch, max_length=self.max_length, truncation=True,
                      padding=True, return_tensors="pt")
            enc = {k: v.to(self._device) for k, v in enc.items()}
            with torch.no_grad():
                logits = model(**enc).logits
            # .float() first: numpy has no bfloat16, so .numpy() would fail
            # on a bf16 model; for fp32 the call is a no-op.
            chunks.append(logits.float().cpu().numpy())
        return np.vstack(chunks).astype(np.float64)

    def _ensemble_scores(self, texts):
        """Return the mean per-member log-softmax plus offsets, shape (n, C).

        Raises
        ------
        RuntimeError
            If the classifier was built without any members.
        """
        import numpy as np
        self._ensure()
        if not self._built:
            raise RuntimeError("EnsembleClassifier has no members to score with")
        # Mean of log-softmax = log of the geometric mean of probabilities.
        lp_sum = None
        for member in self._built:
            lp = self._log_softmax(self._member_logits(member, texts))
            lp_sum = lp if lp_sum is None else lp_sum + lp
        ens = lp_sum / len(self._built)
        return ens + np.asarray(self.offsets, dtype=np.float64)  # threshold tuning

    def predict(self, X: Sequence[str]):
        """Return a list of API labels in {-1, 0, 1} for the input texts.

        A bare string is treated as one text. Empty input returns ``[]``
        without loading any model.
        """
        texts = self._clean_inputs(X)
        if not texts:
            return []
        idx = self._ensemble_scores(texts).argmax(axis=1)
        return [int(self.index_to_api[int(j)]) for j in idx]

    def predict_proba(self, X: Sequence[str]):
        """Return softmaxed post-offset ensemble scores, shape (n, C).

        Column j corresponds to internal class index j (API label
        ``index_to_api[j]``). Empty input returns a ``(0, C)`` array without
        loading any model.
        """
        import numpy as np
        texts = self._clean_inputs(X)
        if not texts:
            return np.zeros((0, len(self.offsets)), dtype=np.float64)
        ens = self._ensemble_scores(texts)
        ens = ens - ens.max(axis=1, keepdims=True)
        p = np.exp(ens)
        return p / p.sum(axis=1, keepdims=True)