#!/usr/bin/env python3
"""
janus4_temporal_diff.py — Janus 4-way attention reference implementation.

4th attention mechanism: Temporal Diff — attends to CHANGES between positions.

Based on Variant 4 (dedicated wtd+wvd) + Opus analysis fixes:
  - Removed distance decay (RoPE handles this in full nanochat)
  - Gate init biased against delta (-1.0) — model discovers if/when to use it
  - Dedicated projections (no weight sharing with QKV or RRPRAM)

Architecture: QKV (semantic) + RRPRAM (positional) + Echo (self-resonance)
              + TemporalDiff (change detection)

What TemporalDiff captures that others don't:
  - QKV with RoPE: encodes distance between positions, not content of change
  - RRPRAM: positional patterns, not transitions
  - Echo: self-similarity, not change rate
  - TemporalDiff: "where did representation change, and do changes correlate?"

Pure Python, zero deps. For reference/testing. Production = C or PyTorch.

By Arianna Method, 2026-03-25.
"""

import argparse
import math
import random

VOCAB = 256          # one token per byte value
MAX_T = 48           # maximum sequence length the model accepts
DIM = 48             # embedding width
HEADS = 4            # number of attention heads
HD = DIM // HEADS    # per-head width


def bpe_encode(text):
    """Return the UTF-8 bytes of *text* as a list of ints (one token per byte).

    Characters that cannot be encoded are dropped (``errors='ignore'``).
    """
    return list(text.encode('utf-8', errors='ignore'))


def bpe_decode(ids):
    """Decode token ids back to text; ids are wrapped into 0..255 first.

    Byte sequences that are not valid UTF-8 are dropped (``errors='ignore'``).
    """
    return bytes([i % 256 for i in ids]).decode('utf-8', errors='ignore')


def rand_mat(r, c, s=0.02):
    """Return an r x c list-of-lists with entries uniform in [-s, s)."""
    return [[(random.random() * 2 - 1) * s for _ in range(c)] for _ in range(r)]


def vec_mat(v, m):
    """Return the row-vector / matrix product ``v @ m`` as a list.

    ``m`` must have at least ``len(v)`` rows; the result has ``len(m[0])``
    entries.
    """
    out = [0.0] * len(m[0])
    for i, vi in enumerate(v):
        row = m[i]
        for j in range(len(out)):
            out[j] += vi * row[j]
    return out


def softmax(xs):
    """Return the softmax of *xs* (max-shifted for numerical stability).

    An empty input yields an empty list.  The denominator carries a 1e-9
    epsilon, so the result sums to very slightly less than 1.
    """
    if not xs:
        return []
    mx = max(xs)
    ex = [math.exp(x - mx) for x in xs]
    s = sum(ex) + 1e-9
    return [x / s for x in ex]


