import os

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from PIL import Image
from torch.utils.data import DataLoader, Dataset

# Background reading:
# https://en.wikipedia.org/wiki/Convolution
# https://github.com/vdumoulin/conv_arithmetic/blob/master/README.md
# https://en.wikipedia.org/wiki/Forsyth%E2%80%93Edwards_Notation

# Piece types (FEN letters): uppercase = white (R N B Q K P),
# lowercase = black (r n b q k p), '1' = empty square.

# Root directory of the training-tile images. Overridable via the TRAIN_DIR
# environment variable so the script is not tied to one machine; the original
# hard-coded path is kept as the default for backward compatibility.
TRAIN_DIR = os.environ.get(
    "TRAIN_DIR", "/mnt/c/Users/krzsa/IdeaProjects/Agents-Course-Assignment/train-data"
)
TRAIN_DIR_BLACK = f"{TRAIN_DIR}/black"
TRAIN_DIR_WHITE = f"{TRAIN_DIR}/white"
TRAIN_DIR_EMPTY = f"{TRAIN_DIR}/empty"

# Class indices are positions in '1KQRBNPkqrbnp':
# 0: 1 (empty)  1: K  2: Q  3: R  4: B  5: N  6: P
# 7: k  8: q  9: r  10: b  11: n  12: p

# (image path, FEN piece letter) pairs; two sample tiles per class.
# Built programmatically in exactly the same order as the original
# hand-written 26-entry list: empty, black b k n p q r, white B K N P Q R.
TRAIN_DATA = (
    [(f"{TRAIN_DIR_EMPTY}/1_{i:03d}.png", "1") for i in (1, 2)]
    + [(f"{TRAIN_DIR_BLACK}/{p}_{i:03d}.png", p) for p in "bknpqr" for i in (1, 2)]
    + [(f"{TRAIN_DIR_WHITE}/{p}_{i:03d}.png", p) for p in "BKNPQR" for i in (1, 2)]
)

