# gemotions / validate_external.py
#!/usr/bin/env python3
"""Validate emotion vectors against external corpora.
Projects activations from external text onto emotion vectors to verify
they activate on emotionally matching content.
Run:
python -m full_replication.validate_external --model e4b
python -m full_replication.validate_external --model 31b
"""
import argparse
import json
import os
import warnings
from collections import defaultdict
import numpy as np
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig
from full_replication.config import MODELS, START_TOKEN, get_results_dir
warnings.filterwarnings("ignore")
# Datasets to validate against (HuggingFace dataset IDs)
# Each entry: "path" (HF hub id), "split", "text_field" (where the raw text
# lives in a record), and "n_samples" (how many documents to project).
DATASETS = {
    # Broad web-text corpus — serves as a general-domain baseline.
    "pile_subset": {
        "path": "monology/pile-uncopyrighted",
        "split": "train",
        "text_field": "text",
        "n_samples": 5000,
    },
    # Real user/assistant chat logs; "conversation" is a list of turns,
    # which validate_dataset joins into a single string.
    "lmsys_chat": {
        "path": "lmsys/lmsys-chat-1m",
        "split": "train",
        "text_field": "conversation",
        "n_samples": 5000,
    },
}
def load_emotion_vectors(results_dir, layer):
    """Load the saved per-emotion direction vectors for one layer.

    Reads ``emotion_vectors_layer<layer>.npz`` from *results_dir* and
    returns a dict mapping emotion name -> numpy vector.
    """
    archive = np.load(os.path.join(results_dir, f"emotion_vectors_layer{layer}.npz"))
    return {emotion: archive[emotion] for emotion in archive.files}
def get_hooks_and_layers(model):
    """Register a forward hook on every transformer layer of *model*.

    Each hook copies that layer's hidden states to CPU float32 and stores
    them in a shared dict under the key ``layer_<i>``.

    Returns:
        (activations, hooks): the dict the hooks write into, and the list
        of hook handles so callers can later ``.remove()`` them.
    """
    activations = {}

    def capture(name):
        def hook_fn(module, input, output):
            # Some layer impls return a tuple; the hidden states come first.
            hidden = output[0] if isinstance(output, tuple) else output
            activations[name] = hidden.detach().cpu().float()
        return hook_fn

    # Multimodal checkpoints nest the text decoder under .language_model;
    # plain text models expose .layers directly.
    if hasattr(model.model, 'language_model'):
        layers = model.model.language_model.layers
    elif hasattr(model.model, 'layers'):
        layers = model.model.layers
    else:
        raise RuntimeError("Cannot find model layers")

    hooks = [layer.register_forward_hook(capture(f"layer_{idx}"))
             for idx, layer in enumerate(layers)]
    return activations, hooks
def extract_activation(model, tokenizer, text, activations_dict, target_layer):
    """Run one forward pass and return the mean hidden state at *target_layer*.

    Positions are averaged from START_TOKEN onward (skipping leading special
    tokens); if the sequence is no longer than START_TOKEN, all positions
    are averaged instead.

    Returns:
        1-D numpy vector, or None if the target layer's hook captured nothing.
    """
    inputs = tokenizer(text, return_tensors="pt", truncation=True, max_length=512)
    inputs = {k: v.to(model.device) for k, v in inputs.items()}
    with torch.no_grad():
        model(**inputs)
    try:
        hidden = activations_dict.get(f"layer_{target_layer}")
        if hidden is None:
            return None
        seq_len = hidden.shape[1]
        if seq_len <= START_TOKEN:
            # Sequence too short to skip the prefix: average everything.
            vec = hidden[0].mean(dim=0).numpy()
        else:
            vec = hidden[0, START_TOKEN:].mean(dim=0).numpy()
        return vec
    finally:
        # Bug fix: the original only cleared on the success path, so when the
        # target key was missing, tensors captured for OTHER layers stayed in
        # the dict and accumulated across samples. Always clear.
        activations_dict.clear()
