| | |
| | """ |
| | NumPy-only BERT implementation (extended edition - strengthened architecture) |
| | ------------------------------------------------------------ |
| | Summary of changes/enhancements: |
| | - Base BERT architecture strengthened to resemble the real one: Encoder L = 12, H = 768, A = 12, intermediate = 3072, max_pos = 512 (defaults) |
| | - EncoderLayer changed to the Pre-LayerNorm style (improves training stability). |
| | - PositionwiseFFN expanded to "two FFN blocks", giving richer non-linearity per encoder layer. |
| | - "Proper" weight tying applied in the MLM head: connected through Tensor ops so that autodiff works correctly. |
| | - model_summary() added: prints a summary of the model structure / parameter count. |
| | - save_model() added: saves the trained model parameters to ./bert_numpy_model.npz and ./bert_numpy_model.npy. |
| | - The earlier gradient accumulation / LR scheduler / Dropout etc. are retained. |
| | |
| | Caution: |
| | - The default BERT configuration (12-layer, H=768) is very heavy on CPU and uses a lot of memory. Rather than starting training right away, testing with a small configuration first is recommended. |
| | |
| | Usage: |
| | $ pip install numpy datasets huggingface_hub |
| | $ python numpy_only_bert_from_scratch.py |
| | |
| | """ |
| | from __future__ import annotations |
| | import math |
| | import random |
| | import unicodedata |
| | import re |
| | from dataclasses import dataclass |
| | from typing import List, Tuple, Dict, Optional |
| |
|
| | import numpy as np |
| |
|
| | |
| | try: |
| | from datasets import load_dataset |
| | from huggingface_hub import hf_hub_download |
| | HAS_HF = True |
| | except Exception: |
| | HAS_HF = False |
| |
|
| | |
| | |
| | |
| |
|
def set_seed(seed: int = 42):
    """Seed both Python's ``random`` module and NumPy's global RNG."""
    for seeder in (random.seed, np.random.seed):
        seeder(seed)
| |
|
| |
|
def gelu(x: np.ndarray) -> np.ndarray:
    """Return the tanh approximation of GELU applied element-wise to ``x``."""
    coeff = np.sqrt(2.0 / np.pi)
    inner = coeff * (x + 0.044715 * (x ** 3))
    return 0.5 * x * (1.0 + np.tanh(inner))
| |
|
| |
|
def softmax(x: np.ndarray, axis: int = -1) -> np.ndarray:
    """Return the softmax of ``x`` along ``axis``.

    The per-slice maximum is subtracted before exponentiating so that large
    inputs do not overflow.
    """
    shifted = x - np.max(x, axis=axis, keepdims=True)
    exps = np.exp(shifted)
    normaliser = np.sum(exps, axis=axis, keepdims=True)
    return exps / normaliser
| |
|
| |
|
def xavier_init(shape: Tuple[int, ...]) -> np.ndarray:
    """Draw a float32 array of ``shape`` from the Xavier/Glorot uniform range.

    For a 1-D shape the single dimension is used as both fan-in and fan-out;
    otherwise the last two dimensions are used.
    """
    if len(shape) == 1:
        fan_in, fan_out = shape[0], shape[0]
    else:
        fan_in, fan_out = shape[-2], shape[-1]
    bound = np.sqrt(6.0 / (fan_in + fan_out))
    samples = np.random.uniform(-bound, bound, size=shape)
    return samples.astype(np.float32)
| |
|
| | |
| | |
| | |
def reduce_grad(grad: np.ndarray, shape: Tuple[int, ...]) -> np.ndarray:
    """Reduce a broadcast gradient back to the operand's original ``shape``."""
    # Leading axes that broadcasting prepended are summed away one by one.
    for _ in range(grad.ndim - len(shape)):
        grad = grad.sum(axis=0)
    # Axes that were size 1 in the operand but got stretched are summed
    # with keepdims so that the rank stays the same.
    for axis, size in enumerate(shape):
        if size != 1:
            continue
        if grad.shape[axis] == 1:
            continue
        grad = grad.sum(axis=axis, keepdims=True)
    return grad
