import os
import random

import numpy as np
import requests
import sentencepiece as spm
import tensorflow as tf
from tensorflow.keras import layers

# ===============================
# 0️⃣ Environment settings
# ===============================
TOKENIZER_PATH = "bpe.model"
DATA_PATH = "corpus.txt"   # text file with ~36M sentences
MAX_LEN = 128
EMBED_DIM = 384
LATENT_DIM = 384
BATCH_SIZE = 400
NEGATIVE_RATIO = 1  # number of negative samples per positive pair

# ===============================
# 1️⃣ Data download
# ===============================
def download_file(url, save_path):
    if not os.path.exists(save_path):
        print(f"Downloading {save_path} ...")
        r = requests.get(url, stream=True)
        r.raise_for_status()
        with open(save_path, "wb") as f:
            for chunk in r.iter_content(chunk_size=8192 * 2):
                f.write(chunk)
        print(f"✅ {save_path} saved")

download_file("https://huggingface.co/datasets/OpenLab-NLP/ko-corpus/resolve/main/bpe.model?download=true", TOKENIZER_PATH)
download_file("https://huggingface.co/datasets/OpenLab-NLP/ko-corpus/resolve/main/shuffled_corpus%20(1).txt?download=true", DATA_PATH)

# ===============================
# 2️⃣ Tokenizer setup
# ===============================
sp = spm.SentencePieceProcessor(model_file=TOKENIZER_PATH)
# pad_id() returns -1 when the model was trained without a pad piece; fall back to 0
pad_id = sp.pad_id() if sp.pad_id() >= 0 else 0
vocab_size = sp.get_piece_size()

def encode_sentence(sentence, max_len=MAX_LEN):
    return sp.encode(sentence, out_type=int)[:max_len]

def pad_sentence(tokens):
    return tokens + [pad_id] * (MAX_LEN - len(tokens))

# ===============================
# 3️⃣ Streaming pair generator + tf.data pipeline
# ===============================
def gen_pairs_streaming(txt_path=DATA_PATH, negative_ratio=NEGATIVE_RATIO):
    with open(txt_path, "r", encoding="utf-8") as f:
        sentences = [line.strip() for line in f if line.strip()]
    while True:
        for s1 in sentences:
            # positive pair (the sentence with itself)
            x1 = pad_sentence(encode_sentence(s1))
            yield (x1, x1), 1.0
            # negative pairs (any sentence other than s1)
            for _ in range(negative_ratio):
                s2 = s1
                while s2 == s1:
                    s2 = random.choice(sentences)
                x2 = pad_sentence(encode_sentence(s2))
                yield (x1, x2), 0.0

dataset = tf.data.Dataset.from_generator(
    gen_pairs_streaming,
    output_signature=(
        (tf.TensorSpec(shape=(MAX_LEN,), dtype=tf.int32),
         tf.TensorSpec(shape=(MAX_LEN,), dtype=tf.int32)),
        tf.TensorSpec(shape=(), dtype=tf.float32),
    ),
).shuffle(1024).batch(BATCH_SIZE).prefetch(tf.data.AUTOTUNE)

# ===============================
# 4️⃣ Encoder
# ===============================
class EncoderBlock(tf.keras.layers.Layer):
    def __init__(self, embed_dim=EMBED_DIM, ff_dim=1152, seq_len=MAX_LEN):
        super().__init__()
        self.embed_dim = embed_dim
        self.seq_len = seq_len
        self.fc1 = layers.Dense(ff_dim)
        self.fc2 = layers.Dense(embed_dim)
        self.fc3 = layers.Dense(ff_dim)
        self.fc4 = layers.Dense(embed_dim)
        # defined as (seq_len, embed_dim): projects the (L, L) similarity map to (L, D)
        self.w_proj = self.add_weight(
            name="w_proj_L_to_D",
            shape=(seq_len, embed_dim),
            initializer="glorot_uniform",
            trainable=True,
        )
        self.alpha2 = layers.Dense(1)
        self.ln = layers.LayerNormalization(epsilon=1e-5)
        self.ln1 = layers.LayerNormalization(epsilon=1e-5)
        self.ln2 = layers.LayerNormalization(epsilon=1e-5)

    def call(self, x):
        # x: (B, L, D)
        x_norm = self.ln(x)
        h = self.fc1(x_norm)            # (B, L, ff_dim)
        g, v = tf.split(h, 2, axis=-1)  # (B, L, ff_dim/2) each
        h = tf.nn.silu(g) * v           # gated activation
        h = self.fc2(h)                 # (B, L, D)

        # --- token-token similarity: matmul -> (B, L, L) ---
        sim = tf.matmul(h, h, transpose_b=True)
        # (optional) add normalization/scaling here if desired
        sim = tf.nn.softmax(sim, axis=-1)  # (B, L, L)

        # --- (B, L, L) -> (B, L, D): project via tensordot ---
        # w_proj is (L, D); sim's last axis matches w_proj's first axis
        h2 = tf.tensordot(sim, self.w_proj, axes=[[2], [0]])  # (B, L, D)

        # shapes now match, so the gate can be applied element-wise
        v_gate = tf.nn.softmax(self.alpha2(v), axis=1)  # (B, L, 1)
        v = v_gate * h2                                 # (B, L, D)
        x_norm = x_norm + self.ln2(v)

        z = self.fc3(x_norm)
        g, v = tf.split(z, 2, axis=-1)
        z = tf.nn.silu(g) * v
        z = self.fc4(z)
        return x_norm + self.ln1(z)
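# Quick shape check (a minimal sketch, optional): every sublayer in EncoderBlock
# projects back to embed_dim, so the block should map (batch, MAX_LEN, EMBED_DIM)
# to the same shape. The dummy tensor below is illustrative only.
_dummy = tf.zeros((2, MAX_LEN, EMBED_DIM))
assert EncoderBlock()(_dummy).shape == (2, MAX_LEN, EMBED_DIM)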
class L2NormLayer(layers.Layer):
    def __init__(self, axis=1, epsilon=1e-10, **kwargs):
        super().__init__(**kwargs)
        self.axis = axis
        self.epsilon = epsilon

    def call(self, inputs):
        return tf.math.l2_normalize(inputs, axis=self.axis, epsilon=self.epsilon)

    def get_config(self):
        return {"axis": self.axis, "epsilon": self.epsilon, **super().get_config()}

class SentenceEncoder(tf.keras.Model):
    def __init__(self, vocab_size, embed_dim=EMBED_DIM, latent_dim=LATENT_DIM, max_len=MAX_LEN, pad_id=pad_id):
        super().__init__()
        self.pad_id = pad_id
        self.embed = layers.Embedding(vocab_size, embed_dim)
        self.pos_embed = layers.Embedding(input_dim=max_len, output_dim=embed_dim)
        self.blocks = [EncoderBlock() for _ in range(1)]
        self.attn_pool = layers.Dense(1)
        self.ln_f = layers.LayerNormalization(epsilon=1e-5, dtype=tf.float32)
        self.latent = layers.Dense(latent_dim, activation=None)  # no tanh
        self.l2norm = L2NormLayer()

    def call(self, x):
        positions = tf.range(tf.shape(x)[1])[tf.newaxis, :]
        x_embed = self.embed(x) + self.pos_embed(positions)
        mask = tf.cast(tf.not_equal(x, self.pad_id), tf.float32)
        x = x_embed
        for block in self.blocks:
            x = block(x)
        x = self.ln_f(x)
        # attention pooling: mask out pad positions before the softmax
        scores = self.attn_pool(x)
        scores = tf.where(tf.equal(mask[..., tf.newaxis], 0), -1e9, scores)
        scores = tf.nn.softmax(scores, axis=1)
        pooled = tf.reduce_sum(x * scores, axis=1)
        latent = self.latent(pooled)
        return self.l2norm(latent)  # return the L2-normalized embedding

# ===============================
# 5️⃣ Cosine similarity layer + contrastive loss
# ===============================
class CosineSimilarityLayer(layers.Layer):
    def call(self, inputs):
        v1, v2 = inputs
        # inputs are already L2-normalized, so the dot product equals cosine similarity
        return tf.reduce_sum(v1 * v2, axis=-1)

def contrastive_loss(margin=0.5):
    def loss(y_true, y_pred):
        y_true = tf.cast(y_true, tf.float32)
        dist = 1 - y_pred
        pos_loss = y_true * tf.square(dist)
        neg_loss = (1 - y_true) * tf.square(tf.maximum(margin - dist, 0))
        return tf.reduce_mean(pos_loss + neg_loss)
    return loss

encoder = SentenceEncoder(vocab_size=vocab_size)

# ===============================
# 6️⃣ Siamese model definition
# ===============================
input1 = tf.keras.Input(shape=(MAX_LEN,), dtype=tf.int32)
input2 = tf.keras.Input(shape=(MAX_LEN,), dtype=tf.int32)
v1 = encoder(input1)
v2 = encoder(input2)
cos_sim = CosineSimilarityLayer()([v1, v2])

siamese_model = tf.keras.Model([input1, input2], cos_sim)
siamese_model.compile(optimizer=tf.keras.optimizers.Adam(1e-5),
                      loss=contrastive_loss(margin=0.5))
siamese_model.summary()

# ===============================
# 7️⃣ Training
# ===============================
# steps_per_epoch = 36757266 // BATCH_SIZE  # full corpus
steps_per_epoch = 1000000 // BATCH_SIZE

# streaming training from the generator; adjust steps_per_epoch as needed
siamese_model.fit(dataset, epochs=1, steps_per_epoch=steps_per_epoch)

encoder.save_weights("encoder.weights.h5")
siamese_model.save_weights("siamese_model.weights.h5")

# ===============================
# 8️⃣ Build corpus vectors + caching (always regenerate for safety)
# ===============================
LIMIT = 1000  # number of corpus sentences used for search
prompts = []

# read the prompts first
with open(DATA_PATH, "r", encoding="utf-8") as f:
    for i, line in enumerate(f):
        if i >= LIMIT:
            break
        line = line.strip()
        if line:
            prompts.append(line)

def get_sentence_vector(sentence):
    tokens = pad_sentence(encode_sentence(sentence))
    return encoder(np.array([tokens])).numpy()[0]

# always regenerate corpus_vectors (ignore any existing .npy)
corpus_vectors = np.stack([get_sentence_vector(p) for p in prompts]).astype(np.float16)
np.save("corpus_vectors.npy", corpus_vectors)

# precompute norms (the encoder already L2-normalizes, so these are ~1;
# the division in search() just guards against float16 drift)
corpus_norms = np.linalg.norm(corpus_vectors, axis=1)
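# Encoding one sentence per call is slow for anything beyond this small demo.
# A batched sketch under the same assumptions (reuses encoder, encode_sentence,
# and pad_sentence from above; batch_size is just an illustrative tuning knob):
def encode_corpus_batched(sentences, batch_size=256):
    vecs = []
    for i in range(0, len(sentences), batch_size):
        batch = [pad_sentence(encode_sentence(s)) for s in sentences[i:i + batch_size]]
        vecs.append(encoder(np.array(batch, dtype=np.int32)).numpy())
    return np.concatenate(vecs, axis=0).astype(np.float16)

# Drop-in equivalent of the per-sentence loop above:
# corpus_vectors = encode_corpus_batched(prompts)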
# ===============================
# 9️⃣ Search function
# ===============================
def search(query, top_k=3):
    q_vec = get_sentence_vector(query).astype(np.float16)
    sims = corpus_vectors @ q_vec
    sims /= (corpus_norms * np.linalg.norm(q_vec) + 1e-8)
    # clamp top_k so it never exceeds the corpus size
    top_k = min(top_k, len(prompts))
    top_idx = np.argsort(sims)[::-1][:top_k]
    return [(prompts[i], float(sims[i])) for i in top_idx]

# ===============================
# 🔟 Test
# ===============================
query = "우리가 핸드폰, 배를 세계에서 제일 잘 만드는 것 이상으로 사랑을 제일 잘 실천할 수 있는 능력, 자질, 저력이 우리에게 있다."
results = search(query)
for p, s in results:
    print(f"Prompt: {p}\nSimilarity: {s:.3f}\n---")

query = "안녕하세요! 오늘 날씨 어떤가요?"
results = search(query)
for p, s in results:
    print(f"Prompt: {p}\nSimilarity: {s:.3f}\n---")
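# Reloading the encoder later (a minimal sketch): a subclassed Keras model has
# no variables until it is called once, so run a dummy batch through a fresh
# SentenceEncoder before load_weights. "reloaded" is an illustrative name.
reloaded = SentenceEncoder(vocab_size=vocab_size)
_ = reloaded(np.zeros((1, MAX_LEN), dtype=np.int32))  # forward pass builds the variables
reloaded.load_weights("encoder.weights.h5")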