# Source: 0-99_Classification / 2.CNN / training-100.py
# (Hugging Face upload metadata: Eli181927, "Upload 2 files", commit 6764326 verified)
"""
Section 1: Imports and network configurations
"""
from __future__ import annotations
import numpy as np
import argparse
import csv
from pathlib import Path
from copy import deepcopy
from numpy.lib.stride_tricks import sliding_window_view
BASE_DIR = Path(__file__).resolve().parent
ARCHIVE_DIR = BASE_DIR / "archive"  # dataset and saved-model artifacts live here
DATASET_PATH = ARCHIVE_DIR / "mnist_compressed.npz"
np.random.seed(42)  # one global seed: reproducible init, shuffles, dropout, augmentation
# Network configuration
IMAGE_CHANNELS = 1  # grayscale input
IMAGE_HEIGHT = 28
IMAGE_WIDTH = 56  # 56 = 2 x 28 -- presumably two digit images side by side (TODO confirm)
INPUT_DIM = IMAGE_HEIGHT * IMAGE_WIDTH # flattened input for compatibility
CONV_FILTERS = (16, 32)  # filter counts for conv layer 1 and conv layer 2
KERNEL_SIZE = 3
POOL_SIZE = 2
FC_HIDDEN_DIM = 256
OUTPUT_DIM = 100  # 100 classes (labels 0-99)
# Training hyperparameters
EPOCHS = 20
BATCH_SIZE = 256
LEARNING_RATE = 1e-3
REG_LAMBDA = 1e-4  # L2 regularization strength
DROP_RATE_FC = 0.4  # dropout probability on the hidden fully-connected layer
EARLY_STOP_PATIENCE = 5  # epochs without dev-accuracy improvement before stopping
EARLY_STOP_MIN_DELTA = 1e-3  # minimum improvement that resets patience
# Data augmentation
MAX_SHIFT_PIXELS = 2  # max horizontal shift applied per image
CONTRAST_JITTER_STD = 0.1  # std-dev of contrast/brightness jitter
# Adam optimizer coefficients
BETA1 = 0.9
BETA2 = 0.999
EPSILON = 1e-8
DEV_SIZE = 10_000 # held-out validation set size
def save_history_to_csv(history, filepath):
    """Write per-epoch training metrics to a CSV file, creating parent dirs as needed."""
    out_path = Path(filepath)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    columns = ("epoch", "loss", "train_acc", "dev_acc")
    with out_path.open("w", newline="") as handle:
        writer = csv.DictWriter(handle, fieldnames=columns)
        writer.writeheader()
        writer.writerows(history)
def save_sweep_summary(results, filepath, *, include_trial=False):
    """Persist hyperparameter-sweep results (one CSV row per run), optionally with a trial column."""
    out_path = Path(filepath)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    columns = (["trial"] if include_trial else []) + ["learning_rate", "reg_lambda", "dev_acc"]
    with out_path.open("w", newline="") as handle:
        writer = csv.DictWriter(handle, fieldnames=columns)
        writer.writeheader()
        for entry in results:
            record = {
                "learning_rate": float(entry["learning_rate"]),
                "reg_lambda": float(entry["reg_lambda"]),
                "dev_acc": float(entry["dev_acc"]),
            }
            if include_trial:
                record["trial"] = int(entry["trial"])
            writer.writerow(record)
"""
Section 2: Loads the input data, transposes (so arrays are feature x samples) and normalises it (scales features to 0-1)
"""
def load_data(path: Path, dev_size: int = DEV_SIZE):
    """
    Load the MNIST-100 dataset from the compressed archive and return
    training / validation splits flattened to (features, samples).
    """
    path = Path(path)
    if not path.exists():
        raise FileNotFoundError(f"Dataset not found at '{path}'")
    with np.load(path) as data:
        train_images = data["train_images"].astype(np.float32)
        train_labels = data["train_labels"].astype(np.int64)
        test_images = data["test_images"].astype(np.float32)
        test_labels = data["test_labels"].astype(np.int64)
    # Columns are samples: (input_dim, m).
    features = train_images.reshape(len(train_images), -1).T
    labels = train_labels
    # Shuffle once so the dev split is a random sample of the training pool.
    order = np.random.permutation(features.shape[1])
    features = features[:, order]
    labels = labels[order]
    X_dev, Y_dev = features[:, :dev_size], labels[:dev_size]
    X_train, Y_train = features[:, dev_size:], labels[dev_size:]
    # Test images are flattened the same way for downstream evaluation.
    X_test = test_images.reshape(len(test_images), -1).T
    return X_train, Y_train, X_dev, Y_dev, X_test, test_labels
"""
Section 3: Normalises the features [(0, 255)] to [(0, 1)]
"""
def normalize_features(X_train, X_dev):
    """
    Scale pixels to [0, 1], then standardise each feature to zero mean and
    unit variance using statistics computed on the training set only.
    """
    # In-place scaling (mutates the caller's arrays, matching original usage).
    X_train /= 255.0
    X_dev /= 255.0
    mean = X_train.mean(axis=1, keepdims=True)
    std = X_train.std(axis=1, keepdims=True) + 1e-8  # guard against constant rows
    return (X_train - mean) / std, (X_dev - mean) / std, mean, std
