"""Exploratory embedding analysis for serialized AX-CPT representations.

This script reads deterministic text representations produced by
scripts/rebuild_release_outputs.py and computes local hashed token n-gram
embeddings. These are actual vector embeddings of the serialized text, but they
are not neural model embeddings and they are not latent model hidden states.
"""

from __future__ import annotations

import argparse
import csv
import json
import math
import re
import zlib
from collections import defaultdict
from pathlib import Path

import numpy as np

MODEL_NAME = "local_hashing_token_ngram_v1"
EMBEDDING_DIM = 256
NGRAM_RANGE = (1, 2)
TOKEN_RE = re.compile(r"[a-z0-9_]+")


def read_jsonl(path: Path) -> list[dict[str, object]]:
    """Read a JSON Lines file and return one parsed object per non-blank line.

    Args:
        path: File to read as UTF-8.

    Returns:
        The parsed rows in file order. Lines that are empty after stripping
        whitespace are skipped.
    """
    rows: list[dict[str, object]] = []
    with path.open("r", encoding="utf-8") as handle:
        for line in handle:
            line = line.strip()
            if line:
                rows.append(json.loads(line))
    return rows


def write_csv(path: Path, rows: list[dict[str, object]], fieldnames: list[str]) -> None:
    """Write ``rows`` to ``path`` as CSV with a header, creating parent directories.

    Args:
        path: Destination file; it is overwritten if it exists.
        rows: Dictionaries keyed by column name.
        fieldnames: Column order for the header and every row.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    # newline="" lets the csv module control line endings itself.
    with path.open("w", newline="", encoding="utf-8") as handle:
        writer = csv.DictWriter(handle, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(rows)


def tokenize(text: str) -> list[str]:
    """Lowercase ``text`` and return every match of ``TOKEN_RE`` in order."""
    return TOKEN_RE.findall(text.lower())


def token_ngrams(tokens: list[str], ngram_range: tuple[int, int]) -> list[str]:
    """Return space-joined token n-grams for each n in an inclusive range.

    Args:
        tokens: Token sequence to slide over.
        ngram_range: ``(min_n, max_n)``, both inclusive.

    Returns:
        All n-grams grouped by n in ascending order, each group in token
        order. Values of n that are non-positive or longer than ``tokens``
        contribute nothing.
    """
    min_n, max_n = ngram_range
    out: list[str] = []
    for n in range(min_n, max_n + 1):
        if n <= 0 or len(tokens) < n:
            continue
        out.extend(" ".join(tokens[i : i + n]) for i in range(len(tokens) - n + 1))
    return out


def stable_bucket(feature: str, dim: int) -> tuple[int, float]:
    """Map a feature string to a deterministic ``(bucket, sign)`` pair.

    The bucket is the CRC32 of the UTF-8 bytes modulo ``dim``; the sign is
    ``+1.0`` or ``-1.0`` depending on the parity of the CRC32 of the same bytes
    prefixed with ``b"sign:"``.

    Args:
        feature: Feature text to hash.
        dim: Number of buckets; must be non-zero for the modulo to succeed.

    Returns:
        The bucket index and the sign to add at that index.
    """
    data = feature.encode("utf-8")
    bucket = zlib.crc32(data) % dim
    sign = 1.0 if (zlib.crc32(b"sign:" + data) % 2 == 0) else -1.0
    return bucket, sign


def embed_text(text: str, dim: int = EMBEDDING_DIM) -> np.ndarray:
    """Embed ``text`` as an L2-normalized signed feature-hashing vector.

    Args:
        text: Text to tokenize and hash.
        dim: Length of the output vector.

    Returns:
        A float32 vector of length ``dim``. It has unit L2 norm unless every
        bucket ends up at zero (for example when the text yields no tokens),
        in which case the all-zero vector is returned unchanged.
    """
    vector = np.zeros(dim, dtype=np.float32)
    for feature in token_ngrams(tokenize(text), NGRAM_RANGE):
        bucket, sign = stable_bucket(feature, dim)
        vector[bucket] += sign
    norm = float(np.linalg.norm(vector))
    if norm > 0:
        vector /= norm
    return vector


def embed_rows(rows: list[dict[str, object]], dim: int = EMBEDDING_DIM) -> np.ndarray:
    """Embed the ``serialized_text`` field of every row.

    Args:
        rows: Row dictionaries; each must contain ``"serialized_text"``.
        dim: Embedding dimensionality.

    Returns:
        A float32 array of shape ``(len(rows), dim)``, one embedding per row.
    """
    embeddings = np.zeros((len(rows), dim), dtype=np.float32)
    for idx, row in enumerate(rows):
        embeddings[idx] = embed_text(str(row["serialized_text"]), dim=dim)
    return embeddings


def save_embedding_bundle(path: Path, rows: list[dict[str, object]], embeddings: np.ndarray) -> None:
    """Save embeddings and identifying row fields to a compressed ``.npz`` file.

    The archive holds the arrays ``representation_id``, ``dataset``,
    ``condition`` (each stringified from the rows) and ``embedding`` (float32).

    Args:
        path: Destination file; parent directories are created.
        rows: Rows the embeddings were computed from.
        embeddings: Embedding matrix to store.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    np.savez_compressed(
        path,
        representation_id=np.array([str(row["representation_id"]) for row in rows]),
        dataset=np.array([str(row["dataset"]) for row in rows]),
        condition=np.array([str(row["condition"]) for row in rows]),
        embedding=embeddings.astype(np.float32),
    )


