sms-spam-classical / src /features.py
Jakob Neugebauer
Drop MaxAbsScaler from pipeline; normalize surface features inline
f83100c
"""
Feature pipeline for the SMS spam classifier.
The pipeline is a FeatureUnion over three blocks:
1. Word-level TF-IDF (1+2 grams) — captures vocabulary patterns
("free", "prize guaranteed").
2. Character-level TF-IDF (3-5 grams, char_wb) — captures sub-word
patterns and spelling variants ("Fr3e" shares character pieces with
"free"). Same idea FastText popularised, sklearn-compatible.
3. Hand-crafted surface features — length, digit ratio, uppercase ratio,
punctuation counts, has-URL / has-phone / has-currency booleans.
No explicit stop_words list. max_df=0.95 plus IDF weighting suppresses
common words in a more principled way than sklearn's default English
stop list, which would remove domain-meaningful words like "call".
"""
from __future__ import annotations
import re
import numpy as np
from scipy import sparse
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.pipeline import FeatureUnion
# Precompiled patterns behind the has_url / has_phone / has_currency flags.

# An explicit http(s):// or www. link, or a bare ".co.uk" / ".com" suffix
# (case-insensitive).
URL_RE = re.compile(r"\bhttps?://\S+|\bwww\.\S+|\.co\.uk\b|\.com\b", re.IGNORECASE)
# Any run of four or more digits delimited by word boundaries.
# NOTE(review): this also fires on years, PINs and amounts, not only phone
# numbers — presumably an intentional "long number" heuristic; confirm.
PHONE_RE = re.compile(r"\b\d{4,}\b")
# A £ / $ / € sign, or GBP / USD / EUR as a whole word (case-insensitive).
CURRENCY_RE = re.compile(r"[£$€]|\b(?:GBP|USD|EUR)\b", re.IGNORECASE)
class SurfaceFeatures(BaseEstimator, TransformerMixin):
    """Hand-crafted features that encode 'spam looks visibly different'.

    All features are normalised to roughly [0, 1] at extraction time, so
    no separate scaler is needed in the pipeline. This keeps the pickled
    model robust across sklearn versions (sklearn pickles are notoriously
    fragile across MaxAbsScaler / StandardScaler internals between releases).

    The transformer is stateless: ``fit`` learns nothing and ``transform``
    maps each message to one row whose columns follow ``feature_names``.
    """

    feature_names = [
        "length_norm",  # length / 200 (most SMS are under 200 chars), clipped to 1
        "n_words_norm",  # n_words / 50 (most SMS are under 50 words), clipped to 1
        "digit_ratio",  # already 0..1
        "upper_ratio",  # already 0..1
        "punct_ratio",  # already 0..1
        "has_url",
        "has_phone",
        "has_currency",
        "n_exclamation_norm",  # exclamations / 10, clipped to 1
    ]

    def fit(self, X, y=None):
        """Do nothing and return ``self``; there is no state to learn."""
        return self

    def transform(self, X):
        """Compute the surface features for every message in ``X``.

        Parameters
        ----------
        X : iterable of str
            Raw messages. Falsy entries (empty string, ``None``) produce an
            all-zero row.

        Returns
        -------
        scipy.sparse.csr_matrix
            float64 matrix with one row per message and
            ``len(feature_names)`` columns. An empty ``X`` yields a
            ``(0, len(feature_names))`` matrix.
        """
        n_features = len(self.feature_names)
        rows = []
        for msg in X:
            if not msg:
                rows.append([0.0] * n_features)
                continue
            # msg is non-empty from here on, so length > 0 and the ratio
            # divisions below cannot divide by zero.
            length = len(msg)
            n_words = len(msg.split())
            digits = sum(1 for c in msg if c.isdigit())
            uppers = sum(1 for c in msg if c.isupper())
            puncts = sum(1 for c in msg if c in ".,!?;:")
            exclamation = msg.count("!")
            rows.append([
                min(length / 200.0, 1.0),
                min(n_words / 50.0, 1.0),
                digits / length,
                uppers / length,
                puncts / length,
                1.0 if URL_RE.search(msg) else 0.0,
                1.0 if PHONE_RE.search(msg) else 0.0,
                1.0 if CURRENCY_RE.search(msg) else 0.0,
                min(exclamation / 10.0, 1.0),
            ])
        # Force a 2-D (n_messages, n_features) shape. Without this an empty
        # input becomes a 1-D array, which csr_matrix treats as a single row
        # of shape (1, 0) and which then cannot be stacked with the other
        # feature blocks.
        dense = np.asarray(rows, dtype=np.float64).reshape(len(rows), n_features)
        return sparse.csr_matrix(dense)
def build_feature_pipeline() -> FeatureUnion:
    """Assemble the unfitted feature union used by the spam classifier.

    The union concatenates three blocks, in this order:

    * ``word_tfidf`` — word-level TF-IDF over 1- and 2-grams, lowercased,
      with unicode accent stripping.
    * ``char_tfidf`` — ``char_wb`` TF-IDF over 3- to 5-character n-grams.
    * ``surface`` — the hand-crafted ``SurfaceFeatures`` block.

    Returns:
        A new ``FeatureUnion`` holding the three transformers.
    """
    # Document-frequency cut-offs and sublinear TF are the same for both
    # TF-IDF blocks, so they are spelled out once.
    shared_tfidf_args = {"min_df": 2, "max_df": 0.95, "sublinear_tf": True}

    return FeatureUnion(
        transformer_list=[
            (
                "word_tfidf",
                TfidfVectorizer(
                    lowercase=True,
                    ngram_range=(1, 2),
                    strip_accents="unicode",
                    **shared_tfidf_args,
                ),
            ),
            (
                "char_tfidf",
                TfidfVectorizer(
                    analyzer="char_wb",
                    ngram_range=(3, 5),
                    **shared_tfidf_args,
                ),
            ),
            ("surface", SurfaceFeatures()),
        ],
        n_jobs=None,
    )