| | """ |
| | Test the trained filename anomaly detector. |
| | Usage: python test_model.py |
| | """ |
| | import json, math, os |
| |
|
| | |
# --- Load the serialized model -------------------------------------------
MODEL_PATH = 'model.json'
# Raise explicitly instead of `assert`: assertions are stripped under
# `python -O`, which would let the open() below fail with a less helpful error.
if not os.path.exists(MODEL_PATH):
    raise FileNotFoundError(f"No model found at {MODEL_PATH}!")
with open(MODEL_PATH) as f:
    payload = json.load(f)  # dict with 'hyperparams', 'vocab', 'vocab_size', 'weights'
| |
|
# Unpack hyperparameters, vocabulary, and weight tensors from the payload.
hp = payload['hyperparams']
n_embd, n_head, n_layer, block_size, head_dim = (
    hp['n_embd'],
    hp['n_head'],
    hp['n_layer'],
    hp['block_size'],
    hp['head_dim'],
)
uchars = payload['vocab']
vocab_size = payload['vocab_size']
weights = payload['weights']
# The last vocab slot is reserved as the BOS/EOS sentinel token.
BOS = vocab_size - 1
# Character -> token-id lookup table.
stoi = {ch: i for i, ch in enumerate(uchars)}

print(f"Model loaded: {n_embd}d, {n_head}h, {n_layer}L, vocab={vocab_size}")
| |
|
| | |
def linear(x, w):
    """Matrix-vector product: apply weight matrix `w` (list of rows) to vector `x`."""
    out = []
    for row in w:
        acc = 0
        for coeff, xi in zip(row, x):
            acc += coeff * xi
        out.append(acc)
    return out
| |
|
def rmsnorm(x):
    """Normalize `x` to unit root-mean-square (1e-5 epsilon for stability)."""
    mean_sq = sum(v * v for v in x) / len(x)
    # Same expression form as training: (ms + eps) ** -0.5 keeps bit-exact floats.
    inv_rms = (mean_sq + 1e-5) ** -0.5
    return [v * inv_rms for v in x]
| |
|
def softmax_floats(logits):
    """Numerically stable softmax: shift by the max before exponentiating."""
    peak = max(logits)
    exp_vals = []
    for v in logits:
        exp_vals.append(math.exp(v - peak))
    total = sum(exp_vals)
    return [e / total for e in exp_vals]
| |
|
def gpt_forward(token_id, pos_id, keys, values):
    """Run one decoding step of the GPT and return the next-token logits.

    Processes a single token at position `pos_id`, appending this step's
    per-layer key/value vectors to the caller-owned KV caches `keys` and
    `values` (each a list of n_layer lists), so successive calls attend over
    the full prefix. Reads the module-level `weights`, `n_layer`, `n_head`,
    and `head_dim`. Returns a plain list of `vocab_size` logits.
    """
    # Token + learned positional embedding.
    tok_emb = weights['wte'][token_id]
    pos_emb = weights['wpe'][pos_id]
    x = [t + p for t, p in zip(tok_emb, pos_emb)]
    # NOTE(review): embeddings are normalized before the first block — this
    # must match how the model was trained; presumably intentional.
    x = rmsnorm(x)
    for li in range(n_layer):
        # --- attention sub-block (pre-norm, residual) ---
        x_res = x
        x = rmsnorm(x)
        q = linear(x, weights[f'layer{li}.attn_wq'])
        k = linear(x, weights[f'layer{li}.attn_wk'])
        v = linear(x, weights[f'layer{li}.attn_wv'])
        # Extend this layer's KV cache with the current position.
        keys[li].append(k)
        values[li].append(v)
        x_attn = []
        for h in range(n_head):
            hs = h * head_dim  # offset of head h within the concatenated vectors
            q_h = q[hs:hs+head_dim]
            k_h = [ki[hs:hs+head_dim] for ki in keys[li]]
            v_h = [vi[hs:hs+head_dim] for vi in values[li]]
            # Scaled dot-product scores against every cached position.
            attn = [sum(q_h[j]*k_h[t][j] for j in range(head_dim)) / head_dim**0.5 for t in range(len(k_h))]
            aw = softmax_floats(attn)
            # Attention-weighted sum of cached values for this head.
            head_out = [sum(aw[t]*v_h[t][j] for t in range(len(v_h))) for j in range(head_dim)]
            x_attn.extend(head_out)
        # Output projection + residual connection.
        x = linear(x_attn, weights[f'layer{li}.attn_wo'])
        x = [a + b for a, b in zip(x, x_res)]
        # --- MLP sub-block (pre-norm, ReLU, residual) ---
        x_res = x
        x = rmsnorm(x)
        x = linear(x, weights[f'layer{li}.mlp_fc1'])
        x = [max(0, xi) for xi in x]  # ReLU
        x = linear(x, weights[f'layer{li}.mlp_fc2'])
        x = [a + b for a, b in zip(x, x_res)]
    # Project final hidden state to vocabulary logits (no final norm here).
    return linear(x, weights['lm_head'])
| |
|
def score_filename(name):
    """Return negative log-likelihood (lower = more normal)."""
    # Encode known characters (unknowns are dropped), wrapped in BOS sentinels.
    ids = [stoi[c] for c in name if c in stoi]
    toks = [BOS] + ids + [BOS]
    # Clamp to the model's context window (slice is a no-op when short enough).
    toks = toks[:block_size + 1]
    # Fresh per-layer KV caches for this sequence.
    keys = [[] for _ in range(n_layer)]
    vals = [[] for _ in range(n_layer)]
    nll = 0.0
    for pos, (cur, nxt) in enumerate(zip(toks, toks[1:])):
        probs = softmax_floats(gpt_forward(cur, pos, keys, vals))
        p = probs[nxt]
        if p > 0:
            nll -= math.log(p)
        else:
            # Guard against log(0): penalize impossible predictions heavily.
            nll += 1e6
    return nll
| |
|
| | |
# Evaluation fixtures: names that follow the training convention (expected
# low NLL) versus out-of-distribution names (expected high NLL).
normal_filenames = [
    "acr_banner_spring25_enUS_v01.png",
    "acr_email_bf24_enGB_v02.jpg",
    "acr_video_demo_enUS_v01.mp4",
    "acr_logo_primary_enUS_v03.svg",
    "acr_report_fy24q4_enUS_v01.pdf",
]

anomalous_filenames = [
    "DELETE_THIS_NOW.exe",
    "..hidden_config.bat",
    "photo_2024_vacation_IMG_3847.HEIC",
    "meeting notes final FINAL v2 (1).docx",
    "acr banner spring enUS v01.png",
]

# Score and report both batches under their section headers.
for label, batch in (
    ("Normal filenames (should have LOW NLL)", normal_filenames),
    ("Anomalous filenames (should have HIGH NLL)", anomalous_filenames),
):
    print(f"\n--- {label} ---")
    for fn in batch:
        nll = score_filename(fn)
        print(f" NLL {nll:7.2f} | {fn}")
| |
|