| |
|
| |
|
class Tensor:
    """Minimal reverse-mode autodiff node wrapping a float32 NumPy array.

    Each operation returns a new ``Tensor`` whose ``_prev`` lists its inputs
    and whose ``_backward`` closure accumulates gradients into those inputs.
    ``backward()`` walks the graph in reverse topological order.
    """

    def __init__(self, data: np.ndarray, requires_grad: bool = False, name: str = ""):
        if not isinstance(data, np.ndarray):
            data = np.array(data, dtype=np.float32)
        # astype always copies, so the tensor never aliases the caller's array.
        self.data = data.astype(np.float32)
        self.grad = np.zeros_like(self.data) if requires_grad else None
        self.requires_grad = requires_grad
        self._backward = lambda: None
        self._prev: List[Tensor] = []
        self.name = name

    def zero_grad(self):
        """Reset the accumulated gradient to zero in place."""
        if self.requires_grad:
            self.grad[...] = 0.0

    def backward(self, grad: Optional[np.ndarray] = None):
        """Back-propagate from this tensor through the recorded graph.

        ``grad`` is the upstream gradient; it may be omitted only when this
        tensor holds a single element, in which case ones are used.
        """
        if grad is None:
            assert self.data.size == 1, "backward() requires grad for non-scalar"
            grad = np.ones_like(self.data)
        self.grad = self.grad + grad if self.grad is not None else grad

        # Iterative post-order DFS: a recursive walk can exceed Python's
        # recursion limit on deep graphs (many stacked layers / ops).
        topo = []
        visited = {id(self)}
        stack = [(self, iter(self._prev))]
        while stack:
            node, children = stack[-1]
            for child in children:
                if id(child) not in visited:
                    visited.add(id(child))
                    stack.append((child, iter(child._prev)))
                    break
            else:
                stack.pop()
                topo.append(node)
        for v in reversed(topo):
            v._backward()

    def __add__(self, other: Tensor | float):
        """Element-wise addition with NumPy broadcasting."""
        other = other if isinstance(other, Tensor) else Tensor(np.array(other, dtype=np.float32))
        out = Tensor(self.data + other.data, requires_grad=(self.requires_grad or other.requires_grad))

        def _backward():
            if self.requires_grad:
                self.grad += reduce_grad(out.grad, self.data.shape)
            if other.requires_grad:
                other.grad += reduce_grad(out.grad, other.data.shape)

        out._backward = _backward
        out._prev = [self, other]
        return out

    def __sub__(self, other):
        """Element-wise subtraction with NumPy broadcasting."""
        other = other if isinstance(other, Tensor) else Tensor(np.array(other, dtype=np.float32))
        out = Tensor(self.data - other.data, requires_grad=(self.requires_grad or other.requires_grad))

        def _backward():
            if self.requires_grad:
                self.grad += reduce_grad(out.grad, self.data.shape)
            if other.requires_grad:
                other.grad -= reduce_grad(out.grad, other.data.shape)

        out._backward = _backward
        out._prev = [self, other]
        return out

    def __mul__(self, other: Tensor | float):
        """Element-wise multiplication with NumPy broadcasting."""
        other = other if isinstance(other, Tensor) else Tensor(np.array(other, dtype=np.float32))
        out = Tensor(self.data * other.data, requires_grad=(self.requires_grad or other.requires_grad))

        def _backward():
            if self.requires_grad:
                self.grad += reduce_grad(out.grad * other.data, self.data.shape)
            if other.requires_grad:
                other.grad += reduce_grad(out.grad * self.data, other.data.shape)

        out._backward = _backward
        out._prev = [self, other]
        return out

    def __truediv__(self, other: Tensor | float):
        """Element-wise division with NumPy broadcasting."""
        other = other if isinstance(other, Tensor) else Tensor(np.array(other, dtype=np.float32))
        out = Tensor(self.data / other.data, requires_grad=(self.requires_grad or other.requires_grad))

        def _backward():
            if self.requires_grad:
                self.grad += reduce_grad(out.grad * (1.0 / other.data), self.data.shape)
            if other.requires_grad:
                other.grad += reduce_grad(out.grad * (-self.data / (other.data ** 2)), other.data.shape)

        out._backward = _backward
        out._prev = [self, other]
        return out

    def matmul(self, other: Tensor):
        """Matrix product ``self @ other`` (batched over leading axes)."""
        out = Tensor(self.data @ other.data, requires_grad=(self.requires_grad or other.requires_grad))

        def _backward():
            if self.requires_grad:
                grad_self = out.grad @ np.swapaxes(other.data, -1, -2)
                self.grad += reduce_grad(grad_self, self.data.shape)
            if other.requires_grad:
                grad_other = np.swapaxes(self.data, -1, -2) @ out.grad
                # reduce_grad sums away batch axes when `other` has lower rank.
                other.grad += reduce_grad(grad_other, other.data.shape)

        out._backward = _backward
        out._prev = [self, other]
        return out

    def T(self):
        """Return the transpose (``ndarray.T``) as a differentiable tensor."""
        out = Tensor(self.data.T, requires_grad=self.requires_grad)

        def _backward():
            if self.requires_grad:
                self.grad += out.grad.T

        out._backward = _backward
        out._prev = [self]
        return out

    def sum(self, axis=None, keepdims=False):
        """Sum over ``axis`` (all elements when ``axis`` is None)."""
        out = Tensor(self.data.sum(axis=axis, keepdims=keepdims), requires_grad=self.requires_grad)

        def _backward():
            if not self.requires_grad:
                return
            grad = out.grad
            if axis is not None and not keepdims:
                # Re-insert the reduced axes as size 1 so broadcasting works.
                shape = list(self.data.shape)
                for ax in ([axis] if isinstance(axis, int) else list(axis)):
                    shape[ax] = 1
                grad = grad.reshape(shape)
            self.grad += np.broadcast_to(grad, self.data.shape)

        out._backward = _backward
        out._prev = [self]
        return out

    def mean(self, axis=None, keepdims=False):
        """Mean over ``axis``, built from ``sum`` and a scalar multiply."""
        if axis is None:
            denom = self.data.size
        elif isinstance(axis, int):
            denom = self.data.shape[axis]
        else:
            denom = np.prod([self.data.shape[a] for a in axis])
        return self.sum(axis=axis, keepdims=keepdims) * (1.0 / denom)

    def relu(self):
        """Element-wise ``max(x, 0)``."""
        out = Tensor(np.maximum(self.data, 0), requires_grad=self.requires_grad)

        def _backward():
            if self.requires_grad:
                self.grad += (self.data > 0).astype(np.float32) * out.grad

        out._backward = _backward
        out._prev = [self]
        return out

    def gelu(self):
        """Element-wise GELU (tanh approximation) via the module-level ``gelu``."""
        out = Tensor(gelu(self.data), requires_grad=self.requires_grad)

        def _backward():
            if self.requires_grad:
                c = np.sqrt(2.0 / np.pi)
                t = c * (self.data + 0.044715 * (self.data ** 3))
                th = np.tanh(t)
                dt_dx = c * (1 + 3 * 0.044715 * (self.data ** 2)) * (1 - th ** 2)
                dgelu = 0.5 * (1 + th) + 0.5 * self.data * dt_dx
                self.grad += dgelu * out.grad

        out._backward = _backward
        out._prev = [self]
        return out

    def softmax(self, axis=-1):
        """Softmax along ``axis`` via the module-level ``softmax``."""
        out = Tensor(softmax(self.data, axis=axis), requires_grad=self.requires_grad)

        def _backward():
            if not self.requires_grad:
                return
            y = out.data
            g = out.grad
            s = np.sum(g * y, axis=axis, keepdims=True)
            self.grad += y * (g - s)

        out._backward = _backward
        out._prev = [self]
        return out

    def layernorm(self, eps=1e-12):
        """Normalise the last axis to zero mean / unit variance (no affine)."""
        mean = self.data.mean(axis=-1, keepdims=True)
        var = ((self.data - mean) ** 2).mean(axis=-1, keepdims=True)
        inv_std = 1.0 / np.sqrt(var + eps)
        out = Tensor((self.data - mean) * inv_std, requires_grad=self.requires_grad)

        def _backward():
            if not self.requires_grad:
                return
            g = out.grad
            xmu = self.data - mean
            dx = inv_std * (
                g
                - g.mean(axis=-1, keepdims=True)
                - xmu * (g * xmu).mean(axis=-1, keepdims=True) / (var + eps)
            )
            self.grad += dx

        out._backward = _backward
        out._prev = [self]
        return out

    def tanh(self):
        """Element-wise hyperbolic tangent."""
        y = np.tanh(self.data)
        out = Tensor(y, requires_grad=self.requires_grad)

        def _backward():
            if self.requires_grad:
                self.grad += (1 - y ** 2) * out.grad

        out._backward = _backward
        out._prev = [self]
        return out

    def detach(self):
        """Return a copy of the data that is disconnected from the graph."""
        return Tensor(self.data.copy(), requires_grad=False)

    @staticmethod
    def from_np(x: np.ndarray, requires_grad=False, name: str = ""):
        """Build a tensor from a NumPy array (thin alias of the constructor)."""
        return Tensor(x, requires_grad=requires_grad, name=name)
| |
|
def _transpose_last2(self):
    """Swap the last two axes and keep the result connected to the graph.

    The result records ``self`` as its parent and routes the gradient back
    with the same axis swap.
    """
    out = Tensor(self.data.swapaxes(-1, -2), requires_grad=self.requires_grad)

    def _backward():
        if self.requires_grad:
            self.grad += out.grad.swapaxes(-1, -2)

    out._backward = _backward
    out._prev = [self]
    return out