"""
Section 4: Initialises the parameters (layers, weights and biases) and adam optimizer
"""
def init_params():
    """He-initialised weights and zero biases for conv1, conv2, fc1, fc2 (all float32)."""

    def he(shape, fan_in):
        # He/Kaiming normal initialisation, appropriate for ReLU activations.
        return (np.random.randn(*shape) * np.sqrt(2.0 / fan_in)).astype(np.float32)

    params = {}
    fan1 = IMAGE_CHANNELS * KERNEL_SIZE * KERNEL_SIZE
    params["conv1_W"] = he((CONV_FILTERS[0], IMAGE_CHANNELS, KERNEL_SIZE, KERNEL_SIZE), fan1)
    params["conv1_b"] = np.zeros((CONV_FILTERS[0], 1), dtype=np.float32)
    fan2 = CONV_FILTERS[0] * KERNEL_SIZE * KERNEL_SIZE
    params["conv2_W"] = he((CONV_FILTERS[1], CONV_FILTERS[0], KERNEL_SIZE, KERNEL_SIZE), fan2)
    params["conv2_b"] = np.zeros((CONV_FILTERS[1], 1), dtype=np.float32)
    # Spatial size after the two 2x2 pools fixes the flattened FC input width.
    pooled_height = IMAGE_HEIGHT // POOL_SIZE // POOL_SIZE
    pooled_width = IMAGE_WIDTH // POOL_SIZE // POOL_SIZE
    flattened_dim = CONV_FILTERS[1] * pooled_height * pooled_width
    params["fc1_W"] = he((FC_HIDDEN_DIM, flattened_dim), flattened_dim)
    params["fc1_b"] = np.zeros((FC_HIDDEN_DIM, 1), dtype=np.float32)
    params["fc2_W"] = he((OUTPUT_DIM, FC_HIDDEN_DIM), FC_HIDDEN_DIM)
    params["fc2_b"] = np.zeros((OUTPUT_DIM, 1), dtype=np.float32)
    return params
def init_adam(params):
    """Zero first- and second-moment accumulators matching each parameter's shape."""
    v = {key: np.zeros_like(value) for key, value in params.items()}
    s = {key: np.zeros_like(value) for key, value in params.items()}
    return v, s
"""
Section 5: ReLu activation function and backward ReLu function
"""
def relu(Z):
    """Element-wise rectified linear unit: clamp negatives to zero."""
    return np.clip(Z, 0.0, None)
def relu_backward(Z):
    """Derivative mask of ReLU w.r.t. its pre-activation: 1.0 where Z > 0, else 0.0."""
    return np.greater(Z, 0.0).astype(np.float32)
"""
Section 6: Reshapes the flattened input to 4D tensors (batch, channels, height, width) for the convolutional layers
"""
def reshape_flat_to_images(X: np.ndarray, *, batch_size: int | None = None):
    """
    Convert flattened columns (features, batch) into 4D tensors (batch, channels, height, width).
    """
    m = X.shape[1]
    if batch_size is not None and m != batch_size:
        raise ValueError(f"Expected batch size {batch_size}, got {m}")
    # (features, m) -> (m, H, W), then add a singleton channel axis.
    return X.T.reshape(m, IMAGE_HEIGHT, IMAGE_WIDTH)[:, None, :, :]
"""
Section 7: Convolutional layer forward pass and backward pass
"""
def im2col(X, kernel_h, kernel_w, stride, padding):
    """
    Unroll sliding convolution windows of a zero-padded 4D input into a matrix.

    Parameters
    ----------
    X : ndarray, shape (batch, channels, height, width).
    kernel_h, kernel_w : window size.
    stride : window step. BUG FIX: the original implementation silently ignored
        stride (sliding_window_view always produces stride-1 windows), which was
        only correct for the stride=1 calls made by conv_forward; the stride is
        now honoured by subsampling the window grid. stride=1 behaviour is unchanged.
    padding : symmetric zero-padding added to both spatial axes.

    Returns
    -------
    (X_padded, cols, out_height, out_width) where cols has shape
    (batch * out_height * out_width, channels * kernel_h * kernel_w).
    """
    X_padded = np.pad(
        X,
        ((0, 0), (0, 0), (padding, padding), (padding, padding)),
        mode="constant",
    )
    windows = sliding_window_view(X_padded, (kernel_h, kernel_w), axis=(2, 3))
    # windows shape: (batch, channels, H-kh+1, W-kw+1, kernel_h, kernel_w);
    # subsample the window grid to honour the requested stride.
    if stride != 1:
        windows = windows[:, :, ::stride, ::stride]
    batch_size, channels, out_height, out_width, _, _ = windows.shape
    cols = windows.transpose(0, 2, 3, 1, 4, 5).reshape(
        batch_size * out_height * out_width, channels * kernel_h * kernel_w
    )
    return X_padded, cols, out_height, out_width
def col2im(cols, X_shape, kernel_h, kernel_w, stride, padding, out_height, out_width):
    """Scatter-add column gradients back onto the (padded) input tensor and unpad."""
    batch_size, channels, height, width = X_shape
    blocks = cols.reshape(batch_size, out_height, out_width, channels, kernel_h, kernel_w)
    blocks = blocks.transpose(0, 3, 1, 2, 4, 5)  # -> (batch, ch, oh, ow, kh, kw)
    padded = np.zeros(
        (batch_size, channels, height + 2 * padding, width + 2 * padding),
        dtype=np.float32,
    )
    # Overlapping windows accumulate, mirroring im2col's value replication.
    for row in range(out_height):
        r0 = row * stride
        for col in range(out_width):
            c0 = col * stride
            padded[:, :, r0 : r0 + kernel_h, c0 : c0 + kernel_w] += blocks[:, :, row, col]
    if padding > 0:
        return padded[:, :, padding:-padding, padding:-padding]
    return padded
