|
|
| import torch
|
| import torch.nn as nn
|
| from torch.utils.data import DataLoader, random_split
|
| import numpy as np
|
| import matplotlib.pyplot as plt
|
| import os
|
| import joblib
|
| from pathlib import Path
|
| from PIL import Image
|
| import json
|
|
|
|
|
| from config import DATA_DIR, SCORE_FILE_NAME, MODEL_SAVE_BASE_PATH, \
|
| DEFAULT_BATCH_SIZE, DEFAULT_EPOCHS, DEFAULT_LR, \
|
| DEFAULT_DROPOUT_RATE, DEFAULT_WEIGHT_DECAY, DEFAULT_PCA_VARIANCE_RATIO, \
|
| DEFAULT_OPTIMIZER, DEFAULT_LR_SCHEDULER, DEFAULT_SCHEDULER_PATIENCE, \
|
| DEFAULT_SCHEDULER_FACTOR, DEFAULT_SCHEDULER_T_MAX, DEFAULT_LOSS_FUNCTION, \
|
| DEFAULT_EARLY_STOPPING_PATIENCE, VALIDATION_SPLIT_RATIO, DEFAULT_DATA_AUGMENTATION
|
|
|
|
|
| from utils import ScoreDataset, get_transforms, get_image_size_by_model_name, calculate_metrics
|
| from feature_extractor import FeatureExtractor
|
| from regressors import PytorchRegressor, get_sklearn_model_pipeline, FullCNNRegressor
|
|
|
|
|
# Configure matplotlib to render CJK (Chinese) text in titles/labels; several
# fallback fonts are listed so at least one exists on Windows/macOS/Linux.
plt.rcParams['font.sans-serif'] = ['Microsoft YaHei', 'SimHei', 'Arial Unicode MS', 'DejaVu Sans']
# Keep the minus sign renderable when a non-default (CJK) font is active.
plt.rcParams['axes.unicode_minus'] = False
|
|
|
|
|
|
|
|
|
| class TrainingAndPredictionEngine:
|
| """
|
| 负责管理整个训练和预测流程的引擎。
|
| 包含数据准备、模型切换、训练循环和预测功能。
|
| """
|
|
|
| def __init__(self):
|
| self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
|
| print(f"PyTorch Version: {torch.__version__}")
|
| print(f"CUDA Available: {torch.cuda.is_available()}")
|
| if torch.cuda.is_available():
|
| print(f"CUDA Device Count: {torch.cuda.device_count()}")
|
| print(f"Current CUDA Device Name: {torch.cuda.get_device_name(0)}")
|
|
|
| self.feature_extractor = None
|
| self.pytorch_regressor = None
|
| self.full_cnn_regressor = None
|
| self.sklearn_regressor = None
|
| self.sklearn_feature_pipeline = None
|
|
|
| self.current_model_type = None
|
| self.active_base_cnn_name = None
|
|
|
| self.train_loss_history = []
|
| self.val_loss_history = []
|
| self.val_mse_history = []
|
| self.val_mae_history = []
|
| self.val_r2_history = []
|
|
|
| self.train_dataloader = None
|
| self.val_dataloader = None
|
|
|
|
|
| self.dataset_min_label = 0.0
|
| self.dataset_max_label = 100.0
|
|
|
|
|
| self.last_trained_params = {
|
| "model_type": None,
|
| "base_cnn_name": None,
|
| "dropout_rate": DEFAULT_DROPOUT_RATE,
|
| "weight_decay": DEFAULT_WEIGHT_DECAY,
|
| "pca_variance_ratio": DEFAULT_PCA_VARIANCE_RATIO,
|
| "optimizer": DEFAULT_OPTIMIZER,
|
| "lr_scheduler": DEFAULT_LR_SCHEDULER,
|
| "scheduler_patience": DEFAULT_SCHEDULER_PATIENCE,
|
| "scheduler_factor": DEFAULT_SCHEDULER_FACTOR,
|
| "scheduler_t_max": DEFAULT_SCHEDULER_T_MAX,
|
| "loss_function": DEFAULT_LOSS_FUNCTION,
|
| "early_stopping_patience": DEFAULT_EARLY_STOPPING_PATIENCE,
|
| "validation_split_ratio": VALIDATION_SPLIT_RATIO,
|
| "batch_size": DEFAULT_BATCH_SIZE,
|
| "min_label": self.dataset_min_label,
|
| "max_label": self.dataset_max_label,
|
| "enable_augmentation": DEFAULT_DATA_AUGMENTATION
|
| }
|
|
|
| Path(os.path.dirname(MODEL_SAVE_BASE_PATH)).mkdir(exist_ok=True, parents=True)
|
|
|
| def _get_internal_model_name(self, ui_model_name):
|
| mapping = {
|
| "深度学习": "pytorch_detached",
|
| "端到端深度学习": "pytorch_full_cnn",
|
| "随机森林": "random_forest",
|
| "支持向量回归": "svr",
|
| "梯度提升回归": "gradient_boosting",
|
| "堆叠回归": "stacking",
|
| "K近邻": "knn",
|
| "线性回归": "linear_regression"
|
| }
|
| return mapping.get(ui_model_name, "unknown_model")
|
|
|
| def _get_optimizer(self, model_params, optimizer_name, lr, weight_decay):
|
| if optimizer_name == "Adam":
|
| return torch.optim.Adam(model_params, lr=lr, weight_decay=weight_decay)
|
| elif optimizer_name == "AdamW":
|
| return torch.optim.AdamW(model_params, lr=lr, weight_decay=weight_decay)
|
| elif optimizer_name == "SGD":
|
|
|
| return torch.optim.SGD(model_params, lr=lr, momentum=0.9, weight_decay=weight_decay)
|
| else:
|
| raise ValueError(f"不支持的优化器: {optimizer_name}")
|
|
|
| def _get_lr_scheduler(self, optimizer, scheduler_name, patience, factor, t_max):
|
| if scheduler_name == "None":
|
| return None
|
| elif scheduler_name == "ReduceLROnPlateau":
|
| return torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', patience=patience, factor=factor,
|
| verbose=True)
|
| elif scheduler_name == "CosineAnnealingLR":
|
| return torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=t_max)
|
| else:
|
| raise ValueError(f"不支持的学习率调度器: {scheduler_name}")
|
|
|
| def _get_loss_function(self, loss_name):
|
| if loss_name == "MSELoss":
|
| return nn.MSELoss()
|
| elif loss_name == "L1Loss":
|
| return nn.L1Loss()
|
| elif loss_name == "SmoothL1Loss":
|
| return nn.SmoothL1Loss()
|
| else:
|
| raise ValueError(f"不支持的损失函数: {loss_name}")
|
|
|
| def switch_model_type(self, model_type_str, base_cnn_name="resnet50",
|
| dropout_rate=DEFAULT_DROPOUT_RATE,
|
| weight_decay=DEFAULT_WEIGHT_DECAY,
|
| pca_variance_ratio=DEFAULT_PCA_VARIANCE_RATIO,
|
| optimizer_name=DEFAULT_OPTIMIZER,
|
| lr_scheduler_name=DEFAULT_LR_SCHEDULER,
|
| scheduler_patience=DEFAULT_SCHEDULER_PATIENCE,
|
| scheduler_factor=DEFAULT_SCHEDULER_FACTOR,
|
| scheduler_t_max=DEFAULT_SCHEDULER_T_MAX,
|
| loss_function_name=DEFAULT_LOSS_FUNCTION,
|
| early_stopping_patience=DEFAULT_EARLY_STOPPING_PATIENCE,
|
| batch_size=DEFAULT_BATCH_SIZE,
|
| enable_augmentation=DEFAULT_DATA_AUGMENTATION
|
| ):
|
| self.current_model_type = model_type_str
|
| self.active_base_cnn_name = base_cnn_name
|
|
|
|
|
| self.last_trained_params['batch_size'] = batch_size
|
| self.last_trained_params['validation_split_ratio'] = VALIDATION_SPLIT_RATIO
|
|
|
|
|
| self.last_trained_params.update({
|
| "model_type": model_type_str,
|
| "base_cnn_name": base_cnn_name,
|
| "dropout_rate": dropout_rate,
|
| "weight_decay": weight_decay,
|
| "pca_variance_ratio": pca_variance_ratio,
|
| "optimizer": optimizer_name,
|
| "lr_scheduler": lr_scheduler_name,
|
| "scheduler_patience": scheduler_patience,
|
| "scheduler_factor": scheduler_factor,
|
| "scheduler_t_max": scheduler_t_max,
|
| "loss_function": loss_function_name,
|
| "early_stopping_patience": early_stopping_patience,
|
|
|
| "enable_augmentation": enable_augmentation
|
| })
|
|
|
| print(f"已切换到 {self.current_model_type} 模型模式, 基础CNN: {self.active_base_cnn_name}.")
|
| print(
|
| f"参数: BatchSize={batch_size}, Dropout={dropout_rate}, WeightDecay={weight_decay}, PCA={pca_variance_ratio},")
|
| print(
|
| f" Optimizer={optimizer_name}, Scheduler={lr_scheduler_name}, Loss={loss_function_name}, EarlyStopping={early_stopping_patience},")
|
| print(f" Data Augmentation: {enable_augmentation}")
|
|
|
| if self.current_model_type == "深度学习":
|
| self.feature_extractor = FeatureExtractor(model_name=self.active_base_cnn_name).to(self.device)
|
| self.feature_extractor.eval()
|
| feature_dim = self.feature_extractor.get_output_dim()
|
| self.pytorch_regressor = PytorchRegressor(in_features=feature_dim, dropout_rate=dropout_rate).to(
|
| self.device)
|
| self.full_cnn_regressor = None
|
| self.sklearn_regressor = None
|
| self.sklearn_feature_pipeline = None
|
| elif self.current_model_type == "端到端深度学习":
|
| self.full_cnn_regressor = FullCNNRegressor(model_name=self.active_base_cnn_name,
|
| dropout_rate=dropout_rate).to(self.device)
|
| self.feature_extractor = None
|
| self.pytorch_regressor = None
|
| self.sklearn_regressor = None
|
| self.sklearn_feature_pipeline = None
|
| else:
|
| self.feature_extractor = FeatureExtractor(model_name=self.active_base_cnn_name).to(self.device)
|
| self.feature_extractor.eval()
|
| self.sklearn_regressor, self.sklearn_feature_pipeline = \
|
| get_sklearn_model_pipeline(self.current_model_type, pca_variance_ratio=pca_variance_ratio)
|
| self.pytorch_regressor = None
|
| self.full_cnn_regressor = None
|
|
|
| return f"已切换到 {self.current_model_type} 模型模式, 基础CNN: {self.active_base_cnn_name}. 参数已设置。"
|
|
|
| def prepare_data_for_training(self):
|
| image_paths = []
|
| scores = []
|
|
|
| score_file_path = Path(DATA_DIR) / SCORE_FILE_NAME
|
| if not score_file_path.exists():
|
| return False, f"错误: 训练数据文件 {score_file_path} 不存在。请先在‘原始数据导入’或‘训练数据管理’标签页保存数据。"
|
|
|
| try:
|
| with open(score_file_path, 'r') as f:
|
| for line in f:
|
| filename, score_str = line.strip().split(',')
|
| full_image_path = Path(DATA_DIR) / filename
|
| if full_image_path.exists():
|
| image_paths.append(str(full_image_path))
|
| scores.append(float(score_str))
|
| else:
|
| print(f"警告: 图像文件 {full_image_path} 不存在,已跳过。")
|
|
|
| except Exception as e:
|
| return False, f"错误: 读取分数文件 {score_file_path} 为空或失败: {e}"
|
|
|
| if not image_paths:
|
| return False, "没有找到有效的图片数据用于训练。请检查 'data' 文件夹。"
|
|
|
| current_image_size = get_image_size_by_model_name(self.active_base_cnn_name)
|
|
|
| temp_dataset = ScoreDataset(image_paths, scores, transform=None)
|
| self.dataset_min_label = temp_dataset.min_label
|
| self.dataset_max_label = temp_dataset.max_label
|
|
|
| self.last_trained_params['min_label'] = self.dataset_min_label
|
| self.last_trained_params['max_label'] = self.dataset_max_label
|
|
|
| full_dataset = ScoreDataset(image_paths, scores,
|
| transform=get_transforms(train=True, image_size=current_image_size,
|
| enable_augmentation=self.last_trained_params[
|
| 'enable_augmentation']))
|
|
|
| num_total = len(full_dataset)
|
| num_val = int(self.last_trained_params['validation_split_ratio'] * num_total)
|
| num_train = num_total - num_val
|
|
|
| if num_train < 1:
|
| return False, f"错误: 训练集样本数量不足1。总数据量: {num_total}, 训练集: {num_train},请增加数据量或调整验证集比例。"
|
|
|
| if num_total <= 1:
|
| num_train = num_total
|
| num_val = 0
|
| print(f"警告: 总样本数过少({num_total}),不进行验证集划分。")
|
| elif num_val < 1:
|
| print(f"警告: 验证集样本数量不足1 ({num_val})。总数据量: {num_total}。验证集可能无法进行评估。")
|
|
|
| try:
|
| train_dataset, val_dataset = random_split(full_dataset, [num_train, num_val],
|
| generator=torch.Generator().manual_seed(42))
|
| except ValueError as e:
|
| return False, f"数据划分失败: {e}。请检查数据量({num_total})和划分比例({self.last_trained_params['validation_split_ratio']})。"
|
|
|
| self.train_dataloader = DataLoader(
|
| train_dataset,
|
| batch_size=self.last_trained_params['batch_size'],
|
| shuffle=True,
|
| num_workers=os.cpu_count() // 2 or 1,
|
| drop_last=True
|
| )
|
| if len(self.train_dataloader) == 0:
|
| return False, f"错误: 训练数据加载器为空。训练集样本数量: {len(train_dataset)}, 批量大小: {self.last_trained_params['batch_size']}。请减小批量大小或增加训练集样本。"
|
|
|
| self.val_dataloader = DataLoader(
|
| val_dataset,
|
| batch_size=self.last_trained_params['batch_size'],
|
| shuffle=False,
|
| num_workers=os.cpu_count() // 2 or 1,
|
| drop_last=False
|
| )
|
| if len(val_dataset) > 0 and len(self.val_dataloader) == 0:
|
| print(
|
| f"警告: 验证数据加载器为空。验证集样本数量: {len(val_dataset)}, 批量大小: {self.last_trained_params['batch_size']}。验证集可能无法进行评估。")
|
|
|
| return True, f"数据准备完成。训练集: {len(train_dataset)} 张图片, 验证集: {len(val_dataset)} 张图片。"
|
|
|
    def train_model(self, epochs=DEFAULT_EPOCHS, lr=DEFAULT_LR):
        """Train the currently selected model and return two matplotlib figures.

        Dispatches on ``self.current_model_type``:
          * "深度学习"        -- frozen CNN features + trainable PyTorch head;
          * "端到端深度学习"  -- a single trainable CNN regressor;
          * sklearn labels    -- CNN features -> sklearn pipeline fit.

        Args:
            epochs: number of training epochs (PyTorch modes only).
            lr: learning rate (PyTorch modes only).

        Returns:
            (fig_loss, fig_metrics): loss-curve figure and MSE/MAE figure.
            On any precondition failure the figures contain an error message
            instead of curves.

        Side effects: fills the ``*_history`` lists, may restore best-epoch
        weights, and saves the trained artifacts to disk.
        """
        # Reset per-epoch histories for a fresh run.
        self.train_loss_history = []
        self.val_loss_history = []
        self.val_mse_history = []
        self.val_mae_history = []
        self.val_r2_history = []

        fig_loss, ax_loss = plt.subplots(figsize=(10, 6))
        ax_loss.set_xlabel("Epoch")
        ax_loss.set_ylabel("损失")
        ax_loss.set_title("训练与验证损失")

        fig_metrics, ax_metrics = plt.subplots(figsize=(10, 6))
        ax_metrics.set_xlabel("Epoch")
        ax_metrics.set_ylabel("误差指标")
        ax_metrics.set_title("验证MSE与MAE")
        # Guard: data must have been prepared first.
        if self.train_dataloader is None or len(self.train_dataloader) == 0:
            ax_loss.text(0.5, 0.5, "训练数据加载器为空。请先正确加载数据。", horizontalalignment='center',
                         verticalalignment='center',
                         transform=ax_loss.transAxes, fontsize=12, color='red')
            ax_metrics.text(0.5, 0.5, "训练数据加载器为空。", horizontalalignment='center', verticalalignment='center',
                            transform=ax_metrics.transAxes, fontsize=12, color='red')
            fig_loss.tight_layout()
            fig_metrics.tight_layout()
            return fig_loss, fig_metrics
        # Label range used by calculate_metrics to report errors on the
        # original (de-normalized) scale.
        min_label = self.dataset_min_label
        max_label = self.dataset_max_label
        if min_label is None or max_label is None:
            print("错误: dataset_min_label/max_label 未设置,使用默认 0-100。")
            min_label = 0.0
            max_label = 100.0
        # ---- Branch 1: detached mode (frozen features + trainable head) ----
        if self.current_model_type == "深度学习":
            if self.pytorch_regressor is None or self.feature_extractor is None:
                ax_loss.text(0.5, 0.5, "深度学习模型(分离模式)未正确初始化。请重试。", horizontalalignment='center',
                             verticalalignment='center', transform=ax_loss.transAxes, fontsize=12, color='red')
                ax_metrics.text(0.5, 0.5, "深度学习模型未初始化。", horizontalalignment='center',
                                verticalalignment='center', transform=ax_metrics.transAxes, fontsize=12, color='red')
                fig_loss.tight_layout()
                fig_metrics.tight_layout()
                return fig_loss, fig_metrics
            model = self.pytorch_regressor
            optimizer = self._get_optimizer(model.parameters(), self.last_trained_params["optimizer"], lr,
                                            self.last_trained_params["weight_decay"])
            criterion = self._get_loss_function(self.last_trained_params["loss_function"])
            lr_scheduler = self._get_lr_scheduler(optimizer, self.last_trained_params["lr_scheduler"],
                                                  self.last_trained_params["scheduler_patience"],
                                                  self.last_trained_params["scheduler_factor"],
                                                  self.last_trained_params["scheduler_t_max"])
            # Early-stopping / best-checkpoint bookkeeping.
            best_val_loss = float('inf')
            epochs_no_improve = 0
            best_epoch = 0
            best_regressor_state = None
            best_feature_extractor_state = None
            print(f"开始训练深度学习模型 (PyTorch, 分离模式, 基础CNN: {self.active_base_cnn_name}),共 {epochs} 轮次...")
            print(
                f"学习率: {lr}, 批量大小: {self.last_trained_params['batch_size']}, Dropout: {self.last_trained_params['dropout_rate']}, Weight Decay: {self.last_trained_params['weight_decay']}")
            print(
                f"优化器: {self.last_trained_params['optimizer']}, 损失函数: {self.last_trained_params['loss_function']}, 调度器: {self.last_trained_params['lr_scheduler']}, 早停耐心: {self.last_trained_params['early_stopping_patience']}")
            print(
                f"原始分数范围: [{min_label:.2f}, {max_label:.2f}], 数据增强: {self.last_trained_params['enable_augmentation']}")
            for epoch in range(epochs):
                # Head trains; backbone stays frozen in eval mode.
                model.train()
                self.feature_extractor.eval()
                running_train_loss = 0.0
                for batch_idx, (images, labels) in enumerate(self.train_dataloader):
                    images = images.to(self.device)
                    labels = labels.unsqueeze(1).to(self.device)
                    # Features computed without gradients: backbone is frozen.
                    with torch.no_grad():
                        features = self.feature_extractor(images)
                    optimizer.zero_grad()
                    outputs = model(features)
                    loss = criterion(outputs, labels)
                    loss.backward()
                    optimizer.step()
                    running_train_loss += loss.item()

                avg_train_loss = running_train_loss / len(self.train_dataloader)
                self.train_loss_history.append(avg_train_loss)

                # Validation pass (only when a non-empty val loader exists).
                if len(self.val_dataloader.dataset) > 0 and len(self.val_dataloader) > 0:
                    model.eval()
                    val_losses = []
                    val_predictions_normalized = []
                    val_true_labels_normalized = []
                    with torch.no_grad():
                        for images, labels in self.val_dataloader:
                            images = images.to(self.device)
                            labels = labels.unsqueeze(1).to(self.device)
                            features = self.feature_extractor(images)
                            outputs = model(features)
                            loss = criterion(outputs, labels)
                            val_losses.append(loss.item())
                            val_predictions_normalized.extend(outputs.cpu().numpy().flatten())
                            val_true_labels_normalized.extend(labels.cpu().numpy().flatten())

                    avg_val_loss = np.mean(val_losses)
                    self.val_loss_history.append(avg_val_loss)
                    val_mse, val_mae, val_r2 = calculate_metrics(
                        val_true_labels_normalized,
                        val_predictions_normalized,
                        min_label, max_label
                    )
                    self.val_mse_history.append(val_mse)
                    self.val_mae_history.append(val_mae)
                    self.val_r2_history.append(val_r2)
                    print(
                        f"Epoch {epoch + 1}/{epochs}: Train Loss: {avg_train_loss:.4f}, Val Loss: {avg_val_loss:.4f}, Val MSE: {val_mse:.2f}, Val MAE: {val_mae:.2f}, Val R2: {val_r2:.2f}")
                    # ReduceLROnPlateau needs the monitored metric; others don't.
                    if lr_scheduler:
                        if isinstance(lr_scheduler, torch.optim.lr_scheduler.ReduceLROnPlateau):
                            lr_scheduler.step(avg_val_loss)
                        else:
                            lr_scheduler.step()
                    if avg_val_loss < best_val_loss:
                        best_val_loss = avg_val_loss
                        epochs_no_improve = 0
                        best_epoch = epoch + 1
                        best_regressor_state = model.state_dict()
                        best_feature_extractor_state = self.feature_extractor.state_dict()
                    else:
                        epochs_no_improve += 1
                        if epochs_no_improve >= self.last_trained_params['early_stopping_patience']:
                            print(
                                f"早停触发!验证损失在 {self.last_trained_params['early_stopping_patience']} 个Epochs内没有改善。")
                            break
                else:
                    print(
                        f"Epoch {epoch + 1}/{epochs}: Train Loss: {avg_train_loss:.4f} (无验证集评估或验证dataloader为空)")

            # Restore best-epoch weights when a validation improvement was seen.
            if best_regressor_state and best_feature_extractor_state:
                model.load_state_dict(best_regressor_state)
                self.feature_extractor.load_state_dict(best_feature_extractor_state)
                print(f"已加载第 {best_epoch} 轮次的最佳模型。")
            else:
                print("没有找到更好的验证损失模型(无验证集或未改善)。使用最后一次训练的模型状态。")

            ax_loss.clear()
            ax_loss.plot(self.train_loss_history, label="训练损失", color='blue')
            if len(self.val_loss_history) > 0:
                ax_loss.plot(self.val_loss_history, label="验证损失", color='orange')
            ax_loss.legend()
            ax_loss.set_xlabel("Epoch")
            ax_loss.set_ylabel("损失")

            if len(self.val_loss_history) > 0:
                ax_loss.set_title(f"深度学习模型训练完成 (分离模式, 基础CNN: {self.active_base_cnn_name})\n"
                                  f"最佳验证损失: {best_val_loss:.4f} (Epoch {best_epoch})")
            else:
                ax_loss.set_title(f"深度学习模型训练完成 (分离模式, 基础CNN: {self.active_base_cnn_name})\n"
                                  f"最终训练损失: {self.train_loss_history[-1]:.4f} (无验证集)")

            fig_loss.tight_layout()

            ax_metrics.clear()
            if len(self.val_mse_history) > 0:
                ax_metrics.plot(self.val_mse_history, label="验证MSE", color='green', linestyle='--')
                ax_metrics.plot(self.val_mae_history, label="验证MAE", color='red', linestyle=':')
                ax_metrics.legend()
            ax_metrics.set_xlabel("Epoch")
            ax_metrics.set_ylabel("误差指标")
            ax_metrics.set_title(f"验证MSE与MAE (深度学习模型)")

            fig_metrics.tight_layout()

            self._save_model_artifacts("深度学习")
            return fig_loss, fig_metrics
        # ---- Branch 2: end-to-end mode (whole CNN is trainable) ----
        elif self.current_model_type == "端到端深度学习":
            if self.full_cnn_regressor is None:
                ax_loss.text(0.5, 0.5, "端到端深度学习模型未正确初始化。请重试。", horizontalalignment='center',
                             verticalalignment='center', transform=ax_loss.transAxes, fontsize=12, color='red')
                ax_metrics.text(0.5, 0.5, "端到端深度学习模型未初始化。", horizontalalignment='center',
                                verticalalignment='center', transform=ax_metrics.transAxes, fontsize=12, color='red')
                fig_loss.tight_layout()
                fig_metrics.tight_layout()
                return fig_loss, fig_metrics

            model = self.full_cnn_regressor
            optimizer = self._get_optimizer(model.parameters(), self.last_trained_params["optimizer"], lr,
                                            self.last_trained_params["weight_decay"])
            criterion = self._get_loss_function(self.last_trained_params["loss_function"])
            lr_scheduler = self._get_lr_scheduler(optimizer, self.last_trained_params["lr_scheduler"],
                                                  self.last_trained_params["scheduler_patience"],
                                                  self.last_trained_params["scheduler_factor"],
                                                  self.last_trained_params["scheduler_t_max"])
            best_val_loss = float('inf')
            epochs_no_improve = 0
            best_epoch = 0
            best_model_state = None
            print(f"开始训练端到端深度学习模型 (基础CNN: {self.active_base_cnn_name}),共 {epochs} 轮次...")
            print(
                f"学习率: {lr}, 批量大小: {self.last_trained_params['batch_size']}, Dropout: {self.last_trained_params['dropout_rate']}, Weight Decay: {self.last_trained_params['weight_decay']}")
            print(
                f"优化器: {self.last_trained_params['optimizer']}, 损失函数: {self.last_trained_params['loss_function']}, 调度器: {self.last_trained_params['lr_scheduler']}, 早停耐心: {self.last_trained_params['early_stopping_patience']}")
            print(
                f"原始分数范围: [{min_label:.2f}, {max_label:.2f}], 数据增强: {self.last_trained_params['enable_augmentation']}")
            for epoch in range(epochs):
                model.train()
                running_train_loss = 0.0
                for batch_idx, (images, labels) in enumerate(self.train_dataloader):
                    images = images.to(self.device)
                    labels = labels.unsqueeze(1).to(self.device)
                    optimizer.zero_grad()
                    # Full forward pass through the CNN -- gradients flow end to end.
                    outputs = model(images)
                    loss = criterion(outputs, labels)
                    loss.backward()
                    optimizer.step()
                    running_train_loss += loss.item()

                avg_train_loss = running_train_loss / len(self.train_dataloader)
                self.train_loss_history.append(avg_train_loss)

                if len(self.val_dataloader.dataset) > 0 and len(self.val_dataloader) > 0:
                    model.eval()
                    val_losses = []
                    val_predictions_normalized = []
                    val_true_labels_normalized = []
                    with torch.no_grad():
                        for images, labels in self.val_dataloader:
                            images = images.to(self.device)
                            labels = labels.unsqueeze(1).to(self.device)
                            outputs = model(images)
                            loss = criterion(outputs, labels)
                            val_losses.append(loss.item())
                            val_predictions_normalized.extend(outputs.cpu().numpy().flatten())
                            val_true_labels_normalized.extend(labels.cpu().numpy().flatten())

                    avg_val_loss = np.mean(val_losses)
                    self.val_loss_history.append(avg_val_loss)
                    val_mse, val_mae, val_r2 = calculate_metrics(
                        val_true_labels_normalized,
                        val_predictions_normalized,
                        min_label, max_label
                    )
                    self.val_mse_history.append(val_mse)
                    self.val_mae_history.append(val_mae)
                    self.val_r2_history.append(val_r2)
                    print(
                        f"Epoch {epoch + 1}/{epochs}: Train Loss: {avg_train_loss:.4f}, Val Loss: {avg_val_loss:.4f}, Val MSE: {val_mse:.2f}, Val MAE: {val_mae:.2f}, Val R2: {val_r2:.2f}")
                    if lr_scheduler:
                        if isinstance(lr_scheduler, torch.optim.lr_scheduler.ReduceLROnPlateau):
                            lr_scheduler.step(avg_val_loss)
                        else:
                            lr_scheduler.step()
                    if avg_val_loss < best_val_loss:
                        best_val_loss = avg_val_loss
                        epochs_no_improve = 0
                        best_epoch = epoch + 1
                        best_model_state = model.state_dict()
                    else:
                        epochs_no_improve += 1
                        if epochs_no_improve >= self.last_trained_params['early_stopping_patience']:
                            print(
                                f"早停触发!验证损失在 {self.last_trained_params['early_stopping_patience']} 个Epochs内没有改善。")
                            break
                else:
                    print(
                        f"Epoch {epoch + 1}/{epochs}: Train Loss: {avg_train_loss:.4f} (无验证集评估或验证dataloader为空)")

            if best_model_state:
                model.load_state_dict(best_model_state)
                print(f"已加载第 {best_epoch} 轮次的最佳模型。")
            else:
                print("没有找到更好的验证损失模型(无验证集或未改善)。使用最后一次训练的模型状态。")

            ax_loss.clear()
            ax_loss.plot(self.train_loss_history, label="训练损失", color='blue')
            if len(self.val_loss_history) > 0:
                ax_loss.plot(self.val_loss_history, label="验证损失", color='orange')
            ax_loss.legend()
            ax_loss.set_xlabel("Epoch")
            ax_loss.set_ylabel("损失")

            if len(self.val_loss_history) > 0:
                ax_loss.set_title(f"端到端深度学习模型训练完成 (基础CNN: {self.active_base_cnn_name})\n"
                                  f"最佳验证损失: {best_val_loss:.4f} (Epoch {best_epoch})")
            else:
                ax_loss.set_title(f"端到端深度学习模型训练完成 (基础CNN: {self.active_base_cnn_name})\n"
                                  f"最终训练损失: {self.train_loss_history[-1]:.4f} (无验证集)")
            fig_loss.tight_layout()

            ax_metrics.clear()
            if len(self.val_mse_history) > 0:
                ax_metrics.plot(self.val_mse_history, label="验证MSE", color='green', linestyle='--')
                ax_metrics.plot(self.val_mae_history, label="验证MAE", color='red', linestyle=':')
                ax_metrics.legend()
            ax_metrics.set_xlabel("Epoch")
            ax_metrics.set_ylabel("误差指标")
            ax_metrics.set_title(f"验证MSE与MAE (端到端深度学习模型)")

            fig_metrics.tight_layout()

            self._save_model_artifacts("端到端深度学习")
            return fig_loss, fig_metrics
        # ---- Branch 3: sklearn-style models over frozen CNN features ----
        elif self.current_model_type in ["随机森林", "支持向量回归", "梯度提升回归", "堆叠回归", "K近邻", "线性回归"]:
            if self.sklearn_regressor is None or self.sklearn_feature_pipeline is None or self.feature_extractor is None:
                ax_loss.text(0.5, 0.5, "Sklearn模型或其特征提取器未正确初始化。请重试。", horizontalalignment='center',
                             verticalalignment='center', transform=ax_loss.transAxes, fontsize=12, color='red')
                ax_metrics.text(0.5, 0.5, "Sklearn模型未初始化。", horizontalalignment='center',
                                verticalalignment='center', transform=ax_metrics.transAxes, fontsize=12, color='red')
                fig_loss.tight_layout()
                fig_metrics.tight_layout()
                return fig_loss, fig_metrics
            print(f"正在提取所有图片的特征用于Sklearn模型训练 (基础CNN: {self.active_base_cnn_name})...")
            print(f"PCA保留方差比例: {self.last_trained_params['pca_variance_ratio']}")
            print(
                f"原始分数范围: [{min_label:.2f}, {max_label:.2f}], 数据增强: {self.last_trained_params['enable_augmentation']}")
            # Fresh (unfitted) pipeline/regressor for this training run.
            self.sklearn_regressor, self.sklearn_feature_pipeline = \
                get_sklearn_model_pipeline(self.current_model_type, self.last_trained_params["pca_variance_ratio"])
            X_train_features = []
            y_train_labels_normalized = []
            X_val_features = []
            y_val_labels_normalized = []
            self.feature_extractor.eval()
            # Extract CNN features for every image up front (no gradients needed).
            with torch.no_grad():
                for images, labels in self.train_dataloader:
                    images = images.to(self.device)
                    features = self.feature_extractor(images).cpu().numpy()
                    X_train_features.extend(features)
                    y_train_labels_normalized.extend(labels.cpu().numpy())

                if len(self.val_dataloader.dataset) > 0 and len(self.val_dataloader) > 0:
                    for images, labels in self.val_dataloader:
                        images = images.to(self.device)
                        features = self.feature_extractor(images).cpu().numpy()
                        X_val_features.extend(features)
                        y_val_labels_normalized.extend(labels.cpu().numpy())
            X_train = np.array(X_train_features)
            y_train_normalized = np.array(y_train_labels_normalized)
            X_val = np.array(X_val_features)
            y_val_normalized = np.array(y_val_labels_normalized)
            if X_train.shape[0] == 0:
                ax_loss.text(0.5, 0.5, "无训练特征数据,请检查图片加载。", horizontalalignment='center',
                             verticalalignment='center', transform=ax_loss.transAxes, fontsize=12, color='red')
                ax_metrics.text(0.5, 0.5, "无训练特征数据。", horizontalalignment='center', verticalalignment='center',
                                transform=ax_metrics.transAxes, fontsize=12, color='red')
                fig_loss.tight_layout()
                fig_metrics.tight_layout()
                return fig_loss, fig_metrics
            print("正在对提取的特征进行预处理 (标准化, PCA)...")
            # Pipeline is fitted on train only; val reuses the fitted transform.
            X_train_processed = self.sklearn_feature_pipeline.fit_transform(X_train)

            print(f"正在训练Sklearn {self.current_model_type} 模型...")
            self.sklearn_regressor.fit(X_train_processed, y_train_normalized)
            print(f"Sklearn {self.current_model_type} 模型训练完成。")
            train_predictions_normalized = self.sklearn_regressor.predict(X_train_processed)

            train_mse, train_mae, train_r2 = calculate_metrics(
                y_train_normalized, train_predictions_normalized, min_label, max_label
            )

            ax_loss.clear()
            ax_metrics.clear()
            if len(X_val) > 0:
                X_val_processed = self.sklearn_feature_pipeline.transform(X_val)
                val_predictions_normalized = self.sklearn_regressor.predict(X_val_processed)
                val_mse, val_mae, val_r2 = calculate_metrics(
                    y_val_normalized, val_predictions_normalized, min_label, max_label
                )

                print(f"训练完成: Train MSE: {train_mse:.2f}, Train MAE: {train_mae:.2f}, Train R2: {train_r2:.2f}")
                print(f"          Val MSE: {val_mse:.2f}, Val MAE: {val_mae:.2f}, Val R2: {val_r2:.2f}")

                # No epoch curves for sklearn models -- report summary text instead.
                ax_loss.text(0.5, 0.5, f"Sklearn {self.current_model_type} 训练完成\n"
                                       f"训练集损失 (MSE/MAE): {train_mse:.2f}/{train_mae:.2f}\n"
                                       f"验证集损失 (MSE/MAE): {val_mse:.2f}/{val_mae:.2f}\n"
                                       f"训练集 R2: {train_r2:.2f}, 验证集 R2: {val_r2:.2f}",
                             horizontalalignment='center', verticalalignment='center',
                             transform=ax_loss.transAxes, fontsize=10, color='green')
                ax_loss.axis('off')

                ax_metrics.text(0.5, 0.5, f"Sklearn {self.current_model_type} 训练完成\n"
                                          f"训练集 MSE: {train_mse:.2f}\n"
                                          f"验证集 MSE: {val_mse:.2f}\n"
                                          f"训练集 MAE: {train_mae:.2f}\n"
                                          f"验证集 MAE: {val_mae:.2f}",
                                horizontalalignment='center', verticalalignment='center',
                                transform=ax_metrics.transAxes, fontsize=10, color='green')
                ax_metrics.axis('off')
            else:
                print(
                    f"训练完成: Train MSE: {train_mse:.2f}, Train MAE: {train_mae:.2f}, Train R2: {train_r2:.2f} (无验证集评估)")
                ax_loss.text(0.5, 0.5, f"Sklearn {self.current_model_type} 训练完成\n"
                                       f"训练集损失 (MSE/MAE): {train_mse:.2f}/{train_mae:.2f}\n"
                                       f"训练集 R2: {train_r2:.2f} (无验证集)",
                             horizontalalignment='center', verticalalignment='center',
                             transform=ax_loss.transAxes, fontsize=10, color='green')
                ax_loss.axis('off')
                ax_metrics.text(0.5, 0.5, f"Sklearn {self.current_model_type} 训练完成\n"
                                          f"训练集 MSE: {train_mse:.2f}\n"
                                          f"训练集 MAE: {train_mae:.2f} (无验证集)",
                                horizontalalignment='center', verticalalignment='center',
                                transform=ax_metrics.transAxes, fontsize=10, color='green')
                ax_metrics.axis('off')
            ax_loss.set_title(f"Sklearn模型训练状态 ({self.current_model_type})")
            ax_metrics.set_title(f"Sklearn模型训练状态 ({self.current_model_type}) - 误差指标")

            fig_loss.tight_layout()
            fig_metrics.tight_layout()
            self._save_model_artifacts(self.current_model_type)
            return fig_loss, fig_metrics
        # ---- No recognized model type selected ----
        else:
            ax_loss.text(0.5, 0.5, "未选择有效的模型类型进行训练。", horizontalalignment='center',
                         verticalalignment='center',
                         transform=ax_loss.transAxes, fontsize=12, color='red')
            ax_metrics.text(0.5, 0.5, "未选择有效的模型类型。", horizontalalignment='center', verticalalignment='center',
                            transform=ax_metrics.transAxes, fontsize=12, color='red')
            fig_loss.tight_layout()
            fig_metrics.tight_layout()
            return fig_loss, fig_metrics