setattr(Tensor, 'transpose_last2', _transpose_last2)
| |
|
| | |
| | |
| | |
class Module:
    """Base class for layers; subclasses must implement ``parameters``."""

    def parameters(self) -> List[Tensor]:
        """Return the trainable tensors owned by this module."""
        raise NotImplementedError

    def zero_grad(self):
        """Zero the gradient of every tensor returned by ``parameters``."""
        for param in self.parameters():
            param.zero_grad()
| |
|
class Dense(Module):
    """Affine layer ``x @ W (+ b)`` with Xavier-initialised weights."""

    def __init__(self, in_features: int, out_features: int, bias: bool = True, name: str = "dense"):
        weight = xavier_init((in_features, out_features))
        self.W = Tensor.from_np(weight, requires_grad=True, name=f"{name}.W")
        if bias:
            zeros = np.zeros((out_features,), dtype=np.float32)
            self.b = Tensor.from_np(zeros, requires_grad=True, name=f"{name}.b")
        else:
            self.b = None

    def __call__(self, x: Tensor) -> Tensor:
        projected = x.matmul(self.W)
        return projected if self.b is None else projected + self.b

    def parameters(self):
        """Return ``[W]`` or ``[W, b]`` depending on whether a bias exists."""
        return [self.W] if self.b is None else [self.W, self.b]
| |
|
class LayerNorm(Module):
    """Layer normalisation over the last axis with learnable scale and shift."""

    def __init__(self, hidden_size: int, eps: float = 1e-12, name: str = "ln"):
        ones = np.ones((hidden_size,), dtype=np.float32)
        zeros = np.zeros((hidden_size,), dtype=np.float32)
        self.gamma = Tensor.from_np(ones, requires_grad=True, name=f"{name}.gamma")
        self.beta = Tensor.from_np(zeros, requires_grad=True, name=f"{name}.beta")
        self.eps = eps

    def __call__(self, x: Tensor) -> Tensor:
        scaled = x.layernorm(self.eps) * self.gamma
        return scaled + self.beta

    def parameters(self):
        """Return ``[gamma, beta]``."""
        return [self.gamma, self.beta]
| |
|
class Dropout(Module):
    """Inverted dropout: zero elements with probability ``p`` and rescale.

    Acts as the identity when ``training`` is False or ``p == 0``.
    """

    def __init__(self, p: float = 0.1):
        self.p = p
        self.training = True
        # Most recently sampled mask, kept only for inspection.
        self.mask: Optional[np.ndarray] = None

    def __call__(self, x: Tensor) -> Tensor:
        if not self.training or self.p == 0.0:
            return x
        keep = (np.random.rand(*x.data.shape) >= self.p).astype(np.float32)
        mask = keep / (1.0 - self.p)
        self.mask = mask
        out = Tensor(x.data * mask, requires_grad=x.requires_grad)

        def _backward():
            # Use the mask captured for THIS call. Reading self.mask here would
            # pick up the mask of a later call when the same Dropout instance
            # is applied more than once in one forward pass.
            if x.requires_grad:
                x.grad += out.grad * mask

        out._backward = _backward
        out._prev = [x]
        return out

    def parameters(self):
        """Dropout has no trainable parameters."""
        return []
| |
|
def dropout_is_training(module: Module, training: bool):
    """Set ``training`` on every ``Dropout`` reachable from ``module``.

    Sub-modules are discovered through attributes and through list / tuple /
    dict containers held in attributes (e.g. a list of encoder layers). If
    ``module`` is itself a ``Dropout`` it is updated too.
    """
    seen = set()

    def visit(obj):
        if isinstance(obj, (list, tuple)):
            for item in obj:
                visit(item)
            return
        if isinstance(obj, dict):
            for item in obj.values():
                visit(item)
            return
        if not isinstance(obj, Module) or id(obj) in seen:
            return
        seen.add(id(obj))  # guards against cycles between modules
        if isinstance(obj, Dropout):
            obj.training = training
        for attr in dir(obj):
            if attr.startswith("__"):
                continue
            try:
                child = getattr(obj, attr)
            except Exception:
                # Best effort: some attributes may raise on access.
                continue
            visit(child)

    visit(module)
| |
|
class MultiHeadSelfAttention(Module):
    """Multi-head scaled dot-product self-attention.

    ``__call__`` takes ``x`` with three axes (unpacked as batch, sequence,
    hidden) and an optional additive ``attention_mask`` array that must
    broadcast against the (batch, heads, query, key) score tensor.
    """

    def __init__(self, hidden_size: int, num_heads: int, attn_dropout: float = 0.1, proj_dropout: float = 0.1, name: str = "mha"):
        assert hidden_size % num_heads == 0
        self.hidden = hidden_size
        self.num_heads = num_heads
        self.head_dim = hidden_size // num_heads
        self.Wq = Dense(hidden_size, hidden_size, name=f"{name}.Wq")
        self.Wk = Dense(hidden_size, hidden_size, name=f"{name}.Wk")
        self.Wv = Dense(hidden_size, hidden_size, name=f"{name}.Wv")
        self.Wo = Dense(hidden_size, hidden_size, name=f"{name}.Wo")
        self.attn_drop = Dropout(attn_dropout)
        self.proj_drop = Dropout(proj_dropout)

    def _split_heads(self, t: Tensor, batch: int, seq_len: int) -> Tensor:
        """(B, T, H) -> (B, heads, T, head_dim), differentiable."""
        data = t.data.reshape(batch, seq_len, self.num_heads, self.head_dim).transpose(0, 2, 1, 3)
        out = Tensor(data, requires_grad=t.requires_grad)

        def _backward():
            if t.requires_grad:
                t.grad += out.grad.transpose(0, 2, 1, 3).reshape(batch, seq_len, self.hidden)

        out._backward = _backward
        out._prev = [t]
        return out

    @staticmethod
    def _merge_heads(t: Tensor) -> Tensor:
        """(B, heads, T, head_dim) -> (B, T, heads * head_dim), differentiable."""
        batch, heads, seq_len, dim = t.data.shape
        data = t.data.transpose(0, 2, 1, 3).reshape(batch, seq_len, heads * dim)
        out = Tensor(data, requires_grad=t.requires_grad)

        def _backward():
            if t.requires_grad:
                t.grad += out.grad.reshape(batch, seq_len, heads, dim).transpose(0, 2, 1, 3)

        out._backward = _backward
        out._prev = [t]
        return out

    @staticmethod
    def _swap_last2(t: Tensor) -> Tensor:
        """Swap the last two axes of a 4-D tensor, differentiable."""
        out = Tensor(t.data.transpose(0, 1, 3, 2), requires_grad=t.requires_grad)

        def _backward():
            if t.requires_grad and out.grad is not None:
                t.grad += out.grad.transpose(0, 1, 3, 2)

        out._backward = _backward
        out._prev = [t]
        return out

    @staticmethod
    def _bmm(a: Tensor, b: Tensor) -> Tensor:
        """Batched matmul of two 4-D tensors over their last two axes."""
        assert a.data.shape[-1] == b.data.shape[-2]
        out = Tensor(np.matmul(a.data, b.data), requires_grad=(a.requires_grad or b.requires_grad))

        def _backward():
            if a.requires_grad:
                a.grad += np.matmul(out.grad, np.swapaxes(b.data, -1, -2))
            if b.requires_grad:
                b.grad += np.matmul(np.swapaxes(a.data, -1, -2), out.grad)

        out._backward = _backward
        out._prev = [a, b]
        return out

    def __call__(self, x: Tensor, attention_mask: Optional[np.ndarray]) -> Tensor:
        batch, seq_len, _ = x.data.shape
        q, k, v = self.Wq(x), self.Wk(x), self.Wv(x)
        qh = self._split_heads(q, batch, seq_len)
        kh = self._split_heads(k, batch, seq_len)
        vh = self._split_heads(v, batch, seq_len)

        scale = Tensor(np.array(1.0 / np.sqrt(self.head_dim), dtype=np.float32))
        scores = self._bmm(qh, self._swap_last2(kh)) * scale
        if attention_mask is not None:
            # Add the mask through an autograd op. Building a fresh Tensor
            # from scores.data would detach the scores from q and k, so the
            # query/key projections would never receive a gradient.
            scores = scores + Tensor(attention_mask)
        attn = self.attn_drop(scores.softmax(axis=-1))
        context = self._merge_heads(self._bmm(attn, vh))
        return self.proj_drop(self.Wo(context))

    def parameters(self):
        """Return the parameters of the four projections (Wq, Wk, Wv, Wo)."""
        return (
            self.Wq.parameters()
            + self.Wk.parameters()
            + self.Wv.parameters()
            + self.Wo.parameters()
        )