def metadata_rows(rows: list[dict[str, object]]) -> list[dict[str, object]]:
    """Build one metadata record per row for the metadata CSV files.

    ``row_idx`` is the position of the row in ``rows``. The trial-count and
    window fields are optional in the input and default to ``""`` when absent.
    """
    return [
        {
            "row_idx": idx,
            "representation_id": row["representation_id"],
            "representation_level": row["representation_level"],
            "dataset": row["dataset"],
            "condition": row["condition"],
            "n_trials": row.get("n_trials", ""),
            "window_size": row.get("window_size", ""),
            "window_start_trial_idx": row.get("window_start_trial_idx", ""),
            "window_end_trial_idx": row.get("window_end_trial_idx", ""),
        }
        for idx, row in enumerate(rows)
    ]


def condition_vector_rows(rows: list[dict[str, object]], embeddings: np.ndarray) -> list[dict[str, object]]:
    """Build CSV records that carry each embedding as a compact JSON array.

    Vector components are rounded to 8 decimal places and serialized without
    whitespace in the ``embedding_vector_json`` column.
    """
    return [
        {
            "row_idx": idx,
            "representation_id": row["representation_id"],
            "dataset": row["dataset"],
            "condition": row["condition"],
            "embedding_model": MODEL_NAME,
            "embedding_dim": embeddings.shape[1],
            "embedding_vector_json": json.dumps(
                [round(float(x), 8) for x in embeddings[idx]],
                separators=(",", ":"),
            ),
        }
        for idx, row in enumerate(rows)
    ]


def _round_or_blank(value: float | None, digits: int = 6) -> float | str:
    """Round ``value`` to ``digits`` places, or return ``""`` when it is ``None``."""
    return round(value, digits) if value is not None else ""


def cosine_pair_summary(
    rows: list[dict[str, object]],
    embeddings: np.ndarray,
    label: str,
) -> list[dict[str, object]]:
    """Summarize pairwise dot-product similarities between row groups.

    Rows are grouped by ``(dataset, condition)``. For each unordered pair of
    groups, including a group with itself, the dot products between their
    embeddings are summarized. Dot products equal cosine similarities when the
    embeddings are L2-normalized, as ``embed_text`` produces.

    Args:
        rows: Rows aligned with ``embeddings`` by position.
        embeddings: Matrix with one embedding per row.
        label: Value written to the ``representation_level`` column.

    Returns:
        One record per group pair, ordered by sorted group key. Statistics are
        ``""`` when a pair has no values, which happens for a within-group
        comparison of a group with fewer than two rows.
    """
    groups: dict[tuple[str, str], list[int]] = defaultdict(list)
    for idx, row in enumerate(rows):
        groups[(str(row["dataset"]), str(row["condition"]))].append(idx)

    out: list[dict[str, object]] = []
    keys = sorted(groups)
    for i, key_a in enumerate(keys):
        idx_a = groups[key_a]
        emb_a = embeddings[idx_a]
        for key_b in keys[i:]:
            emb_b = embeddings[groups[key_b]]
            sims = emb_a @ emb_b.T
            same_group = key_a == key_b
            if not same_group:
                values = sims.reshape(-1)
            elif len(idx_a) < 2:
                values = np.array([], dtype=np.float32)
            else:
                # Strict upper triangle: each unordered pair once, no self-pairs.
                mask = np.triu(np.ones(sims.shape, dtype=bool), k=1)
                values = sims[mask]

            if values.size == 0:
                mean_cos = min_cos = max_cos = std_cos = mean_dist = None
            else:
                mean_cos = float(values.mean())
                min_cos = float(values.min())
                max_cos = float(values.max())
                std_cos = float(values.std(ddof=0))
                mean_dist = float((1.0 - values).mean())

            out.append(
                {
                    "representation_level": label,
                    "dataset_a": key_a[0],
                    "condition_a": key_a[1],
                    "dataset_b": key_b[0],
                    "condition_b": key_b[1],
                    "pair_type": "within_condition" if same_group else "between_condition",
                    "n_pairs": int(values.size),
                    "mean_cosine_similarity": _round_or_blank(mean_cos),
                    "mean_cosine_distance": _round_or_blank(mean_dist),
                    "min_cosine_similarity": _round_or_blank(min_cos),
                    "max_cosine_similarity": _round_or_blank(max_cos),
                    "std_cosine_similarity": _round_or_blank(std_cos),
                }
            )
    return out


