# -*- coding: utf-8 -*-
"""
BERT implemented with NumPy only (extended edition - strengthened architecture)
--------------------------------------------------------------------------------
Summary of what this file contains:
- A BERT-like architecture with realistic defaults:
  Encoder L = 12, H = 768, A = 12, intermediate = 3072, max_pos = 512.
- EncoderLayer uses the Pre-LayerNorm layout (more stable training).
- PositionwiseFFN is widened to "two FFN blocks" per encoder layer.
- The MLM head uses real weight tying: the output projection is expressed as
  Tensor operations on the word-embedding matrix so autograd sees it.
- model_summary(): prints a structure / parameter-count summary.
- save_model(): stores all parameters to ./bert_numpy_model.npz and
  ./bert_numpy_model.npy.
- Gradient accumulation, LR scheduler and Dropout.

Caution:
- The large configuration (12 layers, H=768) is very slow and memory hungry
  on CPU. Validate with the small configuration first.

Run:
    $ pip install numpy datasets huggingface_hub
    $ python numpy_only_bert_from_scratch.py
"""
from __future__ import annotations

import math
import random
import re
import unicodedata
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple

import numpy as np

# Optional dependencies, only needed to download the vocab / dataset.
try:
    from datasets import load_dataset
    from huggingface_hub import hf_hub_download
    HAS_HF = True
except Exception:
    HAS_HF = False


############################################################
# Utilities
############################################################

def set_seed(seed: int = 42):
    """Seed both the stdlib and NumPy random generators."""
    random.seed(seed)
    np.random.seed(seed)


def gelu(x: np.ndarray) -> np.ndarray:
    """Tanh approximation of GELU, applied element-wise."""
    return 0.5 * x * (1.0 + np.tanh(np.sqrt(2.0 / np.pi) * (x + 0.044715 * (x ** 3))))


def softmax(x: np.ndarray, axis: int = -1) -> np.ndarray:
    """Numerically stable softmax along ``axis``."""
    x = x - np.max(x, axis=axis, keepdims=True)
    e = np.exp(x)
    return e / np.sum(e, axis=axis, keepdims=True)


def xavier_init(shape: Tuple[int, ...]) -> np.ndarray:
    """Return a float32 array drawn from the Xavier/Glorot uniform distribution.

    For 1-D shapes fan_in == fan_out == shape[0]; otherwise the last two
    dimensions are used as (fan_in, fan_out).
    """
    if len(shape) == 1:
        fan_in = shape[0]
        fan_out = shape[0]
    else:
        fan_in = shape[-2]
        fan_out = shape[-1]
    limit = np.sqrt(6.0 / (fan_in + fan_out))
    return np.random.uniform(-limit, limit, size=shape).astype(np.float32)


############################################################
# Autograd engine (simple tape / graph based)
############################################################

def reduce_grad(grad: np.ndarray, shape: Tuple[int, ...]) -> np.ndarray:
    """Reduce a broadcast gradient back to the operand's original ``shape``."""
    # Extra leading dimensions were created by broadcasting: sum them away.
    while grad.ndim > len(shape):
        grad = grad.sum(axis=0)
    # Axes that were size 1 in the operand were stretched: sum them back.
    for i, dim in enumerate(shape):
        if dim == 1 and grad.shape[i] != 1:
            grad = grad.sum(axis=i, keepdims=True)
    return grad


