# ============================================================================
# corpus_compression.py — Phase 0 Sampling (G&W at-Scale Workbench)
# ============================================================================
#
# PURPOSE
# -------
# Phase 0 Sampling enables Computational Thematic Analysis at Scale
# (Gauthier & Wallace 2022). It sits between Phase 0 Preparation and the
# Cluster Labeling stage, and produces a sampled, representative subset of
# the corpus for downstream B&C thematic analysis.
#
# METHODOLOGY (FT50 submission design)
# -------------------------------------
# Two clustering passes (HDBSCAN, then Agglomerative refinement) with
# researcher-in-the-loop review, organised as four stages:
#
# Stage 1 — Initial clustering (HDBSCAN)
#   Campello, Moulavi, Zimek, Sander (2015) ACM TKDD 10(1):1-51.
#   Density-based, no pre-specified K, handles outliers natively.
#   Produces an initial cluster_id + cluster_fit per sentence.
#
# Stage 2 — Spread diagnostic
#   For each cluster, compute std(cluster_fit) and classify it:
#     TIGHT   (std < 0.15)          -> accept as-is
#     MEDIUM  (0.15 <= std < 0.20)  -> accept as-is
#     LOOSE   (std >= 0.20)         -> flag for Agglomerative split review
#   Rationale: loose clusters indicate mixed-density regions where
#   HDBSCAN merged related-but-distinct semantic patterns.
#
# Stage 3 — Agglomerative refinement (proposed, researcher-approved)
#   Ward (1963) JASA 58(301):236-244. On LOOSE clusters only, run
#   AgglomerativeClustering with cosine distance. Because Ward linkage is
#   defined only for Euclidean distance, the implementation uses average
#   linkage. The aim is sub-clusters with std <= 0.15: K = 2..5 is tried,
#   and the smallest K that meets the target is proposed; otherwise the K
#   with the largest improvement is proposed. The researcher reviews the
#   proposed split: ACCEPT / REJECT / KEEP AS-IS.
#
# Stage 4 — Stratified sampling
#   Sample n = max(min_cluster_size, ceil(0.10 * N)) sentences per cluster,
#   where N is the cluster size.
#   No ceiling — methodology is not capped by LLM context windows.
#   Stratification: top 50% / middle 30% / edge 20% of the n-sentence
#   quota, drawn from the top, middle and bottom thirds of the cluster
#   ranked by cluster_fit.
#   Contrasts with BERTopic's fixed top-4 (Grootendorst 2022) and
#   TnT-LLM's fixed 200 (Wan et al. 2024 KDD), which ignore cluster
#   size and heterogeneity.
#
# OUTPUT (frozen artifact, one-way pipeline)
# ------------------------------------------
# Each row of the compression table carries:
#   idx, L1, L2, L3, L4, sentence_id, sentence,
#   cluster_id_original   (HDBSCAN output)
#   cluster_id_refined    (after Agglomerative split if approved; else same)
#   cluster_id            (alias of cluster_id_refined for older readers)
#   cluster_fit           (HDBSCAN membership probability, 0-1; a
#                          centroid-distance pseudo-probability when the
#                          hdbscan package is unavailable)
#   cluster_mean_fit      (mean of cluster_fit for the refined cluster)
#   cluster_std_fit       (std of cluster_fit for the refined cluster)
#   cluster_quality_tier  (TIGHT / MEDIUM / LOOSE / OUTLIER)
#   split_decision        (NONE / ACCEPTED / REJECTED / PENDING)
#   cluster_size, selected, reason
#
# Downstream stages read this artifact. Phase 0 never mutates after commit.
# ============================================================================

from __future__ import annotations

import math
from collections import defaultdict
from typing import Any

import numpy as np
import pandas as pd

try:
    from sentence_transformers import SentenceTransformer
except ImportError:  # pragma: no cover - depends on the installed environment
    # Optional heavy dependency: keep the module importable (as with the
    # optional umap / hdbscan imports below) and report the problem only
    # when embeddings are actually requested.
    SentenceTransformer = None

