"""Train a small encoder-decoder Transformer chat model and sample from it.

The script downloads a JSONL dialogue dataset and a SentencePiece model,
builds a ``tf.data`` pipeline of (encoder input, decoder input, target)
triples, trains the model for one epoch, saves the weights and prints one
top-p sampled reply.
"""

import json
import os

import numpy as np
import requests
import sentencepiece as spm
import tensorflow as tf
import tensorflow.keras.backend as K
from tensorflow.keras import Model, layers
from tensorflow.keras import mixed_precision

print('1')
tf.get_logger().setLevel("ERROR")

SEED = 42
tf.random.set_seed(SEED)
np.random.seed(SEED)

max_len = 224  # was set to 200 in the previous code
batch_size = 32

# TPU initialisation (same as the previous code). Any failure falls back to
# the default strategy, so the broad ``except`` is a deliberate best-effort.
try:
    resolver = tf.distribute.cluster_resolver.TPUClusterResolver(tpu="local")
    tf.tpu.experimental.initialize_tpu_system(resolver)
    strategy = tf.distribute.TPUStrategy(resolver)
    print("✅ TPU 초기화 완료:", resolver.cluster_spec().as_dict())
    on_tpu = True
except Exception as e:
    print("⚠️ TPU 미사용, GPU/CPU로 진행:", e)
    strategy = tf.distribute.get_strategy()
    on_tpu = False

# Mixed precision (same as the previous code): bfloat16 compute on TPU only.
policy = mixed_precision.Policy("mixed_bfloat16" if on_tpu else "float32")
mixed_precision.set_global_policy(policy)
print("✅ Mixed precision:", policy)


# =======================
# 1) File download and tokenizer initialisation
# =======================
def download_file(url, save_path, timeout=60):
    """Stream ``url`` to ``save_path``.

    The payload is written to ``save_path + ".part"`` and moved into place
    only after the download finished, so an interrupted transfer never
    leaves a truncated file that a later ``os.path.exists`` check would
    mistake for a complete one.

    Args:
        url: Address to fetch.
        save_path: Destination file path.
        timeout: Seconds passed to ``requests.get`` as its ``timeout``.

    Raises:
        requests.HTTPError: If the server answers with an error status.
    """
    tmp_path = f"{save_path}.part"
    try:
        # The context manager releases the connection even on errors.
        with requests.get(url, stream=True, timeout=timeout) as r:
            r.raise_for_status()
            with open(tmp_path, "wb") as f:
                for chunk in r.iter_content(8192 * 2):
                    f.write(chunk)
        os.replace(tmp_path, save_path)
    finally:
        # After a successful os.replace the temp file no longer exists.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
    print(f"✅ {save_path} 저장됨")


DATA_PATH = "converted.jsonl"
TOKENIZER_PATH = "ko_unigram.model"

if not os.path.exists(DATA_PATH):
    download_file(
        "https://huggingface.co/datasets/Yuchan5386/Multiturn/resolve/main/dataset_shuffled.jsonl?download=true",
        DATA_PATH
    )

if not os.path.exists(TOKENIZER_PATH):
    download_file(
        "https://huggingface.co/datasets/Yuchan5386/Multiturn/resolve/main/unigram.model?download=true",
        TOKENIZER_PATH
    )

sp = spm.SentencePieceProcessor(TOKENIZER_PATH)

# NOTE(review): every lookup below uses the empty string as the piece name,
# so all special-token ids resolve to the same value. This looks like
# angle-bracket piece names lost in a copy/paste -- confirm the intended
# names against the tokenizer's vocabulary before training.
pad_id = sp.piece_to_id("") if sp.piece_to_id("") != -1 else 0
start_id = sp.piece_to_id("")
context_s_id = sp.piece_to_id("")
context_e_id = sp.piece_to_id("")
user_s_id = sp.piece_to_id("")
user_e_id = sp.piece_to_id("")
end_id = sp.piece_to_id("")
unk_id = sp.piece_to_id("")
vocab_size = sp.get_piece_size()
print(f"✅ Vocabulary size: {vocab_size}")