def conv_forward(X, W, b, *, stride: int = 1, padding: int = 0):
    """Convolution implemented as im2col + one matrix multiply; returns (out, cache)."""
    batch_size = X.shape[0]
    num_filters, _, kernel_h, kernel_w = W.shape
    X_padded, cols, out_height, out_width = im2col(X, kernel_h, kernel_w, stride, padding)
    W_col = W.reshape(num_filters, -1)
    # (batch*oh*ow, filters) -> (batch, filters, oh, ow)
    response = (cols @ W_col.T).reshape(batch_size, out_height, out_width, num_filters)
    out = response.transpose(0, 3, 1, 2).astype(np.float32, copy=False)
    out += b.reshape(1, num_filters, 1, 1)
    # Everything needed by conv_backward is stashed in the cache.
    cache = {
        "X": X,
        "X_padded": X_padded,
        "W": W,
        "stride": stride,
        "padding": padding,
        "kernel_h": kernel_h,
        "kernel_w": kernel_w,
        "out_height": out_height,
        "out_width": out_width,
        "cols": cols,
        "W_col": W_col,
        "output_shape": out.shape,
    }
    return out, cache
def conv_backward(dout, cache):
    """Gradients of the conv layer: returns (dX, dW, db), all summed over the batch."""
    X, W = cache["X"], cache["W"]
    stride, padding = cache["stride"], cache["padding"]
    kernel_h, kernel_w = cache["kernel_h"], cache["kernel_w"]
    out_height, out_width = cache["out_height"], cache["out_width"]
    cols, W_col = cache["cols"], cache["W_col"]
    batch_size = X.shape[0]
    num_filters = W.shape[0]
    # Align dout with im2col's row layout: one row per (sample, y, x) position.
    dout_rows = dout.transpose(0, 2, 3, 1).reshape(batch_size * out_height * out_width, num_filters)
    dW = (dout_rows.T @ cols).reshape(W.shape)
    db = dout.sum(axis=(0, 2, 3)).reshape(num_filters, 1)
    # Back through the matmul, then fold the columns back into image space.
    dcols = dout_rows @ W_col
    dX = col2im(dcols, X.shape, kernel_h, kernel_w, stride, padding, out_height, out_width)
    return dX, dW, db
"""
Section 8: Max pooling layer forward pass and backward pass
"""
def maxpool_forward(X, *, pool_size: int = 2, stride: int = 2):
    """Spatial max pooling; windows are non-overlapping when stride == pool_size."""
    batch_size, channels, height, width = X.shape
    out_height = (height - pool_size) // stride + 1
    out_width = (width - pool_size) // stride + 1
    out = np.zeros((batch_size, channels, out_height, out_width), dtype=np.float32)
    for row in range(out_height):
        r0 = row * stride
        for col in range(out_width):
            c0 = col * stride
            patch = X[:, :, r0 : r0 + pool_size, c0 : c0 + pool_size]
            out[:, :, row, col] = patch.max(axis=(2, 3))
    cache = {
        "X": X,
        "pool_size": pool_size,
        "stride": stride,
        "output_shape": out.shape,
    }
    return out, cache
def maxpool_backward(dout, cache):
    """Route gradients to the max locations; tied maxima share the gradient equally."""
    X = cache["X"]
    pool_size, stride = cache["pool_size"], cache["stride"]
    _, _, out_height, out_width = dout.shape
    dX = np.zeros_like(X)
    for row in range(out_height):
        r0 = row * stride
        for col in range(out_width):
            c0 = col * stride
            patch = X[:, :, r0 : r0 + pool_size, c0 : c0 + pool_size]
            peak = patch.max(axis=(2, 3), keepdims=True)
            mask = (patch == peak).astype(np.float32)
            # Split evenly among ties so the total routed gradient is preserved.
            mask /= np.maximum(mask.sum(axis=(2, 3), keepdims=True), 1.0)
            dX[:, :, r0 : r0 + pool_size, c0 : c0 + pool_size] += (
                mask * dout[:, :, row, col][:, :, None, None]
            )
    return dX
def softmax(Z):
    """Column-wise softmax with max-subtraction for numerical stability."""
    exp_shifted = np.exp(Z - Z.max(axis=0, keepdims=True))
    return exp_shifted / exp_shifted.sum(axis=0, keepdims=True)
def one_hot(Y, num_classes=OUTPUT_DIM):
    """Encode an integer label vector (m,) as a one-hot float32 matrix (num_classes, m)."""
    encoded = np.zeros((num_classes, Y.size), dtype=np.float32)
    encoded[Y, np.arange(Y.size)] = 1.0
    return encoded