class Tensor:
    """A float32 NumPy array plus the bookkeeping needed for reverse-mode AD.

    Every operation returns a new Tensor whose ``_prev`` lists its inputs and
    whose ``_backward`` closure adds the output gradient into the inputs'
    ``grad`` buffers.
    """

    def __init__(self, data: np.ndarray, requires_grad: bool = False, name: str = ""):
        if not isinstance(data, np.ndarray):
            data = np.array(data, dtype=np.float32)
        self.data = data.astype(np.float32)
        self.grad = np.zeros_like(self.data) if requires_grad else None
        self.requires_grad = requires_grad
        self._backward = lambda: None
        self._prev: List[Tensor] = []
        self.name = name

    def zero_grad(self):
        """Reset the gradient buffer in place (no-op for constant tensors)."""
        if self.requires_grad:
            self.grad[...] = 0.0

    def backward(self, grad: Optional[np.ndarray] = None):
        """Back-propagate from this tensor through the recorded graph.

        ``grad`` may be omitted only for single-element tensors, in which case
        a gradient of one is used.
        """
        if grad is None:
            assert self.data.size == 1, "backward() requires grad for non-scalar"
            grad = np.ones_like(self.data)
        # np.asarray keeps a 0-d result as an ndarray instead of a NumPy scalar.
        self.grad = np.asarray(self.grad + grad) if self.grad is not None else grad

        # Iterative post-order DFS: a deep model builds a graph that can be
        # deeper than Python's recursion limit.
        topo: List[Tensor] = []
        visited = {id(self)}
        stack = [(self, iter(self._prev))]
        while stack:
            node, children = stack[-1]
            for child in children:
                if id(child) not in visited:
                    visited.add(id(child))
                    stack.append((child, iter(child._prev)))
                    break
            else:
                topo.append(node)
                stack.pop()

        for v in reversed(topo):
            v._backward()

    # ---- arithmetic -------------------------------------------------------
    def __add__(self, other: Tensor | float):
        other = other if isinstance(other, Tensor) else Tensor(np.array(other, dtype=np.float32))
        out = Tensor(self.data + other.data, requires_grad=(self.requires_grad or other.requires_grad))

        def _backward():
            if self.requires_grad:
                self.grad += reduce_grad(out.grad, self.data.shape)
            if other.requires_grad:
                other.grad += reduce_grad(out.grad, other.data.shape)

        out._backward = _backward
        out._prev = [self, other]
        return out

    def __sub__(self, other):
        other = other if isinstance(other, Tensor) else Tensor(np.array(other, dtype=np.float32))
        out = Tensor(self.data - other.data, requires_grad=(self.requires_grad or other.requires_grad))

        def _backward():
            if self.requires_grad:
                self.grad += reduce_grad(out.grad, self.data.shape)
            if other.requires_grad:
                other.grad -= reduce_grad(out.grad, other.data.shape)

        out._backward = _backward
        out._prev = [self, other]
        return out

    def __mul__(self, other: Tensor | float):
        other = other if isinstance(other, Tensor) else Tensor(np.array(other, dtype=np.float32))
        out = Tensor(self.data * other.data, requires_grad=(self.requires_grad or other.requires_grad))

        def _backward():
            if self.requires_grad:
                self.grad += reduce_grad(out.grad * other.data, self.data.shape)
            if other.requires_grad:
                other.grad += reduce_grad(out.grad * self.data, other.data.shape)

        out._backward = _backward
        out._prev = [self, other]
        return out

    def __truediv__(self, other: Tensor | float):
        other = other if isinstance(other, Tensor) else Tensor(np.array(other, dtype=np.float32))
        out = Tensor(self.data / other.data, requires_grad=(self.requires_grad or other.requires_grad))

        def _backward():
            if self.requires_grad:
                self.grad += reduce_grad(out.grad * (1.0 / other.data), self.data.shape)
            if other.requires_grad:
                other.grad += reduce_grad(out.grad * (-self.data / (other.data ** 2)), other.data.shape)

        out._backward = _backward
        out._prev = [self, other]
        return out

    def matmul(self, other: Tensor):
        """Matrix product ``self @ other`` with NumPy broadcasting over batch dims."""
        out = Tensor(self.data @ other.data, requires_grad=(self.requires_grad or other.requires_grad))

        def _backward():
            if self.requires_grad:
                grad_self = out.grad @ np.swapaxes(other.data, -1, -2)
                self.grad += reduce_grad(grad_self, self.data.shape)
            if other.requires_grad:
                if other.data.ndim == 2 and self.data.ndim > 2:
                    # Shared 2-D weight: one flattened GEMM gives the summed
                    # gradient without materialising a (batch..., K, N) array.
                    lhs = self.data.reshape(-1, self.data.shape[-1])
                    rhs = out.grad.reshape(-1, out.grad.shape[-1])
                    other.grad += lhs.T @ rhs
                else:
                    grad_other = np.swapaxes(self.data, -1, -2) @ out.grad
                    other.grad += reduce_grad(grad_other, other.data.shape)

        out._backward = _backward
        out._prev = [self, other]
        return out

    def T(self):
        """Full transpose (``ndarray.T``) as a differentiable operation."""
        out = Tensor(self.data.T, requires_grad=self.requires_grad)

        def _backward():
            if self.requires_grad:
                self.grad += out.grad.T

        out._backward = _backward
        out._prev = [self]
        return out

    def transpose_last2(self):
        """Swap the last two axes as a differentiable operation."""
        out = Tensor(np.swapaxes(self.data, -1, -2), requires_grad=self.requires_grad)

        def _backward():
            if self.requires_grad:
                self.grad += np.swapaxes(out.grad, -1, -2)

        out._backward = _backward
        out._prev = [self]
        return out

    def sum(self, axis=None, keepdims=False):
        out = Tensor(self.data.sum(axis=axis, keepdims=keepdims), requires_grad=self.requires_grad)

        def _backward():
            if not self.requires_grad:
                return
            grad = out.grad
            if axis is not None and not keepdims:
                # Re-insert the reduced axes so the gradient can broadcast.
                shape = list(self.data.shape)
                axes = [axis] if isinstance(axis, int) else list(axis)
                for ax in axes:
                    shape[ax] = 1
                grad = grad.reshape(shape)
            self.grad += np.broadcast_to(grad, self.data.shape)

        out._backward = _backward
        out._prev = [self]
        return out

    def mean(self, axis=None, keepdims=False):
        if axis is None:
            denom = self.data.size
        elif isinstance(axis, int):
            denom = self.data.shape[axis]
        else:
            denom = np.prod([self.data.shape[a] for a in axis])
        return self.sum(axis=axis, keepdims=keepdims) * (1.0 / denom)

    def relu(self):
        out = Tensor(np.maximum(self.data, 0), requires_grad=self.requires_grad)

        def _backward():
            if self.requires_grad:
                self.grad += (self.data > 0).astype(np.float32) * out.grad

        out._backward = _backward
        out._prev = [self]
        return out

    def gelu(self):
        out = Tensor(gelu(self.data), requires_grad=self.requires_grad)

        def _backward():
            if self.requires_grad:
                c = np.sqrt(2.0 / np.pi)
                t = c * (self.data + 0.044715 * (self.data ** 3))
                th = np.tanh(t)
                dt_dx = c * (1 + 3 * 0.044715 * (self.data ** 2)) * (1 - th ** 2)
                dgelu = 0.5 * (1 + th) + 0.5 * self.data * dt_dx
                self.grad += dgelu * out.grad

        out._backward = _backward
        out._prev = [self]
        return out

    def softmax(self, axis=-1):
        out = Tensor(softmax(self.data, axis=axis), requires_grad=self.requires_grad)

        def _backward():
            if not self.requires_grad:
                return
            y = out.data
            g = out.grad
            s = np.sum(g * y, axis=axis, keepdims=True)
            self.grad += y * (g - s)

        out._backward = _backward
        out._prev = [self]
        return out

    def layernorm(self, eps=1e-12):
        """Normalise over the last axis (no affine parameters)."""
        mean = self.data.mean(axis=-1, keepdims=True)
        var = ((self.data - mean) ** 2).mean(axis=-1, keepdims=True)
        inv_std = 1.0 / np.sqrt(var + eps)
        out = Tensor((self.data - mean) * inv_std, requires_grad=self.requires_grad)

        def _backward():
            if not self.requires_grad:
                return
            g = out.grad
            xmu = self.data - mean
            dx = inv_std * (
                g
                - g.mean(axis=-1, keepdims=True)
                - xmu * (g * xmu).mean(axis=-1, keepdims=True) / (var + eps)
            )
            self.grad += dx

        out._backward = _backward
        out._prev = [self]
        return out

    def tanh(self):
        y = np.tanh(self.data)
        out = Tensor(y, requires_grad=self.requires_grad)

        def _backward():
            if self.requires_grad:
                self.grad += (1 - y ** 2) * out.grad

        out._backward = _backward
        out._prev = [self]
        return out

    def detach(self):
        """Return a copy that is disconnected from the graph."""
        return Tensor(self.data.copy(), requires_grad=False)

    @staticmethod
    def from_np(x: np.ndarray, requires_grad=False, name: str = ""):
        return Tensor(x, requires_grad=requires_grad, name=name)