def pca_2d(embeddings: np.ndarray) -> tuple[np.ndarray, list[float]]:
    """Project embeddings onto their first two principal components.

    PCA is computed with ``numpy.linalg.svd`` on the mean-centered matrix in
    float64. Each component's sign is fixed so that its largest-magnitude
    loading is positive, which makes the projection deterministic.

    Args:
        embeddings: Matrix with one embedding per row.

    Returns:
        ``(coords, explained)`` where ``coords`` is a float32 array of shape
        ``(n_rows, 2)`` and ``explained`` holds the explained-variance ratios
        of PC1 and PC2. When fewer than two components exist (for example a
        single row), the missing coordinate columns and ratios are zero.
    """
    if embeddings.shape[0] == 0:
        return np.zeros((0, 2), dtype=np.float32), [0.0, 0.0]

    data = embeddings.astype(np.float64)
    centered = data - data.mean(axis=0, keepdims=True)
    _, singular_values, vt = np.linalg.svd(centered, full_matrices=False)

    components = vt[:2].copy()
    for component_idx in range(components.shape[0]):
        pivot = int(np.argmax(np.abs(components[component_idx])))
        if components[component_idx, pivot] < 0:
            components[component_idx] *= -1

    coords = centered @ components.T
    if coords.shape[1] < 2:
        # The reduced SVD yields min(n_rows, n_cols) components, so a single
        # row (or a 1-D embedding) gives fewer than two columns. Pad with
        # zeros so callers can always index both pc1 and pc2.
        padded = np.zeros((coords.shape[0], 2), dtype=coords.dtype)
        padded[:, : coords.shape[1]] = coords
        coords = padded

    total_variance = float((singular_values**2).sum())
    explained: list[float] = []
    for idx in range(2):
        if idx < len(singular_values) and total_variance > 0:
            explained.append(float((singular_values[idx] ** 2) / total_variance))
        else:
            explained.append(0.0)
    return coords.astype(np.float32), explained


def projection_rows(
    rows: list[dict[str, object]],
    coords: np.ndarray,
    explained: list[float],
) -> list[dict[str, object]]:
    """Build CSV records pairing each row's identity with its 2D projection.

    Args:
        rows: Rows aligned with ``coords`` by position.
        coords: Array with at least two columns (pc1, pc2) per row.
        explained: Explained-variance ratios for PC1 and PC2; the same two
            values are repeated on every record.

    Returns:
        One record per row, with coordinates and ratios rounded to 8 decimals.
        Optional window fields default to ``""`` when absent.
    """
    pc1_ratio = round(explained[0], 8)
    pc2_ratio = round(explained[1], 8)
    return [
        {
            "row_idx": idx,
            "representation_id": row["representation_id"],
            "representation_level": row["representation_level"],
            "dataset": row["dataset"],
            "condition": row["condition"],
            "window_size": row.get("window_size", ""),
            "window_start_trial_idx": row.get("window_start_trial_idx", ""),
            "window_end_trial_idx": row.get("window_end_trial_idx", ""),
            "pc1": round(float(coords[idx, 0]), 8),
            "pc2": round(float(coords[idx, 1]), 8),
            "pc1_explained_variance_ratio": pc1_ratio,
            "pc2_explained_variance_ratio": pc2_ratio,
        }
        for idx, row in enumerate(rows)
    ]


