swarm-chat / src /core /language_decoder.py
lk080424
虫巢-200M训练部署: npz+json替代pkl, 三区循环训练, 4454QA数据
358ab64
Raw
History Blame Contribute Delete
6.95 kB
#!/usr/bin/env python3
"""
语言解码层 — Meta Model的"嘴巴"
将motor区300维输出向量解码为自然语言token序列。
仿生: motor激活 → 布洛卡区(组织语言) → 运动皮层(逐字输出)
自回归流程:
motor_300维 → 初始隐状态 → predict_next() → token概率 → 采样 →
token_embedding + 隐状态 → predict_next() → ... → <EOS>
"""
import numpy as np
from typing import List, Tuple, Optional
class LanguageDecoder:
    """Autoregressive decoder that turns a motor-area vector into text.

    The motor vector is projected to an initial hidden state. At every step
    the previous token's embedding is concatenated with the hidden state,
    passed through a single ReLU recurrent layer, and projected to vocabulary
    logits. Generation and training both start by feeding the PAD token, so
    the states seen at inference match the ones the output layer was trained
    on.

    Special token indices are fixed: PAD=0, EOS=1, UNK=2.
    """

    def __init__(self, motor_dim: int = 300, vocab_size: int = 4143,
                 hidden_dim: int = 300, max_len: int = 64):
        """Create a decoder with small random weights.

        Args:
            motor_dim: Length of the motor vector consumed by the decoder.
            vocab_size: Number of tokens, including the three special tokens.
            hidden_dim: Size of the hidden state and of token embeddings.
            max_len: Maximum number of tokens produced by ``decode``.
        """
        self.motor_dim = motor_dim
        self.vocab_size = vocab_size
        self.hidden_dim = hidden_dim
        self.max_len = max_len
        # Token embedding table: (vocab_size, hidden_dim).
        self.token_embed = np.random.randn(vocab_size, hidden_dim).astype(np.float32) * 0.05
        # Output projection hidden_dim -> vocab_size (next-token logits).
        self.W_out = np.random.randn(hidden_dim, vocab_size).astype(np.float32) * 0.05
        self.b_out = np.zeros(vocab_size, dtype=np.float32)
        # Recurrent weights: hidden_t = ReLU(W_h @ [hidden_{t-1}, embed_t] + b_h).
        self.W_h = np.random.randn(hidden_dim, hidden_dim * 2).astype(np.float32) * 0.05
        self.b_h = np.zeros(hidden_dim, dtype=np.float32)
        # Motor -> hidden projection used to build the initial hidden state.
        self.W_init = np.random.randn(motor_dim, hidden_dim).astype(np.float32) * 0.05
        self.b_init = np.zeros(hidden_dim, dtype=np.float32)
        # Vocabulary maps: index <-> character.
        self.idx2char: dict = {}
        self.char2idx: dict = {}
        # Special token indices.
        self.PAD = 0
        self.EOS = 1
        self.UNK = 2

    def set_vocab(self, chars: list):
        """Install the vocabulary and resize vocabulary-sized weights.

        Args:
            chars: Token strings ordered by index; ``chars[0]`` is PAD,
                ``chars[1]`` is EOS, ``chars[2]`` is UNK, followed by the
                regular characters.

        When the vocabulary size changes, ``token_embed``, ``W_out`` and
        ``b_out`` are re-allocated. Entries for indices that exist in both
        the old and the new vocabulary are kept, so already-learned weights
        are not thrown away; new entries get fresh random values (zeros for
        the bias).
        """
        chars = list(chars)
        self.idx2char = dict(enumerate(chars))
        self.char2idx = {c: i for i, c in enumerate(chars)}

        new_size = len(chars)
        if new_size == self.vocab_size:
            return

        embed = np.random.randn(new_size, self.hidden_dim).astype(np.float32) * 0.05
        keep_rows = min(self.token_embed.shape[0], new_size)
        embed[:keep_rows] = self.token_embed[:keep_rows]

        w_out = np.random.randn(self.hidden_dim, new_size).astype(np.float32) * 0.05
        b_out = np.zeros(new_size, dtype=np.float32)
        keep_cols = min(self.W_out.shape[1], new_size)
        w_out[:, :keep_cols] = self.W_out[:, :keep_cols]
        b_out[:keep_cols] = self.b_out[:keep_cols]

        self.token_embed = embed
        self.W_out = w_out
        self.b_out = b_out
        self.vocab_size = new_size

    def _init_hidden(self, motor_vec: np.ndarray) -> np.ndarray:
        """Project a motor vector to the initial hidden state.

        The input is flattened and converted to float32, then truncated or
        zero-padded to ``motor_dim`` before the ReLU projection.
        """
        v = np.asarray(motor_vec, dtype=np.float32).reshape(-1)[:self.motor_dim]
        missing = self.motor_dim - v.shape[0]
        if missing > 0:
            v = np.pad(v, (0, missing))
        return np.maximum(0, v @ self.W_init + self.b_init)

    def _step(self, hidden: np.ndarray, token_idx: int) -> Tuple[np.ndarray, np.ndarray]:
        """Run one recurrent step.

        Args:
            hidden: Current hidden state, shape ``(hidden_dim,)``.
            token_idx: Index of the token fed as input at this step.

        Returns:
            ``(new_hidden, logits)`` where ``logits`` has one entry per
            vocabulary item.
        """
        embed = self.token_embed[token_idx]
        combined = np.concatenate([hidden, embed])  # (hidden_dim * 2,)
        new_hidden = np.maximum(0, self.W_h @ combined + self.b_h)
        logits = new_hidden @ self.W_out + self.b_out
        return new_hidden, logits

    def decode(self, motor_vec: np.ndarray, temperature: float = 0.8,
               top_k: int = 10) -> str:
        """Generate text from a motor vector by autoregressive sampling.

        Generation starts by feeding PAD through ``_step``, exactly as
        ``train_step`` does, and stops as soon as EOS is sampled (including
        on the very first step) or after ``max_len`` tokens.

        Args:
            motor_vec: Motor-area vector.
            temperature: Softmax temperature used when sampling.
            top_k: Number of highest-scoring candidates to sample from.

        Returns:
            The generated string; special tokens (index <= UNK) are omitted
            and indices missing from the vocabulary render as ``'?'``.
        """
        hidden = self._init_hidden(motor_vec)
        prev_idx = self.PAD
        tokens = []
        for _ in range(self.max_len):
            hidden, logits = self._step(hidden, prev_idx)
            idx = self._sample(logits, temperature, top_k)
            if idx == self.EOS:
                break
            tokens.append(idx)
            prev_idx = idx
        return ''.join(self.idx2char.get(t, '?') for t in tokens if t > self.UNK)

    def _sample(self, logits: np.ndarray, temperature: float,
                top_k: int) -> int:
        """Sample a token index with temperature-scaled top-k sampling.

        ``temperature`` is floored at 0.01. ``top_k`` is capped at the
        number of logits; a non-positive ``top_k`` means "no truncation".
        """
        # float64 keeps the normalised probabilities within the tolerance
        # np.random.choice enforces on sum(p).
        scaled = np.asarray(logits, dtype=np.float64) / max(temperature, 0.01)
        n = scaled.shape[0]
        k = n if top_k <= 0 else min(top_k, n)
        top_indices = np.argsort(scaled)[-k:]
        top_logits = scaled[top_indices]
        exp_l = np.exp(top_logits - np.max(top_logits))
        # The max element contributes exp(0) == 1, so the sum is never zero.
        probs = exp_l / exp_l.sum()
        return int(np.random.choice(top_indices, p=probs))

    def train_step(self, motor_vec: np.ndarray, target_chars: str,
                   lr: float = 0.01) -> dict:
        """Train on one (motor vector, target text) pair with teacher forcing.

        Only the output layer (``W_out`` and ``b_out``) is updated; the
        recurrent, embedding and init weights are left as they are.

        Args:
            motor_vec: Motor-area vector.
            target_chars: Target text; characters missing from the
                vocabulary are mapped to UNK.
            lr: Learning rate for the SGD update.

        Returns:
            ``{'loss': mean cross-entropy per token, 'n_tokens': count}``,
            where the count includes the final EOS target.
        """
        hidden = self._init_hidden(motor_vec)
        # The end-of-sequence target uses self.EOS directly, so it does not
        # depend on how the vocabulary happens to spell the EOS token.
        targets = [self.char2idx.get(ch, self.UNK) for ch in target_chars]
        targets.append(self.EOS)

        total_loss = 0.0
        prev_idx = self.PAD  # the first input token is PAD
        for target_idx in targets:
            hidden, logits = self._step(hidden, prev_idx)
            # Softmax cross-entropy.
            exp_l = np.exp(logits - np.max(logits))
            probs = exp_l / exp_l.sum()
            total_loss += float(-np.log(probs[target_idx] + 1e-8))
            # Gradient of the loss w.r.t. the logits is softmax - one_hot.
            grad_logits = probs.copy()
            grad_logits[target_idx] -= 1.0
            self.W_out -= lr * np.outer(hidden, grad_logits)
            self.b_out -= lr * grad_logits
            prev_idx = target_idx  # teacher forcing

        n_tokens = len(targets)
        return {'loss': total_loss / max(n_tokens, 1), 'n_tokens': n_tokens}

    def save(self, path: str):
        """Save all weights and the vocabulary to an ``.npz`` archive.

        ``np.savez`` appends ``.npz`` when ``path`` does not already end
        with it.
        """
        # NOTE(security): idx2char is a dict, so NumPy stores it as a pickled
        # object array and load() needs allow_pickle=True to read it back.
        # The format is kept for compatibility with existing checkpoints;
        # only load archives from trusted sources.
        np.savez(path,
                 token_embed=self.token_embed,
                 W_out=self.W_out, b_out=self.b_out,
                 W_h=self.W_h, b_h=self.b_h,
                 W_init=self.W_init, b_init=self.b_init,
                 idx2char=self.idx2char)

    def load(self, path: str):
        """Load weights and vocabulary written by ``save``.

        If ``path`` does not exist and has no ``.npz`` suffix, the same path
        with ``.npz`` appended is tried, matching what ``save`` writes.
        ``motor_dim``, ``hidden_dim`` and ``vocab_size`` are refreshed from
        the loaded arrays.

        Raises:
            FileNotFoundError: If no archive can be found.
        """
        # NOTE(security): allow_pickle=True is needed for the pickled
        # idx2char dict and can execute arbitrary code from a malicious
        # file. Only load trusted checkpoints.
        try:
            archive = np.load(path, allow_pickle=True)
        except FileNotFoundError:
            if str(path).endswith('.npz'):
                raise
            archive = np.load(f"{path}.npz", allow_pickle=True)

        # The context manager closes the underlying zip file handle.
        with archive as data:
            self.token_embed = data['token_embed']
            self.W_out = data['W_out']
            self.b_out = data['b_out']
            self.W_h = data['W_h']
            self.b_h = data['b_h']
            self.W_init = data['W_init']
            self.b_init = data['b_init']
            self.idx2char = data['idx2char'].item()

        self.char2idx = {c: i for i, c in self.idx2char.items()}
        # Derive sizes from the arrays so they cannot drift from the weights.
        self.vocab_size = int(self.token_embed.shape[0])
        self.hidden_dim = int(self.W_h.shape[0])
        self.motor_dim = int(self.W_init.shape[0])
        print(f"[Decoder] 已加载: vocab={self.vocab_size}, hidden={self.hidden_dim}")