# openlem2 / .py
# OpenLab-NLP's picture
# Create .py
# b321543 verified
import sentencepiece as spm
import os, json, numpy as np, tensorflow as tf
from tensorflow.keras import layers, Model
import requests
from tensorflow.keras import mixed_precision
import glob
# Only let TensorFlow log messages at ERROR level through.
tf.get_logger().setLevel("ERROR")
# Seed TensorFlow and NumPy. The stdlib `random` module is not seeded here.
SEED = 42
tf.random.set_seed(SEED)
np.random.seed(SEED)
# Try to attach to a local TPU. Any failure falls back to the default
# distribution strategy and records that no TPU is in use.
try:
    resolver = tf.distribute.cluster_resolver.TPUClusterResolver(tpu="local")
    tf.tpu.experimental.initialize_tpu_system(resolver)
    strategy = tf.distribute.TPUStrategy(resolver)
    print("✅ TPU 초기화 완료")
    on_tpu = True
except Exception as e:
    print("⚠️ TPU 미사용, GPU/CPU로 진행:", e)
    strategy = tf.distribute.get_strategy()
    on_tpu = False
# Use bfloat16 mixed precision on TPU, plain float32 otherwise. The policy is
# installed globally, so it applies to layers created after this point.
policy_name = "mixed_bfloat16" if on_tpu else "float32"
policy = mixed_precision.Policy(policy_name)
mixed_precision.set_global_policy(policy)
print(f"✅ Mixed precision: {policy_name}")
def download_file(url, save_path, timeout=60):
    """Download *url* to *save_path* unless *save_path* already exists.

    The body is streamed to a temporary ``<save_path>.part`` file and only
    moved into place once the download has finished, so an interrupted
    transfer never leaves a truncated file that a later run would mistake
    for a complete one.

    Args:
        url: HTTP(S) URL to fetch.
        save_path: Destination path on the local filesystem.
        timeout: Timeout in seconds passed to ``requests.get``.

    Raises:
        requests.HTTPError: If the server answers with an error status.
    """
    if os.path.exists(save_path):
        return

    tmp_path = f"{save_path}.part"
    try:
        # The context manager releases the connection even on failure.
        with requests.get(url, stream=True, timeout=timeout) as r:
            r.raise_for_status()
            with open(tmp_path, "wb") as f:
                for chunk in r.iter_content(8192 * 2):
                    f.write(chunk)
        os.replace(tmp_path, save_path)
    finally:
        # After a successful os.replace the temp file no longer exists, so
        # this only removes leftovers from a failed or interrupted download.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
    print(f"✅ {save_path} 저장됨")
# Fetch the SentencePiece model (skipped when the file is already present).
TOKENIZER_PATH = "tokenizer.model"
download_file("https://huggingface.co/datasets/OpenLab-NLP/tiny-corpus/resolve/main/tokenizer.model?download=true", TOKENIZER_PATH)
# Pre-tokenized training shards: text files matched by FILE_PATTERN.
DATA_DIR = "/kaggle/input/lm-pretrain"
FILE_PATTERN = os.path.join(DATA_DIR, "tokenized_variable_part_*.txt")
sp = spm.SentencePieceProcessor(TOKENIZER_PATH)
# Padding id used to mask the loss; falls back to 0 when the lookup yields -1.
# NOTE(review): this assumes piece_to_id returns -1 for a missing piece —
# confirm against the SentencePiece API (it may return the unknown-token id).
pad_id = sp.piece_to_id("<pad>") if sp.piece_to_id("<pad>") != -1 else 0
# Id of the "[EOS]" piece; not referenced elsewhere in the visible code.
end_id = sp.piece_to_id("[EOS]")
vocab_size = sp.get_piece_size()
# Sequence length fed to the model and the global batch size.
max_len = 512
batch_size = 768
import random
def prepare_packed_dataset(file_pattern, max_len, batch_size):
    """Build a packed next-token-prediction dataset from tokenized text files.

    Each line of every matched file is a whitespace-separated list of
    integer token ids. All tokens are concatenated into one stream, cut
    into chunks of ``max_len + 1`` tokens, and each chunk is split into an
    ``(input, target)`` pair where the target is the input shifted by one.

    Args:
        file_pattern: Glob pattern of the tokenized text files.
        max_len: Length of each input/target sequence.
        batch_size: Number of sequences per batch (incomplete batches are
            dropped).

    Returns:
        A prefetched ``tf.data.Dataset`` yielding ``(inputs, targets)``
        int32 batches, each of shape ``(batch_size, max_len)``.

    Raises:
        FileNotFoundError: If no file matches ``file_pattern``.
    """
    file_list = tf.io.gfile.glob(file_pattern)
    if not file_list:
        # Fail with a clear message instead of an IndexError further down.
        raise FileNotFoundError(f"No files match pattern: {file_pattern}")

    # Shuffle the list of file paths once, when the dataset is built.
    # The read order is then fixed for the lifetime of this dataset; it only
    # changes if this function is called again.
    random.shuffle(file_list)
    print(f"🔄 파일 로드 순서 섞기 완료 (첫 번째 파일: {file_list[0]})")
    dataset = tf.data.TextLineDataset(file_list)

    def parse_tokens(line):
        # "12 7 431" -> int32 tensor [12, 7, 431]
        return tf.strings.to_number(tf.strings.split(line), tf.int32)

    dataset = dataset.map(parse_tokens, num_parallel_calls=tf.data.AUTOTUNE)
    # Flatten to a stream of single tokens, then re-cut into fixed-size
    # chunks; one extra token per chunk provides the shifted target.
    dataset = dataset.unbatch()
    dataset = dataset.batch(max_len + 1, drop_remainder=True)

    def split_input_target(chunk):
        return chunk[:-1], chunk[1:]

    dataset = dataset.map(split_input_target, num_parallel_calls=tf.data.AUTOTUNE)
    dataset = dataset.shuffle(20000)
    dataset = dataset.batch(batch_size, drop_remainder=True)
    return dataset.prefetch(tf.data.AUTOTUNE)