############################################################
# Layers / modules
############################################################

class Module:
    """Minimal base class: subclasses expose their trainable tensors."""

    def parameters(self) -> List[Tensor]:
        raise NotImplementedError

    def zero_grad(self):
        for p in self.parameters():
            p.zero_grad()


class Dense(Module):
    """Affine layer ``x @ W + b`` acting on the last axis."""

    def __init__(self, in_features: int, out_features: int, bias: bool = True, name: str = "dense"):
        self.W = Tensor.from_np(xavier_init((in_features, out_features)), requires_grad=True, name=f"{name}.W")
        self.b = (
            Tensor.from_np(np.zeros((out_features,), dtype=np.float32), requires_grad=True, name=f"{name}.b")
            if bias else None
        )

    def __call__(self, x: Tensor) -> Tensor:
        out = x.matmul(self.W)
        if self.b is not None:
            out = out + self.b
        return out

    def parameters(self):
        return [p for p in [self.W, self.b] if p is not None]


class LayerNorm(Module):
    """Layer normalisation over the last axis with learnable gain and bias."""

    def __init__(self, hidden_size: int, eps: float = 1e-12, name: str = "ln"):
        self.gamma = Tensor.from_np(np.ones((hidden_size,), dtype=np.float32), requires_grad=True, name=f"{name}.gamma")
        self.beta = Tensor.from_np(np.zeros((hidden_size,), dtype=np.float32), requires_grad=True, name=f"{name}.beta")
        self.eps = eps

    def __call__(self, x: Tensor) -> Tensor:
        normed = x.layernorm(self.eps)
        return normed * self.gamma + self.beta

    def parameters(self):
        return [self.gamma, self.beta]


class Dropout(Module):
    """Inverted dropout; identity when ``training`` is False or ``p == 0``."""

    def __init__(self, p: float = 0.1):
        if not 0.0 <= p < 1.0:
            # p == 1 would divide by zero when rescaling the kept activations.
            raise ValueError(f"dropout probability must be in [0, 1), got {p}")
        self.p = p
        self.training = True
        # Mask of the most recent call, kept only for inspection.
        self.mask: Optional[np.ndarray] = None

    def __call__(self, x: Tensor) -> Tensor:
        if not self.training or self.p == 0.0:
            return x
        mask = (np.random.rand(*x.data.shape) >= self.p).astype(np.float32) / (1.0 - self.p)
        self.mask = mask
        out = Tensor(x.data * mask, requires_grad=x.requires_grad)

        def _backward():
            # Use the mask captured for THIS call: the same module may be
            # applied several times per forward pass, overwriting self.mask.
            if x.requires_grad:
                x.grad += out.grad * mask

        out._backward = _backward
        out._prev = [x]
        return out

    def parameters(self):
        return []


def dropout_is_training(module: Module, training: bool):
    """Set ``training`` on every Dropout reachable from ``module``.

    Walks instance attributes recursively, including modules stored inside
    lists, tuples and dicts (e.g. the encoder's list of layers).
    """
    def _visit(obj):
        if isinstance(obj, Dropout):
            obj.training = training
        elif isinstance(obj, Module):
            for child in vars(obj).values():
                _visit(child)
        elif isinstance(obj, (list, tuple)):
            for child in obj:
                _visit(child)
        elif isinstance(obj, dict):
            for child in obj.values():
                _visit(child)

    _visit(module)


