"""Gradio demo that samples text from a small ReLM language model.

On start-up the script downloads the model weights and the SentencePiece
tokenizer if they are not already on disk, builds the network, loads the
weights and launches a Gradio interface that performs top-p (nucleus)
sampling.
"""

# Import order is kept as in the original script; only the combined
# ``import os, json`` statement was split into one import per line.
import tensorflow as tf
from tensorflow.keras import layers, Model
import numpy as np
import tensorflow.keras.backend as K
from tensorflow.keras import mixed_precision
import sentencepiece as spm
import os
import json
import requests
import gradio as gr

print('1')
tf.get_logger().setLevel("ERROR")

SEED = 42
tf.random.set_seed(SEED)
np.random.seed(SEED)

max_len = 512  # was set to 200 in the earlier code
batch_size = 128

# TPU initialisation: fall back to the default strategy when no TPU is found.
try:
    resolver = tf.distribute.cluster_resolver.TPUClusterResolver(tpu="local")
    tf.tpu.experimental.initialize_tpu_system(resolver)
    strategy = tf.distribute.TPUStrategy(resolver)
    print("✅ TPU 초기화 완료:", resolver.cluster_spec().as_dict())
    on_tpu = True
except Exception as e:  # top-level boundary: report the reason and continue
    print("⚠️ TPU 미사용, GPU/CPU로 진행:", e)
    strategy = tf.distribute.get_strategy()
    on_tpu = False

# Mixed precision: bfloat16 compute on TPU, plain float32 otherwise.
policy = mixed_precision.Policy("mixed_bfloat16" if on_tpu else "float32")
mixed_precision.set_global_policy(policy)
print("✅ Mixed precision:", policy)


# =======================
# 1) File download and tokenizer initialisation
# =======================
def download_file(url, save_path, timeout=60):
    """Stream ``url`` to ``save_path``.

    The payload is written to ``<save_path>.part`` and moved into place only
    after the download completed, so an interrupted transfer never leaves a
    truncated file that a later ``os.path.exists`` check would accept.

    Args:
        url: Address to fetch.
        save_path: Destination file path.
        timeout: Timeout in seconds handed to ``requests.get``.

    Raises:
        requests.RequestException: On connection errors, timeouts or a
            non-success HTTP status.
        OSError: If the file cannot be written or moved.
    """
    tmp_path = f"{save_path}.part"
    try:
        with requests.get(url, stream=True, timeout=timeout) as r:
            r.raise_for_status()
            with open(tmp_path, "wb") as f:
                for chunk in r.iter_content(8192 * 2):
                    if chunk:  # skip empty keep-alive chunks
                        f.write(chunk)
        os.replace(tmp_path, save_path)
    finally:
        # After a successful os.replace the temp file is gone; this only
        # removes the partial file left behind by a failure.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
    print(f"✅ {save_path} 저장됨")


MODEL_PATH = "model.weights.h5"
TOKENIZER_PATH = "ko_unigram.model"

if not os.path.exists(MODEL_PATH):
    download_file(
        "https://huggingface.co/Yuchan5386/Model_Prototype/resolve/main/model.weights.h5?download=true",
        MODEL_PATH
    )

if not os.path.exists(TOKENIZER_PATH):
    download_file(
        "https://huggingface.co/Yuchan5386/Respiso/resolve/main/bpe.model?download=true",
        TOKENIZER_PATH
    )

sp = spm.SentencePieceProcessor(TOKENIZER_PATH)

# NOTE(review): every special-token lookup below uses an empty piece name.
# This looks like markup such as "<pad>" was stripped from the file at some
# point; confirm the intended piece names against the tokenizer vocabulary.
pad_id = sp.piece_to_id("") if sp.piece_to_id("") != -1 else 0
start_id = sp.piece_to_id("")
sep_id = sp.piece_to_id("")
end_id = sp.piece_to_id("")
unk_id = sp.piece_to_id("")
vocab_size = sp.get_piece_size()
print(f"✅ Vocabulary size: {vocab_size}")


def text_to_ids(text):
    """Encode ``text`` into a list of SentencePiece token ids."""
    return sp.encode(text, out_type=int)


def ids_to_text(ids):
    """Decode a sequence of SentencePiece token ids back into text."""
    return sp.decode(ids)


# NOTE: layer attribute names and their creation order are left exactly as in
# the original code; the saved checkpoint may be matched against them.
class SwiGLU(layers.Layer):
    """Gated feed-forward layer: project, split in two halves, gate with SiLU."""

    def __init__(self, d_model, d_ff):
        super().__init__()
        self.proj = layers.Dense(d_ff)
        self.out = layers.Dense(d_model)

    def call(self, x):
        x_proj = self.proj(x)
        # The d_ff-wide projection is split into a value half and a gate half.
        x_val, x_gate = tf.split(x_proj, 2, axis=-1)
        return self.out(x_val * tf.nn.silu(x_gate))


