# Spjimr / corpus_compression.py
# shahidshaikh's picture
# Upload 40 files
# a52bae4 verified
# ============================================================================
# corpus_compression.py — Phase 0 Sampling (G&W at-Scale Workbench)
# ============================================================================
#
# PURPOSE
# -------
# Phase 0 Sampling enables Computational Thematic Analysis at Scale
# (Gauthier & Wallace 2022). Inserts between Phase 0 Preparation and the
# Cluster Labeling stage. Produces a sampled, representative subset of
# the corpus for downstream B&C thematic analysis.
#
# METHODOLOGY (FT50 submission design)
# -------------------------------------
# Two-stage clustering with researcher-in-the-loop refinement:
#
# Stage 1 — Initial clustering (HDBSCAN)
# Campello, Moulavi, Zimek, Sander (2015) ACM TKDD 10(1):1-51.
# Density-based, no pre-specified K, handles outliers natively.
# Produces initial cluster_id + cluster_fit per sentence.
#
# Stage 2 — Spread diagnostic
# For each cluster, compute std(cluster_fit). Classify into:
# TIGHT (std < 0.15) -> accept as-is
# MEDIUM (0.15 <= std < 0.20) -> accept as-is
# LOOSE (std >= 0.20) -> flag for Agglomerative split review
# Rationale: loose clusters indicate mixed-density regions where
# HDBSCAN merged related-but-distinct semantic patterns.
#
# Stage 3 — Agglomerative refinement (proposed, researcher-approved)
# Ward (1963) JASA 58(301):236-244. On LOOSE clusters only, run
# AgglomerativeClustering with cosine distance to produce sub-clusters
# with std <= 0.15. Researcher reviews proposed split:
# ACCEPT / REJECT / KEEP AS-IS.
#
# Stage 4 — Stratified sampling
# Sample n = max(min_cluster_size, ceil(0.10 * N)) sentences per cluster.
# No ceiling — methodology is not capped by LLM context windows.
# Stratification: top 50% / middle 30% / edge 20% by cluster_fit.
# Contrasts with BERTopic's fixed top-4 (Grootendorst 2022) and
# TnT-LLM's fixed 200 (Wan et al. 2024 KDD) which ignore cluster
# size and heterogeneity.
#
# OUTPUT (frozen artifact, one-way pipeline)
# ------------------------------------------
# Each row of the compression table carries:
# idx, L1, L2, L3, L4, sentence_id, sentence,
# cluster_id_original (HDBSCAN output)
# cluster_id_refined (after Agglomerative split if approved; else same)
# cluster_id (backward-compatible alias of cluster_id_refined)
# cluster_fit (HDBSCAN membership probability, 0-1)
# cluster_mean_fit (mean of cluster_fit for the refined cluster)
# cluster_std_fit (std of cluster_fit for the refined cluster)
# cluster_quality_tier (TIGHT / MEDIUM / LOOSE / OUTLIER)
# split_decision (NONE / ACCEPTED / REJECTED / PENDING)
# cluster_size, selected, reason
#
# Downstream stages read this artifact. Phase 0 never mutates after commit.
# ============================================================================
from __future__ import annotations
import math
import numpy as np
import pandas as pd
from collections import defaultdict
from typing import Any
from sentence_transformers import SentenceTransformer
# ----------------------------------------------------------------
# Constants — FT50 design (see the header comment at the top of this
# module for the methodological justification)
# ----------------------------------------------------------------
# Spread-diagnostic thresholds on std(cluster_fit), read by _classify_spread:
# std < SPREAD_TIGHT_MAX -> "TIGHT", std < SPREAD_MEDIUM_MAX -> "MEDIUM",
# anything else -> "LOOSE".
SPREAD_TIGHT_MAX = 0.15
SPREAD_MEDIUM_MAX = 0.20
# Fraction of a cluster's members that _compute_n_sample asks for, before the
# min_cluster_size floor is applied.
SAMPLE_PERCENTAGE = 0.10
# Shares of the per-cluster sample quota given to the best-fitting and the
# middle members in _stratified_sample_indices.
STRATIFY_TOP = 0.50
STRATIFY_MIDDLE = 0.30
# NOTE(review): STRATIFY_EDGE is not read anywhere in the visible code; the
# edge quota is computed as whatever remains after the top and middle quotas.
STRATIFY_EDGE = 0.20
# Default target for the largest sub-cluster std(cluster_fit) when proposing
# an Agglomerative split of a LOOSE cluster.
AGG_TARGET_STD = 0.15
_ST_CACHE: dict = {}
def _get_st_model(model_name="sentence-transformers/all-MiniLM-L6-v2"):
if model_name not in _ST_CACHE:
_ST_CACHE[model_name] = SentenceTransformer(model_name)
return _ST_CACHE[model_name]
def _embed(texts: list[str]) -> np.ndarray:
    """Encode *texts* with the default cached sentence-transformer model.

    The encoder is asked to normalise the embeddings and to suppress its
    progress bar; whatever ``encode`` returns is passed back unchanged.
    """
    encoder = _get_st_model()
    vectors = encoder.encode(
        texts,
        normalize_embeddings=True,
        show_progress_bar=False,
    )
    return vectors
