|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| from __future__ import annotations
|
|
|
| import math
|
| import numpy as np
|
| import pandas as pd
|
| from collections import defaultdict
|
| from typing import Any
|
| from sentence_transformers import SentenceTransformer
|
|
|
|
|
|
|
|
|
# Quality-tier cut-offs on std(cluster_fit), applied by _classify_spread:
# std < SPREAD_TIGHT_MAX -> "TIGHT"; std < SPREAD_MEDIUM_MAX -> "MEDIUM";
# anything else -> "LOOSE".
SPREAD_TIGHT_MAX = 0.15

SPREAD_MEDIUM_MAX = 0.20

# Fraction of each cluster to sample; _compute_n_sample takes the ceiling of
# SAMPLE_PERCENTAGE * cluster_size and floors it at min_cluster_size.
SAMPLE_PERCENTAGE = 0.10

# Shares of the per-cluster sample quota drawn from the top / middle / edge
# ranks of cluster_fit in _stratified_sample_indices.
STRATIFY_TOP = 0.50

STRATIFY_MIDDLE = 0.30

# NOTE(review): not read by the code visible here; the edge quota is computed
# as the remainder n_sample - n_top - n_mid. Keep equal to 1 - TOP - MIDDLE.
STRATIFY_EDGE = 0.20

# Target maximum sub-cluster std(cluster_fit) when proposing Agglomerative
# splits of LOOSE clusters (_propose_agglomerative_split).
AGG_TARGET_STD = 0.15

# Module-level cache of loaded SentenceTransformer models keyed by model name,
# filled lazily by _get_st_model so each model is constructed once.
_ST_CACHE: dict = {}
|
|
|
|
|
def _get_st_model(model_name="sentence-transformers/all-MiniLM-L6-v2"):
    """Return the SentenceTransformer for *model_name*, constructing it only once.

    Instances are memoised in the module-level ``_ST_CACHE`` dict, so repeated
    calls with the same name return the same object.
    """
    model = _ST_CACHE.get(model_name)
    if model is None:
        model = SentenceTransformer(model_name)
        _ST_CACHE[model_name] = model
    return model
|
|
|
|
|
def _embed(texts: list[str]) -> np.ndarray:
    """Encode *texts* with the cached default sentence model.

    Embeddings are requested with ``normalize_embeddings=True`` and the progress
    bar disabled.
    """
    encoder = _get_st_model()
    return encoder.encode(
        texts,
        show_progress_bar=False,
        normalize_embeddings=True,
    )
|
|
|
|
|
def _umap_reduce(embeddings: np.ndarray, n_components: int = 10) -> np.ndarray:
    """Reduce dimensionality for HDBSCAN stability.

    Uses UMAP (cosine metric, ``random_state=42``, at most 15 neighbours) when the
    ``umap`` package can be imported. Otherwise falls back to scikit-learn PCA
    (``random_state=42``), with the component count clipped to
    ``min(n_components, n_rows - 1, n_features)``.
    """
    try:
        import umap

        n_rows = len(embeddings)
        projector = umap.UMAP(
            n_components=n_components,
            n_neighbors=min(15, n_rows - 1),
            min_dist=0.0,
            metric="cosine",
            random_state=42,
        )
        return projector.fit_transform(embeddings)
    except ImportError:
        from sklearn.decomposition import PCA

        n_rows, n_features = len(embeddings), embeddings.shape[1]
        kept_components = min(n_components, n_rows - 1, n_features)
        return PCA(n_components=kept_components, random_state=42).fit_transform(
            embeddings
        )
