import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, r2_score

# Seed the global RNGs so that results are reproducible.
torch.manual_seed(42)
np.random.seed(42)


# ---------------------------------------------------------------------------
# Model definitions
# ---------------------------------------------------------------------------
class HodgeDualLayer(nn.Module):
    """Layer with a forward map and a "dual" map back to the input space.

    The design is loosely inspired by the duality in Hodge theory: the input
    is mapped to ``out_features`` and that result is mapped back to
    ``in_features``.
    """

    def __init__(self, in_features, out_features):
        super().__init__()
        self.forward_map = nn.Linear(in_features, out_features)
        self.dual_map = nn.Linear(out_features, in_features)

    def forward(self, x):
        """Return ``(y, dual_x)``: the forward image and its dual image."""
        y = self.forward_map(x)
        # Dual mapping (the analogue of the Hodge dual): back to in_features.
        dual_x = self.dual_map(y)
        return y, dual_x


class MirrorSymmetryBlock(nn.Module):
    """Two structurally identical ("mirror") branches joined by a product term.

    Both branches keep the feature dimension equal to ``dim`` so that the
    caller can wrap the block in a residual connection.
    """

    def __init__(self, dim):
        super().__init__()
        self.dim = dim

        # Two branches that mirror each other; output width == input width.
        self.branch_a = nn.Sequential(
            nn.Linear(dim, dim),
            nn.LayerNorm(dim),
            nn.GELU(),
        )
        self.branch_b = nn.Sequential(
            nn.Linear(dim, dim),
            nn.LayerNorm(dim),
            nn.GELU(),
        )

        # Learnable weight of the element-wise coupling between the branches.
        self.symmetry_preserving = nn.Parameter(torch.ones(1))

    def forward(self, x):
        """Return ``a + b + w * (a * b)`` for the two branch outputs."""
        a = self.branch_a(x)
        b = self.branch_b(x)

        # Couple the two branches through their element-wise product.
        mirror_term = self.symmetry_preserving * (a * b)
        return a + b + mirror_term


class ComplexStructureModule(nn.Module):
    """Apply a complex-multiplication-style mixing to a split feature vector.

    The input's feature axis (axis 1) is split at its midpoint into a "real"
    and an "imaginary" half; each half must therefore have ``dim`` features
    for the linear layers to accept it.
    """

    def __init__(self, dim):
        super().__init__()
        self.real_transform = nn.Linear(dim, dim)
        self.imag_transform = nn.Linear(dim, dim)

    def forward(self, x):
        """Return the transformed real and imaginary halves, concatenated."""
        # Split the channels into real and imaginary parts.
        mid_point = x.shape[1] // 2
        real_part = x[:, :mid_point]
        imag_part = x[:, mid_point:]

        # Same pattern as (A + iB)(r + i*m) = (Ar - Bm) + i(Br + Am).
        new_real = self.real_transform(real_part) - self.imag_transform(imag_part)
        new_imag = self.imag_transform(real_part) + self.real_transform(imag_part)

        return torch.cat([new_real, new_imag], dim=1)


class MirrorSymmetryHodgeNetwork(nn.Module):
    """Network built from the Hodge-dual, mirror-symmetry and complex modules.

    Args:
        input_dim: Number of input features.
        hidden_dim: Half of the internal width; the network works with
            ``2 * hidden_dim`` features so they can be split into real and
            imaginary halves.
        output_dim: Number of output features.
        num_blocks: Number of residual ``MirrorSymmetryBlock`` layers.
    """

    def __init__(self, input_dim, hidden_dim, output_dim, num_blocks=3):
        super().__init__()

        # Input embedding; doubled width is used by the complex structure.
        self.embedding = nn.Linear(input_dim, hidden_dim * 2)

        # Hodge dual layer.
        self.hodge_dual = HodgeDualLayer(hidden_dim * 2, hidden_dim * 2)

        # Mirror symmetry blocks.
        self.mirror_blocks = nn.ModuleList([
            MirrorSymmetryBlock(hidden_dim * 2) for _ in range(num_blocks)
        ])

        # Complex structure module (operates on halves of width hidden_dim).
        self.complex_structure = ComplexStructureModule(hidden_dim)

        # Output projection.
        self.output_map = nn.Linear(hidden_dim * 2, output_dim)

        # Learnable scale applied to the dual branch.
        self.scale_factor = nn.Parameter(torch.ones(1))

    def forward(self, x):
        """Map a ``(batch, input_dim)`` tensor to ``(batch, output_dim)``."""
        x = self.embedding(x)

        # Combine the forward image with the scaled dual image.
        primary, dual = self.hodge_dual(x)
        x = primary + self.scale_factor * dual

        # Mirror symmetry blocks, each wrapped in a residual connection.
        for block in self.mirror_blocks:
            x = x + block(x)

        x = self.complex_structure(x)
        return self.output_map(x)


