Spaces:
Sleeping
Sleeping
"""
Offline pipeline: BERTopic (or sklearn DBSCAN) clusters subclaim text, then adds
cosine similarity (subclaim vs mapped superclaim) using the same embedding space.

Subclaim text prefers ``greenwashing_codebook.json``, then ``current_text`` in
``greenwashing_claim_history.json``. Superclaim text comes from
``greenwashing_superclaims.json`` via ``claim_superclaim_map.json``.

Writes subclaim_bertopic_collapse.json (bundle fingerprint + per-subclaim rows:
topic / collapse hints + hierarchy_confidence).

No live classification APIs — output is consumed as static JSON by the UI.

Usage (from repo root):
    pip install -r requirements.txt
    python scripts/build_subclaim_collapse_bertopic.py
"""
from __future__ import annotations

import argparse
import hashlib
import json
import os
import sys
from collections.abc import Sequence
from datetime import datetime, timezone
from pathlib import Path
from typing import Any

import numpy as np
# Repository root: this file lives in scripts/, so the root is two levels up.
ROOT = Path(__file__).resolve().parent.parent

# Default input/output locations, all directly under the repository root.
DEFAULT_CLAIMS = ROOT.joinpath("greenwashing_claim_history.json")
DEFAULT_MAP = ROOT.joinpath("claim_superclaim_map.json")
DEFAULT_SUPERCLAIMS = ROOT.joinpath("greenwashing_superclaims.json")
DEFAULT_CODEBOOK = ROOT.joinpath("greenwashing_codebook.json")
DEFAULT_OUT = ROOT.joinpath("subclaim_bertopic_collapse.json")
def _file_sha256(path: Path) -> str:
    """
    Return the hexadecimal SHA-256 digest of the file at ``path``.

    The file is opened in binary mode and hashed in 1 MiB chunks, so the whole
    file is never held in memory at once.
    """
    chunk_size = 1024 * 1024
    digest = hashlib.sha256()
    with open(path, "rb") as stream:
        while True:
            piece = stream.read(chunk_size)
            if not piece:
                # An empty read marks end of file.
                break
            digest.update(piece)
    return digest.hexdigest()
def _strip_dual_prefix(s: str, a: str, b: str) -> str:
    """
    Remove one leading prefix from ``s``: ``a`` if present, otherwise ``b``.

    Only a single prefix is removed, and ``a`` is tried before ``b``. When ``s``
    starts with neither, it is returned unchanged.
    """
    for prefix in (a, b):
        if s.startswith(prefix):
            return s[len(prefix) :]
    return s
def _normalize_nc(raw: str) -> str:
    """
    Normalize a subclaim id to the ``NC_<body>`` form.

    The input is stringified and stripped of surrounding whitespace. Blank input
    yields ``""``; an id already starting with ``NC_`` is returned as is; a
    leading ``SC_`` is replaced by ``NC_``; anything else gets ``NC_`` prepended.
    """
    text = str(raw).strip()
    if not text or text.startswith("NC_"):
        # Blank stays blank; an already-normalized id passes through.
        return text
    if text.startswith("SC_"):
        text = text[len("SC_") :]
    return "NC_" + text
def _normalize_sc(raw: str) -> str:
    """
    Normalize a superclaim id to the ``SC_<body>`` form.

    The input is stringified and stripped of surrounding whitespace. Blank input
    yields ``""``; an id already starting with ``SC_`` is returned as is; a
    leading ``NC_`` is replaced by ``SC_``; anything else gets ``SC_`` prepended.
    """
    text = str(raw).strip()
    if not text or text.startswith("SC_"):
        # Blank stays blank; an already-normalized id passes through.
        return text
    if text.startswith("NC_"):
        text = text[len("NC_") :]
    return "SC_" + text
