# Transformer-PINN / data_loader.py
# guanwencan's picture
# Upload 5 files
# 5e4dee3 verified
import os
import re
import numpy as np
import pandas as pd
from glob import glob
from sklearn.preprocessing import StandardScaler
class DamageCalculator:
    """Closed-form initial-damage model built from three component terms.

    Every method is a ``@staticmethod`` and accepts either Python scalars or
    NumPy arrays (the arithmetic broadcasts).  The coefficient defaults are
    the values hard-coded by the original author.
    NOTE(review): the physical meaning and units of ``pH``, ``FN``, ``FT``
    and ``T`` are not stated in this file -- confirm against the dataset.
    """

    @staticmethod
    def compute_freeze_thaw_damage(FN, FT, a1=0.002, b1=1.0, c1=0.02):
        """Return ``a1 * FN**b1 * exp(c1 * FT)``."""
        return a1 * (FN ** b1) * np.exp(c1 * FT)

    @staticmethod
    def compute_chemical_damage(pH, a2=0.01, b2=1.5):
        """Return ``a2 * |pH - 7|**b2`` (zero at pH 7)."""
        return a2 * np.abs(pH - 7.0) ** b2

    @staticmethod
    def compute_thermal_damage(T, T0=100.0, a3=0.0003, b3=1.2):
        """Return ``a3 * (T - T0)**b3`` above the threshold ``T0``, else 0.

        The excess over ``T0`` is floored at zero with ``np.maximum`` instead
        of branching on ``T < T0``; this gives the same result for scalars
        and also works element-wise for array input (a Python ``if`` on an
        array raises "truth value is ambiguous").
        """
        excess = np.maximum(np.asarray(T, dtype=float) - T0, 0.0)
        return a3 * excess ** b3

    @staticmethod
    def compute_total_damage(pH, FN, FT, T):
        """Combine the three component damages into one value in [0, 0.99].

        The components are combined as ``1 - prod(1 - D_i)``.  Each ``D_i``
        is first limited to [0, 1]: the raw formulas are unbounded, and if
        two of them exceed 1 the product of two negative factors becomes
        positive, which would report *less* total damage than a single
        saturated component.
        """
        D_ft = np.clip(
            DamageCalculator.compute_freeze_thaw_damage(FN, FT), 0.0, 1.0
        )
        D_ch = np.clip(DamageCalculator.compute_chemical_damage(pH), 0.0, 1.0)
        D_th = np.clip(DamageCalculator.compute_thermal_damage(T), 0.0, 1.0)
        D_total = 1.0 - (1.0 - D_ft) * (1.0 - D_ch) * (1.0 - D_th)
        # The upper bound is 0.99 rather than 1.0, so compute_lambda() of the
        # result is always at least 0.01.
        return np.clip(D_total, 0.0, 0.99)

    @staticmethod
    def compute_lambda(D0):
        """Return the complement ``1 - D0`` of a damage value."""
        return 1.0 - D0