class LoU(layers.Layer):
    """Token-mixing layer built on a cumulative, tanh-gated score.

    The mixing maths run in float32; the result is cast back to the dtype of
    the input before it is returned.
    """

    def __init__(self, d_model, clip_value=5.0, eps=1e-6):
        super().__init__()
        self.d_model = d_model
        self.clip_value = float(clip_value)
        self.eps = float(eps)
        self.Q = layers.Dense(d_model, dtype='float32')
        self.K = layers.Dense(d_model, dtype='float32')
        self.V = layers.Dense(d_model, dtype='float32')
        self.norm = layers.LayerNormalization(epsilon=1e-5, dtype='float32')
        self.norm1 = layers.LayerNormalization(epsilon=1e-5, dtype='float32')

        self.glu = SwiGLU(d_model, 320)

    def call(self, x):
        x_f32 = tf.cast(x, tf.float32)
        residual = x_f32
        x_f32 = self.norm1(x)

        q = self.Q(x_f32)
        k = self.K(x_f32)
        v = self.V(x_f32)

        # Squash both projections into (0, 1) gates.
        g_q = (tf.nn.tanh(q) + 1.0) / 2.0
        g_k = (tf.nn.tanh(k) + 1.0) / 2.0
        score = g_q * g_k

        # Running sum along axis 1, so each position only sees itself and
        # earlier positions.  (B, L, D)
        score = tf.cumsum(score, axis=1)

        # Normalise by the mean of the cumulative sum up to the current token.
        seq_len = tf.shape(score)[1]
        # [1, 2, ..., L], shaped to broadcast over batch and feature axes.
        count_for_mean = tf.cast(tf.range(seq_len) + 1, score.dtype)
        count_for_mean = tf.reshape(count_for_mean, (1, seq_len, 1))

        # Cumulative sum divided by the number of tokens so far.  (B, L, D)
        score_mean = score / count_for_mean

        # Denominator, floored at eps.
        # NOTE(review): whenever score_mean >= eps this quotient reduces to
        # approximately count_for_mean, i.e. the 1-based position, which the
        # clip below then caps at clip_value. Left unchanged because the
        # shipped weights were presumably trained with this exact formula.
        denom = tf.maximum(score_mean, self.eps)
        score_norm = score / denom

        score_clipped = tf.clip_by_value(score_norm, -self.clip_value, self.clip_value)
        x_comb = score_clipped * v

        out = self.norm(x_comb + residual)
        out = self.glu(out)
        return tf.cast(out, x.dtype)


class Lo(layers.Layer):
    """Bottleneck MLP (64 units, SiLU) with a normalised residual connection."""

    def __init__(self, d_model):
        super().__init__()
        self.d = layers.Dense(64, activation='silu')
        self.w = layers.Dense(d_model)
        self.norm = layers.LayerNormalization(epsilon=1e-5, dtype='float32')

    def call(self, x):
        p = self.d(x)
        p = self.w(p)
        # The norm layer is pinned to float32 while x follows the global
        # policy (bfloat16 on TPU); cast so the residual add uses one dtype.
        # Under the float32 policy the cast is a no-op.
        return tf.cast(self.norm(p), x.dtype) + x


class Block(layers.Layer):
    """One model block: a LoU layer followed by a Lo layer."""

    def __init__(self, d_model):
        super().__init__()
        self.lou = LoU(d_model)
        self.lo = Lo(d_model)

    def call(self, x):
        x = self.lou(x)
        x = self.lo(x)
        return x


class ReLM(tf.keras.Model):
    """Language model with learned positions and tied input/output embeddings.

    Args:
        vocab_size: Number of rows in the token embedding table.
        max_seq_len: Number of rows in the position embedding table.
        d_model: Embedding / hidden width.
        n_layers: Number of ``Block`` layers.
        dropout_rate: Accepted for interface compatibility; not used by the
            layers defined here.
    """

    def __init__(self, vocab_size, max_seq_len, d_model, n_layers, dropout_rate=0.1):
        super().__init__()
        self.token_embedding = layers.Embedding(vocab_size, d_model)
        self.pos_embedding = layers.Embedding(max_seq_len, d_model)
        self.blocks = [Block(d_model) for _ in range(n_layers)]
        self.ln_f = layers.LayerNormalization(epsilon=1e-5, dtype="float32")

    def call(self, x, training=False):
        """Return float32 logits over the vocabulary for every position of ``x``."""
        seq_len = tf.shape(x)[1]
        positions = tf.range(seq_len)[tf.newaxis, :]

        x = self.token_embedding(x) + self.pos_embedding(positions)
        for block in self.blocks:
            x = block(x)

        x = self.ln_f(x)

        # Output projection reuses the token embedding matrix (weight tying).
        embedding_matrix = tf.cast(self.token_embedding.embeddings, x.dtype)
        logits = tf.matmul(x, embedding_matrix, transpose_b=True)
        return tf.cast(logits, tf.float32)