| def _load_id_text_json(path: Path, kind: str) -> dict[str, str]: | |
| with open(path, encoding="utf-8") as f: | |
| data = json.load(f) | |
| if not isinstance(data, dict): | |
| raise ValueError(f"{path} must be a JSON object of {{id: text}}") | |
| out: dict[str, str] = {} | |
| for k, v in data.items(): | |
| text = str(v if v is not None else "").strip() | |
| nid = _normalize_nc(str(k)) if kind == "subclaim" else _normalize_sc(str(k)) | |
| if nid: | |
| out[nid] = text | |
| return out | |
def _parse_claim_superclaim_map(obj: Any) -> dict[str, str]:
    """
    Parse a subclaim→superclaim mapping into ``{NC_id: SC_id}``.

    Accepted shapes of ``obj``:

    - ``None``: empty mapping.
    - dict of records, ``{sub_id: {"superclaim_id" | "superclaimId" | "sc_id": ...}}``.
      This shape is detected from the FIRST value only; once detected, non-dict
      values are skipped and ``"SC"`` is also accepted as a record key.
    - flat dict, ``{sub_id: super_id}``.
    - list of ``[sub_id, super_id, ...]`` pairs and/or dicts carrying
      ``subclaim_id`` / ``nc_id`` / ``subclaim`` / ``NC`` and
      ``superclaim_id`` / ``sc_id`` / ``superclaim`` / ``SC``.

    Ids are normalized with :func:`_normalize_nc` / :func:`_normalize_sc`. Pairs
    with a missing (``None``) or blank side are dropped, so a JSON ``null`` never
    turns into a bogus ``"SC_None"`` / ``"NC_None"`` id. Later duplicates of a
    subclaim id overwrite earlier ones. Any other type of ``obj`` yields ``{}``.
    """
    if obj is None:
        return {}

    # Raw (sub, super) candidates; filtering and normalization happen once below.
    raw_pairs: list[tuple[Any, Any]] = []

    if isinstance(obj, dict):
        sample = next(iter(obj.values()), None)
        is_combined = isinstance(sample, dict) and any(
            key in sample for key in ("superclaim_id", "superclaimId", "sc_id")
        )
        if is_combined:
            for sub_id, record in obj.items():
                if not isinstance(record, dict):
                    continue
                sc = (
                    record.get("superclaim_id")
                    or record.get("superclaimId")
                    or record.get("sc_id")
                    or record.get("SC")
                )
                raw_pairs.append((sub_id, sc))
        else:
            raw_pairs.extend(obj.items())
    elif isinstance(obj, list):
        for item in obj:
            if isinstance(item, (list, tuple)) and len(item) >= 2:
                raw_pairs.append((item[0], item[1]))
            elif isinstance(item, dict):
                nc = (
                    item.get("subclaim_id")
                    or item.get("nc_id")
                    or item.get("subclaim")
                    or item.get("NC")
                )
                sc = (
                    item.get("superclaim_id")
                    or item.get("sc_id")
                    or item.get("superclaim")
                    or item.get("SC")
                )
                raw_pairs.append((nc, sc))

    out: dict[str, str] = {}
    for raw_sub, raw_super in raw_pairs:
        # str(None) would be "None" and normalize to a fake id, so drop it here.
        if raw_sub is None or raw_super is None:
            continue
        sid = _normalize_nc(str(raw_sub))
        scid = _normalize_sc(str(raw_super))
        if sid and scid:
            out[sid] = scid
    return out
def _encode_subclaims(
    docs: list[str],
    embedding_model: Any,
    *,
    verbose: bool = True,
) -> np.ndarray:
    """
    Encode ``docs`` with ``embedding_model.encode`` and return its result.

    The call always uses a batch size of 64, asks for NumPy output and for
    normalized embeddings; ``verbose`` only toggles the progress bar.
    """
    encode_options = {
        "batch_size": 64,
        "show_progress_bar": verbose,
        "convert_to_numpy": True,
        "normalize_embeddings": True,
    }
    return embedding_model.encode(docs, **encode_options)
