# Agents-Course-Assignment / my_train_chess_pieces_recognition.py
# (Hugging Face upload header: user krzsam, commit 1154bfc)
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import os
import numpy as np
from PIL import Image
# https://en.wikipedia.org/wiki/Convolution
# https://github.com/vdumoulin/conv_arithmetic/blob/master/README.md
# https://en.wikipedia.org/wiki/Forsyth%E2%80%93Edwards_Notation
# piece types
# - white Rook R
# - white Knights N
# - white Bishop B
# - white Queen Q
# - white King K
# - white Pawn P
# - black Rook r
# - black Knights n
# - black Bishop b
# - black Queen q
# - black King k
# - black Pawn p
# - empty
TRAIN_DIR = "/mnt/c/Users/krzsa/IdeaProjects/Agents-Course-Assignment/train-data"
TRAIN_DIR_BLACK = f"{TRAIN_DIR}/black"
TRAIN_DIR_WHITE = f"{TRAIN_DIR}/white"
TRAIN_DIR_EMPTY = f"{TRAIN_DIR}/empty"
# Class index mapping (position in CHESS_PIECES = '1KQRBNPkqrbnp'):
#   0 -> '1' (empty square)
#   1..6  -> K Q R B N P (white pieces)
#   7..12 -> k q r b n p (black pieces)
# Two sample tiles per class, named <label>_001.png and <label>_002.png;
# the label is the FEN piece letter ('1' marks an empty square).
TRAIN_DATA = (
    [(f"{TRAIN_DIR_EMPTY}/1_{i:03d}.png", "1") for i in (1, 2)]
    + [(f"{TRAIN_DIR_BLACK}/{piece}_{i:03d}.png", piece) for piece in "bknpqr" for i in (1, 2)]
    + [(f"{TRAIN_DIR_WHITE}/{piece}_{i:03d}.png", piece) for piece in "BKNPQR" for i in (1, 2)]
)
# No separate holdout set: evaluation reuses the training tiles.
TEST_DATA = TRAIN_DATA
# https://docs.pytorch.org/docs/stable/nn.html
# https://docs.pytorch.org/docs/stable/optim.html
# https://docs.pytorch.org/docs/stable/generated/torch.nn.Module.html#torch.nn.Module
class CNNModel(nn.Module):
    """Small CNN classifier for 32x32 grayscale chess-tile images.

    Architecture: two conv(5x5, padding=2) + ReLU + 2x2 max-pool stages,
    then a 4096->1024 fully connected layer with dropout and a 13-way
    output layer (one class per FEN piece letter plus "empty").
    """

    def __init__(self, _model_name, _model_dir):
        """Build the layers and load a checkpoint from disk if one exists.

        _model_name: checkpoint file name inside _model_dir.
        _model_dir: directory checkpoints are loaded from and saved to
            (see save_weights()).
        """
        super(CNNModel, self).__init__()
        self.name = _model_name
        self.model_dir = _model_dir
        print("***KS*** Model: Creating layers")
        # First convolutional layer: 32 feature maps, 5x5 kernel.
        # padding=2 keeps the spatial size unchanged (32x32 stays 32x32).
        # https://docs.pytorch.org/docs/stable/generated/torch.nn.Conv2d.html#torch.nn.Conv2d
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=32, kernel_size=5, padding=2)
        # Second convolutional layer: 64 feature maps, 5x5 kernel.
        self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=5, padding=2)
        # Fully connected layer. Input is 64 channels of 8x8: two 2x2
        # max-pools reduce 32x32 -> 16x16 -> 8x8.
        # https://docs.pytorch.org/docs/stable/generated/torch.nn.Linear.html#torch.nn.Linear
        self.fc1 = nn.Linear(8 * 8 * 64, 1024)
        self.dropout = nn.Dropout(p=0.5)
        # Output layer: 13 classes ('1KQRBNPkqrbnp').
        self.fc2 = nn.Linear(1024, 13)
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        # Initialize weights and biases (from checkpoint or randomly).
        self._initialize_weights()

    def _initialize_weights(self):
        """Load weights from model_dir/name if present, else init randomly."""
        model_name = os.path.join(self.model_dir, self.name)
        print(f"***KS*** Checking pre-trained model: '{model_name}'")
        if os.path.exists(model_name):
            print(f"***KS*** Model '{model_name}' exists, loading weights ...")
            self.load_state_dict(torch.load(model_name, map_location=self.device))
            print("*** KS *** Model loaded.")
        else:
            print(f"*** KS *** Model file '{model_name}' not found. Initializing weights with random values")
            # Truncated-normal weights with small constant positive biases.
            nn.init.trunc_normal_(self.conv1.weight, std=0.1)
            nn.init.constant_(self.conv1.bias, 0.1)
            nn.init.trunc_normal_(self.conv2.weight, std=0.1)
            nn.init.constant_(self.conv2.bias, 0.1)
            nn.init.trunc_normal_(self.fc1.weight, std=0.1)
            nn.init.constant_(self.fc1.bias, 0.1)
            nn.init.trunc_normal_(self.fc2.weight, std=0.1)
            nn.init.constant_(self.fc2.bias, 0.1)
        self.to(self.device)

    def save_weights(self):
        """Save the checkpoint to the same path _initialize_weights() loads from."""
        print(f"***KS*** Saving model ...")
        # BUG FIX: the original created 'saved_models' but wrote to
        # '../saved_models/<name>.pth' (a directory it never created),
        # and that path never matched the load path in
        # _initialize_weights(), so a saved model was never reloaded.
        # Save to model_dir/name so save and load round-trip.
        os.makedirs(self.model_dir, exist_ok=True)
        model_save_path = os.path.join(self.model_dir, self.name)
        torch.save(self.state_dict(), model_save_path)
        print(f'*** KS *** Model saved in file: {model_save_path}')

    def forward(self, x):
        """Compute class logits for a batch of 1-channel 32x32 images.

        x: tensor of shape [batch, 1, 32, 32].
        Returns logits of shape [batch, 13] (no softmax:
        CrossEntropyLoss applies it internally).
        """
        print("***KS*** Model: Executing forward calculations")
        print(f"***KS*** [0] {x.shape}")
        # Conv 1 + ReLU: [batch, 1, 32, 32] -> [batch, 32, 32, 32]
        x = self.conv1(x)
        print(f"***KS*** [1] {x.shape}")
        x = F.relu(x)
        print(f"***KS*** [2] {x.shape}")
        # First pooling: [batch, 32, 32, 32] -> [batch, 32, 16, 16]
        x = F.max_pool2d(x, kernel_size=2, stride=2)
        print(f"***KS*** [3] {x.shape}")
        # Conv 2 + ReLU: -> [batch, 64, 16, 16]
        x = F.relu(self.conv2(x))
        print(f"***KS*** [4] {x.shape}")
        # Second pooling: -> [batch, 64, 8, 8]
        x = F.max_pool2d(x, kernel_size=2, stride=2)
        print(f"***KS*** [5] {x.shape}")
        # Flatten: -> [batch, 8*8*64] = [batch, 4096]
        # https://docs.pytorch.org/docs/stable/generated/torch.Tensor.view.html#torch.Tensor.view
        x = x.view(-1, 8 * 8 * 64)
        print(f"***KS*** [6] {x.shape}")
        # Fully connected + ReLU: -> [batch, 1024]
        x = self.fc1(x)
        print(f"***KS*** [7] {x.shape}")
        x = F.relu(x)
        print(f"***KS*** [8] {x.shape}")
        # Dropout (active only in train() mode).
        x = self.dropout(x)
        print(f"***KS*** [9] {x.shape}")
        # Output logits: -> [batch, 13]
        x = self.fc2(x)
        print(f"***KS*** [10] {x.shape}")
        return x

    def get_device(self):
        """Return the torch.device the model runs on."""
        return self.device