# ----------------------------------------------------------------
# Constants — FT50 design (see module header for justification)
# ----------------------------------------------------------------
SPREAD_TIGHT_MAX = 0.15
SPREAD_MEDIUM_MAX = 0.20
SAMPLE_PERCENTAGE = 0.10
STRATIFY_TOP = 0.50
STRATIFY_MIDDLE = 0.30
# Intended edge share; the sampler gives the edge pool whatever quota
# remains after the top and middle shares.
STRATIFY_EDGE = 0.20
AGG_TARGET_STD = 0.15

# Loaded SentenceTransformer instances, keyed by model name.
_ST_CACHE: dict[str, Any] = {}


def _get_st_model(model_name: str = "sentence-transformers/all-MiniLM-L6-v2"):
    """Return the SentenceTransformer for *model_name*, loading it once.

    Raises:
        ImportError: if ``sentence_transformers`` is not installed.
    """
    if model_name not in _ST_CACHE:
        if SentenceTransformer is None:
            raise ImportError(
                "sentence_transformers is required to embed the corpus "
                "for Phase 0 Sampling"
            )
        _ST_CACHE[model_name] = SentenceTransformer(model_name)
    return _ST_CACHE[model_name]


def _embed(texts: list[str]) -> np.ndarray:
    """Encode *texts* with the default sentence model (normalised embeddings)."""
    model = _get_st_model()
    return model.encode(texts, normalize_embeddings=True, show_progress_bar=False)


def _umap_reduce(embeddings: np.ndarray, n_components: int = 10) -> np.ndarray:
    """Reduce dimensionality for HDBSCAN stability.

    Uses UMAP (cosine metric, ``random_state=42``) when ``umap`` is
    importable. Otherwise falls back to scikit-learn PCA, with the number of
    components capped at ``min(n_components, n_samples - 1, n_features)``.
    """
    try:
        import umap

        reducer = umap.UMAP(
            n_components=n_components,
            n_neighbors=min(15, len(embeddings) - 1),
            min_dist=0.0,
            metric="cosine",
            random_state=42,
        )
        return reducer.fit_transform(embeddings)
    except ImportError:
        from sklearn.decomposition import PCA

        n_comp = min(n_components, len(embeddings) - 1, embeddings.shape[1])
        return PCA(n_components=n_comp, random_state=42).fit_transform(embeddings)


