# janus4 / janus / janus4_temporal_diff.py
# ataeff's picture
# Upload janus/janus4_temporal_diff.py with huggingface_hub
# dd5b02b verified
#!/usr/bin/env python3
"""
janus4_temporal_diff.py — Janus 4-way attention reference implementation.
4th attention mechanism: Temporal Diff — attends to CHANGES between positions.
Based on Variant 4 (dedicated wtd+wvd) + Opus analysis fixes:
- Removed distance decay (RoPE handles this in full nanochat)
- Gate init biased against delta (-1.0) — model discovers if/when to use it
- Dedicated projections (no weight sharing with QKV or RRPRAM)
Architecture: QKV (semantic) + RRPRAM (positional) + Echo (self-resonance) + TemporalDiff (change detection)
What TemporalDiff captures that others don't:
- QKV with RoPE: encodes distance between positions, not content of change
- RRPRAM: positional patterns, not transitions
- Echo: self-similarity, not change rate
- TemporalDiff: "where did representation change, and do changes correlate?"
Pure Python, zero deps. For reference/testing. Production = C or PyTorch.
By Arianna Method, 2026-03-25.
"""
import argparse
import math
import random
# Model hyperparameters shared by the tokenizer helpers, Janus4, and the
# train/generate drivers below.
VOCAB = 256  # number of token ids; the tokenizer emits raw UTF-8 byte values
MAX_T = 48  # rows in the position table, and the window size used for training/generation
DIM = 48  # width of every embedding and projection
HEADS = 4  # number of attention heads
HD = DIM // HEADS  # width of the slice each head reads from a DIM-wide vector
def bpe_encode(text):
    """Turn *text* into a list of integer token ids.

    Despite the name, no merges are applied: the ids are simply the UTF-8
    bytes of *text*. Characters that cannot be encoded are dropped.
    """
    raw = text.encode('utf-8', errors='ignore')
    return [*raw]
def bpe_decode(ids):
    """Turn a sequence of integer token ids back into text.

    Every id is reduced modulo 256 to a byte value; the resulting bytes are
    decoded as UTF-8 and any invalid byte sequences are dropped.
    """
    buf = bytearray()
    for token in ids:
        buf.append(token % 256)
    return bytes(buf).decode('utf-8', errors='ignore')
def rand_mat(r, c, s=0.02):
    """Build an r-by-c matrix (list of row lists) of random floats.

    Each entry is drawn with ``random.random()`` and mapped into the range
    [-s, s). Values are drawn row by row, left to right.
    """
    matrix = []
    for _ in range(r):
        row = []
        for _ in range(c):
            row.append((random.random() * 2 - 1) * s)
        matrix.append(row)
    return matrix
def vec_mat(v, m):
    """Multiply row vector *v* by matrix *m* (a list of rows).

    Returns a new list whose length equals the width of ``m[0]``; entry j is
    the sum over i of ``v[i] * m[i][j]``. *m* must have at least one row and
    at least ``len(v)`` rows.
    """
    width = len(m[0])
    acc = [0.0] * width
    for idx in range(len(v)):
        coeff = v[idx]
        weights = m[idx]
        for col in range(width):
            acc[col] += coeff * weights[col]
    return acc
def softmax(xs):
    """Return the softmax of the non-empty sequence *xs* as a list.

    The maximum is subtracted before exponentiating so large inputs do not
    overflow. A constant 1e-9 is added to the denominator, so the outputs sum
    to very slightly less than 1.
    """
    peak = max(xs)
    weights = []
    for x in xs:
        weights.append(math.exp(x - peak))
    total = sum(weights) + 1e-9
    return [w / total for w in weights]