class BaselineMLP(nn.Module):
    """Baseline model: a plain ReLU multilayer perceptron.

    Builds ``num_layers`` hidden ``Linear``+``ReLU`` pairs followed by a final
    ``Linear`` projection to ``output_dim``.
    """

    def __init__(self, input_dim, hidden_dim, output_dim, num_layers=3):
        super().__init__()

        layers = [nn.Linear(input_dim, hidden_dim), nn.ReLU()]
        for _ in range(num_layers - 1):
            layers.extend([nn.Linear(hidden_dim, hidden_dim), nn.ReLU()])
        layers.append(nn.Linear(hidden_dim, output_dim))

        self.network = nn.Sequential(*layers)

    def forward(self, x):
        return self.network(x)


# ---------------------------------------------------------------------------
# Synthetic data
# ---------------------------------------------------------------------------
def generate_symmetric_data(n_samples=1000, input_dim=10, noise_level=0.1):
    """Generate synthetic regression data with a built-in symmetry.

    The features are split into a first half ``X[:, :mid]`` and a second half
    ``X[:, mid:]`` with ``mid = input_dim // 2``. The target combines a
    difference of squared norms, a cross term between the halves, and a
    neighbouring-feature term inside each half, plus Gaussian noise.

    When ``input_dim`` is odd the second half has one extra feature. That
    feature contributes to the squared-norm term only; the pairwise terms use
    the first ``mid`` features of the second half so both operands always
    have the same width.

    Args:
        n_samples: Number of samples to draw.
        input_dim: Number of input features.
        noise_level: Standard deviation of the additive target noise.

    Returns:
        Tuple ``(X_tensor, y_tensor)`` of float32 tensors with shapes
        ``(n_samples, input_dim)`` and ``(n_samples, 1)``.
    """
    # Random input features (drawn from the global NumPy RNG).
    X = np.random.randn(n_samples, input_dim)

    mid = input_dim // 2
    first_half = X[:, :mid]
    # Equal-width partner of first_half; identical to X[:, mid:] when
    # input_dim is even, and drops the unpaired last feature when it is odd.
    paired_half = X[:, mid:2 * mid]

    # Base function: difference of squared norms of the two halves.
    y_base = np.sum(first_half ** 2, axis=1) - np.sum(X[:, mid:] ** 2, axis=1)

    # Mirror-symmetry term: element-wise coupling between the halves.
    mirror_terms = np.sum(first_half * paired_half, axis=1)

    # "Complex structure" term: products of neighbouring features in each
    # half. Needs at least two features per half.
    if mid > 1:
        complex_terms = np.sum(
            first_half[:, :-1] * first_half[:, 1:]
            - paired_half[:, :-1] * paired_half[:, 1:],
            axis=1,
        )
    else:
        complex_terms = np.zeros(n_samples)

    # Combine the terms into the final target and add noise.
    y = y_base + 0.5 * mirror_terms + 0.3 * complex_terms
    y += noise_level * np.random.randn(n_samples)

    X_tensor = torch.tensor(X, dtype=torch.float32)
    y_tensor = torch.tensor(y, dtype=torch.float32).reshape(-1, 1)

    return X_tensor, y_tensor