def fit_tfidf_svd_embeddings(
    docs: list[str],
    *,
    max_features: int = 30000,
    n_components: int = 128,
    random_state: int = 42,
) -> tuple[np.ndarray, Any]:
    """
    Lightweight embedding: TF-IDF → TruncatedSVD → L2 normalize rows.

    Args:
        docs: Texts used to fit both the vectorizer and the SVD.
        max_features: Vocabulary cap passed to ``TfidfVectorizer``.
        n_components: Upper bound on the SVD dimensionality; the value actually
            used is reduced for small vocabularies.
        random_state: Seed passed to ``TruncatedSVD``.

    Returns:
        ``(doc_embeddings, encode_fn)`` where ``doc_embeddings`` has one float64
        row per document and ``encode_fn(texts)`` projects new texts through the
        same fitted vectorizer and SVD, returning a dense float64 matrix.
    """
    from sklearn.decomposition import TruncatedSVD
    from sklearn.feature_extraction.text import TfidfVectorizer
    from sklearn.preprocessing import normalize

    # Small corpora keep every term; larger ones drop terms seen in one document.
    min_df = 1 if len(docs) < 80 else 2
    vec = TfidfVectorizer(
        max_features=max_features,
        min_df=min_df,
        max_df=0.92,
        ngram_range=(1, 2),
        sublinear_tf=True,
    )
    X = vec.fit_transform(docs)

    n_features = X.shape[1]
    # Never ask for more components than there are features: a one-term
    # vocabulary would otherwise request 2 components and fail in TruncatedSVD.
    n_comp = min(n_components, max(2, n_features - 1), n_features)
    svd = TruncatedSVD(n_components=n_comp, random_state=random_state)
    Z = normalize(svd.fit_transform(X)).astype(np.float64)

    def encode(texts: list[str]) -> np.ndarray:
        """Project ``texts`` into the fitted TF-IDF/SVD space, L2-normalized."""
        Xt = vec.transform(texts)
        return normalize(svd.transform(Xt)).astype(np.float64)

    return Z, encode
def _cluster_sklearn_dbscan(
    embeddings: np.ndarray,
    *,
    min_topic_size: int,
    eps: float,
) -> np.ndarray:
    """
    Cluster embedding rows with cosine-metric DBSCAN and return int64 labels.

    ``min_topic_size`` is used as DBSCAN's ``min_samples`` and raised to 2 when
    smaller. Noise points are labelled ``-1``, the same convention as BERTopic
    outliers.
    """
    from sklearn.cluster import DBSCAN

    clusterer = DBSCAN(
        eps=eps,
        min_samples=max(2, min_topic_size),
        metric="cosine",
        n_jobs=-1,
    )
    return np.asarray(clusterer.fit_predict(embeddings), dtype=np.int64)
def _fit_bertopic(
    docs: list[str],
    doc_embeddings: np.ndarray,
    embedding_model: Any,
    *,
    min_topic_size: int,
    verbose: bool,
) -> tuple[np.ndarray, Any]:
    """
    Fit a BERTopic model on ``docs`` using the precomputed ``doc_embeddings``.

    Returns ``(topic_ids, model)``: an int64 array with one topic id per document
    and the fitted BERTopic instance. The second value returned by
    ``fit_transform`` is discarded.
    """
    from bertopic import BERTopic

    model = BERTopic(
        embedding_model=embedding_model,
        min_topic_size=min_topic_size,
        verbose=verbose,
    )
    assignments, _unused = model.fit_transform(docs, embeddings=doc_embeddings)
    return np.asarray(assignments, dtype=np.int64), model
