|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| from dataclasses import dataclass
|
| from collections import Counter
|
| from typing import Any
|
|
|
| import numpy as np
|
| from sklearn.linear_model import LogisticRegression
|
| from sklearn.model_selection import train_test_split
|
| from sklearn.metrics import accuracy_score, confusion_matrix
|
| from sklearn.cluster import AgglomerativeClustering
|
|
|
| from training_data import TRAINING_EXAMPLES
|
| from parameters import TRAIN_TEST_SPLIT, EMBEDDING_MODEL
|
|
|
|
|
|
|
|
|
|
|
# Module-level slot holding the embedding model once it has been created.
_MODEL = None


def _get_model():
    """Return the shared sentence-transformers model, creating it if needed.

    The very first call builds ``SentenceTransformer(EMBEDDING_MODEL)``, which
    downloads the model (~90MB) and takes ~30-60 seconds. Every later call
    returns the instance stored in the module-level ``_MODEL`` right away.
    """
    global _MODEL
    if _MODEL is not None:
        return _MODEL

    # Imported inside the function so the dependency is only loaded when a
    # model is actually requested.
    from sentence_transformers import SentenceTransformer

    _MODEL = SentenceTransformer(EMBEDDING_MODEL)
    return _MODEL
|
|
|
|
|
def _embed(sentences):
    """Encode a list of sentences into a dense numpy array of embeddings.

    Uses the lazily loaded shared model and keeps the progress bar silent.
    """
    return _get_model().encode(
        sentences, convert_to_numpy=True, show_progress_bar=False
    )
|
|
|
|
|
|
|
|
|
|
|
@dataclass
class TrainedClassifier:
    """Fitted logistic regression bundled with its evaluation results.

    Instances are produced by ``train_classifier``; the field comments below
    describe what that function stores in each slot.
    """

    # The fitted estimator (exposes predict / predict_proba / classes_).
    model: Any
    # Accuracy measured on the held-out test split.
    accuracy: float
    # Sorted unique labels; also the row/column order of ``confusion``.
    labels: list
    # Confusion matrix on the test split, as nested plain lists.
    confusion: list
    # Number of examples used for fitting.
    train_size: int
    # Number of examples used for evaluation.
    test_size: int
|
|
|
|
|
def train_classifier(examples=None):
    """Fit and evaluate a logistic regression on sentence embeddings.

    When ``examples`` is omitted (or empty) the module's ``TRAINING_EXAMPLES``
    are used. Every example is read through its ``"sentence"`` and ``"label"``
    keys. The data is split with a fixed seed, stratified by label, using
    ``TRAIN_TEST_SPLIT`` as the training share. Both splits are embedded, the
    model is fitted on the training vectors and scored on the test vectors.

    Returns a ``TrainedClassifier`` carrying the fitted model, test accuracy,
    the sorted unique labels, the confusion matrix (ordered by those labels),
    and the sizes of both splits.
    """
    if not examples:
        examples = TRAINING_EXAMPLES

    texts = [example["sentence"] for example in examples]
    targets = [example["label"] for example in examples]

    train_texts, test_texts, train_targets, test_targets = train_test_split(
        texts,
        targets,
        train_size=TRAIN_TEST_SPLIT,
        random_state=42,
        stratify=targets,
    )

    # Only raw text is split above; embeddings are computed per split here.
    train_vectors = _embed(train_texts)
    test_vectors = _embed(test_texts)

    classifier = LogisticRegression(max_iter=1000)
    classifier.fit(train_vectors, train_targets)

    predicted = classifier.predict(test_vectors)
    test_accuracy = accuracy_score(test_targets, predicted)

    # Fix the label order explicitly so the matrix rows/columns are stable.
    label_order = sorted(set(targets))
    matrix = confusion_matrix(test_targets, predicted, labels=label_order)

    return TrainedClassifier(
        model=classifier,
        accuracy=float(test_accuracy),
        labels=label_order,
        confusion=matrix.tolist(),
        train_size=len(train_targets),
        test_size=len(test_targets),
    )
