# resNet0 / train_predict.py
# (Hugging Face upload metadata: suzakudry, "Upload 6 files", commit d426cb8 verified)
# train_predict.py
import copy
import json
import os
from pathlib import Path

import joblib
import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
from PIL import Image
from torch.utils.data import DataLoader, random_split

# Configuration values from config.py
from config import DATA_DIR, SCORE_FILE_NAME, MODEL_SAVE_BASE_PATH, \
    DEFAULT_BATCH_SIZE, DEFAULT_EPOCHS, DEFAULT_LR, \
    DEFAULT_DROPOUT_RATE, DEFAULT_WEIGHT_DECAY, DEFAULT_PCA_VARIANCE_RATIO, \
    DEFAULT_OPTIMIZER, DEFAULT_LR_SCHEDULER, DEFAULT_SCHEDULER_PATIENCE, \
    DEFAULT_SCHEDULER_FACTOR, DEFAULT_SCHEDULER_T_MAX, DEFAULT_LOSS_FUNCTION, \
    DEFAULT_EARLY_STOPPING_PATIENCE, VALIDATION_SPLIT_RATIO, DEFAULT_DATA_AUGMENTATION
# Sibling modules
from utils import ScoreDataset, get_transforms, get_image_size_by_model_name, calculate_metrics
from feature_extractor import FeatureExtractor
from regressors import PytorchRegressor, get_sklearn_model_pipeline, FullCNNRegressor
# --- 配置 Matplotlib 支持中文 ---
plt.rcParams['font.sans-serif'] = ['Microsoft YaHei', 'SimHei', 'Arial Unicode MS', 'DejaVu Sans']
plt.rcParams['axes.unicode_minus'] = False
# ------------------------------------
class TrainingAndPredictionEngine:
    """
    Engine that manages the whole training and prediction workflow.

    Covers data preparation, model-type switching, the training loop and
    single-image score prediction.
    """
    def __init__(self):
        """Detect the compute device, reset all model holders and state, and
        ensure the model-save directory exists."""
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        print(f"PyTorch Version: {torch.__version__}")
        print(f"CUDA Available: {torch.cuda.is_available()}")
        if torch.cuda.is_available():
            print(f"CUDA Device Count: {torch.cuda.device_count()}")
            print(f"Current CUDA Device Name: {torch.cuda.get_device_name(0)}")
        # Model holders — at most one family is active at a time, the rest stay None.
        self.feature_extractor = None
        self.pytorch_regressor = None
        self.full_cnn_regressor = None
        self.sklearn_regressor = None
        self.sklearn_feature_pipeline = None
        self.current_model_type = None
        self.active_base_cnn_name = None
        # Per-epoch histories filled by train_model and used for plotting.
        self.train_loss_history = []
        self.val_loss_history = []
        self.val_mse_history = []
        self.val_mae_history = []
        self.val_r2_history = []
        self.train_dataloader = None
        self.val_dataloader = None
        # Label range of the training dataset, kept for de-normalizing predictions.
        self.dataset_min_label = 0.0
        self.dataset_max_label = 100.0
        # Hyperparameters of the last training run; persisted alongside the model
        # so that loading can reconstruct a compatible model.
        self.last_trained_params = {
            "model_type": None,
            "base_cnn_name": None,
            "dropout_rate": DEFAULT_DROPOUT_RATE,
            "weight_decay": DEFAULT_WEIGHT_DECAY,
            "pca_variance_ratio": DEFAULT_PCA_VARIANCE_RATIO,
            "optimizer": DEFAULT_OPTIMIZER,
            "lr_scheduler": DEFAULT_LR_SCHEDULER,
            "scheduler_patience": DEFAULT_SCHEDULER_PATIENCE,
            "scheduler_factor": DEFAULT_SCHEDULER_FACTOR,
            "scheduler_t_max": DEFAULT_SCHEDULER_T_MAX,
            "loss_function": DEFAULT_LOSS_FUNCTION,
            "early_stopping_patience": DEFAULT_EARLY_STOPPING_PATIENCE,
            "validation_split_ratio": VALIDATION_SPLIT_RATIO,
            "batch_size": DEFAULT_BATCH_SIZE,
            "min_label": self.dataset_min_label,  # initial value
            "max_label": self.dataset_max_label,  # initial value
            "enable_augmentation": DEFAULT_DATA_AUGMENTATION
        }
        # BUGFIX: os.path.dirname() returns '' when MODEL_SAVE_BASE_PATH has no
        # directory component, and Path('').mkdir() raises FileNotFoundError.
        # Path(...).parent yields '.' in that case, which is a no-op here.
        Path(MODEL_SAVE_BASE_PATH).parent.mkdir(exist_ok=True, parents=True)