# Dataset class for PyTorch
# https://docs.pytorch.org/docs/stable/data.html#torch.utils.data.Dataset
class ChessDataset(Dataset):
    """Dataset of 32x32 grayscale chess-tile images loaded from disk.

    Each constructor entry is a (file_path, label) pair where label is a
    single character from CHESS_PIECES: '1' = empty square, upper case =
    white pieces, lower case = black pieces.
    https://docs.pytorch.org/docs/stable/data.html#torch.utils.data.Dataset
    """
    CHESS_PIECES = '1KQRBNPkqrbnp'

    def __init__(self, image_train_date):
        self.num_images = len(image_train_date)
        # Each tile is a 32x32 grayscale image.
        self.images = np.zeros([self.num_images, 32, 32], dtype=np.uint8)
        # Labels stored as integer class indices into CHESS_PIECES.
        self.labels = np.zeros([self.num_images], dtype=np.int64)
        for i, image_file_path_and_label in enumerate(image_train_date):
            # Load the image and force single-channel ('L') grayscale.
            with Image.open(image_file_path_and_label[0]) as img:
                img = img.convert('L')
                self.images[i, :, :] = np.array(img, dtype=np.uint8)
            self.labels[i] = self.__get_piece_index_from_label__(image_file_path_and_label[1])
        print("***KS*** Done loading training data")

    def __get_piece_index_from_label__(self, label) -> int:
        """Map a piece letter to its class index (0-12).

        BUG FIX: the original used str.find(), which returns -1 for an
        unknown label and would silently corrupt the training targets
        fed to CrossEntropyLoss; str.index() raises ValueError instead.
        """
        return self.CHESS_PIECES.index(label)

    def get_piece_label(self, idx) -> str:
        """Inverse mapping: class index -> piece letter."""
        return self.CHESS_PIECES[idx]

    def __len__(self):
        return self.num_images

    def __getitem__(self, idx):
        """Return (image_tensor[1, 32, 32] in [0, 1], int class index)."""
        image = self.images[idx].astype('float32') / 255.0  # normalize to [0, 1]
        image = np.expand_dims(image, axis=0)  # add channel dimension
        label = self.labels[idx]
        return torch.tensor(image, dtype=torch.float32), label