| |
|
class PositionwiseFFN(Module):
    """Feed-forward sub-layer made of two stacked FFN blocks.

    Two (hidden -> intermediate -> hidden) blocks are applied back to back.
    Each block applies dropout after its GELU; the residual connection after
    the blocks is performed by EncoderLayer.
    """

    def __init__(self, hidden_size: int, intermediate_size: int, dropout: float = 0.1, name: str = "ffn"):
        # First block.
        self.dense1 = Dense(hidden_size, intermediate_size, name=f"{name}.dense1")
        self.dense2 = Dense(intermediate_size, hidden_size, name=f"{name}.dense2")
        # Second block.
        self.dense3 = Dense(hidden_size, intermediate_size, name=f"{name}.dense3")
        self.dense4 = Dense(intermediate_size, hidden_size, name=f"{name}.dense4")
        # One Dropout per block: a Dropout instance remembers its latest mask,
        # so sharing one across two same-shaped calls risks back-propagating
        # the first call with the second call's mask.
        self.drop = Dropout(dropout)
        self.drop2 = Dropout(dropout)

    def __call__(self, x: Tensor) -> Tensor:
        h = self.dense1(x).gelu()
        h = self.drop(h)
        h = self.dense2(h)

        h2 = self.dense3(h).gelu()
        h2 = self.drop2(h2)
        h2 = self.dense4(h2)
        return h2

    def parameters(self):
        """Return the parameters of all four dense layers, in order."""
        return (
            self.dense1.parameters()
            + self.dense2.parameters()
            + self.dense3.parameters()
            + self.dense4.parameters()
        )
| |
|
class EncoderLayer(Module):
    """Pre-LayerNorm Transformer encoder layer.

    Structure:
        x -> LN -> MHA -> dropout -> x + out
        x -> LN -> FFN (two blocks here) -> dropout -> x + out
    Pre-LN tends to train more stably.
    """

    def __init__(self, hidden_size: int, num_heads: int, intermediate_size: int, attn_dropout=0.1, dropout=0.1, name: str = "enc"):
        self.mha = MultiHeadSelfAttention(hidden_size, num_heads, attn_dropout=attn_dropout, proj_dropout=dropout, name=f"{name}.mha")
        self.ln1 = LayerNorm(hidden_size, name=f"{name}.ln1")
        self.ffn = PositionwiseFFN(hidden_size, intermediate_size, dropout=dropout, name=f"{name}.ffn")
        self.ln2 = LayerNorm(hidden_size, name=f"{name}.ln2")
        # Separate Dropout per residual branch: both branches have the same
        # shape, and a shared instance would let the attention branch be
        # back-propagated with the mask sampled for the FFN branch.
        self.drop = Dropout(dropout)
        self.drop_ffn = Dropout(dropout)

    def __call__(self, x: Tensor, attention_mask: Optional[np.ndarray]) -> Tensor:
        # Attention branch.
        attn_out = self.mha(self.ln1(x), attention_mask)
        x = x + self.drop(attn_out)
        # Feed-forward branch.
        ffn_out = self.ffn(self.ln2(x))
        x = x + self.drop_ffn(ffn_out)
        return x

    def parameters(self):
        """Return attention, ln1, FFN and ln2 parameters, in that order."""
        ps = []
        ps += self.mha.Wq.parameters()
        ps += self.mha.Wk.parameters()
        ps += self.mha.Wv.parameters()
        ps += self.mha.Wo.parameters()
        ps += self.ln1.parameters()
        ps += self.ffn.parameters()
        ps += self.ln2.parameters()
        return ps
| |
|
class BertEmbeddings(Module):
    """Sum of word, position and token-type embeddings, then LayerNorm + Dropout."""

    def __init__(self, vocab_size: int, hidden_size: int, max_position: int = 512, type_vocab_size: int = 2, dropout=0.1, name: str = "emb"):
        self.word_embeddings = Tensor.from_np(xavier_init((vocab_size, hidden_size)), requires_grad=True, name=f"{name}.word")
        self.position_embeddings = Tensor.from_np(xavier_init((max_position, hidden_size)), requires_grad=True, name=f"{name}.pos")
        self.token_type_embeddings = Tensor.from_np(xavier_init((type_vocab_size, hidden_size)), requires_grad=True, name=f"{name}.type")
        self.ln = LayerNorm(hidden_size, name=f"{name}.ln")
        self.drop = Dropout(dropout)
        self.max_position = max_position

    def __call__(self, input_ids: np.ndarray, token_type_ids: np.ndarray) -> Tensor:
        """Embed 2-D ``input_ids`` / ``token_type_ids`` (batch, sequence)."""
        _, seq_len = input_ids.shape
        assert seq_len <= self.max_position
        pos_ids = np.arange(seq_len, dtype=np.int32)[None, :]
        summed = (
            self.word_embeddings.data[input_ids]
            + self.token_type_embeddings.data[token_type_ids]
            + self.position_embeddings.data[pos_ids]
        )
        emb = Tensor(summed, requires_grad=True)

        def _backward():
            # `emb` must keep its own name: the closure is late-binding, so if
            # this variable were later rebound to the LayerNorm/Dropout output
            # the gradient read here would be that of the final output rather
            # than of the embedding sum.
            if emb.grad is None:
                return
            grad_flat = emb.grad.reshape(-1, emb.grad.shape[-1])

            if self.word_embeddings.requires_grad:
                ids = input_ids.reshape(-1).astype(np.int64)
                # np.add.at accumulates correctly when ids repeat.
                np.add.at(self.word_embeddings.grad, ids, grad_flat)

            if self.token_type_embeddings.requires_grad:
                ids = token_type_ids.reshape(-1).astype(np.int64)
                np.add.at(self.token_type_embeddings.grad, ids, grad_flat)

            if self.position_embeddings.requires_grad:
                # Every batch row uses positions 0..seq_len-1, so summing over
                # the batch axis gives the per-position gradient directly.
                self.position_embeddings.grad[:seq_len] += emb.grad.sum(axis=0)

        emb._backward = _backward
        emb._prev = []
        return self.drop(self.ln(emb))

    def parameters(self):
        """Return the three embedding tables followed by the LayerNorm parameters."""
        return [self.word_embeddings, self.position_embeddings, self.token_type_embeddings] + self.ln.parameters()