def write_report(
    path: Path,
    condition_rows: list[dict[str, object]],
    sliding_rows: list[dict[str, object]],
    condition_explained: list[float],
    sliding_explained: list[float],
    embedding_dim: int = EMBEDDING_DIM,
    input_dir: Path = Path("outputs"),
) -> None:
    """Write the Markdown report describing the analysis and its outputs.

    Args:
        path: Destination Markdown file; parent directories are created.
        condition_rows: Condition-level rows (only their count is reported).
        sliding_rows: Sliding-window rows (only their count is reported).
        condition_explained: PC1/PC2 explained-variance ratios, condition level.
        sliding_explained: PC1/PC2 explained-variance ratios, sliding windows.
        embedding_dim: Dimensionality actually used for the embeddings, so the
            report stays accurate when the default is overridden.
        input_dir: Directory the input JSONL files were read from, so the
            reported input paths match the files that were actually used.
    """
    # as_posix() keeps forward slashes in the report on every platform.
    condition_path = (input_dir / "condition_level_representations.jsonl").as_posix()
    sliding_path = (input_dir / "sliding_window_representations.jsonl").as_posix()
    report = f"""# Exploratory Embedding Analysis

This is a compact exploratory analysis of serialized AX-CPT representations. It should not be treated as evidence about latent model states or mechanistic representations.

## Inputs

- `{condition_path}`: {len(condition_rows)} rows.
- `{sliding_path}`: {len(sliding_rows)} rows.

Trial-level representations are not embedded in this first pass.

## Embedding Model

- Model/library: `{MODEL_NAME}` implemented locally in `scripts/run_embedding_analysis.py`.
- Dependency: `numpy` for vector math, cosine similarity, and PCA.
- Text processing: lowercase alphanumeric tokenization with regex `{TOKEN_RE.pattern}`.
- Features: token unigrams and bigrams.
- Vectorization: deterministic signed feature hashing with CRC32 into {embedding_dim} dimensions.
- Normalization: L2 normalization per row.

These are actual text-derived embedding vectors for the serialized representations. They are not neural embeddings, latent model embeddings, hidden states, logits, probabilities, reaction times, costs, or latency measurements.

## Similarity

Cosine similarity and cosine distance are computed on L2-normalized hashed text embeddings. Summary files report within-condition and between-condition comparisons. Similarities reflect overlap in the serialized representation text and should be interpreted cautiously.

## Projection

2D projections use deterministic PCA via `numpy.linalg.svd` on centered embedding matrices. Component signs are fixed by forcing the largest absolute component loading to be positive.

- Condition-level PCA explained variance ratio: PC1={condition_explained[0]:.6f}, PC2={condition_explained[1]:.6f}
- Sliding-window PCA explained variance ratio: PC1={sliding_explained[0]:.6f}, PC2={sliding_explained[1]:.6f}

## Outputs

- `condition_embeddings.npz`
- `condition_embedding_vectors.csv`
- `condition_embedding_metadata.csv`
- `condition_embedding_similarity_pairs.csv`
- `condition_embedding_projection_2d.csv`
- `sliding_window_embeddings.npz`
- `sliding_window_embedding_metadata.csv`
- `sliding_window_embedding_similarity_summary.csv`
- `sliding_window_embedding_projection_2d.csv`
- `embedding_model_config.json`
"""
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(report, encoding="utf-8")