# Build the packed dataset and hand it to the distribution strategy so that
# batches are distributed by `strategy`.
with strategy.scope():
    dataset = prepare_packed_dataset(FILE_PATTERN, max_len, batch_size)
    dist_dataset = strategy.experimental_distribute_dataset(dataset)
    print("✅ 데이터 패킹 및 TPU 분산 파이프라인 준비 완료")
class TimeMix(layers.Layer):
    """Time-mixing layer: token-shifted key/value/receptance with a WKV sum.

    The input is blended with a copy of itself shifted one step along axis 1,
    projected to key, value and receptance, combined by `parallel_wkv`, gated
    by sigmoid(receptance) and passed through an output projection.
    """
    def __init__(self, d_model, layer_id, n_layers):
        """Create the variables and projections for one layer.

        Args:
            d_model: Width of every projection in this layer.
            layer_id: Index of this layer; sets depth-dependent initial values.
            n_layers: Total number of layers, used to normalise `layer_id`.
        """
        super().__init__()
        self.d_model = d_model
        # Relative depth: layer_id / (n_layers - 1), or 0.5 for a single layer.
        ratio = (layer_id / (n_layers - 1)) if n_layers > 1 else 0.5
        decay_speed = np.arange(d_model)
        # Per-channel value running from -5 (channel 0) to 3 (last channel);
        # the exponent makes the curve depend on the layer depth.
        # NOTE: divides by (d_model - 1), so d_model must be greater than 1.
        self.time_decay = tf.Variable(
            -5 + 8 * (decay_speed / (d_model - 1)) ** (0.7 + 1.3 * ratio),
            dtype=tf.float32, name="time_decay"
        )
        # Per-channel term added to the exponent of the current position only.
        self.time_first = tf.Variable(
            np.ones(d_model) * np.log(0.3),
            dtype=tf.float32, name="time_first"
        )
        # Scalar blend factors between the current and the shifted input.
        self.time_mix_k = tf.Variable(1 - (ratio ** 0.5), dtype=tf.float32)
        self.time_mix_v = tf.Variable(1 - (ratio ** 0.5), dtype=tf.float32)
        self.time_mix_r = tf.Variable(1 - (ratio ** 0.2), dtype=tf.float32)
        self.key = layers.Dense(d_model, use_bias=False)
        self.value = layers.Dense(d_model, use_bias=False)
        self.receptance = layers.Dense(d_model, use_bias=False)
        self.output_projection = layers.Dense(d_model, use_bias=False)
    def call(self, x, training=False):
        """Apply time mixing to `x`, a rank-3 tensor indexed [batch, time, channel]."""
        t_type = x.dtype
        # The variables are float32; cast them to the dtype of the input.
        tm_k = tf.cast(self.time_mix_k, t_type)
        tm_v = tf.cast(self.time_mix_v, t_type)
        tm_r = tf.cast(self.time_mix_r, t_type)
        # x shifted one step along axis 1, with zeros at the first position.
        xx = tf.pad(x[:, :-1, :], [[0, 0], [1, 0], [0, 0]])
        k = self.key(x * tm_k + xx * (1 - tm_k))
        v = self.value(x * tm_v + xx * (1 - tm_v))
        r = self.receptance(x * tm_r + xx * (1 - tm_r))
        wkv = self.parallel_wkv(k, v)
        return self.output_projection(tf.nn.sigmoid(r) * wkv)
    def parallel_wkv(self, k, v):
        """Compute a weighted average of `v` over positions using cumulative sums.

        For each position the numerator and denominator sum exp(s) terms of
        all earlier positions (cumsum minus the current term) plus the current
        position's term with `time_first` added to its exponent.
        """
        t_type = k.dtype
        w = tf.cast(tf.exp(self.time_decay), t_type)
        u = tf.cast(self.time_first, t_type)
        t = tf.shape(k)[1]
        # Position indices as a column so they broadcast against the channels.
        t_index = tf.cast(tf.range(t), t_type)[:, None]
        # NOTE(review): w is positive, so s falls linearly with the position
        # and exp(s) shrinks for later positions — confirm this is the intended
        # decay direction and that it does not underflow for long sequences.
        s = k - (t_index * w)
        kv = tf.exp(s) * v
        k_exp = tf.exp(s)
        # cumsum(...) - current term = sum over strictly earlier positions.
        num = tf.cumsum(kv, axis=1) - kv + tf.exp(s + u) * v
        den = tf.cumsum(k_exp, axis=1) - k_exp + tf.exp(s + u)
        # Small constant guards against division by zero.
        return num / (den + 1e-8)