| |
|
class BertEncoder(Module):
    """A stack of ``num_layers`` EncoderLayer instances applied in sequence."""

    def __init__(self, num_layers: int, hidden_size: int, num_heads: int, intermediate_size: int, dropout=0.1):
        self.layers = []
        for index in range(num_layers):
            self.layers.append(
                EncoderLayer(hidden_size, num_heads, intermediate_size, dropout=dropout, name=f"layer{index}")
            )

    def __call__(self, x: Tensor, attention_mask: Optional[np.ndarray]) -> Tensor:
        hidden = x
        for encoder_layer in self.layers:
            hidden = encoder_layer(hidden, attention_mask)
        return hidden

    def parameters(self):
        """Return the parameters of every layer, in layer order."""
        return [param for encoder_layer in self.layers for param in encoder_layer.parameters()]
| |
|
class BertPooler(Module):
    """Dense + tanh applied to the hidden state at sequence index 0."""

    def __init__(self, hidden_size: int):
        self.dense = Dense(hidden_size, hidden_size, name="pooler.dense")

    def __call__(self, x: Tensor) -> Tensor:
        first_token = Tensor(x.data[:, 0, :], requires_grad=x.requires_grad)

        def _route_grad():
            # Only index 0 of the sequence axis fed the pooler.
            if x.requires_grad and first_token.grad is not None:
                x.grad[:, 0, :] += first_token.grad

        first_token._backward = _route_grad
        first_token._prev = [x]
        projected = self.dense(first_token)
        return projected.tanh()

    def parameters(self):
        """Return the dense layer's parameters."""
        return self.dense.parameters()
| |
|
class BertForPreTraining(Module):
    """BERT encoder with a weight-tied MLM head and an NSP head.

    ``__call__`` returns ``(mlm_logits, nsp_logits, sequence_output)``.
    """

    def __init__(self, vocab_size: int, hidden_size: int = 768, num_layers: int = 12, num_heads: int = 12, intermediate_size: int = 3072, max_position: int = 512, dropout=0.1):
        self.emb = BertEmbeddings(vocab_size, hidden_size, max_position=max_position, dropout=dropout)
        self.encoder = BertEncoder(num_layers, hidden_size, num_heads, intermediate_size, dropout=dropout)
        self.pooler = BertPooler(hidden_size)
        self.pred_ln = LayerNorm(hidden_size, name="pred.ln")
        self.pred_dense = Dense(hidden_size, hidden_size, name="pred.proj")
        bias_init = np.zeros((vocab_size,), dtype=np.float32)
        self.mlm_bias = Tensor.from_np(bias_init, requires_grad=True, name="pred.bias")
        self.nsp = Dense(hidden_size, 2, name="nsp")

    def __call__(self, input_ids: np.ndarray, token_type_ids: np.ndarray, attention_mask: np.ndarray) -> Tuple[Tensor, Tensor, Tensor]:
        # Turn the 1/0 keep mask into an additive mask (0 where kept, -1e4
        # where masked) and insert two singleton axes after the batch axis.
        additive = (1.0 - attention_mask).astype(np.float32) * -1e4
        additive = additive[:, None, None, :]

        sequence_output = self.encoder(self.emb(input_ids, token_type_ids), additive)
        pooled = self.pooler(sequence_output)

        transformed = self.pred_dense(self.pred_ln(sequence_output)).gelu()
        # Weight tying: the output projection is the transposed word-embedding
        # table, taken through a Tensor op so gradients reach the embeddings.
        tied_weight = self.emb.word_embeddings.T()
        logits = transformed.matmul(tied_weight) + self.mlm_bias
        nsp_logits = self.nsp(pooled)
        return logits, nsp_logits, sequence_output

    def parameters(self):
        """Return all trainable tensors (the tied embedding appears once)."""
        collected = []
        for group in (
            self.emb.parameters(),
            self.encoder.parameters(),
            self.pooler.parameters(),
            self.pred_ln.parameters(),
            self.pred_dense.parameters(),
            [self.mlm_bias],
            self.nsp.parameters(),
        ):
            collected += group
        return collected
| |
|
| | |
| | |
| | |
def cross_entropy(logits: Tensor, target: np.ndarray, ignore_index: int = -100) -> Tensor:
    """Mean softmax cross-entropy over the positions whose target is not ignored.

    Args:
        logits: tensor whose last axis is the class axis; the leading axes
            must flatten to ``target.size`` rows.
        target: integer class ids; entries equal to ``ignore_index`` do not
            contribute to the loss or to the gradient.
        ignore_index: sentinel target value to skip.

    Returns:
        A scalar ``Tensor`` whose backward pass accumulates into
        ``logits.grad``, scaled by the upstream gradient of the loss.
    """
    num_classes = logits.data.shape[-1]
    shifted = logits.data - np.max(logits.data, axis=-1, keepdims=True)
    log_probs = shifted - np.log(np.sum(np.exp(shifted), axis=-1, keepdims=True))

    target_flat = np.asarray(target).reshape(-1)
    valid = target_flat != ignore_index
    # Ignored rows are indexed at column 0 (and masked out afterwards);
    # using ignore_index itself as a column would wrap around or overflow.
    safe_target = np.where(valid, target_flat, 0)
    rows = np.arange(target_flat.size)
    mask_flat = valid.astype(np.float32)
    denom = mask_flat.sum() + 1e-12

    nll = -log_probs.reshape(-1, num_classes)[rows, safe_target] * mask_flat
    loss = Tensor(np.array(nll.sum() / denom, dtype=np.float32), requires_grad=True)

    def _backward():
        grad = np.exp(log_probs).reshape(-1, num_classes)
        grad[rows, safe_target] -= 1.0
        grad = grad * mask_flat[:, None]
        # Chain rule: honour the upstream gradient so that scaled or summed
        # losses (e.g. gradient accumulation) back-propagate correctly.
        grad = grad * (loss.grad / denom)
        if logits.grad is None:
            logits.grad = np.zeros_like(logits.data)
        logits.grad += grad.reshape(logits.data.shape).astype(np.float32)

    loss._backward = _backward
    loss._prev = [logits]
    return loss