class MultiHeadSelfAttention(Module):
    """Scaled dot-product self-attention with ``num_heads`` heads."""

    def __init__(self, hidden_size: int, num_heads: int, attn_dropout: float = 0.1,
                 proj_dropout: float = 0.1, name: str = "mha"):
        assert hidden_size % num_heads == 0
        self.hidden = hidden_size
        self.num_heads = num_heads
        self.head_dim = hidden_size // num_heads
        self.Wq = Dense(hidden_size, hidden_size, name=f"{name}.Wq")
        self.Wk = Dense(hidden_size, hidden_size, name=f"{name}.Wk")
        self.Wv = Dense(hidden_size, hidden_size, name=f"{name}.Wv")
        self.Wo = Dense(hidden_size, hidden_size, name=f"{name}.Wo")
        self.attn_drop = Dropout(attn_dropout)
        self.proj_drop = Dropout(proj_dropout)

    def _split_heads(self, t: Tensor) -> Tensor:
        """(B, T, H) -> (B, num_heads, T, head_dim), differentiable."""
        B, T, _ = t.data.shape
        out = Tensor(
            t.data.reshape(B, T, self.num_heads, self.head_dim).transpose(0, 2, 1, 3),
            requires_grad=t.requires_grad,
        )

        def _backward():
            if t.requires_grad:
                t.grad += out.grad.transpose(0, 2, 1, 3).reshape(B, T, self.hidden)

        out._backward = _backward
        out._prev = [t]
        return out

    @staticmethod
    def _merge_heads(t: Tensor) -> Tensor:
        """(B, num_heads, T, head_dim) -> (B, T, H), differentiable."""
        Bn, Nh, Tq, D = t.data.shape
        out = Tensor(t.data.transpose(0, 2, 1, 3).reshape(Bn, Tq, Nh * D), requires_grad=t.requires_grad)

        def _backward():
            if t.requires_grad:
                t.grad += out.grad.reshape(Bn, Tq, Nh, D).transpose(0, 2, 1, 3)

        out._backward = _backward
        out._prev = [t]
        return out

    def __call__(self, x: Tensor, attention_mask: Optional[np.ndarray]) -> Tensor:
        """Apply self-attention to ``x`` of shape (B, T, H).

        ``attention_mask`` is an additive mask broadcastable to
        (B, num_heads, T, T), or None.
        """
        qh = self._split_heads(self.Wq(x))
        kh = self._split_heads(self.Wk(x))
        vh = self._split_heads(self.Wv(x))

        scale = 1.0 / np.sqrt(self.head_dim)
        scores = qh.matmul(kh.transpose_last2()) * float(scale)
        if attention_mask is not None:
            # Add the mask as a constant *inside* the graph. Re-wrapping
            # scores.data in a new Tensor would detach Q and K from the loss.
            scores = scores + Tensor(np.asarray(attention_mask, dtype=np.float32))

        attn = self.attn_drop(scores.softmax(axis=-1))
        context = attn.matmul(vh)
        out = self.Wo(self._merge_heads(context))
        return self.proj_drop(out)

    def parameters(self):
        return (self.Wq.parameters() + self.Wk.parameters()
                + self.Wv.parameters() + self.Wo.parameters())


class PositionwiseFFN(Module):
    """Two stacked FFN blocks (hidden -> intermediate -> hidden, twice).

    Each block applies dropout after the GELU; the residual connection is
    added by EncoderLayer.
    """

    def __init__(self, hidden_size: int, intermediate_size: int, dropout: float = 0.1, name: str = "ffn"):
        # First FFN block
        self.dense1 = Dense(hidden_size, intermediate_size, name=f"{name}.dense1")
        self.dense2 = Dense(intermediate_size, hidden_size, name=f"{name}.dense2")
        # Second FFN block (extra depth)
        self.dense3 = Dense(hidden_size, intermediate_size, name=f"{name}.dense3")
        self.dense4 = Dense(intermediate_size, hidden_size, name=f"{name}.dense4")
        self.drop = Dropout(dropout)

    def __call__(self, x: Tensor) -> Tensor:
        # block 1
        h = self.dense2(self.drop(self.dense1(x).gelu()))
        # block 2
        return self.dense4(self.drop(self.dense3(h).gelu()))

    def parameters(self):
        return (self.dense1.parameters() + self.dense2.parameters()
                + self.dense3.parameters() + self.dense4.parameters())


class EncoderLayer(Module):
    """Pre-LayerNorm Transformer encoder layer.

    Structure:
        x -> LN -> MHA -> dropout -> x + out
        x -> LN -> FFN (two blocks here) -> dropout -> x + out
    """

    def __init__(self, hidden_size: int, num_heads: int, intermediate_size: int,
                 attn_dropout=0.1, dropout=0.1, name: str = "enc"):
        self.mha = MultiHeadSelfAttention(hidden_size, num_heads, attn_dropout=attn_dropout,
                                          proj_dropout=dropout, name=f"{name}.mha")
        self.ln1 = LayerNorm(hidden_size, name=f"{name}.ln1")
        self.ffn = PositionwiseFFN(hidden_size, intermediate_size, dropout=dropout, name=f"{name}.ffn")
        self.ln2 = LayerNorm(hidden_size, name=f"{name}.ln2")
        self.drop = Dropout(dropout)

    def __call__(self, x: Tensor, attention_mask: Optional[np.ndarray]) -> Tensor:
        # Pre-LN -> MHA -> residual
        x = x + self.drop(self.mha(self.ln1(x), attention_mask))
        # Pre-LN -> FFN -> residual
        x = x + self.drop(self.ffn(self.ln2(x)))
        return x

    def parameters(self):
        return (self.mha.parameters() + self.ln1.parameters()
                + self.ffn.parameters() + self.ln2.parameters())


class BertEmbeddings(Module):
    """Sum of word, position and token-type embeddings, then LN and dropout."""

    def __init__(self, vocab_size: int, hidden_size: int, max_position: int = 512,
                 type_vocab_size: int = 2, dropout=0.1, name: str = "emb"):
        self.word_embeddings = Tensor.from_np(xavier_init((vocab_size, hidden_size)),
                                              requires_grad=True, name=f"{name}.word")
        self.position_embeddings = Tensor.from_np(xavier_init((max_position, hidden_size)),
                                                  requires_grad=True, name=f"{name}.pos")
        self.token_type_embeddings = Tensor.from_np(xavier_init((type_vocab_size, hidden_size)),
                                                    requires_grad=True, name=f"{name}.type")
        self.ln = LayerNorm(hidden_size, name=f"{name}.ln")
        self.drop = Dropout(dropout)
        self.max_position = max_position

    def __call__(self, input_ids: np.ndarray, token_type_ids: np.ndarray) -> Tensor:
        B, T = input_ids.shape
        assert T <= self.max_position
        word = self.word_embeddings.data[input_ids]
        type_ = self.token_type_embeddings.data[token_type_ids]
        pos = self.position_embeddings.data[np.arange(T, dtype=np.int32)[None, :]]
        x = Tensor(word + type_ + pos, requires_grad=True)

        def _backward():
            # The lookup is not a graph op, so scatter the gradient of the
            # summed embedding straight into the three tables.
            if x.grad is None:
                return
            grad_flat = x.grad.reshape(-1, x.grad.shape[-1])  # (B*T, H)
            if self.word_embeddings.requires_grad:
                ids = input_ids.reshape(-1).astype(np.int64)
                np.add.at(self.word_embeddings.grad, ids, grad_flat)
            if self.token_type_embeddings.requires_grad:
                ids = token_type_ids.reshape(-1).astype(np.int64)
                np.add.at(self.token_type_embeddings.grad, ids, grad_flat)
            if self.position_embeddings.requires_grad:
                # Every batch row uses positions 0..T-1: sum over the batch.
                self.position_embeddings.grad[:T] += x.grad.sum(axis=0)

        x._backward = _backward
        x._prev = []
        return self.drop(self.ln(x))

    def parameters(self):
        return [self.word_embeddings, self.position_embeddings,
                self.token_type_embeddings] + self.ln.parameters()