# NOTE(review): evaluating on the training set only measures memorization,
# not generalization - swap in held-out tiles when some are available.
TEST_DATA = TRAIN_DATA
# https://docs.pytorch.org/docs/stable/nn.html
# https://docs.pytorch.org/docs/stable/optim.html
# https://docs.pytorch.org/docs/stable/generated/torch.nn.Module.html#torch.nn.Module
class CNNModel(nn.Module):
    """Small CNN that classifies a 32x32 grayscale board tile into one of 13
    classes (an index into '1KQRBNPkqrbnp': empty square, 6 white pieces,
    6 black pieces).
    """

    def __init__(self, _model_name, _model_dir):
        """Build the layers and load weights from `_model_dir/_model_name`
        if that file exists, otherwise initialize them randomly.
        """
        super(CNNModel, self).__init__()
        self.name = _model_name        # checkpoint file name
        self.model_dir = _model_dir    # directory weights are loaded from / saved to
        print("***KS*** Model: Creating layers")
        # First convolutional layer: 32 feature maps, 5x5 kernel; padding=2
        # keeps the spatial size at 32x32.
        # https://docs.pytorch.org/docs/stable/generated/torch.nn.Conv2d.html#torch.nn.Conv2d
        # https://github.com/vdumoulin/conv_arithmetic/blob/master/README.md
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=32, kernel_size=5, padding=2)
        # Second convolutional layer: 64 feature maps, 5x5 kernel.
        self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=5, padding=2)
        # Fully connected layer.
        # https://docs.pytorch.org/docs/stable/generated/torch.nn.Linear.html#torch.nn.Linear
        # 64 channels from the last convolution; 8 x 8 because the two
        # max-pool steps reduce 32 x 32 --> 16 x 16 --> 8 x 8.
        self.fc1 = nn.Linear(8 * 8 * 64, 1024)
        self.dropout = nn.Dropout(p=0.5)  # Changed from 0.3 to 0.5
        # Output layer: one logit per class (no softmax; CrossEntropyLoss
        # applies it internally during training).
        self.fc2 = nn.Linear(1024, 13)
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        # Initialize weights and biases (or load a saved checkpoint).
        self._initialize_weights()

    def _initialize_weights(self):
        """Load weights from the checkpoint file if present, otherwise
        initialize all layers with truncated-normal weights and 0.1 biases,
        then move the model to the selected device.
        """
        model_name = os.path.join(self.model_dir, self.name)
        print(f"***KS*** Checking pre-trained model: '{model_name}'")
        if os.path.exists(model_name):
            print(f"***KS*** Model '{model_name}' exists, loading weights ...")
            self.load_state_dict(torch.load(model_name, map_location=self.device))
            print("*** KS *** Model loaded.")
        else:
            print(f"*** KS *** Model file '{model_name}' not found. "
                  f"Initializing weights with random values")
            # Truncated normal keeps the initial weights close to zero.
            nn.init.trunc_normal_(self.conv1.weight, std=0.1)
            nn.init.constant_(self.conv1.bias, 0.1)
            nn.init.trunc_normal_(self.conv2.weight, std=0.1)
            nn.init.constant_(self.conv2.bias, 0.1)
            nn.init.trunc_normal_(self.fc1.weight, std=0.1)
            nn.init.constant_(self.fc1.bias, 0.1)
            nn.init.trunc_normal_(self.fc2.weight, std=0.1)
            nn.init.constant_(self.fc2.bias, 0.1)
        self.to(self.device)

    def save_weights(self):
        """Persist the current weights to the exact path that
        _initialize_weights() loads from, so a save/load round-trip works.

        Bug fix: the original created './saved_models' but wrote to
        '../saved_models/<name>.pth' (a different directory, typically
        nonexistent), and neither matched the load path
        `os.path.join(self.model_dir, self.name)` - saved weights could
        never be reloaded.
        """
        print("***KS*** Saving model ...")
        os.makedirs(self.model_dir, exist_ok=True)
        model_save_path = os.path.join(self.model_dir, self.name)
        torch.save(self.state_dict(), model_save_path)
        print(f'*** KS *** Model saved in file: {model_save_path}')

    # Define the computation performed at every call.
    # Overrides nn.Module.forward.
    def forward(self, x):
        """Run a batch of tiles through the network.

        x: float tensor of shape [batch, 1, 32, 32] (the shape comments
        below show a batch of 26). Returns raw logits [batch, 13].
        """
        print("***KS*** Model: Executing forward calculations")
        print(f"***KS*** [0] {x.shape}")  # [26, 1, 32, 32]
        # First convolution: 1 channel in, 32 channels out, size preserved.
        x = self.conv1(x)
        print(f"***KS*** [1] {x.shape}")  # [26, 32, 32, 32]
        # https://docs.pytorch.org/docs/stable/generated/torch.nn.ReLU.html#torch.nn.ReLU
        x = F.relu(x)
        print(f"***KS*** [2] {x.shape}")  # [26, 32, 32, 32]
        # https://docs.pytorch.org/docs/stable/generated/torch.nn.MaxPool2d.html#torch.nn.MaxPool2d
        x = F.max_pool2d(x, kernel_size=2, stride=2)  # First pooling: halves H and W
        print(f"***KS*** [3] {x.shape}")  # [26, 32, 16, 16]
        # Second convolution + ReLU.
        x = F.relu(self.conv2(x))
        print(f"***KS*** [4] {x.shape}")  # [26, 64, 16, 16]
        x = F.max_pool2d(x, kernel_size=2, stride=2)  # Second pooling: 16x16 -> 8x8
        print(f"***KS*** [5] {x.shape}")  # [26, 64, 8, 8]
        # Flatten all channels into one vector per image.
        # https://docs.pytorch.org/docs/stable/tensor_view.html
        # https://docs.pytorch.org/docs/stable/generated/torch.Tensor.view.html#torch.Tensor.view
        x = x.view(-1, 8 * 8 * 64)  # second dim is 8*8*64 = 4096; batch dim inferred
        print(f"***KS*** [6] {x.shape}")  # [26, 4096]
        # Fully connected layer + ReLU: [?, 4096] -> [?, 1024].
        x = self.fc1(x)
        print(f"***KS*** [7] {x.shape}")  # [26, 1024]
        x = F.relu(x)
        print(f"***KS*** [8] {x.shape}")  # [26, 1024]
        # Dropout (active only in train mode).
        x = self.dropout(x)
        print(f"***KS*** [9] {x.shape}")  # [26, 1024]
        # Output layer (no activation, as CrossEntropyLoss applies Softmax internally):
        # [?, 1024] -> [?, 13].
        x = self.fc2(x)
        print(f"***KS*** [10] {x.shape}")  # [26, 13]
        return x

    def get_device(self):
        """Return the torch.device this model was placed on."""
        return self.device