|
|
|
|
|
def predict(trained, sentence):
    """Classify a single sentence and return the outcome as a plain dict.

    ``trained`` must expose a fitted estimator as ``trained.model``. The
    result holds the input sentence, the predicted label (as ``str``), the
    highest class probability as ``confidence``, and a ``probabilities``
    mapping from each class (as ``str``) to its probability (as ``float``).
    """
    embedding = _embed([sentence])
    classifier = trained.model

    # Both calls receive a one-row batch, so take the first (only) row.
    label = classifier.predict(embedding)[0]
    class_probs = classifier.predict_proba(embedding)[0]

    per_class = {}
    for class_name, probability in zip(classifier.classes_, class_probs):
        per_class[str(class_name)] = float(probability)

    return {
        "sentence": sentence,
        "predicted_label": str(label),
        "confidence": float(max(class_probs)),
        "probabilities": per_class,
    }
|
|
|
|
|
|
|
|
|
|
|
def cluster_hierarchical(sentences, n_clusters=6):
    """Group sentences into a fixed number of semantic clusters.

    Agglomerative clustering: every sentence begins as its own cluster and
    the two closest clusters are merged repeatedly until exactly
    ``n_clusters`` are left. Closeness is cosine distance between the
    sentence embeddings, and 'average' linkage defines the distance between
    two clusters as the mean pairwise distance of their members — a good
    all-around choice for text.

    There is no noise concept: each sentence is assigned to exactly one
    cluster. Returns the cluster id per sentence as a plain list.
    """
    embeddings = _embed(sentences)
    clusterer = AgglomerativeClustering(
        n_clusters=int(n_clusters), metric="cosine", linkage="average"
    )
    assignments = clusterer.fit_predict(embeddings)
    return assignments.tolist()
|
|
|
|
|
|
|
|
|
|
|
def cluster_report(cluster_ids, sentences, true_labels=None):
    """Summarize clusters with sizes, dominant labels, and sample sentences.

    Args:
        cluster_ids: One cluster id per sentence, in sentence order. Each id
            is coerced with ``int``, so numpy integers are accepted.
        sentences: Sequence of sentences, indexed in parallel with
            ``cluster_ids``.
        true_labels: Optional sequence of ground-truth labels, parallel to
            ``sentences``. ``None`` or an empty sequence disables the label
            statistics. Lists and numpy arrays are both accepted.

    Returns:
        A list with one dict per cluster, in ascending cluster-id order.
        Each dict has the keys ``cluster_id``, ``cluster_name``
        (``"cluster_<id>"``), ``size``, ``dominant_label`` and
        ``dominant_count`` (``None`` / ``0`` without labels),
        ``label_distribution`` (label -> count) and ``sample_sentences``
        (the first three member sentences in input order).
    """
    # A bare truthiness test raises ValueError for multi-element numpy
    # arrays, so check for "labels were supplied" explicitly.
    has_labels = true_labels is not None and len(true_labels) > 0

    members_by_cluster = {}
    for idx, cid in enumerate(cluster_ids):
        members_by_cluster.setdefault(int(cid), []).append(idx)

    report = []
    for cid in sorted(members_by_cluster):
        members = members_by_cluster[cid]

        if has_labels:
            label_counter = Counter(true_labels[i] for i in members)
        else:
            label_counter = Counter()

        if label_counter:
            dominant_label, dominant_count = label_counter.most_common(1)[0]
        else:
            dominant_label, dominant_count = None, 0

        report.append({
            "cluster_id": cid,
            "cluster_name": f"cluster_{cid}",
            "size": len(members),
            "dominant_label": dominant_label,
            "dominant_count": dominant_count,
            "label_distribution": dict(label_counter),
            "sample_sentences": [sentences[i] for i in members[:3]],
        })
    return report