class Janus4:
    """Single-layer model blending four causal attention mechanisms.

    Per head, the outputs of QKV, RRPRAM, Echo and TemporalDiff attention are
    mixed by a softmax over ``self.gate`` and projected to byte logits.
    """

    def __init__(self):
        """Randomly initialise all weights (uses the global ``random`` state)."""
        self.tok = rand_mat(VOCAB, DIM)
        self.pos = rand_mat(MAX_T, DIM)
        # QKV — semantic attention
        self.wq = rand_mat(DIM, DIM)
        self.wk = rand_mat(DIM, DIM)
        self.wv = rand_mat(DIM, DIM)
        # RRPRAM — positional resonance
        self.wr = rand_mat(DIM, MAX_T)
        self.wvr = rand_mat(DIM, DIM)
        # Echo — self-resonance (W^T * W)
        self.wj = rand_mat(DIM, DIM)
        # Temporal Diff — dedicated projections (NOT shared with QKV/RRPRAM)
        self.wtd = rand_mat(DIM, DIM)  # delta key projection
        self.wvd = rand_mat(DIM, DIM)  # delta value projection
        # 4-way gate — delta starts suppressed (Opus recommendation)
        self.gate = [0.0, 0.0, 0.0, -1.0]  # QKV, RRPRAM, Echo, TemporalDiff
        # Output
        self.out = rand_mat(DIM, VOCAB)
        self.bias = [0.0] * VOCAB

    def _dot(self, a, b):
        """Return the dot product of two equal-length vectors."""
        return sum(x * y for x, y in zip(a, b))

    def _head(self, v, h):
        """Return the slice of vector *v* that belongs to head *h*."""
        return v[h * HD:(h + 1) * HD]

    def _causal_weights(self, queries, keys):
        """Return causal softmax attention weights.

        Row ``i`` holds ``softmax(queries[i] . keys[j] / sqrt(HD))`` for
        ``j <= i`` only, so it has ``i + 1`` entries.  Future positions are
        simply absent, which is equivalent to giving them zero weight.
        """
        scale = math.sqrt(HD)
        return [
            softmax([self._dot(qi, keys[j]) / scale for j in range(i + 1)])
            for i, qi in enumerate(queries)
        ]

    def _mix(self, weights, values):
        """Return, per position, the weighted sum of per-head value vectors.

        ``weights[i]`` may be shorter than ``values``; ``zip`` stops at the
        end of the weight row, so only positions ``j <= i`` contribute.
        """
        mixed = []
        for w_row in weights:
            acc = [0.0] * HD
            for w, val in zip(w_row, values):
                for d in range(HD):
                    acc[d] += w * val[d]
            mixed.append(acc)
        return mixed

    def forward(self, ids):
        """Run the model on a token sequence.

        Args:
            ids: token ids in ``range(VOCAB)``; at most ``MAX_T`` of them.

        Returns:
            ``(logits, cat)`` where ``logits[t]`` has ``VOCAB`` entries and
            ``cat[t]`` is the ``DIM``-wide gated attention output at step t.

        Raises:
            ValueError: if ``len(ids) > MAX_T`` (no positional rows exist
                beyond that length).
        """
        T = len(ids)
        if T > MAX_T:
            raise ValueError(f"sequence length {T} exceeds MAX_T={MAX_T}")

        x = [[te + pe for te, pe in zip(self.tok[ids[t]], self.pos[t])]
             for t in range(T)]

        # Precompute all projections
        q = [vec_mat(row, self.wq) for row in x]
        k = [vec_mat(row, self.wk) for row in x]
        v = [vec_mat(row, self.wv) for row in x]
        rv = [vec_mat(row, self.wvr) for row in x]
        je = [vec_mat(row, self.wj) for row in x]

        # Temporal diff: delta of input (position 0 has no predecessor -> 0)
        dx = [[0.0] * DIM for _ in range(T)]
        for t in range(1, T):
            prev, cur, delta = x[t - 1], x[t], dx[t]
            for e in range(DIM):
                delta[e] = cur[e] - prev[e]

        # Dedicated projections for delta (not shared!)
        dk = [vec_mat(row, self.wtd) for row in dx]  # delta keys
        dv = [vec_mat(row, self.wvd) for row in dx]  # delta values

        # RRPRAM scores use the full embedding against column j of wr and do
        # not depend on the head, so the weights are computed once here.
        wr_cols = [list(col) for col in zip(*self.wr)][:T]
        rr_weights = self._causal_weights(x, wr_cols)

        g = softmax(self.gate)
        cat = [[0.0] * DIM for _ in range(T)]

        for h in range(HEADS):
            qh = [self._head(row, h) for row in q]
            kh = [self._head(row, h) for row in k]
            vh = [self._head(row, h) for row in v]
            rvh = [self._head(row, h) for row in rv]
            jeh = [self._head(row, h) for row in je]
            dkh = [self._head(row, h) for row in dk]
            dvh = [self._head(row, h) for row in dv]

            # 1) QKV attention — semantic content matching
            ho = self._mix(self._causal_weights(qh, kh), vh)
            # 2) RRPRAM — positional resonance
            ro = self._mix(rr_weights, rvh)
            # 3) Echo — self-resonance (W^T * W)
            jo = self._mix(self._causal_weights(jeh, jeh), jeh)
            # 4) Temporal Diff — change detection attention
            # No distance decay (Opus fix: RoPE handles this in full
            # implementation)
            to = self._mix(self._causal_weights(dkh, dkh), dvh)

            # Gate blend — 4-way softmax
            base = h * HD
            for t in range(T):
                for d in range(HD):
                    cat[t][base + d] = (g[0] * ho[t][d] + g[1] * ro[t][d]
                                        + g[2] * jo[t][d] + g[3] * to[t][d])

        logits = [
            [lo + b for lo, b in zip(vec_mat(cat[t], self.out), self.bias)]
            for t in range(T)
        ]
        return logits, cat

    def train_step(self, tok, tgt, lr):
        """Do one SGD step on the output layer and return the mean loss.

        Only ``self.out`` and ``self.bias`` are updated (reference impl);
        all attention weights stay at their initial values.

        Args:
            tok: input token ids (non-empty, at most ``MAX_T``).
            tgt: target ids; ``tgt[t]`` is the label for ``tok[t]``.
            lr: learning rate.

        Raises:
            ValueError: if ``tok`` is empty or ``tgt`` is shorter than ``tok``.
        """
        n = len(tok)
        if n == 0:
            raise ValueError("train_step needs at least one input token")
        if len(tgt) < n:
            raise ValueError(
                f"tgt has {len(tgt)} entries but tok has {n}")

        logits, cat = self.forward(tok)
        loss = 0.0
        grad = []
        for t in range(n):
            p = softmax(logits[t])
            loss -= math.log(max(1e-9, p[tgt[t]]))
            # d(cross-entropy)/d(logits) = softmax - one_hot(target)
            p[tgt[t]] -= 1.0
            grad.append(p)
        loss /= n

        # Gradient on output layer only (reference impl)
        for t in range(n):
            g_t = grad[t]
            for e in range(DIM):
                ce = cat[t][e]
                if ce == 0.0:
                    continue
                row = self.out[e]
                for vi in range(VOCAB):
                    row[vi] -= lr * ce * g_t[vi] / n
            for vi in range(VOCAB):
                self.bias[vi] -= lr * g_t[vi] / n
        return loss