class BertEncoder(Module):
    """A stack of ``num_layers`` EncoderLayer modules."""

    def __init__(self, num_layers: int, hidden_size: int, num_heads: int,
                 intermediate_size: int, dropout=0.1):
        # attn_dropout follows ``dropout`` so that dropout=0.0 really turns
        # every dropout in the stack off.
        self.layers = [
            EncoderLayer(hidden_size, num_heads, intermediate_size,
                         attn_dropout=dropout, dropout=dropout, name=f"layer{i}")
            for i in range(num_layers)
        ]

    def __call__(self, x: Tensor, attention_mask: Optional[np.ndarray]) -> Tensor:
        for layer in self.layers:
            x = layer(x, attention_mask)
        return x

    def parameters(self):
        ps = []
        for layer in self.layers:
            ps += layer.parameters()
        return ps


class BertPooler(Module):
    """Dense + tanh applied to the first ([CLS]) position."""

    def __init__(self, hidden_size: int):
        self.dense = Dense(hidden_size, hidden_size, name="pooler.dense")

    def __call__(self, x: Tensor) -> Tensor:
        cls = Tensor(x.data[:, 0, :], requires_grad=x.requires_grad)

        def _backward():
            if x.requires_grad and cls.grad is not None:
                x.grad[:, 0, :] += cls.grad

        cls._backward = _backward
        cls._prev = [x]
        return self.dense(cls).tanh()

    def parameters(self):
        return self.dense.parameters()


class BertForPreTraining(Module):
    """BERT encoder with a weight-tied MLM head and an NSP head."""

    def __init__(self, vocab_size: int, hidden_size: int = 768, num_layers: int = 12,
                 num_heads: int = 12, intermediate_size: int = 3072,
                 max_position: int = 512, dropout=0.1):
        self.emb = BertEmbeddings(vocab_size, hidden_size, max_position=max_position, dropout=dropout)
        self.encoder = BertEncoder(num_layers, hidden_size, num_heads, intermediate_size, dropout=dropout)
        self.pooler = BertPooler(hidden_size)
        self.pred_ln = LayerNorm(hidden_size, name="pred.ln")
        self.pred_dense = Dense(hidden_size, hidden_size, name="pred.proj")
        self.mlm_bias = Tensor.from_np(np.zeros((vocab_size,), dtype=np.float32),
                                       requires_grad=True, name="pred.bias")
        self.nsp = Dense(hidden_size, 2, name="nsp")

    def __call__(self, input_ids: np.ndarray, token_type_ids: np.ndarray,
                 attention_mask: np.ndarray) -> Tuple[Tensor, Tensor, Tensor]:
        """Return ``(mlm_logits, nsp_logits, sequence_output)``.

        ``attention_mask`` is (B, T) with 1 for real tokens and 0 for padding.
        """
        # Turn the 0/1 mask into a large negative additive bias on the keys.
        mask = (1.0 - attention_mask).astype(np.float32) * -1e4
        mask = mask[:, None, None, :]
        x = self.emb(input_ids, token_type_ids)
        x = self.encoder(x, mask)
        pooled = self.pooler(x)
        pred = self.pred_ln(x)
        pred = self.pred_dense(pred).gelu()
        # weight tying: pred (B,T,H) @ word_embeddings.T (H,V) -> (B,T,V)
        logits = pred.matmul(self.emb.word_embeddings.T()) + self.mlm_bias
        nsp_logits = self.nsp(pooled)
        return logits, nsp_logits, x

    def parameters(self):
        ps = []
        ps += self.emb.parameters()
        ps += self.encoder.parameters()
        ps += self.pooler.parameters()
        ps += self.pred_ln.parameters()
        ps += self.pred_dense.parameters()
        ps += [self.mlm_bias]
        ps += self.nsp.parameters()
        return ps


############################################################
# Loss and optimizer / scheduler
############################################################

def cross_entropy(logits: Tensor, target: np.ndarray, ignore_index: int = -100) -> Tensor:
    """Mean cross-entropy over the positions whose target != ``ignore_index``.

    ``logits`` has shape (..., C) and ``target`` holds class ids with the
    matching leading shape. Returns a scalar Tensor; it is 0 when every
    position is ignored.
    """
    C = logits.data.shape[-1]
    shifted = logits.data - np.max(logits.data, axis=-1, keepdims=True)
    log_probs = shifted - np.log(np.sum(np.exp(shifted), axis=-1, keepdims=True))

    target = np.asarray(target)
    valid = target != ignore_index
    mask = valid.astype(np.float32)
    # Ignored positions get a harmless class id so indexing never goes out of
    # range (ignore_index itself is not a valid index when C is small).
    safe_target = np.where(valid, target, 0).reshape(-1).astype(np.int64)
    flat_idx = np.arange(safe_target.size)

    nll = -log_probs.reshape(-1, C)[flat_idx, safe_target] * mask.reshape(-1)
    denom = mask.sum() + 1e-12
    loss = Tensor(np.array(nll.sum() / denom, dtype=np.float32), requires_grad=True)

    def _backward():
        grad = np.exp(log_probs)                      # softmax probabilities
        grad.reshape(-1, C)[flat_idx, safe_target] -= 1.0
        grad *= mask[..., None]
        # Honour the upstream gradient so that scaled losses (for example
        # loss / accum_steps) back-propagate correctly.
        grad *= loss.grad / denom
        if logits.grad is None:
            logits.grad = np.zeros_like(logits.data)
        logits.grad += grad.astype(np.float32)

    loss._backward = _backward
    loss._prev = [logits]
    return loss


