SignalMod / src /service /model_service.py
Mirae Kang
feat: implement new models and improve UI, #23
46cc63a
raw
history blame
12.7 kB
"""Centralized toxicity prediction service."""
from __future__ import annotations
import re
import sys
from pathlib import Path
from typing import Any, Optional
import joblib
from src.service.meta_stack_predictor import MetaStackPredictor
from src.service.model_catalog import load_model_catalog
AVAILABLE_MODELS: dict[str, dict[str, Any]] = load_model_catalog()
_DEFAULT_MODEL_NAME = next(
(
name
for name, cfg in AVAILABLE_MODELS.items()
if cfg.get("production_default")
),
next(iter(AVAILABLE_MODELS.keys())),
)
_HF_DEPS_MSG = "Install HF deps: uv sync --extra hf"
_LFS_POINTER_PREFIX = "version https://git-lfs"
_MIN_LOCAL_HF_WEIGHTS_BYTES = 1_000_000
def _is_lfs_pointer_file(path: Path) -> bool:
try:
if path.stat().st_size > 4096:
return False
head = path.read_text(encoding="utf-8", errors="ignore")[:80]
return head.startswith(_LFS_POINTER_PREFIX)
except OSError:
return False
def local_hf_weights_ok(model_dir: Path) -> tuple[bool, str | None]:
"""Verify a local HF folder has real weight files (not Git LFS pointers)."""
if not model_dir.is_dir():
return False, f"Model not found at {model_dir}."
for weights_name in ("model.safetensors", "pytorch_model.bin"):
weights = model_dir / weights_name
if not weights.is_file():
continue
if _is_lfs_pointer_file(weights):
return False, (
"Weights missing (Git LFS pointer only). "
"Run: uv run python scripts/materialize_finetuned_weights.py "
"(or: brew install git-lfs && git lfs pull)"
)
size = weights.stat().st_size
if size < _MIN_LOCAL_HF_WEIGHTS_BYTES:
return False, (
f"{weights_name} is too small ({size} bytes). "
"Run: uv run python scripts/materialize_finetuned_weights.py"
)
return True, None
return False, "No model.safetensors or pytorch_model.bin in model directory."
def hf_deps_available() -> bool:
try:
import transformers # noqa: F401
return True
except ImportError:
return False
def check_model_availability(name: str, project_root: Path | None = None) -> tuple[bool, str | None]:
"""Return (available, reason) for a catalog model name."""
cfg = AVAILABLE_MODELS.get(name)
if not cfg:
return False, "Unknown model"
root = project_root or Path.cwd()
model_type = cfg.get("type", "local")
if model_type == "local":
rel = cfg.get("model_path")
if rel and (root / rel).is_file():
return True, None
models_dir = root / "models"
if any((models_dir / n).exists() for n in (
"final_model.joblib",
"lr_tuned.joblib",
"lr_baseline.joblib",
"best_ensemble.joblib",
)):
return True, None
if (models_dir / "baseline" / "lr_tfidf.joblib").is_file():
return True, None
return False, f"No model in {models_dir}"
if model_type == "meta_stack":
bundle = cfg.get("model_path", "models/production_final/meta_stack_final.joblib")
if not (root / bundle).is_file():
return False, f"Meta-stack bundle not found at {bundle}"
if not hf_deps_available():
return False, _HF_DEPS_MSG
return True, None
if model_type == "hf_local":
if not hf_deps_available():
return False, _HF_DEPS_MSG
path = root / cfg["model_path"]
ok, reason = local_hf_weights_ok(path)
if ok:
return True, None
if cfg.get("hub_fallback"):
return True, reason
return False, reason
if model_type == "hf_remote":
if not hf_deps_available():
return False, _HF_DEPS_MSG
return True, None
return False, f"Unsupported model type: {model_type}"
HF_LABEL_MAP = {
"toxic": "Toxic",
"severe_toxic": "Severely offensive",
"obscene": "Obscene",
"threat": "Threat",
"insult": "Insult",
"identity_hate": "Identity hate",
"label_1": "Toxic",
}
_KEYWORD_LABELS = {
"Insult": ["idiot", "stupid", "dumb", "fool", "moron", "loser"],
"Identity hate": ["thug", "racist", "race", "criminal"],
"Threat": ["kill", "shoot", "die", "dead", "hurt", "attack"],
"Obscene": ["fuck", "shit", "ass", "bitch", "cunt", "bastard"],
"Aggression": ["hate", "despise", "disgusting", "pathetic", "worthless"],
}
def _labels_from_keywords(text: str, probability: float) -> list[str]:
t = text.lower()
found = [lbl for lbl, kws in _KEYWORD_LABELS.items() if any(k in t for k in kws)]
return found if found else (["Offensive content"] if probability >= 0.5 else [])
class _FallbackPreprocessor:
_SW = {
"the", "a", "an", "and", "or", "but", "in", "on", "at", "to", "for",
"of", "with", "is", "it", "this", "that", "are", "was", "be", "have",
"has", "he", "she", "they", "we", "you", "i", "not", "do", "did",
"will", "can", "would", "should", "could", "from", "by", "as", "if",
}
def transform(self, text: str) -> str:
t = re.sub(r"http\S+|www\.\S+|@\w+", " ", str(text).lower())
t = re.sub(r"[^\x00-\x7F]+", " ", t)
t = re.sub(r"[^a-z\s]", " ", t)
t = re.sub(r"\s+", " ", t).strip()
return " ".join(w for w in t.split() if w not in self._SW and len(w) > 2)
class ModelService:
def __init__(self, model_name: str, project_root: Optional[Path] = None):
self.model_name = model_name
resolved = AVAILABLE_MODELS.get(model_name)
if resolved is None:
resolved = AVAILABLE_MODELS.get(_DEFAULT_MODEL_NAME) or next(
iter(AVAILABLE_MODELS.values())
)
self.cfg = resolved
self.project_root = project_root or Path.cwd()
self._model = None
self._preprocessor = None
self._meta_stack: MetaStackPredictor | None = None
def _get_model(self):
if self._model is None and self.cfg["type"] != "meta_stack":
t = self.cfg["type"]
if t == "local":
self._load_local()
elif t == "hf_remote":
self._load_hf(self.cfg["model_id"])
elif t == "hf_local":
path = self.project_root / self.cfg["model_path"]
ok, _reason = local_hf_weights_ok(path)
if ok:
self._load_hf(str(path))
elif self.cfg.get("hub_fallback"):
self._load_hf(self.cfg["hub_fallback"])
else:
raise FileNotFoundError(_reason or f"Model not found at {path}.")
else:
raise ValueError(f"Unsupported model type: {t}")
if self.cfg["type"] == "meta_stack" and self._meta_stack is None:
self._load_meta_stack()
return self._model
def _load_local(self) -> None:
rel = self.cfg.get("model_path")
if rel:
p = self.project_root / rel
if p.is_file():
self._model = joblib.load(p)
if self._model is None:
for name in (
"final_model.joblib",
"lr_tuned.joblib",
"lr_baseline.joblib",
"best_ensemble.joblib",
):
p = self.project_root / "models" / name
if p.exists():
self._model = joblib.load(p)
break
if self._model is None:
baseline = self.project_root / "models" / "baseline" / "lr_tfidf.joblib"
if baseline.is_file():
self._model = joblib.load(baseline)
if self._model is None:
raise FileNotFoundError(f"No model in {self.project_root / 'models'}")
def _load_meta_stack(self) -> None:
bundle_rel = self.cfg.get("model_path", "models/production_final/meta_stack_final.joblib")
manifest_rel = self.cfg.get("manifest_path", "models/production_final/manifest.json")
bundle_path = self.project_root / bundle_rel
manifest_path = self.project_root / manifest_rel
if not bundle_path.is_file():
raise FileNotFoundError(f"Meta-stack bundle not found: {bundle_path}")
self._meta_stack = MetaStackPredictor(
bundle_path,
manifest_path=manifest_path if manifest_path.is_file() else None,
frozen_model_id=self.cfg.get("frozen_bert_id", "unitary/toxic-bert"),
)
self._model = self._meta_stack
from sklearn.pipeline import Pipeline
if isinstance(self._model, Pipeline):
self._preprocessor = None
else:
try:
sys.path.insert(0, str(self.project_root))
from src.features.text_preprocessor import TextPreprocessor
self._preprocessor = TextPreprocessor(
config_path=str(self.project_root / "configs" / "features.yaml")
)
except Exception:
self._preprocessor = _FallbackPreprocessor()
def _load_hf(self, model_id_or_path: str) -> None:
try:
from transformers import pipeline as hf_pipeline
except ImportError as exc:
raise ImportError("Install HF deps: uv sync --extra hf") from exc
self._model = hf_pipeline(
"text-classification",
model=model_id_or_path,
return_all_scores=True,
truncation=True,
max_length=512,
)
def predict(self, text: str) -> dict:
if not text or not text.strip():
return {"is_toxic": False, "probability": 0.0, "labels": [], "model_used": self.model_name}
try:
if self.cfg["type"] == "meta_stack":
self._load_meta_stack()
raw = self._meta_stack.predict(text) # type: ignore[union-attr]
return {
"is_toxic": raw["is_toxic"],
"probability": raw["probability"],
"labels": raw.get("labels", []),
"model_used": self.model_name,
"recommended_threshold": raw.get("recommended_threshold"),
}
model = self._get_model()
if self.cfg["type"] == "local":
return self._pred_local(text, model)
return self._pred_hf(text, model)
except Exception as e:
return {
"is_toxic": False,
"probability": 0.0,
"labels": [],
"model_used": self.model_name,
"error": str(e),
}
def _pred_local(self, text: str, model) -> dict:
from sklearn.pipeline import Pipeline
if isinstance(model, Pipeline):
proba = float(model.predict_proba([text])[0][1])
else:
clean = self._preprocessor.transform(text) or text
proba = float(model.predict_proba([clean])[0][1])
tox = proba >= 0.5
return {
"is_toxic": tox,
"probability": proba,
"labels": _labels_from_keywords(text, proba) if tox else [],
"model_used": self.model_name,
}
def _pred_hf(self, text: str, pipeline_fn) -> dict:
raw = pipeline_fn(text[:512])
smap = {s["label"].lower(): s["score"] for s in (raw[0] if isinstance(raw[0], list) else raw)}
proba = 0.0
for key in ("label_1", "toxic", "toxic_1"):
if key in smap:
proba = smap[key]
break
else:
neg = {"label_0", "non_toxic", "not_toxic", "not toxic"}
vals = [v for k, v in smap.items() if k not in neg]
proba = max(vals) if vals else 0.0
tox = proba >= 0.5
labels: list[str] = []
if tox:
for k, v in smap.items():
if k not in ("label_0", "non_toxic") and v >= 0.35:
friendly = HF_LABEL_MAP.get(k, k.replace("_", " ").title())
labels.append(friendly)
if not labels:
labels = ["Offensive content"]
return {"is_toxic": tox, "probability": proba, "labels": labels, "model_used": self.model_name}
@staticmethod
def get_available_models() -> dict:
return AVAILABLE_MODELS
def get_model_info(self) -> dict:
info = dict(self.cfg)
if self.cfg["type"] == "meta_stack" and self._meta_stack is not None:
info["recommended_threshold"] = self._meta_stack.default_threshold
return info