# LSTM_test / LSTM.py
# gihakkk's picture
# Upload LSTM.py
# f1b07c4 verified
import numpy as np
# --- 1. Sample data and hyperparameter definitions ---
# Model dimensions and optimisation settings for the toy experiment.
input_dim = 3
hidden_dim = 4
output_dim = 2
sequence_length = 5
learning_rate = 0.01
epochs = 500

# Toy task: a single random sequence whose one-hot label depends on whether
# the sum of all its entries exceeds half the number of entries:
# above the threshold -> [1, 0] (class 0), otherwise -> [0, 1] (class 1).
sample_input = np.random.rand(sequence_length, input_dim)
sample_y = np.array(
    [1, 0] if np.sum(sample_input) > (sequence_length * input_dim / 2) else [0, 1]
).reshape(-1, 1)

print(f"Sample Input Shape: {sample_input.shape}")
print(f"True Label: Class {np.argmax(sample_y)}")
print("-" * 30)
# --- 2. Helper function definitions (activation and loss functions) ---
# Sigmoid activation function
def sigmoid(x):
    """Return the logistic sigmoid ``1 / (1 + exp(-x))`` element-wise.

    The textbook formula evaluates ``exp(-x)``, which overflows (and makes
    NumPy emit a RuntimeWarning) for large negative ``x``.  This version only
    ever exponentiates a non-positive number and picks the algebraically
    equivalent branch for each sign, so it is safe for any finite input.

    Args:
        x: Scalar or array-like of real numbers.

    Returns:
        NumPy array of the same shape as ``x`` (0-d for a scalar input) with
        values in the closed interval [0, 1].
    """
    x = np.asarray(x)
    # exp of a non-positive number is at most 1, so it can never overflow.
    decay = np.exp(-np.abs(x))
    # x >= 0: 1 / (1 + exp(-x));  x < 0: exp(x) / (1 + exp(x)).
    return np.where(x >= 0, 1.0 / (1.0 + decay), decay / (1.0 + decay))
# Derivative of the sigmoid function
def sigmoid_derivative(x):
    """Return the derivative of the sigmoid evaluated at pre-activation ``x``.

    Uses the identity ``sigmoid'(x) = sigmoid(x) * (1 - sigmoid(x))``.
    """
    activation = sigmoid(x)
    complement = 1 - activation
    return activation * complement
# Hyperbolic tangent (tanh) activation function
def tanh(x):
    """Return the element-wise hyperbolic tangent of ``x``.

    Thin wrapper around ``np.tanh`` so the activations share one naming style.
    """
    activated = np.tanh(x)
    return activated
# Derivative of the tanh function
def tanh_derivative(x):
    """Return the derivative of tanh evaluated at pre-activation ``x``.

    Uses the identity ``tanh'(x) = 1 - tanh(x)^2``.
    """
    squashed = np.tanh(x)
    return 1 - squashed**2
# Softmax function
def softmax(x):
    """Return the softmax of ``x`` taken along axis 0.

    Each column of ``x`` is turned into a probability distribution that sums
    to one.
    """
    # Subtract the per-column maximum before exponentiating for numerical
    # stability (prevents overflow); the result is mathematically unchanged.
    shifted = x - np.max(x, axis=0, keepdims=True)
    weights = np.exp(shifted)
    normaliser = np.sum(weights, axis=0, keepdims=True)
    return weights / normaliser
# 크둜슀 μ—”νŠΈλ‘œν”Ό 손싀 ν•¨μˆ˜
def cross_entropy_loss(y_pred, y_true):
    """Return the cross-entropy between predicted and target distributions.

    Args:
        y_pred: Predicted probabilities.
        y_true: Target distribution (one-hot in this script) with a shape
            that broadcasts against ``y_pred``.

    Returns:
        The scalar ``-sum(y_true * log(y_pred + 1e-9))``.
    """
    # A tiny offset is added to y_pred so that log(0) is never evaluated.
    log_probs = np.log(y_pred + 1e-9)
    weighted = y_true * log_probs
    return -np.sum(weighted)
