# train_predict.py
"""Training and prediction engine for image score regression.

Three model families are supported (UI names are Chinese, kept verbatim
because they are dictionary keys / dispatch values):
  * "深度学习"        - frozen CNN feature extractor + small PyTorch regressor head
  * "端到端深度学习"  - a full CNN fine-tuned end-to-end
  * scikit-learn models - CNN features -> (scaler/PCA) pipeline -> regressor
"""
import copy
import json
import os
from pathlib import Path

import joblib
import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
from PIL import Image
from torch.utils.data import DataLoader, random_split

# Configuration constants from config.py
from config import DATA_DIR, SCORE_FILE_NAME, MODEL_SAVE_BASE_PATH, \
    DEFAULT_BATCH_SIZE, DEFAULT_EPOCHS, DEFAULT_LR, \
    DEFAULT_DROPOUT_RATE, DEFAULT_WEIGHT_DECAY, DEFAULT_PCA_VARIANCE_RATIO, \
    DEFAULT_OPTIMIZER, DEFAULT_LR_SCHEDULER, DEFAULT_SCHEDULER_PATIENCE, \
    DEFAULT_SCHEDULER_FACTOR, DEFAULT_SCHEDULER_T_MAX, DEFAULT_LOSS_FUNCTION, \
    DEFAULT_EARLY_STOPPING_PATIENCE, VALIDATION_SPLIT_RATIO, DEFAULT_DATA_AUGMENTATION

# Project-local modules
from utils import ScoreDataset, get_transforms, get_image_size_by_model_name, calculate_metrics
from feature_extractor import FeatureExtractor
from regressors import PytorchRegressor, get_sklearn_model_pipeline, FullCNNRegressor

# --- Configure Matplotlib so CJK text renders in figure labels/titles ---
plt.rcParams['font.sans-serif'] = ['Microsoft YaHei', 'SimHei', 'Arial Unicode MS', 'DejaVu Sans']
plt.rcParams['axes.unicode_minus'] = False
# ------------------------------------


