"""Leakage-reduced exploratory embedding analysis for AX-CPT representations. This second-pass analysis removes explicit dataset labels, condition labels, and direct DCM indicator fields from the serialized text before embedding. It keeps original labels only as non-embedded metadata for grouping summaries. """ from __future__ import annotations import argparse import csv import json from pathlib import Path import numpy as np from run_embedding_analysis import ( MODEL_NAME, TOKEN_RE, condition_vector_rows, cosine_pair_summary, embed_rows, metadata_rows, pca_2d, projection_rows, read_jsonl, save_embedding_bundle, write_csv, ) MASKED_MODEL_NAME = f"{MODEL_NAME}_leakage_reduced_input_v1" REMOVED_FIELDS = { "dataset", "condition", "dcm_invocation_rate", "use_dcm", "dcm_invoked", } def write_jsonl(path: Path, rows: list[dict[str, object]]) -> None: path.parent.mkdir(parents=True, exist_ok=True) with path.open("w", encoding="utf-8") as handle: for row in rows: handle.write(json.dumps(row, ensure_ascii=False, sort_keys=True) + "\n") def mask_pipe_fields(piece: str) -> str: kept: list[str] = [] for field in piece.split("|"): field = field.strip() if not field: continue key = field.split("=", 1)[0] if key in REMOVED_FIELDS: continue kept.append(field) return "|".join(kept) def mask_serialized_text(text: str) -> str: if " || trials: " in text: header, trial_text = text.split(" || trials: ", 1) masked_header = mask_pipe_fields(header) masked_trials = " ; ".join(mask_pipe_fields(piece) for piece in trial_text.split(" ; ")) return f"{masked_header} || trials: {masked_trials}" return " ; ".join(mask_pipe_fields(piece) for piece in text.split(" ; ")) def make_analysis_rows(rows: list[dict[str, object]]) -> list[dict[str, object]]: masked: list[dict[str, object]] = [] for row in rows: new_row = dict(row) new_row["serialized_text"] = mask_serialized_text(str(row["serialized_text"])) masked.append(new_row) return masked def make_masked_representation_rows(rows: list[dict[str, object]], prefix: str) -> list[dict[str, object]]: masked: list[dict[str, object]] = [] for idx, row in enumerate(rows): item = { "masked_representation_id": f"{prefix}::{idx:06d}", "representation_level": row["representation_level"], "n_trials": row.get("n_trials", ""), "serialized_text": mask_serialized_text(str(row["serialized_text"])), } if row["representation_level"] == "sliding_window": item.update( { "window_size": row.get("window_size", ""), "window_start_trial_idx": row.get("window_start_trial_idx", ""), "window_end_trial_idx": row.get("window_end_trial_idx", ""), } ) masked.append(item) return masked def masked_condition_vector_rows(rows: list[dict[str, object]], embeddings: np.ndarray) -> list[dict[str, object]]: out = condition_vector_rows(rows, embeddings) for row in out: row["embedding_model"] = MASKED_MODEL_NAME return out def write_masking_report(path: Path, condition_count: int, sliding_count: int) -> None: report = f"""# Leakage-Reduced Embedding Analysis This is a second-pass exploratory embedding analysis. It tests whether broad separation remains after removing obvious metadata from the text passed into the embedding step. ## Inputs - `outputs/condition_level_representations.jsonl`: {condition_count} rows. - `outputs/sliding_window_representations.jsonl`: {sliding_count} rows. ## Removed Or Masked From Embedded Text The following pipe-delimited serialized text fields are removed before embedding: - `dataset` - `condition` - `dcm_invocation_rate` - `use_dcm` - `dcm_invoked` Original dataset and condition labels are retained only outside the embedded text as grouping metadata for similarity summaries and plots. ## Still Present In Embedded Text The masked text may still contain trial type, cue, probe, parsed response, correctness, invalid-response flags, previous-trial type/correctness, distractor count, context window, and sequence/order structure. These remaining fields can still distinguish experimental families or conditions. ## Embedding Model - Model/library: `{MASKED_MODEL_NAME}` using the same local hashed token n-gram vectorizer as `scripts/run_embedding_analysis.py`. - Text processing: lowercase alphanumeric tokenization with regex `{TOKEN_RE.pattern}`. - Vectorization: deterministic signed CRC32 feature hashing into 256 dimensions. - Normalization: L2 normalization. - Projection: deterministic PCA via `numpy.linalg.svd`. This is not a neural embedding, latent model state analysis, logit analysis, probability analysis, reaction-time analysis, or cost/latency analysis. """ path.parent.mkdir(parents=True, exist_ok=True) path.write_text(report, encoding="utf-8") def write_report( path: Path, condition_count: int, sliding_count: int, condition_explained: list[float], sliding_explained: list[float], condition_group_summary: list[dict[str, object]], sliding_group_summary: list[dict[str, object]], ) -> None: def summary_lines(rows: list[dict[str, object]]) -> str: lines = [] for row in rows: lines.append( f"- {row['comparison_group']}: mean={row['mean_cosine_similarity']}, " f"min={row['min_cosine_similarity']}, max={row['max_cosine_similarity']}, n={row['n_pairs']}" ) return "\n".join(lines) report = f"""# Leakage-Reduced Exploratory Embedding Report This analysis recomputes embeddings after removing explicit dataset labels, condition labels, and direct DCM indicator fields from the serialized text. ## Rows Embedded - Condition-level rows: {condition_count} - Sliding-window rows: {sliding_count} - Trial-level rows: not embedded in this pass. ## PCA Summary - Condition-level PCA explained variance ratio: PC1={condition_explained[0]:.6f}, PC2={condition_explained[1]:.6f} - Sliding-window PCA explained variance ratio: PC1={sliding_explained[0]:.6f}, PC2={sliding_explained[1]:.6f} ## Similarity Group Summary Condition-level: {summary_lines(condition_group_summary)} Sliding-window: {summary_lines(sliding_group_summary)} ## Interpretation Scope If separation remains, it should be interpreted as separation in the remaining serialized observable fields, not as evidence about latent model internals. Remaining fields include response symbols, correctness/invalid flags, context-window values, trial order, and AX-CPT event sequences. """ path.parent.mkdir(parents=True, exist_ok=True) path.write_text(report, encoding="utf-8") def condition_category(row: dict[str, object]) -> str | None: dataset_a = str(row["dataset_a"]) dataset_b = str(row["dataset_b"]) condition_a = str(row["condition_a"]) condition_b = str(row["condition_b"]) if row["pair_type"] == "within_condition": return "within_condition" if dataset_a == "axcpt_v4b" and dataset_b == "axcpt_v4b": return "v4b_vs_v4b" if dataset_a != dataset_b: return "v4b_vs_v5" if dataset_a == "axcpt_v5_dcm" and dataset_b == "axcpt_v5_dcm": a_is_base = condition_a.endswith("_BASE") b_is_base = condition_b.endswith("_BASE") a_is_dcm = condition_a.endswith("_DCM") b_is_dcm = condition_b.endswith("_DCM") if a_is_base and b_is_base: return "v5_base_vs_base" if a_is_dcm and b_is_dcm: return "v5_dcm_vs_dcm" if (a_is_base and b_is_dcm) or (a_is_dcm and b_is_base): return "v5_base_vs_dcm" return "v5_other" return None def build_group_summary(rows: list[dict[str, object]], level: str) -> list[dict[str, object]]: grouped: dict[str, list[float]] = {} pair_counts: dict[str, int] = {} for row in rows: category = condition_category(row) if category is None or row["mean_cosine_similarity"] == "": continue grouped.setdefault(category, []).append(float(row["mean_cosine_similarity"])) pair_counts[category] = pair_counts.get(category, 0) + int(row["n_pairs"]) out: list[dict[str, object]] = [] for category in [ "within_condition", "v4b_vs_v4b", "v5_base_vs_base", "v5_dcm_vs_dcm", "v5_base_vs_dcm", "v4b_vs_v5", ]: values = grouped.get(category) if not values: continue out.append( { "representation_level": level, "comparison_group": category, "n_summary_rows": len(values), "n_pairs": pair_counts[category], "mean_cosine_similarity": round(sum(values) / len(values), 6), "min_cosine_similarity": round(min(values), 6), "max_cosine_similarity": round(max(values), 6), } ) return out def main() -> int: parser = argparse.ArgumentParser(description="Run leakage-reduced exploratory embedding analysis.") parser.add_argument("--input-dir", type=Path, default=Path("outputs")) parser.add_argument("--output-dir", type=Path, default=Path("outputs/embedding_analysis_leakage_reduced")) parser.add_argument("--dim", type=int, default=256) args = parser.parse_args() condition_rows = read_jsonl(args.input_dir / "condition_level_representations.jsonl") sliding_rows = read_jsonl(args.input_dir / "sliding_window_representations.jsonl") condition_analysis_rows = make_analysis_rows(condition_rows) sliding_analysis_rows = make_analysis_rows(sliding_rows) args.output_dir.mkdir(parents=True, exist_ok=True) write_jsonl( args.output_dir / "leakage_reduced_condition_level_representations.jsonl", make_masked_representation_rows(condition_rows, "masked_condition"), ) write_jsonl( args.output_dir / "leakage_reduced_sliding_window_representations.jsonl", make_masked_representation_rows(sliding_rows, "masked_sliding_window"), ) condition_embeddings = embed_rows(condition_analysis_rows, dim=args.dim) sliding_embeddings = embed_rows(sliding_analysis_rows, dim=args.dim) save_embedding_bundle(args.output_dir / "condition_embeddings.npz", condition_analysis_rows, condition_embeddings) save_embedding_bundle(args.output_dir / "sliding_window_embeddings.npz", sliding_analysis_rows, sliding_embeddings) metadata_fields = [ "row_idx", "representation_id", "representation_level", "dataset", "condition", "n_trials", "window_size", "window_start_trial_idx", "window_end_trial_idx", ] write_csv(args.output_dir / "condition_embedding_metadata.csv", metadata_rows(condition_analysis_rows), metadata_fields) write_csv(args.output_dir / "sliding_window_embedding_metadata.csv", metadata_rows(sliding_analysis_rows), metadata_fields) write_csv( args.output_dir / "condition_embedding_vectors.csv", masked_condition_vector_rows(condition_analysis_rows, condition_embeddings), [ "row_idx", "representation_id", "dataset", "condition", "embedding_model", "embedding_dim", "embedding_vector_json", ], ) similarity_fields = [ "representation_level", "dataset_a", "condition_a", "dataset_b", "condition_b", "pair_type", "n_pairs", "mean_cosine_similarity", "mean_cosine_distance", "min_cosine_similarity", "max_cosine_similarity", "std_cosine_similarity", ] condition_similarity = cosine_pair_summary(condition_analysis_rows, condition_embeddings, "condition_leakage_reduced") sliding_similarity = cosine_pair_summary(sliding_analysis_rows, sliding_embeddings, "sliding_window_leakage_reduced") write_csv(args.output_dir / "condition_embedding_similarity_pairs.csv", condition_similarity, similarity_fields) write_csv(args.output_dir / "sliding_window_embedding_similarity_summary.csv", sliding_similarity, similarity_fields) condition_group_summary = build_group_summary(condition_similarity, "condition_leakage_reduced") sliding_group_summary = build_group_summary(sliding_similarity, "sliding_window_leakage_reduced") group_summary_fields = [ "representation_level", "comparison_group", "n_summary_rows", "n_pairs", "mean_cosine_similarity", "min_cosine_similarity", "max_cosine_similarity", ] write_csv( args.output_dir / "leakage_reduced_similarity_group_summary.csv", condition_group_summary + sliding_group_summary, group_summary_fields, ) condition_coords, condition_explained = pca_2d(condition_embeddings) sliding_coords, sliding_explained = pca_2d(sliding_embeddings) projection_fields = [ "row_idx", "representation_id", "representation_level", "dataset", "condition", "window_size", "window_start_trial_idx", "window_end_trial_idx", "pc1", "pc2", "pc1_explained_variance_ratio", "pc2_explained_variance_ratio", ] write_csv( args.output_dir / "condition_embedding_projection_2d.csv", projection_rows(condition_analysis_rows, condition_coords, condition_explained), projection_fields, ) write_csv( args.output_dir / "sliding_window_embedding_projection_2d.csv", projection_rows(sliding_analysis_rows, sliding_coords, sliding_explained), projection_fields, ) config = { "analysis_label": "leakage_reduced_exploratory_embedding_analysis", "embedding_model": MASKED_MODEL_NAME, "embedding_dim": args.dim, "library": "numpy", "removed_from_embedded_text": sorted(REMOVED_FIELDS), "labels_retained_only_for_grouping": ["dataset", "condition"], "inputs": { "condition_level": str(args.input_dir / "condition_level_representations.jsonl"), "sliding_window": str(args.input_dir / "sliding_window_representations.jsonl"), }, "masked_outputs": { "condition_level": str(args.output_dir / "leakage_reduced_condition_level_representations.jsonl"), "sliding_window": str(args.output_dir / "leakage_reduced_sliding_window_representations.jsonl"), }, } (args.output_dir / "embedding_model_config.json").write_text( json.dumps(config, ensure_ascii=False, indent=2, sort_keys=True) + "\n", encoding="utf-8", ) write_masking_report( args.output_dir / "leakage_reduced_masking_report.md", condition_count=len(condition_analysis_rows), sliding_count=len(sliding_analysis_rows), ) write_report( args.output_dir / "leakage_reduced_embedding_report.md", condition_count=len(condition_analysis_rows), sliding_count=len(sliding_analysis_rows), condition_explained=condition_explained, sliding_explained=sliding_explained, condition_group_summary=condition_group_summary, sliding_group_summary=sliding_group_summary, ) print(f"Embedded {len(condition_analysis_rows)} leakage-reduced condition-level rows.") print(f"Embedded {len(sliding_analysis_rows)} leakage-reduced sliding-window rows.") print(f"Wrote leakage-reduced embedding outputs to {args.output_dir}") return 0 if __name__ == "__main__": raise SystemExit(main())