| |
|
class AdamW:
    """Adam with decoupled weight decay, keeping per-parameter moment state."""

    def __init__(self, params: List[Tensor], lr=1e-4, betas=(0.9, 0.999), eps=1e-8, weight_decay=0.01):
        beta1, beta2 = betas
        self.params = params
        self.lr = lr
        self.b1 = beta1
        self.b2 = beta2
        self.eps = eps
        self.wd = weight_decay
        self.t = 0
        # Moment buffers keyed by id(param).
        self.m: Dict[int, np.ndarray] = {}
        self.v: Dict[int, np.ndarray] = {}

    def step(self):
        """Apply one update to every parameter that has a gradient."""
        self.t += 1
        correction1 = 1 - self.b1 ** self.t
        correction2 = 1 - self.b2 ** self.t
        for param in self.params:
            if param.grad is None:
                continue
            key = id(param)
            if key not in self.m:
                self.m[key] = np.zeros_like(param.data)
                self.v[key] = np.zeros_like(param.data)
            grad = param.grad
            # Decay is applied directly to the weights and only to tensors
            # with more than one axis (1-D biases / norm params are skipped).
            if self.wd > 0 and param.data.ndim > 1:
                param.data -= self.lr * self.wd * param.data
            first = self.b1 * self.m[key] + (1 - self.b1) * grad
            second = self.b2 * self.v[key] + (1 - self.b2) * (grad * grad)
            self.m[key] = first
            self.v[key] = second
            m_hat = first / correction1
            v_hat = second / correction2
            param.data -= self.lr * m_hat / (np.sqrt(v_hat) + self.eps)

    def zero_grad(self):
        """Zero the gradient of every managed parameter."""
        for param in self.params:
            param.zero_grad()
| |
|
class LRScheduler:
    """Linear warm-up to ``base_lr`` followed by linear decay to zero."""

    def __init__(self, optimizer: AdamW, base_lr: float, warmup_steps: int, total_steps: int):
        self.opt = optimizer
        self.base_lr = base_lr
        self.warmup = warmup_steps
        self.total = total_steps
        self.step_num = 0

    def step(self):
        """Advance one step, write the new rate to the optimizer, and return it."""
        self.step_num += 1
        current = self.step_num
        if current > self.warmup:
            decay_span = max(1, (self.total - self.warmup))
            progress = (current - self.warmup) / decay_span
            scale = max(0.0, 1.0 - progress)
        else:
            scale = current / max(1, self.warmup)
        new_lr = self.base_lr * scale
        self.opt.lr = new_lr
        return new_lr
| |
|
| | |
| | |
| | |
class BasicTokenizer:
    """Whitespace/punctuation pre-tokenizer with optional lower-casing.

    Punctuation and whitespace both act as separators and are dropped; CJK
    ideographs in U+4E00..U+9FFF are isolated as single-character tokens.
    """

    def __init__(self, do_lower_case=True):
        self.do_lower_case = do_lower_case

    def _is_whitespace(self, ch):
        return ch.isspace()

    def _is_punctuation(self, ch):
        cp = ord(ch)
        # ASCII symbol ranges are treated as punctuation even where Unicode
        # classifies them otherwise (e.g. "$", "^", "`").
        if 33 <= cp <= 47 or 58 <= cp <= 64 or 91 <= cp <= 96 or 123 <= cp <= 126:
            return True
        return unicodedata.category(ch).startswith("P")

    def _clean_text(self, text):
        # Replace the NUL character, not the three-letter substring "nul",
        # which would split ordinary words such as "annul".
        return text.replace("\x00", " ")

    def _tokenize_chinese_chars(self, text):
        pieces = []
        for ch in text:
            if 0x4E00 <= ord(ch) <= 0x9FFF:
                pieces.append(" " + ch + " ")
            else:
                pieces.append(ch)
        return "".join(pieces)

    def tokenize(self, text: str) -> List[str]:
        """Split ``text`` into word tokens (before WordPiece)."""
        text = self._clean_text(text)
        text = self._tokenize_chinese_chars(text)
        if self.do_lower_case:
            text = text.lower()
            # Strip accents: decompose, then drop combining marks.
            text = unicodedata.normalize("NFD", text)
            text = "".join(ch for ch in text if unicodedata.category(ch) != 'Mn')
        spaced = "".join(
            " " if (self._is_punctuation(ch) or self._is_whitespace(ch)) else ch
            for ch in text
        )
        return spaced.strip().split()
| |
|
class WordPieceTokenizer:
    """Greedy longest-match-first WordPiece splitter over a fixed vocabulary."""

    def __init__(self, vocab: Dict[str,int], unk_token="[UNK]", max_input_chars_per_word=100):
        self.vocab = vocab
        self.unk = unk_token
        self.max_chars = max_input_chars_per_word

    def tokenize(self, token: str) -> List[str]:
        """Split one word into pieces; return ``[unk]`` if any part cannot be matched."""
        length = len(token)
        if length > self.max_chars:
            return [self.unk]
        pieces = []
        start = 0
        while start < length:
            # Try the longest remaining substring first and shrink from the right.
            for end in range(length, start, -1):
                candidate = token[start:end]
                if start > 0:
                    candidate = "##" + candidate
                if candidate in self.vocab:
                    pieces.append(candidate)
                    start = end
                    break
            else:
                return [self.unk]
        return pieces
