axcpt-llm-control / scripts /run_embedding_analysis.py
neuro-bot's picture
Upload 91 files
a668ecc verified
"""Exploratory embedding analysis for serialized AX-CPT representations.
This script reads deterministic text representations produced by
scripts/rebuild_release_outputs.py and computes local hashed token n-gram
embeddings. These are actual vector embeddings of the serialized text, but they
are not neural model embeddings and they are not latent model hidden states.
"""
from __future__ import annotations
import argparse
import csv
import json
import math
import re
import zlib
from collections import defaultdict
from pathlib import Path
import numpy as np
MODEL_NAME = "local_hashing_token_ngram_v1"
EMBEDDING_DIM = 256
NGRAM_RANGE = (1, 2)
TOKEN_RE = re.compile(r"[a-z0-9_]+")
def read_jsonl(path: Path) -> list[dict[str, object]]:
rows: list[dict[str, object]] = []
with path.open("r", encoding="utf-8") as handle:
for line in handle:
line = line.strip()
if line:
rows.append(json.loads(line))
return rows
def write_csv(path: Path, rows: list[dict[str, object]], fieldnames: list[str]) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
with path.open("w", newline="", encoding="utf-8") as handle:
writer = csv.DictWriter(handle, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(rows)
def tokenize(text: str) -> list[str]:
return TOKEN_RE.findall(text.lower())
def token_ngrams(tokens: list[str], ngram_range: tuple[int, int]) -> list[str]:
min_n, max_n = ngram_range
out: list[str] = []
for n in range(min_n, max_n + 1):
if n <= 0 or len(tokens) < n:
continue
for i in range(0, len(tokens) - n + 1):
out.append(" ".join(tokens[i : i + n]))
return out
def stable_bucket(feature: str, dim: int) -> tuple[int, float]:
data = feature.encode("utf-8")
bucket = zlib.crc32(data) % dim
sign = 1.0 if (zlib.crc32(b"sign:" + data) % 2 == 0) else -1.0
return bucket, sign
def embed_text(text: str, dim: int = EMBEDDING_DIM) -> np.ndarray:
vector = np.zeros(dim, dtype=np.float32)
features = token_ngrams(tokenize(text), NGRAM_RANGE)
for feature in features:
bucket, sign = stable_bucket(feature, dim)
vector[bucket] += sign
norm = float(np.linalg.norm(vector))
if norm > 0:
vector /= norm
return vector
def embed_rows(rows: list[dict[str, object]], dim: int = EMBEDDING_DIM) -> np.ndarray:
embeddings = np.zeros((len(rows), dim), dtype=np.float32)
for idx, row in enumerate(rows):
embeddings[idx] = embed_text(str(row["serialized_text"]), dim=dim)
return embeddings
def save_embedding_bundle(path: Path, rows: list[dict[str, object]], embeddings: np.ndarray) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
np.savez_compressed(
path,
representation_id=np.array([str(row["representation_id"]) for row in rows]),
dataset=np.array([str(row["dataset"]) for row in rows]),
condition=np.array([str(row["condition"]) for row in rows]),
embedding=embeddings.astype(np.float32),
)
def metadata_rows(rows: list[dict[str, object]]) -> list[dict[str, object]]:
out: list[dict[str, object]] = []
for idx, row in enumerate(rows):
out.append(
{
"row_idx": idx,
"representation_id": row["representation_id"],
"representation_level": row["representation_level"],
"dataset": row["dataset"],
"condition": row["condition"],
"n_trials": row.get("n_trials", ""),
"window_size": row.get("window_size", ""),
"window_start_trial_idx": row.get("window_start_trial_idx", ""),
"window_end_trial_idx": row.get("window_end_trial_idx", ""),
}
)
return out
def condition_vector_rows(rows: list[dict[str, object]], embeddings: np.ndarray) -> list[dict[str, object]]:
out: list[dict[str, object]] = []
for idx, row in enumerate(rows):
out.append(
{
"row_idx": idx,
"representation_id": row["representation_id"],
"dataset": row["dataset"],
"condition": row["condition"],
"embedding_model": MODEL_NAME,
"embedding_dim": embeddings.shape[1],
"embedding_vector_json": json.dumps([round(float(x), 8) for x in embeddings[idx]], separators=(",", ":")),
}
)
return out
def cosine_pair_summary(
rows: list[dict[str, object]],
embeddings: np.ndarray,
label: str,
) -> list[dict[str, object]]:
groups: dict[tuple[str, str], list[int]] = defaultdict(list)
for idx, row in enumerate(rows):
groups[(str(row["dataset"]), str(row["condition"]))].append(idx)
out: list[dict[str, object]] = []
keys = sorted(groups)
for i, key_a in enumerate(keys):
idx_a = groups[key_a]
emb_a = embeddings[idx_a]
for key_b in keys[i:]:
idx_b = groups[key_b]
emb_b = embeddings[idx_b]
sims = emb_a @ emb_b.T
if key_a == key_b:
if len(idx_a) < 2:
values = np.array([], dtype=np.float32)
else:
mask = np.triu(np.ones(sims.shape, dtype=bool), k=1)
values = sims[mask]
else:
values = sims.reshape(-1)
if values.size == 0:
mean_cos = min_cos = max_cos = std_cos = mean_dist = None
else:
mean_cos = float(values.mean())
min_cos = float(values.min())
max_cos = float(values.max())
std_cos = float(values.std(ddof=0))
mean_dist = float((1.0 - values).mean())
out.append(
{
"representation_level": label,
"dataset_a": key_a[0],
"condition_a": key_a[1],
"dataset_b": key_b[0],
"condition_b": key_b[1],
"pair_type": "within_condition" if key_a == key_b else "between_condition",
"n_pairs": int(values.size),
"mean_cosine_similarity": round(mean_cos, 6) if mean_cos is not None else "",
"mean_cosine_distance": round(mean_dist, 6) if mean_dist is not None else "",
"min_cosine_similarity": round(min_cos, 6) if min_cos is not None else "",
"max_cosine_similarity": round(max_cos, 6) if max_cos is not None else "",
"std_cosine_similarity": round(std_cos, 6) if std_cos is not None else "",
}
)
return out
def pca_2d(embeddings: np.ndarray) -> tuple[np.ndarray, list[float]]:
if embeddings.shape[0] == 0:
return np.zeros((0, 2), dtype=np.float32), [0.0, 0.0]
centered = embeddings.astype(np.float64) - embeddings.astype(np.float64).mean(axis=0, keepdims=True)
_, singular_values, vt = np.linalg.svd(centered, full_matrices=False)
components = vt[:2].copy()
for component_idx in range(components.shape[0]):
pivot = int(np.argmax(np.abs(components[component_idx])))
if components[component_idx, pivot] < 0:
components[component_idx] *= -1
coords = centered @ components.T
total_variance = float((singular_values**2).sum())
explained = []
for idx in range(2):
if idx < len(singular_values) and total_variance > 0:
explained.append(float((singular_values[idx] ** 2) / total_variance))
else:
explained.append(0.0)
return coords.astype(np.float32), explained
def projection_rows(
rows: list[dict[str, object]],
coords: np.ndarray,
explained: list[float],
) -> list[dict[str, object]]:
out: list[dict[str, object]] = []
for idx, row in enumerate(rows):
out.append(
{
"row_idx": idx,
"representation_id": row["representation_id"],
"representation_level": row["representation_level"],
"dataset": row["dataset"],
"condition": row["condition"],
"window_size": row.get("window_size", ""),
"window_start_trial_idx": row.get("window_start_trial_idx", ""),
"window_end_trial_idx": row.get("window_end_trial_idx", ""),
"pc1": round(float(coords[idx, 0]), 8),
"pc2": round(float(coords[idx, 1]), 8),
"pc1_explained_variance_ratio": round(explained[0], 8),
"pc2_explained_variance_ratio": round(explained[1], 8),
}
)
return out
def write_report(
path: Path,
condition_rows: list[dict[str, object]],
sliding_rows: list[dict[str, object]],
condition_explained: list[float],
sliding_explained: list[float],
) -> None:
report = f"""# Exploratory Embedding Analysis
This is a compact exploratory analysis of serialized AX-CPT representations. It should not be treated as evidence about latent model states or mechanistic representations.
## Inputs
- `outputs/condition_level_representations.jsonl`: {len(condition_rows)} rows.
- `outputs/sliding_window_representations.jsonl`: {len(sliding_rows)} rows.
Trial-level representations are not embedded in this first pass.
## Embedding Model
- Model/library: `{MODEL_NAME}` implemented locally in `scripts/run_embedding_analysis.py`.
- Dependency: `numpy` for vector math, cosine similarity, and PCA.
- Text processing: lowercase alphanumeric tokenization with regex `{TOKEN_RE.pattern}`.
- Features: token unigrams and bigrams.
- Vectorization: deterministic signed feature hashing with CRC32 into {EMBEDDING_DIM} dimensions.
- Normalization: L2 normalization per row.
These are actual text-derived embedding vectors for the serialized representations. They are not neural embeddings, latent model embeddings, hidden states, logits, probabilities, reaction times, costs, or latency measurements.
## Similarity
Cosine similarity and cosine distance are computed on L2-normalized hashed text embeddings. Summary files report within-condition and between-condition comparisons. Similarities reflect overlap in the serialized representation text and should be interpreted cautiously.
## Projection
2D projections use deterministic PCA via `numpy.linalg.svd` on centered embedding matrices. Component signs are fixed by forcing the largest absolute component loading to be positive.
- Condition-level PCA explained variance ratio: PC1={condition_explained[0]:.6f}, PC2={condition_explained[1]:.6f}
- Sliding-window PCA explained variance ratio: PC1={sliding_explained[0]:.6f}, PC2={sliding_explained[1]:.6f}
## Outputs
- `condition_embeddings.npz`
- `condition_embedding_vectors.csv`
- `condition_embedding_metadata.csv`
- `condition_embedding_similarity_pairs.csv`
- `condition_embedding_projection_2d.csv`
- `sliding_window_embeddings.npz`
- `sliding_window_embedding_metadata.csv`
- `sliding_window_embedding_similarity_summary.csv`
- `sliding_window_embedding_projection_2d.csv`
- `embedding_model_config.json`
"""
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(report, encoding="utf-8")
def main() -> int:
parser = argparse.ArgumentParser(description="Run exploratory embedding analysis for AX-CPT representations.")
parser.add_argument("--input-dir", type=Path, default=Path("outputs"))
parser.add_argument("--output-dir", type=Path, default=Path("outputs/embedding_analysis"))
parser.add_argument("--dim", type=int, default=EMBEDDING_DIM)
args = parser.parse_args()
condition_rows = read_jsonl(args.input_dir / "condition_level_representations.jsonl")
sliding_rows = read_jsonl(args.input_dir / "sliding_window_representations.jsonl")
args.output_dir.mkdir(parents=True, exist_ok=True)
condition_embeddings = embed_rows(condition_rows, dim=args.dim)
sliding_embeddings = embed_rows(sliding_rows, dim=args.dim)
save_embedding_bundle(args.output_dir / "condition_embeddings.npz", condition_rows, condition_embeddings)
save_embedding_bundle(args.output_dir / "sliding_window_embeddings.npz", sliding_rows, sliding_embeddings)
metadata_fields = [
"row_idx",
"representation_id",
"representation_level",
"dataset",
"condition",
"n_trials",
"window_size",
"window_start_trial_idx",
"window_end_trial_idx",
]
write_csv(args.output_dir / "condition_embedding_metadata.csv", metadata_rows(condition_rows), metadata_fields)
write_csv(args.output_dir / "sliding_window_embedding_metadata.csv", metadata_rows(sliding_rows), metadata_fields)
write_csv(
args.output_dir / "condition_embedding_vectors.csv",
condition_vector_rows(condition_rows, condition_embeddings),
[
"row_idx",
"representation_id",
"dataset",
"condition",
"embedding_model",
"embedding_dim",
"embedding_vector_json",
],
)
similarity_fields = [
"representation_level",
"dataset_a",
"condition_a",
"dataset_b",
"condition_b",
"pair_type",
"n_pairs",
"mean_cosine_similarity",
"mean_cosine_distance",
"min_cosine_similarity",
"max_cosine_similarity",
"std_cosine_similarity",
]
write_csv(
args.output_dir / "condition_embedding_similarity_pairs.csv",
cosine_pair_summary(condition_rows, condition_embeddings, "condition"),
similarity_fields,
)
write_csv(
args.output_dir / "sliding_window_embedding_similarity_summary.csv",
cosine_pair_summary(sliding_rows, sliding_embeddings, "sliding_window"),
similarity_fields,
)
condition_coords, condition_explained = pca_2d(condition_embeddings)
sliding_coords, sliding_explained = pca_2d(sliding_embeddings)
projection_fields = [
"row_idx",
"representation_id",
"representation_level",
"dataset",
"condition",
"window_size",
"window_start_trial_idx",
"window_end_trial_idx",
"pc1",
"pc2",
"pc1_explained_variance_ratio",
"pc2_explained_variance_ratio",
]
write_csv(
args.output_dir / "condition_embedding_projection_2d.csv",
projection_rows(condition_rows, condition_coords, condition_explained),
projection_fields,
)
write_csv(
args.output_dir / "sliding_window_embedding_projection_2d.csv",
projection_rows(sliding_rows, sliding_coords, sliding_explained),
projection_fields,
)
config = {
"analysis_label": "exploratory_embedding_analysis",
"embedding_model": MODEL_NAME,
"embedding_dim": args.dim,
"library": "numpy",
"tokenizer_regex": TOKEN_RE.pattern,
"ngram_range": list(NGRAM_RANGE),
"hash_function": "zlib.crc32 signed feature hashing",
"normalization": "l2",
"projection": "PCA via numpy.linalg.svd with deterministic component sign convention",
"inputs": {
"condition_level": str(args.input_dir / "condition_level_representations.jsonl"),
"sliding_window": str(args.input_dir / "sliding_window_representations.jsonl"),
},
"not_included": [
"neural embeddings",
"latent model hidden states",
"logits",
"probabilities",
"reaction times",
"API costs",
"latency measurements",
],
}
(args.output_dir / "embedding_model_config.json").write_text(
json.dumps(config, ensure_ascii=False, indent=2, sort_keys=True) + "\n",
encoding="utf-8",
)
write_report(
args.output_dir / "exploratory_embedding_report.md",
condition_rows=condition_rows,
sliding_rows=sliding_rows,
condition_explained=condition_explained,
sliding_explained=sliding_explained,
)
print(f"Embedded {len(condition_rows)} condition-level rows.")
print(f"Embedded {len(sliding_rows)} sliding-window rows.")
print(f"Wrote exploratory embedding outputs to {args.output_dir}")
return 0
if __name__ == "__main__":
raise SystemExit(main())