def cluster_subclaims_topic(
    ids: list[str],
    docs: list[str],
    doc_embeddings: np.ndarray,
    embedding_model: Any | None,
    *,
    min_topic_size: int = 2,
    verbose: bool = True,
    backend: str = "auto",
    dbscan_eps: float = 0.32,
) -> tuple[np.ndarray, Any | None, str]:
    """
    Cluster precomputed subclaim embeddings.

    Backends (matched case-insensitively; empty or unrecognized values behave
    like ``auto``):

    - **sklearn**: cosine DBSCAN on ``doc_embeddings``.
    - **bertopic**: BERTopic; falls back to DBSCAN, with a message on stderr,
      when ``embedding_model`` is ``None`` or BERTopic raises.
    - **auto**: DBSCAN when ``embedding_model`` is ``None``; otherwise BERTopic
      with a DBSCAN fallback if it raises.

    Returns ``(topic_ids_per_doc, topic_model_or_none, cluster_backend)`` where
    ``cluster_backend`` is ``"bertopic"`` or ``"sklearn_dbscan"``.

    Raises:
        ValueError: if ``ids`` and ``docs`` differ in length, or fewer than two
            documents are given.
    """
    if len(ids) != len(docs):
        raise ValueError("ids and docs must have the same length")
    if len(docs) < 2:
        raise ValueError("need at least 2 documents")

    mode = (backend or "auto").strip().lower()

    def run_dbscan() -> tuple[np.ndarray, None, str]:
        labels = _cluster_sklearn_dbscan(
            doc_embeddings,
            min_topic_size=min_topic_size,
            eps=dbscan_eps,
        )
        return labels, None, "sklearn_dbscan"

    if mode == "sklearn":
        return run_dbscan()

    # "bertopic" and "auto" differ only in what they print when falling back.
    explicit_bertopic = mode == "bertopic"

    if embedding_model is None:
        if explicit_bertopic:
            print(
                "BERTopic requires sentence-transformers; using sklearn DBSCAN instead.",
                file=sys.stderr,
            )
        return run_dbscan()

    try:
        labels, model = _fit_bertopic(
            docs,
            doc_embeddings,
            embedding_model,
            min_topic_size=min_topic_size,
            verbose=verbose,
        )
    except Exception as err:
        if explicit_bertopic:
            notice = (
                f"BERTopic failed ({err!r}); using sklearn DBSCAN "
                f"(eps={dbscan_eps}, min_samples={min_topic_size})."
            )
        else:
            notice = f"BERTopic unavailable ({err!r}); using sklearn DBSCAN fallback."
        print(notice, file=sys.stderr)
        return run_dbscan()
    return labels, model, "bertopic"
def build_bertopic_subclaim_clusters(
    ids: list[str],
    docs: list[str],
    *,
    embedding_model: Any,
    min_topic_size: int = 2,
    verbose: bool = True,
    backend: str = "auto",
    dbscan_eps: float = 0.32,
) -> tuple[np.ndarray, Any | None, np.ndarray, str]:
    """
    Embed ``docs`` with ``embedding_model``, then cluster the embeddings.

    Convenience wrapper: :func:`_encode_subclaims` followed by
    :func:`cluster_subclaims_topic` with the same options.

    Returns ``(topics, topic_model, doc_embeddings, backend_name)``.
    """
    embeddings = _encode_subclaims(docs, embedding_model, verbose=verbose)
    clustering_options = {
        "min_topic_size": min_topic_size,
        "verbose": verbose,
        "backend": backend,
        "dbscan_eps": dbscan_eps,
    }
    labels, model, backend_name = cluster_subclaims_topic(
        ids,
        docs,
        embeddings,
        embedding_model,
        **clustering_options,
    )
    return labels, model, embeddings, backend_name
def subclaim_rows_from_topics(
    ids: list[str],
    topics: np.ndarray,
    topic_model: Any | None,
) -> dict[str, dict[str, Any]]:
    """
    Map each subclaim id to its topic and collapse hints.

    Args:
        ids: Subclaim ids, aligned with ``topics``.
        topics: One topic id per subclaim; negative ids mark outliers/noise.
        topic_model: Optional object exposing ``get_topic_info()``, whose result
            is iterated with ``iterrows()`` and read through the ``Topic`` and
            ``Name`` fields. ``None`` skips labelling.

    Returns:
        ``{subclaim_id: row}`` where each row holds ``topic_id``,
        ``collapse_flag`` (True only for a non-negative topic shared with at
        least one other id), ``collapse_with`` (sorted peer ids, empty when not
        flagged) and, when a label could be read, ``topic_label``.

    Label extraction is best effort: if it fails, the failure is reported on
    stderr and the rows are returned without ``topic_label``.
    """
    topic_ids = [int(t) for t in topics]

    members_by_topic: dict[int, list[str]] = {}
    for sid, tid in zip(ids, topic_ids):
        members_by_topic.setdefault(tid, []).append(sid)

    subclaims_out: dict[str, dict[str, Any]] = {}
    for sid, tid in zip(ids, topic_ids):
        # Outliers (negative topic ids) never collapse, even with each other.
        peers = [] if tid < 0 else sorted(x for x in members_by_topic[tid] if x != sid)
        subclaims_out[sid] = {
            "topic_id": tid,
            "collapse_flag": bool(peers),
            "collapse_with": peers,
        }

    if topic_model is None:
        return subclaims_out

    try:
        topic_info = topic_model.get_topic_info()
        labels_by_id: dict[int, str] = {}
        for _, row in topic_info.iterrows():
            tid = int(row["Topic"])
            if tid < 0:
                continue
            labels_by_id[tid] = str(row.get("Name", "") or "")
    except Exception as exc:
        # Labels are optional metadata: report the problem, keep the rows.
        print(
            f"Could not read topic labels ({exc!r}); omitting topic_label.",
            file=sys.stderr,
        )
        return subclaims_out

    for row in subclaims_out.values():
        label = labels_by_id.get(row["topic_id"])
        if label is not None:
            row["topic_label"] = label
    return subclaims_out
