"""
janus4_temporal_diff.py — Janus 4-way attention reference implementation.

4th attention mechanism: Temporal Diff — attends to CHANGES between positions.
Based on Variant 4 (dedicated wtd+wvd) + Opus analysis fixes:
- Removed distance decay (RoPE handles this in full nanochat)
- Gate init biased against delta (-1.0) — model discovers if/when to use it
- Dedicated projections (no weight sharing with QKV or RRPRAM)

Architecture: QKV (semantic) + RRPRAM (positional) + Echo (self-resonance) + TemporalDiff (change detection)

What TemporalDiff captures that others don't:
- QKV with RoPE: encodes distance between positions, not content of change
- RRPRAM: positional patterns, not transitions
- Echo: self-similarity, not change rate
- TemporalDiff: "where did representation change, and do changes correlate?"

Pure Python, zero deps. For reference/testing. Production = C or PyTorch.

By Arianna Method, 2026-03-25.
"""
|
|
| import argparse |
| import math |
| import random |
|
|
VOCAB = 256        # byte-level vocabulary (one id per byte value)
MAX_T = 48         # maximum sequence length; also sizes the pos table and wr
DIM = 48           # embedding / model width
HEADS = 4          # number of attention heads
HD = DIM // HEADS  # per-head width
|
|
|
|
def bpe_encode(text):
    """Tokenize *text* as raw UTF-8 bytes (ids 0-255).

    Despite the name this is a plain byte-level tokenizer, not BPE;
    unencodable characters are silently dropped (errors='ignore').
    """
    return [byte for byte in text.encode('utf-8', errors='ignore')]
|
|
|
|
def bpe_decode(ids):
    """Map token ids back to text.

    Ids are folded into byte range with ``% 256`` and decoded as UTF-8;
    invalid byte sequences are silently dropped (errors='ignore').
    """
    folded = bytes(token % 256 for token in ids)
    return folded.decode('utf-8', errors='ignore')
|
|
|
|
def rand_mat(r, c, s=0.02):
    """Return an r x c matrix (list of row lists) of uniform noise in [-s, s].

    Draws exactly r*c values from the module ``random`` stream in row-major
    order, so seeded runs are reproducible.
    """
    rows = []
    for _ in range(r):
        rows.append([(random.random() * 2 - 1) * s for _ in range(c)])
    return rows
|
|
|
|
def vec_mat(v, m):
    """Row-vector times matrix: return ``v @ m`` as a flat list.

    *m* is a list of rows; the result has ``len(m[0])`` entries.
    Accumulation order (rows outer, columns inner) matches the usual
    hand-written matmul, so float results are deterministic.
    """
    cols = len(m[0])
    out = [0.0] * cols
    for i in range(len(v)):
        vi = v[i]
        row = m[i]
        for j in range(cols):
            out[j] += vi * row[j]
    return out
|
|
|
|
def softmax(xs):
    """Numerically stable softmax over a non-empty list of floats.

    Subtracts the row max before exponentiating; the tiny 1e-9 added to
    the denominator guards against an all-underflow row, so the output
    sums to slightly less than 1.
    """
    peak = max(xs)
    exps = [math.exp(v - peak) for v in xs]
    norm = sum(exps) + 1e-9
    return [e / norm for e in exps]