def _umap_reduce(embeddings: np.ndarray, n_components: int = 10) -> np.ndarray:
    """Project *embeddings* to a lower dimension before clustering.

    UMAP (cosine metric, ``min_dist=0.0``, fixed seed 42) is used when the
    ``umap`` package can be imported. If an ImportError is raised, the
    function falls back to scikit-learn PCA with the same seed, capping the
    number of components by the row count minus one and by the embedding
    width.
    """
    n_rows = len(embeddings)
    try:
        import umap

        umap_settings = {
            "n_components": n_components,
            # n_neighbors is capped at n_rows - 1 for small inputs.
            "n_neighbors": min(15, n_rows - 1),
            "min_dist": 0.0,
            "metric": "cosine",
            "random_state": 42,
        }
        return umap.UMAP(**umap_settings).fit_transform(embeddings)
    except ImportError:
        from sklearn.decomposition import PCA

        width = min(n_components, n_rows - 1, embeddings.shape[1])
        pca = PCA(n_components=width, random_state=42)
        return pca.fit_transform(embeddings)
def _hdbscan_cluster(
    reduced: np.ndarray, min_cluster_size: int
) -> tuple[np.ndarray, np.ndarray]:
    """Cluster the reduced embeddings and return ``(labels, probabilities)``.

    With the ``hdbscan`` package available, HDBSCAN is run (euclidean metric,
    ``min_samples=1``) and its labels and ``probabilities_`` are returned;
    HDBSCAN marks outliers with label -1.

    If importing/running raises ImportError, AgglomerativeClustering (Ward
    linkage) is used instead. The number of clusters is
    ``len(reduced) // max(min_cluster_size, 3)``, kept between 2 and
    ``len(reduced) - 1``, and pseudo-probabilities come from
    ``_fallback_probs_from_centroid``.
    """
    try:
        import hdbscan

        model = hdbscan.HDBSCAN(
            min_cluster_size=min_cluster_size,
            min_samples=1,
            metric="euclidean",
            prediction_data=False,
        )
        assignments = model.fit_predict(reduced)
        return assignments, model.probabilities_
    except ImportError:
        # HDBSCAN not available — fall back to AgglomerativeClustering.
        from sklearn.cluster import AgglomerativeClustering

        n_points = len(reduced)
        k = max(2, n_points // max(min_cluster_size, 3))
        k = min(k, n_points - 1)
        ward = AgglomerativeClustering(
            n_clusters=k,
            metric="euclidean",
            linkage="ward",
        )
        assignments = ward.fit_predict(reduced)
        return assignments, _fallback_probs_from_centroid(reduced, assignments)
def _fallback_probs_from_centroid(
reduced: np.ndarray, labels: np.ndarray
) -> np.ndarray:
"""When HDBSCAN unavailable, derive pseudo-probabilities from centroid
similarity within each cluster. Normalised to [0, 1]."""
probs = np.zeros(len(reduced), dtype=float)
for lbl in set(labels.tolist()):
if lbl == -1:
continue
idx = np.where(labels == lbl)[0]
if len(idx) == 0:
continue
centroid = reduced[idx].mean(axis=0)
d = np.linalg.norm(reduced[idx] - centroid, axis=1)
d_max = d.max() if d.max() > 0 else 1.0
sim = 1.0 - (d / d_max)
probs[idx] = sim
return probs
def _classify_spread(std_val: float) -> str:
    """Map a cluster's std(cluster_fit) to "TIGHT", "MEDIUM" or "LOOSE".

    The value is compared against ``SPREAD_TIGHT_MAX`` and then
    ``SPREAD_MEDIUM_MAX`` (both exclusive upper bounds); anything that is
    below neither bound is "LOOSE".
    """
    tiers = (
        (SPREAD_TIGHT_MAX, "TIGHT"),
        (SPREAD_MEDIUM_MAX, "MEDIUM"),
    )
    for upper_bound, tier in tiers:
        if std_val < upper_bound:
            return tier
    return "LOOSE"
def _propose_agglomerative_split(
    cluster_indices: list[int],
    embeddings: np.ndarray,
    cluster_fits: np.ndarray,
    target_std: float = AGG_TARGET_STD,
) -> dict:
    """Propose a sub-clustering for one LOOSE cluster.

    AgglomerativeClustering (cosine metric, average linkage) is tried on the
    cluster's embeddings for K = 2 .. min(5, N - 1). A K is discarded when the
    fit raises or when any sub-cluster has fewer than two members. The first
    K whose largest sub-cluster std(cluster_fit) is <= *target_std* is
    returned immediately; otherwise the K with the largest positive reduction
    of that std wins. If nothing improves, or the cluster has fewer than four
    members, the "no split" proposal (``n_sub == 1``) is returned.

    Args:
        cluster_indices: row positions of the cluster members in *embeddings*.
        embeddings: embedding matrix for the whole corpus.
        cluster_fits: fit values of the members, aligned with
            *cluster_indices*.
        target_std: largest acceptable sub-cluster std.

    Returns:
        dict with keys ``n_sub``, ``sub_labels`` (aligned with
        *cluster_indices*), ``sub_stds``, ``improvement`` and
        ``target_reached``.
    """
    from sklearn.cluster import AgglomerativeClustering

    member_embs = embeddings[cluster_indices]
    n_members = len(cluster_indices)
    baseline_std = float(np.std(cluster_fits))

    # Baseline proposal: leave the cluster as it is.
    best = {
        "n_sub": 1,
        "sub_labels": [0] * n_members,
        "sub_stds": [baseline_std],
        "improvement": 0.0,
        "target_reached": False,
    }
    if n_members < 4:
        return best

    for k in range(2, min(6, n_members)):
        try:
            splitter = AgglomerativeClustering(
                n_clusters=k,
                metric="cosine",
                linkage="average",
            )
            assignment = splitter.fit_predict(member_embs)
        except Exception:
            # A failed fit for this K just removes it from consideration.
            continue

        masks = [assignment == s for s in range(k)]
        if any(mask.sum() < 2 for mask in masks):
            # Singleton (or empty) sub-clusters are not acceptable splits.
            continue

        spreads = [float(np.std(cluster_fits[mask])) for mask in masks]
        worst_spread = max(spreads)
        gain = baseline_std - worst_spread
        candidate = {
            "n_sub": k,
            "sub_labels": assignment.tolist(),
            "sub_stds": spreads,
            "improvement": gain,
            "target_reached": worst_spread <= target_std,
        }
        if candidate["target_reached"]:
            # Smallest K that meets the target wins outright.
            return candidate
        if gain > best["improvement"]:
            best = candidate
    return best
def _stratified_sample_indices(
indices: list[int],
cluster_fits: np.ndarray,
n_sample: int,
) -> list[int]:
"""
Stratified sampling by cluster_fit rank.
Top 50% / Middle 30% / Edge 20% of n_sample quota.
"""
if n_sample >= len(indices):
order = np.argsort(-cluster_fits)
return [indices[i] for i in order]
order = np.argsort(-cluster_fits)
sorted_idx = [indices[i] for i in order]
N = len(sorted_idx)
n_top = max(1, round(n_sample * STRATIFY_TOP))
n_mid = max(0, round(n_sample * STRATIFY_MIDDLE))
n_edge = n_sample - n_top - n_mid
if n_edge < 0:
n_edge = 0
n_mid = max(0, n_sample - n_top)
top_boundary = max(1, N // 3)
edge_boundary = max(top_boundary + 1, (2 * N) // 3)
top_pool = sorted_idx[:top_boundary]
mid_pool = sorted_idx[top_boundary:edge_boundary]
edge_pool = sorted_idx[edge_boundary:]
picked: list[int] = []
picked.extend(top_pool[:n_top])
picked.extend(mid_pool[:n_mid])
picked.extend(edge_pool[:n_edge])
seen = set(picked)
if len(picked) < n_sample:
for i in sorted_idx:
if i not in seen:
picked.append(i)
seen.add(i)
if len(picked) >= n_sample:
break
return picked[:n_sample]
def _compute_n_sample(N: int, min_cluster_size: int) -> int:
    """Return the sample quota for a cluster of *N* members.

    The quota is ``ceil(SAMPLE_PERCENTAGE * N)``, raised to
    *min_cluster_size* when the proportional share is not larger than that
    floor. There is no upper cap.
    """
    proportional = math.ceil(SAMPLE_PERCENTAGE * N)
    if proportional > min_cluster_size:
        return proportional
    return min_cluster_size
# ----------------------------------------------------------------
# Main entry point
# ----------------------------------------------------------------
def _small_corpus_result(meta_rows: list[dict]) -> dict:
    """Build the keep-everything result for a corpus too small to cluster.

    Every kept sentence is placed in a single TIGHT cluster 0 and marked as
    selected. ``compressed_corpus`` holds the source rows of exactly those
    sentences, so it agrees with ``compression_rows`` and ``n_compressed``.
    """
    n_kept = len(meta_rows)
    rows = [
        {
            "idx": i,
            "L1": m["L1"], "L2": m["L2"], "L3": m["L3"], "L4": m["L4"],
            "sentence_id": m["sentence_id"],
            "sentence": m["sentence"],
            "cluster_id_original": 0,
            "cluster_id_refined": 0,
            "cluster_id": 0,
            "cluster_fit": 1.0,
            "cluster_mean_fit": 1.0,
            "cluster_std_fit": 0.0,
            "cluster_quality_tier": "TIGHT",
            "split_decision": "NONE",
            "cluster_size": n_kept,
            "selected": True,
            "reason": "corpus too small — all selected",
        }
        for i, m in enumerate(meta_rows)
    ]
    return {
        "compression_rows": rows,
        # Source rows of the kept sentences only (blank-sentence rows were
        # dropped before this point).
        "compressed_corpus": [m["__src"] for m in meta_rows],
        "split_proposals": {},
        "quality_summary": {
            "TIGHT": 1, "MEDIUM": 0, "LOOSE": 0,
            "n_clusters_original": 1, "n_clusters_refined": 1,
            "n_flagged_for_split": 0,
            "n_splits_accepted": 0, "n_splits_rejected": 0, "n_splits_pending": 0,
        },
        "n_original": n_kept,
        "n_compressed": n_kept,
        "n_clusters": 1,
        "n_outliers": 0,
        "errors": ["Corpus too small for compression (<10 sentences). All sentences kept."],
    }


def run_corpus_compression(
    corpus: list[dict],
    sentences_per_cluster: int = 2,
    min_cluster_size: int = 3,
    outlier_sample_size: int = 10,
    min_cluster_fit: float = 0.0,
    auto_split_loose: bool = True,
    split_decisions: dict[int, str] | None = None,
) -> dict:
    """
    Run Phase 0 — Sampling (G&W at-Scale).

    Pipeline: embed -> reduce -> cluster -> spread diagnostic -> optional
    Agglomerative split proposals -> apply researcher decisions -> stratified
    sampling per refined cluster -> fixed-seed outlier sampling.

    Args:
        corpus:                list of dicts (from Phase 0 Preparation) with
                               at minimum a 'sentence' key. L1-L4 and
                               sentence_id preserved where present. Rows with
                               a blank sentence are ignored.
        sentences_per_cluster: DEPRECATED. Legacy parameter retained for
                               backward compatibility with older UI wiring;
                               it is not read.
        min_cluster_size:      minimum sentences to form a cluster; also
                               acts as sample-size floor.
        outlier_sample_size:   how many outlier (-1) sentences to keep.
        min_cluster_fit:       threshold below which sampled members are
                               marked reason='below_cluster_fit_threshold'
                               and left unselected.
        auto_split_loose:      if True, compute Agglomerative split
                               proposals for LOOSE clusters (researcher
                               reviews in UI).
        split_decisions:       optional dict mapping cluster_id_original
                               -> {"ACCEPTED","REJECTED","PENDING"} from
                               a previous researcher review.

    Returns:
        dict with keys ``compression_rows`` (one row per kept sentence),
        ``compressed_corpus`` (source rows of the selected sentences),
        ``split_proposals``, ``quality_summary``, ``n_original``,
        ``n_compressed``, ``n_clusters``, ``n_outliers`` and ``errors``.
        Any exception raised while clustering or sampling is caught and
        reported through ``errors`` on an otherwise empty result.

    Refined ids of an accepted split are
    ``(cluster_id_original + 1) * stride + sub_id``, where ``stride`` is 1000
    or the next power of ten above the largest original label. That keeps
    them disjoint from every original id, including cluster 0.
    """
    dec = dict(split_decisions or {})
    if not corpus:
        return _empty_result(["No corpus loaded. Run Phase 0 Preparation first."])

    # Keep only rows with a non-blank sentence; remember each source row so
    # the compressed corpus can be rebuilt from the original dicts.
    sentences: list[str] = []
    meta_rows: list[dict] = []
    for r in corpus:
        s = (r.get("sentence") or "").strip()
        if not s:
            continue
        sentences.append(s)
        meta_rows.append({
            "L1": r.get("L1", ""),
            "L2": r.get("L2", ""),
            "L3": r.get("L3", ""),
            "L4": r.get("L4", ""),
            "sentence_id": r.get("sentence_id", ""),
            "sentence": s,
            "__src": r,
        })

    if len(sentences) < 10:
        return _small_corpus_result(meta_rows)

    errors: list[str] = []
    try:
        embeddings = _embed(sentences)
        reduced = _umap_reduce(embeddings, n_components=min(10, len(sentences) - 2))
        labels, probs = _hdbscan_cluster(reduced, int(min_cluster_size))

        # Group sentence positions by original cluster; -1 marks outliers.
        cluster_map: dict[int, list[int]] = defaultdict(list)
        outlier_indices: list[int] = []
        for i, lbl in enumerate(labels):
            if lbl == -1:
                outlier_indices.append(i)
            else:
                cluster_map[int(lbl)].append(i)

        # Spread diagnostic + split proposals (LOOSE clusters only).
        split_proposals: dict[int, dict] = {}
        if auto_split_loose:
            for cid, idxs in cluster_map.items():
                fits = probs[idxs]
                if _classify_spread(float(np.std(fits))) == "LOOSE":
                    split_proposals[cid] = _propose_agglomerative_split(
                        idxs, embeddings, fits, target_std=AGG_TARGET_STD
                    )

        # Stride for refined ids: larger than every original label so that
        # (cid + 1) * stride + sub can never equal an original cluster id.
        max_label = max(cluster_map) if cluster_map else -1
        stride = 1000
        while stride <= max_label:
            stride *= 10

        # Apply researcher split decisions.
        refined_label = np.array(labels, dtype=int)
        split_decisions_out: dict[int, str] = {}
        for cid, idxs in cluster_map.items():
            decision = dec.get(cid)
            if decision is None:
                decision = "PENDING" if cid in split_proposals else "NONE"
            split_decisions_out[cid] = decision
            if decision != "ACCEPTED" or cid not in split_proposals:
                continue
            proposal = split_proposals[cid]
            if proposal["n_sub"] > 1:
                base = (cid + 1) * stride
                for member, sub_lbl in zip(idxs, proposal["sub_labels"]):
                    refined_label[member] = base + int(sub_lbl)

        # Refined cluster membership and stats.
        refined_map: dict[int, list[int]] = defaultdict(list)
        for i, rl in enumerate(refined_label):
            if rl != -1:
                refined_map[int(rl)].append(i)
        refined_stats: dict[int, dict] = {}
        for rcid, idxs in refined_map.items():
            fits = probs[idxs]
            std_fit = float(np.std(fits))
            refined_stats[rcid] = {
                "mean_fit": float(np.mean(fits)),
                "std_fit": std_fit,
                "tier": _classify_spread(std_fit),
                "size": len(idxs),
            }

        # Stratified sampling per refined cluster. Picks whose fit is below
        # the threshold are recorded but not selected.
        selected_indices: set[int] = set()
        below_threshold_indices: set[int] = set()
        fit_floor = float(min_cluster_fit)
        for rcid, idxs in refined_map.items():
            n_sample = _compute_n_sample(len(idxs), int(min_cluster_size))
            for pi in _stratified_sample_indices(idxs, probs[idxs], n_sample):
                if float(probs[pi]) < fit_floor:
                    below_threshold_indices.add(pi)
                else:
                    selected_indices.add(pi)

        # Outlier sampling with a local, fixed-seed generator: reproducible
        # without reseeding the process-wide NumPy random state.
        if outlier_indices:
            n_keep = min(int(outlier_sample_size), len(outlier_indices))
            if n_keep > 0:
                rng = np.random.RandomState(42)
                kept = rng.choice(outlier_indices, n_keep, replace=False)
                selected_indices.update(int(x) for x in kept)

        # Build rows.
        compression_rows: list[dict] = []
        for i, m in enumerate(meta_rows):
            orig = int(labels[i])
            ref = int(refined_label[i])
            fit = float(probs[i])
            st = refined_stats.get(ref) if ref != -1 else None
            if st is not None:
                mean_fit, std_fit, tier, size = (
                    st["mean_fit"], st["std_fit"], st["tier"], st["size"]
                )
            else:
                mean_fit, std_fit, tier, size = 0.0, 0.0, "OUTLIER", 0
            selected = i in selected_indices
            is_outlier = orig == -1
            if is_outlier and selected:
                reason = "outlier sample"
            elif i in below_threshold_indices:
                reason = "below_cluster_fit_threshold"
            elif selected:
                reason = "representative (stratified sample)"
            elif is_outlier:
                reason = "outlier — not sampled"
            else:
                reason = "cluster member — not sampled"
            compression_rows.append({
                "idx": i,
                "L1": m["L1"], "L2": m["L2"], "L3": m["L3"], "L4": m["L4"],
                "sentence_id": m["sentence_id"],
                "sentence": m["sentence"],
                "cluster_id_original": orig,
                "cluster_id_refined": ref,
                # Backward-compat alias: downstream (cluster_labeling, Phase 1+)
                # reads `cluster_id` and should see the refined cluster id.
                "cluster_id": ref,
                "cluster_fit": round(fit, 4),
                "cluster_mean_fit": round(mean_fit, 4),
                "cluster_std_fit": round(std_fit, 4),
                "cluster_quality_tier": tier,
                "split_decision": split_decisions_out.get(orig, "NONE"),
                "cluster_size": size,
                "selected": bool(selected),
                "reason": reason,
            })

        compressed_corpus = [
            meta_rows[row["idx"]]["__src"]
            for row in compression_rows
            if row["selected"]
        ]

        tier_counts: dict[str, int] = defaultdict(int)
        for st in refined_stats.values():
            tier_counts[st["tier"]] += 1
        decisions_made = list(split_decisions_out.values())
        quality_summary = {
            "TIGHT": int(tier_counts["TIGHT"]),
            "MEDIUM": int(tier_counts["MEDIUM"]),
            "LOOSE": int(tier_counts["LOOSE"]),
            "n_clusters_original": len(cluster_map),
            "n_clusters_refined": len(refined_map),
            "n_flagged_for_split": len(split_proposals),
            "n_splits_accepted": decisions_made.count("ACCEPTED"),
            "n_splits_rejected": decisions_made.count("REJECTED"),
            "n_splits_pending": decisions_made.count("PENDING"),
        }
        n_clusters = len(refined_map)
        n_outliers = len(outlier_indices)
    except Exception as e:
        # Top-level boundary: report the failure through `errors` instead of
        # raising to the caller.
        errors.append(f"Compression error: {type(e).__name__}: {e}")
        return _empty_result(errors)

    return {
        "compression_rows": compression_rows,
        "compressed_corpus": compressed_corpus,
        "split_proposals": {int(k): v for k, v in split_proposals.items()},
        "quality_summary": quality_summary,
        "n_original": len(sentences),
        "n_compressed": len(selected_indices),
        "n_clusters": n_clusters,
        "n_outliers": n_outliers,
        "errors": errors,
    }
def _empty_result(errors: list[str]) -> dict:
return {
"compression_rows": [],
"compressed_corpus": [],
"split_proposals": {},
"quality_summary": {
"TIGHT": 0, "MEDIUM": 0, "LOOSE": 0,
"n_clusters_original": 0, "n_clusters_refined": 0,
"n_flagged_for_split": 0,
"n_splits_accepted": 0, "n_splits_rejected": 0, "n_splits_pending": 0,
},
"n_original": 0,
"n_compressed": 0,
"n_clusters": 0,
"n_outliers": 0,
"errors": errors,
}