Spjimr / training.py
shahidshaikh's picture
Upload 40 files
a52bae4 verified
# ============================================================================
# training.py — supervised and unsupervised ML on semantic embeddings
# ============================================================================
#
# PURPOSE
# -------
# Semantic text classification and clustering using sentence-transformers
# embeddings. Called from app.py handlers. No Gradio, no LLMs.
#
# PIPELINE
# --------
# Every sentence is turned into a dense ~384-dim vector by a local
# sentence-transformers model (all-MiniLM-L6-v2 by default). The model is
# loaded once on first use and cached globally, so subsequent calls are fast.
#
# Supervised side: embed sentences -> logistic regression.
# Unsupervised side: embed sentences -> Hierarchical Agglomerative Clustering
# with cosine distance and average linkage.
#
# Semantic embeddings capture MEANING, not word overlap. "This product is
# broken" and "this item does not work" land close together in vector space
# because the underlying neural model understands them as equivalent. TF-IDF
# would have seen them as completely different because they share no words.
#
# CONTRACT (what app.py imports from here)
# ----------------------------------------
# train_classifier(examples=None) -> TrainedClassifier
# predict(trained, sentence) -> dict
# cluster_hierarchical(sentences, n_clusters) -> list[int]
# cluster_report(cluster_ids, sentences, true_labels) -> list[dict]
# ============================================================================
from dataclasses import dataclass
from collections import Counter
from typing import Any
import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, confusion_matrix
from sklearn.cluster import AgglomerativeClustering
from training_data import TRAINING_EXAMPLES
from parameters import TRAIN_TEST_SPLIT, EMBEDDING_MODEL
# ----------------------------------------------------------------
# Embedding model — loaded once globally, reused forever
# ----------------------------------------------------------------
_MODEL = None


def _get_model():
    """Return the shared sentence-transformers model, building it on demand.

    The model named by EMBEDDING_MODEL is constructed the first time this
    function runs and stored in the module-level ``_MODEL``. Every later
    call returns that same object without constructing it again.

    The first call may need to download the model weights (estimated at
    ~90MB, taking ~30-60 seconds); subsequent calls only return the cached
    instance.
    """
    global _MODEL
    if _MODEL is not None:
        return _MODEL
    # Imported here instead of at module top so the sentence_transformers
    # package is only loaded once an embedding is actually requested.
    from sentence_transformers import SentenceTransformer
    _MODEL = SentenceTransformer(EMBEDDING_MODEL)
    return _MODEL
def _embed(sentences):
    """Encode ``sentences`` with the shared model and return a numpy array.

    The progress bar is disabled and the encoder is asked for numpy output,
    so the result can be fed directly to scikit-learn estimators.
    """
    encode_options = {
        "convert_to_numpy": True,
        "show_progress_bar": False,
    }
    encoder = _get_model()
    return encoder.encode(sentences, **encode_options)
# ----------------------------------------------------------------
# Supervised: semantic embeddings + logistic regression
# ----------------------------------------------------------------
@dataclass
class TrainedClassifier:
    """Result bundle returned by train_classifier().

    Carries the fitted estimator together with the evaluation figures
    computed on the held-out test split.
    """

    # Fitted estimator; predict() uses its predict / predict_proba / classes_.
    model: Any
    # Accuracy score measured on the test split.
    accuracy: float
    # Sorted unique labels; also the row/column order of ``confusion``.
    labels: list
    # Confusion matrix as nested plain lists.
    confusion: list
    # Number of examples in the training split.
    train_size: int
    # Number of examples in the test split.
    test_size: int
def train_classifier(examples=None):
    """Fit a logistic-regression classifier on sentence embeddings.

    Args:
        examples: iterable of dicts with "sentence" and "label" keys. When
            omitted (or empty/falsy), TRAINING_EXAMPLES is used instead.

    Returns:
        TrainedClassifier holding the fitted model, its accuracy and
        confusion matrix on the held-out split, the sorted label list, and
        the sizes of the two splits.
    """
    dataset = examples or TRAINING_EXAMPLES
    texts = [row["sentence"] for row in dataset]
    targets = [row["label"] for row in dataset]

    # Stratified split with a fixed seed so the evaluation is repeatable.
    train_text, test_text, train_y, test_y = train_test_split(
        texts,
        targets,
        train_size=TRAIN_TEST_SPLIT,
        random_state=42,
        stratify=targets,
    )

    train_vecs, test_vecs = _embed(train_text), _embed(test_text)

    classifier = LogisticRegression(max_iter=1000)
    classifier.fit(train_vecs, train_y)
    predicted = classifier.predict(test_vecs)

    score = accuracy_score(test_y, predicted)
    # The same label order is used for the stored label list and for the
    # rows/columns of the confusion matrix.
    label_order = sorted(set(targets))
    matrix = confusion_matrix(test_y, predicted, labels=label_order)

    return TrainedClassifier(
        model=classifier,
        accuracy=float(score),
        labels=label_order,
        confusion=matrix.tolist(),
        train_size=len(train_y),
        test_size=len(test_y),
    )
