FBAGSTM's picture
STM32 AI Experimentation Hub
747451d
# /*---------------------------------------------------------------------------------------------
#  * Copyright (c) 2025 STMicroelectronics.
#  * All rights reserved.
#  *
#  * This software is licensed under terms that can be found in the LICENSE file in
#  * the root directory of this software component.
#  * If no LICENSE file comes with this software, it is provided AS-IS.
#  *--------------------------------------------------------------------------------------------*/
import os
import sys
import itertools
import logging
import numpy as np
import torch
from torch.utils.data import DataLoader, ConcatDataset
from torch.optim.lr_scheduler import MultiStepLR, CosineAnnealingLR
from object_detection.pt.src.utils.ssd.misc import Timer
from object_detection.pt.src.models.ssd.detectors.ssd import MatchPrior
from object_detection.pt.src.models.ssd.losses.multibox_loss import MultiboxLoss
from object_detection.pt.src.models.ssd.detectors.config.mobilenetv1_ssd_config import MOBILENET_CONFIG
from common.onnx_utils.onnx_model_convertor import torch_model_export_static
from object_detection.pt.wrappers.evaluation.ssd import SSDEvaluatorWrapper
from common.onnx_utils.ssd_onnx_export import SSDExportWrapper
from torch.cuda.amp import GradScaler, autocast
from object_detection.pt.src.data.ssd.data_preprocessing import TrainAugmentation, TestTransform
from torch.utils.tensorboard import SummaryWriter
from pathlib import Path
import copy
current_file = Path(__file__).resolve()
zoo_path = current_file.parents[5]
sys.path.append(str(zoo_path))
from pathlib import Path
class SSDTrainer:
# Default values for config parameters
DEFAULTS = {
# Optimizer
'optimizer': 'SGD',
'SGD.learning_rate': 0.01,
'SGD.momentum': 0.9,
'SGD.weight_decay': 0.0005,
'Adam.learning_rate': 0.001,
'Adam.weight_decay': 0.0005,
'Adam.betas': [0.9, 0.999],
# Training
'training.base_net_lr': None,
'training.extra_layers_lr': None,
'training.scheduler': 'cosine',
'training.t_max': 200,
'training.milestones': '80,100',
'training.gamma': 0.1,
'training.validation_epochs': 5,
'training.print_interval': 50,
# Model
'model.input_shape': [3, 300, 300],
'model.num_classes': 20,
'model.width_mult': 1.0,
'model.pretrained': False,
# Dataset
'dataset.num_workers': 4,
'dataset.class_names': ['aeroplane', 'bicycle', 'bird', 'boat', 'bottle', 'bus', 'car', 'cat', 'chair',
'cow', 'diningtable', 'dog', 'horse', 'motorbike', 'person', 'pottedplant',
'sheep', 'sofa', 'train', 'tvmonitor'],
}
def _get_default(self, value, key):
"""Return value if not None, else return default from DEFAULTS and log it."""
if value is None:
default = self.DEFAULTS.get(key)
if default is not None:
logging.info(f"'{key}' not set in config, using default: {default}")
return default
return value
def __init__(self, dataloaders, model, cfg):
self.cfg = cfg
self.model = model
self.dataloader = dataloaders
self.batch_size = cfg.training.batch_size
self.timer = Timer()
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
if torch.cuda.is_available():
torch.backends.cudnn.benchmark = True
logging.info("Use CUDA.")
model_name = getattr(cfg.model, "model_name", None)
if model_name is None:
raise ValueError("Model name must be provided cfg.model.model_name")
self.model_name = model_name
self.config=MOBILENET_CONFIG()
self.scaler = GradScaler() if torch.cuda.is_available() else None
self.evaluator = None
# TensorBoard setup
tensorboard_dir = os.path.join(cfg.output_dir, 'tensorboard')
self.writer = SummaryWriter(log_dir=tensorboard_dir)
logging.info(f"TensorBoard logs will be saved to: {tensorboard_dir}")
self.global_step = 0 # Track global training steps
# ------------------------------- data ------------------------------------
def setup_data(self):
logging.info("Prepare training datasets.")
self.train_loader, self.val_loader = self.dataloader['train'], self.dataloader['valid']
# ------------------------------ model ------------------------------------
def build_model_and_optim(self):
cfg = self.cfg
maybe_cfg = getattr(self.model, "config", None)
if maybe_cfg is not None:
self.config = maybe_cfg
# Get optimizer config - detect which optimizer is defined
optimizer_cfg = getattr(cfg.training, 'optimizer', None)
sgd_cfg = getattr(optimizer_cfg, 'SGD', None) if optimizer_cfg else None
adam_cfg = getattr(optimizer_cfg, 'Adam', None) if optimizer_cfg else None
if sgd_cfg is not None:
optimizer_name = 'SGD'
lr = self._get_default(getattr(sgd_cfg, 'learning_rate', None), 'SGD.learning_rate')
momentum = self._get_default(getattr(sgd_cfg, 'momentum', None), 'SGD.momentum')
weight_decay = self._get_default(getattr(sgd_cfg, 'weight_decay', None), 'SGD.weight_decay')
elif adam_cfg is not None:
optimizer_name = 'Adam'
lr = self._get_default(getattr(adam_cfg, 'learning_rate', None), 'Adam.learning_rate')
weight_decay = self._get_default(getattr(adam_cfg, 'weight_decay', None), 'Adam.weight_decay')
betas = tuple(self._get_default(getattr(adam_cfg, 'betas', None), 'Adam.betas'))
else:
# Default to SGD with default values
optimizer_name = self.DEFAULTS['optimizer']
logging.info(f"'optimizer' not set in config, using default: {optimizer_name}")
lr = self.DEFAULTS['SGD.learning_rate']
momentum = self.DEFAULTS['SGD.momentum']
weight_decay = self.DEFAULTS['SGD.weight_decay']
logging.info(f"Using default SGD params: lr={lr}, momentum={momentum}, weight_decay={weight_decay}")
base_net_lr = self._get_default(getattr(cfg.training, 'base_net_lr', None), 'training.base_net_lr')
if base_net_lr is None:
base_net_lr = lr
extra_layers_lr = self._get_default(getattr(cfg.training, 'extra_layers_lr', None), 'training.extra_layers_lr')
if extra_layers_lr is None:
extra_layers_lr = lr
params = [
{'params': self.model.base_net.parameters(), 'lr': base_net_lr},
{'params': itertools.chain(
self.model.source_layer_add_ons.parameters(),
self.model.extras.parameters()
), 'lr': extra_layers_lr},
{'params': itertools.chain(
self.model.regression_headers.parameters(),
self.model.classification_headers.parameters()
)}
]
self.criterion = MultiboxLoss(
self.config.priors,
iou_threshold=getattr(cfg, "iou_threshold", 0.5) or 0.5,
neg_pos_ratio=3,
center_variance=0.1,
size_variance=0.2,
device=self.device
)
# Create optimizer based on config
if optimizer_name == 'SGD':
self.optimizer = torch.optim.SGD(
params, lr=lr, momentum=momentum, weight_decay=weight_decay
)
logging.info(f"Using SGD optimizer: lr={lr}, momentum={momentum}, weight_decay={weight_decay}")
elif optimizer_name == 'Adam':
self.optimizer = torch.optim.Adam(
params, lr=lr, betas=betas, weight_decay=weight_decay
)
logging.info(f"Using Adam optimizer: lr={lr}, betas={betas}, weight_decay={weight_decay}")
logging.info(
f"Base net learning rate: {base_net_lr}, Extra Layers learning rate: {extra_layers_lr}."
)
self.last_epoch = -1
# Scheduler
if cfg.training.scheduler == 'multi-step':
logging.info("Uses MultiStepLR scheduler.")
milestones = [int(v.strip()) for v in cfg.training.milestones.split(",")]
self.scheduler = MultiStepLR(self.optimizer, milestones=milestones, gamma=0.1, last_epoch=self.last_epoch)
elif cfg.training.scheduler == 'cosine':
logging.info("Uses CosineAnnealingLR scheduler.")
self.scheduler = CosineAnnealingLR(self.optimizer, cfg.training.t_max, last_epoch=self.last_epoch)
else:
raise ValueError(f"Unsupported Scheduler: {cfg.training.scheduler}.")
# --------------------------- one epoch train ------------------------------
def train_one_epoch(self, epoch, debug_steps=50):
self.model.train(True)
running_loss = 0.0
running_regression_loss = 0.0
running_classification_loss = 0.0
total_steps = len(self.train_loader)
for i, data in enumerate(self.train_loader):
images, boxes, labels = data
images = images.to(self.device)
boxes = boxes.to(self.device)
labels = labels.to(self.device)
self.optimizer.zero_grad()
if self.scaler is not None:
with autocast():
confidence, locations = self.model(images)
regression_loss, classification_loss = self.criterion(confidence, locations, labels, boxes)
loss = regression_loss + classification_loss
self.scaler.scale(loss).backward()
self.scaler.step(self.optimizer)
self.scaler.update()
else:
confidence, locations = self.model(images)
regression_loss, classification_loss = self.criterion(confidence, locations, labels, boxes)
loss = regression_loss + classification_loss
loss.backward()
self.optimizer.step()
running_loss += loss.item()
running_regression_loss += regression_loss.item()
running_classification_loss += classification_loss.item()
# Log to TensorBoard every step
self.writer.add_scalar('Train/Loss', loss.item(), self.global_step)
self.writer.add_scalar('Train/Regression_Loss', regression_loss.item(), self.global_step)
self.writer.add_scalar('Train/Classification_Loss', classification_loss.item(), self.global_step)
self.global_step += 1
if i and i % debug_steps == 0:
avg_loss = running_loss / debug_steps
avg_reg_loss = running_regression_loss / debug_steps
avg_clf_loss = running_classification_loss / debug_steps
logging.info(
f"Epoch: {epoch}, Step: {i}/{total_steps}, "
f"Average Loss: {avg_loss:.4f}, "
f"Average Regression Loss {avg_reg_loss:.4f}, "
f"Average Classification Loss: {avg_clf_loss:.4f}"
)
running_loss = 0.0
running_regression_loss = 0.0
running_classification_loss = 0.0
# ----------------------------- evaluation --------------------------------
@torch.no_grad()
def evaluate(self, epoch=None):
"""
Combined evaluation:
- Always compute validation loss.
- Additionally run SSD VOC mAP via SSDEvaluatorWrapper based on cfg.
"""
# ---------------- val loss ----------------
self.model.eval()
self.epoch = epoch
running_loss = 0.0
running_regression_loss = 0.0
running_classification_loss = 0.0
num = 0
for _, data in enumerate(self.val_loader):
images, boxes, labels = data
images = images.to(self.device)
boxes = boxes.to(self.device)
labels = labels.to(self.device)
num += 1
confidence, locations = self.model(images)
regression_loss, classification_loss = self.criterion(
confidence, locations, labels, boxes
)
loss = regression_loss + classification_loss
running_loss += loss.item()
running_regression_loss += regression_loss.item()
running_classification_loss += classification_loss.item()
avg_loss = running_loss / max(num, 1)
avg_reg = running_regression_loss / max(num, 1)
avg_cls = running_classification_loss / max(num, 1)
loss_dict = {
"loss": avg_loss,
"reg_loss": avg_reg,
"cls_loss": avg_cls,
}
# Log validation metrics to TensorBoard
if self.epoch is not None:
self.writer.add_scalar('Val/Loss', avg_loss, self.epoch)
self.writer.add_scalar('Val/Regression_Loss', avg_reg, self.epoch)
self.writer.add_scalar('Val/Classification_Loss', avg_cls, self.epoch)
map_dict = None
#if map_interval is different than validation_epochs
# if (self.epoch + 1) % map_interval == 0:
if self.evaluator is None:
self.evaluator = SSDEvaluatorWrapper(
dataloaders=self.dataloader,
model=self.model,
cfg=self.cfg,
)
metrics = self.evaluator.evaluate()
map_dict = metrics
# Log mAP to TensorBoard
if map_dict is not None and self.epoch is not None:
if 'mAP' in map_dict:
self.writer.add_scalar('Val/mAP', map_dict['mAP'], self.epoch)
# Log per-class AP if available
for key, value in map_dict.items():
if key != 'mAP' and isinstance(value, (int, float)):
self.writer.add_scalar(f'Val/AP_{key}', value, self.epoch)
return loss_dict, map_dict
# ------------------------------ training loop ----------------------------
def train(self):
cfg = self.cfg
self.setup_data()
self.build_model_and_optim()
min_loss = float("inf")
self.last_epoch = -1
logging.info(f"Start training from epoch {self.last_epoch + 1}.")
for epoch in range(self.last_epoch + 1, cfg.training.epochs):
self.scheduler.step()
self.train_one_epoch(epoch, debug_steps=self.cfg.training.print_interval)
if epoch % cfg.training.validation_epochs == 0 or epoch == cfg.training.epochs - 1:
loss_dict, map_dict = self.evaluate(epoch=epoch)
val_loss = loss_dict["loss"]
val_reg = loss_dict["reg_loss"]
val_cls = loss_dict["cls_loss"]
logging.info(
f"Epoch: {epoch}, "
f"Validation Loss: {val_loss:.4f}, "
f"Validation Regression Loss {val_reg:.4f}, "
f"Validation Classification Loss: {val_cls:.4f}"
)
if map_dict is not None and map_dict.get("mAP") is not None:
logging.info(f"Epoch: {epoch}, SSD mAP: {map_dict['mAP']:.4f}")
self.save_checkpoint(epoch, val_loss)
min_loss = min(min_loss, val_loss)
# Close TensorBoard writer
self.writer.close()
logging.info("TensorBoard writer closed.")
onnx_model = torch_model_export_static(cfg=self.cfg,
model_dir=self.cfg.output_dir,
model=self.model)
return onnx_model
def save_checkpoint(self, epoch, val_loss):
model_path = os.path.join(self.cfg.output_dir, self.cfg.general.saved_models_dir, f"{self.model_name}-Epoch-{epoch}-Loss-{val_loss}.pth")
self.model.save(model_path)
logging.info(f"Saved model {model_path}")