def _get_internal_model_name(self, ui_model_name):
mapping = {
"深度学习": "pytorch_detached",
"端到端深度学习": "pytorch_full_cnn",
"随机森林": "random_forest",
"支持向量回归": "svr",
"梯度提升回归": "gradient_boosting",
"堆叠回归": "stacking",
"K近邻": "knn", # <-- 新增
"线性回归": "linear_regression" # <-- 新增
}
return mapping.get(ui_model_name, "unknown_model")
def _get_optimizer(self, model_params, optimizer_name, lr, weight_decay):
if optimizer_name == "Adam":
return torch.optim.Adam(model_params, lr=lr, weight_decay=weight_decay)
elif optimizer_name == "AdamW":
return torch.optim.AdamW(model_params, lr=lr, weight_decay=weight_decay)
elif optimizer_name == "SGD":
# 对于SGD,通常需要动量,这里可以增加一个默认值或UI参数
return torch.optim.SGD(model_params, lr=lr, momentum=0.9, weight_decay=weight_decay)
else:
raise ValueError(f"不支持的优化器: {optimizer_name}")
def _get_lr_scheduler(self, optimizer, scheduler_name, patience, factor, t_max):
if scheduler_name == "None":
return None
elif scheduler_name == "ReduceLROnPlateau":
return torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', patience=patience, factor=factor,
verbose=True)
elif scheduler_name == "CosineAnnealingLR":
return torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=t_max)
else:
raise ValueError(f"不支持的学习率调度器: {scheduler_name}")
def _get_loss_function(self, loss_name):
if loss_name == "MSELoss":
return nn.MSELoss()
elif loss_name == "L1Loss":
return nn.L1Loss()
elif loss_name == "SmoothL1Loss":
return nn.SmoothL1Loss()
else:
raise ValueError(f"不支持的损失函数: {loss_name}")
    def switch_model_type(self, model_type_str, base_cnn_name="resnet50",
                          dropout_rate=DEFAULT_DROPOUT_RATE,
                          weight_decay=DEFAULT_WEIGHT_DECAY,
                          pca_variance_ratio=DEFAULT_PCA_VARIANCE_RATIO,
                          optimizer_name=DEFAULT_OPTIMIZER,
                          lr_scheduler_name=DEFAULT_LR_SCHEDULER,
                          scheduler_patience=DEFAULT_SCHEDULER_PATIENCE,
                          scheduler_factor=DEFAULT_SCHEDULER_FACTOR,
                          scheduler_t_max=DEFAULT_SCHEDULER_T_MAX,
                          loss_function_name=DEFAULT_LOSS_FUNCTION,
                          early_stopping_patience=DEFAULT_EARLY_STOPPING_PATIENCE,
                          batch_size=DEFAULT_BATCH_SIZE,
                          enable_augmentation=DEFAULT_DATA_AUGMENTATION
                          ):
        """Select the active model family and (re)build the matching model objects.

        Records all hyperparameters in ``self.last_trained_params`` (persisted
        with the model on save) and instantiates exactly one of: the detached
        PyTorch pair (feature extractor + regressor head), the end-to-end CNN
        regressor, or an sklearn regressor plus its preprocessing pipeline —
        setting the unused holders to None.

        Returns:
            A human-readable status string for the UI.
        """
        self.current_model_type = model_type_str
        self.active_base_cnn_name = base_cnn_name
        # Batch size and validation split are consumed later by
        # prepare_data_for_training, so record them up front.
        self.last_trained_params['batch_size'] = batch_size
        self.last_trained_params['validation_split_ratio'] = VALIDATION_SPLIT_RATIO  # fixed config value
        # Remaining hyperparameters are kept for the training loop and for saving.
        self.last_trained_params.update({
            "model_type": model_type_str,
            "base_cnn_name": base_cnn_name,
            "dropout_rate": dropout_rate,
            "weight_decay": weight_decay,
            "pca_variance_ratio": pca_variance_ratio,
            "optimizer": optimizer_name,
            "lr_scheduler": lr_scheduler_name,
            "scheduler_patience": scheduler_patience,
            "scheduler_factor": scheduler_factor,
            "scheduler_t_max": scheduler_t_max,
            "loss_function": loss_function_name,
            "early_stopping_patience": early_stopping_patience,
            # min_label / max_label are updated inside prepare_data_for_training
            "enable_augmentation": enable_augmentation
        })
        print(f"已切换到 {self.current_model_type} 模型模式, 基础CNN: {self.active_base_cnn_name}.")
        print(
            f"参数: BatchSize={batch_size}, Dropout={dropout_rate}, WeightDecay={weight_decay}, PCA={pca_variance_ratio},")
        print(
            f"          Optimizer={optimizer_name}, Scheduler={lr_scheduler_name}, Loss={loss_function_name}, EarlyStopping={early_stopping_patience},")
        print(f"          Data Augmentation: {enable_augmentation}")
        if self.current_model_type == "深度学习":
            # Detached mode: frozen CNN backbone feeding a small trainable head.
            self.feature_extractor = FeatureExtractor(model_name=self.active_base_cnn_name).to(self.device)
            self.feature_extractor.eval()
            feature_dim = self.feature_extractor.get_output_dim()
            self.pytorch_regressor = PytorchRegressor(in_features=feature_dim, dropout_rate=dropout_rate).to(
                self.device)
            self.full_cnn_regressor = None
            self.sklearn_regressor = None
            self.sklearn_feature_pipeline = None
        elif self.current_model_type == "端到端深度学习":
            # End-to-end mode: the whole CNN is trained as a regressor.
            self.full_cnn_regressor = FullCNNRegressor(model_name=self.active_base_cnn_name,
                                                       dropout_rate=dropout_rate).to(self.device)
            self.feature_extractor = None
            self.pytorch_regressor = None
            self.sklearn_regressor = None
            self.sklearn_feature_pipeline = None
        else:  # sklearn models: frozen CNN features + classical regressor
            self.feature_extractor = FeatureExtractor(model_name=self.active_base_cnn_name).to(self.device)
            self.feature_extractor.eval()
            self.sklearn_regressor, self.sklearn_feature_pipeline = \
                get_sklearn_model_pipeline(self.current_model_type, pca_variance_ratio=pca_variance_ratio)
            self.pytorch_regressor = None
            self.full_cnn_regressor = None
        return f"已切换到 {self.current_model_type} 模型模式, 基础CNN: {self.active_base_cnn_name}. 参数已设置。"