| |
|
class BertTokenizer:
    """Basic + WordPiece tokenizer that builds padded BERT inputs."""

    def __init__(self, vocab: Dict[str,int]):
        self.vocab = vocab
        self.inv_vocab = {index: token for token, index in vocab.items()}
        self.basic = BasicTokenizer(do_lower_case=True)
        self.wordpiece = WordPieceTokenizer(vocab)
        self.cls_token = "[CLS]"
        self.sep_token = "[SEP]"
        self.mask_token = "[MASK]"
        self.pad_token = "[PAD]"
        self.cls_id = vocab[self.cls_token]
        self.sep_id = vocab[self.sep_token]
        self.mask_id = vocab[self.mask_token]
        self.pad_id = vocab[self.pad_token]

    def _wordpieces(self, text: str) -> List[str]:
        """Run basic tokenization and then WordPiece on every word."""
        pieces = []
        for word in self.basic.tokenize(text):
            pieces.extend(self.wordpiece.tokenize(word))
        return pieces

    def encode(self, text_a: str, text_b: Optional[str]=None, max_len: int = 128) -> Tuple[List[int], List[int], List[int]]:
        """Encode one or two texts.

        Returns ``(input_ids, attention_mask, token_type_ids)``, each padded
        or cut to ``max_len``. Three slots are reserved for special tokens;
        when a second text is present the remaining budget is split between
        the two segments.
        """
        a_tokens = self._wordpieces(text_a)
        b_tokens = self._wordpieces(text_b) if text_b else []

        budget = max_len - 3
        max_a = budget // 2 if b_tokens else budget
        max_b = budget - max_a
        a_tokens = a_tokens[:max_a]
        b_tokens = b_tokens[:max_b]

        tokens = [self.cls_token, *a_tokens, self.sep_token]
        type_ids = [0] * len(tokens)
        if b_tokens:
            tokens += b_tokens + [self.sep_token]
            type_ids += [1] * (len(b_tokens) + 1)

        unk_id = self.vocab.get("[UNK]", 100)
        input_ids = [self.vocab.get(token, unk_id) for token in tokens]
        attention_mask = [1] * len(input_ids)

        # A negative count yields an empty list, so no guard is needed.
        missing = max_len - len(input_ids)
        input_ids += [self.pad_id] * missing
        attention_mask += [0] * missing
        type_ids += [0] * missing
        return input_ids[:max_len], attention_mask[:max_len], type_ids[:max_len]
| |
|
| | |
| | |
| | |
@dataclass
class PretrainBatch:
    """Container for MLM + NSP pre-training inputs and labels.

    Used both for a single example (as built by create_mlm_nsp_examples) and
    for a stacked mini-batch (as built by collate_batches).
    """
    # Token ids after masking has been applied.
    input_ids: np.ndarray
    # Segment ids: 0 for the first segment and padding, 1 for the second.
    token_type_ids: np.ndarray
    # 1 for real tokens, 0 for padding.
    attention_mask: np.ndarray
    # Original token id at masked positions, -100 everywhere else.
    mlm_labels: np.ndarray
    # 1 when the second segment is the actual next sentence, else 0.
    nsp_labels: np.ndarray
| |
|
| |
|
| | def load_vocab_from_hub(repo_id: str = "bert-base-uncased", filename: str = "vocab.txt") -> Dict[str,int]: |
| | if not HAS_HF: |
| | raise RuntimeError("huggingface_hub / datasets๊ฐ ์ค์น๋์ด ์์ด์ผ ํจ") |
| | path = hf_hub_download(repo_id=repo_id, filename=filename) |
| | vocab = {} |
| | with open(path, "r", encoding="utf-8") as f: |
| | for i, line in enumerate(f): |
| | tok = line.strip() |
| | vocab[tok] = i |
| | return vocab |
| |
|
| |
|
def create_mlm_nsp_examples(texts: List[str], tokenizer: BertTokenizer, max_len: int = 128, dupe_factor: int = 1, masked_lm_prob=0.15) -> List[PretrainBatch]:
    """Build single-example MLM + NSP training instances from a list of sentences.

    For every adjacent pair the second segment is, with probability 0.5, the
    true next sentence (label 1); otherwise it is a random sentence that is
    guaranteed not to be the true next one (label 0). About
    ``masked_lm_prob`` of the non-special tokens (at least one when any
    exist) are selected; of those 80% become [MASK], 10% a random vocabulary
    id and 10% stay unchanged.

    Args:
        texts: sentences in document order; blank entries are skipped.
        tokenizer: tokenizer providing ``encode`` and the special-token ids.
        max_len: padded sequence length of every example.
        dupe_factor: number of passes over the data (masks differ per pass).
        masked_lm_prob: fraction of candidate tokens to select for MLM.

    Returns:
        One ``PretrainBatch`` per example, holding unbatched 1-D arrays.
    """
    sents = [s for s in texts if len(s.strip()) > 0]
    special_ids = (tokenizer.cls_id, tokenizer.sep_id, tokenizer.pad_id)
    vocab_size = len(tokenizer.vocab)
    examples = []
    for _ in range(dupe_factor):
        for i in range(len(sents) - 1):
            a = sents[i]
            if random.random() < 0.5:
                b = sents[i + 1]
                is_next = 1
            else:
                # Sample uniformly from every index except i + 1 so that a
                # "not next" example is never the true next sentence.
                j = random.randrange(len(sents) - 1)
                if j >= i + 1:
                    j += 1
                b = sents[j]
                is_next = 0

            input_ids, attn, type_ids = tokenizer.encode(a, b, max_len)
            input_ids = np.array(input_ids, dtype=np.int32)
            attn = np.array(attn, dtype=np.int32)
            type_ids = np.array(type_ids, dtype=np.int32)
            mlm_labels = np.full_like(input_ids, fill_value=-100)

            cand_indexes = [j for j, tid in enumerate(input_ids) if tid not in special_ids]
            num_to_mask = max(1, int(round(len(cand_indexes) * masked_lm_prob)))
            random.shuffle(cand_indexes)
            for pos in cand_indexes[:num_to_mask]:
                original = input_ids[pos]
                r = random.random()
                if r < 0.8:
                    input_ids[pos] = tokenizer.mask_id
                elif r < 0.9:
                    input_ids[pos] = random.randint(0, vocab_size - 1)
                # Otherwise the token is kept as is but still predicted.
                mlm_labels[pos] = original

            examples.append(PretrainBatch(
                input_ids=input_ids,
                token_type_ids=type_ids,
                attention_mask=attn,
                mlm_labels=mlm_labels,
                nsp_labels=np.array([is_next], dtype=np.int32),
            ))
    return examples
