# model-prototype / Model.py
# Author: Yuchan — "Create Model.py" (commit 8ca26fe)
import json
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras import layers
import sentencepiece as spm
import requests
# โฌ‡๏ธ ํŒŒ์ผ ๋‹ค์šด๋กœ๋“œ ํ•จ์ˆ˜
def download_file(url, save_path):
response = requests.get(url, stream=True)
response.raise_for_status()
with open(save_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
print(f"โœ… ํŒŒ์ผ ์ €์žฅ๋จ: {save_path}")
# โฌ‡๏ธ ๋ฐ์ดํ„ฐ์™€ ํ† ํฌ๋‚˜์ด์ € ๋‹ค์šด๋กœ๋“œ
download_file('https://huggingface.co/datasets/Yuchan5386/TinyInst/resolve/main/ko_unigram.model?download=true', 'ko_unigram.model')
download_file('https://huggingface.co/datasets/Yuchan5386/TinyInst/resolve/refs%2Fconvert%2Fparquet/default/train/0000.parquet?download=true', 'dataset.parquet')
# โฌ‡๏ธ Parquet ๋ฐ์ดํ„ฐ ๋ถˆ๋Ÿฌ์˜ค๊ธฐ
df = pd.read_parquet("dataset.parquet", engine="pyarrow")
# โฌ‡๏ธ <start> ์งˆ๋ฌธ <sep> ๋‹ต๋ณ€ <end> ํฌ๋งท์œผ๋กœ ๋ณ€ํ™˜
train_sentences = []
for conversations in df["conversations"]:
for i in range(0, len(conversations) - 1, 2):
item1, item2 = conversations[i], conversations[i + 1]
if item1.get("from") == "human" and item2.get("from") == "gpt":
prompt = item1.get("value", "").strip().replace("\n", " ")
response = item2.get("value", "").strip().replace("\n", " ")
full = f"<start> {prompt} <sep> {response} <end>"
train_sentences.append(full)
train_sentences = train_sentences
print(f"์ด ๋ฌธ์žฅ ๊ฐœ์ˆ˜: {len(train_sentences)}")
# โฌ‡๏ธ ํ† ํฌ๋‚˜์ด์ € ๋ถˆ๋Ÿฌ์˜ค๊ธฐ
sp = spm.SentencePieceProcessor()
sp.load("ko_unigram.model")
# โฌ‡๏ธ ํŠน์ˆ˜ ํ† ํฐ ID ์ถ”์ถœ
pad_id = sp.piece_to_id("<pad>") if sp.piece_to_id("<pad>") != -1 else 0
start_id = sp.piece_to_id("<start>")
sep_id = sp.piece_to_id("<sep>")
end_id = sp.piece_to_id("<end>")
unk_id = sp.piece_to_id("<unk>")
vocab_size = sp.get_piece_size()
print(f"โœ… Vocabulary size: {vocab_size}")
# โฌ‡๏ธ ํ…์ŠคํŠธ <-> ID ๋ณ€ํ™˜ ํ•จ์ˆ˜
def text_to_ids(text):
return sp.encode(text, out_type=int)
def ids_to_text(ids):
return sp.decode(ids)
# โฌ‡๏ธ ์ „์ฒ˜๋ฆฌ ํ•˜์ดํผํŒŒ๋ผ๋ฏธํ„ฐ
max_len = 100
batch_size = 128
# โฌ‡๏ธ ์ธํ’‹๊ณผ ํƒ€๊ฒŸ ๋งˆ์Šคํ‚น ํฌํ•จ๋œ ์ „์ฒ˜๋ฆฌ
encoded_inputs = []
targets = []
for sentence in train_sentences:
if "<sep>" not in sentence:
continue
sep_index = sentence.index("<sep>")
input_text = sentence[:sep_index + len("<sep>")].strip()
target_text = sentence[sep_index + len("<sep>"):].strip()
input_ids = text_to_ids(input_text)
target_ids = text_to_ids(target_text + " <end>")
full_input = input_ids + target_ids
full_input = full_input[:max_len]
target_mask = [0] * len(input_ids) + [1] * len(target_ids)
target_mask = target_mask[:max_len]
if len(full_input) < max_len:
pad_len = max_len - len(full_input)
full_input += [pad_id] * pad_len
target_mask += [0] * pad_len
encoded_inputs.append(full_input)
target_seq = full_input[1:] + [end_id]
target_seq = target_seq[:max_len]
masked_target = [
t if m == 1 else pad_id
for t, m in zip(target_seq, target_mask)
]
targets.append(masked_target)
# โฌ‡๏ธ ๋„˜ํŒŒ์ด ๋ณ€ํ™˜
encoded_inputs = np.array(encoded_inputs)
targets = np.array(targets)
# โฌ‡๏ธ TensorFlow Dataset ์ƒ์„ฑ
def data_generator():
for input_seq, target_seq in zip(encoded_inputs, targets):
yield input_seq, target_seq
dataset = tf.data.Dataset.from_generator(
data_generator,
output_signature=(
tf.TensorSpec(shape=(max_len,), dtype=tf.int32),
tf.TensorSpec(shape=(max_len,), dtype=tf.int32)
)
)
dataset = dataset.shuffle(1000).batch(batch_size).prefetch(tf.data.AUTOTUNE)
print("โœ… TF Dataset ์ƒ์„ฑ ์™„๋ฃŒ!")
class Adapter(layers.Layer):
    """Pre-norm bottleneck adapter with a residual connection.

    Computes LN -> Dense(128) -> GELU -> Dense(d_model) -> LN, then adds the
    (float32-cast) input back. All intermediate math runs in float32 for
    numerical stability regardless of the incoming dtype.
    """

    def __init__(self, d_model):
        super().__init__()
        # Keep internal computation in float32.
        self.proj = layers.Dense(d_model, use_bias=True, dtype='float32')
        self.p = layers.Dense(128, use_bias=True, dtype='float32')
        self._out_dtype = 'float32'
        self.ln = layers.LayerNormalization(epsilon=1e-5, dtype="float32")
        self.ln1 = layers.LayerNormalization(epsilon=1e-5, dtype="float32")

    def call(self, x):
        # Inputs may arrive in lower precision (e.g. bfloat16); cast up first.
        hidden = tf.cast(x, tf.float32)
        residual = hidden
        # Bottleneck: down to 128 units, GELU, back up to d_model.
        hidden = self.proj(tf.nn.gelu(self.p(self.ln(hidden))))
        out = self.ln1(hidden) + residual
        # Cast to the layer's configured output dtype (currently float32).
        return tf.cast(out, self._out_dtype)
class SwiGLU(layers.Layer):
    """Pre-norm SwiGLU feed-forward block.

    LN -> Dense(2 * hidden_dim) -> split into (a, b) -> silu(a) * b
    -> Dense(d_model) -> LN.

    Args:
        d_model: Output (model) dimensionality.
        hidden_dim: Width of each gated half. Defaults to 1152, which
            reproduces the previously hard-coded Dense(2304) projection.
    """

    def __init__(self, d_model, hidden_dim=1152):
        super().__init__()
        # One projection produces both the gate and the value halves.
        self.proj = layers.Dense(2 * hidden_dim)
        self.w1 = layers.Dense(d_model)
        self.ln = layers.LayerNormalization(epsilon=1e-5, dtype="float32")
        self.ln1 = layers.LayerNormalization(epsilon=1e-5, dtype="float32")

    def call(self, x):
        x = self.ln(x)
        x = self.proj(x)
        a, b = tf.split(x, 2, axis=-1)
        o = tf.nn.silu(a) * b  # SwiGLU gating
        o = self.ln1(self.w1(o))
        return o
class LowRankGLA(tf.keras.layers.Layer):
    """Gated linear attention via causal cumulative sums in a low-rank space.

    Q/K/V/G are each projected down to `low_rank_dim`; element-wise weights
    q*k feed running (cumsum) weighted averages of V along the sequence axis,
    which are gated by G and projected back up to `d_model`.
    """

    def __init__(self, d_model, low_rank_dim, **kwargs):
        super(LowRankGLA, self).__init__(**kwargs)
        self.d_model = d_model
        self.low_rank_dim = low_rank_dim
        # Down-projections into the low-rank space.
        self.W_q_A = layers.Dense(low_rank_dim, use_bias=True)
        self.W_k_A = layers.Dense(low_rank_dim, use_bias=True)
        self.W_v_A = layers.Dense(low_rank_dim, use_bias=True)
        self.W_g_A = layers.Dense(low_rank_dim, use_bias=True)
        # Up-projection back to the model dimension.
        self.output_dense_B = layers.Dense(d_model, use_bias=True)

    def call(self, inputs):
        # inputs: (batch, seq_len, d_model)
        queries = tf.nn.sigmoid(self.W_q_A(inputs))
        keys = tf.nn.sigmoid(self.W_k_A(inputs))
        values = self.W_v_A(inputs)  # values are NOT squashed
        gates = tf.nn.sigmoid(self.W_g_A(inputs))
        # Element-wise "attention" weights; cumsum along the time axis turns
        # each position into a causal running weighted average of values.
        weights = queries * keys
        running_weighted = tf.cumsum(weights * values, axis=1)
        running_norm = tf.cumsum(weights, axis=1) + 1e-12  # avoid div-by-zero
        gated = (running_weighted / running_norm) * gates
        return self.output_dense_B(gated)

    def get_config(self):
        """Serialize constructor arguments for Keras (de)serialization."""
        config = super().get_config()
        config.update({
            "d_model": self.d_model,
            "low_rank_dim": self.low_rank_dim,
        })
        return config
class Respiso(tf.keras.Model):
    """Minimal language model: embedding -> SwiGLU -> Adapter -> LN -> head.

    NOTE(review): `max_seq_len`, `n_layers`, and `dropout_rate` are accepted
    but never used, and `self.gla` is constructed but never invoked in
    `call()` (it still allocates trainable parameters). Confirm whether the
    GLA layer was meant to be part of the forward pass.
    """
    def __init__(self, vocab_size, max_seq_len, d_model, n_layers, dropout_rate=0.1):
        super().__init__()
        self.token_embedding = layers.Embedding(vocab_size, d_model)
        self.gla = LowRankGLA(d_model, 48)  # unused in call() — see class docstring
        self.glu = SwiGLU(d_model)
        self.adapter = Adapter(d_model)
        self.ln_f = layers.LayerNormalization(epsilon=1e-5, dtype="float32")
        self.lm_head = layers.Dense(vocab_size, use_bias=False)
    def call(self, x, training=False):
        # x: (batch, seq_len) integer token ids.
        x = self.token_embedding(x)
        x = self.glu(x)
        x = self.adapter(x)
        x = self.ln_f(x)
        logits = self.lm_head(x)
        # Force float32 logits regardless of any mixed-precision policy.
        return tf.cast(logits, tf.float32)
# Per-token cross-entropy; masking/averaging is done manually below.
loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True, reduction='none')