def project_onto_emotions(activation, emotion_vectors):
    """Project activation onto each emotion vector, return cosine similarities.

    Norms are offset by 1e-8 to guard against division by zero.
    Returns a dict {emotion: cosine similarity (float)}.
    """
    a_norm = np.linalg.norm(activation) + 1e-8
    return {
        name: float(np.dot(activation, direction)
                    / (a_norm * (np.linalg.norm(direction) + 1e-8)))
        for name, direction in emotion_vectors.items()
    }
def validate_dataset(model, tokenizer, emotion_vectors, target_layer,
                     activations_dict, dataset_cfg, results_dir):
    """Run validation on one dataset.

    Streams up to ``dataset_cfg["n_samples"]`` texts, extracts activations at
    *target_layer*, projects them onto every emotion vector, and appends each
    sample's top-5 projections to a resumable JSONL progress file.

    Returns a summary dict (dataset name, sample count, per-emotion mean
    projections, first 100 sample projections), or None if the `datasets`
    library is missing or the dataset fails to load.
    """
    # Import lazily so the rest of the script works without `datasets`.
    try:
        from datasets import load_dataset
    except ImportError:
        print(" ERROR: 'datasets' library not installed. Run: pip install datasets")
        return None
    dataset_name = dataset_cfg["path"]
    print(f"\n Loading dataset: {dataset_name}...")
    try:
        # Streaming avoids downloading the full corpus up front.
        ds = load_dataset(
            dataset_cfg["path"],
            split=dataset_cfg["split"],
            streaming=True,
            trust_remote_code=True,
        )
    except Exception as e:
        print(f" ERROR loading dataset: {e}")
        return None
    n_samples = dataset_cfg["n_samples"]
    text_field = dataset_cfg["text_field"]
    # Incremental save file for projections
    # NOTE(review): dataset_name contains "/" (e.g. "monology/pile-..."),
    # so this path nests an extra "_monology/" directory under validation/.
    # makedirs below makes it work, but consider sanitizing the name.
    incremental_file = os.path.join(results_dir, "validation",
                                    f"_{dataset_name}_layer{target_layer}_progress.jsonl")
    os.makedirs(os.path.dirname(incremental_file), exist_ok=True)
    # Resume from existing progress
    projections = []
    emotion_activation_sums = defaultdict(float)   # emotion -> summed cosine scores
    emotion_activation_counts = defaultdict(int)   # emotion -> number of samples counted
    count = 0
    if os.path.exists(incremental_file):
        with open(incremental_file, "r", encoding="utf-8") as f:
            for line in f:
                if not line.strip():
                    continue
                record = json.loads(line)
                projections.append(record)
                # NOTE(review): resumed records only carry the saved top-5
                # projections, while fresh processing below accumulates ALL
                # emotions — resumed runs therefore bias mean_projection
                # upward. Confirm whether this is acceptable.
                for emotion, score in record["top_emotions"]:
                    emotion_activation_sums[emotion] += score
                    emotion_activation_counts[emotion] += 1
                count += 1
        print(f" Resuming from {count} cached samples...")
    if count >= n_samples:
        print(f" Already complete ({count} samples).")
    else:
        print(f" Processing {n_samples - count} remaining samples...")
        # Skip the items already consumed in a previous (cached) run.
        skip = count
        with open(incremental_file, "a", encoding="utf-8") as f:
            for item in ds:
                if count >= n_samples:
                    break
                if skip > 0:
                    skip -= 1
                    continue
                # Extract text
                if isinstance(item.get(text_field), list):
                    # Conversation-style record: join the first 3 turns.
                    text = " ".join(str(turn) for turn in item[text_field][:3])
                else:
                    text = str(item.get(text_field, ""))
                if len(text) < 50:
                    # Too short to carry meaningful emotional content.
                    continue
                activation = extract_activation(model, tokenizer, text, activations_dict, target_layer)
                if activation is None:
                    continue
                projs = project_onto_emotions(activation, emotion_vectors)
                for emotion, score in projs.items():
                    emotion_activation_sums[emotion] += score
                    emotion_activation_counts[emotion] += 1
                # Persist only the 5 strongest projections per sample.
                top_5 = sorted(projs.items(), key=lambda x: -x[1])[:5]
                record = {"text_preview": text[:100], "top_emotions": top_5}
                projections.append(record)
                f.write(json.dumps(record, ensure_ascii=False) + "\n")
                count += 1
                if count % 500 == 0:
                    f.flush()
                    print(f" [{count}/{n_samples}]")
    # Compute statistics
    emotion_stats = {}
    for emotion in emotion_vectors:
        n = emotion_activation_counts.get(emotion, 0)
        if n > 0:
            mean = emotion_activation_sums[emotion] / n
            emotion_stats[emotion] = {"mean_projection": float(mean), "n_samples": n}
    sorted_emotions = sorted(emotion_stats.items(), key=lambda x: -x[1]["mean_projection"])
    print(f" Top 10 most activated emotions across dataset:")
    for emotion, stats in sorted_emotions[:10]:
        print(f" {emotion}: mean projection = {stats['mean_projection']:.4f}")
    return {
        "dataset": dataset_name,
        "n_samples": count,
        "emotion_stats": emotion_stats,
        "sample_projections": projections[:100],
    }