"""
Section 9: Forward propagation and comptutes for loss
"""
def forward_prop(
    X,
    params,
    *,
    training: bool = False,
    dropout_rate: float = DROP_RATE_FC,
):
    """
    Run (conv -> ReLU -> maxpool) twice, then two FC layers with optional
    inverted dropout on the hidden FC activations. Returns (cache, probs)
    where probs is the softmax output of shape (OUTPUT_DIM, batch).
    """
    batch_size = X.shape[1]
    images = reshape_flat_to_images(X, batch_size=batch_size)
    padding = KERNEL_SIZE // 2  # "same" padding for the odd kernel size
    conv1_out, conv1_cache = conv_forward(images, params["conv1_W"], params["conv1_b"], stride=1, padding=padding)
    pool1_out, pool1_cache = maxpool_forward(relu(conv1_out), pool_size=POOL_SIZE, stride=POOL_SIZE)
    conv2_out, conv2_cache = conv_forward(pool1_out, params["conv2_W"], params["conv2_b"], stride=1, padding=padding)
    pool2_out, pool2_cache = maxpool_forward(relu(conv2_out), pool_size=POOL_SIZE, stride=POOL_SIZE)
    flattened = pool2_out.reshape(batch_size, -1).T  # (features_flat, batch)
    Z_fc1 = params["fc1_W"] @ flattened + params["fc1_b"]
    A_fc1 = relu(Z_fc1)
    keep_prob = 1.0 - dropout_rate
    dropout_mask = None
    if training and dropout_rate > 0.0:
        # Inverted dropout: scale at train time so inference needs no rescaling.
        dropout_mask = (np.random.rand(*A_fc1.shape) >= dropout_rate).astype(np.float32)
        A_fc1 = (A_fc1 * dropout_mask) / keep_prob
    Z_fc2 = params["fc2_W"] @ A_fc1 + params["fc2_b"]
    probs = softmax(Z_fc2)
    # The cache holds every intermediate back_prop needs.
    cache = {
        "X": X,
        "images": images,
        "conv1_out": conv1_out,
        "conv1_cache": conv1_cache,
        "pool1_cache": pool1_cache,
        "conv2_out": conv2_out,
        "conv2_cache": conv2_cache,
        "pool2_cache": pool2_cache,
        "flattened": flattened,
        "Z_fc1": Z_fc1,
        "A_fc1": A_fc1,
        "dropout_mask": dropout_mask,
        "keep_prob": keep_prob,
        "dropout_rate": dropout_rate,
        "Z_fc2": Z_fc2,
        "probs": probs,
    }
    return cache, probs
def compute_loss(probs, Y_batch, params, reg_lambda):
    """Mean cross-entropy over the batch plus an L2 penalty on all weight matrices."""
    m = Y_batch.shape[1]
    # Y_batch is one-hot, so the product picks out -log p(correct class).
    data_loss = -np.sum(Y_batch * np.log(probs + 1e-9)) / m
    l2_penalty = sum(
        np.sum(np.square(params[key])) for key in ("conv1_W", "conv2_W", "fc1_W", "fc2_W")
    )
    return data_loss + (reg_lambda / (2 * m)) * l2_penalty
"""
Section 10: Back propagation for the CNN model
"""
def back_prop(cache, Y_batch, params, reg_lambda, dropout_rate):
    """
    Backward pass: fc2 -> dropout -> fc1 -> pool2 -> conv2 -> pool1 -> conv1.

    `cache` is the dict produced by forward_prop; Y_batch is one-hot of shape
    (classes, m). Returns a gradient dict keyed like `params`, each entry
    already averaged over the batch, with the L2 term added for weight
    matrices. NOTE(review): the `dropout_rate` parameter is unused here —
    the mask and keep probability come from the cache.
    """
    m = Y_batch.shape[1]
    grads = {}
    probs = cache["probs"]
    A_fc1 = cache["A_fc1"]
    Z_fc1 = cache["Z_fc1"]
    flattened = cache["flattened"]
    dropout_mask = cache["dropout_mask"]
    keep_prob = cache["keep_prob"]
    # Softmax + cross-entropy gradient collapses to (probs - one_hot).
    dZ_fc2 = probs - Y_batch
    grads["fc2_W"] = (dZ_fc2 @ A_fc1.T) / m + (reg_lambda / m) * params["fc2_W"]
    grads["fc2_b"] = np.sum(dZ_fc2, axis=1, keepdims=True) / m
    dA_fc1 = params["fc2_W"].T @ dZ_fc2
    # Re-apply the exact inverted-dropout mask/scale used in the forward pass.
    if dropout_mask is not None:
        dA_fc1 = (dA_fc1 * dropout_mask) / keep_prob
    dZ_fc1 = dA_fc1 * relu_backward(Z_fc1)
    grads["fc1_W"] = (dZ_fc1 @ flattened.T) / m + (reg_lambda / m) * params["fc1_W"]
    grads["fc1_b"] = np.sum(dZ_fc1, axis=1, keepdims=True) / m
    dFlatten = params["fc1_W"].T @ dZ_fc1 # (flatten_dim, batch)
    # Undo the flatten: (flatten_dim, batch) -> (batch, ch, h, w) of pool2.
    pool2_shape = cache["pool2_cache"]["output_shape"]
    dPool2 = dFlatten.T.reshape(pool2_shape)
    dRelu2_input = maxpool_backward(dPool2, cache["pool2_cache"])
    dConv2 = dRelu2_input * relu_backward(cache["conv2_out"])
    dPool1_input, dConv2_W, dConv2_b = conv_backward(dConv2, cache["conv2_cache"])
    # conv_backward returns batch-summed gradients; normalise by m here.
    grads["conv2_W"] = dConv2_W / m + (reg_lambda / m) * params["conv2_W"]
    grads["conv2_b"] = dConv2_b / m
    dRelu1_input = maxpool_backward(dPool1_input, cache["pool1_cache"])
    dConv1 = dRelu1_input * relu_backward(cache["conv1_out"])
    # dX of the first layer is discarded (inputs need no gradient).
    _, dConv1_W, dConv1_b = conv_backward(dConv1, cache["conv1_cache"])
    grads["conv1_W"] = dConv1_W / m + (reg_lambda / m) * params["conv1_W"]
    grads["conv1_b"] = dConv1_b / m
    return grads