class ChannelMix(layers.Layer):
    """Channel-mixing layer with a top-k mixture of experts.

    Every expert is a two-matrix feed-forward network with a squared-ReLU
    activation. All experts are evaluated densely for every token; a gate
    picks the ``top_k`` experts per token and only their outputs are kept
    (weighted by a softmax over the selected gate logits). The result is
    gated by sigmoid(receptance).
    """

    def __init__(self, d_model, layer_id, n_layers, num_experts=8, top_k=2):
        """Create the gate, expert weights and receptance projection.

        Args:
            d_model: Input and output width of the layer.
            layer_id: Index of this layer; sets the initial token-shift mix.
            n_layers: Total number of layers, used to normalise `layer_id`.
            num_experts: Number of expert feed-forward networks.
            top_k: Number of experts selected per token.
        """
        super().__init__()
        self.d_model = d_model
        self.num_experts = num_experts
        self.top_k = top_k
        # Relative depth: layer_id / (n_layers - 1), or 0.5 for a single layer.
        ratio = (layer_id / (n_layers - 1)) if n_layers > 1 else 0.5
        self.time_mix_k = tf.Variable(1 - (ratio ** 0.5), dtype=tf.float32)
        self.time_mix_r = tf.Variable(1 - (ratio ** 0.5), dtype=tf.float32)
        # Gate: produces one logit per expert for expert selection.
        self.gate = layers.Dense(num_experts, use_bias=False)
        # Expert weights, stacked along a leading expert axis.
        self.key_weight = self.add_weight(
            name="expert_key",
            shape=(num_experts, d_model, int(d_model * 4)),
            initializer="glorot_uniform"
        )
        self.value_weight = self.add_weight(
            name="expert_value",
            shape=(num_experts, int(d_model * 4), d_model),
            initializer="glorot_uniform"
        )
        self.receptance = layers.Dense(d_model, use_bias=False)

    def call(self, x, training=False):
        """Apply the mixture-of-experts channel mix to `x` ([batch, time, channel]).

        When `training` is truthy, a load-balancing auxiliary loss scaled by
        0.01 is registered through `add_loss`.
        """
        t_type = x.dtype
        b, t = tf.shape(x)[0], tf.shape(x)[1]
        # x shifted one step along axis 1, with zeros at the first position.
        xx = tf.pad(x[:, :-1, :], [[0, 0], [1, 0], [0, 0]])
        k_in = x * tf.cast(self.time_mix_k, t_type) + xx * (1 - tf.cast(self.time_mix_k, t_type))

        # 1. Gate logits and top-k selection.
        gate_logits = self.gate(k_in)  # (B, T, num_experts)
        # Pick the top-k experts; the softmax is taken in float32 over the
        # selected logits only.
        raw_weights, indices = tf.math.top_k(gate_logits, k=self.top_k)
        gate_weights = tf.nn.softmax(tf.cast(raw_weights, tf.float32))
        gate_weights = tf.cast(gate_weights, t_type)  # (B, T, top_k)

        # 2. Scatter the top-k weights back to a dense per-expert mask.
        # Experts that were not selected get weight 0.
        masks = tf.one_hot(indices, depth=self.num_experts, dtype=t_type)  # (B, T, top_k, num_experts)
        final_mask = tf.reduce_sum(masks * tf.expand_dims(gate_weights, -1), axis=2)  # (B, T, num_experts)

        # 3. Auxiliary loss that encourages an even spread over the experts.
        if training:
            # importance: summed gate probability per expert.
            # load: number of tokens routed to each expert.
            prob_dist = tf.nn.softmax(tf.cast(gate_logits, tf.float32), axis=-1)
            importance = tf.reduce_sum(prob_dist, axis=[0, 1])
            load = tf.reduce_sum(tf.cast(final_mask > 0, tf.float32), axis=[0, 1])
            aux_loss = tf.reduce_sum(importance * load) * (self.num_experts / (tf.cast(b * t, tf.float32) ** 2))
            self.add_loss(0.01 * aux_loss)

        # 4. Expert computation: every expert is evaluated for every token.
        k_experts = tf.einsum('btd,edh->bteh', k_in, self.key_weight)
        k_experts = tf.square(tf.nn.relu(k_experts))
        v_experts = tf.einsum('bteh,ehd->bted', k_experts, self.value_weight)  # (B, T, E, D)

        # 5. Weighted sum: only the selected experts have a non-zero weight.
        kv = tf.reduce_sum(v_experts * tf.expand_dims(final_mask, -1), axis=2)

        # Receptance gate.
        r_in = x * tf.cast(self.time_mix_r, t_type) + xx * (1 - tf.cast(self.time_mix_r, t_type))
        r = self.receptance(r_in)
        return tf.nn.sigmoid(r) * kv
