FutureMa commited on
Commit
20ffd89
·
verified ·
1 Parent(s): 9fd51b7

Create mshnn_model.py

Browse files
Files changed (1) hide show
  1. mshnn_model.py +423 -0
mshnn_model.py ADDED
@@ -0,0 +1,423 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import torch
2
+ import torch.nn as nn
3
+ import torch.nn.functional as F
4
+ import torch.optim as optim
5
+ from torch.utils.data import DataLoader, TensorDataset
6
+ import numpy as np
7
+ import matplotlib.pyplot as plt
8
+ from tqdm import tqdm
9
+ from sklearn.model_selection import train_test_split
10
+ from sklearn.metrics import mean_squared_error, r2_score
11
+
12
+ # 设置随机种子以确保结果可复现
13
+ torch.manual_seed(42)
14
+ np.random.seed(42)
15
+
16
+ # 模型定义
17
+ class HodgeDualLayer(nn.Module):
18
+ """
19
+ 实现霍奇对偶操作的层,灵感来自霍奇理论中的对偶性
20
+ """
21
+ def __init__(self, in_features, out_features):
22
+ super(HodgeDualLayer, self).__init__()
23
+ self.forward_map = nn.Linear(in_features, out_features)
24
+ self.dual_map = nn.Linear(out_features, in_features)
25
+
26
+ def forward(self, x):
27
+ # 前向映射
28
+ y = self.forward_map(x)
29
+ # 对偶映射 (类似于霍奇对偶)
30
+ dual_x = self.dual_map(y)
31
+ return y, dual_x
32
+
33
+ class MirrorSymmetryBlock(nn.Module):
34
+ """
35
+ 镜像对称块:实现类似于镜像对称的结构
36
+ """
37
+ def __init__(self, dim):
38
+ super(MirrorSymmetryBlock, self).__init__()
39
+ self.dim = dim
40
+ # 两个互为"镜像"的分支
41
+ self.branch_a = nn.Sequential(
42
+ nn.Linear(dim, dim), # 修改:保持输出维度与输入相同,以便残差连接
43
+ nn.LayerNorm(dim),
44
+ nn.GELU()
45
+ )
46
+ self.branch_b = nn.Sequential(
47
+ nn.Linear(dim, dim), # 修改:保持输出维度与输入相同,以便残差连接
48
+ nn.LayerNorm(dim),
49
+ nn.GELU()
50
+ )
51
+ # 对称性保持层
52
+ self.symmetry_preserving = nn.Parameter(torch.ones(1))
53
+
54
+ def forward(self, x):
55
+ a = self.branch_a(x)
56
+ b = self.branch_b(x)
57
+ # 通过对称操作连接两个分支
58
+ mirror_term = self.symmetry_preserving * (a * b)
59
+ return a + b + mirror_term
60
+
61
+ class ComplexStructureModule(nn.Module):
62
+ """
63
+ 模拟复几何结构的模块
64
+ """
65
+ def __init__(self, dim):
66
+ super(ComplexStructureModule, self).__init__()
67
+ self.real_transform = nn.Linear(dim, dim)
68
+ self.imag_transform = nn.Linear(dim, dim)
69
+
70
+ def forward(self, x):
71
+ # 分离实部和虚部通道
72
+ mid_point = x.shape[1] // 2
73
+ real_part = x[:, :mid_point]
74
+ imag_part = x[:, mid_point:]
75
+
76
+ # 应用复几何变换
77
+ new_real = self.real_transform(real_part) - self.imag_transform(imag_part)
78
+ new_imag = self.imag_transform(real_part) + self.real_transform(imag_part)
79
+
80
+ # 合并实部和虚部
81
+ return torch.cat([new_real, new_imag], dim=1)
82
+
83
+ class MirrorSymmetryHodgeNetwork(nn.Module):
84
+ """
85
+ 基于镜像对称和霍奇理论概念的神经网络
86
+ """
87
+ def __init__(self, input_dim, hidden_dim, output_dim, num_blocks=3):
88
+ super(MirrorSymmetryHodgeNetwork, self).__init__()
89
+
90
+ # 输入嵌入层
91
+ self.embedding = nn.Linear(input_dim, hidden_dim*2) # 双倍维度用于复结构
92
+
93
+ # 霍奇对偶层
94
+ self.hodge_dual = HodgeDualLayer(hidden_dim*2, hidden_dim*2)
95
+
96
+ # 镜像对称块
97
+ self.mirror_blocks = nn.ModuleList([
98
+ MirrorSymmetryBlock(hidden_dim*2) for _ in range(num_blocks)
99
+ ])
100
+
101
+ # 复结构模块
102
+ self.complex_structure = ComplexStructureModule(hidden_dim)
103
+
104
+ # 输出映射
105
+ self.output_map = nn.Linear(hidden_dim*2, output_dim)
106
+
107
+ # 标度因子(代表霍奇结构中的度量)
108
+ self.scale_factor = nn.Parameter(torch.ones(1))
109
+
110
+ def forward(self, x):
111
+ # 初始嵌入
112
+ x = self.embedding(x)
113
+
114
+ # 应用霍奇对偶
115
+ primary, dual = self.hodge_dual(x)
116
+
117
+ # 残差连接
118
+ x = primary + self.scale_factor * dual
119
+
120
+ # 应用镜像对称块
121
+ for block in self.mirror_blocks:
122
+ x = x + block(x) # 残差连接
123
+
124
+ # 应用复结构变换
125
+ x = self.complex_structure(x)
126
+
127
+ # 输出层
128
+ return self.output_map(x)
129
+
130
+ # 基准模型 - 标准MLP
131
+ class BaselineMLP(nn.Module):
132
+ def __init__(self, input_dim, hidden_dim, output_dim, num_layers=3):
133
+ super(BaselineMLP, self).__init__()
134
+
135
+ layers = [nn.Linear(input_dim, hidden_dim), nn.ReLU()]
136
+ for _ in range(num_layers - 1):
137
+ layers.extend([nn.Linear(hidden_dim, hidden_dim), nn.ReLU()])
138
+ layers.append(nn.Linear(hidden_dim, output_dim))
139
+
140
+ self.network = nn.Sequential(*layers)
141
+
142
+ def forward(self, x):
143
+ return self.network(x)
144
+
145
+ # 生成具有对称性的合成数据
146
+ def generate_symmetric_data(n_samples=1000, input_dim=10, noise_level=0.1):
147
+ """
148
+ 生成具有对称性质的数据,适合测试镜像对称模型
149
+ """
150
+ # 随机生成输入特征
151
+ X = np.random.randn(n_samples, input_dim)
152
+
153
+ # 创建符合对称性的目标变量
154
+ # 一半特征与另一半特征之间存在对称关系
155
+ mid = input_dim // 2
156
+
157
+ # 基础函数
158
+ y_base = np.sum(X[:, :mid]**2, axis=1) - np.sum(X[:, mid:]**2, axis=1)
159
+
160
+ # 添加一些镜像对称项
161
+ mirror_terms = np.sum(X[:, :mid] * X[:, mid:], axis=1)
162
+
163
+ # 添加复结构项 - 确保索引不会越界
164
+ if mid > 1: # 确保有足够的维度进行复结构计算
165
+ complex_terms = np.sum(X[:, :mid-1] * X[:, 1:mid] - X[:, mid:-1] * X[:, mid+1:], axis=1)
166
+ else:
167
+ complex_terms = np.zeros(n_samples)
168
+
169
+ # 组合各项,创建最终目标
170
+ y = y_base + 0.5 * mirror_terms + 0.3 * complex_terms
171
+
172
+ # 添加噪声
173
+ y += noise_level * np.random.randn(n_samples)
174
+
175
+ # 转换为张量
176
+ X_tensor = torch.FloatTensor(X)
177
+ y_tensor = torch.FloatTensor(y).reshape(-1, 1)
178
+
179
+ return X_tensor, y_tensor
180
+
181
+ # 训练函数
182
+ def train_model(model, train_loader, val_loader, epochs=100, lr=0.001, device='cpu'):
183
+ """
184
+ 训练模型并返回训练历史
185
+ """
186
+ model.to(device)
187
+ criterion = nn.MSELoss()
188
+ optimizer = optim.Adam(model.parameters(), lr=lr)
189
+ scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', patience=5, factor=0.5)
190
+
191
+ history = {
192
+ 'train_loss': [],
193
+ 'val_loss': [],
194
+ }
195
+
196
+ best_val_loss = float('inf')
197
+ best_model_state = None
198
+
199
+ for epoch in range(epochs):
200
+ # 训练阶段
201
+ model.train()
202
+ train_loss = 0.0
203
+
204
+ for X_batch, y_batch in train_loader:
205
+ X_batch, y_batch = X_batch.to(device), y_batch.to(device)
206
+
207
+ # 前向传播
208
+ y_pred = model(X_batch)
209
+ loss = criterion(y_pred, y_batch)
210
+
211
+ # 反向传播和优化
212
+ optimizer.zero_grad()
213
+ loss.backward()
214
+ optimizer.step()
215
+
216
+ train_loss += loss.item()
217
+
218
+ train_loss /= len(train_loader)
219
+ history['train_loss'].append(train_loss)
220
+
221
+ # 验证阶段
222
+ model.eval()
223
+ val_loss = 0.0
224
+
225
+ with torch.no_grad():
226
+ for X_batch, y_batch in val_loader:
227
+ X_batch, y_batch = X_batch.to(device), y_batch.to(device)
228
+ y_pred = model(X_batch)
229
+ loss = criterion(y_pred, y_batch)
230
+ val_loss += loss.item()
231
+
232
+ val_loss /= len(val_loader)
233
+ history['val_loss'].append(val_loss)
234
+
235
+ # 更新学习率
236
+ scheduler.step(val_loss)
237
+
238
+ # 保存最佳模型
239
+ if val_loss < best_val_loss:
240
+ best_val_loss = val_loss
241
+ best_model_state = model.state_dict().copy()
242
+
243
+ # 输出进度
244
+ if (epoch + 1) % 10 == 0:
245
+ print(f'Epoch {epoch+1}/{epochs}, Train Loss: {train_loss:.6f}, Val Loss: {val_loss:.6f}')
246
+
247
+ # 加载最佳模型权重
248
+ model.load_state_dict(best_model_state)
249
+
250
+ return model, history
251
+
252
+ # 评估函数
253
+ def evaluate_model(model, test_loader, device='cpu'):
254
+ """
255
+ 评估模型性能
256
+ """
257
+ model.eval()
258
+ criterion = nn.MSELoss()
259
+
260
+ all_preds = []
261
+ all_targets = []
262
+ test_loss = 0.0
263
+
264
+ with torch.no_grad():
265
+ for X_batch, y_batch in test_loader:
266
+ X_batch, y_batch = X_batch.to(device), y_batch.to(device)
267
+ y_pred = model(X_batch)
268
+ loss = criterion(y_pred, y_batch)
269
+ test_loss += loss.item()
270
+
271
+ all_preds.append(y_pred.cpu().numpy())
272
+ all_targets.append(y_batch.cpu().numpy())
273
+
274
+ test_loss /= len(test_loader)
275
+ all_preds = np.vstack(all_preds)
276
+ all_targets = np.vstack(all_targets)
277
+
278
+ # 计算R2和RMSE
279
+ r2 = r2_score(all_targets, all_preds)
280
+ rmse = np.sqrt(mean_squared_error(all_targets, all_preds))
281
+
282
+ return {
283
+ 'test_loss': test_loss,
284
+ 'r2': r2,
285
+ 'rmse': rmse,
286
+ 'predictions': all_preds,
287
+ 'targets': all_targets
288
+ }
289
+
290
+ # 绘制训练历史
291
+ def plot_training_history(history_mirror, history_baseline):
292
+ """
293
+ 绘制训练和验证损失的对比图
294
+ """
295
+ plt.figure(figsize=(12, 5))
296
+
297
+ # 训练损失
298
+ plt.subplot(1, 2, 1)
299
+ plt.plot(history_mirror['train_loss'], label='Mirror Symmetry Model')
300
+ plt.plot(history_baseline['train_loss'], label='Baseline MLP')
301
+ plt.title('Training Loss')
302
+ plt.xlabel('Epochs')
303
+ plt.ylabel('Loss')
304
+ plt.legend()
305
+
306
+ # 验证损失
307
+ plt.subplot(1, 2, 2)
308
+ plt.plot(history_mirror['val_loss'], label='Mirror Symmetry Model')
309
+ plt.plot(history_baseline['val_loss'], label='Baseline MLP')
310
+ plt.title('Validation Loss')
311
+ plt.xlabel('Epochs')
312
+ plt.ylabel('Loss')
313
+ plt.legend()
314
+
315
+ plt.tight_layout()
316
+ plt.show()
317
+
318
+ # 绘制预测对比
319
+ def plot_predictions(mirror_results, baseline_results):
320
+ """
321
+ 绘制预测值与真实值的对比图
322
+ """
323
+ plt.figure(figsize=(12, 5))
324
+
325
+ # 镜像对称模型的预测
326
+ plt.subplot(1, 2, 1)
327
+ plt.scatter(mirror_results['targets'], mirror_results['predictions'], alpha=0.5)
328
+ min_val = min(mirror_results['targets'].min(), mirror_results['predictions'].min())
329
+ max_val = max(mirror_results['targets'].max(), mirror_results['predictions'].max())
330
+ plt.plot([min_val, max_val], [min_val, max_val], 'r--')
331
+ plt.title(f'Mirror Symmetry Model\nR² = {mirror_results["r2"]:.4f}, RMSE = {mirror_results["rmse"]:.4f}')
332
+ plt.xlabel('True Values')
333
+ plt.ylabel('Predicted Values')
334
+
335
+ # 基准模型的预测
336
+ plt.subplot(1, 2, 2)
337
+ plt.scatter(baseline_results['targets'], baseline_results['predictions'], alpha=0.5)
338
+ min_val = min(baseline_results['targets'].min(), baseline_results['predictions'].min())
339
+ max_val = max(baseline_results['targets'].max(), baseline_results['predictions'].max())
340
+ plt.plot([min_val, max_val], [min_val, max_val], 'r--')
341
+ plt.title(f'Baseline MLP\nR² = {baseline_results["r2"]:.4f}, RMSE = {baseline_results["rmse"]:.4f}')
342
+ plt.xlabel('True Values')
343
+ plt.ylabel('Predicted Values')
344
+
345
+ plt.tight_layout()
346
+ plt.show()
347
+
348
+ # 主函数
349
+ def main():
350
+ # 设置设备
351
+ device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
352
+ print(f"Using device: {device}")
353
+
354
+ # 超参数
355
+ input_dim = 10
356
+ hidden_dim = 64
357
+ output_dim = 1
358
+ batch_size = 32
359
+ epochs = 100
360
+ lr = 0.001
361
+
362
+ # 生成数据
363
+ X, y = generate_symmetric_data(n_samples=5000, input_dim=input_dim)
364
+
365
+ # 划分数据集
366
+ X_train, X_temp, y_train, y_temp = train_test_split(X, y, test_size=0.3, random_state=42)
367
+ X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.5, random_state=42)
368
+
369
+ # 创建数据加载器
370
+ train_dataset = TensorDataset(X_train, y_train)
371
+ val_dataset = TensorDataset(X_val, y_val)
372
+ test_dataset = TensorDataset(X_test, y_test)
373
+
374
+ train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
375
+ val_loader = DataLoader(val_dataset, batch_size=batch_size)
376
+ test_loader = DataLoader(test_dataset, batch_size=batch_size)
377
+
378
+ # 实例化模型
379
+ mirror_model = MirrorSymmetryHodgeNetwork(input_dim, hidden_dim, output_dim)
380
+ baseline_model = BaselineMLP(input_dim, hidden_dim, output_dim)
381
+
382
+ # 训练镜像对称模型
383
+ print("Training Mirror Symmetry Hodge Network...")
384
+ mirror_model, history_mirror = train_model(
385
+ mirror_model, train_loader, val_loader,
386
+ epochs=epochs, lr=lr, device=device
387
+ )
388
+
389
+ # 训练基准模型
390
+ print("\nTraining Baseline MLP...")
391
+ baseline_model, history_baseline = train_model(
392
+ baseline_model, train_loader, val_loader,
393
+ epochs=epochs, lr=lr, device=device
394
+ )
395
+
396
+ # 评估两个模型
397
+ print("\nEvaluating models on test set...")
398
+ mirror_results = evaluate_model(mirror_model, test_loader, device)
399
+ baseline_results = evaluate_model(baseline_model, test_loader, device)
400
+
401
+ # 输出结果
402
+ print("\nMirror Symmetry Hodge Network Results:")
403
+ print(f"Test Loss: {mirror_results['test_loss']:.6f}")
404
+ print(f"R2 Score: {mirror_results['r2']:.6f}")
405
+ print(f"RMSE: {mirror_results['rmse']:.6f}")
406
+
407
+ print("\nBaseline MLP Results:")
408
+ print(f"Test Loss: {baseline_results['test_loss']:.6f}")
409
+ print(f"R2 Score: {baseline_results['r2']:.6f}")
410
+ print(f"RMSE: {baseline_results['rmse']:.6f}")
411
+
412
+ # 绘制结果
413
+ plot_training_history(history_mirror, history_baseline)
414
+ plot_predictions(mirror_results, baseline_results)
415
+
416
+ # 保存最佳模型
417
+ torch.save(mirror_model.state_dict(), 'mirror_symmetry_hodge_model.pth')
418
+ torch.save(baseline_model.state_dict(), 'baseline_mlp_model.pth')
419
+
420
+ print("\nModels saved successfully!")
421
+
422
+ if __name__ == "__main__":
423
+ main()