# ============================================================================
# training.py — supervised and unsupervised ML on semantic embeddings
# ============================================================================
#
# PURPOSE
# -------
# Semantic text classification and clustering using sentence-transformers
# embeddings. Called from app.py handlers. No Gradio, no LLMs.
#
# PIPELINE
# --------
# Every sentence is turned into a dense ~384-dim vector by a local
# sentence-transformers model (all-MiniLM-L6-v2 by default). The model is
# loaded once on first use and cached globally, so subsequent calls are fast.
#
# Supervised side:   embed sentences -> logistic regression.
# Unsupervised side: embed sentences -> Hierarchical Agglomerative Clustering
#                    with cosine distance and average linkage.
#
# Semantic embeddings capture MEANING, not word overlap. "This product is
# broken" and "this item does not work" land close together in vector space
# because the underlying neural model understands them as equivalent. TF-IDF
# would have seen them as completely different because they share no words.
#
# CONTRACT (what app.py imports from here)
# ----------------------------------------
#   train_classifier(examples=None)                      -> TrainedClassifier
#   predict(trained, sentence)                           -> dict
#   cluster_hierarchical(sentences, n_clusters)          -> list[int]
#   cluster_report(cluster_ids, sentences, true_labels)  -> list[dict]
# ============================================================================

from dataclasses import dataclass
from collections import Counter
from typing import Any

import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, confusion_matrix
from sklearn.cluster import AgglomerativeClustering

from training_data import TRAINING_EXAMPLES
from parameters import TRAIN_TEST_SPLIT, EMBEDDING_MODEL


# ----------------------------------------------------------------
# Embedding model — loaded once globally, reused forever
# ----------------------------------------------------------------

_MODEL = None


def _get_model():
    """Lazy-load the sentence-transformers model on first use.

    First call downloads the model (~90MB) and takes ~30-60 seconds.
    Subsequent calls are instant because the model is cached globally.
    """
    global _MODEL
    if _MODEL is None:
        # Imported here rather than at module top so that importing this
        # module stays cheap until an embedding is actually requested.
        from sentence_transformers import SentenceTransformer

        _MODEL = SentenceTransformer(EMBEDDING_MODEL)
    return _MODEL


def _embed(sentences):
    """Turn a list of sentences into a dense numpy array of embeddings.

    Returns one row per input sentence, as produced by the cached
    sentence-transformers model's ``encode``.
    """
    model = _get_model()
    return model.encode(
        sentences,
        convert_to_numpy=True,
        show_progress_bar=False,
    )


# ----------------------------------------------------------------
# Supervised: semantic embeddings + logistic regression
# ----------------------------------------------------------------

@dataclass
class TrainedClassifier:
    """Holds a fitted logistic regression plus evaluation numbers."""

    model: Any        # fitted sklearn LogisticRegression
    accuracy: float   # accuracy on the held-out test split
    labels: list      # sorted unique labels; row/column order of `confusion`
    confusion: list   # confusion matrix as nested lists (rows = true labels)
    train_size: int   # number of training examples
    test_size: int    # number of held-out test examples


def train_classifier(examples=None):
    """Embed the training set, fit logistic regression, evaluate on test.

    Args:
        examples: iterable of dicts with "sentence" and "label" keys.
            Falls back to TRAINING_EXAMPLES when None or empty.

    Returns:
        A TrainedClassifier with the fitted model and its test-split metrics.
    """
    examples = examples or TRAINING_EXAMPLES
    sentences = [e["sentence"] for e in examples]
    labels = [e["label"] for e in examples]

    # Stratified split with a fixed seed so evaluation is reproducible and
    # every label keeps its proportion in both halves.
    X_train, X_test, y_train, y_test = train_test_split(
        sentences,
        labels,
        train_size=TRAIN_TEST_SPLIT,
        random_state=42,
        stratify=labels,
    )

    X_train_vec = _embed(X_train)
    X_test_vec = _embed(X_test)

    model = LogisticRegression(max_iter=1000)
    model.fit(X_train_vec, y_train)

    preds = model.predict(X_test_vec)
    acc = accuracy_score(y_test, preds)

    # Pass the label order explicitly so the matrix rows/columns line up
    # with the `labels` field even if a label is missing from the test split.
    unique_labels = sorted(set(labels))
    cm = confusion_matrix(y_test, preds, labels=unique_labels)

    return TrainedClassifier(
        model=model,
        accuracy=float(acc),
        labels=unique_labels,
        confusion=cm.tolist(),
        train_size=len(y_train),
        test_size=len(y_test),
    )


