import json
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras import layers
import sentencepiece as spm
import requests

# ⬇️ Load the tokenizer
sp = spm.SentencePieceProcessor()
sp.load("ko_unigram.model")

# ⬇️ Look up the special-token IDs
# NOTE: the original piece names were lost; "<pad>", "<start>", "<sep>", "<end>" and "<unk>"
# are assumed here and must match the pieces defined when ko_unigram.model was trained.
pad_id = sp.piece_to_id("<pad>") if sp.piece_to_id("<pad>") != -1 else 0
start_id = sp.piece_to_id("<start>")
sep_id = sp.piece_to_id("<sep>")
end_id = sp.piece_to_id("<end>")
unk_id = sp.piece_to_id("<unk>")

vocab_size = sp.get_piece_size()
print(f"✅ Vocabulary size: {vocab_size}")

# ⬇️ Text <-> ID conversion helpers
def text_to_ids(text):
    return sp.encode(text, out_type=int)

def ids_to_text(ids):
    return sp.decode(ids)

max_len = 230
batch_size = 128


class Lo(layers.Layer):
    def __init__(self, d_model):
        super().__init__()
        # keep internal computation in float32
        self.proj = layers.Dense(d_model, use_bias=True, dtype='float32')
        self.p = layers.Dense(96, use_bias=True, dtype='float32')
        self._out_dtype = 'float32'

    def call(self, x):
        # x may be bfloat16; cast to float32 for stable intermediate computation
        x_f32 = tf.cast(x, tf.float32)
        x = self.proj(x_f32)
        x = tf.nn.gelu(x)
        x = self.p(x)
        # cast back to the layer's output dtype for consistency
        return tf.cast(x, self._out_dtype)


class LoSoU(layers.Layer):
    """
    Stabilized LoSoU layer (with dynamic alpha).
    - alpha is computed from the input: alpha = sigmoid(Linear(x))
    - an exponential moving average (EMA) over time replaces the cumulative sum
      (alpha acts as the smoothing factor)
    - internal computation runs in float32 (more stable than bfloat16 on TPU)
    - the EMA result is normalized, clipped, and guarded with a small epsilon
    - assumes an even last dimension if a split is ever needed; otherwise pad the last dim
    """
    def __init__(self, d_model, clip_value=5.0, eps=1e-6):
        super().__init__()
        # do most of the arithmetic in float32
        self.d_model = d_model
        self.clip_value = float(clip_value)
        self.eps = float(eps)
        # projection / gating layers in float32
        self.Q = layers.Dense(96, dtype='float32')
        self.K = layers.Dense(96, dtype='float32')
        self.V = layers.Dense(96, activation='gelu', dtype='float32')
        self.proj = layers.Dense(d_model, use_bias=True, dtype='float32')
        self.norm = layers.LayerNormalization(epsilon=1e-5, dtype='float32')
        # Layer that produces the dynamic alpha.
        # alpha must stay in [0, 1], hence the sigmoid activation.
        # It is computed per position: (B, L, d_model) -> (B, L, 1).
        # (A single global alpha per sample would also work; per-position is used here.)
        self.alpha_linear = layers.Dense(1, activation='sigmoid', dtype='float32')

    def _ema_over_time(self, score, alpha_dynamic):
        # score: (B, L, D) float32
        # alpha_dynamic: (B, L, 1) float32 in [0, 1]
        # transpose to (L, B, D) so we can scan over time steps
        seq = tf.transpose(score, perm=[1, 0, 2])                # (L, B, D)
        alpha_seq = tf.transpose(alpha_dynamic, perm=[1, 0, 2])  # (L, B, 1)

        def step(prev_ema, inputs):
            x_t, alpha_t = inputs
            # prev_ema: (B, D), x_t: (B, D), alpha_t: (B, 1)
            return alpha_t * x_t + (1.0 - alpha_t) * prev_ema

        # use the first time step as the initial value
        init = seq[0]                    # (B, D)
        first_alpha = alpha_seq[0]       # (B, 1), unused
        # tf.scan's elems must be (L-1, B, D) and (L-1, B, 1)
        remaining_seq = seq[1:]          # (L-1, B, D)
        remaining_alpha = alpha_seq[1:]  # (L-1, B, 1)
        # elems is a tuple of the two tensors: (x_t, alpha_t)
        elems = (remaining_seq, remaining_alpha)
        ema_seq = tf.scan(fn=step, elems=elems, initializer=init)
        # prepend the initial value
        ema_seq = tf.concat([tf.expand_dims(init, 0), ema_seq], axis=0)  # (L, B, D)
        # transpose back to (B, L, D)
        ema = tf.transpose(ema_seq, perm=[1, 0, 2])
        return ema

    def call(self, x):
        # x: (B, L, d_model), possibly bfloat16 or float32
        # cast to float32 for all internal computations
        x_f32 = tf.cast(x, tf.float32)
        residual = x_f32  # currently unused

        # Q, K, V
        q = self.Q(x_f32)                        # (B, L, 96)
        k = self.K(x_f32)                        # (B, L, 96)
        V = tf.cast(self.V(x_f32), tf.float32)   # (B, L, 96); keep V in float32 as well

        # gating signals
        g_q = tf.nn.sigmoid(q)
        g_k = tf.nn.tanh(k)
        # elementwise product -> roughly bounded in [-1, 1]
        score = g_q * g_k

        # dynamic alpha: (B, L, d_model) -> (B, L, 1), rescaled into [0.1, 0.9]
        alpha_dynamic = self.alpha_linear(x_f32) * 0.8 + 0.1  # (B, L, 1)
        # further post-processing of alpha_dynamic is possible if needed,
        # e.g. alpha_dynamic = tf.clip_by_value(alpha_dynamic, 0.01, 0.99)

        # EMA across time (stable alternative to a cumulative sum)
        score_ema = self._ema_over_time(score, alpha_dynamic)

        # normalize by (mean + eps) across the last dim to reduce scale variation
        mean_last = tf.reduce_mean(score_ema, axis=-1, keepdims=True)  # (B, L, 1)
        denom = tf.maximum(mean_last, self.eps)
        score_norm = score_ema / denom

        # clip to avoid extremes
        score_clipped = tf.clip_by_value(score_norm, -self.clip_value, self.clip_value)

        # combine with V
        x_comb = score_clipped * V   # (B, L, 96)
        out = self.proj(x_comb)      # (B, L, d_model)
        out = self.norm(out)

        # cast back to the original dtype for downstream layers
        return tf.cast(out, x.dtype)


class Block(layers.Layer):
    def __init__(self, d_model, hyper_n):
        super().__init__()
        self.losou = [LoSoU(d_model) for _ in range(hyper_n)]

    def call(self, x):
        for losou in self.losou:
            x = losou(x)
        return x


class ReLaM(tf.keras.Model):
    def __init__(self, vocab_size, max_seq_len, d_model, n_layers, dropout_rate=0.1):
        super().__init__()
        self.token_embedding = layers.Embedding(vocab_size, 128)
        self.pos_embedding = layers.Embedding(max_seq_len, 128)
        self.blocks = [Block(d_model, hyper_n=1) for _ in range(n_layers)]
        self.proj = layers.Dense(128)
        self.ln_f = layers.LayerNormalization(epsilon=1e-5, dtype="float32")

    def call(self, x, training=False):
        batch_size, seq_len = tf.shape(x)[0], tf.shape(x)[1]
        positions = tf.range(seq_len)[tf.newaxis, :]
        x = self.token_embedding(x) + self.pos_embedding(positions)
        for block in self.blocks:
            x = block(x)
        x = self.proj(x)
        x = self.ln_f(x)
        # weight tying: reuse the token embedding matrix as the output projection
        embedding_matrix = tf.cast(self.token_embedding.embeddings, x.dtype)
        logits = tf.matmul(x, embedding_matrix, transpose_b=True)
        return tf.cast(logits, tf.float32)


# Build the model
model = ReLaM(
    vocab_size=vocab_size,
    max_seq_len=max_len,
    d_model=256,
    n_layers=1
)

dummy_input = tf.zeros((1, max_len), dtype=tf.int32)
_ = model(dummy_input)

model.load_weights('/content/Cobra.weights.h5')
print("Model weights loaded!")


def generate_text_topp(model, prompt, max_len=100, max_gen=98, p=0.9, temperature=0.8, min_len=30):
    # NOTE: the special tokens wrapping the prompt were lost in the original source;
    # "<start> ... <sep>" is assumed here and must match the format used during training.
    model_input = text_to_ids(f"<start> {prompt} <sep>")
    model_input = model_input[:max_len]
    generated = list(model_input)

    for step in range(max_gen):
        # keep only the most recent max_len tokens as context
        if len(generated) > max_len:
            input_seq = generated[-max_len:]
        else:
            input_seq = generated
        input_padded = np.pad(input_seq, (0, max_len - len(input_seq)), constant_values=pad_id)
        input_tensor = tf.convert_to_tensor([input_padded])

        logits = model(input_tensor, training=False)
        next_token_logits = logits[0, len(input_seq) - 1].numpy()

        # discourage ending too early and emitting padding
        next_token_logits[end_id] -= 5.0
        next_token_logits[pad_id] -= 10.0

        probs = tf.nn.softmax(next_token_logits / temperature).numpy()

        # nucleus (top-p) filtering: keep the smallest set of tokens whose
        # cumulative probability reaches p, then renormalize and sample
        sorted_indices = np.argsort(probs)[::-1]
        sorted_probs = probs[sorted_indices]
        cumulative_probs = np.cumsum(sorted_probs)
        cutoff = np.searchsorted(cumulative_probs, p)
        top_indices = sorted_indices[:cutoff + 1]
        top_probs = sorted_probs[:cutoff + 1]
        top_probs /= np.sum(top_probs)

        next_token_id = np.random.choice(top_indices, p=top_probs)
        if next_token_id == end_id and len(generated) >= min_len:
            break
        generated.append(int(next_token_id))

    return ids_to_text(generated)


print("\n\n===== Generation result =====")
print(generate_text_topp(model, "제가 이따가 버스를 타야 해서 준비 좀 해야겠어요. 재미있는 대화였습니다!", p=0.8))
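

# --- Optional sanity check (illustrative sketch, not part of the original pipeline) ---
# LoSoU replaces a cumulative sum with the recurrence ema_t = alpha_t * x_t + (1 - alpha_t) * ema_{t-1},
# with ema_0 = x_0. The helper below (hypothetical name _check_losou_ema) compares _ema_over_time
# against a plain Python loop on random data to make that recurrence concrete; the shapes are arbitrary.
def _check_losou_ema(batch=2, length=4, dim=3):
    layer = LoSoU(d_model=8)  # d_model is irrelevant here; only _ema_over_time is exercised
    x_np = np.random.rand(batch, length, dim).astype(np.float32)  # stands in for "score"
    a_np = np.random.rand(batch, length, 1).astype(np.float32)    # stands in for the dynamic alpha
    ema = layer._ema_over_time(tf.constant(x_np), tf.constant(a_np)).numpy()

    # reference implementation as an explicit loop over time
    ref = np.zeros_like(x_np)
    ref[:, 0] = x_np[:, 0]
    for t in range(1, length):
        ref[:, t] = a_np[:, t] * x_np[:, t] + (1.0 - a_np[:, t]) * ref[:, t - 1]
    assert np.allclose(ema, ref, atol=1e-5), "EMA scan does not match the reference loop"

# _check_losou_ema()  # uncomment to run the check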
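
# --- Worked example of the nucleus (top-p) cutoff used in generate_text_topp ---
# The values below are made up for illustration: with probabilities [0.5, 0.3, 0.15, 0.05]
# and p = 0.75, the descending cumulative sums are [0.5, 0.8, 0.95, 1.0]; np.searchsorted
# returns 1, so the two most likely tokens are kept and renormalized before sampling.
_demo_probs = np.array([0.5, 0.3, 0.15, 0.05])
_demo_sorted = np.argsort(_demo_probs)[::-1]
_demo_cum = np.cumsum(_demo_probs[_demo_sorted])
_demo_cutoff = np.searchsorted(_demo_cum, 0.75)
assert list(_demo_sorted[:_demo_cutoff + 1]) == [0, 1]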