{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "pre-train with AUTSL dataset" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This is the code to train the model with the AUTSL dataset" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "'\\nfrom torch.utils.data import DataLoader\\nimport torch\\nfrom torchinfo import summary\\nfrom feeder import FeederINCLUDE\\nfrom aagcn import Model\\nimport pytorch_lightning as pl\\nfrom pytorch_lightning.loggers import WandbLogger # Importing here\\nfrom pytorch_lightning.callbacks import ModelCheckpoint\\nimport wandb\\nfrom augumentation import Rotate, Compose\\nfrom torch.utils.data import random_split\\n\\n\\nif __name__ == \\'__main__\\':\\n\\n # Hyper parameter tuning : batch_size, learning_rate, weight_decay\\n config = {\\'batch_size\\': 150, \\'learning_rate\\': 0.0137296, \\'weight_decay\\': 0.000150403}\\n \\n # Load device\\n device = \"cuda\" if torch.cuda.is_available() else \"cpu\"\\n\\n # Initialize wandb\\n wandb.finish()\\n wandb.init(project=\"GCN_VSL\", config=config) \\n wandb_config = wandb.config # Access the config parameters\\n\\n try:\\n # Your training or evaluation code here\\n print(\"WandB initialized successfully.\")\\n\\n finally:\\n # Fnish the WandB run\\n wandb.finish()\\n\\n # Load model\\n model = Model(num_class=226, num_point=46, num_person=1, in_channels=2,\\n graph_args={\"layout\": \"mediapipe_two_hand\", \"strategy\": \"spatial\"},\\n learning_rate=wandb_config.learning_rate, weight_decay=wandb_config.weight_decay)\\n\\n # Callback PL\\n callbacks = [\\n ModelCheckpoint(\\n dirpath=\"checkpoints\",\\n monitor=\"valid_loss\",\\n mode=\"min\",\\n every_n_epochs=2,\\n filename=\\'{epoch}-{valid_loss:.2f}-{valid_accuracy:.2f}-autsl-aagcn\\'\\n ),\\n ]\\n\\n # Augmentation\\n transforms = Compose([\\n Rotate(15, 80, 25, (0.5, 0.5))\\n ])\\n\\n %cd /home/ibmelab/Documents/GG/VSLRecognition/AUTSL/AAGCN\\n # Dataset class\\n train_dataset = FeederINCLUDE(data_path=f\"autsl_train_data_preprocess.npy\", label_path=f\"train_label_preprocess.npy\",\\n transform=transforms)\\n test_dataset = FeederINCLUDE(data_path=f\"autsl_test_data_preprocess.npy\", label_path=f\"test_label_preprocess.npy\")\\n valid_dataset = FeederINCLUDE(data_path=f\"autsl_valid_data_preprocess.npy\", label_path=f\"valid_label_preprocess.npy\")\\n\\n # DataLoader\\n train_dataloader = DataLoader(train_dataset, batch_size=wandb_config.batch_size, shuffle=True)\\n test_dataloader = DataLoader(test_dataset, batch_size=wandb_config.batch_size, shuffle=False)\\n val_dataloader = DataLoader(valid_dataset, batch_size=wandb_config.batch_size, shuffle=False)\\n\\n # Wandb Logger\\n wandb_logger = WandbLogger(log_model=\\'all\\')\\n\\n %cd /media/ibmelab/ibme21/Test\\n # Trainer PL\\n trainer = pl.Trainer(max_epochs=120, accelerator=\"auto\", check_val_every_n_epoch=1,\\n devices=1, callbacks=callbacks, logger=wandb_logger) # Added logger here\\n\\n trainer.fit(model, train_dataloader, val_dataloader)\\n\\n # Optional: Uncomment this when you want to test\\n # trainer.test(model, test_dataloader, ckpt_path=\"checkpoints/your_checkpoint.ckpt\", verbose=True)\\n'" ] }, "execution_count": 1, "metadata": {}, "output_type": "execute_result" } ], "source": [ "'''\n", "from torch.utils.data import DataLoader\n", "import torch\n", "from torchinfo import summary\n", "from feeder import FeederINCLUDE\n", "from aagcn import Model\n", "import pytorch_lightning as pl\n", "from pytorch_lightning.loggers import WandbLogger # Importing here\n", "from pytorch_lightning.callbacks import ModelCheckpoint\n", "import wandb\n", "from augumentation import Rotate, Compose\n", "from torch.utils.data import random_split\n", "\n", "\n", "if __name__ == '__main__':\n", "\n", " # Hyper parameter tuning : batch_size, learning_rate, weight_decay\n", " config = {'batch_size': 150, 'learning_rate': 0.0137296, 'weight_decay': 0.000150403}\n", " \n", " # Load device\n", " device = \"cuda\" if torch.cuda.is_available() else \"cpu\"\n", "\n", " # Initialize wandb\n", " wandb.finish()\n", " wandb.init(project=\"GCN_VSL\", config=config) \n", " wandb_config = wandb.config # Access the config parameters\n", "\n", " try:\n", " # Your training or evaluation code here\n", " print(\"WandB initialized successfully.\")\n", "\n", " finally:\n", " # Fnish the WandB run\n", " wandb.finish()\n", "\n", " # Load model\n", " model = Model(num_class=226, num_point=46, num_person=1, in_channels=2,\n", " graph_args={\"layout\": \"mediapipe_two_hand\", \"strategy\": \"spatial\"},\n", " learning_rate=wandb_config.learning_rate, weight_decay=wandb_config.weight_decay)\n", "\n", " # Callback PL\n", " callbacks = [\n", " ModelCheckpoint(\n", " dirpath=\"checkpoints\",\n", " monitor=\"valid_loss\",\n", " mode=\"min\",\n", " every_n_epochs=2,\n", " filename='{epoch}-{valid_loss:.2f}-{valid_accuracy:.2f}-autsl-aagcn'\n", " ),\n", " ]\n", "\n", " # Augmentation\n", " transforms = Compose([\n", " Rotate(15, 80, 25, (0.5, 0.5))\n", " ])\n", "\n", " %cd /home/ibmelab/Documents/GG/VSLRecognition/AUTSL/AAGCN\n", " # Dataset class\n", " train_dataset = FeederINCLUDE(data_path=f\"autsl_train_data_preprocess.npy\", label_path=f\"train_label_preprocess.npy\",\n", " transform=transforms)\n", " test_dataset = FeederINCLUDE(data_path=f\"autsl_test_data_preprocess.npy\", label_path=f\"test_label_preprocess.npy\")\n", " valid_dataset = FeederINCLUDE(data_path=f\"autsl_valid_data_preprocess.npy\", label_path=f\"valid_label_preprocess.npy\")\n", "\n", " # DataLoader\n", " train_dataloader = DataLoader(train_dataset, batch_size=wandb_config.batch_size, shuffle=True)\n", " test_dataloader = DataLoader(test_dataset, batch_size=wandb_config.batch_size, shuffle=False)\n", " val_dataloader = DataLoader(valid_dataset, batch_size=wandb_config.batch_size, shuffle=False)\n", "\n", " # Wandb Logger\n", " wandb_logger = WandbLogger(log_model='all')\n", "\n", " %cd /media/ibmelab/ibme21/Test\n", " # Trainer PL\n", " trainer = pl.Trainer(max_epochs=120, accelerator=\"auto\", check_val_every_n_epoch=1,\n", " devices=1, callbacks=callbacks, logger=wandb_logger) # Added logger here\n", "\n", " trainer.fit(model, train_dataloader, val_dataloader)\n", "\n", " # Optional: Uncomment this when you want to test\n", " # trainer.test(model, test_dataloader, ckpt_path=\"checkpoints/your_checkpoint.ckpt\", verbose=True)\n", "'''\n", " \n" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "import pandas as pd\n", "import mediapipe as mp\n", "import cv2\n", "from collections import defaultdict\n", "from joblib import Parallel, delayed\n", "from tqdm import tqdm\n", "import ast\n", "import os\n", "import csv\n", "import re\n", "from sklearn.model_selection import KFold\n", "import numpy as np" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Load the videos to videos_list.csv (columns: file (path), label, gloss, video name, actor)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Video names have been written to videos_list.csv\n", "Minimum label: 20\n", "Labels have been updated and saved.\n" ] } ], "source": [ "folder_path = r'path_to_dataset_folder'\n", "csv_file_path = 'videos_list.csv'\n", "labels_file_path = '1_1000_label.csv'\n", "final_file_path = 'temp_videos_list.csv'\n", "\n", "label_to_gloss = {}\n", "with open(labels_file_path, mode='r', encoding='utf-8') as labels_file:\n", " csv_reader = csv.DictReader(labels_file)\n", " for row in csv_reader:\n", " label = int(row['id_label_in_documents'])\n", " gloss = row['name']\n", " label_to_gloss[label] = gloss\n", "\n", "with open(csv_file_path, mode='w', newline='', encoding='utf-8') as csv_file:\n", " csv_writer = csv.writer(csv_file)\n", " csv_writer.writerow(['file', 'label', 'gloss', 'video_name', 'actor'])\n", "\n", " for filename in os.listdir(folder_path):\n", " if filename.lower().endswith(('.mp4', '.mkv', '.avi', '.mov', '.flv', '.wmv')):\n", " actor = filename.split('_')[0]\n", " \n", " match = re.search(r'_(\\d+)\\.', filename)\n", " if match:\n", " label = int(match.group(1))\n", " gloss = label_to_gloss.get(label, 'Unknown')\n", " else:\n", " label = 'N/A'\n", " gloss = 'Unknown'\n", "\n", " if label != 200:\n", " full_filename = os.path.join(folder_path, filename)\n", " csv_writer.writerow([full_filename, label, gloss, filename, actor]) \n", "\n", "print(f'Video names have been written to {csv_file_path}')\n", "\n", "# Find min label\n", "with open(csv_file_path, mode='r', newline='', encoding='utf-8') as csv_file:\n", " csv_reader = csv.DictReader(csv_file)\n", " labels = [int(row[\"label\"]) for row in csv_reader if row[\"label\"].isdigit()] \n", " min_label = min(labels) if labels else None\n", "\n", "print(\"Minimum label:\", min_label)\n", "\n", "# Normalize labels\n", "with open(csv_file_path, mode='r', newline='', encoding='utf-8') as csv_file, \\\n", " open(final_file_path, mode='w', newline='', encoding='utf-8') as final_file:\n", " \n", " csv_reader = csv.DictReader(csv_file)\n", " fieldnames = csv_reader.fieldnames\n", " \n", " csv_writer = csv.DictWriter(final_file, fieldnames=fieldnames)\n", " csv_writer.writeheader()\n", " \n", " for row in csv_reader:\n", " if row['label'].isdigit(): # Check if label is a digit before converting\n", " row['label'] = str(int(row['label']) - min_label) \n", " csv_writer.writerow(row)\n", "\n", "# Replace the original file with the updated file\n", "os.replace(final_file_path, csv_file_path)\n", "\n", "print(\"Labels have been updated and saved.\")\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Number of labels in the dataset" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "10\n", "{20, 21, 22, 23, 24, 25, 26, 27, 28, 29}\n" ] } ], "source": [ "num_labels = len(set(labels))\n", "print(num_labels)\n", "print(set(labels))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Extract keypoints" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ " \r" ] } ], "source": [ "import pandas as pd\n", "import mediapipe as mp\n", "import cv2\n", "import os\n", "from collections import defaultdict\n", "from joblib import Parallel, delayed\n", "from tqdm import tqdm\n", "\n", "mp_holistic = mp.solutions.holistic\n", "mp_drawing = mp.solutions.drawing_utils\n", "\n", "hand_landmarks = ['INDEX_FINGER_DIP', 'INDEX_FINGER_MCP', 'INDEX_FINGER_PIP', 'INDEX_FINGER_TIP', \n", " 'MIDDLE_FINGER_DIP', 'MIDDLE_FINGER_MCP', 'MIDDLE_FINGER_PIP', 'MIDDLE_FINGER_TIP', \n", " 'PINKY_DIP', 'PINKY_MCP', 'PINKY_PIP', 'PINKY_TIP', 'RING_FINGER_DIP', 'RING_FINGER_MCP', \n", " 'RING_FINGER_PIP', 'RING_FINGER_TIP', 'THUMB_CMC', 'THUMB_IP', 'THUMB_MCP', 'THUMB_TIP', 'WRIST']\n", "pose_landmarks = ['LEFT_ANKLE', 'LEFT_EAR', 'LEFT_ELBOW', 'LEFT_EYE', 'LEFT_EYE_INNER', 'LEFT_EYE_OUTER', \n", " 'LEFT_FOOT_INDEX', 'LEFT_HEEL', 'LEFT_HIP', 'LEFT_INDEX', 'LEFT_KNEE', 'LEFT_PINKY', \n", " 'LEFT_SHOULDER', 'LEFT_THUMB', 'LEFT_WRIST', 'MOUTH_LEFT', 'MOUTH_RIGHT', 'NOSE', \n", " 'RIGHT_ANKLE', 'RIGHT_EAR', 'RIGHT_ELBOW', 'RIGHT_EYE', 'RIGHT_EYE_INNER', 'RIGHT_EYE_OUTER', \n", " 'RIGHT_FOOT_INDEX', 'RIGHT_HEEL', 'RIGHT_HIP', 'RIGHT_INDEX', 'RIGHT_KNEE', 'RIGHT_PINKY', \n", " 'RIGHT_SHOULDER', 'RIGHT_THUMB', 'RIGHT_WRIST']\n", "\n", "def extract_keypoint(video_path, label, actor):\n", " cap = cv2.VideoCapture(video_path)\n", " \n", " keypoint_dict = defaultdict(list)\n", " count = 0\n", "\n", " with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:\n", " while True:\n", " ret, frame = cap.read()\n", " if not ret:\n", " break\n", " \n", " count += 1\n", " image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)\n", " results = holistic.process(image)\n", "\n", " if results.right_hand_landmarks:\n", " for idx, landmark in enumerate(results.right_hand_landmarks.landmark): \n", " keypoint_dict[f\"{hand_landmarks[idx]}_right_x\"].append(landmark.x)\n", " keypoint_dict[f\"{hand_landmarks[idx]}_right_y\"].append(landmark.y)\n", " keypoint_dict[f\"{hand_landmarks[idx]}_right_z\"].append(landmark.z)\n", " else:\n", " for idx in range(len(hand_landmarks)):\n", " keypoint_dict[f\"{hand_landmarks[idx]}_right_x\"].append(0)\n", " keypoint_dict[f\"{hand_landmarks[idx]}_right_y\"].append(0)\n", " keypoint_dict[f\"{hand_landmarks[idx]}_right_z\"].append(0)\n", "\n", " if results.left_hand_landmarks:\n", " for idx, landmark in enumerate(results.left_hand_landmarks.landmark): \n", " keypoint_dict[f\"{hand_landmarks[idx]}_left_x\"].append(landmark.x)\n", " keypoint_dict[f\"{hand_landmarks[idx]}_left_y\"].append(landmark.y)\n", " keypoint_dict[f\"{hand_landmarks[idx]}_left_z\"].append(landmark.z)\n", " else:\n", " for idx in range(len(hand_landmarks)):\n", " keypoint_dict[f\"{hand_landmarks[idx]}_left_x\"].append(0)\n", " keypoint_dict[f\"{hand_landmarks[idx]}_left_y\"].append(0)\n", " keypoint_dict[f\"{hand_landmarks[idx]}_left_z\"].append(0)\n", "\n", " if results.pose_landmarks:\n", " for idx, landmark in enumerate(results.pose_landmarks.landmark): \n", " keypoint_dict[f\"{pose_landmarks[idx]}_x\"].append(landmark.x)\n", " keypoint_dict[f\"{pose_landmarks[idx]}_y\"].append(landmark.y)\n", " keypoint_dict[f\"{pose_landmarks[idx]}_z\"].append(landmark.z)\n", " else:\n", " for idx in range(len(pose_landmarks)):\n", " keypoint_dict[f\"{pose_landmarks[idx]}_x\"].append(0)\n", " keypoint_dict[f\"{pose_landmarks[idx]}_y\"].append(0)\n", " keypoint_dict[f\"{pose_landmarks[idx]}_z\"].append(0)\n", "\n", " keypoint_dict[\"frame\"] = count\n", " keypoint_dict[\"video_path\"] = video_path\n", " keypoint_dict[\"label\"] = label\n", " keypoint_dict[\"actor\"] = actor\n", "\n", " return keypoint_dict\n", "\n", "def process_videos():\n", " csv_file = f\"videos_list.csv\"\n", " data = pd.read_csv(csv_file)\n", "\n", " keypoints_list = Parallel(n_jobs=-1)( \n", " delayed(extract_keypoint)(row['file'], row['label'], row['actor']) for index, row in tqdm(data.iterrows(), total=len(data), desc=\"Processing videos\", leave=False)\n", " )\n", "\n", " keypoints_df = pd.DataFrame(keypoints_list)\n", " keypoints_df.to_csv(f\"vsl{num_labels}_keypoints.csv\", index=False)\n", "\n", "if __name__ == '__main__':\n", " process_videos()\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Interpolation" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "100%|██████████| 280/280 [00:04<00:00, 67.80it/s]\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "Interpolated keypoints saved to vsl10_interpolated_keypoints.csv\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "100%|██████████| 280/280 [00:03<00:00, 91.60it/s] " ] }, { "name": "stdout", "output_type": "stream", "text": [ "Data processing and saving completed.\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "\n" ] } ], "source": [ "import pandas as pd\n", "import numpy as np\n", "import ast\n", "from tqdm import tqdm\n", "\n", "def find_index(array):\n", " for i, num in enumerate(array):\n", " if num != 0:\n", " return i\n", "\n", "def curl_skeleton(array):\n", " if sum(array) == 0:\n", " return array\n", " for i, location in enumerate(array):\n", " if location != 0:\n", " continue\n", " else:\n", " if i == 0 or i == len(array) - 1:\n", " continue\n", " else:\n", " if array[i + 1] != 0:\n", " array[i] = float((array[i - 1] + array[i + 1]) / 2)\n", " else:\n", " if sum(array[i:]) == 0:\n", " continue\n", " else:\n", " j = find_index(array[i + 1:])\n", " array[i] = float(((1 + j) * array[i - 1] + 1 * array[i + 1 + j]) / (2 + j))\n", " return array\n", "\n", "def interpolate_keypoints(input_file, output_file, body_identifiers):\n", " train_data = pd.read_csv(input_file)\n", " output_df = train_data.copy()\n", "\n", " for index, video in tqdm(train_data.iterrows(), total=train_data.shape[0]):\n", " for identifier in body_identifiers:\n", " # Interpolate the x and y keypoints\n", " x_values = curl_skeleton(ast.literal_eval(video[identifier + \"_x\"]))\n", " y_values = curl_skeleton(ast.literal_eval(video[identifier + \"_y\"]))\n", "\n", " output_df.at[index, identifier + \"_x\"] = str(x_values)\n", " output_df.at[index, identifier + \"_y\"] = str(y_values)\n", "\n", " output_df.to_csv(output_file, index=False)\n", " print(f\"Interpolated keypoints saved to {output_file}\")\n", "\n", "if __name__ == \"__main__\":\n", " input_file_path = f\"vsl{num_labels}_keypoints.csv\"\n", " output_file_path = f\"vsl{num_labels}_interpolated_keypoints.csv\"\n", "\n", " hand_landmarks = [\n", " 'INDEX_FINGER_DIP', 'INDEX_FINGER_MCP', 'INDEX_FINGER_PIP', 'INDEX_FINGER_TIP', \n", " 'MIDDLE_FINGER_DIP', 'MIDDLE_FINGER_MCP', 'MIDDLE_FINGER_PIP', 'MIDDLE_FINGER_TIP', \n", " 'PINKY_DIP', 'PINKY_MCP', 'PINKY_PIP', 'PINKY_TIP', \n", " 'RING_FINGER_DIP', 'RING_FINGER_MCP', 'RING_FINGER_PIP', 'RING_FINGER_TIP', \n", " 'THUMB_CMC', 'THUMB_IP', 'THUMB_MCP', 'THUMB_TIP', 'WRIST'\n", " ]\n", " HAND_IDENTIFIERS = [id + \"_right\" for id in hand_landmarks] + [id + \"_left\" for id in hand_landmarks]\n", " POSE_IDENTIFIERS = [\"RIGHT_SHOULDER\", \"LEFT_SHOULDER\", \"LEFT_ELBOW\", \"RIGHT_ELBOW\"]\n", " body_identifiers = HAND_IDENTIFIERS + POSE_IDENTIFIERS \n", "\n", " interpolate_keypoints(input_file_path, output_file_path, body_identifiers)\n", "\n", " # Load interpolated data and store them in numpy files\n", " train_data = pd.read_csv(output_file_path)\n", " frames = 80\n", "\n", " data = []\n", " labels = []\n", "\n", " for video_index, video in tqdm(train_data.iterrows(), total=train_data.shape[0]):\n", " T = len(ast.literal_eval(video[\"INDEX_FINGER_DIP_right_x\"]))\n", " current_row = np.empty(shape=(2, T, len(body_identifiers), 1))\n", "\n", " for index, identifier in enumerate(body_identifiers):\n", " data_keypoint_preprocess_x = ast.literal_eval(video[identifier + \"_x\"])\n", " current_row[0, :, index, :] = np.asarray(data_keypoint_preprocess_x).reshape(T, 1)\n", "\n", " data_keypoint_preprocess_y = ast.literal_eval(video[identifier + \"_y\"])\n", " current_row[1, :, index, :] = np.asarray(data_keypoint_preprocess_y).reshape(T, 1)\n", "\n", " if T < frames:\n", " target = np.zeros(shape=(2, frames, len(body_identifiers), 1))\n", " target[:, :T, :, :] = current_row\n", " else:\n", " target = current_row[:, :frames, :, :]\n", "\n", " data.append(target)\n", " labels.append(int(video[\"label\"]))\n", "\n", " keypoint_data = np.stack(data, axis=0)\n", " label_data = np.stack(labels, axis=0)\n", " np.save(f'vsl{num_labels}_data_preprocess.npy', keypoint_data)\n", " np.save(f'vsl{num_labels}_label_preprocess.npy', label_data)\n", " print(\"Data processing and saving completed.\")\n" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "(280, 2, 80, 46, 1)\n", "(280,)\n" ] } ], "source": [ "import numpy as np\n", "a = np.load(f'vsl{num_labels}_data_preprocess.npy')\n", "b = np.load(f'vsl{num_labels}_label_preprocess.npy')\n", "\n", "print(a.shape)\n", "print(b.shape)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Do K-Folds and store the keypoints in numpy files" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Number of actors: 28\n", "-----------------------------------------------------\n", "Fold 1: 30 test samples\n", "Fold 2: 29 test samples\n", "Fold 3: 30 test samples\n", "Fold 4: 30 test samples\n", "Fold 5: 30 test samples\n", "Fold 6: 30 test samples\n", "Fold 7: 31 test samples\n", "Fold 8: 30 test samples\n", "Fold 9: 20 test samples\n", "Fold 10: 20 test samples\n", "Processed and saved vsl10 fold 1 successfully.\n", "Processed and saved vsl10 fold 2 successfully.\n", "Processed and saved vsl10 fold 3 successfully.\n", "Processed and saved vsl10 fold 4 successfully.\n", "Processed and saved vsl10 fold 5 successfully.\n", "Processed and saved vsl10 fold 6 successfully.\n", "Processed and saved vsl10 fold 7 successfully.\n", "Processed and saved vsl10 fold 8 successfully.\n", "Processed and saved vsl10 fold 9 successfully.\n", "Processed and saved vsl10 fold 10 successfully.\n" ] } ], "source": [ "from sklearn.model_selection import KFold\n", "import os\n", "import numpy as np\n", "import pandas as pd\n", "from tqdm import tqdm\n", "\n", "def k_fold_cross_validation(train_data, keypoint_data, label_data, num_labels, k_folds, destination_folder=\"numpy_files\"):\n", " os.makedirs(destination_folder, exist_ok=True)\n", "\n", " actors = train_data['actor'].unique()\n", " print(f\"Number of actors: {len(actors)}\")\n", " print('-----------------------------------------------------')\n", "\n", " kf = KFold(n_splits=k_folds, shuffle=True, random_state=42)\n", "\n", " actor_to_indices = {actor: train_data.index[train_data['actor'] == actor].tolist() for actor in actors}\n", " folds = [[] for _ in range(k_folds)]\n", "\n", " for fold, (train_actors, test_actors) in enumerate(kf.split(actors)):\n", " train_actors = actors[train_actors]\n", " test_actors = actors[test_actors]\n", " \n", " for actor in test_actors:\n", " folds[fold].extend(actor_to_indices[actor])\n", "\n", " tqdm.write(f\"Fold {fold+1}: {len(folds[fold])} test samples\")\n", "\n", " # Iterate over each fold to create train-test splits\n", " for fold in range(k_folds):\n", " test_indices = folds[fold]\n", " train_indices = [idx for f in range(k_folds) if f != fold for idx in folds[f]]\n", "\n", " X_train, X_test = keypoint_data[train_indices], keypoint_data[test_indices]\n", " y_train = np.array(label_data[train_indices], dtype=np.int64)\n", " y_test = np.array(label_data[test_indices], dtype=np.int64)\n", "\n", " np.save(os.path.join(destination_folder, f'vsl{num_labels}_data_fold{fold+1}_train.npy'), X_train)\n", " np.save(os.path.join(destination_folder, f'vsl{num_labels}_label_fold{fold+1}_train.npy'), y_train)\n", " np.save(os.path.join(destination_folder, f'vsl{num_labels}_data_fold{fold+1}_test.npy'), X_test)\n", " np.save(os.path.join(destination_folder, f'vsl{num_labels}_label_fold{fold+1}_test.npy'), y_test)\n", "\n", " tqdm.write(f\"Processed and saved vsl{num_labels} fold {fold+1} successfully.\")\n", "\n", "if __name__ == \"__main__\":\n", " input_file_path = f\"vsl{num_labels}_interpolated_keypoints.csv\"\n", " train_data = pd.read_csv(input_file_path)\n", "\n", " keypoint_data = np.load(f'vsl{num_labels}_data_preprocess.npy')\n", " label_data = np.load(f'vsl{num_labels}_label_preprocess.npy')\n", "\n", " num_labels = len(np.unique(label_data))\n", "\n", " k_folds = 10\n", " k_fold_cross_validation(train_data, keypoint_data, label_data, num_labels, k_folds)\n" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "(29, 2, 80, 46, 1)\n", "(251, 2, 80, 46, 1)\n" ] } ], "source": [ "import numpy as np\n", "a = np.load(f'numpy_files/vsl{num_labels}_data_fold2_test.npy')\n", "b = np.load(f'numpy_files/vsl{num_labels}_data_fold2_train.npy')\n", "\n", "print(a.shape)\n", "print(b.shape)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "train directly with different folds" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "'''\n", "import os\n", "import numpy as np\n", "import torch\n", "from torch.utils.data import DataLoader\n", "import pytorch_lightning as pl\n", "from pytorch_lightning.callbacks import ModelCheckpoint\n", "from feeder import FeederINCLUDE\n", "from aagcn import Model\n", "from augumentation import Rotate, Compose\n", "\n", "os.environ[\"CUDA_VISIBLE_DEVICES\"] = \"1\"\n", "\n", "if __name__ == '__main__':\n", " k_folds = 10\n", " config = {'batch_size': 128, 'learning_rate': 0.0137296, 'weight_decay': 0.000150403}\n", " \n", " device = \"cuda\" if torch.cuda.is_available() else \"cpu\"\n", "\n", " best_accuracy = 0.0\n", " best_fold = -1\n", "\n", " for fold in range(k_folds):\n", " print(f\"Starting fold {fold + 1}/{k_folds}\")\n", " train_data_path = os.path.join(\"numpy_files\", f'vsl{num_labels}_data_fold{fold+1}_train.npy')\n", " train_label_path = os.path.join(\"numpy_files\", f'vsl{num_labels}_label_fold{fold+1}_train.npy')\n", " val_data_path = os.path.join(\"numpy_files\", f'vsl{num_labels}_data_fold{fold+1}_test.npy')\n", " val_label_path = os.path.join(\"numpy_files\", f'vsl{num_labels}_label_fold{fold+1}_test.npy')\n", "\n", " transforms = Compose([\n", " Rotate(15, 80, 25, (0.5, 0.5))\n", " ])\n", "\n", " train_dataset = FeederINCLUDE(\n", " data_path=train_data_path,\n", " label_path=train_label_path,\n", " transform=transforms\n", " )\n", " val_dataset = FeederINCLUDE(\n", " data_path=val_data_path,\n", " label_path=val_label_path\n", " )\n", "\n", " train_dataloader = DataLoader(train_dataset, batch_size=config['batch_size'], shuffle=True)\n", " val_dataloader = DataLoader(val_dataset, batch_size=config['batch_size'], shuffle=False)\n", "\n", " model = Model(num_class=num_labels, num_point=46, num_person=1, in_channels=2,\n", " graph_args={\"layout\": \"mediapipe_two_hand\", \"strategy\": \"spatial\"},\n", " learning_rate=config['learning_rate'], weight_decay=config['weight_decay'])\n", "\n", " callbacks = [\n", " ModelCheckpoint(\n", " dirpath=\"checkpoints\",\n", " monitor=\"valid_accuracy\",\n", " mode=\"max\",\n", " every_n_epochs=2,\n", " filename=f'vsl{num_labels}-aagcn-fold={fold+1}'\n", " ),\n", " ]\n", "\n", " trainer = pl.Trainer(max_epochs=2, accelerator=\"auto\", check_val_every_n_epoch=1,\n", " devices=1, callbacks=callbacks)\n", "\n", " trainer.fit(model, train_dataloader, val_dataloader)\n", " val_accuracy = trainer.callback_metrics['valid_accuracy'].item()\n", " print(f\"Fold {fold + 1} finished with validation accuracy: {val_accuracy:.4f}\")\n", "\n", " if val_accuracy > best_accuracy:\n", " best_accuracy = val_accuracy\n", " best_fold = fold + 1 \n", "\n", " print(f\"The highest validation accuracy achieved is {best_accuracy:.4f} from fold {best_fold}.\")\n", "'''" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "\n", "print(f\"The highest validation accuracy achieved of vsl{num_labels} is {best_accuracy:.4f} from fold {best_fold}.\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "train based on AUTSL with different folds" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "\n", "import os\n", "import numpy as np\n", "import torch\n", "from torch.utils.data import DataLoader\n", "import pytorch_lightning as pl\n", "from pytorch_lightning.callbacks import ModelCheckpoint\n", "from feeder import FeederINCLUDE\n", "from aagcn import Model\n", "from augumentation import Rotate, Compose\n", "from pytorch_lightning.utilities.migration import pl_legacy_patch\n", "\n", "os.environ[\"CUDA_VISIBLE_DEVICES\"] = \"1\"\n", "\n", "if __name__ == '__main__':\n", " k_folds = 10 \n", " config = {'batch_size': 128, 'learning_rate': 0.0137296, 'weight_decay': 0.000150403}\n", " \n", " device = \"cuda\" if torch.cuda.is_available() else \"cpu\"\n", "\n", " best_accuracy = 0.0\n", " best_fold = -1\n", "\n", " for fold in range(k_folds):\n", " print(f\"Starting fold {fold + 1}/{k_folds}\")\n", " train_data_path = os.path.join(\"numpy_files\", f'vsl{num_labels}_data_fold{fold+1}_train.npy')\n", " train_label_path = os.path.join(\"numpy_files\", f'vsl{num_labels}_label_fold{fold+1}_train.npy')\n", " val_data_path = os.path.join(\"numpy_files\", f'vsl{num_labels}_data_fold{fold+1}_test.npy')\n", " val_label_path = os.path.join(\"numpy_files\", f'vsl{num_labels}_label_fold{fold+1}_test.npy')\n", "\n", " transforms = Compose([\n", " Rotate(15, 80, 25, (0.5, 0.5))\n", " ])\n", "\n", " train_dataset = FeederINCLUDE(\n", " data_path=train_data_path,\n", " label_path=train_label_path,\n", " transform=transforms\n", " )\n", " val_dataset = FeederINCLUDE(\n", " data_path=val_data_path,\n", " label_path=val_label_path\n", " )\n", "\n", " train_dataloader = DataLoader(train_dataset, batch_size=config['batch_size'], shuffle=True)\n", " val_dataloader = DataLoader(val_dataset, batch_size=config['batch_size'], shuffle=False)\n", "\n", " model = Model(num_class=num_labels, num_point=46, num_person=1, in_channels=2,\n", " graph_args={\"layout\": \"mediapipe_two_hand\", \"strategy\": \"spatial\"},\n", " learning_rate=config['learning_rate'], weight_decay=config['weight_decay'])\n", "\n", " # Path pre-trained checkpoint file on AUTSL\n", " checkpoint_path = \"epoch=55-valid_loss=0.41-valid_accuracy=0.85-autsl-aagcn.ckpt\"\n", "\n", " with pl_legacy_patch():\n", " checkpoint = torch.load(checkpoint_path, map_location=device)\n", "\n", " state_dict = checkpoint['state_dict']\n", " filtered_state_dict = {k: v for k, v in state_dict.items() if not k.startswith('fc.')}\n", " model.load_state_dict(filtered_state_dict, strict=False)\n", "\n", " callbacks = [\n", " ModelCheckpoint(\n", " dirpath=\"checkpoints\",\n", " monitor=\"valid_accuracy\",\n", " mode=\"max\",\n", " every_n_epochs=2,\n", " filename=f'autsl_vsl{num_labels}-aagcn-fold={fold+1}'\n", " ),\n", " ]\n", "\n", " trainer = pl.Trainer(max_epochs=100, accelerator=\"auto\", check_val_every_n_epoch=1,\n", " devices=1, callbacks=callbacks)\n", "\n", " trainer.fit(model, train_dataloader, val_dataloader)\n", " val_accuracy = trainer.callback_metrics['valid_accuracy'].item() \n", " print(f\"Fold {fold + 1} finished with validation accuracy: {val_accuracy:.4f}\")\n", "\n", " if val_accuracy > best_accuracy:\n", " best_accuracy = val_accuracy\n", " best_fold = fold + 1 \n", "\n", " print(f\"The highest validation accuracy achieved is {best_accuracy:.4f} from fold {best_fold}.\")\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(f\"The highest validation accuracy achieved of autsl vsl{num_labels} is {best_accuracy:.4f} from fold {best_fold}.\")" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.10.11" } }, "nbformat": 4, "nbformat_minor": 2 }