# Dataset class for PyTorch
# https://docs.pytorch.org/docs/stable/data.html#torch.utils.data.Dataset
class ChessDataset(Dataset):
    """Loads (image path, FEN letter) pairs eagerly into memory as 32x32
    grayscale tiles with integer class labels.
    """

    # Class index i corresponds to the i-th character of this string.
    CHESS_PIECES = '1KQRBNPkqrbnp'

    def __init__(self, image_train_date):
        """image_train_date: iterable of (image file path, label character)."""
        self.num_images = len(image_train_date)
        # Each tile is a 32x32 grayscale image.
        self.images = np.zeros([self.num_images, 32, 32], dtype=np.uint8)
        self.labels = np.zeros([self.num_images], dtype=np.int64)  # integer class labels
        for i, image_file_path_and_label in enumerate(image_train_date):
            with Image.open(image_file_path_and_label[0]) as img:
                img = img.convert('L')  # Ensure image is in grayscale
                self.images[i, :, :] = np.array(img, dtype=np.uint8)
            self.labels[i] = self.__get_piece_index_from_label__(image_file_path_and_label[1])
        print("***KS*** Done loading training data")

    def __get_piece_index_from_label__(self, label) -> int:
        """Map a FEN letter to its class index (0..12).

        Bug fix: the original used str.find, which returns -1 for an unknown
        label - an invalid CrossEntropyLoss target that would corrupt
        training silently. str.index raises ValueError instead and returns
        identical values for every valid label.
        """
        return self.CHESS_PIECES.index(label)

    def get_piece_label(self, idx) -> str:
        """Map a class index back to its FEN letter."""
        return self.CHESS_PIECES[idx]

    def __len__(self):
        return self.num_images

    # required to be implemented
    # returns an item for given key
    def __getitem__(self, idx):
        image = self.images[idx].astype('float32') / 255.0  # Normalize to [0, 1]
        image = np.expand_dims(image, axis=0)  # Add channel dimension -> [1, 32, 32]
        label = self.labels[idx]
        return torch.tensor(image, dtype=torch.float32), label


class ChessImagesDataset(Dataset):
    """Wraps an in-memory array of unlabeled 32x32 grayscale tiles for
    inference; labels are empty placeholders.
    """

    CHESS_PIECES = '1KQRBNPkqrbnp'

    def __init__(self, images):
        # images: indexable collection of uint8 arrays of shape [32, 32]
        # (assumed - TODO confirm against callers).
        self.num_images = len(images)
        self.images = images

    def __len__(self):
        return self.num_images

    def get_piece_label(self, idx) -> str:
        """Map a class index back to its FEN letter."""
        return self.CHESS_PIECES[idx]

    # required to be implemented
    # returns an item for given key
    def __getitem__(self, idx):
        image = self.images[idx].astype('float32') / 255.0  # Normalize
        image = np.expand_dims(image, axis=0)  # Add channel dimension
        label = ""  # not needed for inference
        return torch.tensor(image, dtype=torch.float32), label