class AdamW:
    """Adam with decoupled weight decay (applied to tensors with ndim > 1)."""

    def __init__(self, params: List[Tensor], lr=1e-4, betas=(0.9, 0.999), eps=1e-8, weight_decay=0.01):
        self.params = params
        self.lr = lr
        self.b1, self.b2 = betas
        self.eps = eps
        self.wd = weight_decay
        self.t = 0
        # Moment estimates, keyed by the parameter object's id().
        self.m: Dict[int, np.ndarray] = {}
        self.v: Dict[int, np.ndarray] = {}

    def step(self):
        self.t += 1
        for p in self.params:
            if p.grad is None:
                continue
            pid = id(p)
            if pid not in self.m:
                self.m[pid] = np.zeros_like(p.data)
                self.v[pid] = np.zeros_like(p.data)
            g = p.grad
            if self.wd > 0 and p.data.ndim > 1:
                p.data -= self.lr * self.wd * p.data
            self.m[pid] = self.b1 * self.m[pid] + (1 - self.b1) * g
            self.v[pid] = self.b2 * self.v[pid] + (1 - self.b2) * (g * g)
            mhat = self.m[pid] / (1 - self.b1 ** self.t)
            vhat = self.v[pid] / (1 - self.b2 ** self.t)
            p.data -= self.lr * mhat / (np.sqrt(vhat) + self.eps)

    def zero_grad(self):
        for p in self.params:
            p.zero_grad()


class LRScheduler:
    """Linear warm-up to ``base_lr`` followed by linear decay to zero."""

    def __init__(self, optimizer: AdamW, base_lr: float, warmup_steps: int, total_steps: int):
        self.opt = optimizer
        self.base_lr = base_lr
        self.warmup = warmup_steps
        self.total = total_steps
        self.step_num = 0

    def step(self):
        """Advance one step, write the new rate into the optimizer, return it."""
        self.step_num += 1
        if self.step_num <= self.warmup:
            scale = self.step_num / max(1, self.warmup)
        else:
            progress = (self.step_num - self.warmup) / max(1, (self.total - self.warmup))
            scale = max(0.0, 1.0 - progress)
        lr = self.base_lr * scale
        self.opt.lr = lr
        return lr


############################################################
# Tokenizer
############################################################

class BasicTokenizer:
    """Whitespace / punctuation tokenizer with lower-casing and accent removal."""

    def __init__(self, do_lower_case=True):
        self.do_lower_case = do_lower_case

    def _is_whitespace(self, ch):
        return ch.isspace()

    def _is_punctuation(self, ch):
        cp = ord(ch)
        # All non-alphanumeric printable ASCII counts as punctuation.
        if ((cp >= 33 and cp <= 47) or (cp >= 58 and cp <= 64)
                or (cp >= 91 and cp <= 96) or (cp >= 123 and cp <= 126)):
            return True
        return unicodedata.category(ch).startswith("P")

    def _clean_text(self, text):
        """Replace NUL, U+FFFD and other control characters with a space.

        (Tab, newline and carriage return are control characters as well and
        therefore also become spaces.)
        """
        return "".join(
            " " if (ord(ch) == 0xFFFD or unicodedata.category(ch).startswith("C")) else ch
            for ch in text
        )

    def _tokenize_chinese_chars(self, text):
        """Surround CJK unified ideographs (U+4E00..U+9FFF) with spaces."""
        output = []
        for ch in text:
            cp = ord(ch)
            if cp >= 0x4E00 and cp <= 0x9FFF:
                output.append(" " + ch + " ")
            else:
                output.append(ch)
        return "".join(output)

    def tokenize(self, text: str) -> List[str]:
        text = self._clean_text(text)
        text = self._tokenize_chinese_chars(text)
        if self.do_lower_case:
            text = text.lower()
        # Strip accents: decompose, then drop the combining marks.
        text = unicodedata.normalize("NFD", text)
        text = "".join([ch for ch in text if unicodedata.category(ch) != 'Mn'])
        spaced = []
        for ch in text:
            if self._is_punctuation(ch):
                # Keep punctuation as its own token instead of discarding it.
                spaced.append(" " + ch + " ")
            elif self._is_whitespace(ch):
                spaced.append(" ")
            else:
                spaced.append(ch)
        return "".join(spaced).strip().split()


class WordPieceTokenizer:
    """Greedy longest-match-first WordPiece segmentation of a single word."""

    def __init__(self, vocab: Dict[str, int], unk_token="[UNK]", max_input_chars_per_word=100):
        self.vocab = vocab
        self.unk = unk_token
        self.max_chars = max_input_chars_per_word

    def tokenize(self, token: str) -> List[str]:
        if len(token) > self.max_chars:
            return [self.unk]
        sub_tokens = []
        start = 0
        while start < len(token):
            end = len(token)
            cur = None
            while start < end:
                substr = token[start:end]
                if start > 0:
                    substr = "##" + substr
                if substr in self.vocab:
                    cur = substr
                    break
                end -= 1
            if cur is None:
                # Some part of the word cannot be segmented: whole word is UNK.
                return [self.unk]
            sub_tokens.append(cur)
            start = end
        return sub_tokens