def main(argv: Sequence[str] | None = None) -> int:
    """
    Command-line entry point: build and write the subclaim collapse artifact.

    Steps: parse arguments, check that the four input JSON files exist,
    fingerprint them, collect mapped subclaim texts (codebook text first, then
    ``current_text`` from the claim history), embed, cluster, score each subclaim
    against its mapped superclaim, and write the result as JSON.

    Args:
        argv: Argument list to parse. ``None`` (the default) lets argparse read
            ``sys.argv[1:]``.

    Returns:
        ``0`` on success; ``1`` when an input file is missing, the claims JSON
        does not have the expected object shape, or fewer than two mapped
        subclaims have text.

    Raises:
        SystemExit: if scikit-learn is missing, or ``--embedding-backend
            sentence_transformers`` is requested but the package is not
            installed. argparse also exits on invalid arguments.
    """
    parser = argparse.ArgumentParser(
        description="Build BERTopic / DBSCAN collapse artifact for subclaims.",
    )
    parser.add_argument("--claims-json", type=Path, default=DEFAULT_CLAIMS)
    parser.add_argument("--claim-superclaim-map", type=Path, default=DEFAULT_MAP)
    parser.add_argument("--superclaims-json", type=Path, default=DEFAULT_SUPERCLAIMS)
    parser.add_argument("--codebook-json", type=Path, default=DEFAULT_CODEBOOK)
    parser.add_argument("--out", type=Path, default=DEFAULT_OUT)
    parser.add_argument("--min-topic-size", type=int, default=2)
    parser.add_argument("--embedding-model", default="all-MiniLM-L6-v2")
    parser.add_argument(
        "--cluster-backend",
        choices=("auto", "bertopic", "sklearn"),
        default="auto",
        help="Topic clustering: BERTopic, sklearn DBSCAN, or auto (try BERTopic then fall back).",
    )
    parser.add_argument(
        "--dbscan-eps",
        type=float,
        default=0.32,
        help="Cosine DBSCAN eps when using sklearn (smaller = tighter clusters).",
    )
    parser.add_argument(
        "--embedding-backend",
        choices=("auto", "sentence_transformers", "tfidf"),
        default="auto",
        help="Embeddings: MiniLM via sentence-transformers, TF-IDF+SVD (no PyTorch), or auto.",
    )
    args = parser.parse_args(argv)

    # scikit-learn backs both fallbacks (TF-IDF embeddings and DBSCAN).
    try:
        import sklearn  # noqa: F401
    except ImportError:
        print("Missing scikit-learn. Install with: pip install scikit-learn numpy", file=sys.stderr)
        raise SystemExit(1)

    claims_path = args.claims_json
    map_path = args.claim_superclaim_map
    super_path = args.superclaims_json
    codebook_path = args.codebook_json
    for label, p in (
        ("Claims JSON", claims_path),
        ("claim_superclaim_map.json", map_path),
        ("greenwashing_superclaims.json", super_path),
        ("greenwashing_codebook.json", codebook_path),
    ):
        if not p.is_file():
            print(f"{label} not found: {p}", file=sys.stderr)
            return 1

    # Bundle fingerprint: per-file hashes, joined and hashed again.
    data_bundle_paths = (claims_path, map_path, super_path, codebook_path)
    data_hashes = [_file_sha256(p) for p in data_bundle_paths]
    bundle_fingerprint = "|".join(data_hashes)
    claims_bundle_version = hashlib.sha256(bundle_fingerprint.encode("utf-8")).hexdigest()[:16]
    # The claims file is first in the bundle; reuse its hash instead of re-reading it.
    claims_sha = data_hashes[0]

    codebook = _load_id_text_json(codebook_path, "subclaim")
    superclaims = _load_id_text_json(super_path, "superclaim")
    with open(map_path, encoding="utf-8") as f:
        map_raw = json.load(f)
    sub_to_super = _parse_claim_superclaim_map(map_raw)

    with open(claims_path, encoding="utf-8") as f:
        data = json.load(f)
    if not isinstance(data, dict):
        print(f"{claims_path} must be a JSON object with a 'claims' entry", file=sys.stderr)
        return 1
    claims = data.get("claims") or {}
    if not isinstance(claims, dict):
        print(f"'claims' in {claims_path} must be a JSON object of {{id: record}}", file=sys.stderr)
        return 1
    claims_version_from_file = data.get("claims_version")
    if claims_version_from_file is not None:
        claims_version_from_file = str(claims_version_from_file)

    ids: list[str] = []
    docs: list[str] = []
    collected: set[str] = set()

    def add_sid_doc(sid: str, text: str) -> None:
        """Record (sid, text) once, only for mapped ids with non-empty text."""
        t = text.strip()
        if not sid or not t or sid not in sub_to_super:
            return
        if sid in collected:
            return
        collected.add(sid)
        ids.append(sid)
        docs.append(t)

    for claim_id, obj in claims.items():
        # Same normalization as the map and codebook keys, so lookups agree.
        sid = _normalize_nc(str(claim_id))
        history_text = obj.get("current_text") if isinstance(obj, dict) else None
        text = codebook.get(sid, "").strip() or str(history_text or "").strip()
        add_sid_doc(sid, text)

    # Mapped subclaims absent from the claim history can still come from the codebook.
    for sid in sub_to_super:
        if sid in collected:
            continue
        text = codebook.get(sid, "").strip()
        if text:
            add_sid_doc(sid, text)

    if len(docs) < 2:
        print(
            "Need at least 2 mapped subclaims with non-empty text "
            "(codebook and/or claim history current_text).",
            file=sys.stderr,
        )
        return 1

    def sentence_transformer_backend() -> tuple[Any, np.ndarray, Any]:
        """Load the model, embed ``docs``, and return (model, embeddings, encode_fn)."""
        from sentence_transformers import SentenceTransformer

        print(f"Embedding: sentence-transformers ({args.embedding_model})")
        model = SentenceTransformer(args.embedding_model)
        embeddings = _encode_subclaims(docs, model, verbose=True)

        def encode(texts: list[str]) -> np.ndarray:
            return _encode_subclaims(texts, model, verbose=False)

        return model, embeddings, encode

    eb = args.embedding_backend
    embedding_model: Any | None = None
    encode_fn: Any = None
    doc_embeddings: np.ndarray
    embedding_backend_used: str

    if eb == "tfidf":
        print("Embedding: TF-IDF + TruncatedSVD (no sentence-transformers).")
        doc_embeddings, encode_fn = fit_tfidf_svd_embeddings(docs)
        embedding_backend_used = "tfidf_svd"
    elif eb == "sentence_transformers":
        try:
            import sentence_transformers  # noqa: F401
        except ImportError:
            print(
                "sentence-transformers not installed. Use --embedding-backend tfidf "
                "or: pip install sentence-transformers",
                file=sys.stderr,
            )
            raise SystemExit(1)
        embedding_model, doc_embeddings, encode_fn = sentence_transformer_backend()
        embedding_backend_used = "sentence_transformers"
    else:
        # auto: prefer sentence-transformers; fall back to TF-IDF on any failure
        # (package missing, model cannot be loaded, encoding error).
        try:
            embedding_model, doc_embeddings, encode_fn = sentence_transformer_backend()
            embedding_backend_used = "sentence_transformers"
        except Exception as e:
            print(f"sentence-transformers unavailable ({e!r}); using TF-IDF + SVD.", file=sys.stderr)
            doc_embeddings, encode_fn = fit_tfidf_svd_embeddings(docs)
            embedding_model = None
            embedding_backend_used = "tfidf_svd"

    # BERTopic is only attempted with a sentence-transformers model; without
    # one, switch to DBSCAN and say so, whichever path led here.
    if embedding_model is None and args.cluster_backend == "bertopic":
        print(
            "BERTopic needs sentence-transformers embeddings; forcing --cluster-backend sklearn.",
            file=sys.stderr,
        )
        args.cluster_backend = "sklearn"

    print(f"claims_bundle_version: {claims_bundle_version}")
    print(
        f"Clustering {len(docs)} subclaims "
        f"(cluster={args.cluster_backend}, embedding={embedding_backend_used}, min_topic_size={args.min_topic_size})…",
    )
    topics, topic_model, cluster_backend = cluster_subclaims_topic(
        ids,
        docs,
        doc_embeddings,
        embedding_model,
        min_topic_size=args.min_topic_size,
        verbose=True,
        backend=args.cluster_backend,
        dbscan_eps=args.dbscan_eps,
    )
    print(f"Cluster backend used: {cluster_backend}")

    subclaims_out = subclaim_rows_from_topics(ids, topics, topic_model)

    # Hierarchy confidence: cosine similarity between each subclaim and its
    # mapped superclaim. Texts are encoded in two batches (subclaims, unique
    # superclaims) rather than one encoder call per pair.
    scored: list[tuple[str, str, str, str]] = []
    for sid, sub_text in zip(ids, docs):
        scid = sub_to_super.get(sid)
        if not scid or sid not in subclaims_out:
            continue
        super_text = superclaims.get(scid, "").strip()
        if not sub_text or not super_text:
            continue
        scored.append((sid, scid, sub_text, super_text))

    if scored:
        unique_super_texts = sorted({entry[3] for entry in scored})
        super_row = {text: i for i, text in enumerate(unique_super_texts)}
        sub_emb = np.asarray(encode_fn([entry[2] for entry in scored]))
        super_emb = np.asarray(encode_fn(unique_super_texts))
        for row, (sid, scid, _sub_text, super_text) in enumerate(scored):
            sim = float(np.dot(sub_emb[row], super_emb[super_row[super_text]]))
            # Clamp to [0, 1] so the UI can treat the value as a confidence.
            sim = max(0.0, min(1.0, sim))
            subclaims_out[sid]["hierarchy_confidence"] = round(sim, 4)
            subclaims_out[sid]["superclaim_id"] = scid

    out_obj: dict[str, Any] = {
        "generated_at": datetime.now(timezone.utc).isoformat(),
        "claims_bundle_version": claims_bundle_version,
        "claims_source_sha256": claims_sha,
        "data_files_sha256": data_hashes,
        "claims_version": claims_version_from_file,
        "cluster_backend": cluster_backend,
        "embedding_backend": embedding_backend_used,
        "embedding_model": args.embedding_model if embedding_backend_used == "sentence_transformers" else None,
        "min_topic_size": args.min_topic_size,
        "dbscan_eps": args.dbscan_eps if cluster_backend == "sklearn_dbscan" else None,
        "claims_source": os.path.basename(str(claims_path)),
        "subclaims": subclaims_out,
    }

    out_path = args.out
    out_path.parent.mkdir(parents=True, exist_ok=True)
    with open(out_path, "w", encoding="utf-8") as f:
        json.dump(out_obj, f, indent=2, ensure_ascii=False)

    flagged = sum(1 for v in subclaims_out.values() if v.get("collapse_flag"))
    with_hier = sum(1 for v in subclaims_out.values() if "hierarchy_confidence" in v)
    print(f"Wrote {out_path}")
    print(f" collapse_flag=true: {flagged} subclaims; hierarchy_confidence: {with_hier} subclaims")
    return 0
# Script entry point: use main()'s return value as the process exit status.
if __name__ == "__main__":
    sys.exit(main())