"""
Section 11: Updates the parameters using the adam optimizer
"""
def update_params_adam(params, grads, v, s, t, learning_rate):
    """One bias-corrected Adam step over every parameter tensor (t is the 1-based step)."""
    updated_params = {}
    for key, theta in params.items():
        grad = grads[key]
        v[key] = BETA1 * v[key] + (1 - BETA1) * grad
        s[key] = BETA2 * s[key] + (1 - BETA2) * np.square(grad)
        # Bias correction compensates for the zero-initialised moments.
        v_hat = v[key] / (1 - BETA1 ** t)
        s_hat = s[key] / (1 - BETA2 ** t)
        updated_params[key] = theta - learning_rate * v_hat / (np.sqrt(s_hat) + EPSILON)
    return updated_params, v, s
def get_predictions(probs):
    """Predicted class index for each column (sample) of a probability matrix."""
    return probs.argmax(axis=0)
def get_accuracy(probs, labels):
    """Fraction of columns whose argmax matches the integer label vector."""
    return np.mean(get_predictions(probs) == labels)
"""
Section 12: Augments the batch with horizontal shifts and contrast/brightness jitter
"""
def augment_batch(
    X_batch,
    *,
    image_shape: tuple[int, int] = (28, 56),
    max_shift: int = MAX_SHIFT_PIXELS,
    contrast_jitter_std: float = CONTRAST_JITTER_STD,
):
    """
    Apply lightweight augmentation: horizontal shifts and contrast/brightness jitter.

    X_batch is a (features, batch) matrix; the return value has the same shape.
    With both knobs disabled the input is returned untouched.
    NOTE(review): `X_batch.T.reshape(...)` usually copies (the transpose is
    non-contiguous), but may alias the caller's array for degenerate shapes —
    train_model defensively passes a copy. Confirm if calling from elsewhere.
    """
    if max_shift <= 0 and contrast_jitter_std <= 0.0:
        return X_batch
    batch_size = X_batch.shape[1]
    images = X_batch.T.reshape(batch_size, *image_shape)
    if max_shift > 0:
        # Random horizontal shift per image; np.roll wraps, so the wrapped-in
        # columns are zeroed to simulate the image sliding off the edge.
        shifts = np.random.randint(-max_shift, max_shift + 1, size=batch_size)
        for idx, shift in enumerate(shifts):
            if shift > 0:
                shifted = np.roll(images[idx], shift, axis=1)
                shifted[:, :shift] = 0.0
                images[idx] = shifted
            elif shift < 0:
                shift = -shift
                shifted = np.roll(images[idx], -shift, axis=1)
                shifted[:, -shift:] = 0.0
                images[idx] = shifted
    if contrast_jitter_std > 0.0:
        # Per-image affine jitter: multiplicative contrast and additive brightness.
        scale = 1.0 + np.random.normal(0.0, contrast_jitter_std, size=batch_size)
        bias = np.random.normal(0.0, contrast_jitter_std, size=batch_size)
        images *= scale[:, None, None]
        images += bias[:, None, None]
        # Clamp to a sane range for standardised pixel values.
        np.clip(images, -3.0, 3.0, out=images)
    return images.reshape(batch_size, -1).T
"""
Section 13: Trains the model + evaluates the model
"""
def train_model(
    X_train,
    Y_train,
    X_dev,
    Y_dev,
    *,
    epochs: int = EPOCHS,
    batch_size: int = BATCH_SIZE,
    learning_rate: float = LEARNING_RATE,
    reg_lambda: float = REG_LAMBDA,
    dropout_rate: float = DROP_RATE_FC,
    early_stop_patience: int = EARLY_STOP_PATIENCE,
    early_stop_min_delta: float = EARLY_STOP_MIN_DELTA,
    use_augmentation: bool = True,
):
    """
    Mini-batch Adam training loop with augmentation, dropout and early stopping.

    X_* are (features, samples) matrices; Y_* are integer label vectors.
    Returns (best_params, history): the parameter snapshot with the highest
    dev accuracy seen, and the list of per-epoch metric dicts.
    """
    params = init_params()
    v, s = init_adam(params)
    m_train = X_train.shape[1]
    global_step = 0  # Adam's bias-correction step counter, shared across epochs
    best_dev_acc = -np.inf
    best_params = deepcopy(params)
    patience_counter = 0
    history = []
    for epoch in range(1, epochs + 1):
        # Fresh shuffle each epoch so mini-batch composition varies.
        permutation = np.random.permutation(m_train)
        X_shuffled = X_train[:, permutation]
        Y_shuffled = Y_train[permutation]
        epoch_loss = 0.0
        for start in range(0, m_train, batch_size):
            end = min(start + batch_size, m_train)
            X_batch = X_shuffled[:, start:end]
            Y_batch_indices = Y_shuffled[start:end]
            Y_batch = one_hot(Y_batch_indices)
            if use_augmentation:
                # Copy so augmentation cannot mutate the shuffled training matrix.
                X_batch = augment_batch(X_batch.copy())
            cache, probs = forward_prop(
                X_batch,
                params,
                training=True,
                dropout_rate=dropout_rate,
            )
            loss = compute_loss(probs, Y_batch, params, reg_lambda)
            grads = back_prop(cache, Y_batch, params, reg_lambda, dropout_rate)
            global_step += 1
            params, v, s = update_params_adam(params, grads, v, s, global_step, learning_rate)
            # Weight by batch size so the final (possibly smaller) batch counts fairly.
            epoch_loss += loss * (end - start)
        epoch_loss /= m_train
        # Full-set evaluation; training=False disables dropout.
        _, train_probs = forward_prop(X_train, params, training=False, dropout_rate=dropout_rate)
        train_accuracy = get_accuracy(train_probs, Y_train)
        _, dev_probs = forward_prop(X_dev, params, training=False, dropout_rate=dropout_rate)
        dev_accuracy = get_accuracy(dev_probs, Y_dev)
        print(
            f"Epoch {epoch:02d} - loss: {epoch_loss:.4f} "
            f"- train_acc: {train_accuracy:.4f} - dev_acc: {dev_accuracy:.4f}"
        )
        history.append(
            {
                "epoch": epoch,
                "loss": epoch_loss,
                "train_acc": train_accuracy,
                "dev_acc": dev_accuracy,
            }
        )
        # Early stopping: only a dev improvement larger than min_delta resets patience.
        if dev_accuracy > best_dev_acc + early_stop_min_delta:
            best_dev_acc = dev_accuracy
            best_params = deepcopy(params)
            patience_counter = 0
        else:
            patience_counter += 1
            if patience_counter >= early_stop_patience:
                print(
                    f"Early stopping triggered at epoch {epoch:02d}. "
                    f"Best dev_acc={best_dev_acc:.4f}"
                )
                break
    return best_params, history