|
|
|
|
class Janus4:
    """4-way attention toy model.

    Four parallel causal attention mechanisms per head — QKV (semantic),
    RRPRAM (positional), Echo (self-resonance), TemporalDiff (change
    detection) — mixed by a learned softmax gate, then projected to vocab
    logits. Pure-Python reference; ``train_step`` updates only the output
    projection and bias (all attention weights stay at their random init).
    """

    def __init__(self):
        # Embedding tables: token and absolute position.
        self.tok = rand_mat(VOCAB, DIM)
        self.pos = rand_mat(MAX_T, DIM)

        # Mechanism 1: standard QKV attention projections.
        self.wq = rand_mat(DIM, DIM)
        self.wk = rand_mat(DIM, DIM)
        self.wv = rand_mat(DIM, DIM)

        # Mechanism 2: RRPRAM — content scores each past position directly
        # via wr (DIM x MAX_T); wvr supplies its values.
        self.wr = rand_mat(DIM, MAX_T)
        self.wvr = rand_mat(DIM, DIM)

        # Mechanism 3: Echo — a single projection serves as q, k and v.
        self.wj = rand_mat(DIM, DIM)

        # Mechanism 4: TemporalDiff — dedicated key/value projections
        # applied to first-order deltas of the input (no weight sharing).
        self.wtd = rand_mat(DIM, DIM)
        self.wvd = rand_mat(DIM, DIM)

        # Gate logits over the 4 mechanisms, mixed via softmax in forward().
        # The -1.0 biases the init against TemporalDiff (see module docstring).
        self.gate = [0.0, 0.0, 0.0, -1.0]

        # Output projection to vocab logits — the only trained parameters.
        self.out = rand_mat(DIM, VOCAB)
        self.bias = [0.0] * VOCAB

    def _dot(self, a, b):
        # Inner product of two equal-length vectors.
        return sum(x * y for x, y in zip(a, b))

    def _head(self, v, h):
        # Slice of v belonging to head h (HD consecutive dims).
        return v[h * HD:(h + 1) * HD]

    def forward(self, ids):
        """Run the 4-way attention over a sequence of token ids.

        ids must have length <= MAX_T (position table and wr are sized by
        MAX_T). Returns ``(logits, cat)``: per-position vocab logits and
        the gated, concatenated head outputs (kept so train_step can form
        the output-projection gradient).
        """
        T = len(ids)
        # Input representation: token embedding + positional embedding.
        x = [[self.tok[ids[t]][e] + self.pos[t][e] for e in range(DIM)] for t in range(T)]

        # Per-mechanism projections of every position.
        q = [vec_mat(x[t], self.wq) for t in range(T)]
        k = [vec_mat(x[t], self.wk) for t in range(T)]
        v = [vec_mat(x[t], self.wv) for t in range(T)]
        rv = [vec_mat(x[t], self.wvr) for t in range(T)]
        je = [vec_mat(x[t], self.wj) for t in range(T)]

        # First-order temporal difference of the input; dx[0] stays all-zero.
        dx = [[0.0] * DIM for _ in range(T)]
        for t in range(1, T):
            for e in range(DIM):
                dx[t][e] = x[t][e] - x[t - 1][e]

        # TemporalDiff keys/values come from the deltas, not the raw input.
        dk = [vec_mat(dx[t], self.wtd) for t in range(T)]
        dv = [vec_mat(dx[t], self.wvd) for t in range(T)]

        # Mixing weights over the 4 mechanisms (softmax of the gate logits).
        g = softmax(self.gate)

        cat = [[0.0] * DIM for _ in range(T)]
        for h in range(HEADS):
            # --- Mechanism 1: causal scaled dot-product QKV attention. ---
            # -1e9 pre-fill masks future positions (softmax sends them to ~0).
            a1 = [[-1e9] * T for _ in range(T)]
            for i in range(T):
                qi = self._head(q[i], h)
                for j in range(i + 1):
                    a1[i][j] = self._dot(qi, self._head(k[j], h)) / math.sqrt(HD)
                a1[i] = softmax(a1[i])
            ho = [[0.0] * HD for _ in range(T)]
            for i in range(T):
                # j > i contributes nothing: masked weights underflow to 0.
                for j in range(T):
                    vv = self._head(v[j], h)
                    for d in range(HD):
                        ho[i][d] += a1[i][j] * vv[d]

            # --- Mechanism 2: RRPRAM — content at i scores past position j
            # directly through wr; note the scores are identical for every
            # head (no head-dependence in a2). ---
            a2 = [[-1e9] * T for _ in range(T)]
            for i in range(T):
                for j in range(i + 1):
                    a2[i][j] = sum(x[i][e] * self.wr[e][j] for e in range(DIM)) / math.sqrt(HD)
                a2[i] = softmax(a2[i])
            ro = [[0.0] * HD for _ in range(T)]
            for i in range(T):
                for j in range(T):
                    rvh = self._head(rv[j], h)
                    for d in range(HD):
                        ro[i][d] += a2[i][j] * rvh[d]

            # --- Mechanism 3: Echo — self-resonance; the one projection je
            # plays query, key and value. ---
            a3 = [[-1e9] * T for _ in range(T)]
            for i in range(T):
                ei = self._head(je[i], h)
                for j in range(i + 1):
                    a3[i][j] = self._dot(ei, self._head(je[j], h)) / math.sqrt(HD)
                a3[i] = softmax(a3[i])
            jo = [[0.0] * HD for _ in range(T)]
            for i in range(T):
                for j in range(T):
                    ej = self._head(je[j], h)
                    for d in range(HD):
                        jo[i][d] += a3[i][j] * ej[d]

            # --- Mechanism 4: TemporalDiff — delta-keys attend to delta-keys
            # ("do the changes at i and j correlate?"), aggregating
            # delta-values. ---
            a4 = [[-1e9] * T for _ in range(T)]
            for i in range(T):
                dki = self._head(dk[i], h)
                for j in range(i + 1):
                    a4[i][j] = self._dot(dki, self._head(dk[j], h)) / math.sqrt(HD)
                a4[i] = softmax(a4[i])
            to = [[0.0] * HD for _ in range(T)]
            for i in range(T):
                for j in range(T):
                    dvh = self._head(dv[j], h)
                    for d in range(HD):
                        to[i][d] += a4[i][j] * dvh[d]

            # Gated mix of the 4 mechanism outputs into this head's slice.
            for t in range(T):
                base = h * HD
                for d in range(HD):
                    cat[t][base + d] = (g[0] * ho[t][d] + g[1] * ro[t][d] +
                                        g[2] * jo[t][d] + g[3] * to[t][d])

        # Project the mixed features to vocabulary logits.
        logits = [[0.0] * VOCAB for _ in range(T)]
        for t in range(T):
            for vi in range(VOCAB):
                logits[t][vi] = sum(cat[t][e] * self.out[e][vi] for e in range(DIM)) + self.bias[vi]
        return logits, cat

    def train_step(self, tok, tgt, lr):
        """One SGD step of next-token cross-entropy on (tok, tgt).

        tok and tgt must have equal, non-zero length. Only ``self.out``
        and ``self.bias`` are updated (manual softmax-CE gradient); the
        attention weights are frozen. Returns the mean loss over positions.
        """
        logits, cat = self.forward(tok)
        loss = 0.0
        # grad[t] = softmax(logits[t]) - one_hot(tgt[t])  (dL/dlogits).
        grad = [[0.0] * VOCAB for _ in range(len(tok))]
        for t in range(len(tok)):
            p = softmax(logits[t])
            # max(1e-9, .) guards log(0) when the target prob underflows.
            loss -= math.log(max(1e-9, p[tgt[t]]))
            for vi in range(VOCAB):
                grad[t][vi] = p[vi]
            grad[t][tgt[t]] -= 1.0
        loss /= len(tok)

        # dL/d(out[e][vi]) = cat[t][e] * grad[t][vi]; skip exact-zero
        # features to save the inner vocab loop.
        for t in range(len(tok)):
            for e in range(DIM):
                ce = cat[t][e]
                if ce == 0.0:
                    continue
                row = self.out[e]
                for vi in range(VOCAB):
                    row[vi] -= lr * ce * grad[t][vi] / len(tok)
            for vi in range(VOCAB):
                self.bias[vi] -= lr * grad[t][vi] / len(tok)
        return loss