|
|
|
|
|
def _hdbscan_cluster(
    reduced: np.ndarray, min_cluster_size: int
) -> tuple[np.ndarray, np.ndarray]:
    """Cluster *reduced* points and return ``(labels, probabilities)``.

    Primary path: ``hdbscan.HDBSCAN`` (euclidean, ``min_samples=1``). Its labels
    use -1 for outliers, and its ``probabilities_`` give the membership strength
    (0.0 for outliers).

    Fallback when ``hdbscan`` cannot be imported: Ward AgglomerativeClustering.
    The cluster count is ``len // max(min_cluster_size, 3)``, clamped to
    ``[2, len - 1]``. Pseudo-probabilities come from
    ``_fallback_probs_from_centroid``.
    """
    try:
        import hdbscan

        model = hdbscan.HDBSCAN(
            min_cluster_size=min_cluster_size,
            min_samples=1,
            metric="euclidean",
            prediction_data=False,
        )
        cluster_labels = model.fit_predict(reduced)
        membership = model.probabilities_
        return cluster_labels, membership
    except ImportError:
        from sklearn.cluster import AgglomerativeClustering

        n_points = len(reduced)
        # At least 2 groups, never as many groups as points.
        n_groups = min(max(2, n_points // max(min_cluster_size, 3)), n_points - 1)
        cluster_labels = AgglomerativeClustering(
            n_clusters=n_groups,
            metric="euclidean",
            linkage="ward",
        ).fit_predict(reduced)
        return cluster_labels, _fallback_probs_from_centroid(reduced, cluster_labels)
|
|
|
|
|
def _fallback_probs_from_centroid(
    reduced: np.ndarray, labels: np.ndarray
) -> np.ndarray:
    """Pseudo-probabilities used when HDBSCAN is unavailable.

    Within each non-outlier cluster, a point scores
    ``1 - dist_to_centroid / max_dist_in_cluster``. The point farthest from the
    centroid therefore gets 0.0, and a zero-spread cluster gets 1.0 for every
    member. Points labelled -1 keep a score of 0.0.
    """
    scores = np.zeros(len(reduced), dtype=float)
    for cluster in set(labels.tolist()) - {-1}:
        members = np.flatnonzero(labels == cluster)
        if members.size == 0:
            continue
        points = reduced[members]
        dist = np.linalg.norm(points - points.mean(axis=0), axis=1)
        scale = dist.max()
        if not scale > 0:
            scale = 1.0  # avoid dividing by zero for single-point / identical clusters
        scores[members] = 1.0 - (dist / scale)
    return scores
|
|
|
|
|
def _classify_spread(std_val: float) -> str:
    """Map std(cluster_fit) onto a quality tier.

    Returns "TIGHT" below SPREAD_TIGHT_MAX, "MEDIUM" below SPREAD_MEDIUM_MAX,
    and "LOOSE" otherwise (including NaN, which fails both comparisons).
    """
    for ceiling, tier in ((SPREAD_TIGHT_MAX, "TIGHT"), (SPREAD_MEDIUM_MAX, "MEDIUM")):
        if std_val < ceiling:
            return tier
    return "LOOSE"
|
|
|
|
|
def _propose_agglomerative_split(
    cluster_indices: list[int],
    embeddings: np.ndarray,
    cluster_fits: np.ndarray,
    target_std: float = AGG_TARGET_STD,
) -> dict:
    """Suggest how to break a LOOSE cluster into sub-clusters.

    Runs cosine / average-linkage AgglomerativeClustering on the cluster's
    embeddings for K = 2 .. min(5, N - 1). A K is only considered if every
    sub-cluster has at least 2 members. The first K whose largest sub-cluster
    std(cluster_fit) is <= *target_std* is returned immediately. Otherwise the
    K with the largest positive improvement (original std minus largest
    sub-std) wins. If no K improves, the 1-sub-cluster "no split" proposal is
    returned. Clusters with fewer than 4 members are never split. A K for
    which clustering raises is skipped.

    Returns a dict with keys n_sub, sub_labels, sub_stds, improvement,
    target_reached.
    """
    from sklearn.cluster import AgglomerativeClustering

    member_vectors = embeddings[cluster_indices]
    n_members = len(cluster_indices)
    baseline_std = float(np.std(cluster_fits))

    chosen = {
        "n_sub": 1,
        "sub_labels": [0] * n_members,
        "sub_stds": [baseline_std],
        "improvement": 0.0,
        "target_reached": False,
    }
    if n_members < 4:
        return chosen

    for n_sub in range(2, min(6, n_members)):
        try:
            assignment = AgglomerativeClustering(
                n_clusters=n_sub,
                metric="cosine",
                linkage="average",
            ).fit_predict(member_vectors)
        except Exception:
            # Best-effort: a K that cannot be fitted is simply not proposed.
            continue

        group_stds: list[float] = []
        for group in range(n_sub):
            in_group = assignment == group
            if in_group.sum() < 2:
                group_stds = []  # singleton sub-cluster -> reject this K
                break
            group_stds.append(float(np.std(cluster_fits[in_group])))
        if not group_stds:
            continue

        worst = max(group_stds)
        gain = baseline_std - worst
        proposal = {
            "n_sub": n_sub,
            "sub_labels": assignment.tolist(),
            "sub_stds": group_stds,
            "improvement": gain,
            "target_reached": worst <= target_std,
        }
        if proposal["target_reached"]:
            return proposal
        if gain > chosen["improvement"]:
            chosen = proposal

    return chosen
|
|
|
|
|
def _stratified_sample_indices(
    indices: list[int],
    cluster_fits: np.ndarray,
    n_sample: int,
) -> list[int]:
    """
    Stratified sampling by cluster_fit rank.

    Members are ranked by descending cluster_fit. The ranked list is cut into
    thirds: top, middle and edge pools. The quota splits n_sample into
    STRATIFY_TOP / STRATIFY_MIDDLE shares (at least one from the top); the
    edge gets the remainder. Each pool contributes its highest-ranked members.
    Any shortfall, when a pool is smaller than its quota, is back-filled in
    rank order.

    Args:
        indices:      corpus positions of the cluster members.
        cluster_fits: fit score for each entry of *indices* (array or list).
        n_sample:     number of members to return.

    Returns:
        Up to n_sample values from *indices*. If n_sample >= len(indices),
        all members are returned in rank order.
    """
    fits = np.asarray(cluster_fits, dtype=float)
    # Stable sort: tied fits are common (HDBSCAN gives many core points 1.0),
    # and NumPy's default sort is unstable, so ties could be ordered
    # differently across NumPy builds/CPUs. Stable keeps input order for ties.
    order = np.argsort(-fits, kind="stable")
    ranked = [indices[int(i)] for i in order]

    if n_sample >= len(indices):
        return ranked

    n_total = len(ranked)

    n_top = max(1, round(n_sample * STRATIFY_TOP))
    n_mid = max(0, round(n_sample * STRATIFY_MIDDLE))
    n_edge = n_sample - n_top - n_mid
    if n_edge < 0:
        n_edge = 0
        n_mid = max(0, n_sample - n_top)

    top_end = max(1, n_total // 3)
    mid_end = max(top_end + 1, (2 * n_total) // 3)

    picked: list[int] = (
        ranked[:top_end][:n_top]
        + ranked[top_end:mid_end][:n_mid]
        + ranked[mid_end:][:n_edge]
    )

    # Back-fill in rank order when a pool was too small for its quota.
    if len(picked) < n_sample:
        taken = set(picked)
        for member in ranked:
            if len(picked) >= n_sample:
                break
            if member not in taken:
                picked.append(member)
                taken.add(member)

    return picked[:n_sample]
|
|
|
|
|
def _compute_n_sample(
    N: int, min_cluster_size: int, percentage: float | None = None
) -> int:
    """Return how many members to sample from a cluster of size *N*.

    n_sample = max(min_cluster_size, ceil(percentage * N)), with no upper cap.
    *percentage* defaults to SAMPLE_PERCENTAGE.
    """
    pct = SAMPLE_PERCENTAGE if percentage is None else percentage
    # Strip binary floating-point noise before ceil: 0.1 * 30 evaluates to
    # 3.0000000000000004, which would otherwise ceil to 4 instead of 3.
    quota = math.ceil(round(pct * N, 9))
    return max(min_cluster_size, quota)
|
|
|
|
|
|
|
|
|
|
|
def _normalise_split_decisions(split_decisions: dict | None) -> dict[int, str]:
    """Copy researcher split decisions, coercing cluster-id keys to int.

    Cluster ids are looked up as ints. Keys that arrive as strings (for example
    after a JSON round-trip, since JSON object keys are always strings) would
    otherwise never match. Keys that are not integral ids are dropped.
    """
    normalised: dict[int, str] = {}
    for key, value in (split_decisions or {}).items():
        if isinstance(key, float) and not key.is_integer():
            continue
        try:
            cid = int(key)
        except (TypeError, ValueError, OverflowError):
            continue
        normalised[cid] = value
    return normalised


def _sentence_text(raw: Any) -> str:
    """Return the stripped text of a corpus row's 'sentence' value.

    None, falsy values and float NaN (pandas' marker for a missing cell) give
    "" so that the row is skipped. Other non-string values are converted with
    str().
    """
    if isinstance(raw, str):
        return raw.strip()
    if raw is None or (isinstance(raw, float) and math.isnan(raw)):
        return ""
    # Keep the falsy-means-empty rule (e.g. 0) for any other type.
    return str(raw).strip() if raw else ""


def _small_corpus_result(meta_rows: list[dict]) -> dict:
    """Result for corpora with fewer than 10 sentences: one cluster, all kept."""
    n_kept = len(meta_rows)
    rows = [
        {
            "idx": i,
            "L1": m["L1"], "L2": m["L2"], "L3": m["L3"], "L4": m["L4"],
            "sentence_id": m["sentence_id"],
            "sentence": m["sentence"],
            "cluster_id_original": 0,
            "cluster_id_refined": 0,
            "cluster_id": 0,
            "cluster_fit": 1.0,
            "cluster_mean_fit": 1.0,
            "cluster_std_fit": 0.0,
            "cluster_quality_tier": "TIGHT",
            "split_decision": "NONE",
            "cluster_size": n_kept,
            "selected": True,
            "reason": "corpus too small — all selected",
        }
        for i, m in enumerate(meta_rows)
    ]
    return {
        "compression_rows": rows,
        # Only source rows that carried a non-empty sentence, so that
        # len(compressed_corpus) == n_compressed.
        "compressed_corpus": [m["__src"] for m in meta_rows],
        "split_proposals": {},
        "quality_summary": {
            "TIGHT": 1, "MEDIUM": 0, "LOOSE": 0,
            "n_clusters_original": 1, "n_clusters_refined": 1,
            "n_flagged_for_split": 0,
            "n_splits_accepted": 0, "n_splits_rejected": 0, "n_splits_pending": 0,
        },
        "n_original": n_kept,
        "n_compressed": n_kept,
        "n_clusters": 1,
        "n_outliers": 0,
        "errors": ["Corpus too small for compression (<10 sentences). All sentences kept."],
    }


def run_corpus_compression(
    corpus: list[dict],
    sentences_per_cluster: int = 2,
    min_cluster_size: int = 3,
    outlier_sample_size: int = 10,
    min_cluster_fit: float = 0.0,
    auto_split_loose: bool = True,
    split_decisions: dict[int, str] | None = None,
) -> dict:
    """
    Run Phase 0 — Sampling (G&W at-Scale).

    Steps:
      1. Embed every non-empty sentence.
      2. Reduce the embeddings (UMAP, or PCA as a fallback).
      3. Cluster them (HDBSCAN, or Agglomerative as a fallback).
      4. Optionally propose splits for LOOSE clusters.
      5. Apply the splits a researcher has accepted.
      6. Keep a stratified sample of every refined cluster, plus a seeded
         random sample of outliers.

    Args:
        corpus:                list of dicts (from Phase 0 Preparation) with
                               at minimum a 'sentence' key. L1-L4 and
                               sentence_id preserved where present.
        sentences_per_cluster: DEPRECATED. Legacy parameter retained for
                               backward compatibility with older UI wiring.
        min_cluster_size:      minimum sentences to form a cluster; also
                               acts as sample-size floor.
        outlier_sample_size:   how many outlier (-1) sentences to keep.
        min_cluster_fit:       threshold below which sampled members are
                               marked reason='below_cluster_fit_threshold'.
        auto_split_loose:      if True, compute Agglomerative split
                               proposals for LOOSE clusters (researcher
                               reviews in UI).
        split_decisions:       optional dict mapping cluster_id_original
                               -> {"ACCEPTED","REJECTED","PENDING"} from
                               a previous researcher review. Keys may be
                               ints or integer strings.

    Returns:
        A dict with keys compression_rows, compressed_corpus, split_proposals,
        quality_summary, n_original, n_compressed, n_clusters, n_outliers and
        errors.

        Sub-clusters of an accepted split get refined id
        (cluster_id_original + 1) * stride + sub_label. stride is 1000, or the
        next power of ten above the largest original label. Refined ids
        therefore never coincide with an unsplit original cluster.

        Any exception raised during clustering or sampling is reported as an
        empty result whose 'errors' list holds the message.
    """
    dec = _normalise_split_decisions(split_decisions)

    if not corpus:
        return _empty_result(["No corpus loaded. Run Phase 0 Preparation first."])

    sentences: list[str] = []
    meta_rows: list[dict] = []
    for r in corpus:
        s = _sentence_text(r.get("sentence"))
        if not s:
            continue
        sentences.append(s)
        meta_rows.append({
            "L1": r.get("L1", ""),
            "L2": r.get("L2", ""),
            "L3": r.get("L3", ""),
            "L4": r.get("L4", ""),
            "sentence_id": r.get("sentence_id", ""),
            "sentence": s,
            "__src": r,
        })

    if not sentences:
        return _empty_result(
            ["Corpus contains no non-empty sentences. Nothing to compress."]
        )

    if len(sentences) < 10:
        return _small_corpus_result(meta_rows)

    errors: list[str] = []

    try:
        embeddings = _embed(sentences)
        reduced = _umap_reduce(embeddings, n_components=min(10, len(sentences) - 2))
        labels, probs = _hdbscan_cluster(reduced, int(min_cluster_size))

        cluster_map: dict[int, list[int]] = defaultdict(list)
        outlier_indices: list[int] = []
        for i, lbl in enumerate(labels):
            if lbl == -1:
                outlier_indices.append(i)
            else:
                cluster_map[int(lbl)].append(i)

        # Split proposals for clusters whose fit spread classifies as LOOSE.
        split_proposals: dict[int, dict] = {}
        for cid, idxs in cluster_map.items():
            fits = probs[idxs]
            if auto_split_loose and _classify_spread(float(np.std(fits))) == "LOOSE":
                split_proposals[cid] = _propose_agglomerative_split(
                    idxs, embeddings, fits, target_std=AGG_TARGET_STD
                )

        # Refined ids for accepted splits: (cid + 1) * stride + sub. The +1
        # stops cluster 0's sub-clusters from reusing ids 0..4, which belong
        # to other original clusters. stride exceeds every original label, so
        # the sub-cluster ranges never overlap an unsplit cluster or each other.
        max_label = max(cluster_map, default=-1)
        stride = 1000
        while stride <= max_label:
            stride *= 10

        refined_label = np.array(labels, dtype=int)
        split_decisions_out: dict[int, str] = {}
        for cid, idxs in cluster_map.items():
            decision = dec.get(cid)
            if decision is None:
                decision = "PENDING" if cid in split_proposals else "NONE"
            split_decisions_out[cid] = decision

            proposal = split_proposals.get(cid)
            if decision == "ACCEPTED" and proposal is not None and proposal["n_sub"] > 1:
                for member, sub_lbl in zip(idxs, proposal["sub_labels"]):
                    refined_label[member] = (cid + 1) * stride + int(sub_lbl)

        refined_map: dict[int, list[int]] = defaultdict(list)
        for i, rl in enumerate(refined_label):
            if rl != -1:
                refined_map[int(rl)].append(i)

        refined_stats: dict[int, dict] = {}
        for rcid, idxs in refined_map.items():
            fits = probs[idxs]
            std_fit = float(np.std(fits))
            refined_stats[rcid] = {
                "mean_fit": float(np.mean(fits)),
                "std_fit": std_fit,
                "tier": _classify_spread(std_fit),
                "size": len(idxs),
            }

        # Stratified sample of each refined cluster; sampled members whose
        # fit is under min_cluster_fit are reported but not selected.
        selected_indices: set[int] = set()
        below_threshold_indices: set[int] = set()
        fit_floor = float(min_cluster_fit)
        for idxs in refined_map.values():
            n_sample = _compute_n_sample(len(idxs), int(min_cluster_size))
            for pi in _stratified_sample_indices(idxs, probs[idxs], n_sample):
                if float(probs[pi]) < fit_floor:
                    below_threshold_indices.add(pi)
                else:
                    selected_indices.add(pi)

        if outlier_indices:
            n_keep = min(int(outlier_sample_size), len(outlier_indices))
            if n_keep > 0:
                # A private RandomState(42) draws the same sample as the former
                # np.random.seed(42) + np.random.choice, without reseeding
                # NumPy's global RNG for the rest of the process.
                rng = np.random.RandomState(42)
                kept = rng.choice(outlier_indices, n_keep, replace=False)
                selected_indices.update(int(x) for x in kept)

        compression_rows: list[dict] = []
        for i, m in enumerate(meta_rows):
            orig = int(labels[i])
            ref = int(refined_label[i])
            fit = float(probs[i])

            st = refined_stats.get(ref) if ref != -1 else None
            if st is not None:
                mean_fit, std_fit, tier, size = (
                    st["mean_fit"], st["std_fit"], st["tier"], st["size"]
                )
            else:
                mean_fit, std_fit, tier, size = 0.0, 0.0, "OUTLIER", 0

            selected = i in selected_indices
            below = i in below_threshold_indices

            if orig == -1 and selected:
                reason = "outlier sample"
            elif below:
                reason = "below_cluster_fit_threshold"
            elif selected:
                reason = "representative (stratified sample)"
            elif orig == -1:
                reason = "outlier — not sampled"
            else:
                reason = "cluster member — not sampled"

            compression_rows.append({
                "idx": i,
                "L1": m["L1"], "L2": m["L2"], "L3": m["L3"], "L4": m["L4"],
                "sentence_id": m["sentence_id"],
                "sentence": m["sentence"],
                "cluster_id_original": orig,
                "cluster_id_refined": ref,
                "cluster_id": ref,
                "cluster_fit": round(fit, 4),
                "cluster_mean_fit": round(mean_fit, 4),
                "cluster_std_fit": round(std_fit, 4),
                "cluster_quality_tier": tier,
                "split_decision": split_decisions_out.get(orig, "NONE"),
                "cluster_size": size,
                "selected": bool(selected),
                "reason": reason,
            })

        compressed_corpus = [
            meta_rows[r["idx"]]["__src"]
            for r in compression_rows
            if r["selected"]
        ]

        tier_counts: dict[str, int] = defaultdict(int)
        for s in refined_stats.values():
            tier_counts[s["tier"]] += 1

        decision_values = list(split_decisions_out.values())
        quality_summary = {
            "TIGHT": int(tier_counts["TIGHT"]),
            "MEDIUM": int(tier_counts["MEDIUM"]),
            "LOOSE": int(tier_counts["LOOSE"]),
            "n_clusters_original": len(cluster_map),
            "n_clusters_refined": len(refined_map),
            "n_flagged_for_split": len(split_proposals),
            "n_splits_accepted": decision_values.count("ACCEPTED"),
            "n_splits_rejected": decision_values.count("REJECTED"),
            "n_splits_pending": decision_values.count("PENDING"),
        }

        n_clusters = len(refined_map)
        n_outliers = len(outlier_indices)

    except Exception as e:
        # Top-level boundary: report the failure to the caller instead of raising.
        errors.append(f"Compression error: {type(e).__name__}: {e}")
        return _empty_result(errors)

    return {
        "compression_rows": compression_rows,
        "compressed_corpus": compressed_corpus,
        "split_proposals": {int(k): v for k, v in split_proposals.items()},
        "quality_summary": quality_summary,
        "n_original": len(sentences),
        "n_compressed": len(selected_indices),
        "n_clusters": n_clusters,
        "n_outliers": n_outliers,
        "errors": errors,
    }
|
|
|
|
|
def _empty_result(errors: list[str]) -> dict:
    """Build the all-zero result payload, carrying *errors* as-is.

    Every call builds fresh containers. The *errors* list object is stored
    without copying.
    """
    summary = dict.fromkeys(("TIGHT", "MEDIUM", "LOOSE"), 0)
    summary.update(
        dict.fromkeys(
            (
                "n_clusters_original",
                "n_clusters_refined",
                "n_flagged_for_split",
                "n_splits_accepted",
                "n_splits_rejected",
                "n_splits_pending",
            ),
            0,
        )
    )

    result: dict = {
        "compression_rows": [],
        "compressed_corpus": [],
        "split_proposals": {},
        "quality_summary": summary,
    }
    for counter in ("n_original", "n_compressed", "n_clusters", "n_outliers"):
        result[counter] = 0
    result["errors"] = errors
    return result
|
|
|