model = ReLM(
    vocab_size=vocab_size,
    max_seq_len=max_len,
    d_model=256,
    n_layers=1
)

# Run a dummy input through the model so its variables exist before the
# weights are loaded.
dummy_input = np.zeros((1, max_len), dtype=np.int32)
_ = model(dummy_input)
model.summary()
model.load_weights(MODEL_PATH)
print("모델 가중치 로드 완료!")


# =======================
# 6) Inference
# =======================
def generate_text_topp(model, prompt, max_len=512, max_gen=512, p=0.9, temperature=0.8, min_len=20):
    """Generate text from ``prompt`` with top-p (nucleus) sampling.

    Args:
        model: Callable returning logits indexable as ``[batch, position]``
            for a batch of token ids.
        prompt: Text to continue.
        max_len: Length of the padded window fed to the model; only the last
            ``max_len`` tokens are used as context.
        max_gen: Maximum number of sampling steps.
        p: Cumulative probability mass kept by the nucleus filter.
        temperature: Divisor applied to the logits before the softmax.
        min_len: An end token only stops generation once the sequence holds
            at least this many tokens.

    Returns:
        The decoded text of the prompt tokens plus all sampled tokens.
    """
    # UI sliders may deliver floats; both values are used as slice bounds and
    # loop counts, which require integers.
    max_len = int(max_len)
    max_gen = int(max_gen)

    # NOTE(review): the prompt template is just a leading space; a special
    # token may have been stripped here as well - confirm.
    generated = list(text_to_ids(f" {prompt}")[:max_len])
    if not generated:
        # Without any token the lookup below would read position -1 of an
        # all-padding input; seed the sequence with the start token instead.
        generated = [start_id]

    for _ in range(max_gen):
        # Sliding window over the most recent max_len tokens.
        input_seq = generated[-max_len:]

        input_padded = np.pad(input_seq, (0, max_len - len(input_seq)), constant_values=pad_id)
        input_tensor = tf.convert_to_tensor([input_padded])
        logits = model(input_tensor, training=False)

        # Logits at the last real (non-padding) position.
        next_token_logits = logits[0, len(input_seq) - 1].numpy()

        # Make the end and padding tokens less likely to be sampled.
        next_token_logits[end_id] -= 5.0
        next_token_logits[pad_id] -= 10.0

        probs = tf.nn.softmax(next_token_logits / temperature).numpy()

        # Keep the smallest set of most-probable tokens whose cumulative
        # probability reaches p, then renormalise and sample from it.
        sorted_indices = np.argsort(probs)[::-1]
        sorted_probs = probs[sorted_indices]
        cumulative_probs = np.cumsum(sorted_probs)
        cutoff = np.searchsorted(cumulative_probs, p)
        top_indices = sorted_indices[:cutoff + 1]
        top_probs = sorted_probs[:cutoff + 1]
        top_probs /= np.sum(top_probs)

        next_token_id = np.random.choice(top_indices, p=top_probs)
        if next_token_id == end_id and len(generated) >= min_len:
            break

        generated.append(int(next_token_id))

    return ids_to_text(generated)


def gr_generate(prompt, max_len=512, max_gen=512, p=0.8, temperature=0.8):
    """Generate text with the module-level ``model``.

    All sampling options, including ``max_gen``, are forwarded to
    ``generate_text_topp``.
    """
    return generate_text_topp(
        model,
        prompt,
        max_len=max_len,
        max_gen=max_gen,
        p=p,
        temperature=temperature,
    )


def _gr_ui_generate(prompt, max_len, p, temperature):
    """Adapter for the Gradio interface.

    Gradio passes component values positionally in the order of ``inputs``.
    Binding them by keyword here keeps the Top-p and Temperature sliders
    from landing in ``max_gen`` and ``p`` of ``gr_generate``.

    NOTE(review): the "Max length" slider keeps driving ``max_len`` (the
    context window) as before; confirm whether it was meant to limit the
    number of generated tokens (``max_gen``) instead.
    """
    return gr_generate(prompt, max_len=max_len, p=p, temperature=temperature)


# Gradio interface definition
iface = gr.Interface(
    fn=_gr_ui_generate,
    inputs=[
        gr.Textbox(label="Prompt 입력", placeholder="여기에 문장 입력...", lines=2),
        gr.Slider(20, 512, value=150, step=1, label="Max length"),
        gr.Slider(0.1, 1.0, value=0.8, step=0.05, label="Top-p"),
        gr.Slider(0.1, 2.0, value=0.8, step=0.05, label="Temperature")
    ],
    outputs=[
        gr.Textbox(label="생성 결과", lines=10)
    ],
    title="Cuma LM 텍스트 생성",
    description="간단한 Gradio UI로 Cuma 모델 텍스트 생성 테스트"
)

iface.launch()