class TrainingAndPredictionEngine:
    """
    Engine managing the whole training / prediction workflow:
    data preparation, model-type switching, training loops and prediction.
    """

    def __init__(self):
        # Pick GPU when available; all tensors/models are moved to this device.
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        print(f"PyTorch Version: {torch.__version__}")
        print(f"CUDA Available: {torch.cuda.is_available()}")
        if torch.cuda.is_available():
            print(f"CUDA Device Count: {torch.cuda.device_count()}")
            print(f"Current CUDA Device Name: {torch.cuda.get_device_name(0)}")

        # Model components — exactly one family is active at a time; the
        # others are reset to None by switch_model_type().
        self.feature_extractor = None
        self.pytorch_regressor = None
        self.full_cnn_regressor = None
        self.sklearn_regressor = None
        self.sklearn_feature_pipeline = None
        self.current_model_type = None
        self.active_base_cnn_name = None

        # Per-training-run metric histories (reset at the start of train_model).
        self.train_loss_history = []
        self.val_loss_history = []
        self.val_mse_history = []
        self.val_mae_history = []
        self.val_r2_history = []

        self.train_dataloader = None
        self.val_dataloader = None

        # Dataset label range, used to de-normalize predictions back to the
        # original score scale.
        self.dataset_min_label = 0.0
        self.dataset_max_label = 100.0

        # Hyper-parameters of the last training run; persisted alongside the
        # model so loading can reconstruct a compatible model.
        self.last_trained_params = {
            "model_type": None,
            "base_cnn_name": None,
            "dropout_rate": DEFAULT_DROPOUT_RATE,
            "weight_decay": DEFAULT_WEIGHT_DECAY,
            "pca_variance_ratio": DEFAULT_PCA_VARIANCE_RATIO,
            "optimizer": DEFAULT_OPTIMIZER,
            "lr_scheduler": DEFAULT_LR_SCHEDULER,
            "scheduler_patience": DEFAULT_SCHEDULER_PATIENCE,
            "scheduler_factor": DEFAULT_SCHEDULER_FACTOR,
            "scheduler_t_max": DEFAULT_SCHEDULER_T_MAX,
            "loss_function": DEFAULT_LOSS_FUNCTION,
            "early_stopping_patience": DEFAULT_EARLY_STOPPING_PATIENCE,
            "validation_split_ratio": VALIDATION_SPLIT_RATIO,
            "batch_size": DEFAULT_BATCH_SIZE,
            "min_label": self.dataset_min_label,  # initial value
            "max_label": self.dataset_max_label,  # initial value
            "enable_augmentation": DEFAULT_DATA_AUGMENTATION
        }
        # Make sure the model output directory exists.
        Path(os.path.dirname(MODEL_SAVE_BASE_PATH)).mkdir(exist_ok=True, parents=True)

    def _get_internal_model_name(self, ui_model_name):
        """Map a UI-facing (Chinese) model name to the internal identifier
        used in saved-artifact file names."""
        mapping = {
            "深度学习": "pytorch_detached",
            "端到端深度学习": "pytorch_full_cnn",
            "随机森林": "random_forest",
            "支持向量回归": "svr",
            "梯度提升回归": "gradient_boosting",
            "堆叠回归": "stacking",
            "K近邻": "knn",
            "线性回归": "linear_regression"
        }
        return mapping.get(ui_model_name, "unknown_model")

    def _get_optimizer(self, model_params, optimizer_name, lr, weight_decay):
        """Build the optimizer named by `optimizer_name`; raises ValueError on
        an unknown name."""
        if optimizer_name == "Adam":
            return torch.optim.Adam(model_params, lr=lr, weight_decay=weight_decay)
        elif optimizer_name == "AdamW":
            return torch.optim.AdamW(model_params, lr=lr, weight_decay=weight_decay)
        elif optimizer_name == "SGD":
            # SGD usually needs momentum; a fixed 0.9 default is used here.
            return torch.optim.SGD(model_params, lr=lr, momentum=0.9, weight_decay=weight_decay)
        else:
            raise ValueError(f"不支持的优化器: {optimizer_name}")

    def _get_lr_scheduler(self, optimizer, scheduler_name, patience, factor, t_max):
        """Build the LR scheduler (or None); raises ValueError on an unknown name."""
        if scheduler_name == "None":
            return None
        elif scheduler_name == "ReduceLROnPlateau":
            return torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min',
                                                              patience=patience, factor=factor,
                                                              verbose=True)
        elif scheduler_name == "CosineAnnealingLR":
            return torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=t_max)
        else:
            raise ValueError(f"不支持的学习率调度器: {scheduler_name}")

    def _get_loss_function(self, loss_name):
        """Build the loss criterion; raises ValueError on an unknown name."""
        if loss_name == "MSELoss":
            return nn.MSELoss()
        elif loss_name == "L1Loss":
            return nn.L1Loss()
        elif loss_name == "SmoothL1Loss":
            return nn.SmoothL1Loss()
        else:
            raise ValueError(f"不支持的损失函数: {loss_name}")

    def switch_model_type(self, model_type_str, base_cnn_name="resnet50",
                          dropout_rate=DEFAULT_DROPOUT_RATE,
                          weight_decay=DEFAULT_WEIGHT_DECAY,
                          pca_variance_ratio=DEFAULT_PCA_VARIANCE_RATIO,
                          optimizer_name=DEFAULT_OPTIMIZER,
                          lr_scheduler_name=DEFAULT_LR_SCHEDULER,
                          scheduler_patience=DEFAULT_SCHEDULER_PATIENCE,
                          scheduler_factor=DEFAULT_SCHEDULER_FACTOR,
                          scheduler_t_max=DEFAULT_SCHEDULER_T_MAX,
                          loss_function_name=DEFAULT_LOSS_FUNCTION,
                          early_stopping_patience=DEFAULT_EARLY_STOPPING_PATIENCE,
                          batch_size=DEFAULT_BATCH_SIZE,
                          enable_augmentation=DEFAULT_DATA_AUGMENTATION
                          ):
        """Select the active model family, instantiate its components and
        record all hyper-parameters for later training / persistence.

        Returns a human-readable status string for the UI.
        """
        self.current_model_type = model_type_str
        self.active_base_cnn_name = base_cnn_name

        # Batch size and validation split are needed by prepare_data_for_training,
        # so they are set first.
        self.last_trained_params['batch_size'] = batch_size
        self.last_trained_params['validation_split_ratio'] = VALIDATION_SPLIT_RATIO  # fixed value

        # Remaining parameters are stored for training and artifact saving.
        self.last_trained_params.update({
            "model_type": model_type_str,
            "base_cnn_name": base_cnn_name,
            "dropout_rate": dropout_rate,
            "weight_decay": weight_decay,
            "pca_variance_ratio": pca_variance_ratio,
            "optimizer": optimizer_name,
            "lr_scheduler": lr_scheduler_name,
            "scheduler_patience": scheduler_patience,
            "scheduler_factor": scheduler_factor,
            "scheduler_t_max": scheduler_t_max,
            "loss_function": loss_function_name,
            "early_stopping_patience": early_stopping_patience,
            # min_label / max_label are updated in prepare_data_for_training
            "enable_augmentation": enable_augmentation
        })

        print(f"已切换到 {self.current_model_type} 模型模式, 基础CNN: {self.active_base_cnn_name}.")
        print(
            f"参数: BatchSize={batch_size}, Dropout={dropout_rate}, WeightDecay={weight_decay}, PCA={pca_variance_ratio},")
        print(
            f" Optimizer={optimizer_name}, Scheduler={lr_scheduler_name}, Loss={loss_function_name}, EarlyStopping={early_stopping_patience},")
        print(f" Data Augmentation: {enable_augmentation}")

        if self.current_model_type == "深度学习":
            # Detached mode: frozen feature extractor + trainable regressor head.
            self.feature_extractor = FeatureExtractor(model_name=self.active_base_cnn_name).to(self.device)
            self.feature_extractor.eval()
            feature_dim = self.feature_extractor.get_output_dim()
            self.pytorch_regressor = PytorchRegressor(in_features=feature_dim, dropout_rate=dropout_rate).to(
                self.device)
            self.full_cnn_regressor = None
            self.sklearn_regressor = None
            self.sklearn_feature_pipeline = None
        elif self.current_model_type == "端到端深度学习":
            self.full_cnn_regressor = FullCNNRegressor(model_name=self.active_base_cnn_name,
                                                       dropout_rate=dropout_rate).to(self.device)
            self.feature_extractor = None
            self.pytorch_regressor = None
            self.sklearn_regressor = None
            self.sklearn_feature_pipeline = None
        else:  # scikit-learn models: CNN features feed a sklearn pipeline.
            self.feature_extractor = FeatureExtractor(model_name=self.active_base_cnn_name).to(self.device)
            self.feature_extractor.eval()
            self.sklearn_regressor, self.sklearn_feature_pipeline = \
                get_sklearn_model_pipeline(self.current_model_type,
                                           pca_variance_ratio=pca_variance_ratio)
            self.pytorch_regressor = None
            self.full_cnn_regressor = None

        return f"已切换到 {self.current_model_type} 模型模式, 基础CNN: {self.active_base_cnn_name}. 参数已设置。"

    def prepare_data_for_training(self):
        """Read the score file, build train/validation datasets and dataloaders.

        Returns (ok: bool, message: str); message is user-facing.
        """
        image_paths = []
        scores = []
        score_file_path = Path(DATA_DIR) / SCORE_FILE_NAME

        if not score_file_path.exists():
            return False, f"错误: 训练数据文件 {score_file_path} 不存在。请先在‘原始数据导入’或‘训练数据管理’标签页保存数据。"

        try:
            # Explicit encoding: the score file may contain non-ASCII filenames.
            with open(score_file_path, 'r', encoding='utf-8') as f:
                for line in f:
                    line = line.strip()
                    if not line:
                        continue  # tolerate blank lines
                    # rsplit so filenames containing commas still parse.
                    filename, score_str = line.rsplit(',', 1)
                    full_image_path = Path(DATA_DIR) / filename
                    if full_image_path.exists():
                        image_paths.append(str(full_image_path))
                        scores.append(float(score_str))
                    else:
                        print(f"警告: 图像文件 {full_image_path} 不存在,已跳过。")
        except Exception as e:
            return False, f"错误: 读取分数文件 {score_file_path} 为空或失败: {e}"

        if not image_paths:
            return False, "没有找到有效的图片数据用于训练。请检查 'data' 文件夹。"

        current_image_size = get_image_size_by_model_name(self.active_base_cnn_name)

        # A throwaway dataset is built only to learn the label range,
        # needed later to de-normalize predictions.
        temp_dataset = ScoreDataset(image_paths, scores, transform=None)
        self.dataset_min_label = temp_dataset.min_label
        self.dataset_max_label = temp_dataset.max_label
        self.last_trained_params['min_label'] = self.dataset_min_label
        self.last_trained_params['max_label'] = self.dataset_max_label

        full_dataset = ScoreDataset(image_paths, scores,
                                    transform=get_transforms(train=True, image_size=current_image_size,
                                                             enable_augmentation=self.last_trained_params[
                                                                 'enable_augmentation']))

        num_total = len(full_dataset)
        num_val = int(self.last_trained_params['validation_split_ratio'] * num_total)
        num_train = num_total - num_val

        if num_train < 1:
            return False, f"错误: 训练集样本数量不足1。总数据量: {num_total}, 训练集: {num_train},请增加数据量或调整验证集比例。"
        if num_total <= 1:
            # Too few samples: skip the validation split entirely.
            num_train = num_total
            num_val = 0
            print(f"警告: 总样本数过少({num_total}),不进行验证集划分。")
        elif num_val < 1:
            print(f"警告: 验证集样本数量不足1 ({num_val})。总数据量: {num_total}。验证集可能无法进行评估。")

        try:
            # Fixed seed so the split is reproducible across runs.
            train_dataset, val_dataset = random_split(full_dataset, [num_train, num_val],
                                                      generator=torch.Generator().manual_seed(42))
        except ValueError as e:
            return False, f"数据划分失败: {e}。请检查数据量({num_total})和划分比例({self.last_trained_params['validation_split_ratio']})。"

        # os.cpu_count() may return None; guard the division.
        num_workers = max(1, (os.cpu_count() or 2) // 2)

        self.train_dataloader = DataLoader(
            train_dataset,
            batch_size=self.last_trained_params['batch_size'],
            shuffle=True,
            num_workers=num_workers,
            drop_last=True
        )
        if len(self.train_dataloader) == 0:
            return False, f"错误: 训练数据加载器为空。训练集样本数量: {len(train_dataset)}, 批量大小: {self.last_trained_params['batch_size']}。请减小批量大小或增加训练集样本。"

        self.val_dataloader = DataLoader(
            val_dataset,
            batch_size=self.last_trained_params['batch_size'],
            shuffle=False,
            num_workers=num_workers,
            drop_last=False
        )
        if len(val_dataset) > 0 and len(self.val_dataloader) == 0:
            print(
                f"警告: 验证数据加载器为空。验证集样本数量: {len(val_dataset)}, 批量大小: {self.last_trained_params['batch_size']}。验证集可能无法进行评估。")

        return True, f"数据准备完成。训练集: {len(train_dataset)} 张图片, 验证集: {len(val_dataset)} 张图片。"

    def train_model(self, epochs=DEFAULT_EPOCHS, lr=DEFAULT_LR):
        """Train the currently selected model and return two matplotlib figures:
        (loss figure, metrics figure). Also saves the resulting artifacts.
        """
        self.train_loss_history = []
        self.val_loss_history = []
        self.val_mse_history = []
        self.val_mae_history = []
        self.val_r2_history = []

        # Figure 1: train / validation loss curves.
        fig_loss, ax_loss = plt.subplots(figsize=(10, 6))
        ax_loss.set_xlabel("Epoch")
        ax_loss.set_ylabel("损失")
        ax_loss.set_title("训练与验证损失")

        # Figure 2: validation MSE / MAE curves.
        fig_metrics, ax_metrics = plt.subplots(figsize=(10, 6))
        ax_metrics.set_xlabel("Epoch")
        ax_metrics.set_ylabel("误差指标")
        ax_metrics.set_title("验证MSE与MAE")

        if self.train_dataloader is None or len(self.train_dataloader) == 0:
            ax_loss.text(0.5, 0.5, "训练数据加载器为空。请先正确加载数据。",
                         horizontalalignment='center', verticalalignment='center',
                         transform=ax_loss.transAxes, fontsize=12, color='red')
            ax_metrics.text(0.5, 0.5, "训练数据加载器为空。",
                            horizontalalignment='center', verticalalignment='center',
                            transform=ax_metrics.transAxes, fontsize=12, color='red')
            fig_loss.tight_layout()
            fig_metrics.tight_layout()
            return fig_loss, fig_metrics

        min_label = self.dataset_min_label
        max_label = self.dataset_max_label
        if min_label is None or max_label is None:
            print("错误: dataset_min_label/max_label 未设置,使用默认 0-100。")
            min_label = 0.0
            max_label = 100.0

        if self.current_model_type == "深度学习":
            if self.pytorch_regressor is None or self.feature_extractor is None:
                ax_loss.text(0.5, 0.5, "深度学习模型(分离模式)未正确初始化。请重试。",
                             horizontalalignment='center', verticalalignment='center',
                             transform=ax_loss.transAxes, fontsize=12, color='red')
                ax_metrics.text(0.5, 0.5, "深度学习模型未初始化。",
                                horizontalalignment='center', verticalalignment='center',
                                transform=ax_metrics.transAxes, fontsize=12, color='red')
                fig_loss.tight_layout()
                fig_metrics.tight_layout()
                return fig_loss, fig_metrics

            model = self.pytorch_regressor
            optimizer = self._get_optimizer(model.parameters(), self.last_trained_params["optimizer"], lr,
                                            self.last_trained_params["weight_decay"])
            criterion = self._get_loss_function(self.last_trained_params["loss_function"])
            lr_scheduler = self._get_lr_scheduler(optimizer, self.last_trained_params["lr_scheduler"],
                                                  self.last_trained_params["scheduler_patience"],
                                                  self.last_trained_params["scheduler_factor"],
                                                  self.last_trained_params["scheduler_t_max"])

            best_val_loss = float('inf')
            epochs_no_improve = 0
            best_epoch = 0
            best_regressor_state = None
            best_feature_extractor_state = None

            print(f"开始训练深度学习模型 (PyTorch, 分离模式, 基础CNN: {self.active_base_cnn_name}),共 {epochs} 轮次...")
            print(
                f"学习率: {lr}, 批量大小: {self.last_trained_params['batch_size']}, Dropout: {self.last_trained_params['dropout_rate']}, Weight Decay: {self.last_trained_params['weight_decay']}")
            print(
                f"优化器: {self.last_trained_params['optimizer']}, 损失函数: {self.last_trained_params['loss_function']}, 调度器: {self.last_trained_params['lr_scheduler']}, 早停耐心: {self.last_trained_params['early_stopping_patience']}")
            print(
                f"原始分数范围: [{min_label:.2f}, {max_label:.2f}], 数据增强: {self.last_trained_params['enable_augmentation']}")

            for epoch in range(epochs):
                # --- Training phase (only the regressor head trains;
                #     the feature extractor stays frozen in eval mode) ---
                model.train()
                self.feature_extractor.eval()
                running_train_loss = 0.0
                for batch_idx, (images, labels) in enumerate(self.train_dataloader):
                    images = images.to(self.device)
                    labels = labels.unsqueeze(1).to(self.device)
                    with torch.no_grad():
                        features = self.feature_extractor(images)
                    optimizer.zero_grad()
                    outputs = model(features)
                    loss = criterion(outputs, labels)
                    loss.backward()
                    optimizer.step()
                    running_train_loss += loss.item()
                avg_train_loss = running_train_loss / len(self.train_dataloader)
                self.train_loss_history.append(avg_train_loss)

                # --- Validation phase ---
                if len(self.val_dataloader.dataset) > 0 and len(self.val_dataloader) > 0:
                    model.eval()
                    val_losses = []
                    val_predictions_normalized = []
                    val_true_labels_normalized = []
                    with torch.no_grad():
                        for images, labels in self.val_dataloader:
                            images = images.to(self.device)
                            labels = labels.unsqueeze(1).to(self.device)
                            features = self.feature_extractor(images)
                            outputs = model(features)
                            loss = criterion(outputs, labels)
                            val_losses.append(loss.item())
                            val_predictions_normalized.extend(outputs.cpu().numpy().flatten())
                            val_true_labels_normalized.extend(labels.cpu().numpy().flatten())
                    avg_val_loss = np.mean(val_losses)
                    self.val_loss_history.append(avg_val_loss)

                    val_mse, val_mae, val_r2 = calculate_metrics(
                        val_true_labels_normalized, val_predictions_normalized, min_label, max_label
                    )
                    self.val_mse_history.append(val_mse)
                    self.val_mae_history.append(val_mae)
                    self.val_r2_history.append(val_r2)

                    print(
                        f"Epoch {epoch + 1}/{epochs}: Train Loss: {avg_train_loss:.4f}, Val Loss: {avg_val_loss:.4f}, Val MSE: {val_mse:.2f}, Val MAE: {val_mae:.2f}, Val R2: {val_r2:.2f}")

                    if lr_scheduler:
                        if isinstance(lr_scheduler, torch.optim.lr_scheduler.ReduceLROnPlateau):
                            lr_scheduler.step(avg_val_loss)
                        else:
                            lr_scheduler.step()

                    if avg_val_loss < best_val_loss:
                        best_val_loss = avg_val_loss
                        epochs_no_improve = 0
                        best_epoch = epoch + 1
                        # FIX: deep-copy — state_dict() returns references to the
                        # live tensors, so without copying the "best" state would
                        # be mutated by subsequent training steps.
                        best_regressor_state = copy.deepcopy(model.state_dict())
                        best_feature_extractor_state = copy.deepcopy(self.feature_extractor.state_dict())
                    else:
                        epochs_no_improve += 1
                        if epochs_no_improve >= self.last_trained_params['early_stopping_patience']:
                            print(
                                f"早停触发!验证损失在 {self.last_trained_params['early_stopping_patience']} 个Epochs内没有改善。")
                            break
                else:
                    print(
                        f"Epoch {epoch + 1}/{epochs}: Train Loss: {avg_train_loss:.4f} (无验证集评估或验证dataloader为空)")

            if best_regressor_state and best_feature_extractor_state:
                model.load_state_dict(best_regressor_state)
                self.feature_extractor.load_state_dict(best_feature_extractor_state)
                print(f"已加载第 {best_epoch} 轮次的最佳模型。")
            else:
                print("没有找到更好的验证损失模型(无验证集或未改善)。使用最后一次训练的模型状态。")

            # Plot figure 1: losses.
            ax_loss.clear()
            ax_loss.plot(self.train_loss_history, label="训练损失", color='blue')
            if len(self.val_loss_history) > 0:
                ax_loss.plot(self.val_loss_history, label="验证损失", color='orange')
            ax_loss.legend()
            ax_loss.set_xlabel("Epoch")
            ax_loss.set_ylabel("损失")
            if len(self.val_loss_history) > 0:
                ax_loss.set_title(f"深度学习模型训练完成 (分离模式, 基础CNN: {self.active_base_cnn_name})\n"
                                  f"最佳验证损失: {best_val_loss:.4f} (Epoch {best_epoch})")
            else:
                ax_loss.set_title(f"深度学习模型训练完成 (分离模式, 基础CNN: {self.active_base_cnn_name})\n"
                                  f"最终训练损失: {self.train_loss_history[-1]:.4f} (无验证集)")
            fig_loss.tight_layout()

            # Plot figure 2: MSE / MAE.
            ax_metrics.clear()
            if len(self.val_mse_history) > 0:
                ax_metrics.plot(self.val_mse_history, label="验证MSE", color='green', linestyle='--')
                ax_metrics.plot(self.val_mae_history, label="验证MAE", color='red', linestyle=':')
                ax_metrics.legend()
            ax_metrics.set_xlabel("Epoch")
            ax_metrics.set_ylabel("误差指标")
            ax_metrics.set_title(f"验证MSE与MAE (深度学习模型)")
            fig_metrics.tight_layout()

            self._save_model_artifacts("深度学习")
            return fig_loss, fig_metrics

        elif self.current_model_type == "端到端深度学习":
            if self.full_cnn_regressor is None:
                ax_loss.text(0.5, 0.5, "端到端深度学习模型未正确初始化。请重试。",
                             horizontalalignment='center', verticalalignment='center',
                             transform=ax_loss.transAxes, fontsize=12, color='red')
                ax_metrics.text(0.5, 0.5, "端到端深度学习模型未初始化。",
                                horizontalalignment='center', verticalalignment='center',
                                transform=ax_metrics.transAxes, fontsize=12, color='red')
                fig_loss.tight_layout()
                fig_metrics.tight_layout()
                return fig_loss, fig_metrics

            model = self.full_cnn_regressor
            optimizer = self._get_optimizer(model.parameters(), self.last_trained_params["optimizer"], lr,
                                            self.last_trained_params["weight_decay"])
            criterion = self._get_loss_function(self.last_trained_params["loss_function"])
            lr_scheduler = self._get_lr_scheduler(optimizer, self.last_trained_params["lr_scheduler"],
                                                  self.last_trained_params["scheduler_patience"],
                                                  self.last_trained_params["scheduler_factor"],
                                                  self.last_trained_params["scheduler_t_max"])

            best_val_loss = float('inf')
            epochs_no_improve = 0
            best_epoch = 0
            best_model_state = None

            print(f"开始训练端到端深度学习模型 (基础CNN: {self.active_base_cnn_name}),共 {epochs} 轮次...")
            print(
                f"学习率: {lr}, 批量大小: {self.last_trained_params['batch_size']}, Dropout: {self.last_trained_params['dropout_rate']}, Weight Decay: {self.last_trained_params['weight_decay']}")
            print(
                f"优化器: {self.last_trained_params['optimizer']}, 损失函数: {self.last_trained_params['loss_function']}, 调度器: {self.last_trained_params['lr_scheduler']}, 早停耐心: {self.last_trained_params['early_stopping_patience']}")
            print(
                f"原始分数范围: [{min_label:.2f}, {max_label:.2f}], 数据增强: {self.last_trained_params['enable_augmentation']}")

            for epoch in range(epochs):
                # --- Training phase (entire CNN trains end-to-end) ---
                model.train()
                running_train_loss = 0.0
                for batch_idx, (images, labels) in enumerate(self.train_dataloader):
                    images = images.to(self.device)
                    labels = labels.unsqueeze(1).to(self.device)
                    optimizer.zero_grad()
                    outputs = model(images)
                    loss = criterion(outputs, labels)
                    loss.backward()
                    optimizer.step()
                    running_train_loss += loss.item()
                avg_train_loss = running_train_loss / len(self.train_dataloader)
                self.train_loss_history.append(avg_train_loss)

                # --- Validation phase ---
                if len(self.val_dataloader.dataset) > 0 and len(self.val_dataloader) > 0:
                    model.eval()
                    val_losses = []
                    val_predictions_normalized = []
                    val_true_labels_normalized = []
                    with torch.no_grad():
                        for images, labels in self.val_dataloader:
                            images = images.to(self.device)
                            labels = labels.unsqueeze(1).to(self.device)
                            outputs = model(images)
                            loss = criterion(outputs, labels)
                            val_losses.append(loss.item())
                            val_predictions_normalized.extend(outputs.cpu().numpy().flatten())
                            val_true_labels_normalized.extend(labels.cpu().numpy().flatten())
                    avg_val_loss = np.mean(val_losses)
                    self.val_loss_history.append(avg_val_loss)

                    val_mse, val_mae, val_r2 = calculate_metrics(
                        val_true_labels_normalized, val_predictions_normalized, min_label, max_label
                    )
                    self.val_mse_history.append(val_mse)
                    self.val_mae_history.append(val_mae)
                    self.val_r2_history.append(val_r2)

                    print(
                        f"Epoch {epoch + 1}/{epochs}: Train Loss: {avg_train_loss:.4f}, Val Loss: {avg_val_loss:.4f}, Val MSE: {val_mse:.2f}, Val MAE: {val_mae:.2f}, Val R2: {val_r2:.2f}")

                    if lr_scheduler:
                        if isinstance(lr_scheduler, torch.optim.lr_scheduler.ReduceLROnPlateau):
                            lr_scheduler.step(avg_val_loss)
                        else:
                            lr_scheduler.step()

                    if avg_val_loss < best_val_loss:
                        best_val_loss = avg_val_loss
                        epochs_no_improve = 0
                        best_epoch = epoch + 1
                        # FIX: deep-copy so later epochs cannot overwrite the
                        # checkpointed best weights (state_dict aliases tensors).
                        best_model_state = copy.deepcopy(model.state_dict())
                    else:
                        epochs_no_improve += 1
                        if epochs_no_improve >= self.last_trained_params['early_stopping_patience']:
                            print(
                                f"早停触发!验证损失在 {self.last_trained_params['early_stopping_patience']} 个Epochs内没有改善。")
                            break
                else:
                    print(
                        f"Epoch {epoch + 1}/{epochs}: Train Loss: {avg_train_loss:.4f} (无验证集评估或验证dataloader为空)")

            if best_model_state:
                model.load_state_dict(best_model_state)
                print(f"已加载第 {best_epoch} 轮次的最佳模型。")
            else:
                print("没有找到更好的验证损失模型(无验证集或未改善)。使用最后一次训练的模型状态。")

            # Plot figure 1: losses.
            ax_loss.clear()
            ax_loss.plot(self.train_loss_history, label="训练损失", color='blue')
            if len(self.val_loss_history) > 0:
                ax_loss.plot(self.val_loss_history, label="验证损失", color='orange')
            ax_loss.legend()
            ax_loss.set_xlabel("Epoch")
            ax_loss.set_ylabel("损失")
            if len(self.val_loss_history) > 0:
                ax_loss.set_title(f"端到端深度学习模型训练完成 (基础CNN: {self.active_base_cnn_name})\n"
                                  f"最佳验证损失: {best_val_loss:.4f} (Epoch {best_epoch})")
            else:
                ax_loss.set_title(f"端到端深度学习模型训练完成 (基础CNN: {self.active_base_cnn_name})\n"
                                  f"最终训练损失: {self.train_loss_history[-1]:.4f} (无验证集)")
            fig_loss.tight_layout()

            # Plot figure 2: MSE / MAE.
            ax_metrics.clear()
            if len(self.val_mse_history) > 0:
                ax_metrics.plot(self.val_mse_history, label="验证MSE", color='green', linestyle='--')
                ax_metrics.plot(self.val_mae_history, label="验证MAE", color='red', linestyle=':')
                ax_metrics.legend()
            ax_metrics.set_xlabel("Epoch")
            ax_metrics.set_ylabel("误差指标")
            ax_metrics.set_title(f"验证MSE与MAE (端到端深度学习模型)")
            fig_metrics.tight_layout()

            self._save_model_artifacts("端到端深度学习")
            return fig_loss, fig_metrics

        elif self.current_model_type in ["随机森林", "支持向量回归", "梯度提升回归", "堆叠回归", "K近邻", "线性回归"]:
            if self.sklearn_regressor is None or self.sklearn_feature_pipeline is None or self.feature_extractor is None:
                ax_loss.text(0.5, 0.5, "Sklearn模型或其特征提取器未正确初始化。请重试。",
                             horizontalalignment='center', verticalalignment='center',
                             transform=ax_loss.transAxes, fontsize=12, color='red')
                ax_metrics.text(0.5, 0.5, "Sklearn模型未初始化。",
                                horizontalalignment='center', verticalalignment='center',
                                transform=ax_metrics.transAxes, fontsize=12, color='red')
                fig_loss.tight_layout()
                fig_metrics.tight_layout()
                return fig_loss, fig_metrics

            print(f"正在提取所有图片的特征用于Sklearn模型训练 (基础CNN: {self.active_base_cnn_name})...")
            print(f"PCA保留方差比例: {self.last_trained_params['pca_variance_ratio']}")
            print(
                f"原始分数范围: [{min_label:.2f}, {max_label:.2f}], 数据增强: {self.last_trained_params['enable_augmentation']}")

            # Re-instantiate a fresh (unfitted) regressor + feature pipeline.
            self.sklearn_regressor, self.sklearn_feature_pipeline = \
                get_sklearn_model_pipeline(self.current_model_type,
                                           self.last_trained_params["pca_variance_ratio"])

            X_train_features = []
            y_train_labels_normalized = []
            X_val_features = []
            y_val_labels_normalized = []

            # Feature extraction with the frozen CNN.
            self.feature_extractor.eval()
            with torch.no_grad():
                for images, labels in self.train_dataloader:
                    images = images.to(self.device)
                    features = self.feature_extractor(images).cpu().numpy()
                    X_train_features.extend(features)
                    y_train_labels_normalized.extend(labels.cpu().numpy())
                if len(self.val_dataloader.dataset) > 0 and len(self.val_dataloader) > 0:
                    for images, labels in self.val_dataloader:
                        images = images.to(self.device)
                        features = self.feature_extractor(images).cpu().numpy()
                        X_val_features.extend(features)
                        y_val_labels_normalized.extend(labels.cpu().numpy())

            X_train = np.array(X_train_features)
            y_train_normalized = np.array(y_train_labels_normalized)
            X_val = np.array(X_val_features)
            y_val_normalized = np.array(y_val_labels_normalized)

            if X_train.shape[0] == 0:
                ax_loss.text(0.5, 0.5, "无训练特征数据,请检查图片加载。",
                             horizontalalignment='center', verticalalignment='center',
                             transform=ax_loss.transAxes, fontsize=12, color='red')
                ax_metrics.text(0.5, 0.5, "无训练特征数据。",
                                horizontalalignment='center', verticalalignment='center',
                                transform=ax_metrics.transAxes, fontsize=12, color='red')
                fig_loss.tight_layout()
                fig_metrics.tight_layout()
                return fig_loss, fig_metrics

            print("正在对提取的特征进行预处理 (标准化, PCA)...")
            X_train_processed = self.sklearn_feature_pipeline.fit_transform(X_train)

            print(f"正在训练Sklearn {self.current_model_type} 模型...")
            self.sklearn_regressor.fit(X_train_processed, y_train_normalized)
            print(f"Sklearn {self.current_model_type} 模型训练完成。")

            train_predictions_normalized = self.sklearn_regressor.predict(X_train_processed)
            train_mse, train_mae, train_r2 = calculate_metrics(
                y_train_normalized, train_predictions_normalized, min_label, max_label
            )

            # Sklearn models have no epochs; show a textual summary instead.
            ax_loss.clear()
            ax_metrics.clear()

            if len(X_val) > 0:
                X_val_processed = self.sklearn_feature_pipeline.transform(X_val)
                val_predictions_normalized = self.sklearn_regressor.predict(X_val_processed)
                val_mse, val_mae, val_r2 = calculate_metrics(
                    y_val_normalized, val_predictions_normalized, min_label, max_label
                )
                print(f"训练完成: Train MSE: {train_mse:.2f}, Train MAE: {train_mae:.2f}, Train R2: {train_r2:.2f}")
                print(f" Val MSE: {val_mse:.2f}, Val MAE: {val_mae:.2f}, Val R2: {val_r2:.2f}")

                ax_loss.text(0.5, 0.5,
                             f"Sklearn {self.current_model_type} 训练完成\n"
                             f"训练集损失 (MSE/MAE): {train_mse:.2f}/{train_mae:.2f}\n"
                             f"验证集损失 (MSE/MAE): {val_mse:.2f}/{val_mae:.2f}\n"
                             f"训练集 R2: {train_r2:.2f}, 验证集 R2: {val_r2:.2f}",
                             horizontalalignment='center', verticalalignment='center',
                             transform=ax_loss.transAxes, fontsize=10, color='green')
                ax_loss.axis('off')  # no curves, hide axes

                ax_metrics.text(0.5, 0.5,
                                f"Sklearn {self.current_model_type} 训练完成\n"
                                f"训练集 MSE: {train_mse:.2f}\n"
                                f"验证集 MSE: {val_mse:.2f}\n"
                                f"训练集 MAE: {train_mae:.2f}\n"
                                f"验证集 MAE: {val_mae:.2f}",
                                horizontalalignment='center', verticalalignment='center',
                                transform=ax_metrics.transAxes, fontsize=10, color='green')
                ax_metrics.axis('off')
            else:
                print(
                    f"训练完成: Train MSE: {train_mse:.2f}, Train MAE: {train_mae:.2f}, Train R2: {train_r2:.2f} (无验证集评估)")
                ax_loss.text(0.5, 0.5,
                             f"Sklearn {self.current_model_type} 训练完成\n"
                             f"训练集损失 (MSE/MAE): {train_mse:.2f}/{train_mae:.2f}\n"
                             f"训练集 R2: {train_r2:.2f} (无验证集)",
                             horizontalalignment='center', verticalalignment='center',
                             transform=ax_loss.transAxes, fontsize=10, color='green')
                ax_loss.axis('off')
                ax_metrics.text(0.5, 0.5,
                                f"Sklearn {self.current_model_type} 训练完成\n"
                                f"训练集 MSE: {train_mse:.2f}\n"
                                f"训练集 MAE: {train_mae:.2f} (无验证集)",
                                horizontalalignment='center', verticalalignment='center',
                                transform=ax_metrics.transAxes, fontsize=10, color='green')
                ax_metrics.axis('off')

            ax_loss.set_title(f"Sklearn模型训练状态 ({self.current_model_type})")
            ax_metrics.set_title(f"Sklearn模型训练状态 ({self.current_model_type}) - 误差指标")
            fig_loss.tight_layout()
            fig_metrics.tight_layout()

            self._save_model_artifacts(self.current_model_type)
            return fig_loss, fig_metrics

        else:
            ax_loss.text(0.5, 0.5, "未选择有效的模型类型进行训练。",
                         horizontalalignment='center', verticalalignment='center',
                         transform=ax_loss.transAxes, fontsize=12, color='red')
            ax_metrics.text(0.5, 0.5, "未选择有效的模型类型。",
                            horizontalalignment='center', verticalalignment='center',
                            transform=ax_metrics.transAxes, fontsize=12, color='red')
            fig_loss.tight_layout()
            fig_metrics.tight_layout()
            return fig_loss, fig_metrics
    def predict_score(self, image_path, model_type_str, base_cnn_name_for_predict):
        """Predict a score for a single image with the saved model of the given type.

        Loads the persisted artifacts first (via _load_model_artifacts), then runs
        inference and de-normalizes the 0-1 model output back to the dataset's
        original label range. Returns a user-facing result/error string.
        """
        if not self._load_model_artifacts(model_type_str, base_cnn_name_for_predict):
            return "模型未训练或未加载!请先训练对应模型。"

        current_image_size = get_image_size_by_model_name(base_cnn_name_for_predict)
        # No data augmentation at prediction time.
        transform = get_transforms(train=False, image_size=current_image_size, enable_augmentation=False)

        try:
            image = Image.open(image_path).convert("RGB")
            image_tensor = transform(image).unsqueeze(0).to(self.device)
        except Exception as e:
            return f"图片加载或预处理失败: {e}"

        output_score_normalized = 0  # predicted value in the normalized 0-1 range
        if model_type_str == "深度学习":
            # Detached mode: frozen CNN features -> PyTorch regressor head.
            self.pytorch_regressor.eval()
            self.feature_extractor.eval()
            with torch.no_grad():
                features = self.feature_extractor(image_tensor)
                output_score_normalized = self.pytorch_regressor(features).item()
        elif model_type_str == "端到端深度学习":
            self.full_cnn_regressor.eval()
            with torch.no_grad():
                output_score_normalized = self.full_cnn_regressor(image_tensor).item()
        else:  # scikit-learn models
            self.feature_extractor.eval()
            with torch.no_grad():
                features = self.feature_extractor(image_tensor).cpu().numpy()
                processed_features = self.sklearn_feature_pipeline.transform(features)
                output_score_normalized = self.sklearn_regressor.predict(processed_features)[0]

        # Clamp to [0, 1] — models without a sigmoid may overshoot the range.
        output_score_normalized = max(0.0, min(1.0, output_score_normalized))

        # De-normalize back to the original label range recorded at training time.
        predicted_original_score = output_score_normalized * (
                self.dataset_max_label - self.dataset_min_label) + self.dataset_min_label

        # Final clamp to 0-100 for display (original scores may not span 0-100,
        # but the UI presents a percent-style score).
        predicted_original_score = max(0, min(100, predicted_original_score))

        return f"预测分数: {predicted_original_score:.2f} (百分制)"

    def _save_model_artifacts(self, model_type_str):
        """Persist the active model's weights plus a JSON metadata file.

        NOTE: the file-name patterns here are a contract with
        _load_model_artifacts — keep the two in sync.
        """
        internal_name = self._get_internal_model_name(model_type_str)

        # Metadata path; PyTorch variants additionally embed the base CNN name
        # so different backbones don't clobber each other.
        meta_data_path = f"{MODEL_SAVE_BASE_PATH}_{internal_name}_meta.json"
        if internal_name in ["pytorch_detached", "pytorch_full_cnn"]:
            meta_data_path = f"{MODEL_SAVE_BASE_PATH}_{self.active_base_cnn_name}_{internal_name}_meta.json"
        try:
            with open(meta_data_path, 'w') as f:
                json.dump(self.last_trained_params, f, indent=4)
            print(f"模型元数据已保存到: {meta_data_path}")
        except Exception as e:
            # Best-effort: metadata failure is logged but does not abort
            # saving the weights below.
            print(f"保存模型元数据失败: {e}")

        if internal_name == "pytorch_detached":
            feat_extractor_path = f"{MODEL_SAVE_BASE_PATH}_{self.active_base_cnn_name}_features.pth"
            regressor_path = f"{MODEL_SAVE_BASE_PATH}_{self.active_base_cnn_name}_pytorch_detached_regressor.pth"
            torch.save(self.feature_extractor.state_dict(), feat_extractor_path)
            torch.save(self.pytorch_regressor.state_dict(), regressor_path)
            print(f"PyTorch模型组件 (分离模式) 已保存。")
        elif internal_name == "pytorch_full_cnn":
            full_cnn_path = f"{MODEL_SAVE_BASE_PATH}_{self.active_base_cnn_name}_full_cnn.pth"
            torch.save(self.full_cnn_regressor.state_dict(), full_cnn_path)
            print(f"端到端深度学习模型 已保存。")
        else:  # scikit-learn models: pickle regressor and feature pipeline via joblib
            regressor_path = f"{MODEL_SAVE_BASE_PATH}_{internal_name}_regressor.pkl"
            pipeline_path = f"{MODEL_SAVE_BASE_PATH}_{internal_name}_feature_pipeline.pkl"
            joblib.dump(self.sklearn_regressor, regressor_path)
            joblib.dump(self.sklearn_feature_pipeline, pipeline_path)
            print(f"Sklearn {model_type_str} 模型和特征管道已保存。")
    def _load_model_artifacts(self, model_type_str, base_cnn_name_to_load):
        """Load saved model artifacts for the given model type.

        Reads the JSON metadata first (falling back to defaults when missing),
        then reconstructs and loads the matching model components.
        Returns True on success, False when any required file is missing.

        NOTE: file-name patterns must mirror _save_model_artifacts exactly.
        """
        internal_name = self._get_internal_model_name(model_type_str)
        loaded_params = None

        # Metadata path; PyTorch variants embed the base CNN name.
        meta_data_path = f"{MODEL_SAVE_BASE_PATH}_{internal_name}_meta.json"
        if internal_name in ["pytorch_detached", "pytorch_full_cnn"]:
            meta_data_path = f"{MODEL_SAVE_BASE_PATH}_{base_cnn_name_to_load}_{internal_name}_meta.json"

        try:
            with open(meta_data_path, 'r') as f:
                loaded_params = json.load(f)
            self.active_base_cnn_name = loaded_params.get("base_cnn_name", base_cnn_name_to_load)
            # Restore the label range so predictions can be de-normalized.
            self.dataset_min_label = loaded_params.get("min_label", 0.0)
            self.dataset_max_label = loaded_params.get("max_label", 100.0)
            print(f"加载模型参数: {loaded_params}")
        except FileNotFoundError:
            # No metadata: fall back to defaults so loading can still proceed.
            print(f"警告: 模型元数据文件 {meta_data_path} 未找到。使用默认参数进行加载。")
            loaded_params = {
                "base_cnn_name": base_cnn_name_to_load,
                "dropout_rate": DEFAULT_DROPOUT_RATE,
                "weight_decay": DEFAULT_WEIGHT_DECAY,
                "pca_variance_ratio": DEFAULT_PCA_VARIANCE_RATIO,
                "min_label": 0.0,   # default
                "max_label": 100.0  # default
            }
            self.active_base_cnn_name = base_cnn_name_to_load
            self.dataset_min_label = 0.0
            self.dataset_max_label = 100.0

        # Instantiate model components using the loaded (or default) parameters.
        if internal_name == "pytorch_detached":
            try:
                self.feature_extractor = FeatureExtractor(model_name=self.active_base_cnn_name).to(self.device)
                feat_extractor_path = f"{MODEL_SAVE_BASE_PATH}_{self.active_base_cnn_name}_features.pth"
                self.feature_extractor.load_state_dict(
                    torch.load(feat_extractor_path, map_location=self.device))
                self.feature_extractor.eval()

                feature_dim = self.feature_extractor.get_output_dim()
                self.pytorch_regressor = PytorchRegressor(
                    in_features=feature_dim,
                    dropout_rate=loaded_params.get("dropout_rate", DEFAULT_DROPOUT_RATE)
                ).to(self.device)
                regressor_path = f"{MODEL_SAVE_BASE_PATH}_{self.active_base_cnn_name}_pytorch_detached_regressor.pth"
                self.pytorch_regressor.load_state_dict(
                    torch.load(regressor_path, map_location=self.device))
                self.pytorch_regressor.eval()
                print(f"PyTorch模型组件 (分离模式, 基础CNN: {self.active_base_cnn_name}) 已加载。")
                return True
            except FileNotFoundError as e:
                print(f"PyTorch模型文件 (分离模式, 基础CNN: {self.active_base_cnn_name}) 未找到: {e}")
                self.pytorch_regressor = None
                self.feature_extractor = None
                return False
        elif internal_name == "pytorch_full_cnn":
            try:
                self.full_cnn_regressor = FullCNNRegressor(
                    model_name=self.active_base_cnn_name,
                    dropout_rate=loaded_params.get("dropout_rate", DEFAULT_DROPOUT_RATE)
                ).to(self.device)
                full_cnn_path = f"{MODEL_SAVE_BASE_PATH}_{self.active_base_cnn_name}_full_cnn.pth"
                self.full_cnn_regressor.load_state_dict(
                    torch.load(full_cnn_path, map_location=self.device))
                self.full_cnn_regressor.eval()
                print(f"端到端深度学习模型 ({self.active_base_cnn_name}) 已加载。")
                return True
            except FileNotFoundError as e:
                print(f"端到端深度学习模型文件 ({self.active_base_cnn_name}) 未找到: {e}")
                self.full_cnn_regressor = None
                return False
        else:  # scikit-learn models
            try:
                self.feature_extractor = FeatureExtractor(model_name=self.active_base_cnn_name).to(self.device)
                self.feature_extractor.eval()

                regressor_path = f"{MODEL_SAVE_BASE_PATH}_{internal_name}_regressor.pkl"
                pipeline_path = f"{MODEL_SAVE_BASE_PATH}_{internal_name}_feature_pipeline.pkl"

                # Re-instantiate regressor and pipeline first (applies the loaded
                # PCA parameter); both are then overwritten by the pickled,
                # already-fitted objects below.
                self.sklearn_regressor, self.sklearn_feature_pipeline = \
                    get_sklearn_model_pipeline(
                        model_type_str,
                        pca_variance_ratio=loaded_params.get("pca_variance_ratio", DEFAULT_PCA_VARIANCE_RATIO)
                    )
                self.sklearn_regressor = joblib.load(regressor_path)
                self.sklearn_feature_pipeline = joblib.load(pipeline_path)
                print(f"Sklearn {model_type_str} 模型和特征管道 (基础CNN: {self.active_base_cnn_name}) 已加载。")
                return True
            except FileNotFoundError as e:
                # NOTE(review): if FeatureExtractor construction itself raised,
                # regressor_path/pipeline_path would be unbound here — assumed
                # not to happen in practice; verify FeatureExtractor's behavior.
                print(f"Sklearn模型文件 {regressor_path} 或 {pipeline_path} 未找到: {e}")
                self.sklearn_regressor = None
                self.sklearn_feature_pipeline = None
                self.feature_extractor = None
                return False
        return False