class Janus4:
    """Single-layer model that blends four causal attention branches.

    For every head the model computes four attention outputs and mixes them
    with a softmax over ``self.gate``:

    * QKV — scaled dot product of ``wq`` and ``wk`` projections, values ``wv``.
    * RRPRAM — scores read from ``x[i] @ wr`` (one column per key position),
      values from ``wvr``.
    * Echo — the single ``wj`` projection serves as query, key and value.
    * TemporalDiff — ``wtd`` projection of ``x[t] - x[t-1]`` serves as query
      and key, values from the ``wvd`` projection of the same delta.

    ``train_step`` only updates ``out`` and ``bias``; every other weight keeps
    its random initial value.
    """

    def __init__(self):
        # Token and position embedding tables.
        self.tok = rand_mat(VOCAB, DIM)
        self.pos = rand_mat(MAX_T, DIM)
        # QKV — semantic attention
        self.wq = rand_mat(DIM, DIM)
        self.wk = rand_mat(DIM, DIM)
        self.wv = rand_mat(DIM, DIM)
        # RRPRAM — positional resonance
        self.wr = rand_mat(DIM, MAX_T)
        self.wvr = rand_mat(DIM, DIM)
        # Echo — self-resonance (W^T * W)
        self.wj = rand_mat(DIM, DIM)
        # Temporal Diff — dedicated projections (NOT shared with QKV/RRPRAM)
        self.wtd = rand_mat(DIM, DIM)  # delta key projection
        self.wvd = rand_mat(DIM, DIM)  # delta value projection
        # 4-way gate — delta starts suppressed (Opus recommendation)
        self.gate = [0.0, 0.0, 0.0, -1.0]  # QKV, RRPRAM, Echo, TemporalDiff
        # Output
        self.out = rand_mat(DIM, VOCAB)
        self.bias = [0.0] * VOCAB

    def _dot(self, a, b):
        """Return the dot product of *a* and *b* (truncated to the shorter)."""
        return sum(x * y for x, y in zip(a, b))

    def _head(self, v, h):
        """Return the HD-wide slice of vector *v* that belongs to head *h*."""
        return v[h * HD:(h + 1) * HD]

    def _causal_softmax(self, scores):
        """Scale each row of raw scores by 1/sqrt(HD) and softmax it.

        Row i of *scores* holds the raw scores of query i against keys
        0..i only, so causality is encoded by the row length.
        """
        scale = math.sqrt(HD)
        return [softmax([s / scale for s in row]) for row in scores]

    def _mix(self, weights, values):
        """Return, per query row, the weighted sum of per-head value vectors.

        ``weights[i]`` has i + 1 entries; ``zip`` pairs them with the first
        i + 1 value vectors, so later positions never contribute.
        """
        mixed = []
        for row in weights:
            acc = [0.0] * HD
            for w, vec in zip(row, values):
                for d in range(HD):
                    acc[d] += w * vec[d]
            mixed.append(acc)
        return mixed

    def _attend(self, queries, keys, values):
        """Causal scaled dot-product attention over per-head vectors."""
        raw = [[self._dot(queries[i], keys[j]) for j in range(i + 1)]
               for i in range(len(queries))]
        return self._mix(self._causal_softmax(raw), values)

    def forward(self, ids):
        """Run the model over the token ids *ids*.

        Returns ``(logits, cat)``: ``logits[t]`` is a VOCAB-long list of
        output scores for position t and ``cat[t]`` is the DIM-long gated
        attention output that fed the output layer.

        Raises IndexError when ``len(ids)`` exceeds MAX_T (the position table
        has only MAX_T rows) or when an id is not a valid row of the token
        table.
        """
        T = len(ids)
        x = [[self.tok[ids[t]][e] + self.pos[t][e] for e in range(DIM)]
             for t in range(T)]

        # Precompute all projections
        q = [vec_mat(row, self.wq) for row in x]
        k = [vec_mat(row, self.wk) for row in x]
        v = [vec_mat(row, self.wv) for row in x]
        rv = [vec_mat(row, self.wvr) for row in x]
        je = [vec_mat(row, self.wj) for row in x]

        # Temporal diff: delta of input (position 0 has no predecessor -> 0)
        dx = [[0.0] * DIM for _ in range(T)]
        for t in range(1, T):
            prev, cur = x[t - 1], x[t]
            dx[t] = [cur[e] - prev[e] for e in range(DIM)]
        # Dedicated projections for delta (not shared!)
        dk = [vec_mat(row, self.wtd) for row in dx]  # delta keys
        dv = [vec_mat(row, self.wvd) for row in dx]  # delta values

        g = softmax(self.gate)

        # RRPRAM scores use the full-width x and wr, so they are identical
        # for every head: compute and normalise them once.
        rr_weights = self._causal_softmax(
            [[sum(x[i][e] * self.wr[e][j] for e in range(DIM))
              for j in range(i + 1)]
             for i in range(T)])

        cat = [[0.0] * DIM for _ in range(T)]
        for h in range(HEADS):
            qh = [self._head(row, h) for row in q]
            kh = [self._head(row, h) for row in k]
            vh = [self._head(row, h) for row in v]
            rvh = [self._head(row, h) for row in rv]
            jeh = [self._head(row, h) for row in je]
            dkh = [self._head(row, h) for row in dk]
            dvh = [self._head(row, h) for row in dv]

            # 1) QKV attention — semantic content matching
            ho = self._attend(qh, kh, vh)
            # 2) RRPRAM — positional resonance
            ro = self._mix(rr_weights, rvh)
            # 3) Echo — self-resonance (W^T * W)
            jo = self._attend(jeh, jeh, jeh)
            # 4) Temporal Diff — change detection attention
            # No distance decay (Opus fix: RoPE handles this in full implementation)
            to = self._attend(dkh, dkh, dvh)

            # Gate blend — 4-way softmax
            base = h * HD
            for t in range(T):
                for d in range(HD):
                    cat[t][base + d] = (g[0] * ho[t][d] + g[1] * ro[t][d] +
                                        g[2] * jo[t][d] + g[3] * to[t][d])

        logits = [[sum(cat[t][e] * self.out[e][vi] for e in range(DIM)) + self.bias[vi]
                   for vi in range(VOCAB)]
                  for t in range(T)]
        return logits, cat

    def train_step(self, tok, tgt, lr):
        """Do one SGD step on the output layer and return the mean loss.

        *tok* are the input ids and *tgt* the target id for each position
        (``tgt`` must have at least ``len(tok)`` entries; *tok* must be
        non-empty because the loss is averaged over its length). The loss is
        the mean cross-entropy of the softmaxed logits against *tgt*.
        """
        logits, cat = self.forward(tok)
        n = len(tok)
        loss = 0.0
        grad = []
        for t in range(n):
            p = softmax(logits[t])
            loss -= math.log(max(1e-9, p[tgt[t]]))
            # d(loss)/d(logits) for softmax + cross-entropy: p - onehot(tgt)
            row_grad = list(p)
            row_grad[tgt[t]] -= 1.0
            grad.append(row_grad)
        loss /= n

        # Gradient on output layer only (reference impl)
        for t in range(n):
            step_grad = grad[t]
            for e in range(DIM):
                ce = cat[t][e]
                if ce == 0.0:
                    continue  # zero activation contributes no weight gradient
                row = self.out[e]
                for vi in range(VOCAB):
                    row[vi] -= lr * ce * step_grad[vi] / n
            for vi in range(VOCAB):
                self.bias[vi] -= lr * step_grad[vi] / n
        return loss