class Block(layers.Layer):
    """One residual block: a shared LayerNorm feeding TimeMix and ChannelMix.

    Both sub-layers read the same normalised input and their outputs are
    added to the residual stream together.
    """

    def __init__(self, d_model, layer_id, n_layers):
        """Create the normalisation and the two mixing sub-layers.

        Args:
            d_model: Width of the block.
            layer_id: Index of this block, forwarded to both sub-layers.
            n_layers: Total number of blocks, forwarded to both sub-layers.
        """
        super().__init__()
        self.ln = layers.LayerNormalization(epsilon=1e-5)
        self.time_mix = TimeMix(d_model, layer_id, n_layers)
        self.channel_mix = ChannelMix(d_model, layer_id, n_layers)

    def call(self, x, training=False):
        """Return `x` plus the time-mix and channel-mix outputs."""
        ln_x = self.ln(x)
        # Pass `training` to both sub-layers explicitly: ChannelMix only
        # registers its auxiliary loss when training is truthy.
        return (
            x
            + self.time_mix(ln_x, training=training)
            + self.channel_mix(ln_x, training=training)
        )
class Head(tf.keras.Model):
    """Language-model head: a bias-free projection to vocabulary logits."""

    def __init__(self, vocab_size):
        """Create the output projection with `vocab_size` units."""
        super().__init__()
        self.lm_head = layers.Dense(
            vocab_size,
            use_bias=False,
            name="output_head",
            dtype=policy,
        )

    def call(self, x, training=False):
        """Project `x` to logits and return them as float32."""
        return tf.cast(self.lm_head(x), tf.float32)
   