def masked_loss(y_true, y_pred):
    """Mean cross-entropy over non-pad target positions.

    Positions where y_true == pad_id (prompt and padding) are excluded.
    """
    loss = loss_fn(y_true, y_pred)
    mask = tf.cast(tf.not_equal(y_true, pad_id), tf.float32)
    # Guard against an all-pad batch producing 0/0 = NaN.
    denom = tf.maximum(tf.reduce_sum(mask), 1.0)
    return tf.reduce_sum(loss * mask) / denom

def masked_perplexity(y_true, y_pred):
    """exp(masked mean loss), with the loss clamped at 10 for stability."""
    loss = loss_fn(y_true, y_pred)
    mask = tf.cast(tf.not_equal(y_true, pad_id), tf.float32)
    denom = tf.maximum(tf.reduce_sum(mask), 1.0)  # avoid 0/0 on all-pad batches
    avg_loss = tf.reduce_sum(loss * mask) / denom
    return tf.exp(tf.minimum(avg_loss, 10.0))
def create_lr_schedule(initial_lr=5e-5, decay_steps=10000, decay_rate=0.9):
    """Build a smooth exponential learning-rate decay schedule.

    The rate decays by `decay_rate` every `decay_steps` optimizer steps,
    continuously (staircase=False) rather than in discrete drops.
    """
    schedule = tf.keras.optimizers.schedules.ExponentialDecay(
        initial_learning_rate=initial_lr,
        decay_steps=decay_steps,
        decay_rate=decay_rate,
        staircase=False,
    )
    return schedule