def predict(trained, sentence):
    """Classify one sentence with a previously trained classifier.

    Args:
        trained: object whose ``model`` attribute exposes predict,
            predict_proba and classes_ (e.g. a TrainedClassifier).
        sentence: the text to classify.

    Returns:
        dict with the input "sentence", the "predicted_label" as a string,
        the "confidence" (largest class probability) and "probabilities"
        mapping every class name to its probability as a plain float.
    """
    embedding = _embed([sentence])
    top_label = trained.model.predict(embedding)[0]
    class_probs = trained.model.predict_proba(embedding)[0]

    # Convert numpy scalars to builtin str/float so the result is plain data.
    probabilities = {}
    for class_name, probability in zip(trained.model.classes_, class_probs):
        probabilities[str(class_name)] = float(probability)

    result = {
        "sentence": sentence,
        "predicted_label": str(top_label),
        "confidence": float(max(class_probs)),
        "probabilities": probabilities,
    }
    return result
# ----------------------------------------------------------------
# Unsupervised: Hierarchical Agglomerative Clustering on embeddings
# ----------------------------------------------------------------
def cluster_hierarchical(sentences, n_clusters=6):
    """Group sentences into a fixed number of semantic clusters.

    The sentences are embedded and handed to agglomerative clustering
    configured with cosine distance and average linkage, i.e. the distance
    between two clusters is the mean pairwise distance of their members.
    Merging proceeds bottom-up until ``n_clusters`` groups remain, so every
    sentence receives a cluster id — there is no noise bucket.

    Args:
        sentences: the texts to cluster.
        n_clusters: how many clusters to produce (coerced with int()).

    Returns:
        list of cluster ids, one per input sentence, in input order.
    """
    embeddings = _embed(sentences)
    clusterer = AgglomerativeClustering(
        n_clusters=int(n_clusters),
        metric="cosine",
        linkage="average",
    )
    assignments = clusterer.fit_predict(embeddings)
    return assignments.tolist()
# ----------------------------------------------------------------
# Cluster reporting — compare discovered clusters to true labels
# ----------------------------------------------------------------
def cluster_report(cluster_ids, sentences, true_labels=None):
    """Build one summary dict per cluster, ordered by ascending cluster id.

    Args:
        cluster_ids: cluster id for each sentence (same order as sentences).
        sentences: the clustered texts.
        true_labels: optional known label per sentence; when given, each
            summary also reports the label distribution inside the cluster.

    Returns:
        list of dicts with keys "cluster_id", "cluster_name", "size",
        "dominant_label", "dominant_count", "label_distribution" and
        "sample_sentences" (at most the first three members). Without
        true_labels the dominant label is None with count 0 and the
        distribution is empty.
    """
    # Group sentence positions by their (int-coerced) cluster id.
    members_of = {}
    for position, raw_id in enumerate(cluster_ids):
        key = int(raw_id)
        if key not in members_of:
            members_of[key] = []
        members_of[key].append(position)

    def summarize(cid):
        positions = members_of[cid]
        if true_labels:
            tally = Counter(true_labels[i] for i in positions)
        else:
            tally = Counter()
        if tally:
            top_label, top_count = tally.most_common(1)[0]
        else:
            top_label, top_count = None, 0
        return {
            "cluster_id": int(cid),
            "cluster_name": f"cluster_{cid}",
            "size": len(positions),
            "dominant_label": top_label,
            "dominant_count": top_count,
            "label_distribution": dict(tally),
            "sample_sentences": [sentences[i] for i in positions[:3]],
        }

    return [summarize(cid) for cid in sorted(members_of)]