| |
|
| |
|
def collate_batches(batches: List[PretrainBatch], batch_size: int) -> List[PretrainBatch]:
    """Group single examples into stacked mini-batches.

    Args:
        batches: Unbatched examples whose array fields all share one length
            (``np.stack`` raises ``ValueError`` otherwise).
        batch_size: Maximum number of examples per mini-batch; the final
            mini-batch may be smaller.

    Returns:
        A list of ``PretrainBatch`` objects whose array fields have a leading
        batch axis and whose ``nsp_labels`` is flattened to one label per
        example.

    Raises:
        ValueError: If ``batch_size`` is not a positive integer.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be a positive integer, got {batch_size}")

    out: List[PretrainBatch] = []
    for start in range(0, len(batches), batch_size):
        # The slice is never empty because start < len(batches).
        chunk = batches[start:start + batch_size]
        out.append(PretrainBatch(
            input_ids=np.stack([b.input_ids for b in chunk], axis=0),
            token_type_ids=np.stack([b.token_type_ids for b in chunk], axis=0),
            attention_mask=np.stack([b.attention_mask for b in chunk], axis=0),
            mlm_labels=np.stack([b.mlm_labels for b in chunk], axis=0),
            nsp_labels=np.stack([b.nsp_labels for b in chunk], axis=0).reshape(len(chunk)),
        ))
    return out
| |
|
| | |
| | |
| | |
| |
|
def model_summary(model: BertForPreTraining):
    """Print a short summary of the model: vocab size, hidden size, layer count
    and total parameter count.

    Args:
        model: Object exposing ``parameters()`` whose items carry a ``data``
            array. The vocab / hidden / layer figures are read best-effort from
            ``model.emb.word_embeddings.data`` and ``model.encoder.layers``;
            if any of those is missing, all three are reported as ``None``.
    """
    print("===== MODEL SUMMARY =====")

    try:
        hidden = model.emb.word_embeddings.data.shape[1]
        vocab = model.emb.word_embeddings.data.shape[0]
        num_layers = len(model.encoder.layers)
    except (AttributeError, IndexError, TypeError):
        # Best-effort: a model with a different layout still gets a summary.
        hidden = None
        vocab = None
        num_layers = None
    print(f"Vocab size: {vocab}")
    print(f"Hidden size: {hidden}")
    print(f"Num layers: {num_layers}")

    # Count each parameter object once, so a tensor that is returned more than
    # once by parameters() (e.g. tied weights) is not double-counted.
    total = 0
    seen_ids = set()
    for p in model.parameters():
        if id(p) in seen_ids:
            continue
        seen_ids.add(id(p))
        total += p.data.size
    print(f"Total parameters (approx): {total:,}")
    print("=========================")
| |
|
| |
|
def save_model(model: BertForPreTraining, path_base: str = "./bert_numpy_model"):
    """Collect every model parameter and write it to ``.npz`` and ``.npy`` files.

    - ``<path_base>.npz``: each parameter stored as its own named array.
    - ``<path_base>.npy``: the same name -> array dict stored as one pickled
      Python object (loading it requires ``np.load(..., allow_pickle=True)``,
      so only load such files from a trusted source).

    Args:
        model: Object exposing ``parameters()``; each item must carry a
            ``data`` array and may carry a ``name``.
        path_base: Output path without extension.
    """
    state = {}
    for i, p in enumerate(model.parameters()):
        base = getattr(p, "name", "") or f"param_{i}"
        name = base
        suffix = i
        # Keep suffixing until the key is genuinely unused; a single
        # unchecked "<name>_<i>" could overwrite an earlier parameter that
        # already has that exact name.
        while name in state:
            name = f"{base}_{suffix}"
            suffix += 1
        state[name] = p.data
    np.savez(path_base + ".npz", **state)
    np.save(path_base + ".npy", state)
    print(f"Model saved to {path_base}.npz and {path_base}.npy")
| |
|
| | |
| | |
| | |
| |
|
def train_demo(use_large_model: bool = True):
    """Run the end-to-end pre-training demo (MLM + NSP) and save the model.

    Loads the ``bert-base-uncased`` vocabulary and the first 2000 training
    lines of WikiText-2 from the Hub, builds pre-training examples, trains with
    gradient accumulation for a single pass over the data (at most 500
    optimizer steps), then writes the parameters to ``./bert_numpy_model``.

    Args:
        use_large_model: If True, use the heavy 12-layer, H=768 configuration.
            Set to False for testing to use a smaller 4-layer, H=256 model.

    Raises:
        RuntimeError: If ``datasets`` / ``huggingface_hub`` are not installed.
    """
    set_seed(1234)
    if not HAS_HF:
        raise RuntimeError("datasets/huggingface_hub 설치 필요. pip install datasets huggingface_hub")

    print("[info] Loading vocab and dataset from hub...")
    vocab = load_vocab_from_hub("bert-base-uncased", "vocab.txt")
    tokenizer = BertTokenizer(vocab)

    ds = load_dataset("wikitext", "wikitext-2-raw-v1")
    raw_lines = ds['train']['text'][:2000]

    print("[info] Creating examples (MLM+NSP)...")
    examples = create_mlm_nsp_examples(raw_lines, tokenizer, max_len=128, dupe_factor=1)
    random.shuffle(examples)

    if use_large_model:
        model = BertForPreTraining(vocab_size=len(vocab), hidden_size=768, num_layers=12, num_heads=12, intermediate_size=3072, max_position=512, dropout=0.1)
    else:
        model = BertForPreTraining(vocab_size=len(vocab), hidden_size=256, num_layers=4, num_heads=4, intermediate_size=1024, max_position=128, dropout=0.1)

    model_summary(model)

    # Effective batch size is per_step_batch * accum_steps examples per
    # optimizer update.
    per_step_batch = 4
    accum_steps = 4
    batches = collate_batches(examples, batch_size=per_step_batch)

    # Single source of truth for the peak learning rate used by both the
    # optimizer and the scheduler.
    base_lr = 2e-4
    params = model.parameters()
    optim = AdamW(params, lr=base_lr, weight_decay=0.01)
    total_steps = 500
    warmup_steps = 50
    scheduler = LRScheduler(optim, base_lr=base_lr, warmup_steps=warmup_steps, total_steps=total_steps)

    print("[info] Start training (gradient accumulation enabled)...")
    global_step = 0
    pending = 0  # micro-batches back-propagated since the last optimizer step
    for batch in batches:
        if global_step >= total_steps:
            break
        dropout_is_training(model, True)

        mlm_logits, nsp_logits, _ = model(batch.input_ids, batch.token_type_ids, batch.attention_mask)
        mlm_loss = cross_entropy(mlm_logits, batch.mlm_labels, ignore_index=-100)
        nsp_loss = cross_entropy(nsp_logits, batch.nsp_labels)
        loss = mlm_loss + nsp_loss

        # NOTE(review): the loss is not divided by accum_steps, so gradients
        # are summed (not averaged) over the accumulation window - confirm
        # this is intended before changing the learning rate.
        loss.backward()
        pending += 1

        if pending == accum_steps:
            lr = scheduler.step()
            optim.step()
            optim.zero_grad()
            pending = 0
            global_step += 1
            if global_step % 10 == 0:
                # Reports the loss of the last micro-batch in the window only.
                print(f"global_step={global_step:4d} | lr={lr:.6f} | loss={loss.data.item():.4f} | mlm={mlm_loss.data.item():.4f} | nsp={nsp_loss.data.item():.4f}")

    # Apply gradients left over from a final, incomplete accumulation window
    # instead of silently discarding that work.
    if pending and global_step < total_steps:
        scheduler.step()
        optim.step()
        optim.zero_grad()
        global_step += 1

    print("[info] Training finished. Saving model...")
    save_model(model, "./bert_numpy_model")
    print("[info] Done.")
| |
|
| | |
| | |
| | |
if __name__ == "__main__":
    # Script entry point: run the demo with the smaller model configuration
    # (use_large_model=False); pass True to train_demo for the 12-layer,
    # H=768 setup instead.
    train_demo(use_large_model=False)
| |
|