|
|
|
| def predict_score(self, image_path, model_type_str, base_cnn_name_for_predict):
|
| if not self._load_model_artifacts(model_type_str, base_cnn_name_for_predict):
|
| return "模型未训练或未加载!请先训练对应模型。"
|
|
|
| current_image_size = get_image_size_by_model_name(base_cnn_name_for_predict)
|
|
|
| transform = get_transforms(train=False, image_size=current_image_size, enable_augmentation=False)
|
| try:
|
| image = Image.open(image_path).convert("RGB")
|
| image_tensor = transform(image).unsqueeze(0).to(self.device)
|
| except Exception as e:
|
| return f"图片加载或预处理失败: {e}"
|
|
|
| output_score_normalized = 0
|
| if model_type_str == "深度学习":
|
| self.pytorch_regressor.eval()
|
| self.feature_extractor.eval()
|
| with torch.no_grad():
|
| features = self.feature_extractor(image_tensor)
|
| output_score_normalized = self.pytorch_regressor(features).item()
|
| elif model_type_str == "端到端深度学习":
|
| self.full_cnn_regressor.eval()
|
| with torch.no_grad():
|
| output_score_normalized = self.full_cnn_regressor(image_tensor).item()
|
| else:
|
| self.feature_extractor.eval()
|
| with torch.no_grad():
|
| features = self.feature_extractor(image_tensor).cpu().numpy()
|
| processed_features = self.sklearn_feature_pipeline.transform(features)
|
| output_score_normalized = self.sklearn_regressor.predict(processed_features)[0]
|
|
|
|
|
| output_score_normalized = max(0.0, min(1.0, output_score_normalized))
|
|
|
|
|
| predicted_original_score = output_score_normalized * (
|
| self.dataset_max_label - self.dataset_min_label) + self.dataset_min_label
|
|
|
|
|
| predicted_original_score = max(0, min(100, predicted_original_score))
|
|
|
| return f"预测分数: {predicted_original_score:.2f} (百分制)"
|
|
|
| def _save_model_artifacts(self, model_type_str):
|
| internal_name = self._get_internal_model_name(model_type_str)
|
| meta_data_path = f"{MODEL_SAVE_BASE_PATH}_{internal_name}_meta.json"
|
|
|
| if internal_name in ["pytorch_detached", "pytorch_full_cnn"]:
|
| meta_data_path = f"{MODEL_SAVE_BASE_PATH}_{self.active_base_cnn_name}_{internal_name}_meta.json"
|
| try:
|
| with open(meta_data_path, 'w') as f:
|
| json.dump(self.last_trained_params, f, indent=4)
|
| print(f"模型元数据已保存到: {meta_data_path}")
|
| except Exception as e:
|
| print(f"保存模型元数据失败: {e}")
|
| if internal_name == "pytorch_detached":
|
| feat_extractor_path = f"{MODEL_SAVE_BASE_PATH}_{self.active_base_cnn_name}_features.pth"
|
| regressor_path = f"{MODEL_SAVE_BASE_PATH}_{self.active_base_cnn_name}_pytorch_detached_regressor.pth"
|
| torch.save(self.feature_extractor.state_dict(), feat_extractor_path)
|
| torch.save(self.pytorch_regressor.state_dict(), regressor_path)
|
| print(f"PyTorch模型组件 (分离模式) 已保存。")
|
| elif internal_name == "pytorch_full_cnn":
|
| full_cnn_path = f"{MODEL_SAVE_BASE_PATH}_{self.active_base_cnn_name}_full_cnn.pth"
|
| torch.save(self.full_cnn_regressor.state_dict(), full_cnn_path)
|
| print(f"端到端深度学习模型 已保存。")
|
| else:
|
| regressor_path = f"{MODEL_SAVE_BASE_PATH}_{internal_name}_regressor.pkl"
|
| pipeline_path = f"{MODEL_SAVE_BASE_PATH}_{internal_name}_feature_pipeline.pkl"
|
| joblib.dump(self.sklearn_regressor, regressor_path)
|
| joblib.dump(self.sklearn_feature_pipeline, pipeline_path)
|
| print(f"Sklearn {model_type_str} 模型和特征管道已保存。")
|
|
|
| def _load_model_artifacts(self, model_type_str, base_cnn_name_to_load):
|
| internal_name = self._get_internal_model_name(model_type_str)
|
|
|
| loaded_params = None
|
|
|
| meta_data_path = f"{MODEL_SAVE_BASE_PATH}_{internal_name}_meta.json"
|
| if internal_name in ["pytorch_detached", "pytorch_full_cnn"]:
|
| meta_data_path = f"{MODEL_SAVE_BASE_PATH}_{base_cnn_name_to_load}_{internal_name}_meta.json"
|
|
|
| try:
|
| with open(meta_data_path, 'r') as f:
|
| loaded_params = json.load(f)
|
| self.active_base_cnn_name = loaded_params.get("base_cnn_name", base_cnn_name_to_load)
|
|
|
| self.dataset_min_label = loaded_params.get("min_label", 0.0)
|
| self.dataset_max_label = loaded_params.get("max_label", 100.0)
|
| print(f"加载模型参数: {loaded_params}")
|
| except FileNotFoundError:
|
| print(f"警告: 模型元数据文件 {meta_data_path} 未找到。使用默认参数进行加载。")
|
| loaded_params = {
|
| "base_cnn_name": base_cnn_name_to_load,
|
| "dropout_rate": DEFAULT_DROPOUT_RATE,
|
| "weight_decay": DEFAULT_WEIGHT_DECAY,
|
| "pca_variance_ratio": DEFAULT_PCA_VARIANCE_RATIO,
|
| "min_label": 0.0,
|
| "max_label": 100.0
|
| }
|
| self.active_base_cnn_name = base_cnn_name_to_load
|
| self.dataset_min_label = 0.0
|
| self.dataset_max_label = 100.0
|
|
|
|
|
| if internal_name == "pytorch_detached":
|
| try:
|
| self.feature_extractor = FeatureExtractor(model_name=self.active_base_cnn_name).to(self.device)
|
| feat_extractor_path = f"{MODEL_SAVE_BASE_PATH}_{self.active_base_cnn_name}_features.pth"
|
| self.feature_extractor.load_state_dict(
|
| torch.load(feat_extractor_path, map_location=self.device))
|
| self.feature_extractor.eval()
|
|
|
| feature_dim = self.feature_extractor.get_output_dim()
|
| self.pytorch_regressor = PytorchRegressor(
|
| in_features=feature_dim,
|
| dropout_rate=loaded_params.get("dropout_rate", DEFAULT_DROPOUT_RATE)
|
| ).to(self.device)
|
| regressor_path = f"{MODEL_SAVE_BASE_PATH}_{self.active_base_cnn_name}_pytorch_detached_regressor.pth"
|
| self.pytorch_regressor.load_state_dict(
|
| torch.load(regressor_path, map_location=self.device))
|
| self.pytorch_regressor.eval()
|
| print(f"PyTorch模型组件 (分离模式, 基础CNN: {self.active_base_cnn_name}) 已加载。")
|
| return True
|
| except FileNotFoundError as e:
|
| print(f"PyTorch模型文件 (分离模式, 基础CNN: {self.active_base_cnn_name}) 未找到: {e}")
|
| self.pytorch_regressor = None
|
| self.feature_extractor = None
|
| return False
|
| elif internal_name == "pytorch_full_cnn":
|
| try:
|
| self.full_cnn_regressor = FullCNNRegressor(
|
| model_name=self.active_base_cnn_name,
|
| dropout_rate=loaded_params.get("dropout_rate", DEFAULT_DROPOUT_RATE)
|
| ).to(self.device)
|
| full_cnn_path = f"{MODEL_SAVE_BASE_PATH}_{self.active_base_cnn_name}_full_cnn.pth"
|
| self.full_cnn_regressor.load_state_dict(
|
| torch.load(full_cnn_path, map_location=self.device))
|
| self.full_cnn_regressor.eval()
|
| print(f"端到端深度学习模型 ({self.active_base_cnn_name}) 已加载。")
|
| return True
|
| except FileNotFoundError as e:
|
| print(f"端到端深度学习模型文件 ({self.active_base_cnn_name}) 未找到: {e}")
|
| self.full_cnn_regressor = None
|
| return False
|
| else:
|
| try:
|
| self.feature_extractor = FeatureExtractor(model_name=self.active_base_cnn_name).to(self.device)
|
| self.feature_extractor.eval()
|
|
|
| regressor_path = f"{MODEL_SAVE_BASE_PATH}_{internal_name}_regressor.pkl"
|
| pipeline_path = f"{MODEL_SAVE_BASE_PATH}_{internal_name}_feature_pipeline.pkl"
|
|
|
|
|
| self.sklearn_regressor, self.sklearn_feature_pipeline = \
|
| get_sklearn_model_pipeline(
|
| model_type_str,
|
| pca_variance_ratio=loaded_params.get("pca_variance_ratio", DEFAULT_PCA_VARIANCE_RATIO)
|
| )
|
|
|
| self.sklearn_regressor = joblib.load(regressor_path)
|
| self.sklearn_feature_pipeline = joblib.load(pipeline_path)
|
| print(f"Sklearn {model_type_str} 模型和特征管道 (基础CNN: {self.active_base_cnn_name}) 已加载。")
|
| return True
|
| except FileNotFoundError as e:
|
| print(f"Sklearn模型文件 {regressor_path} 或 {pipeline_path} 未找到: {e}")
|
| self.sklearn_regressor = None
|
| self.sklearn_feature_pipeline = None
|
| self.feature_extractor = None
|
| return False
|
| return False
|
|
|