# ๋ชจ๋ธ ์ƒ์„ฑ
model = Respiso(
vocab_size=vocab_size,
max_seq_len=max_len,
d_model=256,
n_layers=1
)
# ์˜ตํ‹ฐ๋งˆ์ด์ € ์„ค์ •
optimizer = tf.keras.optimizers.Adam(
learning_rate=create_lr_schedule(),
beta_1=0.9,
beta_2=0.95,
epsilon=1e-8,
clipnorm=1.0
)
# ๋ชจ๋ธ ์ปดํŒŒ์ผ
model.compile(
optimizer=optimizer,
loss=masked_loss,
metrics=[
masked_perplexity
]
)
# ๋”๋ฏธ ์ธํ’‹์œผ๋กœ ๋ชจ๋ธ ์ดˆ๊ธฐํ™”
dummy_input = np.zeros((1, max_len), dtype=np.int32)
model(dummy_input)
model.summary()
# ํ•™์Šต ์‹œ์ž‘
history = model.fit(
dataset,
epochs=1,
steps_per_epoch = encoded_inputs.shape[0] // batch_size,
verbose=1
)
# ๊ฐ€์ค‘์น˜ ์ €์žฅ
model.save_weights("Cobra.weights.h5")
print("๋ชจ๋ธ ๊ฐ€์ค‘์น˜ ์ €์žฅ ์™„๋ฃŒ!")
def generate_text_topp(model, prompt, max_len=100, max_gen=98, p=0.9, temperature=0.8, min_len=20):
    """Generate a reply for `prompt` with nucleus (top-p) sampling.

    Args:
        model: Callable mapping a (1, max_len) int tensor of ids to logits.
        prompt: User text; wrapped as "<start> {prompt} <sep>".
        max_len: Context window length (pad/truncate target).
        max_gen: Maximum number of tokens to sample.
        p: Nucleus probability mass to keep.
        temperature: Softmax temperature (<1 sharpens the distribution).
        min_len: Minimum total sequence length before <end> may stop
            generation.

    Returns:
        The decoded text of the full sequence (prompt + generation).
    """
    model_input = text_to_ids(f"<start> {prompt} <sep>")
    model_input = model_input[:max_len]
    generated = list(model_input)
    for step in range(max_gen):
        # Use at most the last max_len tokens as context.
        if len(generated) > max_len:
            input_seq = generated[-max_len:]
        else:
            input_seq = generated
        input_padded = np.pad(input_seq, (0, max_len - len(input_seq)), constant_values=pad_id)
        input_tensor = tf.convert_to_tensor([input_padded])
        logits = model(input_tensor, training=False)
        # Logits at the last real (non-pad) position predict the next token.
        next_token_logits = logits[0, len(input_seq) - 1].numpy()
        # Discourage early termination; never sample the padding token.
        next_token_logits[end_id] -= 5.0
        next_token_logits[pad_id] -= 10.0
        probs = tf.nn.softmax(next_token_logits / temperature).numpy()
        # Nucleus sampling: keep the smallest high-probability prefix whose
        # cumulative mass reaches p, then renormalize and sample from it.
        sorted_indices = np.argsort(probs)[::-1]
        sorted_probs = probs[sorted_indices]
        cumulative_probs = np.cumsum(sorted_probs)
        cutoff = np.searchsorted(cumulative_probs, p)
        top_indices = sorted_indices[:cutoff + 1]
        top_probs = sorted_probs[:cutoff + 1]
        top_probs /= np.sum(top_probs)
        next_token_id = np.random.choice(top_indices, p=top_probs)
        if next_token_id == end_id:
            if len(generated) >= min_len:
                break
            # BUGFIX: previously a premature <end> (before min_len) was
            # appended into the output sequence; skip it and keep sampling.
            continue
        generated.append(int(next_token_id))
    return ids_to_text(generated)
print("\n\n===== ์ƒ์„ฑ ๊ฒฐ๊ณผ =====")
print(generate_text_topp(model, "์•ˆ๋…•", p=0.9))