def generate(model, prompt, n=60):
    """Greedily extend *prompt* by *n* bytes and return the decoded text.

    The model only ever sees the last ``MAX_T`` tokens, but the returned text
    contains the whole prompt followed by every generated byte, so nothing is
    lost when ``len(prompt) + n`` exceeds ``MAX_T``.

    Raises:
        ValueError: if *prompt* encodes to zero bytes (there would be no
            position to predict from).
    """
    ids = bpe_encode(prompt)
    if not ids:
        raise ValueError("generate needs a non-empty prompt")
    for _ in range(n):
        logits, _ = model.forward(ids[-MAX_T:])
        p = softmax(logits[-1])
        ids.append(max(range(VOCAB), key=lambda i: p[i]))
    return bpe_decode(ids)


def train(model, text, steps, lr):
    """Train *model* on random windows of *text*; return the per-step losses.

    Each step draws a window start, takes up to ``MAX_T`` input bytes and the
    same window shifted by one as targets.  Texts shorter than ``MAX_T + 1``
    bytes are used as a single shorter window.

    Raises:
        ValueError: if *text* encodes to fewer than two bytes.
    """
    ids = bpe_encode(text)
    if len(ids) < 2:
        raise ValueError("training text must encode to at least 2 bytes")
    losses = []
    for step in range(1, steps + 1):
        off = random.randint(0, max(0, len(ids) - MAX_T - 2))
        tgt = ids[off + 1:off + MAX_T + 1]
        # Trim inputs to the targets so every input has a next-byte label.
        tok = ids[off:off + len(tgt)]
        losses.append(model.train_step(tok, tgt, lr))
        if step % 10 == 0:
            print(f"step {step:4d}/{steps} loss={losses[-1]:.4f}")
    return losses


def main():
    """Command-line entry point: optionally train, then optionally generate."""
    ap = argparse.ArgumentParser()
    ap.add_argument('--train', type=str)
    ap.add_argument('--steps', type=int, default=40)
    ap.add_argument('--lr', type=float, default=0.05)
    ap.add_argument('--generate', type=str)
    args = ap.parse_args()

    random.seed(42)
    m = Janus4()
    if args.train:
        with open(args.train, 'r', encoding='utf-8', errors='ignore') as fh:
            txt = fh.read()
        losses = train(m, txt, args.steps, args.lr)
        print(f'loss_start={losses[0]:.4f} loss_end={losses[-1]:.4f}')
    if args.generate:
        print(generate(m, args.generate))


if __name__ == '__main__':
    main()