# ============================================================================
# Parameterized clustering with centroid-based representative selection
# ============================================================================
def cluster_with_params(sentences, similarity_threshold=0.60,
                        min_cluster_size=3, n_nearest=3):
    """Parameterized hierarchical clustering for the Researcher workflow.

    Embeds the sentences, runs average-linkage agglomerative clustering on
    cosine distance with a distance cut-off instead of a fixed cluster
    count, relabels undersized clusters as noise, and picks the sentences
    closest to each surviving cluster's centroid as its representatives
    (e.g. for LLM labeling).

    Args:
        sentences: the texts to cluster. May be empty or hold one sentence.
        similarity_threshold: converted to the clustering distance
            threshold as ``1 - similarity_threshold``; clusters whose
            average-linkage distance reaches that threshold are not merged.
        min_cluster_size: clusters with fewer members than this are put in
            the noise bucket (cluster id -1).
        n_nearest: number of representatives to keep per cluster. Values
            below zero are treated as zero.

    Returns:
        dict with:
            "cluster_ids": cluster id per sentence, -1 for noise.
            "centroids": {cluster id: unit-length mean embedding}.
            "representatives": {cluster id: [(sentence index, cosine
                distance to centroid), ...]} sorted nearest first.
            "distances_to_centroid": cosine distance of each sentence to
                its own cluster's centroid, None for noise sentences.
            "n_clusters_found": number of surviving (non-noise) clusters.
            "n_noise_points": number of sentences labeled -1.
            "vectors": the embedding matrix (an empty (0, 0) array when
                there are no sentences).
    """
    if len(sentences) == 0:
        # Nothing to embed or cluster: return an empty result without
        # loading the embedding model or calling the clusterer, which
        # cannot be fitted on zero samples.
        return {
            "cluster_ids": [],
            "centroids": {},
            "representatives": {},
            "distances_to_centroid": [],
            "n_clusters_found": 0,
            "n_noise_points": 0,
            "vectors": np.empty((0, 0), dtype=np.float32),
        }

    matrix = _embed(sentences)

    # 1. Agglomerative clustering with a distance threshold
    distance_threshold = 1.0 - float(similarity_threshold)
    if len(matrix) < 2:
        # The clusterer needs at least two samples; a lone sentence is
        # trivially its own raw cluster.
        raw_ids = [0] * len(matrix)
    else:
        model = AgglomerativeClustering(
            n_clusters=None,
            distance_threshold=distance_threshold,
            metric="cosine",
            linkage="average",
        )
        raw_ids = model.fit_predict(matrix).tolist()

    # 2. Count members per raw cluster
    counts = Counter(raw_ids)

    # 3. Apply min_cluster_size filter -> noise bucket (-1)
    min_size = int(min_cluster_size)
    cluster_ids = [
        int(cid) if counts[cid] >= min_size else -1
        for cid in raw_ids
    ]

    # 4. Compute normalized centroids for surviving clusters
    members_by_cluster = {}
    for idx, cid in enumerate(cluster_ids):
        if cid == -1:
            continue
        members_by_cluster.setdefault(cid, []).append(idx)

    centroids = {}
    for cid, idxs in members_by_cluster.items():
        centroid = matrix[idxs].mean(axis=0)
        norm = np.linalg.norm(centroid)
        if norm > 0:
            # Unit length, so a dot product with a unit vector is the
            # cosine similarity.
            centroid = centroid / norm
        centroids[cid] = centroid

    # 5. Distance from each sentence to its own cluster's centroid
    distances_to_centroid = []
    for idx, cid in enumerate(cluster_ids):
        if cid == -1:
            distances_to_centroid.append(None)
            continue
        vec = matrix[idx]
        vn = np.linalg.norm(vec)
        vec_n = vec / vn if vn > 0 else vec
        sim = float(np.dot(vec_n, centroids[cid]))
        distances_to_centroid.append(1.0 - sim)

    # 6. Pick n_nearest sentences to each centroid as the cluster's
    #    representatives. Clamp at zero: a negative slice bound would
    #    silently drop entries from the end instead of limiting the count.
    top_k = max(int(n_nearest), 0)
    representatives = {}
    for cid, idxs in members_by_cluster.items():
        scored = sorted(
            ((i, distances_to_centroid[i]) for i in idxs),
            key=lambda pair: pair[1],
        )
        representatives[cid] = scored[:top_k]

    return {
        "cluster_ids": cluster_ids,
        "centroids": centroids,
        "representatives": representatives,
        "distances_to_centroid": distances_to_centroid,
        "n_clusters_found": len(members_by_cluster),
        "n_noise_points": cluster_ids.count(-1),
        "vectors": matrix,
    }