def main(argv: list[str] | None = None) -> int:
    """Run the embedding analysis end to end and write all output files.

    Args:
        argv: Command-line arguments without the program name. ``None`` makes
            argparse read ``sys.argv``, which is the behavior when the script
            is run directly.

    Returns:
        ``0`` on success. Invalid arguments exit through ``parser.error``.
    """
    parser = argparse.ArgumentParser(description="Run exploratory embedding analysis for AX-CPT representations.")
    parser.add_argument("--input-dir", type=Path, default=Path("outputs"))
    parser.add_argument("--output-dir", type=Path, default=Path("outputs/embedding_analysis"))
    parser.add_argument("--dim", type=int, default=EMBEDDING_DIM)
    args = parser.parse_args(argv)
    if args.dim < 1:
        # A zero dim would divide by zero while hashing and a negative one
        # cannot size an array; fail early with a clear usage error instead.
        parser.error("--dim must be a positive integer")

    condition_path = args.input_dir / "condition_level_representations.jsonl"
    sliding_path = args.input_dir / "sliding_window_representations.jsonl"
    condition_rows = read_jsonl(condition_path)
    sliding_rows = read_jsonl(sliding_path)
    args.output_dir.mkdir(parents=True, exist_ok=True)

    condition_embeddings = embed_rows(condition_rows, dim=args.dim)
    sliding_embeddings = embed_rows(sliding_rows, dim=args.dim)

    save_embedding_bundle(args.output_dir / "condition_embeddings.npz", condition_rows, condition_embeddings)
    save_embedding_bundle(args.output_dir / "sliding_window_embeddings.npz", sliding_rows, sliding_embeddings)

    metadata_fields = [
        "row_idx",
        "representation_id",
        "representation_level",
        "dataset",
        "condition",
        "n_trials",
        "window_size",
        "window_start_trial_idx",
        "window_end_trial_idx",
    ]
    write_csv(args.output_dir / "condition_embedding_metadata.csv", metadata_rows(condition_rows), metadata_fields)
    write_csv(args.output_dir / "sliding_window_embedding_metadata.csv", metadata_rows(sliding_rows), metadata_fields)
    write_csv(
        args.output_dir / "condition_embedding_vectors.csv",
        condition_vector_rows(condition_rows, condition_embeddings),
        [
            "row_idx",
            "representation_id",
            "dataset",
            "condition",
            "embedding_model",
            "embedding_dim",
            "embedding_vector_json",
        ],
    )

    similarity_fields = [
        "representation_level",
        "dataset_a",
        "condition_a",
        "dataset_b",
        "condition_b",
        "pair_type",
        "n_pairs",
        "mean_cosine_similarity",
        "mean_cosine_distance",
        "min_cosine_similarity",
        "max_cosine_similarity",
        "std_cosine_similarity",
    ]
    write_csv(
        args.output_dir / "condition_embedding_similarity_pairs.csv",
        cosine_pair_summary(condition_rows, condition_embeddings, "condition"),
        similarity_fields,
    )
    write_csv(
        args.output_dir / "sliding_window_embedding_similarity_summary.csv",
        cosine_pair_summary(sliding_rows, sliding_embeddings, "sliding_window"),
        similarity_fields,
    )

    condition_coords, condition_explained = pca_2d(condition_embeddings)
    sliding_coords, sliding_explained = pca_2d(sliding_embeddings)
    projection_fields = [
        "row_idx",
        "representation_id",
        "representation_level",
        "dataset",
        "condition",
        "window_size",
        "window_start_trial_idx",
        "window_end_trial_idx",
        "pc1",
        "pc2",
        "pc1_explained_variance_ratio",
        "pc2_explained_variance_ratio",
    ]
    write_csv(
        args.output_dir / "condition_embedding_projection_2d.csv",
        projection_rows(condition_rows, condition_coords, condition_explained),
        projection_fields,
    )
    write_csv(
        args.output_dir / "sliding_window_embedding_projection_2d.csv",
        projection_rows(sliding_rows, sliding_coords, sliding_explained),
        projection_fields,
    )

    config = {
        "analysis_label": "exploratory_embedding_analysis",
        "embedding_model": MODEL_NAME,
        "embedding_dim": args.dim,
        "library": "numpy",
        "tokenizer_regex": TOKEN_RE.pattern,
        "ngram_range": list(NGRAM_RANGE),
        "hash_function": "zlib.crc32 signed feature hashing",
        "normalization": "l2",
        "projection": "PCA via numpy.linalg.svd with deterministic component sign convention",
        "inputs": {
            "condition_level": str(condition_path),
            "sliding_window": str(sliding_path),
        },
        "not_included": [
            "neural embeddings",
            "latent model hidden states",
            "logits",
            "probabilities",
            "reaction times",
            "API costs",
            "latency measurements",
        ],
    }
    (args.output_dir / "embedding_model_config.json").write_text(
        json.dumps(config, ensure_ascii=False, indent=2, sort_keys=True) + "\n",
        encoding="utf-8",
    )

    write_report(
        args.output_dir / "exploratory_embedding_report.md",
        condition_rows=condition_rows,
        sliding_rows=sliding_rows,
        condition_explained=condition_explained,
        sliding_explained=sliding_explained,
        embedding_dim=args.dim,
        input_dir=args.input_dir,
    )

    print(f"Embedded {len(condition_rows)} condition-level rows.")
    print(f"Embedded {len(sliding_rows)} sliding-window rows.")
    print(f"Wrote exploratory embedding outputs to {args.output_dir}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())