Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| 语言解码层 — Meta Model的"嘴巴" | |
| 将motor区300维输出向量解码为自然语言token序列。 | |
| 仿生: motor激活 → 布洛卡区(组织语言) → 运动皮层(逐字输出) | |
| 自回归流程: | |
| motor_300维 → 初始隐状态 → predict_next() → token概率 → 采样 → | |
| token_embedding + 隐状态 → predict_next() → ... → <EOS> | |
| """ | |
| import numpy as np | |
| from typing import List, Tuple, Optional | |
class LanguageDecoder:
    """Language decoding layer: turns a motor vector into natural-language text.

    A small ReLU recurrent network. The motor vector is projected to an
    initial hidden state; each step concatenates the hidden state with the
    embedding of the previous token, produces a new hidden state and projects
    it to vocabulary logits. Generation and training both start by feeding the
    PAD token, so the output projection is used on the same kind of hidden
    states it was trained on.
    """

    def __init__(self, motor_dim: int = 300, vocab_size: int = 4143,
                 hidden_dim: int = 300, max_len: int = 64):
        """Create a decoder with small random weights.

        Args:
            motor_dim: Length of the motor vector used to seed the hidden state.
            vocab_size: Number of tokens (including PAD/EOS/UNK).
            hidden_dim: Size of the recurrent hidden state and token embeddings.
            max_len: Maximum number of tokens generated by ``decode``.
        """
        self.motor_dim = motor_dim
        self.vocab_size = vocab_size
        self.hidden_dim = hidden_dim
        self.max_len = max_len

        # Token embedding table: (vocab_size, hidden_dim).
        self.token_embed = np.random.randn(vocab_size, hidden_dim).astype(np.float32) * 0.05

        # Output projection: hidden_dim -> vocab_size (next-token logits).
        self.W_out = np.random.randn(hidden_dim, vocab_size).astype(np.float32) * 0.05
        self.b_out = np.zeros(vocab_size, dtype=np.float32)

        # Recurrent weights: hidden_t = ReLU(W_h @ [hidden_{t-1}, embed_t] + b_h).
        self.W_h = np.random.randn(hidden_dim, hidden_dim * 2).astype(np.float32) * 0.05
        self.b_h = np.zeros(hidden_dim, dtype=np.float32)

        # Motor -> hidden projection used to initialise the hidden state.
        self.W_init = np.random.randn(motor_dim, hidden_dim).astype(np.float32) * 0.05
        self.b_init = np.zeros(hidden_dim, dtype=np.float32)

        # Vocabulary: index <-> character.
        self.idx2char: dict = {}
        self.char2idx: dict = {}

        # Special token indices.
        self.PAD = 0
        self.EOS = 1
        self.UNK = 2

    def set_vocab(self, chars: list):
        """Install the vocabulary and resize vocabulary-sized weights if needed.

        Args:
            chars: Token strings ordered by index; ``chars[0]`` is PAD,
                ``chars[1]`` is EOS, ``chars[2]`` is UNK, followed by the
                regular characters.

        When the vocabulary size changes, the embedding table, output
        projection and output bias are re-allocated; the slots shared by the
        old and new vocabulary keep their previous values so that already
        learned weights are not thrown away.
        """
        chars = list(chars)
        self.idx2char = dict(enumerate(chars))
        self.char2idx = {c: i for i, c in enumerate(chars)}

        new_size = len(chars)
        if new_size == self.vocab_size:
            return

        old_embed, old_w_out, old_b_out = self.token_embed, self.W_out, self.b_out
        keep = min(old_embed.shape[0], new_size)

        self.token_embed = np.random.randn(new_size, self.hidden_dim).astype(np.float32) * 0.05
        self.token_embed[:keep] = old_embed[:keep]

        # Preserve the overlapping output columns, mirroring what is done for
        # the embedding rows above.
        keep_out = min(old_w_out.shape[1], new_size)
        self.W_out = np.random.randn(self.hidden_dim, new_size).astype(np.float32) * 0.05
        self.W_out[:, :keep_out] = old_w_out[:, :keep_out]
        self.b_out = np.zeros(new_size, dtype=np.float32)
        self.b_out[:keep_out] = old_b_out[:keep_out]

        self.vocab_size = new_size

    def _init_hidden(self, motor_vec: np.ndarray) -> np.ndarray:
        """Project the motor vector to the initial hidden state.

        The input is flattened, truncated to ``motor_dim`` entries and
        zero-padded on the right when shorter.
        """
        v = np.asarray(motor_vec).ravel()[:self.motor_dim]
        missing = self.motor_dim - v.shape[0]
        if missing > 0:
            v = np.pad(v, (0, missing))
        return np.maximum(0, v @ self.W_init + self.b_init)

    def _step(self, hidden: np.ndarray, token_idx: int) -> Tuple[np.ndarray, np.ndarray]:
        """Run one recurrent step.

        Args:
            hidden: Current hidden state, shape ``(hidden_dim,)``.
            token_idx: Index of the token fed as input for this step.

        Returns:
            ``(new_hidden, logits)`` where ``logits`` has shape ``(vocab_size,)``.
        """
        embed = self.token_embed[token_idx]
        combined = np.concatenate([hidden, embed])  # (hidden_dim * 2,)
        new_hidden = np.maximum(0, self.W_h @ combined + self.b_h)
        logits = new_hidden @ self.W_out + self.b_out
        return new_hidden, logits

    def decode(self, motor_vec: np.ndarray, temperature: float = 0.8,
               top_k: int = 10) -> str:
        """Autoregressively generate text from a motor vector.

        Args:
            motor_vec: Motor vector used to seed the hidden state.
            temperature: Softmax temperature (values below 0.01 are clamped).
            top_k: Number of highest-scoring tokens to sample from.

        Returns:
            The generated string with PAD/EOS/UNK tokens removed. At most
            ``max_len`` tokens are generated; generation stops at the first EOS.
        """
        hidden = self._init_hidden(motor_vec)
        prev_idx = self.PAD  # same first input as train_step uses
        tokens = []
        for _ in range(self.max_len):
            hidden, logits = self._step(hidden, prev_idx)
            idx = self._sample(logits, temperature, top_k)
            if idx == self.EOS:
                break
            tokens.append(idx)
            prev_idx = idx
        return ''.join(self.idx2char.get(t, '?') for t in tokens if t > self.UNK)

    def _sample(self, logits: np.ndarray, temperature: float,
                top_k: int) -> int:
        """Sample a token index with temperature-scaled top-k sampling.

        ``top_k`` is clamped to the vocabulary size; a value ``<= 0`` means
        "sample from the whole vocabulary".
        """
        scaled = np.asarray(logits, dtype=np.float64) / max(temperature, 0.01)
        n = scaled.shape[0]
        k = n if top_k <= 0 else min(top_k, n)
        top_indices = np.argsort(scaled)[-k:]
        top_logits = scaled[top_indices]
        # After subtracting the max, the sum is >= 1, so no epsilon is needed
        # and the probabilities sum to exactly one for np.random.choice.
        exp_l = np.exp(top_logits - np.max(top_logits))
        probs = exp_l / exp_l.sum()
        return int(np.random.choice(top_indices, p=probs))

    def train_step(self, motor_vec: np.ndarray, target_chars: str,
                   lr: float = 0.01) -> dict:
        """Train on one (motor vector, target text) pair with teacher forcing.

        Only the output projection (``W_out``, ``b_out``) is updated; the
        recurrent and embedding weights are left untouched.

        Args:
            motor_vec: Motor vector used to seed the hidden state.
            target_chars: Target text; characters missing from the vocabulary
                are mapped to UNK. An EOS target is appended automatically.
            lr: Learning rate for the SGD update.

        Returns:
            ``{'loss': mean cross-entropy per token, 'n_tokens': count}``.
        """
        hidden = self._init_hidden(motor_vec)

        # Use the EOS index directly instead of looking up a literal '<EOS>'
        # string, which would silently fall back to UNK if the vocabulary
        # spells its end-of-sequence token differently.
        target_ids = [self.char2idx.get(c, self.UNK) for c in target_chars]
        target_ids.append(self.EOS)

        total_loss = 0.0
        prev_idx = self.PAD  # the first step is fed PAD
        for target_idx in target_ids:
            hidden, logits = self._step(hidden, prev_idx)

            # Softmax cross-entropy.
            exp_l = np.exp(logits - np.max(logits))
            probs = exp_l / (exp_l.sum() + 1e-8)
            total_loss += float(-np.log(probs[target_idx] + 1e-8))

            # d(loss)/d(logits) = probs - one_hot(target).
            grad_logits = probs.copy()
            grad_logits[target_idx] -= 1.0
            self.W_out -= lr * np.outer(hidden, grad_logits)
            self.b_out -= lr * grad_logits

            prev_idx = target_idx  # teacher forcing

        n_tokens = len(target_ids)
        return {'loss': total_loss / n_tokens, 'n_tokens': n_tokens}

    def save(self, path: str):
        """Save all weights and the vocabulary to an ``.npz`` archive.

        Note: ``np.savez`` appends ``.npz`` to ``path`` when it is missing.
        The vocabulary dict is stored as a pickled object array.
        """
        np.savez(path,
                 token_embed=self.token_embed,
                 W_out=self.W_out, b_out=self.b_out,
                 W_h=self.W_h, b_h=self.b_h,
                 W_init=self.W_init, b_init=self.b_init,
                 idx2char=self.idx2char)

    def load(self, path: str):
        """Load weights and vocabulary previously written by ``save``.

        Accepts the same ``path`` that was given to ``save`` even when
        ``np.savez`` added the ``.npz`` suffix. Dimension attributes are
        refreshed from the loaded arrays.

        Raises:
            FileNotFoundError: If neither ``path`` nor ``path + '.npz'`` exists.
        """
        # SECURITY NOTE: allow_pickle=True is required because the vocabulary
        # is stored as a pickled dict; only load archives from trusted sources.
        try:
            archive = np.load(path, allow_pickle=True)
        except FileNotFoundError:
            if str(path).endswith('.npz'):
                raise
            archive = np.load(f"{path}.npz", allow_pickle=True)

        # NpzFile holds an open file handle; the context manager closes it.
        with archive as data:
            self.token_embed = data['token_embed']
            self.W_out = data['W_out']
            self.b_out = data['b_out']
            self.W_h = data['W_h']
            self.b_h = data['b_h']
            self.W_init = data['W_init']
            self.b_init = data['b_init']
            self.idx2char = dict(data['idx2char'].item())

        self.char2idx = {c: i for i, c in self.idx2char.items()}
        # Derive sizes from the arrays so they stay correct even when the
        # checkpoint was created with different dimensions or without a vocab.
        self.vocab_size = self.token_embed.shape[0]
        self.hidden_dim = self.W_out.shape[0]
        self.motor_dim = self.W_init.shape[0]
        print(f"[Decoder] 已加载: vocab={self.vocab_size}, hidden={self.hidden_dim}")