class ChessPiecesRecognition:
    """High-level driver: owns the CNN, the train/test loaders, and exposes
    train(), eval() and classify_pieces().
    """

    def __init__(self, _model_name, _model_dir):
        print(f"***KS*** Chess pieces recognition initializing ...")
        self.model = CNNModel(_model_name, _model_dir)
        self.__load_train_data__()

    def __load_train_data__(self):
        """Build DataLoaders for the module-level TRAIN_DATA / TEST_DATA."""
        print(f"*** KS *** loading training data")
        # Data loader combines a dataset and a sampler, and provides an
        # iterable over the given dataset.
        print(f"Loading {len(TRAIN_DATA)} Training tiles", end='')
        train_dataset = ChessDataset(TRAIN_DATA)
        print(f"\n*** KS *** Loading {len(TEST_DATA)} Testing tiles", end='')
        test_dataset = ChessDataset(TEST_DATA)
        print()
        batch_size = 64  # @param {type:"number"}
        # https://docs.pytorch.org/docs/stable/data.html#torch.utils.data.DataLoader
        self.train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
        self.test_loader = DataLoader(test_dataset, batch_size=batch_size)

    def train(self):
        """Train the CNN with Adam + cross-entropy, then save the weights."""
        print(f"***KS*** Training chess pieces recognition")
        # https://docs.pytorch.org/docs/stable/generated/torch.nn.CrossEntropyLoss.html#torch.nn.CrossEntropyLoss
        criterion = nn.CrossEntropyLoss()  # For multi-class classification
        # https://docs.pytorch.org/docs/stable/generated/torch.optim.Adam.html#torch.optim.Adam
        optimizer = optim.Adam(self.model.parameters(), lr=1e-4)
        do_training = True  # Set to True to train the model
        epochs = 100  # @param {type:"number"}
        if do_training:
            self.model.train()  # enable dropout etc.
            print(f"*** KS *** Starting training for {epochs} epochs...")
            for epoch in range(epochs):
                running_loss = 0.0
                print(f"***KS*** Epoch: {epoch}")
                for i, (inputs, labels) in enumerate(self.train_loader):
                    # Move inputs and labels to the model's device.
                    inputs = inputs.to(self.model.get_device())
                    labels = labels.to(self.model.get_device())
                    # Zero gradients, forward, loss, backward, step.
                    optimizer.zero_grad()
                    outputs = self.model(inputs)
                    loss = criterion(outputs, labels)
                    loss.backward()
                    optimizer.step()
                    running_loss += loss.item()
                    if (i + 1) % 10 == 0:  # Print every 10 batches
                        print(f'*** KS *** Epoch [{epoch + 1}/{epochs}], '
                              f'Step [{i + 1}/{len(self.train_loader)}], '
                              f'Loss: {running_loss / 10:.4f}')
                        running_loss = 0.0
            print('Finished Training')
            self.model.save_weights()

    def eval(self):
        """Measure and print classification accuracy on the test loader."""
        self.model.eval()  # Set model to evaluation mode
        correct = 0
        total = 0
        with torch.no_grad():
            for inputs, labels in self.test_loader:
                inputs = inputs.to(self.model.get_device())
                labels = labels.to(self.model.get_device())
                outputs = self.model(inputs)
                print(f"***KS*** Got model outputs: \nshape: {outputs.shape}\n{outputs}")
                labels_detected = np.argmax(outputs.cpu(), axis=1)
                print(f"***KS*** Got labels idx detected: \nshape: {labels_detected.shape}\n{labels_detected}")
                _, predicted = torch.max(outputs.data, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()
        # Guard against an empty test loader (division by zero).
        test_accuracy = correct / total if total else 0.0
        print(f'Accuracy on test set: {test_accuracy * 100:.2f}%\n')

    def classify_pieces(self, images):
        """Classify an array of 32x32 grayscale tiles and return one FEN
        letter per tile, concatenated in input order.

        Bug fix: the original assigned `labels_str = ''.join(labels)` inside
        the batch loop, so with more than one batch (>64 images) every batch
        but the last was discarded. Labels are now accumulated per batch.
        """
        dataset = ChessImagesDataset(images)
        loader = DataLoader(dataset, batch_size=64)
        label_chunks = []
        self.model.eval()  # Set model to evaluation mode
        with torch.no_grad():
            for inputs, _labels in loader:  # placeholder labels are ignored
                inputs = inputs.to(self.model.get_device())
                outputs = self.model(inputs)
                print(f"***KS*** Got model outputs: \nshape: {outputs.shape}")
                labels_detected = np.argmax(outputs.cpu(), axis=1)
                print(f"***KS*** Got labels idx detected: \nshape: {labels_detected.shape}\n{labels_detected}")
                label_chunks.append(''.join(dataset.get_piece_label(ix) for ix in labels_detected))
        return ''.join(label_chunks)


#t = ChessPiecesRecognition()
#t.train()
#t.eval()