| |
| |
| |
| |
| |
| |
| |
| |
| import os |
| import sys |
| import itertools |
| import logging |
| import numpy as np |
| import torch |
| from torch.utils.data import DataLoader, ConcatDataset |
| from torch.optim.lr_scheduler import MultiStepLR, CosineAnnealingLR |
| from object_detection.pt.src.utils.ssd.misc import Timer |
| from object_detection.pt.src.models.ssd.detectors.ssd import MatchPrior |
| from object_detection.pt.src.models.ssd.losses.multibox_loss import MultiboxLoss |
| from object_detection.pt.src.models.ssd.detectors.config.mobilenetv1_ssd_config import MOBILENET_CONFIG |
| from common.onnx_utils.onnx_model_convertor import torch_model_export_static |
| from object_detection.pt.wrappers.evaluation.ssd import SSDEvaluatorWrapper |
| from common.onnx_utils.ssd_onnx_export import SSDExportWrapper |
| from torch.cuda.amp import GradScaler, autocast |
| from object_detection.pt.src.data.ssd.data_preprocessing import TrainAugmentation, TestTransform |
| from torch.utils.tensorboard import SummaryWriter |
| from pathlib import Path |
| import copy |
| current_file = Path(__file__).resolve() |
| zoo_path = current_file.parents[5] |
| sys.path.append(str(zoo_path)) |
| from pathlib import Path |
|
|
| class SSDTrainer: |
| |
| DEFAULTS = { |
| |
| 'optimizer': 'SGD', |
| 'SGD.learning_rate': 0.01, |
| 'SGD.momentum': 0.9, |
| 'SGD.weight_decay': 0.0005, |
| 'Adam.learning_rate': 0.001, |
| 'Adam.weight_decay': 0.0005, |
| 'Adam.betas': [0.9, 0.999], |
| |
| 'training.base_net_lr': None, |
| 'training.extra_layers_lr': None, |
| 'training.scheduler': 'cosine', |
| 'training.t_max': 200, |
| 'training.milestones': '80,100', |
| 'training.gamma': 0.1, |
| 'training.validation_epochs': 5, |
| 'training.print_interval': 50, |
| |
| 'model.input_shape': [3, 300, 300], |
| 'model.num_classes': 20, |
| 'model.width_mult': 1.0, |
| 'model.pretrained': False, |
| |
| 'dataset.num_workers': 4, |
| 'dataset.class_names': ['aeroplane', 'bicycle', 'bird', 'boat', 'bottle', 'bus', 'car', 'cat', 'chair', |
| 'cow', 'diningtable', 'dog', 'horse', 'motorbike', 'person', 'pottedplant', |
| 'sheep', 'sofa', 'train', 'tvmonitor'], |
| } |
|
|
| def _get_default(self, value, key): |
| """Return value if not None, else return default from DEFAULTS and log it.""" |
| if value is None: |
| default = self.DEFAULTS.get(key) |
| if default is not None: |
| logging.info(f"'{key}' not set in config, using default: {default}") |
| return default |
| return value |
|
|
| def __init__(self, dataloaders, model, cfg): |
| self.cfg = cfg |
| self.model = model |
| self.dataloader = dataloaders |
| self.batch_size = cfg.training.batch_size |
| self.timer = Timer() |
| self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu") |
| if torch.cuda.is_available(): |
| torch.backends.cudnn.benchmark = True |
| logging.info("Use CUDA.") |
|
|
| model_name = getattr(cfg.model, "model_name", None) |
| if model_name is None: |
| raise ValueError("Model name must be provided cfg.model.model_name") |
| self.model_name = model_name |
| self.config=MOBILENET_CONFIG() |
| |
| self.scaler = GradScaler() if torch.cuda.is_available() else None |
| self.evaluator = None |
| |
| |
| tensorboard_dir = os.path.join(cfg.output_dir, 'tensorboard') |
| self.writer = SummaryWriter(log_dir=tensorboard_dir) |
| logging.info(f"TensorBoard logs will be saved to: {tensorboard_dir}") |
| self.global_step = 0 |
|
|
| |
| def setup_data(self): |
| logging.info("Prepare training datasets.") |
|
|
| self.train_loader, self.val_loader = self.dataloader['train'], self.dataloader['valid'] |
| |
| def build_model_and_optim(self): |
| cfg = self.cfg |
| maybe_cfg = getattr(self.model, "config", None) |
| if maybe_cfg is not None: |
| self.config = maybe_cfg |
|
|
| |
| optimizer_cfg = getattr(cfg.training, 'optimizer', None) |
| sgd_cfg = getattr(optimizer_cfg, 'SGD', None) if optimizer_cfg else None |
| adam_cfg = getattr(optimizer_cfg, 'Adam', None) if optimizer_cfg else None |
|
|
| if sgd_cfg is not None: |
| optimizer_name = 'SGD' |
| lr = self._get_default(getattr(sgd_cfg, 'learning_rate', None), 'SGD.learning_rate') |
| momentum = self._get_default(getattr(sgd_cfg, 'momentum', None), 'SGD.momentum') |
| weight_decay = self._get_default(getattr(sgd_cfg, 'weight_decay', None), 'SGD.weight_decay') |
| elif adam_cfg is not None: |
| optimizer_name = 'Adam' |
| lr = self._get_default(getattr(adam_cfg, 'learning_rate', None), 'Adam.learning_rate') |
| weight_decay = self._get_default(getattr(adam_cfg, 'weight_decay', None), 'Adam.weight_decay') |
| betas = tuple(self._get_default(getattr(adam_cfg, 'betas', None), 'Adam.betas')) |
| else: |
| |
| optimizer_name = self.DEFAULTS['optimizer'] |
| logging.info(f"'optimizer' not set in config, using default: {optimizer_name}") |
| lr = self.DEFAULTS['SGD.learning_rate'] |
| momentum = self.DEFAULTS['SGD.momentum'] |
| weight_decay = self.DEFAULTS['SGD.weight_decay'] |
| logging.info(f"Using default SGD params: lr={lr}, momentum={momentum}, weight_decay={weight_decay}") |
|
|
| base_net_lr = self._get_default(getattr(cfg.training, 'base_net_lr', None), 'training.base_net_lr') |
| if base_net_lr is None: |
| base_net_lr = lr |
| extra_layers_lr = self._get_default(getattr(cfg.training, 'extra_layers_lr', None), 'training.extra_layers_lr') |
| if extra_layers_lr is None: |
| extra_layers_lr = lr |
|
|
| params = [ |
| {'params': self.model.base_net.parameters(), 'lr': base_net_lr}, |
| {'params': itertools.chain( |
| self.model.source_layer_add_ons.parameters(), |
| self.model.extras.parameters() |
| ), 'lr': extra_layers_lr}, |
| {'params': itertools.chain( |
| self.model.regression_headers.parameters(), |
| self.model.classification_headers.parameters() |
| )} |
| ] |
|
|
| self.criterion = MultiboxLoss( |
| self.config.priors, |
| iou_threshold=getattr(cfg, "iou_threshold", 0.5) or 0.5, |
| neg_pos_ratio=3, |
| center_variance=0.1, |
| size_variance=0.2, |
| device=self.device |
| ) |
|
|
| |
| if optimizer_name == 'SGD': |
| self.optimizer = torch.optim.SGD( |
| params, lr=lr, momentum=momentum, weight_decay=weight_decay |
| ) |
| logging.info(f"Using SGD optimizer: lr={lr}, momentum={momentum}, weight_decay={weight_decay}") |
| elif optimizer_name == 'Adam': |
| self.optimizer = torch.optim.Adam( |
| params, lr=lr, betas=betas, weight_decay=weight_decay |
| ) |
| logging.info(f"Using Adam optimizer: lr={lr}, betas={betas}, weight_decay={weight_decay}") |
|
|
| logging.info( |
| f"Base net learning rate: {base_net_lr}, Extra Layers learning rate: {extra_layers_lr}." |
| ) |
| self.last_epoch = -1 |
|
|
| |
| if cfg.training.scheduler == 'multi-step': |
| logging.info("Uses MultiStepLR scheduler.") |
| milestones = [int(v.strip()) for v in cfg.training.milestones.split(",")] |
| self.scheduler = MultiStepLR(self.optimizer, milestones=milestones, gamma=0.1, last_epoch=self.last_epoch) |
| elif cfg.training.scheduler == 'cosine': |
| logging.info("Uses CosineAnnealingLR scheduler.") |
| self.scheduler = CosineAnnealingLR(self.optimizer, cfg.training.t_max, last_epoch=self.last_epoch) |
| else: |
| raise ValueError(f"Unsupported Scheduler: {cfg.training.scheduler}.") |
|
|
| |
| def train_one_epoch(self, epoch, debug_steps=50): |
| self.model.train(True) |
| running_loss = 0.0 |
| running_regression_loss = 0.0 |
| running_classification_loss = 0.0 |
| |
| total_steps = len(self.train_loader) |
| for i, data in enumerate(self.train_loader): |
| images, boxes, labels = data |
| images = images.to(self.device) |
| boxes = boxes.to(self.device) |
| labels = labels.to(self.device) |
|
|
| self.optimizer.zero_grad() |
| |
| if self.scaler is not None: |
| with autocast(): |
| confidence, locations = self.model(images) |
| regression_loss, classification_loss = self.criterion(confidence, locations, labels, boxes) |
| loss = regression_loss + classification_loss |
| |
| self.scaler.scale(loss).backward() |
| self.scaler.step(self.optimizer) |
| self.scaler.update() |
| else: |
| confidence, locations = self.model(images) |
| regression_loss, classification_loss = self.criterion(confidence, locations, labels, boxes) |
| loss = regression_loss + classification_loss |
| loss.backward() |
| self.optimizer.step() |
|
|
| running_loss += loss.item() |
| running_regression_loss += regression_loss.item() |
| running_classification_loss += classification_loss.item() |
| |
| |
| self.writer.add_scalar('Train/Loss', loss.item(), self.global_step) |
| self.writer.add_scalar('Train/Regression_Loss', regression_loss.item(), self.global_step) |
| self.writer.add_scalar('Train/Classification_Loss', classification_loss.item(), self.global_step) |
| self.global_step += 1 |
|
|
| if i and i % debug_steps == 0: |
| avg_loss = running_loss / debug_steps |
| avg_reg_loss = running_regression_loss / debug_steps |
| avg_clf_loss = running_classification_loss / debug_steps |
| logging.info( |
| f"Epoch: {epoch}, Step: {i}/{total_steps}, " |
| f"Average Loss: {avg_loss:.4f}, " |
| f"Average Regression Loss {avg_reg_loss:.4f}, " |
| f"Average Classification Loss: {avg_clf_loss:.4f}" |
| ) |
| running_loss = 0.0 |
| running_regression_loss = 0.0 |
| running_classification_loss = 0.0 |
|
|
| |
|
|
| @torch.no_grad() |
| def evaluate(self, epoch=None): |
| """ |
| Combined evaluation: |
| - Always compute validation loss. |
| - Additionally run SSD VOC mAP via SSDEvaluatorWrapper based on cfg. |
| """ |
| |
| self.model.eval() |
| self.epoch = epoch |
| running_loss = 0.0 |
| running_regression_loss = 0.0 |
| running_classification_loss = 0.0 |
| num = 0 |
|
|
| for _, data in enumerate(self.val_loader): |
| images, boxes, labels = data |
| images = images.to(self.device) |
| boxes = boxes.to(self.device) |
| labels = labels.to(self.device) |
| num += 1 |
|
|
| confidence, locations = self.model(images) |
| regression_loss, classification_loss = self.criterion( |
| confidence, locations, labels, boxes |
| ) |
| loss = regression_loss + classification_loss |
|
|
| running_loss += loss.item() |
| running_regression_loss += regression_loss.item() |
| running_classification_loss += classification_loss.item() |
|
|
| avg_loss = running_loss / max(num, 1) |
| avg_reg = running_regression_loss / max(num, 1) |
| avg_cls = running_classification_loss / max(num, 1) |
|
|
| loss_dict = { |
| "loss": avg_loss, |
| "reg_loss": avg_reg, |
| "cls_loss": avg_cls, |
| } |
| |
| |
| if self.epoch is not None: |
| self.writer.add_scalar('Val/Loss', avg_loss, self.epoch) |
| self.writer.add_scalar('Val/Regression_Loss', avg_reg, self.epoch) |
| self.writer.add_scalar('Val/Classification_Loss', avg_cls, self.epoch) |
|
|
| map_dict = None |
|
|
| |
| |
| if self.evaluator is None: |
| self.evaluator = SSDEvaluatorWrapper( |
| dataloaders=self.dataloader, |
| model=self.model, |
| cfg=self.cfg, |
| ) |
| metrics = self.evaluator.evaluate() |
|
|
| map_dict = metrics |
| |
| |
| if map_dict is not None and self.epoch is not None: |
| if 'mAP' in map_dict: |
| self.writer.add_scalar('Val/mAP', map_dict['mAP'], self.epoch) |
| |
| for key, value in map_dict.items(): |
| if key != 'mAP' and isinstance(value, (int, float)): |
| self.writer.add_scalar(f'Val/AP_{key}', value, self.epoch) |
|
|
| return loss_dict, map_dict |
|
|
| |
| def train(self): |
| cfg = self.cfg |
| self.setup_data() |
| self.build_model_and_optim() |
|
|
| min_loss = float("inf") |
| self.last_epoch = -1 |
|
|
| logging.info(f"Start training from epoch {self.last_epoch + 1}.") |
| for epoch in range(self.last_epoch + 1, cfg.training.epochs): |
| self.scheduler.step() |
| self.train_one_epoch(epoch, debug_steps=self.cfg.training.print_interval) |
|
|
| if epoch % cfg.training.validation_epochs == 0 or epoch == cfg.training.epochs - 1: |
| loss_dict, map_dict = self.evaluate(epoch=epoch) |
|
|
| val_loss = loss_dict["loss"] |
| val_reg = loss_dict["reg_loss"] |
| val_cls = loss_dict["cls_loss"] |
|
|
| logging.info( |
| f"Epoch: {epoch}, " |
| f"Validation Loss: {val_loss:.4f}, " |
| f"Validation Regression Loss {val_reg:.4f}, " |
| f"Validation Classification Loss: {val_cls:.4f}" |
| ) |
|
|
| if map_dict is not None and map_dict.get("mAP") is not None: |
| logging.info(f"Epoch: {epoch}, SSD mAP: {map_dict['mAP']:.4f}") |
|
|
| self.save_checkpoint(epoch, val_loss) |
| min_loss = min(min_loss, val_loss) |
| |
| |
| self.writer.close() |
| logging.info("TensorBoard writer closed.") |
| |
| onnx_model = torch_model_export_static(cfg=self.cfg, |
| model_dir=self.cfg.output_dir, |
| model=self.model) |
| |
| return onnx_model |
|
|
| def save_checkpoint(self, epoch, val_loss): |
| model_path = os.path.join(self.cfg.output_dir, self.cfg.general.saved_models_dir, f"{self.model_name}-Epoch-{epoch}-Loss-{val_loss}.pth") |
| self.model.save(model_path) |
| logging.info(f"Saved model {model_path}") |