|
|
|
|
def generate(model, prompt, n=60):
    """Greedy-decode *n* tokens from *model*, seeded with *prompt*.

    The context is truncated to the last MAX_T tokens before every forward
    pass. Returns the decoded text of the final window (which may drop the
    start of a long prompt).

    Fix: an empty prompt used to crash with IndexError at ``logits[-1]``;
    it now returns the empty string.
    """
    ids = bpe_encode(prompt)[-MAX_T:]
    if not ids:
        # Nothing to condition on — the model has no BOS token.
        return ''
    for _ in range(n):
        logits, _ = model.forward(ids)
        # softmax is monotonic, so argmax over p == argmax over logits.
        p = softmax(logits[-1])
        ids.append(max(range(VOCAB), key=lambda i: p[i]))
        ids = ids[-MAX_T:]
    return bpe_decode(ids)
|
|
|
|
def train(model, text, steps, lr):
    """Run *steps* SGD steps on random MAX_T-sized windows of *text*.

    Returns the list of per-step losses; prints progress every 10 steps.

    Fix: for texts shorter than MAX_T + 1 ids, ``tgt`` (shifted by one) came
    out one element shorter than ``tok``, and train_step then raised
    IndexError — ``tok`` is now trimmed to match. Texts with fewer than two
    ids are rejected explicitly instead of failing deep inside train_step.
    """
    ids = bpe_encode(text)
    if len(ids) < 2:
        raise ValueError('training text must encode to at least 2 bytes')
    losses = []
    for step in range(1, steps + 1):
        off = random.randint(0, max(0, len(ids) - MAX_T - 2))
        tok = ids[off:off + MAX_T]
        tgt = ids[off + 1:off + MAX_T + 1]
        # Short text: tgt may be one shorter than tok; keep them aligned.
        tok = tok[:len(tgt)]
        losses.append(model.train_step(tok, tgt, lr))
        if step % 10 == 0:
            print(f"step {step:4d}/{steps} loss={losses[-1]:.4f}")
    return losses
|
|
|
|
if __name__ == '__main__':
    # CLI: optionally train on a text file, then optionally generate.
    ap = argparse.ArgumentParser()
    ap.add_argument('--train', type=str)
    ap.add_argument('--steps', type=int, default=40)
    ap.add_argument('--lr', type=float, default=0.05)
    ap.add_argument('--generate', type=str)
    args = ap.parse_args()

    random.seed(42)  # reproducible weight initialization
    m = Janus4()
    if args.train:
        # Fix: close the file deterministically (the bare open().read()
        # leaked the handle until GC).
        with open(args.train, 'r', encoding='utf-8', errors='ignore') as f:
            txt = f.read()
        losses = train(m, txt, args.steps, args.lr)
        print(f'loss_start={losses[0]:.4f} loss_end={losses[-1]:.4f}')
    if args.generate:
        print(generate(m, args.generate))
|
|