| """Exploratory embedding analysis for serialized AX-CPT representations. |
| |
| This script reads deterministic text representations produced by |
| scripts/rebuild_release_outputs.py and computes local hashed token n-gram |
| embeddings. These are actual vector embeddings of the serialized text, but they |
| are not neural model embeddings and they are not latent model hidden states. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import argparse |
| import csv |
| import json |
| import math |
| import re |
| import zlib |
| from collections import defaultdict |
| from pathlib import Path |
|
|
| import numpy as np |
|
|
|
|
# Identifier of this embedding scheme; it is written into the vectors CSV,
# the JSON config, and the markdown report.
MODEL_NAME = "local_hashing_token_ngram_v1"
# Default number of hash buckets (embedding vector length); the --dim CLI
# option defaults to this value.
EMBEDDING_DIM = 256
# Inclusive (min_n, max_n) token n-gram sizes, i.e. unigrams and bigrams.
NGRAM_RANGE = (1, 2)
# A token is a maximal run of lowercase ASCII letters, digits, or underscores
# (tokenize() lowercases the text before matching).
TOKEN_RE = re.compile(r"[a-z0-9_]+")
|
|
|
|
def read_jsonl(path: Path) -> list[dict[str, object]]:
    """Load a UTF-8 JSON Lines file into a list of decoded records.

    Every line is stripped of surrounding whitespace; lines that are empty
    after stripping are skipped, and each remaining line is decoded with
    ``json.loads``. Records are returned in file order.
    """
    with path.open("r", encoding="utf-8") as handle:
        stripped_lines = (raw.strip() for raw in handle)
        return [json.loads(text) for text in stripped_lines if text]
|
|
|
|
def write_csv(path: Path, rows: list[dict[str, object]], fieldnames: list[str]) -> None:
    """Write ``rows`` to ``path`` as a UTF-8 CSV file with a header row.

    The parent directory is created if it does not exist. Columns appear in
    the order given by ``fieldnames``; the file is opened with ``newline=""``
    so the ``csv`` module controls line endings.
    """
    destination_dir = path.parent
    destination_dir.mkdir(parents=True, exist_ok=True)

    with path.open("w", newline="", encoding="utf-8") as sink:
        csv_writer = csv.DictWriter(sink, fieldnames=fieldnames)
        csv_writer.writeheader()
        csv_writer.writerows(rows)
|
|
|
|
def tokenize(text: str) -> list[str]:
    """Split ``text`` into tokens.

    The text is lowercased first, then every non-overlapping match of the
    module-level ``TOKEN_RE`` pattern is returned in order of appearance.
    """
    lowered = text.lower()
    tokens = TOKEN_RE.findall(lowered)
    return tokens
|
|
|
|
def token_ngrams(tokens: list[str], ngram_range: tuple[int, int]) -> list[str]:
    """Build space-joined token n-grams for every size in ``ngram_range``.

    ``ngram_range`` is an inclusive ``(min_n, max_n)`` pair. Sizes are
    processed in increasing order, and within each size the n-grams follow
    token order. Sizes that are not positive, or that exceed the number of
    tokens, contribute nothing.
    """
    smallest, largest = ngram_range
    token_count = len(tokens)

    grams: list[str] = []
    for size in range(smallest, largest + 1):
        if 0 < size <= token_count:
            grams.extend(
                " ".join(tokens[start : start + size])
                for start in range(token_count - size + 1)
            )
    return grams
|
|
|
|
def stable_bucket(feature: str, dim: int) -> tuple[int, float]:
    """Map ``feature`` to a hash bucket index and a sign.

    The bucket is the CRC32 of the UTF-8 encoded feature modulo ``dim``. The
    sign comes from a second CRC32 over the same bytes prefixed with
    ``b"sign:"``: ``1.0`` when that checksum is even, ``-1.0`` when it is odd.
    """
    payload = feature.encode("utf-8")
    index = zlib.crc32(payload) % dim
    sign_hash_is_odd = zlib.crc32(b"sign:" + payload) % 2 != 0
    polarity = -1.0 if sign_hash_is_odd else 1.0
    return index, polarity
|
|
|
|
def embed_text(text: str, dim: int = EMBEDDING_DIM) -> np.ndarray:
    """Embed ``text`` as a signed, hashed n-gram count vector of length ``dim``.

    The text is tokenized, expanded into n-grams per ``NGRAM_RANGE``, and each
    n-gram adds its sign (+1.0 or -1.0) to the bucket chosen by
    ``stable_bucket``. The float32 vector is then scaled to unit L2 norm; a
    vector whose norm is not positive (e.g. no tokens) is returned unscaled.
    """
    accumulator = np.zeros(dim, dtype=np.float32)
    for gram in token_ngrams(tokenize(text), NGRAM_RANGE):
        slot, polarity = stable_bucket(gram, dim)
        accumulator[slot] += polarity

    length = float(np.linalg.norm(accumulator))
    if not length > 0:
        return accumulator

    # In-place division keeps the float32 dtype of the accumulator.
    accumulator /= length
    return accumulator
|
|
|
|
def embed_rows(rows: list[dict[str, object]], dim: int = EMBEDDING_DIM) -> np.ndarray:
    """Embed the ``"serialized_text"`` field of every row.

    Returns a float32 matrix of shape ``(len(rows), dim)`` whose i-th row is
    ``embed_text`` applied to the string form of ``rows[i]["serialized_text"]``.
    """
    matrix = np.zeros((len(rows), dim), dtype=np.float32)
    texts = (str(record["serialized_text"]) for record in rows)
    for position, text in enumerate(texts):
        matrix[position] = embed_text(text, dim=dim)
    return matrix
|
|
|
|
def save_embedding_bundle(path: Path, rows: list[dict[str, object]], embeddings: np.ndarray) -> None:
    """Save embeddings and their identifying labels as a compressed ``.npz``.

    The archive holds four arrays: ``representation_id``, ``dataset`` and
    ``condition`` (each row's value converted with ``str``) plus ``embedding``
    (the matrix cast to float32). The parent directory is created if needed.
    """

    def label_column(key: str) -> np.ndarray:
        # One string per input row, in row order.
        return np.array([str(record[key]) for record in rows])

    path.parent.mkdir(parents=True, exist_ok=True)
    np.savez_compressed(
        path,
        representation_id=label_column("representation_id"),
        dataset=label_column("dataset"),
        condition=label_column("condition"),
        embedding=embeddings.astype(np.float32),
    )
|
|
|
|
def metadata_rows(rows: list[dict[str, object]]) -> list[dict[str, object]]:
    """Build one metadata record per input row.

    Each record carries the row's position as ``row_idx``, the required
    fields ``representation_id``, ``representation_level``, ``dataset`` and
    ``condition`` (a missing one raises ``KeyError``), and the optional fields
    ``n_trials``, ``window_size``, ``window_start_trial_idx`` and
    ``window_end_trial_idx``, which default to ``""`` when absent.
    """
    required_keys = ("representation_id", "representation_level", "dataset", "condition")
    optional_keys = ("n_trials", "window_size", "window_start_trial_idx", "window_end_trial_idx")

    return [
        {
            "row_idx": position,
            **{key: record[key] for key in required_keys},
            **{key: record.get(key, "") for key in optional_keys},
        }
        for position, record in enumerate(rows)
    ]
|
|
|
|
def condition_vector_rows(rows: list[dict[str, object]], embeddings: np.ndarray) -> list[dict[str, object]]:
    """Build one CSV-ready record per row with its embedding as compact JSON.

    Each record holds the row position, the row's ``representation_id``,
    ``dataset`` and ``condition``, the module-level ``MODEL_NAME``, the
    embedding width (``embeddings.shape[1]``), and the embedding values
    rounded to 8 decimals and serialized as a JSON array without spaces.
    """

    def encode_vector(vector: np.ndarray) -> str:
        # Compact separators keep the CSV cell as short as possible.
        return json.dumps([round(float(value), 8) for value in vector], separators=(",", ":"))

    return [
        {
            "row_idx": position,
            "representation_id": record["representation_id"],
            "dataset": record["dataset"],
            "condition": record["condition"],
            "embedding_model": MODEL_NAME,
            "embedding_dim": embeddings.shape[1],
            "embedding_vector_json": encode_vector(embeddings[position]),
        }
        for position, record in enumerate(rows)
    ]
|
|
|
|
def cosine_pair_summary(
    rows: list[dict[str, object]],
    embeddings: np.ndarray,
    label: str,
) -> list[dict[str, object]]:
    """Summarize pairwise dot-product similarities between row groups.

    Rows are grouped by ``(dataset, condition)`` (both converted with ``str``)
    and groups are visited in sorted key order. One record is produced for
    every unordered pair of groups, including each group paired with itself.
    A group paired with itself uses only distinct row pairs (the strict upper
    triangle of the similarity matrix); two different groups use all cross
    pairs. Similarities are plain dot products, so they equal cosine
    similarity only when the embedding rows are unit length.

    Each record reports the pair count plus mean, min, max and population
    standard deviation of the similarities and the mean of ``1 - similarity``,
    all rounded to 6 decimals. When there are no pairs the statistics are
    empty strings. ``label`` is copied into ``representation_level``.
    """
    members: dict[tuple[str, str], list[int]] = defaultdict(list)
    for position, record in enumerate(rows):
        group_key = (str(record["dataset"]), str(record["condition"]))
        members[group_key].append(position)

    def rounded_or_blank(stat: float | None) -> object:
        return "" if stat is None else round(stat, 6)

    summary: list[dict[str, object]] = []
    ordered_keys = sorted(members)
    for offset, first_key in enumerate(ordered_keys):
        first_indices = members[first_key]
        first_vectors = embeddings[first_indices]
        for second_key in ordered_keys[offset:]:
            # Index the second group separately even when it is the same
            # group, so both matmul operands are independent arrays.
            second_vectors = embeddings[members[second_key]]
            similarity = first_vectors @ second_vectors.T
            same_group = first_key == second_key

            if not same_group:
                values = similarity.reshape(-1)
            elif len(first_indices) >= 2:
                # Strict upper triangle: each unordered pair once, no self-pairs.
                upper = np.triu(np.ones(similarity.shape, dtype=bool), k=1)
                values = similarity[upper]
            else:
                values = np.array([], dtype=np.float32)

            if values.size == 0:
                mean_sim = mean_dist = low_sim = high_sim = spread = None
            else:
                mean_sim = float(values.mean())
                mean_dist = float((1.0 - values).mean())
                low_sim = float(values.min())
                high_sim = float(values.max())
                spread = float(values.std(ddof=0))

            summary.append(
                {
                    "representation_level": label,
                    "dataset_a": first_key[0],
                    "condition_a": first_key[1],
                    "dataset_b": second_key[0],
                    "condition_b": second_key[1],
                    "pair_type": "within_condition" if same_group else "between_condition",
                    "n_pairs": int(values.size),
                    "mean_cosine_similarity": rounded_or_blank(mean_sim),
                    "mean_cosine_distance": rounded_or_blank(mean_dist),
                    "min_cosine_similarity": rounded_or_blank(low_sim),
                    "max_cosine_similarity": rounded_or_blank(high_sim),
                    "std_cosine_similarity": rounded_or_blank(spread),
                }
            )
    return summary
|
|
|
|
def pca_2d(embeddings: np.ndarray) -> tuple[np.ndarray, list[float]]:
    """Project ``embeddings`` onto their first two principal components.

    The matrix is centered per column in float64 and decomposed with
    ``numpy.linalg.svd``. Each retained component is sign-normalized so that
    its largest-magnitude loading is positive, which makes the projection
    deterministic with respect to SVD sign ambiguity.

    Args:
        embeddings: 2-D array with one row per item.

    Returns:
        ``(coords, explained)`` where ``coords`` is a float32 array of shape
        ``(n_rows, 2)`` and ``explained`` lists the explained-variance ratios
        of PC1 and PC2. When fewer than two components exist (a single row,
        or a single embedding dimension) the missing coordinate columns and
        ratios are zero, so the result always has exactly two columns.
    """
    n_rows = embeddings.shape[0]
    if n_rows == 0:
        return np.zeros((0, 2), dtype=np.float32), [0.0, 0.0]

    data = embeddings.astype(np.float64)
    centered = data - data.mean(axis=0, keepdims=True)
    _, singular_values, vt = np.linalg.svd(centered, full_matrices=False)
    # vt has min(n_rows, n_features) rows, so this may hold fewer than two.
    components = vt[:2].copy()

    for component_idx in range(components.shape[0]):
        pivot = int(np.argmax(np.abs(components[component_idx])))
        if components[component_idx, pivot] < 0:
            components[component_idx] *= -1

    # Always emit two columns; callers index coords[:, 0] and coords[:, 1].
    coords = np.zeros((n_rows, 2), dtype=np.float64)
    coords[:, : components.shape[0]] = centered @ components.T

    total_variance = float((singular_values**2).sum())
    explained: list[float] = []
    for idx in range(2):
        if idx < len(singular_values) and total_variance > 0:
            explained.append(float((singular_values[idx] ** 2) / total_variance))
        else:
            explained.append(0.0)
    return coords.astype(np.float32), explained
|
|
|
|
def projection_rows(
    rows: list[dict[str, object]],
    coords: np.ndarray,
    explained: list[float],
) -> list[dict[str, object]]:
    """Build one CSV-ready projection record per row.

    Each record combines the row's identifying fields (``representation_id``,
    ``representation_level``, ``dataset``, ``condition``), its optional window
    fields (``""`` when absent), the row's two coordinates from ``coords`` as
    ``pc1``/``pc2``, and the two explained-variance ratios from ``explained``.
    All numeric values are rounded to 8 decimals.
    """
    identity_keys = ("representation_id", "representation_level", "dataset", "condition")
    window_keys = ("window_size", "window_start_trial_idx", "window_end_trial_idx")

    return [
        {
            "row_idx": position,
            **{key: record[key] for key in identity_keys},
            **{key: record.get(key, "") for key in window_keys},
            "pc1": round(float(coords[position, 0]), 8),
            "pc2": round(float(coords[position, 1]), 8),
            "pc1_explained_variance_ratio": round(explained[0], 8),
            "pc2_explained_variance_ratio": round(explained[1], 8),
        }
        for position, record in enumerate(rows)
    ]
|
|
|
|
def write_report(
    path: Path,
    condition_rows: list[dict[str, object]],
    sliding_rows: list[dict[str, object]],
    condition_explained: list[float],
    sliding_explained: list[float],
    embedding_dim: int = EMBEDDING_DIM,
    input_dir: Path = Path("outputs"),
) -> None:
    """Write the markdown report describing this embedding run.

    Args:
        path: Destination markdown file; its parent directory is created.
        condition_rows: Condition-level input rows (only their count is used).
        sliding_rows: Sliding-window input rows (only their count is used).
        condition_explained: PC1/PC2 explained-variance ratios for the
            condition-level projection.
        sliding_explained: PC1/PC2 explained-variance ratios for the
            sliding-window projection.
        embedding_dim: Embedding width actually used for the run. Defaults to
            ``EMBEDDING_DIM`` so existing callers keep the previous text.
        input_dir: Directory the input JSONL files were read from. Defaults
            to ``outputs`` so existing callers keep the previous text.
    """
    # Report the real inputs and width rather than the module defaults, which
    # are wrong whenever --input-dir or --dim is overridden.
    condition_input = (input_dir / "condition_level_representations.jsonl").as_posix()
    sliding_input = (input_dir / "sliding_window_representations.jsonl").as_posix()

    report = f"""# Exploratory Embedding Analysis