def text_to_ids(text):
    """Encode ``text`` into a list of SentencePiece ids."""
    return sp.encode(text, out_type=int)


def ids_to_text(ids):
    """Decode a list of SentencePiece ids back into text."""
    return sp.decode(ids)


# =======================
# JSONL -> TF Dataset loading (special tokens inserted at id level)
# =======================
def jsonl_stream(file_path):
    """Yield padded (encoder, decoder input, target) id tensors per record.

    Each non-blank line of ``file_path`` is parsed as a JSON object with
    the keys ``"context"``, ``"prompt"`` and ``"answer"``. All three
    yielded tensors are int32 and exactly ``max_len`` long (truncated on
    the right, then padded with ``pad_id``).
    """
    with open(file_path, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                # Tolerate blank lines (e.g. a trailing newline).
                continue
            data = json.loads(line)
            context = data["context"]
            prompt = data["prompt"]
            answer = data["answer"]

            # Encoder input: context and prompt, each wrapped in its
            # start/end marker ids.
            enc_ids = (
                [context_s_id] + text_to_ids(context) + [context_e_id]
                + [user_s_id] + text_to_ids(prompt) + [user_e_id]
            )
            enc_ids = enc_ids[:max_len]  # enforce max_len

            answer_ids = text_to_ids(answer)

            # Decoder input: start id followed by the answer.
            dec_input_ids = ([start_id] + answer_ids)[:max_len]

            # Target: the answer followed by the end id.
            target_ids = (answer_ids + [end_id])[:max_len]

            # Padding
            enc_ids += [pad_id] * (max_len - len(enc_ids))
            dec_input_ids += [pad_id] * (max_len - len(dec_input_ids))
            target_ids += [pad_id] * (max_len - len(target_ids))

            yield (
                tf.convert_to_tensor(enc_ids, dtype=tf.int32),
                tf.convert_to_tensor(dec_input_ids, dtype=tf.int32),
                tf.convert_to_tensor(target_ids, dtype=tf.int32),
            )


# =======================
# TF Dataset creation
# =======================
dataset = tf.data.Dataset.from_generator(
    lambda: jsonl_stream(DATA_PATH),
    output_signature=(
        tf.TensorSpec(shape=(max_len,), dtype=tf.int32),  # enc_inputs
        tf.TensorSpec(shape=(max_len,), dtype=tf.int32),  # dec_inputs
        tf.TensorSpec(shape=(max_len,), dtype=tf.int32),  # target
    )
)


# Map to the dict form the model's call() reads.
def map_fn(enc_input, dec_input, dec_target):
    """Pack one example as ``({"enc_inputs", "dec_inputs"}, target)``."""
    return {"enc_inputs": enc_input, "dec_inputs": dec_input}, dec_target


dataset = dataset.map(map_fn, num_parallel_calls=tf.data.AUTOTUNE)
dataset = (
    dataset.shuffle(1000, seed=SEED)
    .batch(batch_size, drop_remainder=True)
    .prefetch(tf.data.AUTOTUNE)
)

with strategy.scope():
    dist_dataset = strategy.experimental_distribute_dataset(dataset)

print("✅ ID 레벨 특수 토큰 적용 Dataset 로드 완료:", dist_dataset)


# =======================
# 3) Model layers (kept from the previous code)
# =======================
class SwiGLU(layers.Layer):
    """Gated feed-forward block.

    The input is projected to ``d_ff`` features and split into two equal
    halves (so ``d_ff`` must be even); one half is multiplied by the SiLU
    of the other, and the product is projected to ``d_model``.
    """

    def __init__(self, d_model, d_ff):
        super().__init__()
        self.proj = layers.Dense(d_ff)
        self.out = layers.Dense(d_model)

    def call(self, x):
        x_proj = self.proj(x)
        x_val, x_gate = tf.split(x_proj, 2, axis=-1)
        return self.out(x_val * tf.nn.silu(x_gate))


class EncoderBlock(layers.Layer):
    """Self-attention + SwiGLU block, LayerNorm applied after each residual.

    Note that ``key_dim`` is set to ``d_model`` for every head.
    """

    def __init__(self, d_model, num_heads, dff, dropout=0.1):
        super().__init__()
        self.mha = layers.MultiHeadAttention(num_heads=num_heads, key_dim=d_model)
        self.ffn = SwiGLU(d_model, dff)
        self.norm1 = layers.LayerNormalization(epsilon=1e-6)
        self.norm2 = layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = layers.Dropout(dropout)
        self.dropout2 = layers.Dropout(dropout)

    def call(self, x, mask=None, training=False):
        attn_out = self.dropout1(
            self.mha(x, x, x, attention_mask=mask), training=training
        )
        out1 = self.norm1(attn_out + x)
        ffn_out = self.dropout2(self.ffn(out1), training=training)
        return self.norm2(out1 + ffn_out)


class DecoderBlock(layers.Layer):
    """Causal self-attention, cross-attention over ``enc_out``, then SwiGLU.

    LayerNorm is applied after each of the three residual additions.
    """

    def __init__(self, d_model, num_heads, dff, dropout=0.1):
        super().__init__()
        self.self_mha = layers.MultiHeadAttention(num_heads=num_heads, key_dim=d_model)
        self.cross_mha = layers.MultiHeadAttention(num_heads=num_heads, key_dim=d_model)
        self.ffn = SwiGLU(d_model, dff)
        self.norm1 = layers.LayerNormalization(epsilon=1e-6)
        self.norm2 = layers.LayerNormalization(epsilon=1e-6)
        self.norm3 = layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = layers.Dropout(dropout)
        self.dropout2 = layers.Dropout(dropout)
        self.dropout3 = layers.Dropout(dropout)

    def call(self, x, enc_out, training=False):
        attn1 = self.dropout1(
            self.self_mha(x, x, x, use_causal_mask=True), training=training
        )
        out1 = self.norm1(attn1 + x)
        attn2 = self.dropout2(
            self.cross_mha(out1, enc_out, enc_out), training=training
        )
        out2 = self.norm2(out1 + attn2)
        ffn_out = self.dropout3(self.ffn(out2), training=training)
        return self.norm3(out2 + ffn_out)


class Transformer(tf.keras.Model):
    """Encoder-decoder Transformer with learned positional embeddings.

    ``call`` expects a dict with ``"enc_inputs"`` and ``"dec_inputs"`` id
    tensors and returns the output of the final ``Dense`` layer (one
    ``target_vocab_size``-wide logit vector per decoder position). The
    positional embedding tables hold ``max_len`` entries, so neither input
    may be longer than that.
    """

    def __init__(self, num_layers, d_model, num_heads, dff, input_vocab_size,
                 target_vocab_size, max_len=256, dropout=0.1):
        super().__init__()
        self.max_len = max_len
        self.d_model = d_model
        self.enc_embedding = layers.Embedding(input_vocab_size, d_model)
        self.enc_pos_embedding = layers.Embedding(max_len, d_model)
        self.dec_embedding = layers.Embedding(target_vocab_size, d_model)
        self.dec_pos_embedding = layers.Embedding(max_len, d_model)
        self.enc_layers = [
            EncoderBlock(d_model, num_heads, dff, dropout) for _ in range(num_layers)
        ]
        self.dec_layers = [
            DecoderBlock(d_model, num_heads, dff, dropout) for _ in range(num_layers)
        ]
        self.final_layer = layers.Dense(target_vocab_size, use_bias=False)

    def call(self, inputs, training=False):
        enc_inputs = inputs["enc_inputs"]
        dec_inputs = inputs["dec_inputs"]

        # Position indices 0..len-1 with a leading axis for broadcasting.
        enc_pos = tf.range(tf.shape(enc_inputs)[1])[tf.newaxis, :]
        dec_pos = tf.range(tf.shape(dec_inputs)[1])[tf.newaxis, :]

        x = self.enc_embedding(enc_inputs) + self.enc_pos_embedding(enc_pos)
        for layer in self.enc_layers:
            x = layer(x, training=training)
        enc_out = x

        y = self.dec_embedding(dec_inputs) + self.dec_pos_embedding(dec_pos)
        for layer in self.dec_layers:
            y = layer(y, enc_out, training=training)

        return self.final_layer(y)


# =======================
# 5) Training setup and run
# =======================
def _masked_smoothed_nll(y_true, y_pred, eps):
    """Return the label-smoothed cross-entropy averaged over non-pad targets."""
    y_true = tf.cast(y_true, tf.int32)
    # Under mixed_bfloat16 the logits may arrive as bfloat16. Do the softmax
    # and loss arithmetic in float32 so it is numerically stable and has the
    # same dtype as the float32 label tensor it is multiplied with.
    y_pred = tf.cast(y_pred, tf.float32)
    mask = tf.cast(tf.not_equal(y_true, pad_id), tf.float32)
    vocab = tf.shape(y_pred)[-1]
    y_true_oh = tf.one_hot(y_true, depth=vocab, dtype=tf.float32)
    y_true_ls = (1.0 - eps) * y_true_oh + eps / tf.cast(vocab, tf.float32)
    log_probs = tf.nn.log_softmax(y_pred, axis=-1)
    per_tok = -tf.reduce_sum(y_true_ls * log_probs, axis=-1) * mask
    # The small constant guards against a batch made only of padding.
    return tf.reduce_sum(per_tok) / (tf.reduce_sum(mask) + 1e-8)


def smoothed_loss_keras(y_true, y_pred, eps=0.1):
    """Label-smoothed cross-entropy that ignores ``pad_id`` targets.

    Args:
        y_true: Integer target ids.
        y_pred: Logits whose last axis is the vocabulary.
        eps: Label-smoothing mass spread uniformly over the vocabulary.

    Returns:
        Scalar mean loss over the non-pad target positions.
    """
    return _masked_smoothed_nll(y_true, y_pred, eps)


def masked_perplexity(y_true, y_pred, eps=0.1):
    """Exponential of the masked, label-smoothed loss.

    The value includes the smoothing term selected by ``eps``; it is the
    exponential of exactly what ``smoothed_loss_keras`` returns.
    """
    return tf.exp(_masked_smoothed_nll(y_true, y_pred, eps))


def create_lr_schedule(initial_lr=5e-5, decay_steps=10000, decay_rate=0.9):
    """Build a non-staircase ``ExponentialDecay`` learning-rate schedule."""
    return tf.keras.optimizers.schedules.ExponentialDecay(
        initial_learning_rate=initial_lr,
        decay_steps=decay_steps,
        decay_rate=decay_rate,
        staircase=False
    )


with strategy.scope():
    # Fix: use the defined vocab_size instead of chat_vocab_size.
    chat_model = Transformer(num_layers=2, d_model=320, num_heads=4, dff=960,
                             input_vocab_size=vocab_size,
                             target_vocab_size=vocab_size,
                             max_len=256, dropout=0.1)

    # Run one dummy batch so the weights are created before compile/summary.
    dummy_input = {
        "enc_inputs": tf.zeros((1, max_len), dtype=tf.int32),
        "dec_inputs": tf.zeros((1, max_len), dtype=tf.int32)
    }
    _ = chat_model(dummy_input)

    # Not passed to compile() below, which uses smoothed_loss_keras.
    loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(
        from_logits=True, reduction='none'
    )

    # Optimizer setup
    optimizer = tf.keras.optimizers.Adam(
        learning_rate=create_lr_schedule(),
        beta_1=0.9,
        beta_2=0.95,
        epsilon=1e-8,
        clipnorm=1.0
    )

    chat_model.compile(optimizer=optimizer, loss=smoothed_loss_keras,
                       metrics=[masked_perplexity])

chat_model.summary()
print("✅ 모델 컴파일 완료, 학습 시작...")

# Run training
history = chat_model.fit(dataset, epochs=1, verbose=1)

# Save weights
chat_model.save_weights("chat_model.weights.h5")
print("\n✅ 모델 가중치 저장 완료!")


def generate_text_topp(model, context, prompt, max_len=256, max_gen=100,
                       p=0.9, temperature=0.8, min_len=20):
    """Sample a reply with temperature-scaled top-p (nucleus) sampling.

    Args:
        model: Callable taking ``{"enc_inputs", "dec_inputs"}`` id tensors
            and returning per-position logits.
        context: Context text placed before the prompt in the encoder input.
        prompt: User prompt text.
        max_len: Length both inputs are padded to (longer inputs keep only
            their last ``max_len`` ids).
        max_gen: Maximum number of sampling steps.
        p: Cumulative-probability threshold of the nucleus.
        temperature: Divisor applied to the logits before the softmax.
        min_len: The end token cannot be sampled while the decoder sequence
            (start id included) is shorter than this.

    Returns:
        The decoded text of the sampled ids, without the start id.
    """
    # Encoder input: special tokens inserted at id level.
    enc_ids = (
        [context_s_id] + text_to_ids(context) + [context_e_id]
        + [user_s_id] + text_to_ids(prompt) + [user_e_id]
    )
    enc_ids = enc_ids[-max_len:]  # length limit
    enc_tensor = tf.convert_to_tensor(
        [np.pad(enc_ids, (0, max_len - len(enc_ids)), constant_values=pad_id)],
        dtype=tf.int32
    )

    # Decoder input starts with the start id.
    generated = [start_id]

    for step in range(max_gen):
        dec_input = generated[-max_len:]  # keep within max_len
        dec_tensor = tf.convert_to_tensor(
            [np.pad(dec_input, (0, max_len - len(dec_input)), constant_values=pad_id)],
            dtype=tf.int32
        )

        # Model inference
        logits = model({"enc_inputs": enc_tensor, "dec_inputs": dec_tensor},
                       training=False)

        # Use the logits at the last real token position; cast to float32
        # so the NumPy arithmetic below does not run on bfloat16 values.
        next_token_logits = tf.cast(
            logits[0, len(dec_input) - 1], tf.float32
        ).numpy()

        # Suppress special tokens.
        next_token_logits[pad_id] -= 10.0
        next_token_logits[context_s_id] -= 5.0
        next_token_logits[context_e_id] -= 5.0
        next_token_logits[user_s_id] -= 5.0
        next_token_logits[user_e_id] -= 5.0

        # Before min_len is reached the end token must not be sampled at
        # all; otherwise it would be appended to the output and decoding
        # would simply continue past it.
        if len(generated) < min_len:
            next_token_logits[end_id] = -np.inf

        # Softmax + top-p; float64 keeps the renormalised nucleus summing
        # to 1 within np.random.choice's tolerance.
        probs = tf.nn.softmax(next_token_logits / temperature).numpy().astype(np.float64)
        sorted_indices = np.argsort(probs)[::-1]
        sorted_probs = probs[sorted_indices]
        cumulative_probs = np.cumsum(sorted_probs)
        cutoff = np.searchsorted(cumulative_probs, p)
        top_indices = sorted_indices[:cutoff + 1]
        top_probs = sorted_probs[:cutoff + 1]
        top_probs = top_probs / np.sum(top_probs)

        next_token_id = np.random.choice(top_indices, p=top_probs)
        if next_token_id == end_id:
            break

        generated.append(int(next_token_id))

    # Drop the leading start id, then convert to text.
    result_ids = generated[1:]
    return ids_to_text(result_ids)


# Example usage. Pad to the training length so the (unmasked) encoder sees
# the same sequence length it was trained on.
print("\n\n===== 생성 결과 =====")
print(generate_text_topp(chat_model, "대화 시작", "안녕하세요! 어떻게 지내셨나요?",
                         max_len=max_len, p=0.9))