""" Offline pipeline: BERTopic (or sklearn DBSCAN) clusters subclaim text, then adds cosine similarity (subclaim vs mapped superclaim) using the same embedding space. Subclaim text prefers ``greenwashing_codebook.json``, then ``current_text`` in ``greenwashing_claim_history.json``. Superclaim text comes from ``greenwashing_superclaims.json`` via ``claim_superclaim_map.json``. Writes subclaim_bertopic_collapse.json (bundle fingerprint + per-subclaim rows: topic / collapse hints + hierarchy_confidence). No live classification APIs — output is consumed as static JSON by the UI. Usage (from repo root): pip install -r requirements.txt python scripts/build_subclaim_collapse_bertopic.py """ from __future__ import annotations import argparse import hashlib import json import os import sys from datetime import datetime, timezone from pathlib import Path from typing import Any import numpy as np # Repo root (parent of scripts/) ROOT = Path(__file__).resolve().parents[1] DEFAULT_CLAIMS = ROOT / "greenwashing_claim_history.json" DEFAULT_MAP = ROOT / "claim_superclaim_map.json" DEFAULT_SUPERCLAIMS = ROOT / "greenwashing_superclaims.json" DEFAULT_CODEBOOK = ROOT / "greenwashing_codebook.json" DEFAULT_OUT = ROOT / "subclaim_bertopic_collapse.json" def _file_sha256(path: Path) -> str: h = hashlib.sha256() with open(path, "rb") as f: for chunk in iter(lambda: f.read(1024 * 1024), b""): h.update(chunk) return h.hexdigest() def _strip_dual_prefix(s: str, a: str, b: str) -> str: if s.startswith(a): return s[len(a) :] if s.startswith(b): return s[len(b) :] return s def _normalize_nc(raw: str) -> str: s = str(raw).strip() if not s: return "" if s.startswith("NC_"): return s body = _strip_dual_prefix(s, "NC_", "SC_") return f"NC_{body}" def _normalize_sc(raw: str) -> str: s = str(raw).strip() if not s: return "" if s.startswith("SC_"): return s body = _strip_dual_prefix(s, "SC_", "NC_") return f"SC_{body}" def _load_id_text_json(path: Path, kind: str) -> dict[str, str]: with open(path, encoding="utf-8") as f: data = json.load(f) if not isinstance(data, dict): raise ValueError(f"{path} must be a JSON object of {{id: text}}") out: dict[str, str] = {} for k, v in data.items(): text = str(v if v is not None else "").strip() nid = _normalize_nc(str(k)) if kind == "subclaim" else _normalize_sc(str(k)) if nid: out[nid] = text return out def _parse_claim_superclaim_map(obj: Any) -> dict[str, str]: """Normalized subclaim_id -> superclaim_id.""" pairs: list[tuple[str, str]] = [] if obj is None: return {} if isinstance(obj, dict): keys = list(obj.keys()) first_key = keys[0] if keys else None sample = obj[first_key] if first_key is not None else None is_combined = ( sample is not None and isinstance(sample, dict) and not isinstance(sample, list) and ( "superclaim_id" in sample or "superclaimId" in sample or "sc_id" in sample ) ) if is_combined: for sub_id, record in obj.items(): if not isinstance(record, dict): continue sc = ( record.get("superclaim_id") or record.get("superclaimId") or record.get("sc_id") or record.get("SC") ) if sc is None: continue pairs.append((_normalize_nc(str(sub_id)), _normalize_sc(str(sc)))) else: for nc, sc in obj.items(): pairs.append((_normalize_nc(str(nc)), _normalize_sc(str(sc)))) elif isinstance(obj, list): for item in obj: if isinstance(item, (list, tuple)) and len(item) >= 2: pairs.append((_normalize_nc(str(item[0])), _normalize_sc(str(item[1])))) elif isinstance(item, dict): nc = ( item.get("subclaim_id") or item.get("nc_id") or item.get("subclaim") or item.get("NC") ) sc = ( item.get("superclaim_id") or item.get("sc_id") or item.get("superclaim") or item.get("SC") ) if nc is None or sc is None: continue pairs.append((_normalize_nc(str(nc)), _normalize_sc(str(sc)))) out: dict[str, str] = {} for sid, scid in pairs: if sid and scid: out[sid] = scid return out def _encode_subclaims( docs: list[str], embedding_model: Any, *, verbose: bool = True, ) -> np.ndarray: return embedding_model.encode( docs, batch_size=64, show_progress_bar=verbose, convert_to_numpy=True, normalize_embeddings=True, ) def fit_tfidf_svd_embeddings( docs: list[str], *, max_features: int = 30000, n_components: int = 128, random_state: int = 42, ) -> tuple[np.ndarray, Any]: """ Lightweight embedding: TF-IDF → TruncatedSVD → L2 normalize rows. Returns (doc_embeddings, encode_fn) where encode_fn(texts) -> dense matrix. """ from sklearn.decomposition import TruncatedSVD from sklearn.feature_extraction.text import TfidfVectorizer from sklearn.preprocessing import normalize min_df = 1 if len(docs) < 80 else 2 vec = TfidfVectorizer( max_features=max_features, min_df=min_df, max_df=0.92, ngram_range=(1, 2), sublinear_tf=True, ) X = vec.fit_transform(docs) n_comp = min(n_components, max(2, X.shape[1] - 1)) svd = TruncatedSVD(n_components=n_comp, random_state=random_state) Z = normalize(svd.fit_transform(X)).astype(np.float64) def encode(texts: list[str]) -> np.ndarray: Xt = vec.transform(texts) return normalize(svd.transform(Xt)).astype(np.float64) return Z, encode def _cluster_sklearn_dbscan( embeddings: np.ndarray, *, min_topic_size: int, eps: float, ) -> np.ndarray: """Cosine DBSCAN on L2-normalized rows; noise = -1 (same convention as BERTopic outliers).""" from sklearn.cluster import DBSCAN if min_topic_size < 2: min_topic_size = 2 labels = DBSCAN( eps=eps, min_samples=min_topic_size, metric="cosine", n_jobs=-1, ).fit_predict(embeddings) return np.asarray(labels, dtype=np.int64) def _fit_bertopic( docs: list[str], doc_embeddings: np.ndarray, embedding_model: Any, *, min_topic_size: int, verbose: bool, ) -> tuple[np.ndarray, Any]: from bertopic import BERTopic topic_model = BERTopic( embedding_model=embedding_model, min_topic_size=min_topic_size, verbose=verbose, ) topics, _ = topic_model.fit_transform(docs, embeddings=doc_embeddings) return np.asarray(topics, dtype=np.int64), topic_model def cluster_subclaims_topic( ids: list[str], docs: list[str], doc_embeddings: np.ndarray, embedding_model: Any | None, *, min_topic_size: int = 2, verbose: bool = True, backend: str = "auto", dbscan_eps: float = 0.32, ) -> tuple[np.ndarray, Any | None, str]: """ Cluster precomputed subclaim embeddings. - **bertopic**: needs ``embedding_model`` (SentenceTransformer). - **sklearn**: DBSCAN (cosine); works with TF-IDF–SVD or any dense rows. - **auto**: try BERTopic when ``embedding_model`` is set; else sklearn; on failure, sklearn. Returns ``(topic_ids_per_doc, topic_model_or_none, cluster_backend)``. """ if len(ids) != len(docs): raise ValueError("ids and docs must have the same length") if len(docs) < 2: raise ValueError("need at least 2 documents") be = (backend or "auto").strip().lower() def _sklearn() -> tuple[np.ndarray, None, str]: topics = _cluster_sklearn_dbscan( doc_embeddings, min_topic_size=min_topic_size, eps=dbscan_eps, ) return topics, None, "sklearn_dbscan" if be == "sklearn": t, m, name = _sklearn() return t, m, name if be == "bertopic": if embedding_model is None: print( "BERTopic requires sentence-transformers; using sklearn DBSCAN instead.", file=sys.stderr, ) return _sklearn() try: topics, tm = _fit_bertopic( docs, doc_embeddings, embedding_model, min_topic_size=min_topic_size, verbose=verbose, ) return topics, tm, "bertopic" except Exception as e: print( f"BERTopic failed ({e!r}); using sklearn DBSCAN " f"(eps={dbscan_eps}, min_samples={min_topic_size}).", file=sys.stderr, ) return _sklearn() # auto if embedding_model is None: return _sklearn() try: topics, tm = _fit_bertopic( docs, doc_embeddings, embedding_model, min_topic_size=min_topic_size, verbose=verbose, ) return topics, tm, "bertopic" except Exception as e: print( f"BERTopic unavailable ({e!r}); using sklearn DBSCAN fallback.", file=sys.stderr, ) return _sklearn() def build_bertopic_subclaim_clusters( ids: list[str], docs: list[str], *, embedding_model: Any, min_topic_size: int = 2, verbose: bool = True, backend: str = "auto", dbscan_eps: float = 0.32, ) -> tuple[np.ndarray, Any | None, np.ndarray, str]: """ Encode with SentenceTransformer, then :func:`cluster_subclaims_topic`. Returns ``(topics, topic_model, doc_embeddings, backend_name)``. """ doc_embeddings = _encode_subclaims(docs, embedding_model, verbose=verbose) topics, tm, name = cluster_subclaims_topic( ids, docs, doc_embeddings, embedding_model, min_topic_size=min_topic_size, verbose=verbose, backend=backend, dbscan_eps=dbscan_eps, ) return topics, tm, doc_embeddings, name def subclaim_rows_from_topics( ids: list[str], topics: np.ndarray, topic_model: Any | None, ) -> dict[str, dict[str, Any]]: """Map each subclaim id to topic_id, collapse_flag, collapse_with, and optional topic_label.""" topics_list = [int(t) for t in topics] by_topic: dict[int, list[str]] = {} for sid, t in zip(ids, topics_list): by_topic.setdefault(t, []).append(sid) subclaims_out: dict[str, dict[str, Any]] = {} for sid, t in zip(ids, topics_list): peers = [x for x in by_topic.get(t, []) if x != sid] if t < 0 or len(peers) == 0: subclaims_out[sid] = { "topic_id": t, "collapse_flag": False, "collapse_with": [], } else: subclaims_out[sid] = { "topic_id": t, "collapse_flag": True, "collapse_with": sorted(peers), } if topic_model is not None: try: topic_info = topic_model.get_topic_info() labels_by_id: dict[int, str] = {} for _, row in topic_info.iterrows(): tid = int(row["Topic"]) if tid < 0: continue labels_by_id[tid] = str(row.get("Name", "") or "") for sid in subclaims_out: tid = int(subclaims_out[sid]["topic_id"]) if tid >= 0 and tid in labels_by_id: subclaims_out[sid]["topic_label"] = labels_by_id[tid] except Exception: pass return subclaims_out def main() -> int: parser = argparse.ArgumentParser( description="Build BERTopic / DBSCAN collapse artifact for subclaims.", ) parser.add_argument("--claims-json", type=Path, default=DEFAULT_CLAIMS) parser.add_argument("--claim-superclaim-map", type=Path, default=DEFAULT_MAP) parser.add_argument("--superclaims-json", type=Path, default=DEFAULT_SUPERCLAIMS) parser.add_argument("--codebook-json", type=Path, default=DEFAULT_CODEBOOK) parser.add_argument("--out", type=Path, default=DEFAULT_OUT) parser.add_argument("--min-topic-size", type=int, default=2) parser.add_argument("--embedding-model", default="all-MiniLM-L6-v2") parser.add_argument( "--cluster-backend", choices=("auto", "bertopic", "sklearn"), default="auto", help="Topic clustering: BERTopic, sklearn DBSCAN, or auto (try BERTopic then fall back).", ) parser.add_argument( "--dbscan-eps", type=float, default=0.32, help="Cosine DBSCAN eps when using sklearn (smaller = tighter clusters).", ) parser.add_argument( "--embedding-backend", choices=("auto", "sentence_transformers", "tfidf"), default="auto", help="Embeddings: MiniLM via sentence-transformers, TF-IDF+SVD (no PyTorch), or auto.", ) args = parser.parse_args() try: import sklearn # noqa: F401 except ImportError: print("Missing scikit-learn. Install with: pip install scikit-learn numpy", file=sys.stderr) raise SystemExit(1) claims_path = args.claims_json map_path = args.claim_superclaim_map super_path = args.superclaims_json codebook_path = args.codebook_json for label, p in ( ("Claims JSON", claims_path), ("claim_superclaim_map.json", map_path), ("greenwashing_superclaims.json", super_path), ("greenwashing_codebook.json", codebook_path), ): if not p.is_file(): print(f"{label} not found: {p}", file=sys.stderr) return 1 data_bundle_paths = (claims_path, map_path, super_path, codebook_path) data_hashes = [_file_sha256(p) for p in data_bundle_paths] bundle_fingerprint = "|".join(data_hashes) claims_bundle_version = hashlib.sha256(bundle_fingerprint.encode("utf-8")).hexdigest()[:16] claims_sha = _file_sha256(claims_path) codebook = _load_id_text_json(codebook_path, "subclaim") superclaims = _load_id_text_json(super_path, "superclaim") with open(map_path, encoding="utf-8") as f: map_raw = json.load(f) sub_to_super = _parse_claim_superclaim_map(map_raw) with open(claims_path, encoding="utf-8") as f: data = json.load(f) claims = data.get("claims") or {} claims_version_from_file = data.get("claims_version") if claims_version_from_file is not None: claims_version_from_file = str(claims_version_from_file) ids: list[str] = [] docs: list[str] = [] collected: set[str] = set() def add_sid_doc(sid: str, text: str) -> None: t = text.strip() if not sid or not t or sid not in sub_to_super: return if sid in collected: return collected.add(sid) ids.append(sid) docs.append(t) for claim_id, obj in claims.items(): sid = ( str(claim_id) if str(claim_id).startswith("NC_") else f"NC_{str(claim_id).replace('NC_', '').replace('SC_', '')}" ) text = codebook.get(sid, "").strip() or str((obj or {}).get("current_text") or "").strip() add_sid_doc(sid, text) for sid in sub_to_super: if sid in collected: continue text = codebook.get(sid, "").strip() if text: add_sid_doc(sid, text) if len(docs) < 2: print( "Need at least 2 mapped subclaims with non-empty text " "(codebook and/or claim history current_text).", file=sys.stderr, ) return 1 eb = args.embedding_backend embedding_model: Any | None = None encode_fn: Any = None doc_embeddings: np.ndarray embedding_backend_used: str if eb == "tfidf": print("Embedding: TF-IDF + TruncatedSVD (no sentence-transformers).") doc_embeddings, encode_fn = fit_tfidf_svd_embeddings(docs) embedding_backend_used = "tfidf_svd" if args.cluster_backend == "bertopic": print("BERTopic needs sentence-transformers embeddings; forcing --cluster-backend sklearn.", file=sys.stderr) args.cluster_backend = "sklearn" elif eb == "sentence_transformers": try: from sentence_transformers import SentenceTransformer except ImportError: print( "sentence-transformers not installed. Use --embedding-backend tfidf " "or: pip install sentence-transformers", file=sys.stderr, ) raise SystemExit(1) print(f"Embedding: sentence-transformers ({args.embedding_model})") embedding_model = SentenceTransformer(args.embedding_model) doc_embeddings = _encode_subclaims(docs, embedding_model, verbose=True) encode_fn = lambda texts: embedding_model.encode( # type: ignore[misc] texts, batch_size=64, show_progress_bar=False, convert_to_numpy=True, normalize_embeddings=True, ) embedding_backend_used = "sentence_transformers" else: # auto: prefer sentence-transformers; fall back to TF-IDF try: from sentence_transformers import SentenceTransformer print(f"Embedding: sentence-transformers ({args.embedding_model})") embedding_model = SentenceTransformer(args.embedding_model) doc_embeddings = _encode_subclaims(docs, embedding_model, verbose=True) encode_fn = lambda texts: embedding_model.encode( texts, batch_size=64, show_progress_bar=False, convert_to_numpy=True, normalize_embeddings=True, ) embedding_backend_used = "sentence_transformers" except Exception as e: print(f"sentence-transformers unavailable ({e!r}); using TF-IDF + SVD.", file=sys.stderr) doc_embeddings, encode_fn = fit_tfidf_svd_embeddings(docs) embedding_model = None embedding_backend_used = "tfidf_svd" if args.cluster_backend == "bertopic": args.cluster_backend = "sklearn" print(f"claims_bundle_version: {claims_bundle_version}") print( f"Clustering {len(docs)} subclaims " f"(cluster={args.cluster_backend}, embedding={embedding_backend_used}, min_topic_size={args.min_topic_size})…", ) topics, topic_model, cluster_backend = cluster_subclaims_topic( ids, docs, doc_embeddings, embedding_model, min_topic_size=args.min_topic_size, verbose=True, backend=args.cluster_backend, dbscan_eps=args.dbscan_eps, ) print(f"Cluster backend used: {cluster_backend}") subclaims_out = subclaim_rows_from_topics(ids, topics, topic_model) sid_to_doc = dict(zip(ids, docs)) for sid in ids: scid = sub_to_super.get(sid) if not scid or sid not in subclaims_out: continue sub_text = sid_to_doc.get(sid, "").strip() super_text = superclaims.get(scid, "").strip() if not sub_text or not super_text: continue pair_emb = encode_fn([sub_text, super_text]) sim = float(np.dot(pair_emb[0], pair_emb[1])) sim = max(0.0, min(1.0, sim)) subclaims_out[sid]["hierarchy_confidence"] = round(sim, 4) subclaims_out[sid]["superclaim_id"] = scid out_obj: dict[str, Any] = { "generated_at": datetime.now(timezone.utc).isoformat(), "claims_bundle_version": claims_bundle_version, "claims_source_sha256": claims_sha, "data_files_sha256": data_hashes, "claims_version": claims_version_from_file, "cluster_backend": cluster_backend, "embedding_backend": embedding_backend_used, "embedding_model": args.embedding_model if embedding_backend_used == "sentence_transformers" else None, "min_topic_size": args.min_topic_size, "dbscan_eps": args.dbscan_eps if cluster_backend == "sklearn_dbscan" else None, "claims_source": os.path.basename(str(claims_path)), "subclaims": subclaims_out, } out_path = args.out out_path.parent.mkdir(parents=True, exist_ok=True) with open(out_path, "w", encoding="utf-8") as f: json.dump(out_obj, f, indent=2, ensure_ascii=False) flagged = sum(1 for v in subclaims_out.values() if v.get("collapse_flag")) with_hier = sum(1 for v in subclaims_out.values() if "hierarchy_confidence" in v) print(f"Wrote {out_path}") print(f" collapse_flag=true: {flagged} subclaims; hierarchy_confidence: {with_hier} subclaims") return 0 if __name__ == "__main__": raise SystemExit(main())