# ---------------------------------------------------------------------------
# Training
# ---------------------------------------------------------------------------
def train_model(model, train_loader, val_loader, epochs=100, lr=0.001, device='cpu'):
    """Train ``model`` with Adam + MSE loss and restore the best weights.

    After every epoch the model is evaluated on ``val_loader``; the learning
    rate is halved by ``ReduceLROnPlateau`` after 5 epochs without
    improvement, and a snapshot of the weights with the lowest validation
    loss is kept and loaded back into the model at the end.

    Args:
        model: The ``nn.Module`` to train (moved to ``device`` in place).
        train_loader: Iterable of ``(X_batch, y_batch)`` training batches.
        val_loader: Iterable of ``(X_batch, y_batch)`` validation batches.
        epochs: Number of passes over ``train_loader``.
        lr: Initial Adam learning rate.
        device: Device on which to train.

    Returns:
        Tuple ``(model, history)`` where ``history`` is a dict with the
        per-epoch lists ``'train_loss'`` and ``'val_loss'`` (mean of the
        per-batch losses).
    """
    model.to(device)
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(
        optimizer, 'min', patience=5, factor=0.5
    )

    history = {
        'train_loss': [],
        'val_loss': [],
    }

    best_val_loss = float('inf')
    best_model_state = None

    for epoch in range(epochs):
        # ---- training phase ----
        model.train()
        train_loss = 0.0

        for X_batch, y_batch in train_loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)

            # Forward pass.
            y_pred = model(X_batch)
            loss = criterion(y_pred, y_batch)

            # Backward pass and parameter update.
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            train_loss += loss.item()

        train_loss /= len(train_loader)
        history['train_loss'].append(train_loss)

        # ---- validation phase ----
        model.eval()
        val_loss = 0.0

        with torch.no_grad():
            for X_batch, y_batch in val_loader:
                X_batch, y_batch = X_batch.to(device), y_batch.to(device)
                y_pred = model(X_batch)
                loss = criterion(y_pred, y_batch)
                val_loss += loss.item()

        val_loss /= len(val_loader)
        history['val_loss'].append(val_loss)

        # Let the scheduler react to the validation loss.
        scheduler.step(val_loss)

        # Keep a snapshot of the best weights seen so far.
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            # state_dict() returns references to the live parameter tensors,
            # so a shallow dict copy would be silently overwritten by later
            # optimizer steps. Clone every tensor to freeze the snapshot.
            best_model_state = {
                name: tensor.detach().clone()
                for name, tensor in model.state_dict().items()
            }

        # Progress report every 10 epochs.
        if (epoch + 1) % 10 == 0:
            print(f'Epoch {epoch+1}/{epochs}, Train Loss: {train_loss:.6f}, Val Loss: {val_loss:.6f}')

    # Restore the best weights. No snapshot exists if no epoch ran or if the
    # validation loss never compared below +inf (e.g. it was NaN); in that
    # case the model is returned with its current weights.
    if best_model_state is not None:
        model.load_state_dict(best_model_state)

    return model, history


# ---------------------------------------------------------------------------
# Evaluation
# ---------------------------------------------------------------------------
def evaluate_model(model, test_loader, device='cpu'):
    """Evaluate ``model`` on ``test_loader`` and collect regression metrics.

    Args:
        model: The ``nn.Module`` to evaluate (moved to ``device`` in place).
        test_loader: Iterable of ``(X_batch, y_batch)`` test batches.
        device: Device on which to run the evaluation.

    Returns:
        Dict with keys ``'test_loss'`` (mean of per-batch MSE losses),
        ``'r2'``, ``'rmse'``, ``'predictions'`` and ``'targets'`` (the last
        two are NumPy arrays stacked over all batches).
    """
    # Make sure the model lives on the same device as the batches; this is a
    # no-op when the model has already been moved (e.g. by train_model).
    model.to(device)
    model.eval()
    criterion = nn.MSELoss()

    all_preds = []
    all_targets = []
    test_loss = 0.0

    with torch.no_grad():
        for X_batch, y_batch in test_loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)
            y_pred = model(X_batch)
            loss = criterion(y_pred, y_batch)

            test_loss += loss.item()
            all_preds.append(y_pred.cpu().numpy())
            all_targets.append(y_batch.cpu().numpy())

    test_loss /= len(test_loader)
    all_preds = np.vstack(all_preds)
    all_targets = np.vstack(all_targets)

    # R^2 and RMSE over the whole test set.
    r2 = r2_score(all_targets, all_preds)
    rmse = np.sqrt(mean_squared_error(all_targets, all_preds))

    return {
        'test_loss': test_loss,
        'r2': r2,
        'rmse': rmse,
        'predictions': all_preds,
        'targets': all_targets,
    }


# ---------------------------------------------------------------------------
# Plotting
# ---------------------------------------------------------------------------
def plot_training_history(history_mirror, history_baseline):
    """Plot training and validation loss curves of both models side by side.

    Args:
        history_mirror: History dict (``'train_loss'``, ``'val_loss'``) of
            the mirror symmetry model.
        history_baseline: History dict of the baseline MLP.
    """
    plt.figure(figsize=(12, 5))

    # Training loss.
    plt.subplot(1, 2, 1)
    plt.plot(history_mirror['train_loss'], label='Mirror Symmetry Model')
    plt.plot(history_baseline['train_loss'], label='Baseline MLP')
    plt.title('Training Loss')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.legend()

    # Validation loss.
    plt.subplot(1, 2, 2)
    plt.plot(history_mirror['val_loss'], label='Mirror Symmetry Model')
    plt.plot(history_baseline['val_loss'], label='Baseline MLP')
    plt.title('Validation Loss')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.legend()

    plt.tight_layout()
    plt.show()