class BertTokenizer:
    """BasicTokenizer + WordPieceTokenizer with BERT special tokens."""

    def __init__(self, vocab: Dict[str, int]):
        self.vocab = vocab
        self.inv_vocab = {i: s for s, i in vocab.items()}
        self.basic = BasicTokenizer(do_lower_case=True)
        self.wordpiece = WordPieceTokenizer(vocab)
        self.cls_token = "[CLS]"
        self.sep_token = "[SEP]"
        self.mask_token = "[MASK]"
        self.pad_token = "[PAD]"
        self.cls_id = vocab[self.cls_token]
        self.sep_id = vocab[self.sep_token]
        self.mask_id = vocab[self.mask_token]
        self.pad_id = vocab[self.pad_token]

    def _wordpieces(self, text: str) -> List[str]:
        """Tokenize ``text`` into word pieces."""
        pieces: List[str] = []
        for tok in self.basic.tokenize(text):
            pieces.extend(self.wordpiece.tokenize(tok))
        return pieces

    def encode(self, text_a: str, text_b: Optional[str] = None,
               max_len: int = 128) -> Tuple[List[int], List[int], List[int]]:
        """Return ``(input_ids, attention_mask, token_type_ids)`` padded to ``max_len``."""
        a_tokens = self._wordpieces(text_a)
        b_tokens = self._wordpieces(text_b) if text_b else []

        max_a = max_len - 3 if not b_tokens else (max_len - 3) // 2
        max_b = max_len - 3 - max_a
        a_tokens = a_tokens[:max_a]
        b_tokens = b_tokens[:max_b]

        tokens = [self.cls_token] + a_tokens + [self.sep_token]
        type_ids = [0] * (len(tokens))
        if b_tokens:
            tokens += b_tokens + [self.sep_token]
            type_ids += [1] * (len(b_tokens) + 1)

        unk_id = self.vocab.get("[UNK]", 100)
        input_ids = [self.vocab.get(t, unk_id) for t in tokens]
        attention_mask = [1] * len(input_ids)

        pad = max(0, max_len - len(input_ids))
        input_ids += [self.pad_id] * pad
        attention_mask += [0] * pad
        type_ids += [0] * pad
        return input_ids[:max_len], attention_mask[:max_len], type_ids[:max_len]


############################################################
# Data preparation
############################################################

@dataclass
class PretrainBatch:
    input_ids: np.ndarray
    token_type_ids: np.ndarray
    attention_mask: np.ndarray
    mlm_labels: np.ndarray
    nsp_labels: np.ndarray


def load_vocab_from_hub(repo_id: str = "bert-base-uncased", filename: str = "vocab.txt") -> Dict[str, int]:
    """Download a one-token-per-line vocab file and map token -> line index."""
    if not HAS_HF:
        raise RuntimeError("huggingface_hub / datasets가 설치되어 있어야 함")
    path = hf_hub_download(repo_id=repo_id, filename=filename)
    vocab = {}
    with open(path, "r", encoding="utf-8") as f:
        for i, line in enumerate(f):
            vocab[line.strip()] = i
    return vocab


def create_mlm_nsp_examples(texts: List[str], tokenizer: BertTokenizer, max_len: int = 128,
                            dupe_factor: int = 1, masked_lm_prob=0.15) -> List[PretrainBatch]:
    """Build single (un-batched) MLM + NSP examples from consecutive lines.

    For every non-empty line the second segment is the following line (label 1)
    with probability 0.5, otherwise a randomly drawn line (label 0). About
    ``masked_lm_prob`` of the non-special tokens are selected; 80% become
    [MASK], 10% a random token and 10% stay unchanged.
    """
    sents = [s for s in texts if len(s.strip()) > 0]
    special_ids = (tokenizer.cls_id, tokenizer.sep_id, tokenizer.pad_id)
    examples = []
    for _ in range(dupe_factor):
        for i in range(len(sents) - 1):
            a = sents[i]
            if random.random() < 0.5:
                b = sents[i + 1]
                is_next = 1
            else:
                j = random.randrange(len(sents))
                b = sents[j]
                # A random draw may hit the true next sentence; label it so.
                is_next = 1 if j == i + 1 else 0

            input_ids, attn, type_ids = tokenizer.encode(a, b, max_len)
            input_ids = np.array(input_ids, dtype=np.int32)
            attn = np.array(attn, dtype=np.int32)
            type_ids = np.array(type_ids, dtype=np.int32)
            mlm_labels = np.full_like(input_ids, fill_value=-100)

            cand_indexes = [j for j, tid in enumerate(input_ids) if tid not in special_ids]
            num_to_mask = max(1, int(round(len(cand_indexes) * masked_lm_prob)))
            random.shuffle(cand_indexes)
            for pos in cand_indexes[:num_to_mask]:
                original = input_ids[pos]
                r = random.random()
                if r < 0.8:
                    input_ids[pos] = tokenizer.mask_id
                elif r < 0.9:
                    input_ids[pos] = random.randint(0, len(tokenizer.vocab) - 1)
                # else: keep the original token
                mlm_labels[pos] = original

            examples.append(PretrainBatch(
                input_ids=input_ids,
                token_type_ids=type_ids,
                attention_mask=attn,
                mlm_labels=mlm_labels,
                nsp_labels=np.array([is_next], dtype=np.int32),
            ))
    return examples