def prepare_data_for_training(self):
image_paths = []
scores = []
score_file_path = Path(DATA_DIR) / SCORE_FILE_NAME
if not score_file_path.exists():
return False, f"错误: 训练数据文件 {score_file_path} 不存在。请先在‘原始数据导入’或‘训练数据管理’标签页保存数据。"
try:
with open(score_file_path, 'r') as f:
for line in f:
filename, score_str = line.strip().split(',')
full_image_path = Path(DATA_DIR) / filename
if full_image_path.exists():
image_paths.append(str(full_image_path))
scores.append(float(score_str))
else:
print(f"警告: 图像文件 {full_image_path} 不存在,已跳过。")
except Exception as e:
return False, f"错误: 读取分数文件 {score_file_path} 为空或失败: {e}"
if not image_paths:
return False, "没有找到有效的图片数据用于训练。请检查 'data' 文件夹。"
current_image_size = get_image_size_by_model_name(self.active_base_cnn_name)
temp_dataset = ScoreDataset(image_paths, scores, transform=None)
self.dataset_min_label = temp_dataset.min_label
self.dataset_max_label = temp_dataset.max_label
self.last_trained_params['min_label'] = self.dataset_min_label
self.last_trained_params['max_label'] = self.dataset_max_label
full_dataset = ScoreDataset(image_paths, scores,
transform=get_transforms(train=True, image_size=current_image_size,
enable_augmentation=self.last_trained_params[
'enable_augmentation']))
num_total = len(full_dataset)
num_val = int(self.last_trained_params['validation_split_ratio'] * num_total)
num_train = num_total - num_val
if num_train < 1:
return False, f"错误: 训练集样本数量不足1。总数据量: {num_total}, 训练集: {num_train},请增加数据量或调整验证集比例。"
if num_total <= 1:
num_train = num_total
num_val = 0
print(f"警告: 总样本数过少({num_total}),不进行验证集划分。")
elif num_val < 1:
print(f"警告: 验证集样本数量不足1 ({num_val})。总数据量: {num_total}。验证集可能无法进行评估。")
try:
train_dataset, val_dataset = random_split(full_dataset, [num_train, num_val],
generator=torch.Generator().manual_seed(42))
except ValueError as e:
return False, f"数据划分失败: {e}。请检查数据量({num_total})和划分比例({self.last_trained_params['validation_split_ratio']})。"
self.train_dataloader = DataLoader(
train_dataset,
batch_size=self.last_trained_params['batch_size'],
shuffle=True,
num_workers=os.cpu_count() // 2 or 1,
drop_last=True
)
if len(self.train_dataloader) == 0:
return False, f"错误: 训练数据加载器为空。训练集样本数量: {len(train_dataset)}, 批量大小: {self.last_trained_params['batch_size']}。请减小批量大小或增加训练集样本。"
self.val_dataloader = DataLoader(
val_dataset,
batch_size=self.last_trained_params['batch_size'],
shuffle=False,
num_workers=os.cpu_count() // 2 or 1,
drop_last=False
)
if len(val_dataset) > 0 and len(self.val_dataloader) == 0:
print(
f"警告: 验证数据加载器为空。验证集样本数量: {len(val_dataset)}, 批量大小: {self.last_trained_params['batch_size']}。验证集可能无法进行评估。")
return True, f"数据准备完成。训练集: {len(train_dataset)} 张图片, 验证集: {len(val_dataset)} 张图片。"
def train_model(self, epochs=DEFAULT_EPOCHS, lr=DEFAULT_LR):
self.train_loss_history = []
self.val_loss_history = []
self.val_mse_history = []
self.val_mae_history = []
self.val_r2_history = []
# 创建第一个子图:只用于训练损失和验证损失
fig_loss, ax_loss = plt.subplots(figsize=(10, 6)) # 命名为 fig_loss, ax_loss
ax_loss.set_xlabel("Epoch")
ax_loss.set_ylabel("损失")
ax_loss.set_title("训练与验证损失")
# 创建第二个子图:用于验证MSE和MAE
fig_metrics, ax_metrics = plt.subplots(figsize=(10, 6)) # 命名为 fig_metrics, ax_metrics
ax_metrics.set_xlabel("Epoch")
ax_metrics.set_ylabel("误差指标")
ax_metrics.set_title("验证MSE与MAE")
if self.train_dataloader is None or len(self.train_dataloader) == 0:
ax_loss.text(0.5, 0.5, "训练数据加载器为空。请先正确加载数据。", horizontalalignment='center',
verticalalignment='center',
transform=ax_loss.transAxes, fontsize=12, color='red')
ax_metrics.text(0.5, 0.5, "训练数据加载器为空。", horizontalalignment='center', verticalalignment='center',
transform=ax_metrics.transAxes, fontsize=12, color='red')
fig_loss.tight_layout()
fig_metrics.tight_layout()
return fig_loss, fig_metrics
min_label = self.dataset_min_label
max_label = self.dataset_max_label
if min_label is None or max_label is None:
print("错误: dataset_min_label/max_label 未设置,使用默认 0-100。")
min_label = 0.0
max_label = 100.0
if self.current_model_type == "深度学习":
if self.pytorch_regressor is None or self.feature_extractor is None:
ax_loss.text(0.5, 0.5, "深度学习模型(分离模式)未正确初始化。请重试。", horizontalalignment='center',
verticalalignment='center', transform=ax_loss.transAxes, fontsize=12, color='red')
ax_metrics.text(0.5, 0.5, "深度学习模型未初始化。", horizontalalignment='center',
verticalalignment='center', transform=ax_metrics.transAxes, fontsize=12, color='red')
fig_loss.tight_layout()
fig_metrics.tight_layout()
return fig_loss, fig_metrics
model = self.pytorch_regressor
optimizer = self._get_optimizer(model.parameters(), self.last_trained_params["optimizer"], lr,
self.last_trained_params["weight_decay"])
criterion = self._get_loss_function(self.last_trained_params["loss_function"])
lr_scheduler = self._get_lr_scheduler(optimizer, self.last_trained_params["lr_scheduler"],
self.last_trained_params["scheduler_patience"],
self.last_trained_params["scheduler_factor"],
self.last_trained_params["scheduler_t_max"])
best_val_loss = float('inf')
epochs_no_improve = 0
best_epoch = 0
best_regressor_state = None
best_feature_extractor_state = None
print(f"开始训练深度学习模型 (PyTorch, 分离模式, 基础CNN: {self.active_base_cnn_name}),共 {epochs} 轮次...")
print(
f"学习率: {lr}, 批量大小: {self.last_trained_params['batch_size']}, Dropout: {self.last_trained_params['dropout_rate']}, Weight Decay: {self.last_trained_params['weight_decay']}")
print(
f"优化器: {self.last_trained_params['optimizer']}, 损失函数: {self.last_trained_params['loss_function']}, 调度器: {self.last_trained_params['lr_scheduler']}, 早停耐心: {self.last_trained_params['early_stopping_patience']}")
print(
f"原始分数范围: [{min_label:.2f}, {max_label:.2f}], 数据增强: {self.last_trained_params['enable_augmentation']}")
for epoch in range(epochs):
# --- 训练阶段 ---
model.train()
self.feature_extractor.eval()
running_train_loss = 0.0
for batch_idx, (images, labels) in enumerate(self.train_dataloader):
images = images.to(self.device)
labels = labels.unsqueeze(1).to(self.device)
with torch.no_grad():
features = self.feature_extractor(images)
optimizer.zero_grad()
outputs = model(features)
loss = criterion(outputs, labels)
loss.backward()
optimizer.step()
running_train_loss += loss.item()
avg_train_loss = running_train_loss / len(self.train_dataloader)
self.train_loss_history.append(avg_train_loss)
# --- 验证阶段 ---
if len(self.val_dataloader.dataset) > 0 and len(self.val_dataloader) > 0:
model.eval()
val_losses = []
val_predictions_normalized = []
val_true_labels_normalized = []
with torch.no_grad():
for images, labels in self.val_dataloader:
images = images.to(self.device)
labels = labels.unsqueeze(1).to(self.device)
features = self.feature_extractor(images)
outputs = model(features)
loss = criterion(outputs, labels)
val_losses.append(loss.item())
val_predictions_normalized.extend(outputs.cpu().numpy().flatten())
val_true_labels_normalized.extend(labels.cpu().numpy().flatten())
avg_val_loss = np.mean(val_losses)
self.val_loss_history.append(avg_val_loss)
val_mse, val_mae, val_r2 = calculate_metrics(
val_true_labels_normalized,
val_predictions_normalized,
min_label, max_label
)
self.val_mse_history.append(val_mse)
self.val_mae_history.append(val_mae)
self.val_r2_history.append(val_r2) # R2仍会计算并记录
print(
f"Epoch {epoch + 1}/{epochs}: Train Loss: {avg_train_loss:.4f}, Val Loss: {avg_val_loss:.4f}, Val MSE: {val_mse:.2f}, Val MAE: {val_mae:.2f}, Val R2: {val_r2:.2f}")
if lr_scheduler:
if isinstance(lr_scheduler, torch.optim.lr_scheduler.ReduceLROnPlateau):
lr_scheduler.step(avg_val_loss)
else:
lr_scheduler.step()
if avg_val_loss < best_val_loss:
best_val_loss = avg_val_loss
epochs_no_improve = 0
best_epoch = epoch + 1
best_regressor_state = model.state_dict()
best_feature_extractor_state = self.feature_extractor.state_dict()
else:
epochs_no_improve += 1
if epochs_no_improve >= self.last_trained_params['early_stopping_patience']:
print(
f"早停触发!验证损失在 {self.last_trained_params['early_stopping_patience']} 个Epochs内没有改善。")
break
else:
print(
f"Epoch {epoch + 1}/{epochs}: Train Loss: {avg_train_loss:.4f} (无验证集评估或验证dataloader为空)")
if best_regressor_state and best_feature_extractor_state:
model.load_state_dict(best_regressor_state)
self.feature_extractor.load_state_dict(best_feature_extractor_state)
print(f"已加载第 {best_epoch} 轮次的最佳模型。")
else:
print("没有找到更好的验证损失模型(无验证集或未改善)。使用最后一次训练的模型状态。")
# 绘制第一个图:损失
ax_loss.clear()
ax_loss.plot(self.train_loss_history, label="训练损失", color='blue')
if len(self.val_loss_history) > 0:
ax_loss.plot(self.val_loss_history, label="验证损失", color='orange')
ax_loss.legend()
ax_loss.set_xlabel("Epoch")
ax_loss.set_ylabel("损失")
if len(self.val_loss_history) > 0:
ax_loss.set_title(f"深度学习模型训练完成 (分离模式, 基础CNN: {self.active_base_cnn_name})\n"
f"最佳验证损失: {best_val_loss:.4f} (Epoch {best_epoch})")
else:
ax_loss.set_title(f"深度学习模型训练完成 (分离模式, 基础CNN: {self.active_base_cnn_name})\n"
f"最终训练损失: {self.train_loss_history[-1]:.4f} (无验证集)")
fig_loss.tight_layout() # 确保对正确的图表进行布局
# 绘制第二个图:MSE和MAE
ax_metrics.clear()
if len(self.val_mse_history) > 0:
ax_metrics.plot(self.val_mse_history, label="验证MSE", color='green', linestyle='--')
ax_metrics.plot(self.val_mae_history, label="验证MAE", color='red', linestyle=':')
ax_metrics.legend()
ax_metrics.set_xlabel("Epoch")
ax_metrics.set_ylabel("误差指标")
ax_metrics.set_title(f"验证MSE与MAE (深度学习模型)")
fig_metrics.tight_layout() # 确保对正确的图表进行布局
self._save_model_artifacts("深度学习")
return fig_loss, fig_metrics
elif self.current_model_type == "端到端深度学习":
if self.full_cnn_regressor is None:
ax_loss.text(0.5, 0.5, "端到端深度学习模型未正确初始化。请重试。", horizontalalignment='center',
verticalalignment='center', transform=ax_loss.transAxes, fontsize=12, color='red')
ax_metrics.text(0.5, 0.5, "端到端深度学习模型未初始化。", horizontalalignment='center',
verticalalignment='center', transform=ax_metrics.transAxes, fontsize=12, color='red')
fig_loss.tight_layout()
fig_metrics.tight_layout()
return fig_loss, fig_metrics
model = self.full_cnn_regressor
optimizer = self._get_optimizer(model.parameters(), self.last_trained_params["optimizer"], lr,
self.last_trained_params["weight_decay"])
criterion = self._get_loss_function(self.last_trained_params["loss_function"])
lr_scheduler = self._get_lr_scheduler(optimizer, self.last_trained_params["lr_scheduler"],
self.last_trained_params["scheduler_patience"],
self.last_trained_params["scheduler_factor"],
self.last_trained_params["scheduler_t_max"])
best_val_loss = float('inf')
epochs_no_improve = 0
best_epoch = 0
best_model_state = None
print(f"开始训练端到端深度学习模型 (基础CNN: {self.active_base_cnn_name}),共 {epochs} 轮次...")
print(
f"学习率: {lr}, 批量大小: {self.last_trained_params['batch_size']}, Dropout: {self.last_trained_params['dropout_rate']}, Weight Decay: {self.last_trained_params['weight_decay']}")
print(
f"优化器: {self.last_trained_params['optimizer']}, 损失函数: {self.last_trained_params['loss_function']}, 调度器: {self.last_trained_params['lr_scheduler']}, 早停耐心: {self.last_trained_params['early_stopping_patience']}")
print(
f"原始分数范围: [{min_label:.2f}, {max_label:.2f}], 数据增强: {self.last_trained_params['enable_augmentation']}")
for epoch in range(epochs):
# --- 训练阶段 ---
model.train()
running_train_loss = 0.0
for batch_idx, (images, labels) in enumerate(self.train_dataloader):
images = images.to(self.device)
labels = labels.unsqueeze(1).to(self.device)
optimizer.zero_grad()
outputs = model(images)
loss = criterion(outputs, labels)
loss.backward()
optimizer.step()
running_train_loss += loss.item()
avg_train_loss = running_train_loss / len(self.train_dataloader)
self.train_loss_history.append(avg_train_loss)
# --- 验证阶段 ---
if len(self.val_dataloader.dataset) > 0 and len(self.val_dataloader) > 0:
model.eval()
val_losses = []
val_predictions_normalized = []
val_true_labels_normalized = []
with torch.no_grad():
for images, labels in self.val_dataloader:
images = images.to(self.device)
labels = labels.unsqueeze(1).to(self.device)
outputs = model(images)
loss = criterion(outputs, labels)
val_losses.append(loss.item())
val_predictions_normalized.extend(outputs.cpu().numpy().flatten())
val_true_labels_normalized.extend(labels.cpu().numpy().flatten())
avg_val_loss = np.mean(val_losses)
self.val_loss_history.append(avg_val_loss)
val_mse, val_mae, val_r2 = calculate_metrics(
val_true_labels_normalized,
val_predictions_normalized,
min_label, max_label
)
self.val_mse_history.append(val_mse)
self.val_mae_history.append(val_mae)
self.val_r2_history.append(val_r2)
print(
f"Epoch {epoch + 1}/{epochs}: Train Loss: {avg_train_loss:.4f}, Val Loss: {avg_val_loss:.4f}, Val MSE: {val_mse:.2f}, Val MAE: {val_mae:.2f}, Val R2: {val_r2:.2f}")
if lr_scheduler:
if isinstance(lr_scheduler, torch.optim.lr_scheduler.ReduceLROnPlateau):
lr_scheduler.step(avg_val_loss)
else:
lr_scheduler.step()
if avg_val_loss < best_val_loss:
best_val_loss = avg_val_loss
epochs_no_improve = 0
best_epoch = epoch + 1
best_model_state = model.state_dict()
else:
epochs_no_improve += 1
if epochs_no_improve >= self.last_trained_params['early_stopping_patience']:
print(
f"早停触发!验证损失在 {self.last_trained_params['early_stopping_patience']} 个Epochs内没有改善。")
break
else:
print(
f"Epoch {epoch + 1}/{epochs}: Train Loss: {avg_train_loss:.4f} (无验证集评估或验证dataloader为空)")
if best_model_state:
model.load_state_dict(best_model_state)
print(f"已加载第 {best_epoch} 轮次的最佳模型。")
else:
print("没有找到更好的验证损失模型(无验证集或未改善)。使用最后一次训练的模型状态。")
# 绘制第一个图:损失
ax_loss.clear()
ax_loss.plot(self.train_loss_history, label="训练损失", color='blue')
if len(self.val_loss_history) > 0:
ax_loss.plot(self.val_loss_history, label="验证损失", color='orange')
ax_loss.legend()
ax_loss.set_xlabel("Epoch")
ax_loss.set_ylabel("损失")
if len(self.val_loss_history) > 0:
ax_loss.set_title(f"端到端深度学习模型训练完成 (基础CNN: {self.active_base_cnn_name})\n"
f"最佳验证损失: {best_val_loss:.4f} (Epoch {best_epoch})")
else:
ax_loss.set_title(f"端到端深度学习模型训练完成 (基础CNN: {self.active_base_cnn_name})\n"
f"最终训练损失: {self.train_loss_history[-1]:.4f} (无验证集)")
fig_loss.tight_layout()
# 绘制第二个图:MSE和MAE
ax_metrics.clear()
if len(self.val_mse_history) > 0:
ax_metrics.plot(self.val_mse_history, label="验证MSE", color='green', linestyle='--')
ax_metrics.plot(self.val_mae_history, label="验证MAE", color='red', linestyle=':')
ax_metrics.legend()
ax_metrics.set_xlabel("Epoch")
ax_metrics.set_ylabel("误差指标")
ax_metrics.set_title(f"验证MSE与MAE (端到端深度学习模型)")
fig_metrics.tight_layout()
self._save_model_artifacts("端到端深度学习")
return fig_loss, fig_metrics
elif self.current_model_type in ["随机森林", "支持向量回归", "梯度提升回归", "堆叠回归", "K近邻", "线性回归"]:
if self.sklearn_regressor is None or self.sklearn_feature_pipeline is None or self.feature_extractor is None:
ax_loss.text(0.5, 0.5, "Sklearn模型或其特征提取器未正确初始化。请重试。", horizontalalignment='center',
verticalalignment='center', transform=ax_loss.transAxes, fontsize=12, color='red')
ax_metrics.text(0.5, 0.5, "Sklearn模型未初始化。", horizontalalignment='center',
verticalalignment='center', transform=ax_metrics.transAxes, fontsize=12, color='red')
fig_loss.tight_layout()
fig_metrics.tight_layout()
return fig_loss, fig_metrics
print(f"正在提取所有图片的特征用于Sklearn模型训练 (基础CNN: {self.active_base_cnn_name})...")
print(f"PCA保留方差比例: {self.last_trained_params['pca_variance_ratio']}")
print(
f"原始分数范围: [{min_label:.2f}, {max_label:.2f}], 数据增强: {self.last_trained_params['enable_augmentation']}")
self.sklearn_regressor, self.sklearn_feature_pipeline = \
get_sklearn_model_pipeline(self.current_model_type, self.last_trained_params["pca_variance_ratio"])
X_train_features = []
y_train_labels_normalized = []
X_val_features = []
y_val_labels_normalized = []
self.feature_extractor.eval()
with torch.no_grad():
for images, labels in self.train_dataloader:
images = images.to(self.device)
features = self.feature_extractor(images).cpu().numpy()
X_train_features.extend(features)
y_train_labels_normalized.extend(labels.cpu().numpy())
if len(self.val_dataloader.dataset) > 0 and len(self.val_dataloader) > 0:
for images, labels in self.val_dataloader:
images = images.to(self.device)
features = self.feature_extractor(images).cpu().numpy()
X_val_features.extend(features)
y_val_labels_normalized.extend(labels.cpu().numpy())
X_train = np.array(X_train_features)
y_train_normalized = np.array(y_train_labels_normalized)
X_val = np.array(X_val_features)
y_val_normalized = np.array(y_val_labels_normalized)
if X_train.shape[0] == 0:
ax_loss.text(0.5, 0.5, "无训练特征数据,请检查图片加载。", horizontalalignment='center',
verticalalignment='center', transform=ax_loss.transAxes, fontsize=12, color='red')
ax_metrics.text(0.5, 0.5, "无训练特征数据。", horizontalalignment='center', verticalalignment='center',
transform=ax_metrics.transAxes, fontsize=12, color='red')
fig_loss.tight_layout()
fig_metrics.tight_layout()
return fig_loss, fig_metrics
print("正在对提取的特征进行预处理 (标准化, PCA)...")
X_train_processed = self.sklearn_feature_pipeline.fit_transform(X_train)
print(f"正在训练Sklearn {self.current_model_type} 模型...")
self.sklearn_regressor.fit(X_train_processed, y_train_normalized)
print(f"Sklearn {self.current_model_type} 模型训练完成。")
train_predictions_normalized = self.sklearn_regressor.predict(X_train_processed)
train_mse, train_mae, train_r2 = calculate_metrics(
y_train_normalized, train_predictions_normalized, min_label, max_label
)
# Sklearn模型没有Epochs概念,只显示最终状态
ax_loss.clear()
ax_metrics.clear()
if len(X_val) > 0:
X_val_processed = self.sklearn_feature_pipeline.transform(X_val)
val_predictions_normalized = self.sklearn_regressor.predict(X_val_processed)
val_mse, val_mae, val_r2 = calculate_metrics(
y_val_normalized, val_predictions_normalized, min_label, max_label
)
print(f"训练完成: Train MSE: {train_mse:.2f}, Train MAE: {train_mae:.2f}, Train R2: {train_r2:.2f}")
print(f" Val MSE: {val_mse:.2f}, Val MAE: {val_mae:.2f}, Val R2: {val_r2:.2f}")
# Sklearn模型的损失图可以显示一个文本摘要
ax_loss.text(0.5, 0.5, f"Sklearn {self.current_model_type} 训练完成\n"
f"训练集损失 (MSE/MAE): {train_mse:.2f}/{train_mae:.2f}\n" # 显示损失
f"验证集损失 (MSE/MAE): {val_mse:.2f}/{val_mae:.2f}\n" # 显示损失
f"训练集 R2: {train_r2:.2f}, 验证集 R2: {val_r2:.2f}", # 显示 R2
horizontalalignment='center', verticalalignment='center',
transform=ax_loss.transAxes, fontsize=10, color='green')
ax_loss.axis('off') # 隐藏坐标轴,因为没有连续曲线
# Sklearn的第二个图仍然是MSE/MAE
ax_metrics.text(0.5, 0.5, f"Sklearn {self.current_model_type} 训练完成\n"
f"训练集 MSE: {train_mse:.2f}\n"
f"验证集 MSE: {val_mse:.2f}\n"
f"训练集 MAE: {train_mae:.2f}\n"
f"验证集 MAE: {val_mae:.2f}",
horizontalalignment='center', verticalalignment='center',
transform=ax_metrics.transAxes, fontsize=10, color='green')
ax_metrics.axis('off')
else:
print(
f"训练完成: Train MSE: {train_mse:.2f}, Train MAE: {train_mae:.2f}, Train R2: {train_r2:.2f} (无验证集评估)")
ax_loss.text(0.5, 0.5, f"Sklearn {self.current_model_type} 训练完成\n"
f"训练集损失 (MSE/MAE): {train_mse:.2f}/{train_mae:.2f}\n"
f"训练集 R2: {train_r2:.2f} (无验证集)",
horizontalalignment='center', verticalalignment='center',
transform=ax_loss.transAxes, fontsize=10, color='green')
ax_loss.axis('off')
ax_metrics.text(0.5, 0.5, f"Sklearn {self.current_model_type} 训练完成\n"
f"训练集 MSE: {train_mse:.2f}\n"
f"训练集 MAE: {train_mae:.2f} (无验证集)",
horizontalalignment='center', verticalalignment='center',
transform=ax_metrics.transAxes, fontsize=10, color='green')
ax_metrics.axis('off')
ax_loss.set_title(f"Sklearn模型训练状态 ({self.current_model_type})") # 更具体标题
ax_metrics.set_title(f"Sklearn模型训练状态 ({self.current_model_type}) - 误差指标") # 更具体标题
fig_loss.tight_layout()
fig_metrics.tight_layout()
self._save_model_artifacts(self.current_model_type)
return fig_loss, fig_metrics
else:
ax_loss.text(0.5, 0.5, "未选择有效的模型类型进行训练。", horizontalalignment='center',
verticalalignment='center',
transform=ax_loss.transAxes, fontsize=12, color='red')
ax_metrics.text(0.5, 0.5, "未选择有效的模型类型。", horizontalalignment='center', verticalalignment='center',
transform=ax_metrics.transAxes, fontsize=12, color='red')
fig_loss.tight_layout()
fig_metrics.tight_layout()
return fig_loss, fig_metrics
# <-- 重新添加的 predict_score 方法!
def predict_score(self, image_path, model_type_str, base_cnn_name_for_predict):
if not self._load_model_artifacts(model_type_str, base_cnn_name_for_predict):
return "模型未训练或未加载!请先训练对应模型。"
current_image_size = get_image_size_by_model_name(base_cnn_name_for_predict)
# 预测时不进行数据增强
transform = get_transforms(train=False, image_size=current_image_size, enable_augmentation=False)
try:
image = Image.open(image_path).convert("RGB")
image_tensor = transform(image).unsqueeze(0).to(self.device)
except Exception as e:
return f"图片加载或预处理失败: {e}"
output_score_normalized = 0 # 0-1 范围的预测值
if model_type_str == "深度学习":
self.pytorch_regressor.eval()
self.feature_extractor.eval()
with torch.no_grad():
features = self.feature_extractor(image_tensor)
output_score_normalized = self.pytorch_regressor(features).item()
elif model_type_str == "端到端深度学习":
self.full_cnn_regressor.eval()
with torch.no_grad():
output_score_normalized = self.full_cnn_regressor(image_tensor).item()
else: # Sklearn模型
self.feature_extractor.eval()
with torch.no_grad():
features = self.feature_extractor(image_tensor).cpu().numpy()
processed_features = self.sklearn_feature_pipeline.transform(features)
output_score_normalized = self.sklearn_regressor.predict(processed_features)[0]
# 确保预测值在0-1范围内(如果模型没有Sigmoid,可能会超出)
output_score_normalized = max(0.0, min(1.0, output_score_normalized))
# 将归一化的预测值反归一化到原始分数范围
predicted_original_score = output_score_normalized * (
self.dataset_max_label - self.dataset_min_label) + self.dataset_min_label
# 最终钳制到0-100(因为原始分数可能不是0-100,但显示时通常希望在0-100)
predicted_original_score = max(0, min(100, predicted_original_score))
return f"预测分数: {predicted_original_score:.2f} (百分制)"
def _save_model_artifacts(self, model_type_str):
internal_name = self._get_internal_model_name(model_type_str)
meta_data_path = f"{MODEL_SAVE_BASE_PATH}_{internal_name}_meta.json"
if internal_name in ["pytorch_detached", "pytorch_full_cnn"]:
meta_data_path = f"{MODEL_SAVE_BASE_PATH}_{self.active_base_cnn_name}_{internal_name}_meta.json"
try:
with open(meta_data_path, 'w') as f:
json.dump(self.last_trained_params, f, indent=4)
print(f"模型元数据已保存到: {meta_data_path}")
except Exception as e:
print(f"保存模型元数据失败: {e}")
if internal_name == "pytorch_detached":
feat_extractor_path = f"{MODEL_SAVE_BASE_PATH}_{self.active_base_cnn_name}_features.pth"
regressor_path = f"{MODEL_SAVE_BASE_PATH}_{self.active_base_cnn_name}_pytorch_detached_regressor.pth"
torch.save(self.feature_extractor.state_dict(), feat_extractor_path)
torch.save(self.pytorch_regressor.state_dict(), regressor_path)
print(f"PyTorch模型组件 (分离模式) 已保存。")
elif internal_name == "pytorch_full_cnn":
full_cnn_path = f"{MODEL_SAVE_BASE_PATH}_{self.active_base_cnn_name}_full_cnn.pth"
torch.save(self.full_cnn_regressor.state_dict(), full_cnn_path)
print(f"端到端深度学习模型 已保存。")
else: # Sklearn 模型
regressor_path = f"{MODEL_SAVE_BASE_PATH}_{internal_name}_regressor.pkl"
pipeline_path = f"{MODEL_SAVE_BASE_PATH}_{internal_name}_feature_pipeline.pkl"
joblib.dump(self.sklearn_regressor, regressor_path)
joblib.dump(self.sklearn_feature_pipeline, pipeline_path)
print(f"Sklearn {model_type_str} 模型和特征管道已保存。")
# <-- 重新添加的 _load_model_artifacts 方法!
def _load_model_artifacts(self, model_type_str, base_cnn_name_to_load):
internal_name = self._get_internal_model_name(model_type_str)
loaded_params = None
# 尝试加载对应模型类型的元数据
meta_data_path = f"{MODEL_SAVE_BASE_PATH}_{internal_name}_meta.json"
if internal_name in ["pytorch_detached", "pytorch_full_cnn"]:
meta_data_path = f"{MODEL_SAVE_BASE_PATH}_{base_cnn_name_to_load}_{internal_name}_meta.json"
try:
with open(meta_data_path, 'r') as f:
loaded_params = json.load(f)
self.active_base_cnn_name = loaded_params.get("base_cnn_name", base_cnn_name_to_load)
# 加载 min_label 和 max_label
self.dataset_min_label = loaded_params.get("min_label", 0.0)
self.dataset_max_label = loaded_params.get("max_label", 100.0)
print(f"加载模型参数: {loaded_params}")
except FileNotFoundError:
print(f"警告: 模型元数据文件 {meta_data_path} 未找到。使用默认参数进行加载。")
loaded_params = {
"base_cnn_name": base_cnn_name_to_load,
"dropout_rate": DEFAULT_DROPOUT_RATE,
"weight_decay": DEFAULT_WEIGHT_DECAY,
"pca_variance_ratio": DEFAULT_PCA_VARIANCE_RATIO,
"min_label": 0.0, # 默认值
"max_label": 100.0 # 默认值
}
self.active_base_cnn_name = base_cnn_name_to_load
self.dataset_min_label = 0.0
self.dataset_max_label = 100.0
# 使用加载或默认的参数来实例化模型
if internal_name == "pytorch_detached":
try:
self.feature_extractor = FeatureExtractor(model_name=self.active_base_cnn_name).to(self.device)
feat_extractor_path = f"{MODEL_SAVE_BASE_PATH}_{self.active_base_cnn_name}_features.pth"
self.feature_extractor.load_state_dict(
torch.load(feat_extractor_path, map_location=self.device))
self.feature_extractor.eval()
feature_dim = self.feature_extractor.get_output_dim()
self.pytorch_regressor = PytorchRegressor(
in_features=feature_dim,
dropout_rate=loaded_params.get("dropout_rate", DEFAULT_DROPOUT_RATE)
).to(self.device)
regressor_path = f"{MODEL_SAVE_BASE_PATH}_{self.active_base_cnn_name}_pytorch_detached_regressor.pth"
self.pytorch_regressor.load_state_dict(
torch.load(regressor_path, map_location=self.device))
self.pytorch_regressor.eval()
print(f"PyTorch模型组件 (分离模式, 基础CNN: {self.active_base_cnn_name}) 已加载。")
return True
except FileNotFoundError as e:
print(f"PyTorch模型文件 (分离模式, 基础CNN: {self.active_base_cnn_name}) 未找到: {e}")
self.pytorch_regressor = None
self.feature_extractor = None
return False
elif internal_name == "pytorch_full_cnn":
try:
self.full_cnn_regressor = FullCNNRegressor(
model_name=self.active_base_cnn_name,
dropout_rate=loaded_params.get("dropout_rate", DEFAULT_DROPOUT_RATE)
).to(self.device)
full_cnn_path = f"{MODEL_SAVE_BASE_PATH}_{self.active_base_cnn_name}_full_cnn.pth"
self.full_cnn_regressor.load_state_dict(
torch.load(full_cnn_path, map_location=self.device))
self.full_cnn_regressor.eval()
print(f"端到端深度学习模型 ({self.active_base_cnn_name}) 已加载。")
return True
except FileNotFoundError as e:
print(f"端到端深度学习模型文件 ({self.active_base_cnn_name}) 未找到: {e}")
self.full_cnn_regressor = None
return False
else: # Sklearn 模型
try:
self.feature_extractor = FeatureExtractor(model_name=self.active_base_cnn_name).to(self.device)
self.feature_extractor.eval()
regressor_path = f"{MODEL_SAVE_BASE_PATH}_{internal_name}_regressor.pkl"
pipeline_path = f"{MODEL_SAVE_BASE_PATH}_{internal_name}_feature_pipeline.pkl"
# 重新实例化 Sklearn 模型和管道,以便使用加载的PCA参数
self.sklearn_regressor, self.sklearn_feature_pipeline = \
get_sklearn_model_pipeline(
model_type_str,
pca_variance_ratio=loaded_params.get("pca_variance_ratio", DEFAULT_PCA_VARIANCE_RATIO)
)
self.sklearn_regressor = joblib.load(regressor_path)
self.sklearn_feature_pipeline = joblib.load(pipeline_path)
print(f"Sklearn {model_type_str} 模型和特征管道 (基础CNN: {self.active_base_cnn_name}) 已加载。")
return True
except FileNotFoundError as e:
print(f"Sklearn模型文件 {regressor_path}{pipeline_path} 未找到: {e}")
self.sklearn_regressor = None
self.sklearn_feature_pipeline = None
self.feature_extractor = None
return False
return False