import numpy as np from abc import ABC, abstractmethod class Layer(ABC): def __init__(self): self.input = None self.output = None @abstractmethod def forward(self, x_train): pass @abstractmethod def backward(self, e, eta=0.01): pass class Conv2D(Layer): def __init__(self, num_filter, filter_size, rgb: bool = True, stride=(1,1), pad: str = 'valid'): super().__init__() self.num_filter = num_filter self.filter_size = filter_size self.stride = stride self.pad = pad self.input = None self.padding = None if rgb: self.filter = np.random.randn(num_filter, 3, filter_size, filter_size) else: self.filter = np.random.randn(num_filter, 1, filter_size, filter_size) self.bias = np.random.randn(num_filter) def forward(self, x_train): self.input = x_train num_samples, h, w, c = self.input.shape pad_h, pad_w = (0, 0) if self.pad == 'same': pad_h = ((h - 1)(self.stride[0] - 1) + self.filter[0] - 1)//2 pad_w = ((w - 1)(self.stride[1] - 1) + self.filter[1] - 1)//2 self.padding = (pad_h, pad_w) self.input = np.pad(x_train, pad_width=((0, 0,), (pad_h, pad_h), (pad_w, pad_w), (0, 0)),mode='constant') out_h = (h - self.filter_size + 1) // self.stride + 1 out_w = (w - self.filter_size + 1) // self.stride + 1 self.output = np.zeros((num_samples, self.num_filter, out_h, out_w)) for h in range(out_h): for w in range(out_w): h_s = h * self.stride h_e = h_s + self.filter_size w_s = w * self.stride w_e = w_s + self.filter_size self.output[:, :, h, w] = np.sum(self.input[:, :, h_s:h_e, w_s:w_e] * self.filter + self.bias) return self.output def backward(self, e, eta=0.01): num_samples, channel, h, w = self.input.shape input_error = np.zeros(self.input.shape) filter_error = np.zeros(self.filter.shape) # get error per stride for i in range((h - self.filter_size) // self.stride + 1): for j in range((w - self.filter_size) // self.stride + 1): h_s = i * self.stride w_s = j * self.stride region = self.input[:, :, h_s:h_s+self.filter_size, w_s:w_s+self.filter_size] for k in range(self.num_filter): # take loop if the image is rgb filter_error += np.sum(region * e[:, k, i, j][:, None, None, None], axis=0) for n in range(num_samples): input_error[n, :, h_s:h_s + self.filter_size, w_s:w_s + self.filter_size] += np.sum(self.filter * e[n, :, i, j][:, None, None, None], axis=0) self.filter -= eta * filter_error if self.padding != (0, 0): input_error = input_error[:, :, self.padding[0]: -self.padding[0], self.padding[1]:-self.padding[1]] return input_error class MaxPooling2D(Layer): def __init__(self, pool_size=2, stride=2): super().__init__() self.pool_size = pool_size self.stride = stride self.max_indices = None def forward(self, x_train): batch, channel, h, w = x_train.shape self.input = x_train out_h = (h-self.pool_size)//self.stride + 1 out_w = (h-self.pool_size)//self.stride + 1 self.output = np.zeros((batch, channel, out_h, out_w)) self.max_indices = np.zeros_like(self.output, dtype=int) for i in range(out_h): for j in range(out_w): h_s = i*self.stride h_e = h_s+self.pool_size w_s = j*self.stride w_e = w_s+self.pool_size region = input[:, :, h_s:h_e, w_s:w_e] self.output[:, :, i, j] = np.max(region, axis=(2, 3)) self.max_indices[:, :, i, j] = np.argmax(region.reshape(batch, channel, -1), axis=1) return self.output def backward(self, e, eta=0.01): input_error = np.zeros_like(self.input) out_height, out_width = self.output.shape[2], self.output.shape[3] for i in range(out_height): for j in range(out_width): h_s = i * self.stride w_s = j * self.stride region = self.input[:, :, h_s:h_s + self.pool_size, w_s:w_s + self.pool_size] region_reshaped = region.reshape(region.shape[0], region.shape[1], -1) for k in range(region_reshaped.shape[0]): for l in range(region_reshaped.shape[1]): max_index = self.max_indices[k, l, i, j] input_error[k, l, h_s:h_s + self.pool_size, w_s:w_s + self.pool_size].reshape(-1)[max_index] = e[k, l, i, j] return input_error class Dense(Layer): def __init__(self, input_size, output_size, activation=None): super().__init__() self.weights = np.random.randn(input_size, output_size) * 0.01 self.biases = np.zeros((1, output_size)) self.activation = activation @staticmethod def sigmoid(x): return 1/(1 + np.exp(-x)) @staticmethod def relu(x): return np.maximum(0, x) @staticmethod def sigmoid_dev(x): return x*(1-x) @staticmethod def relu_dev(x): return np.where(x>1, 1, 0) def forward(self, x_train): self.input = x_train z = np.dot(x_train, self.weights) + self.biases if self.activation == 'sigmoid': self.output = self.sigmoid(z) elif self.activation == 'relu': self.output = self.relu(z) return self.output def backward(self, e, eta=0.01): activation_derivative = 0 if self.activation == 'relu': activation_derivative = self.relu_dev(self.output) else: activation_derivative = self.sigmoid_dev(self.output) input_error = np.dot(e * activation_derivative, self.weights.T) weights_error = np.dot(self.input.T, e * activation_derivative) # Update parameters self.weights -= eta * weights_error self.biases -= eta * np.sum(e * activation_derivative, axis=0, keepdims=True) return input_error class Flatten(Layer): def __init__(self): super().__init__() def forward(self, x_train): self.input = x_train flatten = x_train.reshape((x_train.shape[0], -1)) return flatten def backward(self, e, eta=0.01): return e.reshape(self.input.shape) class CNN: def __init__(self): self.layers = [] def append(self, layer): self.layers.append(layer) def predict(self, input): for layer in self.layers: input = layer.forward(input) return input def fit(self, x_train, y_train, batch_size=36, epochs=10): num = x_train.shape[0] for i in range(epochs): indices = np.arange(num) np.random.shuffle(indices) x = x_train[indices] y = y_train[indices] for i in range(0, num, batch_size): x_batch = x[i:i+batch_size] y_batch = y[i:i+batch_size] output = self.predict(x_batch) error = output - y_batch for layer in reversed(self.layers): error = layer.backward(error, 0.01)