#!/usr/bin/env python3
"""Language decoding layer -- the "mouth" of the Meta Model.

Decodes the motor-area output vector (300-dim by default) into a sequence of
natural-language tokens.

Biological analogy: motor activation -> Broca's area (organises language)
-> motor cortex (emits one character at a time).

Autoregressive flow (identical for training and generation):
    motor vector -> initial hidden state
    -> step(hidden, PAD)   -> token logits -> sample / target
    -> step(hidden, token) -> token logits -> ... -> EOS
"""

import os

import numpy as np
from typing import List, Tuple, Optional


class LanguageDecoder:
    """Minimal recurrent decoder: motor vector -> natural-language string.

    Only the output projection (``W_out`` / ``b_out``) is updated by
    ``train_step``; the recurrent weights, the motor projection and the token
    embeddings stay at their random initial values and act as fixed features.

    Vocabulary indices 0, 1 and 2 are reserved for PAD, EOS and UNK.
    """

    def __init__(self, motor_dim: int = 300, vocab_size: int = 4143,
                 hidden_dim: int = 300, max_len: int = 64):
        """Create randomly initialised weights.

        Args:
            motor_dim: Length of the motor vector fed to the decoder.
            vocab_size: Number of tokens (including the 3 special tokens).
            hidden_dim: Size of the recurrent hidden state and embeddings.
            max_len: Maximum number of tokens generated by ``decode``.
        """
        self.motor_dim = motor_dim
        self.vocab_size = vocab_size
        self.hidden_dim = hidden_dim
        self.max_len = max_len

        # Token embedding table: (vocab_size, hidden_dim).
        self.token_embed = self._random_weights(vocab_size, hidden_dim)

        # Output projection: hidden_dim -> vocab_size (next-token logits).
        self.W_out = self._random_weights(hidden_dim, vocab_size)
        self.b_out = np.zeros(vocab_size, dtype=np.float32)

        # Recurrence: hidden_t = ReLU(W_h @ [hidden_{t-1}, embed_t] + b_h).
        self.W_h = self._random_weights(hidden_dim, hidden_dim * 2)
        self.b_h = np.zeros(hidden_dim, dtype=np.float32)

        # Motor -> hidden projection used to build the initial hidden state.
        self.W_init = self._random_weights(motor_dim, hidden_dim)
        self.b_init = np.zeros(hidden_dim, dtype=np.float32)

        # Vocabulary: index <-> character.
        self.idx2char: dict = {}
        self.char2idx: dict = {}

        # Special token indices.
        self.PAD = 0
        self.EOS = 1
        self.UNK = 2

    @staticmethod
    def _random_weights(rows: int, cols: int) -> np.ndarray:
        """Return a (rows, cols) float32 matrix of small Gaussian values."""
        return np.random.randn(rows, cols).astype(np.float32) * 0.05

    def set_vocab(self, chars: list):
        """Install the vocabulary and resize vocab-dependent weights.

        Args:
            chars: Token strings ordered by index; ``chars[0]`` is PAD,
                ``chars[1]`` is EOS, ``chars[2]`` is UNK, the rest are
                ordinary characters.

        When the vocabulary size changes, the embedding rows and the output
        projection columns/biases of the overlapping index range are kept so
        that already-learned tokens are not reset; new entries are randomly
        initialised (biases start at zero).
        """
        chars = list(chars)
        self.idx2char = dict(enumerate(chars))
        self.char2idx = {c: i for i, c in enumerate(chars)}

        new_size = len(chars)
        if new_size == self.vocab_size:
            return

        old_embed, old_w_out, old_b_out = self.token_embed, self.W_out, self.b_out

        self.token_embed = self._random_weights(new_size, self.hidden_dim)
        keep = min(old_embed.shape[0], new_size)
        self.token_embed[:keep] = old_embed[:keep]

        self.W_out = self._random_weights(self.hidden_dim, new_size)
        keep = min(old_w_out.shape[1], new_size)
        self.W_out[:, :keep] = old_w_out[:, :keep]

        self.b_out = np.zeros(new_size, dtype=np.float32)
        keep = min(old_b_out.shape[0], new_size)
        self.b_out[:keep] = old_b_out[:keep]

        self.vocab_size = new_size

    def _init_hidden(self, motor_vec: np.ndarray) -> np.ndarray:
        """Project the motor vector to the initial hidden state.

        The input is flattened, truncated to ``motor_dim`` and zero-padded if
        it is shorter, so lists and arrays of any length are accepted.
        """
        v = np.asarray(motor_vec, dtype=np.float32).reshape(-1)[:self.motor_dim]
        if v.size < self.motor_dim:
            v = np.pad(v, (0, self.motor_dim - v.size))
        return np.maximum(0, v @ self.W_init + self.b_init)

    def _step(self, hidden: np.ndarray, token_idx: int) -> Tuple[np.ndarray, np.ndarray]:
        """Run one recurrent step.

        Args:
            hidden: Previous hidden state, shape (hidden_dim,).
            token_idx: Index of the token fed as input at this step.

        Returns:
            ``(new_hidden, logits)`` where ``logits`` has one entry per token.
        """
        embed = self.token_embed[token_idx]            # (hidden_dim,)
        combined = np.concatenate([hidden, embed])     # (hidden_dim * 2,)
        new_hidden = np.maximum(0, self.W_h @ combined + self.b_h)
        logits = new_hidden @ self.W_out + self.b_out  # (vocab_size,)
        return new_hidden, logits

    def decode(self, motor_vec: np.ndarray, temperature: float = 0.8,
               top_k: int = 10) -> str:
        """Autoregressively generate text from a motor vector.

        Generation starts by feeding PAD, exactly as ``train_step`` does, so
        the output projection is applied to the same hidden-state trajectory
        it was trained on. Generation stops at the first EOS (including when
        it is the very first sample) or after ``max_len`` tokens.

        Args:
            motor_vec: Motor-area output vector.
            temperature: Softmax temperature (clamped to >= 0.01).
            top_k: Number of highest-scoring tokens to sample from.

        Returns:
            The generated string; special tokens are omitted and indices
            missing from the vocabulary are rendered as ``'?'``.
        """
        hidden = self._init_hidden(motor_vec)
        tokens: List[int] = []
        prev_idx = self.PAD
        for _ in range(self.max_len):
            hidden, logits = self._step(hidden, prev_idx)
            idx = self._sample(logits, temperature, top_k)
            if idx == self.EOS:
                break
            tokens.append(idx)
            prev_idx = idx
        return ''.join(self.idx2char.get(t, '?') for t in tokens if t > self.UNK)

    def _sample(self, logits: np.ndarray, temperature: float, top_k: int) -> int:
        """Sample a token index with temperature-scaled top-k sampling.

        ``top_k`` is clamped to the number of logits; a non-positive value
        means "sample from the whole vocabulary".
        """
        # float64 keeps the normalised probabilities summing to 1 well within
        # the tolerance np.random.choice enforces.
        scaled = np.asarray(logits, dtype=np.float64) / max(temperature, 0.01)
        k = scaled.size if top_k <= 0 else min(int(top_k), scaled.size)

        top_indices = np.argsort(scaled)[-k:]
        top_logits = scaled[top_indices]

        # Numerically stable softmax; the max element contributes exp(0) = 1,
        # so the denominator is never zero.
        exp_l = np.exp(top_logits - top_logits.max())
        probs = exp_l / exp_l.sum()
        return int(np.random.choice(top_indices, p=probs))

    def train_step(self, motor_vec: np.ndarray, target_chars: str,
                   lr: float = 0.01) -> dict:
        """Train on one (motor vector, target text) pair with teacher forcing.

        The inputs are ``PAD, c1, ..., cn`` and the targets are
        ``c1, ..., cn, EOS``. Characters missing from the vocabulary map to
        UNK. Only ``W_out`` and ``b_out`` are updated (softmax cross-entropy
        gradient, plain SGD).

        Args:
            motor_vec: Motor-area output vector.
            target_chars: Text the decoder should produce.
            lr: Learning rate.

        Returns:
            ``{'loss': mean cross-entropy per token, 'n_tokens': count}``.
        """
        hidden = self._init_hidden(motor_vec)

        # Use the EOS index directly instead of looking up a marker string,
        # so the stop token is learned regardless of how the vocab spells it.
        targets = [self.char2idx.get(ch, self.UNK) for ch in target_chars]
        targets.append(self.EOS)

        total_loss = 0.0
        prev_idx = self.PAD  # the first input is PAD
        for target_idx in targets:
            hidden, logits = self._step(hidden, prev_idx)

            # Softmax cross-entropy.
            exp_l = np.exp(logits - np.max(logits))
            probs = exp_l / (exp_l.sum() + 1e-8)
            total_loss += float(-np.log(probs[target_idx] + 1e-8))

            # d(loss)/d(logits) = probs - one_hot(target).
            grad_logits = probs.copy()
            grad_logits[target_idx] -= 1.0

            self.W_out -= lr * np.outer(hidden, grad_logits)
            self.b_out -= lr * grad_logits

            prev_idx = target_idx  # teacher forcing

        n_tokens = len(targets)
        return {'loss': total_loss / n_tokens, 'n_tokens': n_tokens}

    def save(self, path: str):
        """Save all weights and the vocabulary to an ``.npz`` archive.

        Note: ``np.savez`` appends ``.npz`` when ``path`` does not already end
        with it; ``load`` accepts either spelling.
        """
        np.savez(path,
                 token_embed=self.token_embed,
                 W_out=self.W_out, b_out=self.b_out,
                 W_h=self.W_h, b_h=self.b_h,
                 W_init=self.W_init, b_init=self.b_init,
                 idx2char=self.idx2char)

    def load(self, path: str):
        """Load weights and vocabulary written by ``save``.

        Accepts the path with or without the ``.npz`` suffix that ``np.savez``
        adds, and refreshes ``vocab_size``, ``hidden_dim`` and ``motor_dim``
        from the loaded arrays.
        """
        if not os.path.exists(path) and not str(path).endswith('.npz'):
            path = f"{path}.npz"

        # SECURITY: the vocabulary dict is stored as a pickled object array,
        # so allow_pickle=True is required. Unpickling can execute arbitrary
        # code -- only load archives from trusted sources.
        with np.load(path, allow_pickle=True) as data:
            self.token_embed = data['token_embed']
            self.W_out = data['W_out']
            self.b_out = data['b_out']
            self.W_h = data['W_h']
            self.b_h = data['b_h']
            self.W_init = data['W_init']
            self.b_init = data['b_init']
            self.idx2char = data['idx2char'].item()

        self.char2idx = {c: i for i, c in self.idx2char.items()}
        self.vocab_size = len(self.idx2char)
        # Keep the dimension attributes in sync with the loaded weights.
        self.hidden_dim = int(self.token_embed.shape[1])
        self.motor_dim = int(self.W_init.shape[0])
        print(f"[Decoder] 已加载: vocab={self.vocab_size}, hidden={self.hidden_dim}")