This is a compact exploratory analysis of serialized AX-CPT representations. It should not be treated as evidence about latent model states or mechanistic representations.

## Inputs

- `{condition_input}`: {len(condition_rows)} rows.
- `{sliding_input}`: {len(sliding_rows)} rows.

Trial-level representations are not embedded in this first pass.

## Embedding Model

- Model/library: `{MODEL_NAME}` implemented locally in `scripts/run_embedding_analysis.py`.
- Dependency: `numpy` for vector math, cosine similarity, and PCA.
- Text processing: lowercase alphanumeric tokenization with regex `{TOKEN_RE.pattern}`.
- Features: token unigrams and bigrams.
- Vectorization: deterministic signed feature hashing with CRC32 into {embedding_dim} dimensions.
- Normalization: L2 normalization per row.

These are actual text-derived embedding vectors for the serialized representations. They are not neural embeddings, latent model embeddings, hidden states, logits, probabilities, reaction times, costs, or latency measurements.

## Similarity

Cosine similarity and cosine distance are computed on L2-normalized hashed text embeddings. Summary files report within-condition and between-condition comparisons. Similarities reflect overlap in the serialized representation text and should be interpreted cautiously.

## Projection

2D projections use deterministic PCA via `numpy.linalg.svd` on centered embedding matrices. Component signs are fixed by forcing the largest absolute component loading to be positive.