|
|
|
|
|
|
|
|
|
|
|
def cluster_with_params(sentences, similarity_threshold=0.60,
                        min_cluster_size=3, n_nearest=3):
    """Parameterized hierarchical clustering for the Researcher workflow.

    Adds three researcher-facing knobs to the basic agglomerative approach.

    Args:
        sentences: List of sentences to embed and cluster.
        similarity_threshold: Merging stops once the average-linkage cosine
            similarity between clusters drops below this value (passed to
            the clusterer as ``distance_threshold = 1 - similarity``).
        min_cluster_size: Clusters with fewer members than this are treated
            as noise (cluster id ``-1``).
        n_nearest: How many sentences nearest each centroid to return as the
            cluster's representative sample (for LLM labeling).

    Returns:
        A dict with the keys:
            cluster_ids: cluster id per sentence, ``-1`` for noise.
            centroids: cluster id -> unit-normalized mean embedding.
            representatives: cluster id -> list of
                ``(sentence_index, distance)`` tuples, nearest first.
            distances_to_centroid: cosine distance per sentence to its own
                centroid, ``None`` for noise sentences.
            n_clusters_found: number of non-noise clusters.
            n_noise_points: number of sentences marked as noise.
            vectors: the embedding matrix returned by ``_embed``.
    """
    matrix = _embed(sentences)
    n_samples = len(matrix)

    if n_samples < 2:
        # AgglomerativeClustering refuses fewer than two samples. A lone
        # sentence trivially forms raw cluster 0 (the size filter below
        # still decides whether it counts as noise); no sentences yield an
        # empty result.
        raw_ids = [0] * n_samples
    else:
        model = AgglomerativeClustering(
            n_clusters=None,
            distance_threshold=1.0 - float(similarity_threshold),
            metric="cosine",
            linkage="average",
        )
        raw_ids = model.fit_predict(matrix).tolist()

    # Demote undersized clusters to noise; surviving clusters keep their
    # raw ids, so ids are not necessarily contiguous.
    min_size = int(min_cluster_size)
    counts = Counter(raw_ids)
    cluster_ids = [
        int(cid) if counts[cid] >= min_size else -1 for cid in raw_ids
    ]

    members_by_cluster = {}
    for idx, cid in enumerate(cluster_ids):
        if cid != -1:
            members_by_cluster.setdefault(cid, []).append(idx)

    # Centroid = mean member embedding, scaled to unit length so a plain
    # dot product with a unit vector is the cosine similarity.
    centroids = {}
    for cid, idxs in members_by_cluster.items():
        centroid = matrix[idxs].mean(axis=0)
        norm = np.linalg.norm(centroid)
        if norm > 0:
            centroid = centroid / norm
        centroids[cid] = centroid

    distances_to_centroid = []
    for idx, cid in enumerate(cluster_ids):
        if cid == -1:
            distances_to_centroid.append(None)
            continue
        vec = matrix[idx]
        vec_norm = np.linalg.norm(vec)
        unit_vec = vec / vec_norm if vec_norm > 0 else vec
        similarity = float(np.dot(unit_vec, centroids[cid]))
        distances_to_centroid.append(1.0 - similarity)

    # Representatives are the members closest to their centroid.
    top_k = int(n_nearest)
    representatives = {}
    for cid, idxs in members_by_cluster.items():
        scored = sorted(
            ((i, distances_to_centroid[i]) for i in idxs),
            key=lambda pair: pair[1],
        )
        representatives[cid] = scored[:top_k]

    return {
        "cluster_ids": cluster_ids,
        "centroids": centroids,
        "representatives": representatives,
        "distances_to_centroid": distances_to_centroid,
        "n_clusters_found": len(members_by_cluster),
        "n_noise_points": cluster_ids.count(-1),
        "vectors": matrix,
    }
|
|
|