def _plot_prediction_panel(position, results, label):
    """Draw one predicted-vs-true scatter panel with the identity line."""
    plt.subplot(1, 2, position)
    plt.scatter(results['targets'], results['predictions'], alpha=0.5)

    # Diagonal reference line spanning the joint range of both axes.
    min_val = min(results['targets'].min(), results['predictions'].min())
    max_val = max(results['targets'].max(), results['predictions'].max())
    plt.plot([min_val, max_val], [min_val, max_val], 'r--')

    plt.title(f'{label}\nR² = {results["r2"]:.4f}, RMSE = {results["rmse"]:.4f}')
    plt.xlabel('True Values')
    plt.ylabel('Predicted Values')


def plot_predictions(mirror_results, baseline_results):
    """Plot predicted versus true values for both models side by side.

    Args:
        mirror_results: Result dict from ``evaluate_model`` for the mirror
            symmetry model.
        baseline_results: Result dict from ``evaluate_model`` for the
            baseline MLP.
    """
    plt.figure(figsize=(12, 5))

    _plot_prediction_panel(1, mirror_results, 'Mirror Symmetry Model')
    _plot_prediction_panel(2, baseline_results, 'Baseline MLP')

    plt.tight_layout()
    plt.show()


# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
def main():
    """Generate data, train both models, report metrics, plot and save."""
    # Select the device.
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f"Using device: {device}")

    # Hyperparameters.
    input_dim = 10
    hidden_dim = 64
    output_dim = 1
    batch_size = 32
    epochs = 100
    lr = 0.001

    # Generate the data.
    X, y = generate_symmetric_data(n_samples=5000, input_dim=input_dim)

    # 70 / 15 / 15 train / validation / test split.
    X_train, X_temp, y_train, y_temp = train_test_split(
        X, y, test_size=0.3, random_state=42
    )
    X_val, X_test, y_val, y_test = train_test_split(
        X_temp, y_temp, test_size=0.5, random_state=42
    )

    # Data loaders.
    train_dataset = TensorDataset(X_train, y_train)
    val_dataset = TensorDataset(X_val, y_val)
    test_dataset = TensorDataset(X_test, y_test)

    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size)
    test_loader = DataLoader(test_dataset, batch_size=batch_size)

    # Instantiate the models.
    mirror_model = MirrorSymmetryHodgeNetwork(input_dim, hidden_dim, output_dim)
    baseline_model = BaselineMLP(input_dim, hidden_dim, output_dim)

    # Train the mirror symmetry model.
    print("Training Mirror Symmetry Hodge Network...")
    mirror_model, history_mirror = train_model(
        mirror_model, train_loader, val_loader,
        epochs=epochs, lr=lr, device=device
    )

    # Train the baseline model.
    print("\nTraining Baseline MLP...")
    baseline_model, history_baseline = train_model(
        baseline_model, train_loader, val_loader,
        epochs=epochs, lr=lr, device=device
    )

    # Evaluate both models.
    print("\nEvaluating models on test set...")
    mirror_results = evaluate_model(mirror_model, test_loader, device)
    baseline_results = evaluate_model(baseline_model, test_loader, device)

    # Report the results.
    print("\nMirror Symmetry Hodge Network Results:")
    print(f"Test Loss: {mirror_results['test_loss']:.6f}")
    print(f"R2 Score: {mirror_results['r2']:.6f}")
    print(f"RMSE: {mirror_results['rmse']:.6f}")

    print("\nBaseline MLP Results:")
    print(f"Test Loss: {baseline_results['test_loss']:.6f}")
    print(f"R2 Score: {baseline_results['r2']:.6f}")
    print(f"RMSE: {baseline_results['rmse']:.6f}")

    # Plot the results.
    plot_training_history(history_mirror, history_baseline)
    plot_predictions(mirror_results, baseline_results)

    # Save the (best) model weights.
    torch.save(mirror_model.state_dict(), 'mirror_symmetry_hodge_model.pth')
    torch.save(baseline_model.state_dict(), 'baseline_mlp_model.pth')
    print("\nModels saved successfully!")


if __name__ == "__main__":
    main()