class CrackDataLoader:
    """Load per-angle crack-count CSV files and prepare them for training.

    Files are looked up under ``<base_path>/major_principal_stress`` or
    ``<base_path>/minor_principal_stress``, in the sub-directories
    ``unstable_development`` and ``peak_stress``.  Each file name must
    contain four dash-separated integers, read as ``pH-FN-FT-T``.
    """

    # Four dash-separated integer groups: pH, FN, FT, T.
    _FILENAME_PATTERN = re.compile(r'(\d+)-(\d+)-(\d+)-(\d+)')

    def __init__(self, base_path, stress_type="major"):
        """Remember the data location and create the scalers.

        Args:
            base_path: Root directory that holds the stress-type folders.
            stress_type: ``"major"`` selects ``major_principal_stress``;
                any other value selects ``minor_principal_stress``.
        """
        self.base_path = base_path
        self.stress_type = stress_type
        folder = (
            "major_principal_stress"
            if stress_type == "major"
            else "minor_principal_stress"
        )
        self.data_dir = os.path.join(base_path, folder)
        self.scaler_X = StandardScaler()
        self.scaler_y = StandardScaler()
        # Damage model used by load_all_data / create_synthetic_data /
        # get_statistics; replace this attribute to plug in another model
        # exposing compute_total_damage() and compute_lambda().
        self.damage_calculator = DamageCalculator()

    def parse_filename(self, filename):
        """Extract the four integer parameters encoded in a file name.

        Args:
            filename: File name (or path) containing ``pH-FN-FT-T``.

        Returns:
            dict with integer values under the keys ``'pH'``, ``'FN'``,
            ``'FT'`` and ``'T'``.

        Raises:
            ValueError: If no ``d-d-d-d`` group is found in ``filename``.
        """
        match = self._FILENAME_PATTERN.search(filename)
        if match is None:
            raise ValueError(f"Cannot parse filename: {filename}")
        pH, FN, FT, T = (int(group) for group in match.groups())
        return {'pH': pH, 'FN': FN, 'FT': FT, 'T': T}

    def load_single_csv(self, csv_path):
        """Read a header-less two-column CSV.

        Returns:
            ``(angles, counts)`` -- the first and second column as arrays.
        """
        data = pd.read_csv(csv_path, header=None, names=['angle', 'count'])
        return data['angle'].values, data['count'].values

    @staticmethod
    def _fit_length(values, length):
        """Zero-pad or truncate a 1-D array to exactly ``length`` items."""
        if len(values) < length:
            return np.pad(values, (0, length - len(values)), 'constant')
        return values[:length]

    def load_all_data(self, phase="both"):
        """Load every parsable CSV for the requested loading phase(s).

        Args:
            phase: ``"both"``, ``"early"`` (``unstable_development`` only)
                or ``"peak"`` (``peak_stress`` only).

        Returns:
            ``(X, y, angle_bins, damage_list)`` where ``X`` has one float32
            row ``[pH, FN, FT, T, phase_code]`` per file (phase_code 0 for
            the "unstable" directory, 1 otherwise), ``y`` holds the count
            column of each file padded/truncated to the length of the first
            loaded file, ``angle_bins`` is the angle column of that first
            file, and ``damage_list`` holds ``{'D0', 'lambda'}`` dicts.

        Raises:
            ValueError: For an unknown ``phase`` or when no file could be
                loaded.
        """
        if phase == "both":
            subdirs = ["unstable_development", "peak_stress"]
        elif phase == "early":
            subdirs = ["unstable_development"]
        elif phase == "peak":
            subdirs = ["peak_stress"]
        else:
            raise ValueError(f"Unknown phase: {phase}")

        calculator = self.damage_calculator
        X_list = []
        y_list = []
        damage_list = []
        # Angle column of the first file that loads successfully.  It is the
        # axis matching y_list[0]; taking it here avoids re-reading
        # "csv_files[0]" after the loop, which pointed at the *last*
        # directory's listing and failed when that listing was empty.
        reference_angles = None

        for subdir in subdirs:
            subdir_path = os.path.join(self.data_dir, subdir)
            if not os.path.exists(subdir_path):
                print(f"Warning: Directory does not exist {subdir_path}")
                continue
            phase_code = 0 if "unstable" in subdir else 1
            # glob() returns files in arbitrary order; sort so the sample
            # order (and therefore any later split) is reproducible.
            csv_files = sorted(glob(os.path.join(subdir_path, "*.csv")))
            print(f"Loading {len(csv_files)} files from {subdir}...")
            for csv_file in csv_files:
                # Best effort by design: a file that cannot be parsed or
                # read is reported and skipped, it does not abort the load.
                try:
                    params = self.parse_filename(os.path.basename(csv_file))
                    angles, counts = self.load_single_csv(csv_file)
                    D0 = calculator.compute_total_damage(
                        params['pH'], params['FN'], params['FT'], params['T']
                    )
                    lambda_coef = calculator.compute_lambda(D0)
                except Exception as e:
                    print(f"Skipping file {csv_file}: {e}")
                    continue
                features = np.array(
                    [
                        params['pH'],
                        params['FN'],
                        params['FT'],
                        params['T'],
                        phase_code,
                    ],
                    dtype=np.float32,
                )
                X_list.append(features)
                y_list.append(counts)
                damage_list.append({'D0': D0, 'lambda': lambda_coef})
                if reference_angles is None:
                    reference_angles = angles

        if not X_list:
            raise ValueError("No data loaded successfully!")

        X = np.array(X_list)
        # The first loaded sample fixes the output width for all samples.
        y_length = len(y_list[0])
        y = np.array([self._fit_length(sample, y_length) for sample in y_list])
        angle_bins = reference_angles[:y_length]
        totals = y.sum(axis=1)

        print(f"\nData loading complete:")
        print(f" Samples: {X.shape[0]}")
        print(f" Input features: {X.shape[1]} (pH, FN, FT, T, phase)")
        print(f" Output dimension: {y.shape[1]} (angle bins)")
        print(f" Angle range: {angle_bins[0]:.1f} - {angle_bins[-1]:.1f}")
        print(f" Total cracks range: {totals.min():.0f} - {totals.max():.0f}")
        return X, y, angle_bins, damage_list

    def create_synthetic_data(self, n_samples=100, output_dim=72):
        """Generate random Gaussian-shaped distributions for experiments.

        Parameters are drawn with the global ``np.random`` state (seed it
        with ``np.random.seed`` for reproducibility).  Unlike
        ``load_all_data`` this returns three values -- no damage list.

        Returns:
            ``(X, y, angle_bins)`` as float32 ``X``/``y`` and an
            ``output_dim``-point grid from 0 to 175.
        """
        pH_values = [1, 3, 5, 7]
        FN_values = [5, 10, 20, 40]
        FT_values = [10, 20, 30, 40]
        T_values = [25, 300, 600, 900]
        phase_values = [0, 1]
        # Per stress type: (centre, centre std, base spread, spread gain).
        if self.stress_type == "major":
            centre, centre_std, base_spread, spread_gain = 90.0, 10, 15.0, 20.0
        else:
            centre, centre_std, base_spread, spread_gain = 45.0, 15, 20.0, 25.0
        # The angle grid does not depend on the sample; build it once.
        angles = np.linspace(0, 175, output_dim)

        X_list = []
        y_list = []
        for _ in range(n_samples):
            pH = np.random.choice(pH_values)
            FN = np.random.choice(FN_values)
            FT = np.random.choice(FT_values)
            T = np.random.choice(T_values)
            phase = np.random.choice(phase_values)
            D0 = self.damage_calculator.compute_total_damage(pH, FN, FT, T)
            peak_angle = centre + np.random.normal(0, centre_std)
            spread = base_spread + D0 * spread_gain
            distribution = np.exp(-0.5 * ((angles - peak_angle) / spread) ** 2)
            distribution = distribution * (100 + D0 * 200) * (1 + 0.5 * phase)
            distribution = distribution + np.random.normal(0, 5, output_dim)
            # Noise can push values below zero; negative values are zeroed.
            distribution = np.maximum(distribution, 0)
            X_list.append([pH, FN, FT, T, phase])
            y_list.append(distribution)

        X = np.array(X_list, dtype=np.float32)
        y = np.array(y_list, dtype=np.float32)
        return X, y, angles

    def normalize_data(self, X_train, y_train, X_test=None, y_test=None):
        """Fit both scalers on the training data and transform the inputs.

        Returns:
            ``(X_train_norm, y_train_norm, X_test_norm, y_test_norm)`` when
            both test arrays are given, otherwise
            ``(X_train_norm, y_train_norm)``.  If only one of the test
            arrays is supplied it is ignored.
        """
        X_train_norm = self.scaler_X.fit_transform(X_train)
        y_train_norm = self.scaler_y.fit_transform(y_train)
        if X_test is None or y_test is None:
            return X_train_norm, y_train_norm
        # Test data is transformed with the statistics fitted on the
        # training data; the scalers are not refitted.
        X_test_norm = self.scaler_X.transform(X_test)
        y_test_norm = self.scaler_y.transform(y_test)
        return X_train_norm, y_train_norm, X_test_norm, y_test_norm

    def denormalize_output(self, y_norm):
        """Map scaled outputs back with the fitted ``scaler_y``."""
        return self.scaler_y.inverse_transform(y_norm)

    def get_statistics(self, X, y):
        """Summarise a feature matrix and its targets.

        Args:
            X: 2-D array whose first four columns are pH, FN, FT, T.
            y: 2-D array; each row is summed to a total per sample.

        Returns:
            dict with sample/dimension counts, per-parameter ``(min, max)``
            ranges, total-count statistics and the range and mean of the
            damage value computed for every row.
        """
        totals = y.sum(axis=1)
        stats = {
            'n_samples': X.shape[0],
            'input_dim': X.shape[1],
            'output_dim': y.shape[1],
            'pH_range': (X[:, 0].min(), X[:, 0].max()),
            'FN_range': (X[:, 1].min(), X[:, 1].max()),
            'FT_range': (X[:, 2].min(), X[:, 2].max()),
            'T_range': (X[:, 3].min(), X[:, 3].max()),
            'total_cracks_range': (totals.min(), totals.max()),
            'total_cracks_mean': totals.mean(),
            'total_cracks_std': totals.std(),
        }
        # Evaluated row by row so the damage model only ever sees scalars.
        D0_values = [
            self.damage_calculator.compute_total_damage(
                row[0], row[1], row[2], row[3]
            )
            for row in X
        ]
        stats['D0_range'] = (min(D0_values), max(D0_values))
        stats['D0_mean'] = np.mean(D0_values)
        return stats