"""Model pattern extraction utilities for transformer models."""
import re
import torch
import torch.nn.functional as F
from typing import Dict, List, Tuple, Any, Optional
from transformers import AutoModelForCausalLM, AutoTokenizer
def extract_patterns(model, use_modules=True) -> Dict[str, List[str]]:
"""Extract patterns from model modules or parameters."""
items = model.named_modules() if use_modules else model.named_parameters()
patterns = {}
for name, _ in items:
if not name:
continue
# Replace numeric sequences with {N} placeholder
pattern = re.sub(r'(\.|_)(\d+)(\.|_|$)', r'\1{N}\3', name)
pattern = re.sub(r'([a-zA-Z])(\d+)(\.|_|$)', r'\1{N}\3', pattern)
if pattern not in patterns:
patterns[pattern] = []
patterns[pattern].append(name)
return patterns
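# Illustrative sketch (not used by the app): the substitutions above collapse
# per-layer names into shared templates. The GPT-2-style names below are
# chosen only for demonstration.
def _demo_extract_patterns_regex():
    names = ["transformer.h.0.attn", "transformer.h.1.attn", "transformer.ln_f"]
    patterns = {}
    for name in names:
        pattern = re.sub(r'(\.|_)(\d+)(\.|_|$)', r'\1{N}\3', name)
        pattern = re.sub(r'([a-zA-Z])(\d+)(\.|_|$)', r'\1{N}\3', pattern)
        patterns.setdefault(pattern, []).append(name)
    # -> {"transformer.h.{N}.attn": ["transformer.h.0.attn", "transformer.h.1.attn"],
    #     "transformer.ln_f": ["transformer.ln_f"]}
    return patterns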
def load_model_and_get_patterns(model_name: str) -> Tuple[Dict[str, List[str]], Dict[str, List[str]]]:
"""
Load model from HuggingFace Hub and extract module/parameter patterns.
Returns:
(module_patterns, parameter_patterns): Pattern dictionaries mapping patterns to name lists
"""
print(f"Loading model: {model_name}")
# Load model and tokenizer
model = AutoModelForCausalLM.from_pretrained(model_name, attn_implementation='eager')
tokenizer = AutoTokenizer.from_pretrained(model_name)
model.eval()
# Extract patterns
module_patterns = extract_patterns(model, use_modules=True)
param_patterns = extract_patterns(model, use_modules=False)
print(f"Found {len(module_patterns)} module patterns, {len(param_patterns)} parameter patterns")
return module_patterns, param_patterns
def safe_to_serializable(obj: Any) -> Any:
"""Convert tensors to lists recursively for JSON serialization."""
if torch.is_tensor(obj):
# Check if tensor is a meta tensor (no data) and skip it
try:
if obj.device.type == 'meta':
return None
return obj.detach().cpu().tolist()
except RuntimeError:
# Handle meta tensors that raise errors when accessing device
return None
if isinstance(obj, (list, tuple)):
return [safe_to_serializable(x) for x in obj]
if isinstance(obj, dict):
return {k: safe_to_serializable(v) for k, v in obj.items()}
return obj
def merge_token_probabilities(token_probs: List[Tuple[str, float]]) -> List[Tuple[str, float]]:
"""
Merge tokens with and without leading space, summing their probabilities.
Example: [(" cat", 0.15), ("cat", 0.05), (" dog", 0.10)] -> [("cat", 0.20), ("dog", 0.10)]
Args:
token_probs: List of (token_string, probability) tuples
Returns:
List of (token_string, merged_probability) tuples, sorted by probability (descending)
"""
merged = {} # Map from stripped token -> total probability
for token, prob in token_probs:
# Strip leading space to get canonical form
canonical = token.lstrip()
merged[canonical] = merged.get(canonical, 0.0) + prob
# Convert back to list and sort by probability (descending)
result = sorted(merged.items(), key=lambda x: x[1], reverse=True)
return result
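# Runnable restatement of the docstring example (illustrative only):
def _demo_merge_token_probabilities():
    merged = merge_token_probabilities([(" cat", 0.15), ("cat", 0.05), (" dog", 0.10)])
    # merged is approximately [("cat", 0.20), ("dog", 0.10)] (floating point)
    return merged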
def compute_global_top5_tokens(model_output, tokenizer, top_k: int = 5) -> List[Dict[str, Any]]:
"""
Compute the global top-5 tokens from model's final output with merged probabilities.
Args:
model_output: Output from model(**inputs) containing logits
tokenizer: Tokenizer for decoding
top_k: Number of top tokens to return (default: 5)
Returns:
List of dicts {'token': str, 'probability': float} for top K tokens
"""
with torch.no_grad():
# Get probabilities for next token (last position)
logits = model_output.logits[0, -1, :] # [vocab_size]
probs = F.softmax(logits, dim=-1)
# Get more candidates to account for merging (get 2x top_k)
top_probs, top_indices = torch.topk(probs, k=min(top_k * 2, len(probs)))
# Decode tokens
candidates = [
(tokenizer.decode([idx.item()], skip_special_tokens=False), prob.item())
for idx, prob in zip(top_indices, top_probs)
]
# Merge tokens with/without leading space
merged = merge_token_probabilities(candidates)
# Return top K after merging, formatted as dicts
return [{'token': t, 'probability': p} for t, p in merged[:top_k]]
def compute_per_position_top5(model_output, tokenizer, prompt_token_count: int, top_k: int = 5) -> List[Dict[str, Any]]:
"""
Compute top-K next-token probabilities at each generated-token position.
Uses logits already produced by the forward pass on the full sequence
(prompt + generated tokens). Position i in the returned list corresponds
to the prediction of generated token g_i given the prefix up to g_{i-1}.
Args:
model_output: Output from model(**inputs) containing logits [1, seq_len, vocab].
tokenizer: Tokenizer for decoding token IDs.
prompt_token_count: Number of tokens in the original prompt (P).
top_k: Number of top tokens per position (default 5).
Returns:
List of dicts, one per generated token position::
[
{
"position": 0,
"top5": [{"token": str, "probability": float}, ...],
"actual_token": str, # token actually generated at this position
"actual_prob": float # its probability at this position
},
...
]
"""
seq_len = model_output.logits.shape[1]
num_generated = seq_len - prompt_token_count
if num_generated <= 0:
return []
results = []
with torch.no_grad():
        # Index arithmetic: the logits at position (prompt_token_count - 1 + i)
        # predict generated token g_i, whose actual id sits at input index
        # (prompt_token_count + i). The caller is expected to attach the
        # full-sequence input_ids to model_output so those ids can be looked
        # up below.
all_logits = model_output.logits[0] # [seq_len, vocab]
for i in range(num_generated):
logit_idx = prompt_token_count - 1 + i # index into logits
next_token_idx = prompt_token_count + i # index of the actual next token
probs = F.softmax(all_logits[logit_idx], dim=-1)
# --- top-K with merge ---
top_probs, top_indices = torch.topk(probs, k=min(top_k * 2, len(probs)))
candidates = [
(tokenizer.decode([idx.item()], skip_special_tokens=False), prob.item())
for idx, prob in zip(top_indices, top_probs)
]
merged = merge_token_probabilities(candidates)
top5 = [{'token': t, 'probability': p} for t, p in merged[:top_k]]
            # --- actual token at this position ---
            # The actual next token is whatever id was fed at next_token_idx.
            # It cannot be recovered from the logits alone, so the caller is
            # expected to attach the full-sequence ids to model_output (as
            # `input_ids` or `_input_ids`); otherwise fall back to the argmax,
            # which is only correct for greedy decoding.
actual_token_id = None
if hasattr(model_output, 'input_ids') and model_output.input_ids is not None:
actual_token_id = model_output.input_ids[0, next_token_idx].item()
elif hasattr(model_output, '_input_ids'):
actual_token_id = model_output._input_ids[0, next_token_idx].item()
if actual_token_id is not None:
actual_token = tokenizer.decode([actual_token_id], skip_special_tokens=False)
actual_prob = probs[actual_token_id].item()
else:
# Fallback: use the argmax as "actual" (only correct for greedy)
top_prob, top_idx = probs.max(dim=-1)
actual_token = tokenizer.decode([top_idx.item()], skip_special_tokens=False)
actual_prob = top_prob.item()
results.append({
'position': i,
'top5': top5,
'actual_token': actual_token,
'actual_prob': float(actual_prob),
})
return results
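# Hedged usage sketch: compute_per_position_top5 relies on the caller
# attaching the full-sequence input_ids to the output object, as
# execute_forward_pass does below. `model` and `tokenizer` are assumed to be
# loaded; `full_text` is the original prompt plus previously generated text.
def _demo_per_position_top5(model, tokenizer, original_prompt, full_text):
    inputs = tokenizer(full_text, return_tensors="pt")
    with torch.no_grad():
        out = model(**inputs, use_cache=False)
    out.input_ids = inputs["input_ids"]  # required for actual-token lookup
    prompt_len = tokenizer(original_prompt, return_tensors="pt")["input_ids"].shape[1]
    return compute_per_position_top5(out, tokenizer, prompt_len, top_k=5)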
def get_actual_model_output(model_output, tokenizer) -> Tuple[str, float]:
"""
Extract the predicted token from model's output.
Args:
model_output: Output from model(**inputs) containing logits
tokenizer: Tokenizer for decoding
Returns:
(token_string, probability) for the predicted next token
"""
with torch.no_grad():
# Get probabilities for next token (last position)
logits = model_output.logits[0, -1, :] # [vocab_size]
probs = F.softmax(logits, dim=-1)
# Get top predicted token
top_prob, top_idx = probs.max(dim=-1)
token_str = tokenizer.decode([top_idx.item()], skip_special_tokens=False)
return token_str, top_prob.item()
def execute_forward_pass(model, tokenizer, prompt: str, config: Dict[str, Any],
ablation_config: Optional[Dict[int, List[int]]] = None,
original_prompt: Optional[str] = None) -> Dict[str, Any]:
"""
    Execute a forward pass with forward hooks to capture activations from the specified modules.
Args:
model: Loaded transformer model
tokenizer: Loaded tokenizer
prompt: Input text prompt (may be full sequence: original prompt + generated tokens)
config: Dict with module lists like {"attention_modules": [...], "block_modules": [...], ...}
ablation_config: Optional dict mapping layer numbers to list of head indices to ablate.
original_prompt: When provided, enables per-position top-5 computation for
the output scrubber. If prompt contains generated tokens beyond
original_prompt, each generated-token position gets its own top-5 data.
Returns:
JSON-serializable dict with captured activations and metadata
"""
    if ablation_config:
        return execute_forward_pass_with_multi_layer_head_ablation(
            model, tokenizer, prompt, config, ablation_config, original_prompt=original_prompt
        )
print(f"Executing forward pass with prompt: '{prompt}'")
# Extract module lists from config
attention_modules = config.get("attention_modules", [])
block_modules = config.get("block_modules", [])
norm_parameters = config.get("norm_parameters", [])
logit_lens_parameter = config.get("logit_lens_parameter")
all_modules = attention_modules + block_modules
if not all_modules:
print("No modules specified for capture")
return {"error": "No modules specified"}
# Register hooks directly on the original model to capture activations.
# (Avoids PyVene IntervenableModel which can remap module names and break
# hook registration, especially after model switching.)
inputs = tokenizer(prompt, return_tensors="pt")
captured = {}
name_to_module = dict(model.named_modules())
# Debug: warn if any requested modules are missing
missing_modules = [m for m in all_modules if m not in name_to_module]
if missing_modules:
print(f"Warning: {len(missing_modules)} modules not found in model: {missing_modules[:3]}...")
def make_hook(mod_name: str):
return lambda module, inputs, output: captured.update({mod_name: {"output": safe_to_serializable(output)}})
hooks = [
name_to_module[mod_name].register_forward_hook(make_hook(mod_name))
for mod_name in all_modules if mod_name in name_to_module
]
# Execute forward pass and capture actual output
with torch.no_grad():
model_output = model(**inputs, use_cache=False, output_attentions=True)
# Remove hooks
for hook in hooks:
hook.remove()
# Separate outputs by type based on module name pattern
attention_outputs = {}
block_outputs = {}
for mod_name, output in captured.items():
if 'attn' in mod_name or 'attention' in mod_name:
attention_outputs[mod_name] = output
else:
# Block/layer outputs (residual stream - full layer output)
block_outputs[mod_name] = output
# Capture normalization parameters (deprecated - kept for backward compatibility)
all_params = dict(model.named_parameters())
norm_data = [safe_to_serializable(all_params[p]) for p in norm_parameters if p in all_params]
# Extract predicted token from model output
actual_output = None
global_top5_tokens = []
try:
output_token, output_prob = get_actual_model_output(model_output, tokenizer)
actual_output = {"token": output_token, "probability": output_prob}
# Compute global top 5 tokens with merged probabilities
global_top5_tokens = compute_global_top5_tokens(model_output, tokenizer, top_k=5)
except Exception as e:
print(f"Warning: Could not extract model output: {e}")
# --- Per-position top-5 for the output scrubber ---
per_position_top5 = []
prompt_token_count = None
generated_tokens = []
if original_prompt is not None:
prompt_ids = tokenizer(original_prompt, return_tensors="pt")["input_ids"]
prompt_token_count = prompt_ids.shape[1]
seq_len = inputs["input_ids"].shape[1]
num_generated = seq_len - prompt_token_count
if num_generated > 0:
# Attach input_ids to model_output so compute_per_position_top5
# can look up the actual token at each position.
model_output.input_ids = inputs["input_ids"]
per_position_top5 = compute_per_position_top5(
model_output, tokenizer, prompt_token_count, top_k=5
)
# Decode each generated token individually for slider marks
full_ids = inputs["input_ids"][0].tolist()
generated_tokens = [
tokenizer.decode([full_ids[prompt_token_count + i]], skip_special_tokens=False)
for i in range(num_generated)
]
# Build output dictionary
# Pre-decode tokens so downstream code doesn't need the tokenizer
decoded_tokens = [tokenizer.decode([tid]) for tid in inputs["input_ids"][0].tolist()]
result = {
"model": getattr(model.config, "name_or_path", "unknown"),
"prompt": prompt,
"input_ids": safe_to_serializable(inputs["input_ids"]),
"tokens": decoded_tokens,
"attention_modules": list(attention_outputs.keys()),
"attention_outputs": attention_outputs,
"block_modules": list(block_outputs.keys()),
"block_outputs": block_outputs,
"norm_parameters": norm_parameters,
"norm_data": norm_data,
"actual_output": actual_output,
"global_top5_tokens": global_top5_tokens,
"per_position_top5": per_position_top5,
"prompt_token_count": prompt_token_count,
"generated_tokens": generated_tokens,
"original_prompt": original_prompt,
# Model config so pipeline doesn't need to reload the model
"model_config": {
"hidden_size": model.config.hidden_size,
"num_attention_heads": model.config.num_attention_heads,
"num_hidden_layers": model.config.num_hidden_layers,
"intermediate_size": getattr(model.config, 'intermediate_size', model.config.hidden_size * 4),
},
}
print(f"Captured {len(captured)} module outputs using PyVene")
return result
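# Hedged usage sketch for execute_forward_pass. The GPT-2 module/parameter
# names below are illustrative; real configs should come from the patterns
# returned by load_model_and_get_patterns.
def _demo_execute_forward_pass():
    model = AutoModelForCausalLM.from_pretrained("gpt2", attn_implementation="eager")
    tokenizer = AutoTokenizer.from_pretrained("gpt2")
    model.eval()
    config = {
        "attention_modules": ["transformer.h.0.attn", "transformer.h.1.attn"],
        "block_modules": ["transformer.h.0", "transformer.h.1"],
        "norm_parameters": ["transformer.ln_f.weight"],
    }
    return execute_forward_pass(model, tokenizer, "The cat sat on the", config)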
def execute_forward_pass_with_head_ablation(model, tokenizer, prompt: str, config: Dict[str, Any],
ablate_layer_num: int, ablate_head_indices: List[int]) -> Dict[str, Any]:
"""
Execute forward pass with specific attention heads zeroed out.
Args:
model: Loaded transformer model
tokenizer: Loaded tokenizer
prompt: Input text prompt
config: Dict with module lists like {"attention_modules": [...], "block_modules": [...], ...}
ablate_layer_num: Layer number containing heads to ablate
ablate_head_indices: List of head indices to zero out (e.g., [0, 2, 5])
Returns:
JSON-serializable dict with captured activations (with ablated heads)
"""
print(f"Executing forward pass with head ablation: Layer {ablate_layer_num}, Heads {ablate_head_indices}")
# Extract module lists from config
attention_modules = config.get("attention_modules", [])
block_modules = config.get("block_modules", [])
norm_parameters = config.get("norm_parameters", [])
logit_lens_parameter = config.get("logit_lens_parameter")
all_modules = attention_modules + block_modules
if not all_modules:
return {"error": "No modules specified"}
# Find the target attention module for the layer to ablate
target_attention_module = None
for mod_name in attention_modules:
layer_match = re.search(r'\.(\d+)(?:\.|$)', mod_name)
if layer_match and int(layer_match.group(1)) == ablate_layer_num:
target_attention_module = mod_name
break
if not target_attention_module:
return {"error": f"Could not find attention module for layer {ablate_layer_num}"}
# Prepare inputs
inputs = tokenizer(prompt, return_tensors="pt")
# Register hooks directly on the original model (avoids PyVene module renaming issues)
captured = {}
name_to_module = dict(model.named_modules())
def make_hook(mod_name: str):
return lambda module, inputs, output: captured.update({mod_name: {"output": safe_to_serializable(output)}})
# Create head ablation hook that both ablates and captures
def head_ablation_hook(module, input, output):
"""Zero out specific attention heads in the output AND capture it."""
ablated_output = output # Default to original output
if isinstance(output, tuple):
# Attention modules typically return (hidden_states, attention_weights, ...)
hidden_states = output[0] # [batch, seq_len, hidden_dim]
# Convert to tensor if needed
if not isinstance(hidden_states, torch.Tensor):
hidden_states = torch.tensor(hidden_states)
batch_size, seq_len, hidden_dim = hidden_states.shape
# Determine head dimension
# Assuming hidden_dim = num_heads * head_dim
# We need to get num_heads from the model config
num_heads = model.config.num_attention_heads
head_dim = hidden_dim // num_heads
# Reshape to [batch, seq_len, num_heads, head_dim]
hidden_states_reshaped = hidden_states.view(batch_size, seq_len, num_heads, head_dim)
# Zero out specified heads
for head_idx in ablate_head_indices:
if 0 <= head_idx < num_heads:
hidden_states_reshaped[:, :, head_idx, :] = 0.0
# Reshape back to [batch, seq_len, hidden_dim]
ablated_hidden = hidden_states_reshaped.view(batch_size, seq_len, hidden_dim)
# Reconstruct output tuple
if len(output) > 1:
ablated_output = (ablated_hidden,) + output[1:]
else:
ablated_output = (ablated_hidden,)
        # Capture the ablated output so downstream code sees the ablated
        # activations rather than the originals
captured.update({target_attention_module: {"output": safe_to_serializable(ablated_output)}})
return ablated_output
# Register hooks
hooks = []
for mod_name in all_modules:
if mod_name in name_to_module:
if mod_name == target_attention_module:
# Apply head ablation hook
hooks.append(name_to_module[mod_name].register_forward_hook(head_ablation_hook))
else:
# Regular capture hook
hooks.append(name_to_module[mod_name].register_forward_hook(make_hook(mod_name)))
# Execute forward pass
with torch.no_grad():
model_output = model(**inputs, use_cache=False)
# Remove hooks
for hook in hooks:
hook.remove()
# Separate outputs by type
attention_outputs = {}
block_outputs = {}
for mod_name, output in captured.items():
if 'attn' in mod_name or 'attention' in mod_name:
attention_outputs[mod_name] = output
else:
block_outputs[mod_name] = output
# Capture normalization parameters
all_params = dict(model.named_parameters())
norm_data = [safe_to_serializable(all_params[p]) for p in norm_parameters if p in all_params]
# Extract predicted token from model output
actual_output = None
global_top5_tokens = []
try:
output_token, output_prob = get_actual_model_output(model_output, tokenizer)
actual_output = {"token": output_token, "probability": output_prob}
global_top5_tokens = compute_global_top5_tokens(model_output, tokenizer, top_k=5)
except Exception as e:
print(f"Warning: Could not extract model output: {e}")
# Build output dictionary
result = {
"model": getattr(model.config, "name_or_path", "unknown"),
"prompt": prompt,
"input_ids": safe_to_serializable(inputs["input_ids"]),
"attention_modules": list(attention_outputs.keys()),
"attention_outputs": attention_outputs,
"block_modules": list(block_outputs.keys()),
"block_outputs": block_outputs,
"norm_parameters": norm_parameters,
"norm_data": norm_data,
"actual_output": actual_output,
"global_top5_tokens": global_top5_tokens,
"ablated_layer": ablate_layer_num,
"ablated_heads": ablate_head_indices
}
return result
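# Minimal self-contained sketch of the head-zeroing reshape used above, on a
# dummy tensor. Assumes hidden_dim = num_heads * head_dim, the layout used by
# most HF attention output projections.
def _demo_zero_heads():
    batch, seq, num_heads, head_dim = 1, 4, 12, 64
    hidden = torch.randn(batch, seq, num_heads * head_dim)
    reshaped = hidden.view(batch, seq, num_heads, head_dim).clone()
    reshaped[:, :, [0, 2], :] = 0.0  # zero heads 0 and 2
    return reshaped.view(batch, seq, num_heads * head_dim)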
def execute_forward_pass_with_multi_layer_head_ablation(model, tokenizer, prompt: str, config: Dict[str, Any],
heads_by_layer: Dict[int, List[int]], original_prompt: Optional[str] = None) -> Dict[str, Any]:
"""
Execute forward pass with specific attention heads zeroed out across multiple layers simultaneously.
Args:
model: Loaded transformer model
tokenizer: Loaded tokenizer
prompt: Input text prompt
config: Dict with module lists like {"attention_modules": [...], "block_modules": [...], ...}
heads_by_layer: Dict mapping layer numbers to lists of head indices to ablate
                        e.g., {0: [1, 3], 2: [0, 5]} ablates heads 1,3 in layer 0 and heads 0,5 in layer 2
        original_prompt: When provided, per-position top-5 data is computed
                         for each generated token beyond the original prompt.
Returns:
JSON-serializable dict with captured activations (with all specified heads ablated)
"""
# Format ablation info for logging
ablation_info = ", ".join([f"L{layer}: H{heads}" for layer, heads in sorted(heads_by_layer.items())])
print(f"Executing forward pass with multi-layer head ablation: {ablation_info}")
    # Handle empty heads_by_layer: fall back to a normal forward pass
    # (execute_forward_pass is defined in this module)
    if not heads_by_layer:
        return execute_forward_pass(model, tokenizer, prompt, config, original_prompt=original_prompt)
# Extract module lists from config
attention_modules = config.get("attention_modules", [])
block_modules = config.get("block_modules", [])
norm_parameters = config.get("norm_parameters", [])
logit_lens_parameter = config.get("logit_lens_parameter")
all_modules = attention_modules + block_modules
if not all_modules:
return {"error": "No modules specified"}
# Build mapping from layer number to attention module name
layer_to_attention_module = {}
for mod_name in attention_modules:
layer_match = re.search(r'\.(\d+)(?:\.|$)', mod_name)
if layer_match:
layer_num = int(layer_match.group(1))
layer_to_attention_module[layer_num] = mod_name
# Find target attention modules for all layers to ablate
target_modules_to_heads = {} # module_name -> list of head indices
for layer_num, head_indices in heads_by_layer.items():
if layer_num in layer_to_attention_module:
mod_name = layer_to_attention_module[layer_num]
target_modules_to_heads[mod_name] = head_indices
else:
return {"error": f"Could not find attention module for layer {layer_num}"}
# Prepare inputs
inputs = tokenizer(prompt, return_tensors="pt")
# Register hooks directly on the original model (avoids PyVene module renaming issues)
captured = {}
name_to_module = dict(model.named_modules())
def make_hook(mod_name: str):
return lambda module, inputs, output: captured.update({mod_name: {"output": safe_to_serializable(output)}})
# Create parameterized head ablation hook factory
def make_head_ablation_hook(target_mod_name: str, ablate_head_indices: List[int]):
"""Create a hook that zeros out specific attention heads and captures the output."""
def head_ablation_hook(module, input, output):
ablated_output = output # Default to original output
if isinstance(output, tuple):
# Attention modules typically return (hidden_states, attention_weights, ...)
hidden_states = output[0] # [batch, seq_len, hidden_dim]
# Convert to tensor if needed
if not isinstance(hidden_states, torch.Tensor):
hidden_states = torch.tensor(hidden_states)
batch_size, seq_len, hidden_dim = hidden_states.shape
# Determine head dimension
num_heads = model.config.num_attention_heads
head_dim = hidden_dim // num_heads
# Reshape to [batch, seq_len, num_heads, head_dim]
hidden_states_reshaped = hidden_states.view(batch_size, seq_len, num_heads, head_dim)
# Zero out specified heads
for head_idx in ablate_head_indices:
if 0 <= head_idx < num_heads:
hidden_states_reshaped[:, :, head_idx, :] = 0.0
# Reshape back to [batch, seq_len, hidden_dim]
ablated_hidden = hidden_states_reshaped.view(batch_size, seq_len, hidden_dim)
# Reconstruct output tuple
if len(output) > 1:
# Check for attention weights (usually index 2 if output_attentions=True)
if len(output) > 2:
attn_weights = output[2] # [batch, heads, seq, seq]
if isinstance(attn_weights, torch.Tensor):
# Zero out specified heads in attention weights too
# Clone to avoid in-place modification errors if any
attn_weights_mod = attn_weights.clone()
for head_idx in ablate_head_indices:
if 0 <= head_idx < num_heads:
attn_weights_mod[:, head_idx, :, :] = 0.0
# Reconstruct tuple with modified weights
ablated_output = (ablated_hidden, output[1], attn_weights_mod) + output[3:]
else:
ablated_output = (ablated_hidden,) + output[1:]
else:
ablated_output = (ablated_hidden,) + output[1:]
else:
ablated_output = (ablated_hidden,)
# Capture the ablated output
captured.update({target_mod_name: {"output": safe_to_serializable(ablated_output)}})
return ablated_output
return head_ablation_hook
# Register hooks
hooks = []
for mod_name in all_modules:
if mod_name in name_to_module:
if mod_name in target_modules_to_heads:
# Apply head ablation hook for this module
head_indices = target_modules_to_heads[mod_name]
hooks.append(name_to_module[mod_name].register_forward_hook(
make_head_ablation_hook(mod_name, head_indices)
))
else:
# Regular capture hook
hooks.append(name_to_module[mod_name].register_forward_hook(make_hook(mod_name)))
# Execute forward pass
with torch.no_grad():
model_output = model(**inputs, use_cache=False, output_attentions=True)
# Remove hooks
for hook in hooks:
hook.remove()
# Separate outputs by type
attention_outputs = {}
block_outputs = {}
for mod_name, output in captured.items():
if 'attn' in mod_name or 'attention' in mod_name:
attention_outputs[mod_name] = output
else:
block_outputs[mod_name] = output
# Capture normalization parameters
all_params = dict(model.named_parameters())
norm_data = [safe_to_serializable(all_params[p]) for p in norm_parameters if p in all_params]
# Extract predicted token from model output
actual_output = None
global_top5_tokens = []
try:
output_token, output_prob = get_actual_model_output(model_output, tokenizer)
actual_output = {"token": output_token, "probability": output_prob}
global_top5_tokens = compute_global_top5_tokens(model_output, tokenizer, top_k=5)
except Exception as e:
print(f"Warning: Could not extract model output: {e}")
# Compute per-position top 5 if an original_prompt is provided
per_position_top5 = []
generated_tokens = []
prompt_token_count = 0
if original_prompt:
prompt_ids = tokenizer(original_prompt, return_tensors="pt")["input_ids"]
prompt_token_count = prompt_ids.shape[1]
seq_len = inputs["input_ids"].shape[1]
num_generated = seq_len - prompt_token_count
if num_generated > 0:
model_output.input_ids = inputs["input_ids"]
per_position_top5 = compute_per_position_top5(
model_output, tokenizer, prompt_token_count, top_k=5
)
full_ids = inputs["input_ids"][0].tolist()
generated_tokens = [
tokenizer.decode([full_ids[prompt_token_count + i]], skip_special_tokens=False)
for i in range(num_generated)
]
# Build output dictionary
# Pre-decode tokens so downstream code doesn't need the tokenizer
decoded_tokens = [tokenizer.decode([tid]) for tid in inputs["input_ids"][0].tolist()]
result = {
"model": getattr(model.config, "name_or_path", "unknown"),
"prompt": prompt,
"input_ids": safe_to_serializable(inputs["input_ids"]),
"tokens": decoded_tokens,
"attention_modules": list(attention_outputs.keys()),
"attention_outputs": attention_outputs,
"block_modules": list(block_outputs.keys()),
"block_outputs": block_outputs,
"norm_parameters": norm_parameters,
"norm_data": norm_data,
"actual_output": actual_output,
"global_top5_tokens": global_top5_tokens,
"ablated_heads_by_layer": heads_by_layer, # Include ablation info in result
"per_position_top5": per_position_top5,
"prompt_token_count": prompt_token_count,
"generated_tokens": generated_tokens,
"original_prompt": original_prompt,
# Model config so pipeline doesn't need to reload the model
"model_config": {
"hidden_size": model.config.hidden_size,
"num_attention_heads": model.config.num_attention_heads,
"num_hidden_layers": model.config.num_hidden_layers,
"intermediate_size": getattr(model.config, 'intermediate_size', model.config.hidden_size * 4),
},
}
return result
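# Hedged usage sketch: zero heads 1 and 3 in layer 0 and head 0 in layer 2 in
# a single pass. `model`, `tokenizer`, and `config` are assumed to be set up
# as in the execute_forward_pass demo above.
def _demo_multi_layer_ablation(model, tokenizer, config):
    heads_by_layer = {0: [1, 3], 2: [0]}
    return execute_forward_pass_with_multi_layer_head_ablation(
        model, tokenizer, "The cat sat on the", config, heads_by_layer
    )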
def evaluate_sequence_ablation(model, tokenizer, sequence_text: str, config: Dict[str, Any],
ablation_type: str, ablation_target: Any) -> Dict[str, Any]:
"""
Evaluate the impact of ablation on a full sequence.
This runs TWO forward passes on the FULL sequence:
1. Reference pass (original model) -> Capture logits/probs
2. Ablated pass (modified model) -> Capture logits/probs
Then computes metrics: KL Divergence, Target Prob Changes.
Args:
model: Loaded transformer model
tokenizer: Tokenizer
sequence_text: The full text sequence to evaluate
config: Module configuration (needed for ablation setup)
ablation_type: 'head' or 'layer'
ablation_target: tuple (layer, head_indices) or int (layer_num)
Returns:
Dict with evaluation metrics.
"""
from .ablation_metrics import compute_kl_divergence, get_token_probability_deltas
print(f"Evaluating sequence ablation: Type={ablation_type}, Target={ablation_target}")
inputs = tokenizer(sequence_text, return_tensors="pt")
input_ids = inputs["input_ids"].to(model.device)
# --- 1. Reference Pass ---
with torch.no_grad():
outputs_ref = model(input_ids)
logits_ref = outputs_ref.logits # [1, seq_len, vocab_size]
    # --- 2. Ablated Pass ---
    # Reuse the head-zeroing approach from the ablation functions above, but
    # register hooks manually here: we want the full logits of the ablated
    # pass, not captured per-module activations.
hooks = []
def head_ablation_hook_factory(layer_idx, head_indices):
def hook(module, input, output):
# output is (hidden_states, ...) or hidden_states
if isinstance(output, tuple):
hidden_states = output[0]
else:
hidden_states = output
# Assume hidden_states is [batch, seq, hidden]
# Reshape, zero out heads, Reshape back
            # Hook outputs are normally tensors already on the right device;
            # convert defensively if a list sneaks through
            if not isinstance(hidden_states, torch.Tensor) and isinstance(hidden_states, list):
                hidden_states = torch.tensor(hidden_states)
num_heads = model.config.num_attention_heads
head_dim = hidden_states.shape[-1] // num_heads
# view: [batch, seq, heads, dim]
new_shape = hidden_states.shape[:-1] + (num_heads, head_dim)
reshaped = hidden_states.view(new_shape)
            # Clone before zeroing so we don't mutate the module's original
            # output tensor in place
            reshaped = reshaped.clone()
for h_idx in head_indices:
reshaped[..., h_idx, :] = 0
ablated_hidden = reshaped.view(hidden_states.shape)
if isinstance(output, tuple):
return (ablated_hidden,) + output[1:]
return ablated_hidden
return hook
    # Hook for layer ablation: skip the layer by passing its input through
    # unchanged (identity). Mean ablation would be preferable with precomputed
    # means; identity is a simpler approximation of removing the layer's
    # computation.
    def identity_hook(module, input, output):
        hidden = input[0] if isinstance(input, tuple) else input
        # Preserve the output structure (decoder layers often return tuples)
        if isinstance(output, tuple):
            return (hidden,) + output[1:]
        return hidden
try:
if ablation_type == 'head':
layer_num, head_indices = ablation_target
            # Find the attention module for this layer. Naming differs across
            # architectures, so try common patterns: 'layers.X.self_attn'
            # (Llama-style), 'h.X.attn' (GPT-2-style), or 'blocks.X.attn'.
            target_module = None
            for name, mod in model.named_modules():
if f"layers.{layer_num}.self_attn" in name or f"h.{layer_num}.attn" in name or f"blocks.{layer_num}.attn" in name:
if "k_proj" not in name and "v_proj" not in name and "q_proj" not in name: # avoid submodules
target_module = mod
break
if target_module:
hooks.append(target_module.register_forward_hook(head_ablation_hook_factory(layer_num, head_indices)))
else:
print(f"Warning: Could not find attention module for layer {layer_num}")
elif ablation_type == 'layer':
layer_num = ablation_target
target_module = None
for name, mod in model.named_modules():
# Layers are usually 'model.layers.X' or 'transformer.h.X'
# We want the module that corresponds to the layer block
# Be careful not to pick 'layers.X.mlp'
if (f"layers.{layer_num}" in name or f"h.{layer_num}" in name) and name.count('.') <= 2: # heuristic for top-level layer
target_module = mod
break
            if target_module:
                # Skip the layer using the identity hook defined above
                hooks.append(target_module.register_forward_hook(identity_hook))
# Run Ablated Pass
with torch.no_grad():
outputs_abl = model(input_ids)
logits_abl = outputs_abl.logits
finally:
for hook in hooks:
hook.remove()
# --- 3. Compute Metrics ---
# KL Divergence [seq_len]
kl_div = compute_kl_divergence(logits_ref, logits_abl)
# Prob Deltas for actual tokens [seq_len-1] (shifted)
prob_deltas = get_token_probability_deltas(logits_ref, logits_abl, input_ids)
return {
"kl_divergence": kl_div,
"probability_deltas": prob_deltas,
"tokens": [tokenizer.decode([tid]) for tid in input_ids[0].tolist()]
}
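# The metric helpers are imported from .ablation_metrics. As orientation, a
# minimal sketch of a per-position KL divergence between the reference and
# ablated distributions (an assumption about that module, not its actual
# implementation):
def _sketch_kl_divergence(logits_ref, logits_abl):
    log_p = F.log_softmax(logits_ref[0], dim=-1)  # reference, [seq, vocab]
    log_q = F.log_softmax(logits_abl[0], dim=-1)  # ablated
    kl = (log_p.exp() * (log_p - log_q)).sum(dim=-1)  # KL(p || q) per position
    return kl.tolist()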
def _prepare_hidden_state(layer_output: Any) -> torch.Tensor:
"""Helper to convert layer output to tensor, handling tuple outputs."""
    # Handle captured tuple outputs whose 2nd element is None (e.g. use_cache=False)
if isinstance(layer_output, (list, tuple)) and len(layer_output) > 1 and layer_output[1] is None:
layer_output = layer_output[0]
hidden = torch.tensor(layer_output) if not isinstance(layer_output, torch.Tensor) else layer_output
if hidden.dim() == 4:
hidden = hidden.squeeze(0)
return hidden
def logit_lens_transformation(layer_output: Any, norm_data: List[Any], model, tokenizer, norm_parameter: Optional[str] = None, top_k: int = 5) -> List[Tuple[str, float]]:
"""
Transform layer output to top K token probabilities using logit lens.
Returns merged probabilities (tokens with/without leading space are combined).
For standard logit lens, use block/layer outputs (residual stream), not component outputs.
The residual stream contains the full hidden state with all accumulated information.
Applies final layer normalization before projection (critical for correctness).
    Uses the model's own final norm and lm_head modules to avoid numerical
    mismatch with the real forward pass.
Args:
layer_output: Hidden state from any layer (preferably block output / residual stream)
norm_data: Not used (deprecated - using model's norm layer directly)
model: HuggingFace model
tokenizer: Tokenizer for decoding
norm_parameter: Parameter path for final norm layer (e.g., "model.norm.weight")
top_k: Number of top tokens to return (default: 5)
Returns:
List of (token_string, probability) tuples for top K tokens with merged probabilities
"""
with torch.no_grad():
# Convert to tensor and ensure proper shape [batch, seq_len, hidden_dim]
hidden = _prepare_hidden_state(layer_output)
# Step 1: Apply final layer normalization (critical for intermediate layers)
final_norm = get_norm_layer_from_parameter(model, norm_parameter)
if final_norm is not None:
hidden = final_norm(hidden)
# Step 2: Project to vocab space using model's lm_head
lm_head = model.get_output_embeddings()
logits = lm_head(hidden)
# Step 3: Get probabilities via softmax
probs = F.softmax(logits[0, -1, :], dim=-1)
# Step 4: Extract top candidates (get 2x top_k to account for merging)
top_probs, top_indices = torch.topk(probs, k=min(top_k * 2, len(probs)))
candidates = [
(tokenizer.decode([idx.item()], skip_special_tokens=False), prob.item())
for idx, prob in zip(top_indices, top_probs)
]
# Step 5: Merge tokens with/without leading space
merged = merge_token_probabilities(candidates)
return merged[:top_k]
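# Hedged usage sketch: run the logit lens over every captured block output of
# a forward pass. `data` is assumed to be the dict returned by
# execute_forward_pass, with `model`/`tokenizer` still loaded.
def _demo_logit_lens(data, model, tokenizer):
    norm_params = data.get("norm_parameters", [])
    norm_param = norm_params[0] if norm_params else None
    for name in data.get("block_modules", []):
        layer_out = data["block_outputs"][name]["output"]
        top = logit_lens_transformation(layer_out, [], model, tokenizer, norm_param, top_k=3)
        print(name, top)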
def get_norm_layer_from_parameter(model, norm_parameter: Optional[str]) -> Optional[Any]:
"""
Get the final layer normalization module from the model using the norm parameter path.
Args:
model: The transformer model
norm_parameter: Parameter path (e.g., "model.norm.weight") or None
Returns:
The normalization layer module, or None if not found
"""
if norm_parameter:
# Convert parameter path to module path (remove .weight/.bias suffix)
module_path = norm_parameter.replace('.weight', '').replace('.bias', '')
try:
parts = module_path.split('.')
obj = model
for part in parts:
obj = getattr(obj, part)
return obj
except AttributeError:
print(f"Warning: Could not find norm layer at {module_path}")
# Fallback: Try common final norm layer names if no parameter specified
for attr_path in ['model.norm', 'transformer.ln_f', 'model.decoder.final_layer_norm',
'gpt_neox.final_layer_norm', 'transformer.norm_f']:
try:
parts = attr_path.split('.')
obj = model
for part in parts:
obj = getattr(obj, part)
return obj
except AttributeError:
continue
return None
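# Illustration: the parameter path maps to its owning module by stripping the
# trailing .weight/.bias suffix ("transformer.ln_f.weight" is a GPT-2-style
# example path, used here only for demonstration).
def _demo_norm_lookup(model):
    return get_norm_layer_from_parameter(model, "transformer.ln_f.weight")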
def _get_token_probabilities_for_layer(activation_data: Dict[str, Any], module_name: str,
model, tokenizer, target_tokens: List[str]) -> Dict[str, float]:
"""
Get probabilities for specific tokens at a given layer.
Args:
activation_data: Activation data from forward pass
module_name: Layer module name
model: Transformer model
tokenizer: Tokenizer
target_tokens: List of token strings to get probabilities for
Returns:
Dict mapping token -> probability (merged for variants with/without space)
"""
try:
if module_name not in activation_data.get('block_outputs', {}):
return {}
layer_output = activation_data['block_outputs'][module_name]['output']
norm_params = activation_data.get('norm_parameters', [])
norm_parameter = norm_params[0] if norm_params else None
final_norm = get_norm_layer_from_parameter(model, norm_parameter)
lm_head = model.get_output_embeddings()
with torch.no_grad():
hidden = _prepare_hidden_state(layer_output)
if final_norm is not None:
hidden = final_norm(hidden)
logits = lm_head(hidden)
probs = F.softmax(logits[0, -1, :], dim=-1)
# For each target token, get probabilities for both variants (with/without space)
token_probs = {}
for token in target_tokens:
# Try both variants and sum probabilities
variants = [token, ' ' + token]
total_prob = 0.0
for variant in variants:
token_ids = tokenizer.encode(variant, add_special_tokens=False)
if token_ids:
tid = token_ids[-1] # Use last sub-token
total_prob += probs[tid].item()
token_probs[token] = total_prob
return token_probs
except Exception as e:
print(f"Warning: Could not compute token probabilities for {module_name}: {e}")
return {}
def _get_top_tokens(activation_data: Dict[str, Any], module_name: str, model, tokenizer, top_k: int = 5) -> Optional[List[Tuple[str, float]]]:
"""
Helper: Get top K tokens for a layer's block output.
Uses block outputs (residual stream) which represent the full hidden state
after all layer computations (attention + feedforward + residuals).
"""
try:
# Get block output (residual stream)
if module_name not in activation_data.get('block_outputs', {}):
return None
layer_output = activation_data['block_outputs'][module_name]['output']
# Get norm parameter from activation data (should be a single parameter or list with one item)
norm_params = activation_data.get('norm_parameters', [])
norm_parameter = norm_params[0] if norm_params else None
return logit_lens_transformation(layer_output, [], model, tokenizer, norm_parameter, top_k=top_k)
except Exception as e:
print(f"Warning: Could not compute logit lens for {module_name}: {e}")
return None
def detect_significant_probability_increases(layer_wise_probs: Dict[int, Dict[str, float]],
layer_wise_deltas: Dict[int, Dict[str, float]],
actual_output_token: str,
threshold: float = 1.0) -> List[int]:
"""
Detect layers where the actual output token has significant probability increase.
A layer is significant if the actual output token has ≥100% relative increase from previous layer.
Example: 0.20 → 0.40 is (0.40-0.20)/0.20 = 100% increase.
This threshold highlights layers where the model's confidence in the actual output
doubles, representing a pedagogically significant shift in the prediction.
Args:
layer_wise_probs: Dict mapping layer_num → {token: prob}
layer_wise_deltas: Dict mapping layer_num → {token: delta}
actual_output_token: The token that the model actually outputs (predicted token)
threshold: Relative increase threshold (default: 1.0 = 100%)
Returns:
List of layer numbers with significant increases in the actual output token
"""
significant_layers = []
for layer_num in sorted(layer_wise_probs.keys()):
probs = layer_wise_probs[layer_num]
deltas = layer_wise_deltas.get(layer_num, {})
# Only check the actual output token
if actual_output_token in probs:
prob = probs[actual_output_token]
delta = deltas.get(actual_output_token, 0.0)
prev_prob = prob - delta
# Check for significant relative increase (avoid division by zero)
if prev_prob > 1e-6 and delta > 0:
relative_increase = delta / prev_prob
if relative_increase >= threshold:
significant_layers.append(layer_num)
return significant_layers
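# Worked example of the threshold rule: with probabilities 0.05 -> 0.20 -> 0.25
# across layers 0..2, layer 1 is significant ((0.20-0.05)/0.05 = 300% >= 100%)
# while layer 2 is not ((0.25-0.20)/0.20 = 25%).
def _demo_detect_significant():
    probs = {0: {"cat": 0.05}, 1: {"cat": 0.20}, 2: {"cat": 0.25}}
    deltas = {0: {"cat": 0.05}, 1: {"cat": 0.15}, 2: {"cat": 0.05}}
    return detect_significant_probability_increases(probs, deltas, "cat")  # -> [1]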
def extract_layer_data(activation_data: Dict[str, Any], model, tokenizer) -> List[Dict[str, Any]]:
"""
Extract layer-by-layer data for accordion display with top-5, deltas, and attention.
Also tracks global top 5 tokens across all layers.
Returns:
List of dicts with: layer_num, top_token, top_prob, top_5_tokens, deltas,
global_top5_probs, global_top5_deltas
"""
layer_modules = activation_data.get('block_modules', [])
if not layer_modules:
return []
# Debug: Check if attention outputs are present
attention_outputs = activation_data.get('attention_outputs', {})
print(f"DEBUG extract_layer_data: Found {len(attention_outputs)} attention modules")
# Extract and sort layers by layer number
layer_info = sorted(
[(int(re.findall(r'\d+', name)[0]), name)
for name in layer_modules if re.findall(r'\d+', name)]
)
    # Token predictions require block_outputs (residual stream) and
    # norm_parameters; logit_lens_parameter is not needed for this.
has_block_outputs = bool(activation_data.get('block_outputs', {}))
has_norm_params = bool(activation_data.get('norm_parameters', []))
can_compute_predictions = has_block_outputs and has_norm_params
# Get global top 5 tokens from final output
global_top5_tokens = activation_data.get('global_top5_tokens', [])
# Handle both dicts (new format) and tuples (legacy)
if global_top5_tokens and isinstance(global_top5_tokens[0], dict):
global_top5_token_names = [t.get('token') for t in global_top5_tokens]
else:
global_top5_token_names = [token for token, _ in global_top5_tokens]
layer_data = []
prev_token_probs = {} # Track previous layer's token probabilities (layer's own top 5)
prev_global_probs = {} # Track previous layer's global top 5 probabilities
for layer_num, module_name in layer_info:
top_tokens = _get_top_tokens(activation_data, module_name, model, tokenizer, top_k=5) if can_compute_predictions else None
# Get probabilities for global top 5 tokens at this layer
global_top5_probs = {}
global_top5_deltas = {}
if can_compute_predictions and global_top5_token_names:
global_top5_probs = _get_token_probabilities_for_layer(
activation_data, module_name, model, tokenizer, global_top5_token_names
)
# Compute deltas for global top 5
for token in global_top5_token_names:
current_prob = global_top5_probs.get(token, 0.0)
prev_prob = prev_global_probs.get(token, 0.0)
global_top5_deltas[token] = current_prob - prev_prob
if top_tokens:
top_token, top_prob = top_tokens[0]
# Compute deltas vs previous layer (for layer's own top 5)
deltas = {}
for token, prob in top_tokens:
prev_prob = prev_token_probs.get(token, 0.0)
deltas[token] = prob - prev_prob
layer_data.append({
'layer_num': layer_num,
'module_name': module_name,
'top_token': top_token,
'top_prob': top_prob,
'top_3_tokens': top_tokens[:3], # Keep for backward compatibility
'top_5_tokens': top_tokens[:5], # New: top-5 for bar chart
'deltas': deltas,
'global_top5_probs': global_top5_probs, # New: global top 5 probs at this layer
'global_top5_deltas': global_top5_deltas # New: global top 5 deltas
})
# Update previous layer probabilities
prev_token_probs = {token: prob for token, prob in top_tokens}
prev_global_probs = global_top5_probs.copy()
else:
layer_data.append({
'layer_num': layer_num,
'module_name': module_name,
'top_token': None,
'top_prob': None,
'top_3_tokens': [],
'top_5_tokens': [],
'deltas': {},
'global_top5_probs': {},
'global_top5_deltas': {}
})
prev_global_probs = {}
return layer_data
def generate_bertviz_model_view_html(activation_data: Dict[str, Any]) -> str:
"""
Generate BertViz model view HTML.
Shows a comprehensive view of attention across all layers and heads.
Args:
activation_data: Output from execute_forward_pass
Returns:
HTML string for the visualization
"""
try:
from bertviz import model_view
from transformers import AutoTokenizer
# Extract attention modules and sort by layer
attention_outputs = activation_data.get('attention_outputs', {})
if not attention_outputs:
return f"<p>No attention data available</p>"
# Sort attention modules by layer number
layer_attention_pairs = []
for module_name in attention_outputs.keys():
numbers = re.findall(r'\d+', module_name)
if numbers:
layer_num = int(numbers[0])
attention_output = attention_outputs[module_name]['output']
if isinstance(attention_output, list) and len(attention_output) >= 2:
# Get attention weights (element 1 of the output tuple)
attention_weights = torch.tensor(attention_output[1]) # [batch, heads, seq, seq]
layer_attention_pairs.append((layer_num, attention_weights))
if not layer_attention_pairs:
return f"<p>No valid attention data found</p>"
# Sort by layer number and extract attention tensors
layer_attention_pairs.sort(key=lambda x: x[0])
attentions = tuple(attn for _, attn in layer_attention_pairs)
# Get tokens
input_ids = torch.tensor(activation_data['input_ids'])
model_name = activation_data.get('model', 'unknown')
# Load tokenizer and convert to tokens
tokenizer = AutoTokenizer.from_pretrained(model_name)
raw_tokens = tokenizer.convert_ids_to_tokens(input_ids[0])
# Clean up tokens (remove special tokenizer artifacts like Ġ for GPT-2)
tokens = [token.replace('Ġ', ' ') if token.startswith('Ġ') else token for token in raw_tokens]
# Generate model_view
html_result = model_view(attentions, tokens, html_action='return')
return html_result.data if hasattr(html_result, 'data') else str(html_result)
except Exception as e:
import traceback
traceback.print_exc()
return f"<p>Error generating visualization: {str(e)}</p>"
def generate_bertviz_html(activation_data: Dict[str, Any], layer_index: int, view_type: str = 'full') -> str:
"""
Generate BertViz attention visualization HTML using head_view.
Uses head_view for a less overwhelming display that lets users scroll through
individual attention heads. Shows all heads with layer/head selectors.
Args:
activation_data: Output from execute_forward_pass
layer_index: Index of layer to visualize (used for initial layer selection)
view_type: 'full' for complete visualization or 'mini' for preview
Returns:
HTML string for the visualization
"""
try:
from bertviz import head_view
from transformers import AutoTokenizer
# Extract attention modules and sort by layer
attention_outputs = activation_data.get('attention_outputs', {})
if not attention_outputs:
return f"<p>No attention data available</p>"
# Sort attention modules by layer number
layer_attention_pairs = []
for module_name in attention_outputs.keys():
numbers = re.findall(r'\d+', module_name)
if numbers:
layer_num = int(numbers[0])
attention_output = attention_outputs[module_name]['output']
if isinstance(attention_output, list) and len(attention_output) >= 2:
# Get attention weights (element 1 of the output tuple)
attention_weights = torch.tensor(attention_output[1]) # [batch, heads, seq, seq]
layer_attention_pairs.append((layer_num, attention_weights))
if not layer_attention_pairs:
return f"<p>No valid attention data found</p>"
# Sort by layer number and extract attention tensors
layer_attention_pairs.sort(key=lambda x: x[0])
attentions = tuple(attn for _, attn in layer_attention_pairs)
# Get tokens
input_ids = torch.tensor(activation_data['input_ids'])
model_name = activation_data.get('model', 'unknown')
# Load tokenizer and convert to tokens
tokenizer = AutoTokenizer.from_pretrained(model_name)
raw_tokens = tokenizer.convert_ids_to_tokens(input_ids[0])
# Clean up tokens (remove special tokenizer artifacts like Ġ for GPT-2)
tokens = [token.replace('Ġ', ' ') if token.startswith('Ġ') else token for token in raw_tokens]
# Generate visualization based on view_type
if view_type == 'mini':
# Mini version: simplified HTML preview
return f"""
<div style="padding:10px; border:1px solid #ccc; border-radius:5px;">
<h4>Layer {layer_index} Attention Preview</h4>
<p><strong>Tokens:</strong> {' '.join(tokens[:8])}{'...' if len(tokens) > 8 else ''}</p>
<p><strong>Total Layers:</strong> {len(attentions)}</p>
<p><strong>Heads per Layer:</strong> {attentions[0].shape[1] if attentions else 'N/A'}</p>
<p><em>Click for full head_view visualization</em></p>
</div>
"""
else:
# Full version: BertViz head_view (less overwhelming, scrollable heads)
from utils.colors import BERTVIZ_HEAD_COLORS
html_result = head_view(attentions, tokens, html_action='return')
html_str = html_result.data if hasattr(html_result, 'data') else str(html_result)
# Patch BertViz color scheme to match our swatch palette (no collisions for ≤16 heads)
_colors_js = repr(BERTVIZ_HEAD_COLORS).replace("'", '"') # JSON-safe array literal
_patch = f"headColors = d3.scaleOrdinal({_colors_js});"
html_str = html_str.replace(
'headColors = d3.scaleOrdinal(d3.schemeCategory10);',
_patch
)
html_str = html_str.replace(
'headColors = d3.scale.category10();',
_patch
)
# Inject head-index labels inside the checkbox swatches.
# Target the first `updateCheckboxes();` call inside drawCheckboxes
# and prepend D3 code that appends <text> elements over each rect.
_label_js = (
'checkboxContainer.selectAll("text")\n'
' .data(config.headVis)\n'
' .enter()\n'
' .append("text")\n'
' .text((d, i) => i)\n'
' .attr("x", (d, i) => i * CHECKBOX_SIZE + CHECKBOX_SIZE / 2)\n'
' .attr("y", top + CHECKBOX_SIZE / 2)\n'
' .attr("text-anchor", "middle")\n'
' .attr("dominant-baseline", "central")\n'
' .attr("font-size", "10px")\n'
' .attr("font-weight", "bold")\n'
' .attr("fill", "white")\n'
' .attr("pointer-events", "none");\n'
' updateCheckboxes();'
)
html_str = html_str.replace(
'updateCheckboxes();\n\n checkbox.on',
_label_js + '\n\n checkbox.on',
1, # replace only the first occurrence
)
return html_str
except Exception as e:
import traceback
traceback.print_exc()
return f"<p>Error generating visualization: {str(e)}</p>"