def predict(trained, sentence):
    """Predict the label of a new sentence. Returns a plain dict.

    Args:
        trained: a TrainedClassifier as returned by train_classifier().
        sentence: the text to classify.

    Returns:
        dict with keys "sentence", "predicted_label", "confidence"
        (the highest class probability) and "probabilities"
        (label -> probability for every class the model knows).
    """
    vec = _embed([sentence])
    pred = trained.model.predict(vec)[0]
    probs = trained.model.predict_proba(vec)[0]
    classes = trained.model.classes_

    # Cast to builtin str/float so the result holds no numpy scalar types.
    prob_map = {str(c): float(p) for c, p in zip(classes, probs)}

    return {
        "sentence": sentence,
        "predicted_label": str(pred),
        "confidence": float(max(probs)),
        "probabilities": prob_map,
    }


# ----------------------------------------------------------------
# Unsupervised: Hierarchical Agglomerative Clustering on embeddings
# ----------------------------------------------------------------

def cluster_hierarchical(sentences, n_clusters=6):
    """Semantic clustering via agglomerative merging.

    Each sentence starts as its own cluster. At every step the two closest
    clusters are merged. Repeats until exactly n_clusters remain.

    Distance between sentences is cosine distance on the semantic embedding
    vectors. Linkage 'average' means the distance between two clusters is
    the average pairwise distance between their members — a good all-around
    choice for text.

    No noise concept: every sentence ends up in exactly one cluster.

    Args:
        sentences: sequence of sentences to cluster.
        n_clusters: desired number of clusters. Capped at the number of
            sentences, since there cannot be more clusters than members.

    Returns:
        A list of integer cluster ids, one per sentence. Empty input gives
        an empty list; a single sentence gives ``[0]``.
    """
    n_sentences = len(sentences)

    # AgglomerativeClustering needs at least two samples, so the trivial
    # sizes are answered directly (and without loading the model).
    if n_sentences == 0:
        return []
    if n_sentences == 1:
        return [0]

    matrix = _embed(sentences)
    model = AgglomerativeClustering(
        n_clusters=min(int(n_clusters), n_sentences),
        metric="cosine",
        linkage="average",
    )
    return model.fit_predict(matrix).tolist()


# ----------------------------------------------------------------
# Cluster reporting — compare discovered clusters to true labels
# ----------------------------------------------------------------

def cluster_report(cluster_ids, sentences, true_labels=None):
    """Summarize clusters with sizes, dominant labels, and sample sentences.

    Args:
        cluster_ids: one cluster id per sentence (same order as sentences).
        sentences: the clustered sentences.
        true_labels: optional sequence of known labels aligned with
            sentences. When missing or empty, label fields are left blank.

    Returns:
        A list of dicts sorted by cluster id, each with "cluster_id",
        "cluster_name", "size", "dominant_label", "dominant_count",
        "label_distribution" and "sample_sentences" (the first three
        members in input order).
    """
    members_by_cluster = {}
    for idx, cid in enumerate(cluster_ids):
        members_by_cluster.setdefault(int(cid), []).append(idx)

    # Explicit None/length test: plain truthiness raises on numpy arrays.
    has_labels = true_labels is not None and len(true_labels) > 0

    report = []
    for cid in sorted(members_by_cluster):
        members = members_by_cluster[cid]

        if has_labels:
            label_counter = Counter(true_labels[i] for i in members)
        else:
            label_counter = Counter()

        if label_counter:
            dominant_label, dominant_count = label_counter.most_common(1)[0]
        else:
            dominant_label, dominant_count = None, 0

        report.append({
            "cluster_id": int(cid),
            "cluster_name": f"cluster_{cid}",
            "size": len(members),
            "dominant_label": dominant_label,
            "dominant_count": dominant_count,
            "label_distribution": dict(label_counter),
            "sample_sentences": [sentences[i] for i in members[:3]],
        })
    return report


