| """ | |
| Section 1: Imports and network configurations | |
| """ | |
| from __future__ import annotations | |
| import numpy as np | |
| import argparse | |
| import csv | |
| from pathlib import Path | |
| from copy import deepcopy | |
| from numpy.lib.stride_tricks import sliding_window_view | |
| BASE_DIR = Path(__file__).resolve().parent | |
| ARCHIVE_DIR = BASE_DIR / "archive" | |
| DATASET_PATH = ARCHIVE_DIR / "mnist_compressed.npz" | |
| np.random.seed(42) | |
| # Network configuration | |
| IMAGE_CHANNELS = 1 | |
| IMAGE_HEIGHT = 28 | |
| IMAGE_WIDTH = 56 | |
| INPUT_DIM = IMAGE_HEIGHT * IMAGE_WIDTH # flattened input for compatibility | |
| CONV_FILTERS = (16, 32) | |
| KERNEL_SIZE = 3 | |
| POOL_SIZE = 2 | |
| FC_HIDDEN_DIM = 256 | |
| OUTPUT_DIM = 100 | |
| EPOCHS = 20 | |
| BATCH_SIZE = 256 | |
| LEARNING_RATE = 1e-3 | |
| REG_LAMBDA = 1e-4 | |
| DROP_RATE_FC = 0.4 | |
| EARLY_STOP_PATIENCE = 5 | |
| EARLY_STOP_MIN_DELTA = 1e-3 | |
| MAX_SHIFT_PIXELS = 2 | |
| CONTRAST_JITTER_STD = 0.1 | |
| BETA1 = 0.9 | |
| BETA2 = 0.999 | |
| EPSILON = 1e-8 | |
| DEV_SIZE = 10_000 # held-out validation set size | |


def save_history_to_csv(history, filepath):
    target_path = Path(filepath)
    target_path.parent.mkdir(parents=True, exist_ok=True)
    with target_path.open("w", newline="") as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=("epoch", "loss", "train_acc", "dev_acc"))
        writer.writeheader()
        for row in history:
            writer.writerow(row)


def save_sweep_summary(results, filepath, *, include_trial=False):
    target_path = Path(filepath)
    target_path.parent.mkdir(parents=True, exist_ok=True)
    fieldnames = ["learning_rate", "reg_lambda", "dev_acc"]
    if include_trial:
        fieldnames.insert(0, "trial")
    with target_path.open("w", newline="") as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        for entry in results:
            row = {
                "learning_rate": float(entry["learning_rate"]),
                "reg_lambda": float(entry["reg_lambda"]),
                "dev_acc": float(entry["dev_acc"]),
            }
            if include_trial:
                row["trial"] = int(entry["trial"])
            writer.writerow(row)
| """ | |
| Section 2: Loads the input data, transposes (so arrays are feature x samples) and normalises it (scales features to 0-1) | |
| """ | |
| def load_data(path: Path, dev_size: int = DEV_SIZE): | |
| """ | |
| Load the MNIST-100 dataset from the compressed archive and return | |
| training / validation splits flattened to (features, samples). | |
| """ | |
| path = Path(path) | |
| if not path.exists(): | |
| raise FileNotFoundError(f"Dataset not found at '{path}'") | |
| with np.load(path) as data: | |
| train_images = data["train_images"].astype(np.float32) | |
| train_labels = data["train_labels"].astype(np.int64) | |
| test_images = data["test_images"].astype(np.float32) | |
| test_labels = data["test_labels"].astype(np.int64) | |
| # Flatten images to column-major format (features, samples) | |
| X_full = train_images.reshape(train_images.shape[0], -1).T # (input_dim, m) | |
| Y_full = train_labels | |
| # Shuffle before splitting to validation | |
| permutation = np.random.permutation(X_full.shape[1]) | |
| X_full = X_full[:, permutation] | |
| Y_full = Y_full[permutation] | |
| X_dev = X_full[:, :dev_size] | |
| Y_dev = Y_full[:dev_size] | |
| X_train = X_full[:, dev_size:] | |
| Y_train = Y_full[dev_size:] | |
| # Also flatten the test set for later reuse if needed. | |
| X_test = test_images.reshape(test_images.shape[0], -1).T | |
| return X_train, Y_train, X_dev, Y_dev, X_test, test_labels | |
| """ | |
| Section 3: Normalises the features [(0, 255)] to [(0, 1)] | |
| """ | |
| def normalize_features(X_train, X_dev): | |
| """ | |
| Normalize features to zero mean and unit variance using the training set. | |
| """ | |
| X_train /= 255.0 | |
| X_dev /= 255.0 | |
| mean = np.mean(X_train, axis=1, keepdims=True) | |
| std = np.std(X_train, axis=1, keepdims=True) + 1e-8 | |
| X_train = (X_train - mean) / std | |
| X_dev = (X_dev - mean) / std | |
| return X_train, X_dev, mean, std | |
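
# Note (illustrative): `mean` and `std` are per-feature column vectors of shape
# (INPUT_DIM, 1). At inference time the same transform must be replayed, e.g.
#     X_new = (X_new / 255.0 - mean) / std
# which is why save_model() below persists both arrays alongside the weights.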
| """ | |
| Section 4: Initialises the parameters (layers, weights and biases) and adam optimizer | |
| """ | |
| def init_params(): | |
| params = {} | |
| conv1_fan_in = IMAGE_CHANNELS * KERNEL_SIZE * KERNEL_SIZE | |
| params["conv1_W"] = ( | |
| np.random.randn(CONV_FILTERS[0], IMAGE_CHANNELS, KERNEL_SIZE, KERNEL_SIZE) * np.sqrt(2.0 / conv1_fan_in) | |
| ).astype(np.float32) | |
| params["conv1_b"] = np.zeros((CONV_FILTERS[0], 1), dtype=np.float32) | |
| conv2_fan_in = CONV_FILTERS[0] * KERNEL_SIZE * KERNEL_SIZE | |
| params["conv2_W"] = ( | |
| np.random.randn(CONV_FILTERS[1], CONV_FILTERS[0], KERNEL_SIZE, KERNEL_SIZE) * np.sqrt(2.0 / conv2_fan_in) | |
| ).astype(np.float32) | |
| params["conv2_b"] = np.zeros((CONV_FILTERS[1], 1), dtype=np.float32) | |
| height_after_pool1 = IMAGE_HEIGHT // POOL_SIZE | |
| width_after_pool1 = IMAGE_WIDTH // POOL_SIZE | |
| height_after_pool2 = height_after_pool1 // POOL_SIZE | |
| width_after_pool2 = width_after_pool1 // POOL_SIZE | |
| flattened_dim = CONV_FILTERS[1] * height_after_pool2 * width_after_pool2 | |
| params["fc1_W"] = ( | |
| np.random.randn(FC_HIDDEN_DIM, flattened_dim) * np.sqrt(2.0 / flattened_dim) | |
| ).astype(np.float32) | |
| params["fc1_b"] = np.zeros((FC_HIDDEN_DIM, 1), dtype=np.float32) | |
| params["fc2_W"] = ( | |
| np.random.randn(OUTPUT_DIM, FC_HIDDEN_DIM) * np.sqrt(2.0 / FC_HIDDEN_DIM) | |
| ).astype(np.float32) | |
| params["fc2_b"] = np.zeros((OUTPUT_DIM, 1), dtype=np.float32) | |
| return params | |
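
# Worked shape check (follows directly from the constants above): with
# same-padded 3x3 convs and two 2x2 pools, a 28x56 input shrinks to 14x28
# after pool1 and 7x14 after pool2, so the flattened feature vector fed to
# fc1 has 32 * 7 * 14 = 3136 entries.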


def init_adam(params):
    v = {}
    s = {}
    for key, value in params.items():
        v[key] = np.zeros_like(value)
        s[key] = np.zeros_like(value)
    return v, s
| """ | |
| Section 5: ReLu activation function and backward ReLu function | |
| """ | |
| def relu(Z): | |
| return np.maximum(0.0, Z) | |
| def relu_backward(Z): | |
| return (Z > 0).astype(np.float32) | |
| """ | |
| Section 6: Reshapes the flattened input to 4D tensors (batch, channels, height, width) for the convolutional layers | |
| """ | |
| def reshape_flat_to_images(X: np.ndarray, *, batch_size: int | None = None): | |
| """ | |
| Convert flattened columns (features, batch) into 4D tensors (batch, channels, height, width). | |
| """ | |
| _, m = X.shape | |
| if batch_size is not None and m != batch_size: | |
| raise ValueError(f"Expected batch size {batch_size}, got {m}") | |
| images = X.T.reshape(m, IMAGE_HEIGHT, IMAGE_WIDTH) | |
| return images[:, None, :, :] # add channel dim | |
| """ | |
| Section 7: Convolutional layer forward pass and backward pass | |
| """ | |
| def im2col(X, kernel_h, kernel_w, stride, padding): | |
| X_padded = np.pad( | |
| X, | |
| ((0, 0), (0, 0), (padding, padding), (padding, padding)), | |
| mode="constant", | |
| ) | |
| windows = sliding_window_view(X_padded, (kernel_h, kernel_w), axis=(2, 3)) | |
| # windows shape: (batch, channels, out_height, out_width, kernel_h, kernel_w) | |
| batch_size, channels, out_height, out_width, _, _ = windows.shape | |
| cols = windows.transpose(0, 2, 3, 1, 4, 5).reshape(batch_size * out_height * out_width, channels * kernel_h * kernel_w) | |
| return X_padded, cols, out_height, out_width | |
| def col2im(cols, X_shape, kernel_h, kernel_w, stride, padding, out_height, out_width): | |
| batch_size, channels, height, width = X_shape | |
| cols_reshaped = cols.reshape(batch_size, out_height, out_width, channels, kernel_h, kernel_w) | |
| cols_reshaped = cols_reshaped.transpose(0, 3, 1, 2, 4, 5) | |
| X_padded = np.zeros((batch_size, channels, height + 2 * padding, width + 2 * padding), dtype=np.float32) | |
| for h_idx in range(out_height): | |
| h_start = h_idx * stride | |
| h_end = h_start + kernel_h | |
| for w_idx in range(out_width): | |
| w_start = w_idx * stride | |
| w_end = w_start + kernel_w | |
| X_padded[:, :, h_start:h_end, w_start:w_end] += cols_reshaped[:, :, h_idx, w_idx, :, :] | |
| if padding > 0: | |
| return X_padded[:, :, padding:-padding, padding:-padding] | |
| return X_padded | |


def conv_forward(X, W, b, *, stride: int = 1, padding: int = 0):
    batch_size, in_channels, height, width = X.shape
    num_filters, _, kernel_h, kernel_w = W.shape
    X_padded, cols, out_height, out_width = im2col(X, kernel_h, kernel_w, stride, padding)
    W_col = W.reshape(num_filters, -1)
    out_cols = cols @ W_col.T  # (batch*out_height*out_width, num_filters)
    out = out_cols.reshape(batch_size, out_height, out_width, num_filters).transpose(0, 3, 1, 2)
    out = out.astype(np.float32, copy=False)
    out += b.reshape(1, num_filters, 1, 1)
    cache = {
        "X": X,
        "X_padded": X_padded,
        "W": W,
        "stride": stride,
        "padding": padding,
        "kernel_h": kernel_h,
        "kernel_w": kernel_w,
        "out_height": out_height,
        "out_width": out_width,
        "cols": cols,
        "W_col": W_col,
        "output_shape": out.shape,
    }
    return out, cache


def conv_backward(dout, cache):
    X = cache["X"]
    W = cache["W"]
    stride = cache["stride"]
    padding = cache["padding"]
    kernel_h = cache["kernel_h"]
    kernel_w = cache["kernel_w"]
    out_height = cache["out_height"]
    out_width = cache["out_width"]
    cols = cache["cols"]
    W_col = cache["W_col"]
    batch_size, _, _, _ = X.shape
    num_filters = W.shape[0]
    dout_cols = dout.transpose(0, 2, 3, 1).reshape(batch_size * out_height * out_width, num_filters)
    dW_col = dout_cols.T @ cols
    dW = dW_col.reshape(W.shape)
    db = np.sum(dout, axis=(0, 2, 3)).reshape(num_filters, 1)
    dcols = dout_cols @ W_col
    dX = col2im(dcols, X.shape, kernel_h, kernel_w, stride, padding, out_height, out_width)
    return dX, dW, db
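
# Worked shape example (illustrative, batch of 2 with the constants above):
#   conv1 input X:   (2, 1, 28, 56); padding 1 with a 3x3 kernel keeps H x W
#   im2col cols:     (2 * 28 * 56, 1 * 3 * 3) = (3136, 9)
#   W_col:           (16, 9), so cols @ W_col.T -> (3136, 16)
#   conv1 output:    (2, 16, 28, 56) after the reshape/transpose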
| """ | |
| Section 8: Max pooling layer forward pass and backward pass | |
| """ | |
| def maxpool_forward(X, *, pool_size: int = 2, stride: int = 2): | |
| batch_size, channels, height, width = X.shape | |
| out_height = (height - pool_size) // stride + 1 | |
| out_width = (width - pool_size) // stride + 1 | |
| out = np.zeros((batch_size, channels, out_height, out_width), dtype=np.float32) | |
| for h_idx in range(out_height): | |
| h_start = h_idx * stride | |
| h_end = h_start + pool_size | |
| for w_idx in range(out_width): | |
| w_start = w_idx * stride | |
| w_end = w_start + pool_size | |
| window = X[:, :, h_start:h_end, w_start:w_end] | |
| max_vals = np.max(window, axis=(2, 3)) | |
| out[:, :, h_idx, w_idx] = max_vals | |
| cache = { | |
| "X": X, | |
| "pool_size": pool_size, | |
| "stride": stride, | |
| "output_shape": out.shape, | |
| } | |
| return out, cache | |
| def maxpool_backward(dout, cache): | |
| X = cache["X"] | |
| pool_size = cache["pool_size"] | |
| stride = cache["stride"] | |
| batch_size, channels, out_height, out_width = dout.shape | |
| dX = np.zeros_like(X) | |
| for h_idx in range(out_height): | |
| h_start = h_idx * stride | |
| h_end = h_start + pool_size | |
| for w_idx in range(out_width): | |
| w_start = w_idx * stride | |
| w_end = w_start + pool_size | |
| window = X[:, :, h_start:h_end, w_start:w_end] | |
| max_vals = np.max(window, axis=(2, 3), keepdims=True) | |
| mask = (window == max_vals).astype(np.float32) | |
| mask_sum = np.sum(mask, axis=(2, 3), keepdims=True) | |
| mask /= np.maximum(mask_sum, 1.0) | |
| grad_slice = dout[:, :, h_idx, w_idx][:, :, None, None] | |
| dX[:, :, h_start:h_end, w_start:w_end] += mask * grad_slice | |
| return dX | |
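
# Tie handling (illustrative): when a pooling window holds duplicate maxima,
# the mask normalisation above splits the incoming gradient equally among
# them, e.g. the window [[3, 3], [1, 2]] routes 0.5 * grad to each of the
# two 3s instead of double-counting.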


def softmax(Z):
    Z_shift = Z - np.max(Z, axis=0, keepdims=True)  # subtract column max for numerical stability
    expZ = np.exp(Z_shift)
    return expZ / np.sum(expZ, axis=0, keepdims=True)


def one_hot(Y, num_classes=OUTPUT_DIM):
    one_hot_y = np.zeros((num_classes, Y.size), dtype=np.float32)
    one_hot_y[Y, np.arange(Y.size)] = 1.0
    return one_hot_y
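
# Quick sanity example (illustrative): softmax(np.zeros((100, 1))) yields a
# uniform column of 0.01, and one_hot(np.array([3]), 5) is the 5x1 column
# with a single 1.0 in row 3.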
| """ | |
| Section 9: Forward propagation and comptutes for loss | |
| """ | |
| def forward_prop( | |
| X, | |
| params, | |
| *, | |
| training: bool = False, | |
| dropout_rate: float = DROP_RATE_FC, | |
| ): | |
| batch_size = X.shape[1] | |
| images = reshape_flat_to_images(X, batch_size=batch_size) | |
| padding = KERNEL_SIZE // 2 | |
| conv1_out, conv1_cache = conv_forward(images, params["conv1_W"], params["conv1_b"], stride=1, padding=padding) | |
| relu1 = relu(conv1_out) | |
| pool1_out, pool1_cache = maxpool_forward(relu1, pool_size=POOL_SIZE, stride=POOL_SIZE) | |
| conv2_out, conv2_cache = conv_forward(pool1_out, params["conv2_W"], params["conv2_b"], stride=1, padding=padding) | |
| relu2 = relu(conv2_out) | |
| pool2_out, pool2_cache = maxpool_forward(relu2, pool_size=POOL_SIZE, stride=POOL_SIZE) | |
| flattened = pool2_out.reshape(batch_size, -1).T # (features_flat, batch) | |
| Z_fc1 = params["fc1_W"] @ flattened + params["fc1_b"] | |
| A_fc1 = relu(Z_fc1) | |
| dropout_mask = None | |
| keep_prob = 1.0 - dropout_rate | |
| if training and dropout_rate > 0.0: | |
| dropout_mask = (np.random.rand(*A_fc1.shape) >= dropout_rate).astype(np.float32) | |
| A_fc1 = (A_fc1 * dropout_mask) / keep_prob | |
| Z_fc2 = params["fc2_W"] @ A_fc1 + params["fc2_b"] | |
| probs = softmax(Z_fc2) | |
| cache = { | |
| "X": X, | |
| "images": images, | |
| "conv1_out": conv1_out, | |
| "conv1_cache": conv1_cache, | |
| "pool1_cache": pool1_cache, | |
| "conv2_out": conv2_out, | |
| "conv2_cache": conv2_cache, | |
| "pool2_cache": pool2_cache, | |
| "flattened": flattened, | |
| "Z_fc1": Z_fc1, | |
| "A_fc1": A_fc1, | |
| "dropout_mask": dropout_mask, | |
| "keep_prob": keep_prob, | |
| "dropout_rate": dropout_rate, | |
| "Z_fc2": Z_fc2, | |
| "probs": probs, | |
| } | |
| return cache, probs | |


def compute_loss(probs, Y_batch, params, reg_lambda):
    m = Y_batch.shape[1]
    log_likelihood = -np.log(probs + 1e-9) * Y_batch
    data_loss = np.sum(log_likelihood) / m
    l2_penalty = 0.0
    for key in ("conv1_W", "conv2_W", "fc1_W", "fc2_W"):
        l2_penalty += np.sum(np.square(params[key]))
    l2_loss = (reg_lambda / (2 * m)) * l2_penalty
    return data_loss + l2_loss
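
# Sanity anchor (illustrative): a freshly initialised net should emit roughly
# uniform probabilities over the 100 classes, so the starting cross-entropy
# ought to sit near -ln(1/100) ~= 4.605; values far above that usually point
# at a scaling or one-hot bug.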
| """ | |
| Section 10: Back propagation for the CNN model | |
| """ | |
| def back_prop(cache, Y_batch, params, reg_lambda, dropout_rate): | |
| m = Y_batch.shape[1] | |
| grads = {} | |
| probs = cache["probs"] | |
| A_fc1 = cache["A_fc1"] | |
| Z_fc1 = cache["Z_fc1"] | |
| flattened = cache["flattened"] | |
| dropout_mask = cache["dropout_mask"] | |
| keep_prob = cache["keep_prob"] | |
| dZ_fc2 = probs - Y_batch | |
| grads["fc2_W"] = (dZ_fc2 @ A_fc1.T) / m + (reg_lambda / m) * params["fc2_W"] | |
| grads["fc2_b"] = np.sum(dZ_fc2, axis=1, keepdims=True) / m | |
| dA_fc1 = params["fc2_W"].T @ dZ_fc2 | |
| if dropout_mask is not None: | |
| dA_fc1 = (dA_fc1 * dropout_mask) / keep_prob | |
| dZ_fc1 = dA_fc1 * relu_backward(Z_fc1) | |
| grads["fc1_W"] = (dZ_fc1 @ flattened.T) / m + (reg_lambda / m) * params["fc1_W"] | |
| grads["fc1_b"] = np.sum(dZ_fc1, axis=1, keepdims=True) / m | |
| dFlatten = params["fc1_W"].T @ dZ_fc1 # (flatten_dim, batch) | |
| pool2_shape = cache["pool2_cache"]["output_shape"] | |
| dPool2 = dFlatten.T.reshape(pool2_shape) | |
| dRelu2_input = maxpool_backward(dPool2, cache["pool2_cache"]) | |
| dConv2 = dRelu2_input * relu_backward(cache["conv2_out"]) | |
| dPool1_input, dConv2_W, dConv2_b = conv_backward(dConv2, cache["conv2_cache"]) | |
| grads["conv2_W"] = dConv2_W / m + (reg_lambda / m) * params["conv2_W"] | |
| grads["conv2_b"] = dConv2_b / m | |
| dRelu1_input = maxpool_backward(dPool1_input, cache["pool1_cache"]) | |
| dConv1 = dRelu1_input * relu_backward(cache["conv1_out"]) | |
| _, dConv1_W, dConv1_b = conv_backward(dConv1, cache["conv1_cache"]) | |
| grads["conv1_W"] = dConv1_W / m + (reg_lambda / m) * params["conv1_W"] | |
| grads["conv1_b"] = dConv1_b / m | |
| return grads | |
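
# A minimal gradient-check sketch (not called anywhere; the helper name, probe
# index, and step size are illustrative assumptions). It compares the analytic
# fc2_W gradient above against a centred finite difference on a tiny batch;
# in float32 the two should agree to roughly 1e-2 relative error (casting the
# params to float64 first would tighten this considerably).
def _grad_check_fc2_sketch(X_small, Y_small, params, reg_lambda=REG_LAMBDA, eps=1e-3):
    Y_oh = one_hot(Y_small)
    cache, _ = forward_prop(X_small, params, training=False)
    grads = back_prop(cache, Y_oh, params, reg_lambda, dropout_rate=0.0)
    probe = (0, 0)  # single weight to probe
    perturbed = deepcopy(params)
    perturbed["fc2_W"][probe] += eps
    _, probs_plus = forward_prop(X_small, perturbed, training=False)
    loss_plus = compute_loss(probs_plus, Y_oh, perturbed, reg_lambda)
    perturbed["fc2_W"][probe] -= 2 * eps
    _, probs_minus = forward_prop(X_small, perturbed, training=False)
    loss_minus = compute_loss(probs_minus, Y_oh, perturbed, reg_lambda)
    numeric = (loss_plus - loss_minus) / (2 * eps)
    return float(grads["fc2_W"][probe]), float(numeric)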
| """ | |
| Section 11: Updates the parameters using the adam optimizer | |
| """ | |
| def update_params_adam(params, grads, v, s, t, learning_rate): | |
| updated_params = {} | |
| for key in params: | |
| v[key] = BETA1 * v[key] + (1 - BETA1) * grads[key] | |
| s[key] = BETA2 * s[key] + (1 - BETA2) * (grads[key] ** 2) | |
| v_corrected = v[key] / (1 - BETA1 ** t) | |
| s_corrected = s[key] / (1 - BETA2 ** t) | |
| updated_params[key] = params[key] - learning_rate * v_corrected / (np.sqrt(s_corrected) + EPSILON) | |
| return updated_params, v, s | |
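
# Reference form of the update implemented above (t is the 1-based step):
#   v_t = beta1 * v_{t-1} + (1 - beta1) * g_t          (first moment)
#   s_t = beta2 * s_{t-1} + (1 - beta2) * g_t^2        (second moment)
#   v_hat = v_t / (1 - beta1^t),  s_hat = s_t / (1 - beta2^t)
#   theta_t = theta_{t-1} - lr * v_hat / (sqrt(s_hat) + eps)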


def get_predictions(probs):
    return np.argmax(probs, axis=0)


def get_accuracy(probs, labels):
    predictions = get_predictions(probs)
    return np.mean(predictions == labels)
| """ | |
| Section 12: Augments the batch with horizontal shifts and contrast/brightness jitter | |
| """ | |
| def augment_batch( | |
| X_batch, | |
| *, | |
| image_shape: tuple[int, int] = (28, 56), | |
| max_shift: int = MAX_SHIFT_PIXELS, | |
| contrast_jitter_std: float = CONTRAST_JITTER_STD, | |
| ): | |
| """ | |
| Apply lightweight augmentation: horizontal shifts and contrast/brightness jitter. | |
| """ | |
| if max_shift <= 0 and contrast_jitter_std <= 0.0: | |
| return X_batch | |
| batch_size = X_batch.shape[1] | |
| images = X_batch.T.reshape(batch_size, *image_shape) | |
| if max_shift > 0: | |
| shifts = np.random.randint(-max_shift, max_shift + 1, size=batch_size) | |
| for idx, shift in enumerate(shifts): | |
| if shift > 0: | |
| shifted = np.roll(images[idx], shift, axis=1) | |
| shifted[:, :shift] = 0.0 | |
| images[idx] = shifted | |
| elif shift < 0: | |
| shift = -shift | |
| shifted = np.roll(images[idx], -shift, axis=1) | |
| shifted[:, -shift:] = 0.0 | |
| images[idx] = shifted | |
| if contrast_jitter_std > 0.0: | |
| scale = 1.0 + np.random.normal(0.0, contrast_jitter_std, size=batch_size) | |
| bias = np.random.normal(0.0, contrast_jitter_std, size=batch_size) | |
| images *= scale[:, None, None] | |
| images += bias[:, None, None] | |
| np.clip(images, -3.0, 3.0, out=images) | |
| return images.reshape(batch_size, -1).T | |
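
# Shift semantics (illustrative): np.roll wraps pixels around, so the wrapped
# columns are zeroed out afterwards; a row [1, 2, 3, 4] shifted by +2 becomes
# [0, 0, 1, 2] rather than [3, 4, 1, 2].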
| """ | |
| Section 13: Trains the model + evaluates the model | |
| """ | |
| def train_model( | |
| X_train, | |
| Y_train, | |
| X_dev, | |
| Y_dev, | |
| *, | |
| epochs: int = EPOCHS, | |
| batch_size: int = BATCH_SIZE, | |
| learning_rate: float = LEARNING_RATE, | |
| reg_lambda: float = REG_LAMBDA, | |
| dropout_rate: float = DROP_RATE_FC, | |
| early_stop_patience: int = EARLY_STOP_PATIENCE, | |
| early_stop_min_delta: float = EARLY_STOP_MIN_DELTA, | |
| use_augmentation: bool = True, | |
| ): | |
| params = init_params() | |
| v, s = init_adam(params) | |
| m_train = X_train.shape[1] | |
| global_step = 0 | |
| best_dev_acc = -np.inf | |
| best_params = deepcopy(params) | |
| patience_counter = 0 | |
| history = [] | |
| for epoch in range(1, epochs + 1): | |
| permutation = np.random.permutation(m_train) | |
| X_shuffled = X_train[:, permutation] | |
| Y_shuffled = Y_train[permutation] | |
| epoch_loss = 0.0 | |
| for start in range(0, m_train, batch_size): | |
| end = min(start + batch_size, m_train) | |
| X_batch = X_shuffled[:, start:end] | |
| Y_batch_indices = Y_shuffled[start:end] | |
| Y_batch = one_hot(Y_batch_indices) | |
| if use_augmentation: | |
| X_batch = augment_batch(X_batch.copy()) | |
| cache, probs = forward_prop( | |
| X_batch, | |
| params, | |
| training=True, | |
| dropout_rate=dropout_rate, | |
| ) | |
| loss = compute_loss(probs, Y_batch, params, reg_lambda) | |
| grads = back_prop(cache, Y_batch, params, reg_lambda, dropout_rate) | |
| global_step += 1 | |
| params, v, s = update_params_adam(params, grads, v, s, global_step, learning_rate) | |
| epoch_loss += loss * (end - start) | |
| epoch_loss /= m_train | |
| _, train_probs = forward_prop(X_train, params, training=False, dropout_rate=dropout_rate) | |
| train_accuracy = get_accuracy(train_probs, Y_train) | |
| _, dev_probs = forward_prop(X_dev, params, training=False, dropout_rate=dropout_rate) | |
| dev_accuracy = get_accuracy(dev_probs, Y_dev) | |
| print( | |
| f"Epoch {epoch:02d} - loss: {epoch_loss:.4f} " | |
| f"- train_acc: {train_accuracy:.4f} - dev_acc: {dev_accuracy:.4f}" | |
| ) | |
| history.append( | |
| { | |
| "epoch": epoch, | |
| "loss": epoch_loss, | |
| "train_acc": train_accuracy, | |
| "dev_acc": dev_accuracy, | |
| } | |
| ) | |
| if dev_accuracy > best_dev_acc + early_stop_min_delta: | |
| best_dev_acc = dev_accuracy | |
| best_params = deepcopy(params) | |
| patience_counter = 0 | |
| else: | |
| patience_counter += 1 | |
| if patience_counter >= early_stop_patience: | |
| print( | |
| f"Early stopping triggered at epoch {epoch:02d}. " | |
| f"Best dev_acc={best_dev_acc:.4f}" | |
| ) | |
| break | |
| return best_params, history | |


def evaluate(params, X, Y):
    _, probs = forward_prop(X, params, training=False)
    predictions = get_predictions(probs)
    accuracy = np.mean(predictions == Y)
    return predictions, accuracy
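
# Typical call chain (illustrative; mirrors the "train" branch of main()):
#     X_tr, Y_tr, X_dv, Y_dv, _, _ = load_data(DATASET_PATH)
#     X_tr, X_dv, mean, std = normalize_features(X_tr, X_dv)
#     params, history = train_model(X_tr, Y_tr, X_dv, Y_dv)
#     _, dev_acc = evaluate(params, X_dv, Y_dv)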
| """ | |
| Section 14: Trains the model once | |
| """ | |
| def train_once( | |
| learning_rate: float, | |
| reg_lambda: float, | |
| *, | |
| epochs: int = EPOCHS, | |
| batch_size: int = BATCH_SIZE, | |
| dropout_rate: float = DROP_RATE_FC, | |
| history_path: Path | None = None, | |
| ): | |
| """ | |
| Convenience wrapper for hyperparameter sweeps. Returns trained params and dev accuracy. | |
| """ | |
| X_train, Y_train, X_dev, Y_dev, _, _ = load_data(DATASET_PATH) | |
| X_train, X_dev, mean, std = normalize_features(X_train, X_dev) | |
| params, history = train_model( | |
| X_train, | |
| Y_train, | |
| X_dev, | |
| Y_dev, | |
| epochs=epochs, | |
| batch_size=batch_size, | |
| learning_rate=learning_rate, | |
| reg_lambda=reg_lambda, | |
| dropout_rate=dropout_rate, | |
| ) | |
| _, dev_accuracy = evaluate(params, X_dev, Y_dev) | |
| if history_path is not None: | |
| save_history_to_csv(history, history_path) | |
| return params, dev_accuracy, mean, std, history | |
| """ | |
| Section 15: Hyperparameter sweep for learning rate, regularization and dropout rate | |
| """ | |
| def lr_sweep( | |
| learning_rates: list[float], | |
| *, | |
| reg_lambda: float = REG_LAMBDA, | |
| epochs: int = EPOCHS, | |
| batch_size: int = BATCH_SIZE, | |
| dropout_rate: float = DROP_RATE_FC, | |
| history_dir: Path | None = None, | |
| summary_path: Path | None = None, | |
| ): | |
| results = [] | |
| history_directory = Path(history_dir) if history_dir is not None else None | |
| if history_directory is not None: | |
| history_directory.mkdir(parents=True, exist_ok=True) | |
| for lr in learning_rates: | |
| history_path = None | |
| if history_directory is not None: | |
| safe_lr = f"{lr:.2e}".replace("+", "").replace("-", "m") | |
| history_path = history_directory / f"lr_{safe_lr}.csv" | |
| _, dev_acc, _, _, history = train_once( | |
| lr, | |
| reg_lambda, | |
| epochs=epochs, | |
| batch_size=batch_size, | |
| dropout_rate=dropout_rate, | |
| history_path=history_path, | |
| ) | |
| results.append( | |
| { | |
| "learning_rate": float(lr), | |
| "reg_lambda": float(reg_lambda), | |
| "dev_acc": float(dev_acc), | |
| "history": history, | |
| } | |
| ) | |
| if summary_path is not None: | |
| save_sweep_summary(results, summary_path) | |
| return results | |


def random_search_hparams(
    num_trials: int,
    lr_bounds: tuple[float, float],
    reg_bounds: tuple[float, float],
    *,
    epochs: int = EPOCHS,
    batch_size: int = BATCH_SIZE,
    dropout_rate: float = DROP_RATE_FC,
    seed: int | None = None,
    history_dir: Path | None = None,
    summary_path: Path | None = None,
):
    if num_trials <= 0:
        raise ValueError("num_trials must be positive")
    lr_min, lr_max = lr_bounds
    reg_min, reg_max = reg_bounds
    if lr_min <= 0 or lr_max <= 0:
        raise ValueError("Learning rate bounds must be positive")
    if reg_min <= 0 or reg_max <= 0:
        raise ValueError("Regularization bounds must be positive")
    rng = np.random.default_rng(seed)
    history_directory = Path(history_dir) if history_dir is not None else None
    if history_directory is not None:
        history_directory.mkdir(parents=True, exist_ok=True)
    results = []
    log_lr_min, log_lr_max = np.log(lr_min), np.log(lr_max)
    log_reg_min, log_reg_max = np.log(reg_min), np.log(reg_max)
    for trial in range(1, num_trials + 1):
        lr_sample = float(np.exp(rng.uniform(log_lr_min, log_lr_max)))
        reg_sample = float(np.exp(rng.uniform(log_reg_min, log_reg_max)))
        history_path = None
        if history_directory is not None:
            safe_lr = f"{lr_sample:.2e}".replace("+", "").replace("-", "m")
            safe_reg = f"{reg_sample:.2e}".replace("+", "").replace("-", "m")
            history_path = history_directory / f"trial_{trial:02d}_lr-{safe_lr}_reg-{safe_reg}.csv"
        _, dev_acc, _, _, history = train_once(
            lr_sample,
            reg_sample,
            epochs=epochs,
            batch_size=batch_size,
            dropout_rate=dropout_rate,
            history_path=history_path,
        )
        results.append(
            {
                "trial": trial,
                "learning_rate": lr_sample,
                "reg_lambda": reg_sample,
                "dev_acc": float(dev_acc),
                "history": history,
            }
        )
    results.sort(key=lambda item: item["dev_acc"], reverse=True)
    if summary_path is not None:
        save_sweep_summary(results, summary_path, include_trial=True)
    return results
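
# Log-uniform sampling (illustrative): drawing uniformly in log-space and
# exponentiating makes every decade equally likely, e.g. for bounds
# (1e-4, 5e-3) a draw u ~ U(ln 1e-4, ln 5e-3) gives lr = exp(u), so values
# near 1e-4 are sampled as often as values near 1e-3.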


def auto_train_pipeline(
    *,
    trials: int,
    lr_bounds: tuple[float, float],
    reg_bounds: tuple[float, float],
    search_epochs: int,
    final_epochs: int,
    batch_size: int,
    dropout_rate: float,
    final_batch_size: int | None,
    final_dropout_rate: float | None,
    history_dir: Path | None,
    seed: int | None,
    output_model_path: Path | None,
):
    history_directory = Path(history_dir) if history_dir is not None else None
    if history_directory is not None:
        history_directory.mkdir(parents=True, exist_ok=True)
    search_summary_path = None
    if history_directory is not None:
        search_summary_path = history_directory / "random_search_summary.csv"
    results = random_search_hparams(
        trials,
        lr_bounds,
        reg_bounds,
        epochs=search_epochs,
        batch_size=batch_size,
        dropout_rate=dropout_rate,
        seed=seed,
        history_dir=history_directory / "search_histories" if history_directory is not None else None,
        summary_path=search_summary_path,
    )
    best = results[0]
    print(
        f"\nBest search trial -> LR={best['learning_rate']:.3e}, "
        f"reg={best['reg_lambda']:.3e}, dev_acc={best['dev_acc']:.4f}"
    )
    final_dropout = final_dropout_rate if final_dropout_rate is not None else dropout_rate
    final_history_path = None
    if history_directory is not None:
        final_history_path = history_directory / "final_train_history.csv"
    params, final_dev_acc, mean, std, final_history = train_once(
        best["learning_rate"],
        best["reg_lambda"],
        epochs=final_epochs,
        batch_size=final_batch_size or batch_size,
        dropout_rate=final_dropout,
        history_path=final_history_path,
    )
    model_output_path = output_model_path if output_model_path is not None else ARCHIVE_DIR / "trained_model_mnist100.npz"
    save_model(params, mean, std, model_output_path)
    return {
        "best_trial": best,
        "final_dev_acc": final_dev_acc,
        "model_path": Path(model_output_path),
        "final_history": final_history,
    }
| """ | |
| Section 16: Saves the model | |
| """ | |
| def save_model(params, mean, std, filepath=None): | |
| target_path = Path(filepath) if filepath is not None else ARCHIVE_DIR / "trained_model_mnist100.npz" | |
| target_path.parent.mkdir(parents=True, exist_ok=True) | |
| print(f"\nSaving trained model to '{target_path}'...") | |
| np.savez(target_path, **params, mean=mean, std=std) | |
| print("Model saved successfully!") | |
| """ | |
| Section 17: Main function | |
| """ | |
| def main(): | |
| parser = argparse.ArgumentParser(description="MNIST-100 training and tuning utilities.") | |
| parser.add_argument( | |
| "--mode", | |
| choices=("train", "lr-sweep", "random-search", "auto-train"), | |
| default="train", | |
| help="Select high-level action.", | |
| ) | |
| parser.add_argument("--learning-rate", type=float, default=LEARNING_RATE, help="Base learning rate.") | |
| parser.add_argument("--learning-rates", type=str, help="Comma-separated list for LR sweep.") | |
| parser.add_argument("--reg-lambda", type=float, default=REG_LAMBDA, help="L2 regularization strength.") | |
| parser.add_argument("--lr-min", type=float, default=1e-4, help="Min LR for random search (exclusive mode).") | |
| parser.add_argument("--lr-max", type=float, default=5e-3, help="Max LR for random search.") | |
| parser.add_argument("--reg-min", type=float, default=1e-5, help="Min lambda for random search.") | |
| parser.add_argument("--reg-max", type=float, default=1e-3, help="Max lambda for random search.") | |
| parser.add_argument("--trials", type=int, default=5, help="Number of random-search trials.") | |
| parser.add_argument("--epochs", type=int, default=EPOCHS, help="Train epochs per run.") | |
| parser.add_argument("--batch-size", type=int, default=BATCH_SIZE, help="Mini-batch size.") | |
| parser.add_argument( | |
| "--final-epochs", | |
| type=int, | |
| default=40, | |
| help="Epoch budget for the final training run in auto-train mode.", | |
| ) | |
| parser.add_argument( | |
| "--final-batch-size", | |
| type=int, | |
| help="Mini-batch size for the final training run (defaults to --batch-size).", | |
| ) | |
| parser.add_argument( | |
| "--dropout", | |
| type=float, | |
| help="Override dropout rate for the fully connected layer.", | |
| ) | |
| parser.add_argument( | |
| "--final-dropout", | |
| type=float, | |
| help="Dropout rate for the final training pass in auto-train mode.", | |
| ) | |
| parser.add_argument( | |
| "--history-dir", | |
| type=Path, | |
| help="Directory for saving training histories (CSV).", | |
| ) | |
| parser.add_argument( | |
| "--output-model", | |
| type=Path, | |
| help="Path to save the trained model (.npz). Defaults to archive/trained_model_mnist100.npz.", | |
| ) | |
| parser.add_argument("--seed", type=int, help="Random seed for random search.") | |
| args = parser.parse_args() | |
| dropout_rate = DROP_RATE_FC if args.dropout is None else float(args.dropout) | |
| if not 0.0 <= dropout_rate < 1.0: | |
| raise ValueError("Dropout rate must be in [0, 1).") | |
| final_dropout_rate = None | |
| if args.final_dropout is not None: | |
| final_dropout_rate = float(args.final_dropout) | |
| if not 0.0 <= final_dropout_rate < 1.0: | |
| raise ValueError("Final dropout rate must be in [0, 1).") | |
| history_dir = args.history_dir | |
| if history_dir is not None: | |
| history_dir = Path(history_dir) | |
| history_dir.mkdir(parents=True, exist_ok=True) | |
| if args.mode == "train": | |
| print(f"Loading dataset from '{DATASET_PATH}'...") | |
| X_train, Y_train, X_dev, Y_dev, _, _ = load_data(DATASET_PATH) | |
| X_train, X_dev, mean, std = normalize_features(X_train, X_dev) | |
| print( | |
| f"Training samples: {X_train.shape[1]}, features: {X_train.shape[0]} " | |
| f"| Dev samples: {X_dev.shape[1]}" | |
| ) | |
| params, history = train_model( | |
| X_train, | |
| Y_train, | |
| X_dev, | |
| Y_dev, | |
| epochs=args.epochs, | |
| batch_size=args.batch_size, | |
| learning_rate=args.learning_rate, | |
| reg_lambda=args.reg_lambda, | |
| dropout_rate=dropout_rate, | |
| ) | |
| _, dev_accuracy = evaluate(params, X_dev, Y_dev) | |
| print(f"\nFinal Dev Accuracy: {dev_accuracy:.4f}") | |
| if history_dir is not None: | |
| save_history_to_csv(history, history_dir / "train_history.csv") | |
| save_model(params, mean, std, args.output_model or ARCHIVE_DIR / "trained_model_mnist100.npz") | |
| elif args.mode == "lr-sweep": | |
| if args.learning_rates is None: | |
| raise ValueError("LR sweep mode requires --learning-rates.") | |
| lr_values = [float(value.strip()) for value in args.learning_rates.split(",") if value.strip()] | |
| print(f"Running LR sweep over {lr_values}...") | |
| summary_path = history_dir / "lr_sweep_summary.csv" if history_dir is not None else None | |
| results = lr_sweep( | |
| lr_values, | |
| reg_lambda=args.reg_lambda, | |
| epochs=args.epochs, | |
| batch_size=args.batch_size, | |
| dropout_rate=dropout_rate, | |
| history_dir=history_dir, | |
| summary_path=summary_path, | |
| ) | |
| for entry in results: | |
| print( | |
| f"LR={entry['learning_rate']:.3e} | reg={entry['reg_lambda']:.3e} " | |
| f"| dev_acc={entry['dev_acc']:.4f}" | |
| ) | |
| elif args.mode == "random-search": | |
| print( | |
| f"Running random search ({args.trials} trials) " | |
| f"LR∈[{args.lr_min:.2e},{args.lr_max:.2e}], " | |
| f"λ∈[{args.reg_min:.2e},{args.reg_max:.2e}]..." | |
| ) | |
| summary_path = history_dir / "random_search_summary.csv" if history_dir is not None else None | |
| results = random_search_hparams( | |
| args.trials, | |
| (args.lr_min, args.lr_max), | |
| (args.reg_min, args.reg_max), | |
| epochs=args.epochs, | |
| batch_size=args.batch_size, | |
| dropout_rate=dropout_rate, | |
| seed=args.seed, | |
| history_dir=history_dir, | |
| summary_path=summary_path, | |
| ) | |
| for entry in results: | |
| print( | |
| f"Trial {entry['trial']:02d} | LR={entry['learning_rate']:.3e} " | |
| f"| reg={entry['reg_lambda']:.3e} | dev_acc={entry['dev_acc']:.4f}" | |
| ) | |
| best = results[0] | |
| print( | |
| f"\nBest trial -> LR={best['learning_rate']:.3e}, " | |
| f"reg={best['reg_lambda']:.3e}, dev_acc={best['dev_acc']:.4f}" | |
| ) | |
| elif args.mode == "auto-train": | |
| print( | |
| f"Auto-train pipeline: {args.trials} search trials " | |
| f"(epochs={args.epochs}) followed by final training (epochs={args.final_epochs})." | |
| ) | |
| results = auto_train_pipeline( | |
| trials=args.trials, | |
| lr_bounds=(args.lr_min, args.lr_max), | |
| reg_bounds=(args.reg_min, args.reg_max), | |
| search_epochs=args.epochs, | |
| final_epochs=args.final_epochs, | |
| batch_size=args.batch_size, | |
| dropout_rate=dropout_rate, | |
| final_batch_size=args.final_batch_size, | |
| final_dropout_rate=final_dropout_rate, | |
| history_dir=history_dir, | |
| seed=args.seed, | |
| output_model_path=args.output_model, | |
| ) | |
| best = results["best_trial"] | |
| print( | |
| f"\nAuto-train complete. " | |
| f"Best trial LR={best['learning_rate']:.3e}, reg={best['reg_lambda']:.3e}. " | |
| f"Final dev_acc={results['final_dev_acc']:.4f}. " | |
| f"Model saved to '{results['model_path']}'." | |
| ) | |
| if __name__ == "__main__": | |
| main() | |