def evaluate(params, X, Y):
    """Run inference on X and return (predicted labels, accuracy against Y)."""
    _, probs = forward_prop(X, params, training=False)
    predictions = get_predictions(probs)
    return predictions, np.mean(predictions == Y)
"""
Section 14: Trains the model once
"""
def train_once(
    learning_rate: float,
    reg_lambda: float,
    *,
    epochs: int = EPOCHS,
    batch_size: int = BATCH_SIZE,
    dropout_rate: float = DROP_RATE_FC,
    history_path: Path | None = None,
):
    """
    Convenience wrapper for hyperparameter sweeps. Returns trained params and dev accuracy.
    """
    # Fresh data load + normalisation per run keeps sweeps independent.
    X_train, Y_train, X_dev, Y_dev, _, _ = load_data(DATASET_PATH)
    X_train, X_dev, mean, std = normalize_features(X_train, X_dev)
    params, history = train_model(
        X_train,
        Y_train,
        X_dev,
        Y_dev,
        epochs=epochs,
        batch_size=batch_size,
        learning_rate=learning_rate,
        reg_lambda=reg_lambda,
        dropout_rate=dropout_rate,
    )
    _, dev_accuracy = evaluate(params, X_dev, Y_dev)
    if history_path is not None:
        save_history_to_csv(history, history_path)
    return params, dev_accuracy, mean, std, history
"""
Section 15: Hyperparameter sweep for learning rate, regularization and dropout rate
"""
def lr_sweep(
    learning_rates: list[float],
    *,
    reg_lambda: float = REG_LAMBDA,
    epochs: int = EPOCHS,
    batch_size: int = BATCH_SIZE,
    dropout_rate: float = DROP_RATE_FC,
    history_dir: Path | None = None,
    summary_path: Path | None = None,
):
    """Train once per learning rate, collecting dev accuracies (and optional CSVs)."""
    history_directory = Path(history_dir) if history_dir is not None else None
    if history_directory is not None:
        history_directory.mkdir(parents=True, exist_ok=True)
    results = []
    for lr in learning_rates:
        if history_directory is None:
            history_path = None
        else:
            # Filesystem-safe LR tag, e.g. 1.00e-03 -> 1.00em03.
            safe_lr = f"{lr:.2e}".replace("+", "").replace("-", "m")
            history_path = history_directory / f"lr_{safe_lr}.csv"
        _, dev_acc, _, _, history = train_once(
            lr,
            reg_lambda,
            epochs=epochs,
            batch_size=batch_size,
            dropout_rate=dropout_rate,
            history_path=history_path,
        )
        results.append(
            {
                "learning_rate": float(lr),
                "reg_lambda": float(reg_lambda),
                "dev_acc": float(dev_acc),
                "history": history,
            }
        )
    if summary_path is not None:
        save_sweep_summary(results, summary_path)
    return results
def random_search_hparams(
    num_trials: int,
    lr_bounds: tuple[float, float],
    reg_bounds: tuple[float, float],
    *,
    epochs: int = EPOCHS,
    batch_size: int = BATCH_SIZE,
    dropout_rate: float = DROP_RATE_FC,
    seed: int | None = None,
    history_dir: Path | None = None,
    summary_path: Path | None = None,
):
    """
    Log-uniform random search over (learning rate, L2 lambda).

    Runs `num_trials` trainings, optionally writing per-trial histories and a
    summary CSV, and returns the result dicts sorted best-first by dev accuracy.
    """
    if num_trials <= 0:
        raise ValueError("num_trials must be positive")
    lr_min, lr_max = lr_bounds
    reg_min, reg_max = reg_bounds
    if lr_min <= 0 or lr_max <= 0:
        raise ValueError("Learning rate bounds must be positive")
    if reg_min <= 0 or reg_max <= 0:
        raise ValueError("Regularization bounds must be positive")
    rng = np.random.default_rng(seed)
    history_directory = Path(history_dir) if history_dir is not None else None
    if history_directory is not None:
        history_directory.mkdir(parents=True, exist_ok=True)
    # Sample in log space so both ends of each range are explored evenly.
    log_lr_bounds = (np.log(lr_min), np.log(lr_max))
    log_reg_bounds = (np.log(reg_min), np.log(reg_max))
    results = []
    for trial in range(1, num_trials + 1):
        lr_sample = float(np.exp(rng.uniform(*log_lr_bounds)))
        reg_sample = float(np.exp(rng.uniform(*log_reg_bounds)))
        history_path = None
        if history_directory is not None:
            safe_lr = f"{lr_sample:.2e}".replace("+", "").replace("-", "m")
            safe_reg = f"{reg_sample:.2e}".replace("+", "").replace("-", "m")
            history_path = history_directory / f"trial_{trial:02d}_lr-{safe_lr}_reg-{safe_reg}.csv"
        _, dev_acc, _, _, history = train_once(
            lr_sample,
            reg_sample,
            epochs=epochs,
            batch_size=batch_size,
            dropout_rate=dropout_rate,
            history_path=history_path,
        )
        results.append(
            {
                "trial": trial,
                "learning_rate": lr_sample,
                "reg_lambda": reg_sample,
                "dev_acc": float(dev_acc),
                "history": history,
            }
        )
    results.sort(key=lambda item: item["dev_acc"], reverse=True)
    if summary_path is not None:
        save_sweep_summary(results, summary_path, include_trial=True)
    return results