def generate(model, prompt, n=60):
    """Greedily extend *prompt* by *n* bytes and return the decoded text.

    At every step the model sees only the most recent MAX_T token ids, and
    the id with the highest probability at the last position is appended.
    The full sequence (prompt followed by all generated bytes) is kept and
    decoded, so nothing is lost when the total length exceeds MAX_T.

    A prompt that encodes to no bytes is returned unchanged (as an empty
    string): the model produces no logits for an empty context, so there is
    nothing to choose a first byte from.
    """
    ids = bpe_encode(prompt)
    if not ids:
        return bpe_decode(ids)
    for _step in range(n):
        # Feed a sliding window; the position table only covers MAX_T slots.
        logits, _cat = model.forward(ids[-MAX_T:])
        probs = softmax(logits[-1])
        ids.append(max(range(VOCAB), key=probs.__getitem__))
    return bpe_decode(ids)
def train(model, text, steps, lr):
    """Train *model* on random windows of *text* and return per-step losses.

    Each step picks a random offset, takes up to MAX_T consecutive byte ids
    as input and the same window shifted by one as targets, and calls
    ``model.train_step``. Progress is printed every 10 steps.

    Raises ValueError when at least one step is requested but *text* encodes
    to fewer than 2 bytes, since no (input, target) pair can be formed.
    """
    ids = bpe_encode(text)
    if steps >= 1 and len(ids) < 2:
        raise ValueError("training text must encode to at least 2 bytes")
    last_start = max(0, len(ids) - MAX_T - 2)
    losses = []
    for step in range(1, steps + 1):
        off = random.randint(0, last_start)
        tgt = ids[off + 1:off + MAX_T + 1]
        # For texts shorter than MAX_T + 1 the target window is shorter than
        # MAX_T; trim the inputs so every input position has a target.
        tok = ids[off:off + len(tgt)]
        losses.append(model.train_step(tok, tgt, lr))
        if step % 10 == 0:
            print(f"step {step:4d}/{steps} loss={losses[-1]:.4f}")
    return losses
if __name__ == '__main__':
    # Command-line entry point: optionally train on a text file, then
    # optionally generate a continuation of a prompt with the same model.
    ap = argparse.ArgumentParser()
    ap.add_argument('--train', type=str, help='path to a text file to train on')
    ap.add_argument('--steps', type=int, default=40, help='number of training steps')
    ap.add_argument('--lr', type=float, default=0.05, help='learning rate')
    ap.add_argument('--generate', type=str, help='prompt to continue')
    args = ap.parse_args()
    random.seed(42)  # fixed seed so weight init and window sampling repeat
    m = Janus4()
    if args.train:
        # Context manager closes the file even if reading fails.
        with open(args.train, 'r', encoding='utf-8', errors='ignore') as fh:
            txt = fh.read()
        losses = train(m, txt, args.steps, args.lr)
        if losses:  # no summary to print when zero steps were run
            print(f'loss_start={losses[0]:.4f} loss_end={losses[-1]:.4f}')
    if args.generate:
        print(generate(m, args.generate))