def _hdbscan_cluster(
    reduced: np.ndarray, min_cluster_size: int
) -> tuple[np.ndarray, np.ndarray]:
    """
    Cluster with HDBSCAN. Returns (labels, probabilities).

    labels: -1 = outlier
    probabilities: cluster membership strength (0.0 for outliers)

    If the ``hdbscan`` package is missing, falls back to Ward
    AgglomerativeClustering with about ``n / max(min_cluster_size, 3)``
    clusters. That fallback produces no ``-1`` outliers, and its
    probabilities come from ``_fallback_probs_from_centroid``.
    """
    try:
        import hdbscan

        clusterer = hdbscan.HDBSCAN(
            min_cluster_size=min_cluster_size,
            min_samples=1,
            metric="euclidean",
            prediction_data=False,
        )
        labels = clusterer.fit_predict(reduced)
        probs = clusterer.probabilities_
        return labels, probs
    except ImportError:
        # HDBSCAN not available — fallback to AgglomerativeClustering
        from sklearn.cluster import AgglomerativeClustering

        n_clusters = max(2, len(reduced) // max(min_cluster_size, 3))
        n_clusters = min(n_clusters, len(reduced) - 1)
        labels = AgglomerativeClustering(
            n_clusters=n_clusters,
            metric="euclidean",
            linkage="ward",
        ).fit_predict(reduced)
        probs = _fallback_probs_from_centroid(reduced, labels)
        return labels, probs


def _fallback_probs_from_centroid(
    reduced: np.ndarray, labels: np.ndarray
) -> np.ndarray:
    """Derive pseudo-probabilities when HDBSCAN is unavailable.

    Each member of a cluster scores ``1 - d / d_max``, where ``d`` is its
    Euclidean distance to the cluster centroid and ``d_max`` is the largest
    such distance in that cluster. Scores are therefore normalised to [0, 1].
    Rows labelled ``-1`` keep 0.0.

    NOTE: the member farthest from its centroid always scores 0.0 (in a
    two-member cluster both members are equidistant and score ~0.0). A
    cluster whose members all sit on the centroid scores 1.0 throughout.
    """
    probs = np.zeros(len(reduced), dtype=float)
    for lbl in set(labels.tolist()):
        if lbl == -1:
            continue
        idx = np.where(labels == lbl)[0]
        if len(idx) == 0:
            continue
        centroid = reduced[idx].mean(axis=0)
        d = np.linalg.norm(reduced[idx] - centroid, axis=1)
        d_max = d.max()
        if not d_max > 0:
            # Every member coincides with the centroid: avoid dividing by 0.
            d_max = 1.0
        probs[idx] = 1.0 - (d / d_max)
    return probs


def _classify_spread(std_val: float) -> str:
    """Classify cluster into TIGHT / MEDIUM / LOOSE based on std(cluster_fit)."""
    if std_val < SPREAD_TIGHT_MAX:
        return "TIGHT"
    if std_val < SPREAD_MEDIUM_MAX:
        return "MEDIUM"
    return "LOOSE"


def _propose_agglomerative_split(
    cluster_indices: list[int],
    embeddings: np.ndarray,
    cluster_fits: np.ndarray,
    target_std: float = AGG_TARGET_STD,
) -> dict:
    """
    Propose a split of a LOOSE cluster with AgglomerativeClustering.

    Runs cosine-distance, average-linkage clustering on the members'
    embeddings for K = 2 .. min(5, N - 1), where N is the cluster size.
    Candidates that would leave any sub-cluster with fewer than 2 members
    are skipped.

    Selection rule:
      - Return the first (smallest) K whose sub-cluster stds of cluster_fit
        are all <= ``target_std``.
      - Otherwise return the K with the largest improvement, defined as the
        original std minus the largest sub-cluster std.
      - If no K improves the spread, or N < 4, return a no-split proposal
        (``n_sub == 1``).

    Args:
        cluster_indices: row indices of the cluster's members in ``embeddings``.
        embeddings: embedding matrix for the whole corpus.
        cluster_fits: cluster_fit values aligned with ``cluster_indices``.
        target_std: per-sub-cluster std threshold for an acceptable split.

    Returns:
        dict with ``n_sub``, ``sub_labels`` (aligned with ``cluster_indices``),
        ``sub_stds``, ``improvement`` and ``target_reached``.
    """
    N = len(cluster_indices)
    original_std = float(np.std(cluster_fits))
    best = {
        "n_sub": 1,
        "sub_labels": [0] * N,
        "sub_stds": [original_std],
        "improvement": 0.0,
        "target_reached": False,
    }
    if N < 4:
        return best

    # Imported only when needed, so tiny clusters never require scikit-learn.
    from sklearn.cluster import AgglomerativeClustering

    cluster_embs = embeddings[cluster_indices]
    for k in range(2, min(6, N)):
        try:
            sub = AgglomerativeClustering(
                n_clusters=k,
                metric="cosine",
                linkage="average",
            ).fit_predict(cluster_embs)
        except Exception:
            # Best effort: a K that scikit-learn rejects is simply skipped.
            continue
        sub_stds: list[float] = []
        ok = True
        for s in range(k):
            mask = sub == s
            if mask.sum() < 2:
                ok = False
                break
            sub_stds.append(float(np.std(cluster_fits[mask])))
        if not ok or not sub_stds:
            continue
        max_sub_std = max(sub_stds)
        improvement = original_std - max_sub_std
        candidate = {
            "n_sub": k,
            "sub_labels": sub.tolist(),
            "sub_stds": sub_stds,
            "improvement": improvement,
            "target_reached": max_sub_std <= target_std,
        }
        if candidate["target_reached"]:
            return candidate
        if improvement > best["improvement"]:
            best = candidate
    return best
def _stratified_sample_indices(
    indices: list[int],
    cluster_fits: np.ndarray,
    n_sample: int,
) -> list[int]:
    """
    Stratified sampling by cluster_fit rank.

    How members are chosen:
      - Members are ranked by descending ``cluster_fit`` and split into
        thirds: the top, middle and edge pools.
      - The ``n_sample`` quota is divided 50% / 30% / 20% across those pools;
        the edge pool gets the remainder.
      - The highest-ranked members of each pool are taken first.
      - Any shortfall (a pool smaller than its quota) is back-filled from the
        overall ranking.
      - If ``n_sample >= len(indices)``, every member is returned, best fit
        first.

    Args:
        indices: corpus row indices of the cluster's members.
        cluster_fits: cluster_fit values aligned with ``indices``.
        n_sample: number of members to select.

    Returns:
        At most ``n_sample`` corpus row indices.
    """
    order = np.argsort(-cluster_fits)
    sorted_idx = [indices[i] for i in order]
    if n_sample >= len(indices):
        return sorted_idx

    N = len(sorted_idx)
    n_top = max(1, round(n_sample * STRATIFY_TOP))
    n_mid = max(0, round(n_sample * STRATIFY_MIDDLE))
    n_edge = n_sample - n_top - n_mid
    if n_edge < 0:
        n_edge = 0
        n_mid = max(0, n_sample - n_top)

    top_boundary = max(1, N // 3)
    edge_boundary = max(top_boundary + 1, (2 * N) // 3)
    top_pool = sorted_idx[:top_boundary]
    mid_pool = sorted_idx[top_boundary:edge_boundary]
    edge_pool = sorted_idx[edge_boundary:]

    picked: list[int] = [*top_pool[:n_top], *mid_pool[:n_mid], *edge_pool[:n_edge]]

    # Back-fill from the overall ranking when a pool was too small for its quota.
    seen = set(picked)
    for i in sorted_idx:
        if len(picked) >= n_sample:
            break
        if i not in seen:
            picked.append(i)
            seen.add(i)
    return picked[:n_sample]


def _compute_n_sample(N: int, min_cluster_size: int) -> int:
    """n_sample = max(min_cluster_size, ceil(0.10 * N)), no ceiling."""
    return max(min_cluster_size, math.ceil(SAMPLE_PERCENTAGE * N))


def _normalise_split_decisions(raw: dict | None) -> dict[int, str]:
    """Return researcher split decisions keyed by integer cluster id.

    Decisions that round-trip through JSON (e.g. a UI session) arrive with
    string keys such as ``"3"``. Without coercion they never match the
    integer HDBSCAN labels and are silently ignored. Keys that are not
    integer-like are dropped.
    """
    out: dict[int, str] = {}
    for key, value in (raw or {}).items():
        try:
            out[int(key)] = value
        except (TypeError, ValueError):
            continue
    return out


def _split_id_base(labels: np.ndarray) -> int:
    """Return the multiplier used to build ids for accepted sub-clusters.

    Sub-cluster ids are ``(original + 1) * base + sub_id``. ``base`` starts at
    1000 and grows by powers of ten until it exceeds every original label, so
    a sub-cluster id can never equal an unsplit HDBSCAN label. The previous
    ``original * 1000 + sub_id`` scheme mapped cluster 0's sub-clusters onto
    ids 0..4.
    """
    max_label = int(np.max(labels)) if len(labels) else -1
    base = 1000
    while base <= max_label:
        base *= 10
    return base


def _small_corpus_result(meta_rows: list[dict]) -> dict:
    """Build the result for corpora under 10 sentences: one cluster, all kept."""
    n = len(meta_rows)
    rows = [
        {
            "idx": i,
            "L1": m["L1"],
            "L2": m["L2"],
            "L3": m["L3"],
            "L4": m["L4"],
            "sentence_id": m["sentence_id"],
            "sentence": m["sentence"],
            "cluster_id_original": 0,
            "cluster_id_refined": 0,
            "cluster_id": 0,
            "cluster_fit": 1.0,
            "cluster_mean_fit": 1.0,
            "cluster_std_fit": 0.0,
            "cluster_quality_tier": "TIGHT",
            "split_decision": "NONE",
            "cluster_size": n,
            "selected": True,
            "reason": "corpus too small — all selected",
        }
        for i, m in enumerate(meta_rows)
    ]
    return {
        "compression_rows": rows,
        # Only the source rows that carry a sentence, so compressed_corpus
        # agrees with n_compressed and with the main path.
        "compressed_corpus": [m["__src"] for m in meta_rows],
        "split_proposals": {},
        "quality_summary": {
            "TIGHT": 1,
            "MEDIUM": 0,
            "LOOSE": 0,
            "n_clusters_original": 1,
            "n_clusters_refined": 1,
            "n_flagged_for_split": 0,
            "n_splits_accepted": 0,
            "n_splits_rejected": 0,
            "n_splits_pending": 0,
        },
        "n_original": n,
        "n_compressed": n,
        "n_clusters": 1,
        "n_outliers": 0,
        "errors": [
            "Corpus too small for compression (<10 sentences). All sentences kept."
        ],
    }


# ----------------------------------------------------------------
# Main entry point
# ----------------------------------------------------------------


def run_corpus_compression(
    corpus: list[dict],
    sentences_per_cluster: int = 2,
    min_cluster_size: int = 3,
    outlier_sample_size: int = 10,
    min_cluster_fit: float = 0.0,
    auto_split_loose: bool = True,
    split_decisions: dict[int, str] | None = None,
) -> dict:
    """
    Run Phase 0 — Sampling (G&W at-Scale).

    Args:
        corpus: list of dicts (from Phase 0 Preparation) with at minimum a
            'sentence' key. L1-L4 and sentence_id preserved where present.
            Rows whose sentence is empty or whitespace are skipped.
        sentences_per_cluster: DEPRECATED and ignored. Legacy parameter
            retained for backward compatibility with older UI wiring.
        min_cluster_size: minimum sentences to form a cluster; also acts as
            sample-size floor.
        outlier_sample_size: how many outlier (-1) sentences to keep, drawn
            with a fixed seed (42) so the choice is reproducible.
        min_cluster_fit: sampled cluster members whose cluster_fit is below
            this threshold are not selected and get
            reason='below_cluster_fit_threshold'.
        auto_split_loose: if True, compute Agglomerative split proposals for
            LOOSE clusters (researcher reviews in UI).
        split_decisions: optional dict mapping cluster_id_original ->
            {"ACCEPTED","REJECTED","PENDING"} from a previous researcher
            review. Integer-like string keys (e.g. from JSON) are accepted.

    Returns:
        dict with the following keys:
          - compression_rows: one row per non-empty sentence.
          - compressed_corpus: the selected source dicts.
          - split_proposals, quality_summary.
          - n_original, n_compressed, n_clusters, n_outliers.
          - errors.
        An exception inside the clustering pipeline is reported in ``errors``
        alongside an otherwise empty result.
    """
    dec = _normalise_split_decisions(split_decisions)

    if not corpus:
        return _empty_result(["No corpus loaded. Run Phase 0 Preparation first."])

    sentences: list[str] = []
    meta_rows: list[dict] = []
    for r in corpus:
        s = (r.get("sentence") or "").strip()
        if not s:
            continue
        sentences.append(s)
        meta_rows.append({
            "L1": r.get("L1", ""),
            "L2": r.get("L2", ""),
            "L3": r.get("L3", ""),
            "L4": r.get("L4", ""),
            "sentence_id": r.get("sentence_id", ""),
            "sentence": s,
            "__src": r,
        })

    if not meta_rows:
        # Non-empty corpus, but nothing to cluster: don't report a phantom cluster.
        return _empty_result(["No non-empty sentences in corpus."])
    if len(sentences) < 10:
        return _small_corpus_result(meta_rows)

    errors: list[str] = []
    try:
        embeddings = _embed(sentences)
        reduced = _umap_reduce(embeddings, n_components=min(10, len(sentences) - 2))
        labels, probs = _hdbscan_cluster(reduced, int(min_cluster_size))

        # Group row indices by HDBSCAN label; -1 marks outliers.
        cluster_map: dict[int, list[int]] = defaultdict(list)
        outlier_indices: list[int] = []
        for i, lbl in enumerate(labels):
            if lbl == -1:
                outlier_indices.append(i)
            else:
                cluster_map[int(lbl)].append(i)

        # Stage 2 spread diagnostic; Stage 3 split proposals for LOOSE clusters.
        split_proposals: dict[int, dict] = {}
        for cid, idxs in cluster_map.items():
            fits = probs[idxs]
            tier = _classify_spread(float(np.std(fits)))
            if tier == "LOOSE" and auto_split_loose:
                split_proposals[cid] = _propose_agglomerative_split(
                    idxs, embeddings, fits, target_std=AGG_TARGET_STD
                )

        # Apply researcher split decisions. Accepted sub-clusters get ids
        # (original + 1) * split_base + sub_id; split_base exceeds every
        # original label, so these never collide with unsplit clusters.
        split_base = _split_id_base(labels)
        refined_label = np.array(labels, dtype=int)
        split_decisions_out: dict[int, str] = {}
        for cid, idxs in cluster_map.items():
            decision = dec.get(cid)
            if decision is None:
                decision = "PENDING" if cid in split_proposals else "NONE"
            split_decisions_out[cid] = decision
            proposal = split_proposals.get(cid)
            if (
                decision == "ACCEPTED"
                and proposal is not None
                and proposal["n_sub"] > 1
            ):
                offset = (cid + 1) * split_base
                for member, sub_lbl in zip(idxs, proposal["sub_labels"]):
                    refined_label[member] = offset + int(sub_lbl)

        # Refined cluster membership and statistics (outliers excluded).
        refined_map: dict[int, list[int]] = defaultdict(list)
        for i, rl in enumerate(refined_label):
            if rl != -1:
                refined_map[int(rl)].append(i)

        refined_stats: dict[int, dict] = {}
        for rcid, idxs in refined_map.items():
            fits = probs[idxs]
            std_fit = float(np.std(fits))
            refined_stats[rcid] = {
                "mean_fit": float(np.mean(fits)),
                "std_fit": std_fit,
                "tier": _classify_spread(std_fit),
                "size": len(idxs),
            }

        # Stage 4: stratified sampling per refined cluster. Sampled members
        # below min_cluster_fit are recorded but not selected.
        fit_floor = float(min_cluster_fit)
        selected_indices: set[int] = set()
        below_threshold_indices: set[int] = set()
        for idxs in refined_map.values():
            n_sample = _compute_n_sample(len(idxs), int(min_cluster_size))
            for pi in _stratified_sample_indices(idxs, probs[idxs], n_sample):
                if float(probs[pi]) < fit_floor:
                    below_threshold_indices.add(pi)
                else:
                    selected_indices.add(pi)

        # Outlier sampling. A private RandomState(42) reproduces the draw of
        # np.random.seed(42) + np.random.choice without resetting the caller's
        # global NumPy RNG.
        if outlier_indices:
            n_keep = min(int(outlier_sample_size), len(outlier_indices))
            if n_keep > 0:
                rng = np.random.RandomState(42)
                kept = rng.choice(outlier_indices, n_keep, replace=False)
                selected_indices.update(int(x) for x in kept)

        # Build rows
        compression_rows: list[dict] = []
        for i, m in enumerate(meta_rows):
            orig = int(labels[i])
            ref = int(refined_label[i])
            fit = float(probs[i])
            if ref != -1 and ref in refined_stats:
                st = refined_stats[ref]
                mean_fit, std_fit, tier, size = (
                    st["mean_fit"], st["std_fit"], st["tier"], st["size"]
                )
            else:
                mean_fit, std_fit, tier, size = 0.0, 0.0, "OUTLIER", 0

            selected = i in selected_indices
            below = i in below_threshold_indices
            if orig == -1 and selected:
                reason = "outlier sample"
            elif below:
                reason = "below_cluster_fit_threshold"
            elif selected:
                reason = "representative (stratified sample)"
            elif orig == -1:
                reason = "outlier — not sampled"
            else:
                reason = "cluster member — not sampled"

            compression_rows.append({
                "idx": i,
                "L1": m["L1"],
                "L2": m["L2"],
                "L3": m["L3"],
                "L4": m["L4"],
                "sentence_id": m["sentence_id"],
                "sentence": m["sentence"],
                "cluster_id_original": orig,
                "cluster_id_refined": ref,
                # Backward-compat alias: downstream (cluster_labeling, Phase 1+)
                # reads `cluster_id` and should see the refined cluster id.
                "cluster_id": ref,
                "cluster_fit": round(fit, 4),
                "cluster_mean_fit": round(mean_fit, 4),
                "cluster_std_fit": round(std_fit, 4),
                "cluster_quality_tier": tier,
                "split_decision": split_decisions_out.get(orig, "NONE"),
                "cluster_size": size,
                "selected": bool(selected),
                "reason": reason,
            })

        compressed_corpus = [
            meta_rows[r["idx"]]["__src"] for r in compression_rows if r["selected"]
        ]

        tier_counts = defaultdict(int)
        for s in refined_stats.values():
            tier_counts[s["tier"]] += 1
        decisions = list(split_decisions_out.values())
        quality_summary = {
            "TIGHT": int(tier_counts["TIGHT"]),
            "MEDIUM": int(tier_counts["MEDIUM"]),
            "LOOSE": int(tier_counts["LOOSE"]),
            "n_clusters_original": len(cluster_map),
            "n_clusters_refined": len(refined_map),
            "n_flagged_for_split": len(split_proposals),
            "n_splits_accepted": decisions.count("ACCEPTED"),
            "n_splits_rejected": decisions.count("REJECTED"),
            "n_splits_pending": decisions.count("PENDING"),
        }
        n_clusters = len(refined_map)
        n_outliers = len(outlier_indices)

    except Exception as e:
        # Top-level boundary: surface any pipeline failure to the UI.
        errors.append(f"Compression error: {type(e).__name__}: {e}")
        return _empty_result(errors)

    return {
        "compression_rows": compression_rows,
        "compressed_corpus": compressed_corpus,
        "split_proposals": {int(k): v for k, v in split_proposals.items()},
        "quality_summary": quality_summary,
        "n_original": len(sentences),
        "n_compressed": len(selected_indices),
        "n_clusters": n_clusters,
        "n_outliers": n_outliers,
        "errors": errors,
    }


def _empty_result(errors: list[str]) -> dict:
    """Return a result with no rows and zeroed counters, carrying *errors*."""
    return {
        "compression_rows": [],
        "compressed_corpus": [],
        "split_proposals": {},
        "quality_summary": {
            "TIGHT": 0,
            "MEDIUM": 0,
            "LOOSE": 0,
            "n_clusters_original": 0,
            "n_clusters_refined": 0,
            "n_flagged_for_split": 0,
            "n_splits_accepted": 0,
            "n_splits_rejected": 0,
            "n_splits_pending": 0,
        },
        "n_original": 0,
        "n_compressed": 0,
        "n_clusters": 0,
        "n_outliers": 0,
        "errors": errors,
    }