import tensorflow as tf from tensorflow.keras import layers, Model import numpy as np import tensorflow.keras.backend as K from tensorflow.keras import mixed_precision import sentencepiece as spm import os, json import requests print('1') tf.get_logger().setLevel("ERROR") SEED = 42 tf.random.set_seed(SEED) np.random.seed(SEED) max_len = 150 # 기존 코드에서 200으로 설정됨 batch_size = 128 # TPU 초기화 (기존 코드와 동일) try: resolver = tf.distribute.cluster_resolver.TPUClusterResolver(tpu="local") tf.tpu.experimental.initialize_tpu_system(resolver) strategy = tf.distribute.TPUStrategy(resolver) print("✅ TPU 초기화 완료:", resolver.cluster_spec().as_dict()) on_tpu = True except Exception as e: print("⚠️ TPU 미사용, GPU/CPU로 진행:", e) strategy = tf.distribute.get_strategy() on_tpu = False # Mixed precision (기존 코드와 동일) policy = mixed_precision.Policy("mixed_bfloat16" if on_tpu else "float32") mixed_precision.set_global_policy(policy) print("✅ Mixed precision:", policy) # ======================= # 1) 파일 다운로드 및 토크나이저 초기화 (기존 코드와 동일) # ======================= def download_file(url, save_path): r = requests.get(url, stream=True) r.raise_for_status() with open(save_path, "wb") as f: for chunk in r.iter_content(8192*2): f.write(chunk) print(f"✅ {save_path} 저장됨") DATA_PATH = "converted.jsonl" TOKENIZER_PATH = "ko_unigram.model" if not os.path.exists(DATA_PATH): download_file( "https://huggingface.co/datasets/Yuchan5386/TinyInst/resolve/main/output.jsonl?download=true", DATA_PATH ) if not os.path.exists(TOKENIZER_PATH): download_file( "https://huggingface.co/datasets/Yuchan5386/TinyInst/resolve/main/ko_unigram.model?download=true", TOKENIZER_PATH ) sp = spm.SentencePieceProcessor(TOKENIZER_PATH) pad_id = sp.piece_to_id("") if sp.piece_to_id("") != -1 else 0 start_id = sp.piece_to_id("") sep_id = sp.piece_to_id("") end_id = sp.piece_to_id("") unk_id = sp.piece_to_id("") vocab_size = sp.get_piece_size() print(f"✅ Vocabulary size: {vocab_size}") def text_to_ids(text): return sp.encode(text, out_type=int) def ids_to_text(ids): return sp.decode(ids) class SwiGLU(layers.Layer): def __init__(self, d_model, d_ff): super().__init__() self.proj = layers.Dense(d_ff) self.out = layers.Dense(d_model) def call(self, x): x_proj = self.proj(x) x_val, x_gate = tf.split(x_proj, 2, axis=-1) return self.out(x_val * tf.nn.silu(x_gate)) class LoU(layers.Layer): def __init__(self, d_model, clip_value=5.0, eps=1e-6): super().__init__() self.d_model = d_model self.clip_value = float(clip_value) self.eps = float(eps) self.Q = layers.Dense(d_model, dtype='float32') self.K = layers.Dense(d_model, dtype='float32') self.V = layers.Dense(d_model, dtype='float32') self.norm = layers.LayerNormalization(epsilon=1e-5, dtype='float32') self.norm1 = layers.LayerNormalization(epsilon=1e-5, dtype='float32') self.glu = SwiGLU(d_model, 320) def call(self, x): x_f32 = tf.cast(x, tf.float32) residual = x_f32 x_f32 = self.norm1(x) q = self.Q(x_f32) k = self.K(x_f32) V = self.V(x_f32) g_q = (tf.nn.tanh(q) + 1.0) / 2.0 g_k = (tf.nn.tanh(k) + 1.0) / 2.0 score = g_q * g_k score = tf.cumsum(score, axis=1) # (B, L, D) # 💡 수정된 부분: 현재 토큰까지의 누적합 평균으로 정규화 seq_len = tf.shape(score)[1] # [1, 2, 3, ..., L]을 D_model 차원으로 확장 count_for_mean = tf.cast(tf.range(seq_len) + 1, score.dtype) count_for_mean = tf.reshape(count_for_mean, (1, seq_len, 1)) # 누적합을 현재까지의 토큰 개수로 나누어 평균 누적합 계산 (B, L, D) score_mean = score / count_for_mean # 정규화 분모 설정 denom = tf.maximum(score_mean, self.eps) score_norm = score / denom # ----------------------------------------------- score_clipped = tf.clip_by_value(score_norm, -self.clip_value, self.clip_value) x_comb = score_clipped * V out = self.norm(x_comb + residual) out = self.glu(out) return tf.cast(out, x.dtype) class Lo(layers.Layer): def __init__(self, d_model): super().__init__() self.d = layers.Dense(64, activation='silu') self.w = layers.Dense(d_model) self.norm = layers.LayerNormalization(epsilon=1e-5, dtype='float32') def call(self, x): p = self.d(x) p = self.w(p) return self.norm(p) + x class Block(layers.Layer): def __init__(self, d_model): super().__init__() self.lou = LoU(d_model) self.lo = Lo(d_model) def call(self, x): x = self.lou(x) x = self.lo(x) return x class ReLM(tf.keras.Model): def __init__(self, vocab_size, max_seq_len, d_model, n_layers, dropout_rate=0.1): super().__init__() self.token_embedding = layers.Embedding(vocab_size, d_model) self.pos_embedding = layers.Embedding(max_seq_len, d_model) self.blocks = [Block(d_model) for _ in range(n_layers)] self.ln_f = layers.LayerNormalization(epsilon=1e-5, dtype="float32") def call(self, x, training=False): batch_size, seq_len = tf.shape(x)[0], tf.shape(x)[1] positions = tf.range(seq_len)[tf.newaxis, :] x = self.token_embedding(x) + self.pos_embedding(positions) for block in self.blocks: x = block(x) x = self.ln_f(x) embedding_matrix = tf.cast(self.token_embedding.embeddings, x.dtype) logits = tf.matmul(x, embedding_matrix, transpose_b=True) return tf.cast(logits, tf.float32) model = ReLM( vocab_size=vocab_size, max_seq_len=max_len, d_model=256, n_layers=1 ) dummy_input = { "enc_inputs": tf.zeros((1, max_len), dtype=tf.int32), "dec_inputs": tf.zeros((1, max_len), dtype=tf.int32) } _ = chat_model(dummy_input) chat_model.load_weights('/kaggle/working/chat_model.weights.h5') print("모델 가중치 로드 완료!") # ======================= # 6) 추론 함수 (기존 코드 유지) # ======================= def generate_text_topp(model, prompt, max_len=150, max_gen=100, p=0.9, temperature=0.8, min_len=20): # 인코더 입력은 Prompt 만 사용 model_input = text_to_ids(f" {prompt} ") model_input = model_input[:max_len] generated = list(model_input) for step in range(max_gen): current_len = len(generated) # 현재까지 생성된 시퀀스를 입력으로 사용 if current_len > max_len: input_seq = generated[-max_len:] else: input_seq = generated # 패딩 input_padded = np.pad(input_seq, (0, max_len - len(input_seq)), constant_values=pad_id) input_tensor = tf.convert_to_tensor([input_padded]) # 모델 추론 (enc_inputs, dec_inputs 모두 동일한 시퀀스를 사용) dummy_input = { "enc_inputs": input_tensor, "dec_inputs": input_tensor } logits = model(dummy_input, training=False) # 다음 토큰의 로짓은 시퀀스의 마지막 토큰 위치에서 가져옴 (0-based index: current_len - 1) # 하지만 패딩 후 input_tensor의 실제 시퀀스 길이는 len(input_seq) next_token_logits = logits[0, len(input_seq) - 1].numpy() # 특수 토큰 생성 억제 next_token_logits[end_id] -= 5.0 next_token_logits[pad_id] -= 10.0 probs = tf.nn.softmax(next_token_logits / temperature).numpy() sorted_indices = np.argsort(probs)[::-1] sorted_probs = probs[sorted_indices] # Top-p (Nucleus) Sampling cumulative_probs = np.cumsum(sorted_probs) cutoff = np.searchsorted(cumulative_probs, p) top_indices = sorted_indices[:cutoff + 1] top_probs = sorted_probs[:cutoff + 1] top_probs /= np.sum(top_probs) next_token_id = np.random.choice(top_indices, p=top_probs) if next_token_id == end_id and len(generated) >= min_len: break generated.append(int(next_token_id)) # 토큰 제거 및 이전 부분 제거 try: sep_index = generated.index(sep_id) # 이후부터 이전까지의 응답만 반환 result_ids = generated[sep_index + 1:] try: end_index = result_ids.index(end_id) result_ids = result_ids[:end_index] except ValueError: pass return ids_to_text(result_ids) except ValueError: return ids_to_text(generated) # 이 없으면 전체 반환 print("\n\n===== 생성 결과 =====") # 모델이 1 epoch만 학습되었으므로 의미 있는 결과가 아닐 수 있습니다. print(generate_text_topp(chat_model, "제가 이따가 버스를 타야 해서 준비 좀 해야겠어요. 재미있는 대화였습니다!", p=0.9))