def collate_batches(batches: List[PretrainBatch], batch_size: int) -> List[PretrainBatch]:
    """Stack consecutive examples into batches of at most ``batch_size``."""
    out = []
    for i in range(0, len(batches), batch_size):
        chunk = batches[i:i + batch_size]
        if not chunk:
            continue
        out.append(PretrainBatch(
            input_ids=np.stack([b.input_ids for b in chunk], axis=0),
            token_type_ids=np.stack([b.token_type_ids for b in chunk], axis=0),
            attention_mask=np.stack([b.attention_mask for b in chunk], axis=0),
            mlm_labels=np.stack([b.mlm_labels for b in chunk], axis=0),
            nsp_labels=np.stack([b.nsp_labels for b in chunk], axis=0).reshape(len(chunk)),
        ))
    return out


############################################################
# Model utilities: summary and saving
############################################################

def model_summary(model: BertForPreTraining):
    """Print vocab size, hidden size, layer count and total parameter count."""
    print("===== MODEL SUMMARY =====")
    try:
        vocab, hidden = model.emb.word_embeddings.data.shape
        num_layers = len(model.encoder.layers)
    except Exception:
        hidden = None
        vocab = None
        num_layers = None
    print(f"Vocab size: {vocab}")
    print(f"Hidden size: {hidden}")
    print(f"Num layers: {num_layers}")
    # Sum of the sizes of every tensor returned by parameters().
    total = sum(p.data.size for p in model.parameters())
    print(f"Total parameters (approx): {total:,}")
    print("=========================")


def save_model(model: BertForPreTraining, path_base: str = "./bert_numpy_model"):
    """Save every parameter to ``path_base + '.npz'`` and ``path_base + '.npy'``.

    - .npz: one array per parameter, keyed by parameter name.
    - .npy: the same dict pickled as an object array
      (loading needs ``np.load(..., allow_pickle=True)``).
    """
    sd = {}
    for i, p in enumerate(model.parameters()):
        name = p.name if getattr(p, 'name', '') else f'param_{i}'
        # Avoid clobbering an earlier parameter with the same name.
        if name in sd:
            name = f"{name}_{i}"
        sd[name] = p.data
    np.savez(path_base + ".npz", **sd)
    # Also keep the dict form.
    np.save(path_base + ".npy", sd)
    print(f"Model saved to {path_base}.npz and {path_base}.npy")


############################################################
# Training loop: gradient accumulation, scheduler, dropout, saving
############################################################

def train_demo(use_large_model: bool = True):
    """Run a short MLM + NSP pre-training demo on WikiText-2 and save the model.

    use_large_model: when True use the 12-layer, H=768 configuration (heavy);
    when False use a small model suitable for a quick test.
    """
    set_seed(1234)
    if not HAS_HF:
        raise RuntimeError("datasets/huggingface_hub 설치 필요. pip install datasets huggingface_hub")

    print("[info] Loading vocab and dataset from hub...")
    vocab = load_vocab_from_hub("bert-base-uncased", "vocab.txt")
    tokenizer = BertTokenizer(vocab)

    # Data (limited to a demo-sized slice)
    ds = load_dataset("wikitext", "wikitext-2-raw-v1")
    raw_lines = ds['train']['text'][:2000]

    print("[info] Creating examples (MLM+NSP)...")
    examples = create_mlm_nsp_examples(raw_lines, tokenizer, max_len=128, dupe_factor=1)
    random.shuffle(examples)

    # Model configuration: large / small
    if use_large_model:
        model = BertForPreTraining(vocab_size=len(vocab), hidden_size=768, num_layers=12, num_heads=12,
                                   intermediate_size=3072, max_position=512, dropout=0.1)
    else:
        # Small model for quick tests
        model = BertForPreTraining(vocab_size=len(vocab), hidden_size=256, num_layers=4, num_heads=4,
                                   intermediate_size=1024, max_position=128, dropout=0.1)

    model_summary(model)

    # Batch / training hyper-parameters
    per_step_batch = 4
    accum_steps = 4
    batches = collate_batches(examples, batch_size=per_step_batch)

    params = model.parameters()
    optim = AdamW(params, lr=2e-4, weight_decay=0.01)
    # One optimizer update consumes accum_steps micro-batches, so size the
    # schedule to the number of updates the data actually allows.
    total_steps = max(1, min(500, len(batches) // accum_steps))
    warmup_steps = max(1, min(50, total_steps // 10))
    scheduler = LRScheduler(optim, base_lr=2e-4, warmup_steps=warmup_steps, total_steps=total_steps)

    print("[info] Start training (gradient accumulation enabled)...")
    global_step = 0
    dropout_is_training(model, True)
    optim.zero_grad()

    for step, batch in enumerate(batches):
        if global_step >= total_steps:
            break

        mlm_logits, nsp_logits, _ = model(batch.input_ids, batch.token_type_ids, batch.attention_mask)
        mlm_loss = cross_entropy(mlm_logits, batch.mlm_labels, ignore_index=-100)
        nsp_loss = cross_entropy(nsp_logits, batch.nsp_labels)
        loss = mlm_loss + nsp_loss

        # Scale so the accumulated gradient is the mean over the micro-batches
        # rather than their sum.
        (loss * (1.0 / accum_steps)).backward()

        if (step + 1) % accum_steps == 0:
            lr = scheduler.step()
            optim.step()
            optim.zero_grad()
            global_step += 1

            if global_step % 10 == 0:
                print(f"global_step={global_step:4d} | lr={lr:.6f} | loss={loss.data.item():.4f} | mlm={mlm_loss.data.item():.4f} | nsp={nsp_loss.data.item():.4f}")

    print("[info] Training finished. Saving model...")
    save_model(model, "./bert_numpy_model")
    print("[info] Done.")


############################################################
# Main
############################################################

if __name__ == "__main__":
    # The function default is use_large_model=True, which is slow and memory
    # hungry; the demo entry point runs the small model instead.
    train_demo(use_large_model=False)