# --- 3. NumpyLSTM model class ---
class NumpyLSTM:
    """Single-layer LSTM followed by a dense softmax classifier, in NumPy.

    The model processes one sequence at a time (no batching), feeds the
    hidden state of the last timestep into a dense layer, and is trained with
    cross-entropy loss using backpropagation through time (BPTT) and plain
    gradient descent.

    The four gate blocks are stacked row-wise inside ``Wx``, ``Wh`` and ``b``
    in the order: forget, input, cell candidate, output.

    Relies on the module-level helpers ``sigmoid``, ``tanh``, ``softmax`` and
    ``cross_entropy_loss``.
    """

    def __init__(self, input_size, hidden_size, output_size, learning_rate=0.01,
                 clip_value=5.0):
        """Initialise the weights and the gradient buffers.

        Args:
            input_size: Dimension of each input vector.
            hidden_size: Dimension of the hidden state and cell state vectors.
            output_size: Dimension of the output vector (number of classes).
            learning_rate: Step size used by ``update``.
            clip_value: Gradients are clipped element-wise to
                ``[-clip_value, clip_value]`` at the end of ``backward``.
                ``None`` disables clipping.
        """
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.learning_rate = learning_rate
        self.clip_value = clip_value

        # LSTM parameters. Every gate sees both the input x and the previous
        # hidden state h, so the four per-gate matrices are stacked into one.
        self.Wx = np.random.randn(4 * hidden_size, input_size) * 0.1
        self.Wh = np.random.randn(4 * hidden_size, hidden_size) * 0.1
        self.b = np.zeros((4 * hidden_size, 1))

        # Dense (output) layer parameters.
        self.Why = np.random.randn(output_size, hidden_size) * 0.1
        self.by = np.zeros((output_size, 1))

        # Buffers that hold the most recently computed gradients.
        self.dWx = np.zeros_like(self.Wx)
        self.dWh = np.zeros_like(self.Wh)
        self.db = np.zeros_like(self.b)
        self.dWhy = np.zeros_like(self.Why)
        self.dby = np.zeros_like(self.by)

    def forward(self, inputs, y_true):
        """Run the forward pass and cache everything ``backward`` needs.

        Args:
            inputs: 2-D NumPy array of shape (sequence length, input dim).
            y_true: One-hot encoded target of shape (output dim, 1).

        Returns:
            Tuple ``(loss, y_pred)``: the cross-entropy loss and the softmax
            probabilities of shape (output dim, 1).
        """
        self.inputs = inputs
        self.y_true = y_true
        seq_length = inputs.shape[0]
        hidden = self.hidden_size

        # Hidden and cell states per timestep; key -1 is the zero initial state.
        self.h_states = {-1: np.zeros((hidden, 1))}
        self.c_states = {-1: np.zeros((hidden, 1))}
        # Intermediate values of the forward pass, keyed by timestep.
        self.z_s, self.f_s, self.i_s, self.c_tilde_s, self.o_s = {}, {}, {}, {}, {}

        # 1. LSTM cell forward pass, in time order.
        for t in range(seq_length):
            xt = self.inputs[t].reshape(-1, 1)  # input at this timestep
            h_prev = self.h_states[t - 1]
            c_prev = self.c_states[t - 1]

            # (1) One affine map yields the pre-activations of all four gates.
            z = self.Wx @ xt + self.Wh @ h_prev + self.b
            self.z_s[t] = z  # not used by backward; kept as a public attribute

            # (2) Gate activations.
            self.f_s[t] = sigmoid(z[:hidden, :])                     # forget gate
            self.i_s[t] = sigmoid(z[hidden:2 * hidden, :])           # input gate
            self.c_tilde_s[t] = tanh(z[2 * hidden:3 * hidden, :])    # cell candidate
            self.o_s[t] = sigmoid(z[3 * hidden:, :])                 # output gate

            # (3) Cell state and hidden state update.
            self.c_states[t] = self.f_s[t] * c_prev + self.i_s[t] * self.c_tilde_s[t]
            self.h_states[t] = self.o_s[t] * tanh(self.c_states[t])

        # 2. Dense layer and softmax on the last hidden state.
        self.final_h = self.h_states[seq_length - 1]
        self.logits = self.Why @ self.final_h + self.by
        self.y_pred = softmax(self.logits)

        # 3. Loss.
        self.loss = cross_entropy_loss(self.y_pred, self.y_true)
        return self.loss, self.y_pred

    def backward(self):
        """Compute all parameter gradients with BPTT.

        Must be called after ``forward``; reads the values cached there and
        overwrites ``dWx``, ``dWh``, ``db``, ``dWhy`` and ``dby``.
        """
        hidden = self.hidden_size

        # Reset the accumulated LSTM gradients.
        self.dWx = np.zeros_like(self.Wx)
        self.dWh = np.zeros_like(self.Wh)
        self.db = np.zeros_like(self.b)

        # 1. Dense + softmax layer. For softmax followed by cross-entropy the
        # gradient of the loss w.r.t. the logits is simply (y_pred - y_true).
        d_logits = self.y_pred - self.y_true
        self.dWhy = d_logits @ self.final_h.T
        self.dby = d_logits

        # Gradients flowing into the last timestep. The loss only depends on
        # the final hidden state, so dh starts from the dense layer and dc
        # starts at zero. Sized from hidden_size (not from h_states[0]) so an
        # empty sequence does not raise KeyError.
        dh_next = self.Why.T @ d_logits
        dc_next = np.zeros((hidden, 1))

        # 2. LSTM cell backward pass, in reverse time order.
        for t in reversed(range(len(self.inputs))):
            xt = self.inputs[t].reshape(-1, 1)
            h_prev = self.h_states[t - 1]
            c_prev = self.c_states[t - 1]

            # Reuse the activations cached by forward instead of recomputing
            # sigmoid/tanh from the pre-activations.
            f_gate = self.f_s[t]
            i_gate = self.i_s[t]
            candidate = self.c_tilde_s[t]
            o_gate = self.o_s[t]
            tanh_c = tanh(self.c_states[t])

            # (1) Gradients w.r.t. the output gate and the cell state.
            do = dh_next * tanh_c
            dc = dc_next + dh_next * o_gate * (1 - tanh_c**2)

            # (2) Gradients w.r.t. each gate's pre-activation, using
            # sigmoid' = s * (1 - s) and tanh' = 1 - tanh^2.
            dz_o = do * (o_gate * (1 - o_gate))
            dc_tilde = dc * i_gate
            dz_c = dc_tilde * (1 - candidate**2)
            di = dc * candidate
            dz_i = di * (i_gate * (1 - i_gate))
            df = dc * c_prev
            dz_f = df * (f_gate * (1 - f_gate))

            # (3) Stack in the same gate order the parameters use.
            dz = np.vstack((dz_f, dz_i, dz_c, dz_o))

            # (4) Accumulate parameter gradients over time.
            self.dWx += dz @ xt.T
            self.dWh += dz @ h_prev.T
            self.db += dz

            # (5) Gradients passed on to the previous timestep.
            dh_next = self.Wh.T @ dz
            dc_next = f_gate * dc

        # Element-wise clipping to guard against exploding gradients.
        if self.clip_value is not None:
            for grad in (self.dWx, self.dWh, self.db, self.dWhy, self.dby):
                np.clip(grad, -self.clip_value, self.clip_value, out=grad)

    def update(self):
        """Apply one gradient-descent step using the stored gradients."""
        self.Wx -= self.learning_rate * self.dWx
        self.Wh -= self.learning_rate * self.dWh
        self.b -= self.learning_rate * self.db
        self.Why -= self.learning_rate * self.dWhy
        self.by -= self.learning_rate * self.dby