- Condition-level PCA explained variance ratio: PC1={condition_explained[0]:.6f}, PC2={condition_explained[1]:.6f}
- Sliding-window PCA explained variance ratio: PC1={sliding_explained[0]:.6f}, PC2={sliding_explained[1]:.6f}

## Outputs

- `condition_embeddings.npz`
- `condition_embedding_vectors.csv`
- `condition_embedding_metadata.csv`
- `condition_embedding_similarity_pairs.csv`
- `condition_embedding_projection_2d.csv`
- `sliding_window_embeddings.npz`
- `sliding_window_embedding_metadata.csv`
- `sliding_window_embedding_similarity_summary.csv`
- `sliding_window_embedding_projection_2d.csv`
- `embedding_model_config.json`
"""
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(report, encoding="utf-8")


def main() -> int:
    """Run the embedding analysis from the command line.

    Reads the condition-level and sliding-window JSONL files from
    ``--input-dir``, embeds them with ``--dim`` hash buckets, and writes the
    embedding bundles, metadata, vectors, similarity summaries, 2D
    projections, JSON config and markdown report into ``--output-dir``.

    Returns:
        ``0`` on success. Invalid arguments terminate the process through
        ``argparse`` (exit status 2).
    """
    parser = argparse.ArgumentParser(description="Run exploratory embedding analysis for AX-CPT representations.")
    parser.add_argument("--input-dir", type=Path, default=Path("outputs"))
    parser.add_argument("--output-dir", type=Path, default=Path("outputs/embedding_analysis"))
    parser.add_argument("--dim", type=int, default=EMBEDDING_DIM)
    args = parser.parse_args()

    # A non-positive width would otherwise fail later with an opaque
    # ZeroDivisionError (modulo in stable_bucket) or a numpy shape error.
    if args.dim <= 0:
        parser.error("--dim must be a positive integer")

    condition_rows = read_jsonl(args.input_dir / "condition_level_representations.jsonl")
    sliding_rows = read_jsonl(args.input_dir / "sliding_window_representations.jsonl")
    args.output_dir.mkdir(parents=True, exist_ok=True)

    condition_embeddings = embed_rows(condition_rows, dim=args.dim)
    sliding_embeddings = embed_rows(sliding_rows, dim=args.dim)

    save_embedding_bundle(args.output_dir / "condition_embeddings.npz", condition_rows, condition_embeddings)
    save_embedding_bundle(args.output_dir / "sliding_window_embeddings.npz", sliding_rows, sliding_embeddings)

    metadata_fields = [
        "row_idx",
        "representation_id",
        "representation_level",
        "dataset",
        "condition",
        "n_trials",
        "window_size",
        "window_start_trial_idx",
        "window_end_trial_idx",
    ]
    write_csv(args.output_dir / "condition_embedding_metadata.csv", metadata_rows(condition_rows), metadata_fields)
    write_csv(args.output_dir / "sliding_window_embedding_metadata.csv", metadata_rows(sliding_rows), metadata_fields)

    # Full vectors are exported as CSV for the condition-level rows only.
    write_csv(
        args.output_dir / "condition_embedding_vectors.csv",
        condition_vector_rows(condition_rows, condition_embeddings),
        [
            "row_idx",
            "representation_id",
            "dataset",
            "condition",
            "embedding_model",
            "embedding_dim",
            "embedding_vector_json",
        ],
    )

    similarity_fields = [
        "representation_level",
        "dataset_a",
        "condition_a",
        "dataset_b",
        "condition_b",
        "pair_type",
        "n_pairs",
        "mean_cosine_similarity",
        "mean_cosine_distance",
        "min_cosine_similarity",
        "max_cosine_similarity",
        "std_cosine_similarity",
    ]
    write_csv(
        args.output_dir / "condition_embedding_similarity_pairs.csv",
        cosine_pair_summary(condition_rows, condition_embeddings, "condition"),
        similarity_fields,
    )
    write_csv(
        args.output_dir / "sliding_window_embedding_similarity_summary.csv",
        cosine_pair_summary(sliding_rows, sliding_embeddings, "sliding_window"),
        similarity_fields,
    )

    condition_coords, condition_explained = pca_2d(condition_embeddings)
    sliding_coords, sliding_explained = pca_2d(sliding_embeddings)
    projection_fields = [
        "row_idx",
        "representation_id",
        "representation_level",
        "dataset",
        "condition",
        "window_size",
        "window_start_trial_idx",
        "window_end_trial_idx",
        "pc1",
        "pc2",
        "pc1_explained_variance_ratio",
        "pc2_explained_variance_ratio",
    ]
    write_csv(
        args.output_dir / "condition_embedding_projection_2d.csv",
        projection_rows(condition_rows, condition_coords, condition_explained),
        projection_fields,
    )
    write_csv(
        args.output_dir / "sliding_window_embedding_projection_2d.csv",
        projection_rows(sliding_rows, sliding_coords, sliding_explained),
        projection_fields,
    )

    config = {
        "analysis_label": "exploratory_embedding_analysis",
        "embedding_model": MODEL_NAME,
        "embedding_dim": args.dim,
        "library": "numpy",
        "tokenizer_regex": TOKEN_RE.pattern,
        "ngram_range": list(NGRAM_RANGE),
        "hash_function": "zlib.crc32 signed feature hashing",
        "normalization": "l2",
        "projection": "PCA via numpy.linalg.svd with deterministic component sign convention",
        "inputs": {
            "condition_level": str(args.input_dir / "condition_level_representations.jsonl"),
            "sliding_window": str(args.input_dir / "sliding_window_representations.jsonl"),
        },
        "not_included": [
            "neural embeddings",
            "latent model hidden states",
            "logits",
            "probabilities",
            "reaction times",
            "API costs",
            "latency measurements",
        ],
    }
    (args.output_dir / "embedding_model_config.json").write_text(
        json.dumps(config, ensure_ascii=False, indent=2, sort_keys=True) + "\n",
        encoding="utf-8",
    )
    write_report(
        args.output_dir / "exploratory_embedding_report.md",
        condition_rows=condition_rows,
        sliding_rows=sliding_rows,
        condition_explained=condition_explained,
        sliding_explained=sliding_explained,
        embedding_dim=args.dim,
        input_dir=args.input_dir,
    )

    print(f"Embedded {len(condition_rows)} condition-level rows.")
    print(f"Embedded {len(sliding_rows)} sliding-window rows.")
    print(f"Wrote exploratory embedding outputs to {args.output_dir}")
    return 0
|
|
|
|
# Run the CLI only when this file is executed as a script; the integer
# returned by main() is passed to SystemExit and becomes the exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|
|
|
|