def auto_train_pipeline(
    *,
    trials: int,
    lr_bounds: tuple[float, float],
    reg_bounds: tuple[float, float],
    search_epochs: int,
    final_epochs: int,
    batch_size: int,
    dropout_rate: float,
    final_batch_size: int | None,
    final_dropout_rate: float | None,
    history_dir: Path | None,
    seed: int | None,
    output_model_path: Path | None,
):
    """
    Random-search (lr, lambda), then retrain the best setting with a larger epoch budget.

    Side effects: writes search/final history CSVs under `history_dir` (if given)
    and saves the final model via save_model. Returns a dict with the best trial,
    final dev accuracy, model path and the final training history.
    """
    history_directory = Path(history_dir) if history_dir is not None else None
    if history_directory is not None:
        history_directory.mkdir(parents=True, exist_ok=True)
    search_summary_path = None
    if history_directory is not None:
        search_summary_path = history_directory / "random_search_summary.csv"
    # Phase 1: short random-search runs to pick hyperparameters.
    results = random_search_hparams(
        trials,
        lr_bounds,
        reg_bounds,
        epochs=search_epochs,
        batch_size=batch_size,
        dropout_rate=dropout_rate,
        seed=seed,
        history_dir=history_directory / "search_histories" if history_directory is not None else None,
        summary_path=search_summary_path,
    )
    # Results are sorted best-first by random_search_hparams.
    best = results[0]
    print(
        f"\nBest search trial -> LR={best['learning_rate']:.3e}, "
        f"reg={best['reg_lambda']:.3e}, dev_acc={best['dev_acc']:.4f}"
    )
    final_dropout = final_dropout_rate if final_dropout_rate is not None else dropout_rate
    final_history_path = None
    if history_directory is not None:
        final_history_path = history_directory / "final_train_history.csv"
    # Phase 2: full-length training with the winning hyperparameters.
    params, final_dev_acc, mean, std, final_history = train_once(
        best["learning_rate"],
        best["reg_lambda"],
        epochs=final_epochs,
        batch_size=final_batch_size or batch_size,
        dropout_rate=final_dropout,
        history_path=final_history_path,
    )
    model_output_path = output_model_path if output_model_path is not None else ARCHIVE_DIR / "trained_model_mnist100.npz"
    save_model(params, mean, std, model_output_path)
    return {
        "best_trial": best,
        "final_dev_acc": final_dev_acc,
        "model_path": Path(model_output_path),
        "final_history": final_history,
    }
"""
Section 16: Saves the model
"""
def save_model(params, mean, std, filepath=None):
    """Persist the parameter dict plus normalisation stats to a NumPy .npz archive."""
    if filepath is None:
        target_path = ARCHIVE_DIR / "trained_model_mnist100.npz"
    else:
        target_path = Path(filepath)
    target_path.parent.mkdir(parents=True, exist_ok=True)
    print(f"\nSaving trained model to '{target_path}'...")
    np.savez(target_path, **params, mean=mean, std=std)
    print("Model saved successfully!")
