DDPM-NumPy-from-Scratch / src /Scratch_Diffusion.py
brk9999's picture
Rename Scratch_Diffusion.py to src/Scratch_Diffusion.py
9fb77cd verified
Raw
History Blame Contribute Delete
6.28 kB
import numpy as np
# Computes alpha and beta schedules for DDPM using a cosine schedule
def cosineScheduler(T, s=0.008):
a = np.zeros(T+1)
beta = np.zeros(T)
for t in range(T+1):
a[t] = np.cos(((t/T + s)/(1+s)) * (np.pi/2)) ** 2
for t in range(T):
beta[t] = 1 - a[t+1]/a[t]
alpha = a[1:] / a[:-1]
return a, beta, alpha
# Converts timestep t into a high-dimensional embedding for the MLP
def time_embedding(t, embedding_size):
indices = np.arange(embedding_size) / 2
div_term = 10000**(indices / embedding_size)
t_vector = np.array([t]).reshape(-1, 1)
emb = np.empty(embedding_size)
emb[0::2] = np.sin(t_vector / div_term)[0, 0::2]
emb[1::2] = np.cos(t_vector / div_term)[0, 1::2]
return emb
# Adds noise to the input x0 over T timesteps to simulate forward diffusion
def forward_diffusion(x0, beta):
T = len(beta)
x = np.zeros((T+1,) + x0.shape)
x[0] = x0
for t in range(1, T+1):
noise = np.random.normal(0, 1, x0.shape)
x[t] = np.sqrt(1 - beta[t-1]) * x[t-1] + np.sqrt(beta[t-1]) * noise
return x
# Prepares input for MLP, returns predicted noise (epsilon_hat) and intermediate activations
def forward_diffusion_for_backward(x_t, t, weights, biases, activations, hidden_size):
t_emb = time_embedding(t, hidden_size)
h = np.concatenate([x_t.flatten(), t_emb]) # Combine input and time embedding
model_cache=[]
a = h
for i in range(len(weights)):
z = a @ weights[i] + biases[i]
model_cache.append(a)
# Apply activation functions
if activations[i] == 'relu':
a = np.maximum(0, z)
elif activations[i] == 'tanh':
a = np.tanh(z)
else:
a = z
epsilon_hat = a # predicted noise
return epsilon_hat, model_cache
T = 10
alpha_bar, beta, alpha = cosineScheduler(T)
x0 = np.ones((3, 3))
x_forward = forward_diffusion(x0, beta)
print("x_forward shape:", x_forward.shape)
# Computes gradients of weights and biases via backpropagation using MSE loss
def diffusion_backward(x_t, epsilon, epsilon_hat, model_cache, weights, biases, activations):
batch_size = 1
# Derivative of loss with respect to epsilon_hat
dl_depsilon_hat = (2 * (epsilon_hat.flatten() - epsilon.flatten()) / batch_size).reshape(1, -1)
grads_W = [] # weight gradients
grads_b = [] # bias gradients
delta = dl_depsilon_hat.copy()
for i in reversed(range(len(weights))):
a_prev = model_cache[i].reshape(1, -1)
W = weights[i]
b = biases[i]
dW = a_prev.T @ delta # Gradient of loss w.r.t weight
db = np.sum(delta, axis=0)
grads_W.insert(0, dW)
grads_b.insert(0, db)
if i != 0:
da_prev = delta @ W.T
# Derivative of activations
if activations[i-1] == "relu":
dz = da_prev * (a_prev > 0)
elif activations[i-1] == "tanh":
dz = da_prev * (1 - a_prev**2)
else:
dz = da_prev
delta = dz
return grads_W, grads_b
# Performs AdamW update: combines Adam optimization with weight decay
def AdamW(weights, dl_dw, m_prev, u_prev, t_step, n, Y=0.01, b1 = 0.9, b2 = 0.999, eps=1e-6):
weight_decay = Y * weights
m_new = b1 * m_prev + (1-b1) * dl_dw
u_new = b2 * u_prev + (1-b2) * (dl_dw ** 2)
m_hat = m_new/ (1- (b1 ** t_step))
u_hat = u_new/ (1- (b2 ** t_step))
adam_update = n * (m_hat/ (np.sqrt(u_hat) + eps))
weights_new = weights- n * (adam_update + weight_decay)
return weights_new, m_new, u_new
# MLP setup
input_size = 3*3 +8
hidden_sizes = [64,64,9] # neurons in hidden layers
activations = ["relu", "relu", "linear"]
# Initialize weights, biases, and AdamW moments
weights=[np.random.randn(input_size, hidden_sizes[0])*0.1,
np.random.randn(hidden_sizes[0], hidden_sizes[1])* 0.1,
np.random.randn(hidden_sizes[1], hidden_sizes[2])* 0.1]
biases = [np.zeros(h) for h in hidden_sizes]
m_prev = [np.zeros_like(w) for w in weights]
u_prev = [np.zeros_like(w) for w in weights]
T = 10
alpha_bar, beta, alpha = cosineScheduler(T)
x0 = np.ones((3,3)) # example input
n_steps = 10000
lr = 0.005
# Training loop
for step in range(1, n_steps+1):
t = np.random.randint(1, T+1)
x_forward = forward_diffusion(x0, beta)
x_t = x_forward[t]
epsilon = (x_t - np.sqrt(alpha_bar[t-1]) * x0) / np.sqrt(1 - alpha_bar[t-1])
epsilon_hat, model_cache = forward_diffusion_for_backward(x_t, t, weights, biases, activations, hidden_size=8)
grads_w, grads_b = diffusion_backward(x_t.flatten(), epsilon.flatten(), epsilon_hat, model_cache, weights, biases, activations)
for i in range(len(weights)):
weights[i], m_prev[i], u_prev[i] = AdamW(weights[i], grads_w[i], m_prev[i], u_prev[i], step, lr)
biases[i], _, _ = AdamW(biases[i], grads_b[i], np.zeros_like(grads_b[i]), np.zeros_like(grads_b[i]), step, lr)
# Print MSE loss every 500 steps
if step % 500 == 0:
loss = np.mean((epsilon_hat - epsilon.flatten()) ** 2)
print(f"Step {step}, MSE Loss {loss:.4f}")
# Reconstruct x0 from x_T using predicted noise
def reverseDiffusion(x_T, beta, alpha, alpha_bar, weights, biases, activations, hidden_size):
T = len(beta)
x = np.zeros((T+1,) + x_T.shape)
x[T] = x_T
for t in range(T-1, -1, -1):
z = np.random.normal(0, 1, x_T.shape)
stochastic = z * np.sqrt(beta[t])
eps_hat, _ = forward_diffusion_for_backward(x[t+1], t+1, weights, biases, activations, hidden_size)
eps_hat = eps_hat.reshape(x_T.shape)
x[t] = (1/np.sqrt(alpha[t])) * (x[t+1] - beta[t] / np.sqrt(1-alpha_bar[t]) * eps_hat) + stochastic
return x
# Test reconstruction
x_T = x_forward[-1]
x_reverse = reverseDiffusion(
x_T, beta, alpha, alpha_bar,
weights=weights, biases=biases, activations=activations, hidden_size=8
)
print("x0 (original):\n", x0)
print("x_T (noisy last step):\n", x_T)
print("x_reverse[-1] (after reverse diffusion):\n", x_reverse[-1])