# ============================================================================
# Parameterized clustering with centroid-based representative selection
# ============================================================================

def cluster_with_params(sentences, similarity_threshold=0.60, min_cluster_size=3, n_nearest=3):
    """Parameterized hierarchical clustering for the Researcher workflow.

    Adds three researcher-facing knobs to the basic agglomerative approach:
      similarity_threshold: merges stop when avg linkage similarity < this
      min_cluster_size:     clusters smaller than this become noise (id = -1)
      n_nearest:            how many sentences nearest each centroid to return
                            as the cluster's representative sample (for LLM
                            labeling); negative values are treated as 0

    Returns a dict with:
      cluster_ids:           one id per sentence, -1 for noise
      centroids:             cluster id -> unit-length mean embedding
      representatives:       cluster id -> list of (sentence index, cosine
                             distance to centroid), nearest first
      distances_to_centroid: one cosine distance per sentence, None for noise
      n_clusters_found:      number of clusters that survived the size filter
      n_noise_points:        number of sentences assigned to -1
      vectors:               the embedding matrix (empty array for no input)
    """
    min_cluster_size = int(min_cluster_size)
    n_nearest = max(0, int(n_nearest))
    n_sentences = len(sentences)

    if n_sentences == 0:
        # Nothing to embed or cluster; return the same shape of result.
        return {
            "cluster_ids": [],
            "centroids": {},
            "representatives": {},
            "distances_to_centroid": [],
            "n_clusters_found": 0,
            "n_noise_points": 0,
            "vectors": np.empty((0, 0), dtype=np.float32),
        }

    matrix = _embed(sentences)

    # 1. Agglomerative clustering with a distance threshold
    if n_sentences == 1:
        # AgglomerativeClustering needs at least two samples; a lone
        # sentence is trivially its own raw cluster.
        raw_ids = [0]
    else:
        model = AgglomerativeClustering(
            n_clusters=None,
            distance_threshold=1.0 - float(similarity_threshold),
            metric="cosine",
            linkage="average",
        )
        raw_ids = model.fit_predict(matrix).tolist()

    # 2. Count members per raw cluster
    counts = Counter(raw_ids)

    # 3. Apply min_cluster_size filter -> noise bucket (-1)
    cluster_ids = [
        int(cid) if counts[cid] >= min_cluster_size else -1
        for cid in raw_ids
    ]

    # 4. Compute normalized centroids for surviving clusters
    members_by_cluster = {}
    for idx, cid in enumerate(cluster_ids):
        if cid != -1:
            members_by_cluster.setdefault(cid, []).append(idx)

    centroids = {}
    for cid, idxs in members_by_cluster.items():
        centroid = matrix[idxs].mean(axis=0)
        norm = np.linalg.norm(centroid)
        if norm > 0:
            centroid = centroid / norm
        centroids[cid] = centroid

    # 5. Distance from each sentence to its own cluster's centroid.
    #    Rows are scaled to unit length in one step; all-zero rows are left
    #    untouched (divided by 1) so they never produce NaNs.
    row_norms = np.linalg.norm(matrix, axis=1, keepdims=True)
    unit_rows = matrix / np.where(row_norms > 0, row_norms, 1.0)

    distances_to_centroid = [None] * n_sentences
    for cid, idxs in members_by_cluster.items():
        similarities = unit_rows[idxs] @ centroids[cid]
        for idx, sim in zip(idxs, similarities):
            distances_to_centroid[idx] = 1.0 - float(sim)

    # 6. Pick n_nearest sentences to each centroid as the cluster's
    #    representatives (stable sort keeps input order among ties)
    representatives = {}
    for cid, idxs in members_by_cluster.items():
        scored = sorted(
            ((i, distances_to_centroid[i]) for i in idxs),
            key=lambda pair: pair[1],
        )
        representatives[cid] = scored[:n_nearest]

    return {
        "cluster_ids": cluster_ids,
        "centroids": centroids,
        "representatives": representatives,
        "distances_to_centroid": distances_to_centroid,
        "n_clusters_found": len(members_by_cluster),
        "n_noise_points": cluster_ids.count(-1),
        "vectors": matrix,
    }