class ChessImagesDataset(Dataset):
    """Inference-time dataset wrapping already-loaded tile images.

    Images are expected as 32x32 uint8 arrays; labels are not known at
    inference time, so __getitem__ returns an empty string for them.
    """
    CHESS_PIECES = '1KQRBNPkqrbnp'

    def __init__(self, images):
        self.images = images
        self.num_images = len(images)

    def __len__(self):
        return self.num_images

    def get_piece_label(self, idx) -> str:
        """Map a predicted class index (0-12) back to its piece letter."""
        return self.CHESS_PIECES[idx]

    def __getitem__(self, idx):
        """Return (normalized image tensor [1, 32, 32], empty label)."""
        scaled = self.images[idx].astype('float32') / 255.0  # pixels -> [0, 1]
        with_channel = np.expand_dims(scaled, axis=0)  # prepend channel axis
        return torch.tensor(with_channel, dtype=torch.float32), ""
class ChessPiecesRecognition:
    """High-level wrapper: owns the CNN model plus train/test loaders."""

    def __init__(self, _model_name, _model_dir):
        """Create the model (loading a checkpoint if present) and loaders."""
        print(f"***KS*** Chess pieces recognition initializing ...")
        self.model = CNNModel(_model_name, _model_dir)
        self.__load_train_data__()

    def __load_train_data__(self):
        """Build DataLoaders over TRAIN_DATA / TEST_DATA (the same tiles)."""
        print(f"*** KS *** loading training data")
        # https://docs.pytorch.org/docs/stable/data.html#torch.utils.data.DataLoader
        print(f"Loading {len(TRAIN_DATA)} Training tiles", end='')
        train_dataset = ChessDataset(TRAIN_DATA)
        print(f"\n*** KS *** Loading {len(TEST_DATA)} Testing tiles", end='')
        test_dataset = ChessDataset(TEST_DATA)
        print()
        batch_size = 64
        self.train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
        self.test_loader = DataLoader(test_dataset, batch_size=batch_size)

    def train(self):
        """Run the training loop and save the resulting checkpoint."""
        print(f"***KS*** Training chess pieces recognition")
        # CrossEntropyLoss applies softmax internally, so the model
        # outputs raw logits.
        # https://docs.pytorch.org/docs/stable/generated/torch.nn.CrossEntropyLoss.html#torch.nn.CrossEntropyLoss
        criterion = nn.CrossEntropyLoss()
        # https://docs.pytorch.org/docs/stable/generated/torch.optim.Adam.html#torch.optim.Adam
        optimizer = optim.Adam(self.model.parameters(), lr=1e-4)
        do_training = True  # set to False to skip training entirely
        epochs = 100
        if do_training:
            self.model.train()
            print(f"*** KS *** Starting training for {epochs} epochs...")
            for epoch in range(epochs):
                running_loss = 0.0
                print(f"***KS*** Epoch: {epoch}")
                for i, (inputs, labels) in enumerate(self.train_loader):
                    # Move batch to the model's device (CPU or GPU).
                    inputs = inputs.to(self.model.get_device())
                    labels = labels.to(self.model.get_device())
                    # Standard step: zero grads, forward, loss, backward, update.
                    optimizer.zero_grad()
                    outputs = self.model(inputs)
                    loss = criterion(outputs, labels)
                    loss.backward()
                    optimizer.step()
                    running_loss += loss.item()
                    if (i + 1) % 10 == 0:  # report every 10 batches
                        print(f'*** KS *** Epoch [{epoch +1}/{epochs}], Step [{i +1}/{len(self.train_loader)}], '
                              f'Loss: {running_loss / 10:.4f}')
                        running_loss = 0.0
            print('Finished Training')
            self.model.save_weights()

    def eval(self):
        """Report classification accuracy on the test loader."""
        self.model.eval()  # disable dropout
        correct = 0
        total = 0
        with torch.no_grad():
            for inputs, labels in self.test_loader:
                inputs = inputs.to(self.model.get_device())
                labels = labels.to(self.model.get_device())
                outputs = self.model(inputs)
                print(f"***KS*** Got model outputs: \nshape: {outputs.shape}\n{outputs}")
                # Predicted class = index of the max logit per row.
                _, predicted = torch.max(outputs.data, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()
        # Guard against an empty test loader (avoids ZeroDivisionError).
        test_accuracy = correct / total if total else 0.0
        print(f'Accuracy on test set: {test_accuracy * 100:.2f}%\n')

    def classify_pieces(self, images):
        """Classify tile images; return concatenated piece letters in input order.

        images: sequence of 32x32 uint8 grayscale arrays.
        """
        dataset = ChessImagesDataset(images)
        loader = DataLoader(dataset, batch_size=64)
        labels_str = ""
        self.model.eval()  # disable dropout for inference
        with torch.no_grad():
            for inputs, labels in loader:
                inputs = inputs.to(self.model.get_device())
                outputs = self.model(inputs)
                print(f"***KS*** Got model outputs: \nshape: {outputs.shape}")
                labels_detected = np.argmax(outputs.cpu(), axis=1)
                print(f"***KS*** Got labels idx detected: \nshape: {labels_detected.shape}\n{labels_detected}")
                # BUG FIX: accumulate across batches. The original
                # overwrote labels_str each iteration, so more than 64
                # images returned only the last batch's labels.
                labels_str += ''.join(dataset.get_piece_label(ix) for ix in labels_detected)
        return labels_str
# Example usage (constructor requires a model name and directory):
#t = ChessPiecesRecognition("chess_model", "saved_models")
#t.train()
#t.eval()