class LM(tf.keras.Model):
    """Backbone: token embedding, a stack of `Block`s and a final LayerNorm.

    `dropout_rate` is accepted by the constructor but not used by it.
    The embedding size is taken from the module-level `vocab_size`.
    """

    def __init__(self, d_model, n_layers, dropout_rate=0.1):
        super().__init__()
        self.token_embedding = layers.Embedding(vocab_size, d_model)
        self.blocks = [
            Block(d_model, depth, n_layers) for depth in range(n_layers)
        ]
        # The final normalisation always runs in float32.
        self.ln_f = layers.LayerNormalization(epsilon=1e-5, dtype=tf.float32)

    def call(self, x, training=False):
        """Map token ids to float32 hidden states."""
        hidden = self.token_embedding(x)
        for layer in self.blocks:
            hidden = layer(hidden, training=training)
        return self.ln_f(tf.cast(hidden, tf.float32))
def smoothed_loss_keras(y_true, y_pred, eps=0.1):
    """Label-smoothed cross-entropy averaged over non-padding tokens.

    Args:
        y_true: Integer target ids.
        y_pred: Logits whose last axis is the vocabulary.
        eps: Smoothing mass spread uniformly over the vocabulary.

    Returns:
        Scalar loss: the sum of per-token losses at positions whose target
        differs from the module-level `pad_id`, divided by their count.
    """
    labels = tf.cast(y_true, tf.int32)
    n_classes = tf.shape(y_pred)[-1]

    # Smoothed target distribution: (1 - eps) on the label, eps / V everywhere.
    one_hot = tf.one_hot(labels, depth=n_classes, dtype=tf.float32)
    smoothed = (1.0 - eps) * one_hot + eps / tf.cast(n_classes, tf.float32)

    token_loss = -tf.reduce_sum(
        smoothed * tf.nn.log_softmax(y_pred, axis=-1), axis=-1
    )

    # 1.0 for real tokens, 0.0 for padding.
    keep = tf.cast(tf.not_equal(labels, pad_id), tf.float32)
    return tf.reduce_sum(token_loss * keep) / (tf.reduce_sum(keep) + 1e-8)
# Build and compile the model inside the strategy scope so that its variables
# are created under `strategy`.
with strategy.scope():
    # Backbone (512-wide, 16 blocks) and a separate vocabulary head; the two
    # are kept as separate objects so their weights can be saved separately.
    blocklm = LM(d_model=512, n_layers=16)
    head = Head(vocab_size=vocab_size)
    # Wire backbone and head into one functional model over token-id inputs.
    inputs = layers.Input(shape=(max_len,), dtype=tf.int32)
    x = blocklm(inputs)
    outputs = head(x)
    model = tf.keras.Model(inputs=inputs, outputs=outputs)
    optimizer = tf.keras.optimizers.AdamW(learning_rate=1e-4, weight_decay=0.01)
    model.compile(optimizer=optimizer, loss=smoothed_loss_keras)
    # Run one all-zero batch through the model before printing the summary.
    dummy_input = np.zeros((1, max_len), dtype=np.int32)
    model(dummy_input)
    model.summary()
def get_training_stats(file_pattern, max_len, batch_size):
    """Count tokens in the matched files and derive chunk and step counts.

    Args:
        file_pattern: Glob pattern of whitespace-tokenized text files.
        max_len: Sequence length; each chunk consumes ``max_len + 1`` tokens.
        batch_size: Number of chunks per training step.

    Returns:
        Tuple ``(total_tokens, total_chunks, steps_per_epoch)``.
    """
    def count_tokens(path):
        # One token per whitespace-separated field, summed over all lines.
        with open(path, 'r') as handle:
            return sum(len(row.split()) for row in handle)

    total_tokens = sum(count_tokens(path) for path in glob.glob(file_pattern))
    total_chunks = total_tokens // (max_len + 1)
    return total_tokens, total_chunks, total_chunks // batch_size
# Disabled: scan the data files to compute the number of steps per epoch.
#total_tokens, total_chunks, steps_per_epoch = get_training_stats(FILE_PATTERN, max_len, batch_size)
#print(f"✅ 총 토큰 수: {total_tokens}")
#print(f"✅ 생성된 총 덩어리(Chunk) 수: {total_chunks}")
#print(f"✅ steps_per_epoch: {steps_per_epoch}")
# Train for one epoch with a hard-coded step count.
# NOTE(review): 14582 was presumably obtained from get_training_stats on the
# original data — verify it still matches the dataset in use.
model.fit(dist_dataset, epochs=1, steps_per_epoch=14582)
# Save the backbone and the head as two separate weight files.
blocklm.save_weights("blocklm.weights.h5")
head.save_weights("head.weights.h5")
print("저장됨")