def main():
    """CLI entry point: load emotion vectors and model, validate datasets."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--model", required=True, choices=["e4b", "31b"])
    parser.add_argument("--layer", type=int, default=None)
    parser.add_argument("--dataset", choices=list(DATASETS.keys()), default=None,
                        help="Run on specific dataset (default: all)")
    args = parser.parse_args()
    model_cfg = MODELS[args.model]
    results_dir = get_results_dir(args.model)
    # Default to the 2/3-depth layer. Bug fix: the original used
    # `args.layer or ...`, which treated `--layer 0` as unset because 0 is
    # falsy; an explicit None check honors every valid layer index.
    if args.layer is not None:
        target_layer = args.layer
    else:
        target_layer = int(model_cfg["num_layers"] * 2 / 3)
    # Check vectors exist
    vec_path = os.path.join(results_dir, f"emotion_vectors_layer{target_layer}.npz")
    if not os.path.exists(vec_path):
        print(f"ERROR: No vectors at {vec_path}. Run extract_vectors.py first.")
        return
    emotion_vectors = load_emotion_vectors(results_dir, target_layer)
    print(f"Loaded {len(emotion_vectors)} emotion vectors from layer {target_layer}")
    # Load model
    print(f"Loading model {model_cfg['model_id']}...")
    tokenizer = AutoTokenizer.from_pretrained(model_cfg["model_id"])
    load_kwargs = {"device_map": "auto"}
    if model_cfg["quantization"] == "4bit":
        load_kwargs["quantization_config"] = BitsAndBytesConfig(
            load_in_4bit=True,
            bnb_4bit_compute_dtype="bfloat16",
        )
    else:
        # NOTE(review): `dtype` is the transformers >= 4.56 kwarg name
        # (older versions expect `torch_dtype`) — confirm the pinned version.
        load_kwargs["dtype"] = torch.bfloat16
    model = AutoModelForCausalLM.from_pretrained(model_cfg["model_id"], **load_kwargs)
    model.eval()
    activations_dict, hooks = get_hooks_and_layers(model)
    # Run validation
    datasets_to_run = {args.dataset: DATASETS[args.dataset]} if args.dataset else DATASETS
    validation_dir = os.path.join(results_dir, "validation")
    os.makedirs(validation_dir, exist_ok=True)
    try:
        for ds_name, ds_cfg in datasets_to_run.items():
            result = validate_dataset(
                model, tokenizer, emotion_vectors, target_layer,
                activations_dict, ds_cfg, results_dir
            )
            if result:
                out_file = os.path.join(validation_dir, f"{ds_name}_layer{target_layer}.json")
                with open(out_file, "w", encoding="utf-8") as f:
                    json.dump(result, f, indent=2, ensure_ascii=False)
                print(f" Saved: {out_file}")
    finally:
        # Detach hooks even if a dataset pass raises.
        for h in hooks:
            h.remove()
    print("\n=== VALIDATION COMPLETE ===")
if __name__ == "__main__":
main()