# --- 4. Run model training ---
if __name__ == '__main__':
    # Build the model from the module-level hyperparameters.
    lstm = NumpyLSTM(
        input_size=input_dim,
        hidden_size=hidden_dim,
        output_size=output_dim,
        learning_rate=learning_rate,
    )

    # Training loop: every epoch runs forward -> backward -> update on the
    # single sample; progress is reported once every `report_every` epochs.
    report_every = 100
    for epoch in range(epochs):
        loss, y_pred = lstm.forward(sample_input, sample_y)
        lstm.backward()
        lstm.update()

        if epoch % report_every:
            continue
        # The loss and probabilities shown are from before this epoch's update.
        print(f"Epoch {epoch}, Loss: {loss:.4f}")
        print(f"Predicted Probs: {y_pred.flatten()}")
        print(f"Predicted Class: {np.argmax(y_pred)}")
        print("-" * 20)

    print("\n--- Training Finished ---")

    # One more forward pass to evaluate the trained parameters.
    final_loss, final_y_pred = lstm.forward(sample_input, sample_y)
    print(f"Final Loss: {final_loss:.4f}")
    print(f"Final Prediction: Class {np.argmax(final_y_pred)} (Probs: {final_y_pred.flatten()})")
    print(f"True Label: Class {np.argmax(sample_y)}")