"""
Section 17: Main function
"""
def main():
    """CLI entry point: dispatches train / lr-sweep / random-search / auto-train modes."""
    parser = argparse.ArgumentParser(description="MNIST-100 training and tuning utilities.")
    parser.add_argument(
        "--mode",
        choices=("train", "lr-sweep", "random-search", "auto-train"),
        default="train",
        help="Select high-level action.",
    )
    parser.add_argument("--learning-rate", type=float, default=LEARNING_RATE, help="Base learning rate.")
    parser.add_argument("--learning-rates", type=str, help="Comma-separated list for LR sweep.")
    parser.add_argument("--reg-lambda", type=float, default=REG_LAMBDA, help="L2 regularization strength.")
    parser.add_argument("--lr-min", type=float, default=1e-4, help="Min LR for random search (exclusive mode).")
    parser.add_argument("--lr-max", type=float, default=5e-3, help="Max LR for random search.")
    parser.add_argument("--reg-min", type=float, default=1e-5, help="Min lambda for random search.")
    parser.add_argument("--reg-max", type=float, default=1e-3, help="Max lambda for random search.")
    parser.add_argument("--trials", type=int, default=5, help="Number of random-search trials.")
    parser.add_argument("--epochs", type=int, default=EPOCHS, help="Train epochs per run.")
    parser.add_argument("--batch-size", type=int, default=BATCH_SIZE, help="Mini-batch size.")
    parser.add_argument(
        "--final-epochs",
        type=int,
        default=40,
        help="Epoch budget for the final training run in auto-train mode.",
    )
    parser.add_argument(
        "--final-batch-size",
        type=int,
        help="Mini-batch size for the final training run (defaults to --batch-size).",
    )
    parser.add_argument(
        "--dropout",
        type=float,
        help="Override dropout rate for the fully connected layer.",
    )
    parser.add_argument(
        "--final-dropout",
        type=float,
        help="Dropout rate for the final training pass in auto-train mode.",
    )
    parser.add_argument(
        "--history-dir",
        type=Path,
        help="Directory for saving training histories (CSV).",
    )
    parser.add_argument(
        "--output-model",
        type=Path,
        help="Path to save the trained model (.npz). Defaults to archive/trained_model_mnist100.npz.",
    )
    parser.add_argument("--seed", type=int, help="Random seed for random search.")
    args = parser.parse_args()
    # Validate dropout overrides up front, before any expensive work.
    dropout_rate = DROP_RATE_FC if args.dropout is None else float(args.dropout)
    if not 0.0 <= dropout_rate < 1.0:
        raise ValueError("Dropout rate must be in [0, 1).")
    final_dropout_rate = None
    if args.final_dropout is not None:
        final_dropout_rate = float(args.final_dropout)
        if not 0.0 <= final_dropout_rate < 1.0:
            raise ValueError("Final dropout rate must be in [0, 1).")
    history_dir = args.history_dir
    if history_dir is not None:
        history_dir = Path(history_dir)
        history_dir.mkdir(parents=True, exist_ok=True)
    if args.mode == "train":
        # Single training run with fixed hyperparameters, then save the model.
        print(f"Loading dataset from '{DATASET_PATH}'...")
        X_train, Y_train, X_dev, Y_dev, _, _ = load_data(DATASET_PATH)
        X_train, X_dev, mean, std = normalize_features(X_train, X_dev)
        print(
            f"Training samples: {X_train.shape[1]}, features: {X_train.shape[0]} "
            f"| Dev samples: {X_dev.shape[1]}"
        )
        params, history = train_model(
            X_train,
            Y_train,
            X_dev,
            Y_dev,
            epochs=args.epochs,
            batch_size=args.batch_size,
            learning_rate=args.learning_rate,
            reg_lambda=args.reg_lambda,
            dropout_rate=dropout_rate,
        )
        _, dev_accuracy = evaluate(params, X_dev, Y_dev)
        print(f"\nFinal Dev Accuracy: {dev_accuracy:.4f}")
        if history_dir is not None:
            save_history_to_csv(history, history_dir / "train_history.csv")
        save_model(params, mean, std, args.output_model or ARCHIVE_DIR / "trained_model_mnist100.npz")
    elif args.mode == "lr-sweep":
        # Grid over user-supplied learning rates, fixed lambda.
        if args.learning_rates is None:
            raise ValueError("LR sweep mode requires --learning-rates.")
        lr_values = [float(value.strip()) for value in args.learning_rates.split(",") if value.strip()]
        print(f"Running LR sweep over {lr_values}...")
        summary_path = history_dir / "lr_sweep_summary.csv" if history_dir is not None else None
        results = lr_sweep(
            lr_values,
            reg_lambda=args.reg_lambda,
            epochs=args.epochs,
            batch_size=args.batch_size,
            dropout_rate=dropout_rate,
            history_dir=history_dir,
            summary_path=summary_path,
        )
        for entry in results:
            print(
                f"LR={entry['learning_rate']:.3e} | reg={entry['reg_lambda']:.3e} "
                f"| dev_acc={entry['dev_acc']:.4f}"
            )
    elif args.mode == "random-search":
        # Log-uniform random search over both LR and lambda.
        print(
            f"Running random search ({args.trials} trials) "
            f"LR∈[{args.lr_min:.2e},{args.lr_max:.2e}], "
            f"λ∈[{args.reg_min:.2e},{args.reg_max:.2e}]..."
        )
        summary_path = history_dir / "random_search_summary.csv" if history_dir is not None else None
        results = random_search_hparams(
            args.trials,
            (args.lr_min, args.lr_max),
            (args.reg_min, args.reg_max),
            epochs=args.epochs,
            batch_size=args.batch_size,
            dropout_rate=dropout_rate,
            seed=args.seed,
            history_dir=history_dir,
            summary_path=summary_path,
        )
        for entry in results:
            print(
                f"Trial {entry['trial']:02d} | LR={entry['learning_rate']:.3e} "
                f"| reg={entry['reg_lambda']:.3e} | dev_acc={entry['dev_acc']:.4f}"
            )
        best = results[0]
        print(
            f"\nBest trial -> LR={best['learning_rate']:.3e}, "
            f"reg={best['reg_lambda']:.3e}, dev_acc={best['dev_acc']:.4f}"
        )
    elif args.mode == "auto-train":
        # Search then retrain: see auto_train_pipeline.
        print(
            f"Auto-train pipeline: {args.trials} search trials "
            f"(epochs={args.epochs}) followed by final training (epochs={args.final_epochs})."
        )
        results = auto_train_pipeline(
            trials=args.trials,
            lr_bounds=(args.lr_min, args.lr_max),
            reg_bounds=(args.reg_min, args.reg_max),
            search_epochs=args.epochs,
            final_epochs=args.final_epochs,
            batch_size=args.batch_size,
            dropout_rate=dropout_rate,
            final_batch_size=args.final_batch_size,
            final_dropout_rate=final_dropout_rate,
            history_dir=history_dir,
            seed=args.seed,
            output_model_path=args.output_model,
        )
        best = results["best_trial"]
        print(
            f"\nAuto-train complete. "
            f"Best trial LR={best['learning_rate']:.3e}, reg={best['reg_lambda']:.3e}. "
            f"Final dev_acc={results['final_dev_acc']:.4f}. "
            f"Model saved to '{results['model_path']}'."
        )
if __name__ == "__main__":
    main()