"""Damage model and data-loading utilities for crack-angle distribution data."""

import os
import re
from glob import glob

import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler


class DamageCalculator:
    """Closed-form damage terms and their combination into one value.

    All methods are static.  Inputs may be scalars or NumPy arrays of
    compatible shape.  The physical meaning and units of ``FN`` and ``FT``
    are not defined in this module -- NOTE(review): confirm with the data
    owner before documenting them further.
    """

    @staticmethod
    def compute_freeze_thaw_damage(FN, FT, a1=0.002, b1=1.0, c1=0.02):
        """Return ``a1 * FN**b1 * exp(c1 * FT)``."""
        return a1 * (FN ** b1) * np.exp(c1 * FT)

    @staticmethod
    def compute_chemical_damage(pH, a2=0.01, b2=1.5):
        """Return ``a2 * |pH - 7|**b2`` (zero at neutral pH)."""
        return a2 * np.abs(pH - 7.0) ** b2

    @staticmethod
    def compute_thermal_damage(T, T0=100.0, a3=0.0003, b3=1.2):
        """Return ``a3 * (T - T0)**b3`` above the threshold ``T0``, else 0.

        Args:
            T: Temperature as a scalar or an array-like.
            T0: Threshold below which no thermal damage is accumulated.
            a3: Scale coefficient.
            b3: Exponent applied to the excess temperature.

        Returns:
            A scalar for scalar input, otherwise an array of the same shape
            as ``T``.
        """
        if np.ndim(T) == 0:
            # Scalar path, kept exactly as before so existing callers see
            # the same values and types.
            if T < T0:
                return 0.0
            return a3 * ((T - T0) ** b3)

        # Array path: a plain ``if T < T0`` is ambiguous for arrays, so the
        # threshold is applied element-wise.  The excess is clamped first so
        # that a negative base is never raised to a fractional power.
        temps = np.asarray(T)
        excess = np.maximum(temps - T0, 0.0)
        return np.where(temps < T0, 0.0, a3 * (excess ** b3))

    @staticmethod
    def compute_total_damage(pH, FN, FT, T):
        """Combine the three damage terms into a total in ``[0, 0.99]``.

        The total is ``1 - (1 - D_ft) * (1 - D_ch) * (1 - D_th)``.  Each term
        is clipped to ``[0, 1]`` first: otherwise two terms larger than 1
        would make two factors negative, cancel each other's sign and yield
        a total *below* saturation for the most severe conditions.
        """
        D_ft = np.clip(
            DamageCalculator.compute_freeze_thaw_damage(FN, FT), 0.0, 1.0
        )
        D_ch = np.clip(DamageCalculator.compute_chemical_damage(pH), 0.0, 1.0)
        D_th = np.clip(DamageCalculator.compute_thermal_damage(T), 0.0, 1.0)
        D_total = 1.0 - (1.0 - D_ft) * (1.0 - D_ch) * (1.0 - D_th)
        # Upper bound is 0.99 (not 1.0) so that ``compute_lambda`` never
        # returns exactly zero.
        return np.clip(D_total, 0.0, 0.99)

    @staticmethod
    def compute_lambda(D0):
        """Return ``1 - D0``."""
        return 1.0 - D0


class CrackDataLoader:
    """Load, synthesise and normalise crack-count-per-angle samples.

    Each sample has the feature vector ``(pH, FN, FT, T, phase)`` and a
    target vector of crack counts, one entry per angle bin.
    """

    # Four dash-separated integers anywhere in the file name: pH-FN-FT-T.
    _FILENAME_PATTERN = re.compile(r'(\d+)-(\d+)-(\d+)-(\d+)')

    # Sub-directories scanned for each value of ``phase`` in load_all_data.
    _PHASE_SUBDIRS = {
        "both": ["unstable_development", "peak_stress"],
        "early": ["unstable_development"],
        "peak": ["peak_stress"],
    }

    def __init__(self, base_path, stress_type="major"):
        """Select the data directory and create the scalers.

        Args:
            base_path: Directory containing the ``major_principal_stress``
                and ``minor_principal_stress`` folders.
            stress_type: ``"major"`` selects the major-stress folder; any
                other value selects the minor-stress folder.
        """
        self.base_path = base_path
        self.stress_type = stress_type
        folder = (
            "major_principal_stress"
            if stress_type == "major"
            else "minor_principal_stress"
        )
        self.data_dir = os.path.join(base_path, folder)
        self.scaler_X = StandardScaler()
        self.scaler_y = StandardScaler()
        self.damage_calculator = DamageCalculator()

    def parse_filename(self, filename):
        """Extract ``pH``, ``FN``, ``FT`` and ``T`` from a file name.

        Args:
            filename: Name containing ``<pH>-<FN>-<FT>-<T>`` as integers.

        Returns:
            Dict with integer values under the keys ``'pH'``, ``'FN'``,
            ``'FT'`` and ``'T'``.

        Raises:
            ValueError: If the name does not contain the expected pattern.
        """
        match = self._FILENAME_PATTERN.search(filename)
        if match is None:
            raise ValueError(f"Cannot parse filename: {filename}")
        pH, FN, FT, T = (int(group) for group in match.groups())
        return {'pH': pH, 'FN': FN, 'FT': FT, 'T': T}

    def load_single_csv(self, csv_path):
        """Read a header-less two-column CSV.

        Returns:
            Tuple ``(angles, counts)`` of NumPy arrays taken from the first
            and second column respectively.
        """
        data = pd.read_csv(csv_path, header=None, names=['angle', 'count'])
        return data['angle'].values, data['count'].values

    @staticmethod
    def _fit_to_length(values, length):
        """Zero-pad or truncate a 1-D array to exactly ``length`` entries."""
        current = len(values)
        if current < length:
            return np.pad(values, (0, length - current), 'constant')
        return values[:length]

    def load_all_data(self, phase="both"):
        """Load every CSV below the selected phase directories.

        Files that cannot be parsed or read are reported and skipped.  All
        target vectors are padded with zeros or truncated to the length of
        the first successfully loaded file.

        Args:
            phase: ``"both"``, ``"early"`` (``unstable_development`` only)
                or ``"peak"`` (``peak_stress`` only).

        Returns:
            Tuple ``(X, y, angle_bins, damage_list)`` where ``X`` holds the
            ``(pH, FN, FT, T, phase_code)`` features as float32, ``y`` the
            crack counts, ``angle_bins`` the angle column of the first
            loaded file and ``damage_list`` one ``{'D0', 'lambda'}`` dict
            per sample.

        Raises:
            ValueError: If ``phase`` is unknown or no file could be loaded.
        """
        try:
            subdirs = self._PHASE_SUBDIRS[phase]
        except (KeyError, TypeError):
            raise ValueError(f"Unknown phase: {phase}") from None

        X_list = []
        y_list = []
        damage_list = []
        # Angles of the first file that loads successfully.  Taking them
        # from the same file that defines ``y_length`` keeps the bins
        # consistent with the targets, and avoids re-reading
        # ``csv_files[0]`` of whichever directory happened to be scanned
        # last (which may be empty or unreadable).
        reference_angles = None

        for subdir in subdirs:
            subdir_path = os.path.join(self.data_dir, subdir)
            if not os.path.exists(subdir_path):
                print(f"Warning: Directory does not exist {subdir_path}")
                continue

            phase_code = 0 if "unstable" in subdir else 1
            # glob returns files in arbitrary order; sort so the sample
            # order (and the reference file) is reproducible.
            csv_files = sorted(glob(os.path.join(subdir_path, "*.csv")))
            print(f"Loading {len(csv_files)} files from {subdir}...")

            for csv_file in csv_files:
                try:
                    params = self.parse_filename(os.path.basename(csv_file))
                    angles, counts = self.load_single_csv(csv_file)
                    D0 = DamageCalculator.compute_total_damage(
                        params['pH'], params['FN'], params['FT'], params['T']
                    )
                    lambda_coef = DamageCalculator.compute_lambda(D0)
                except Exception as e:
                    # Deliberate best-effort: one bad file must not abort
                    # the whole load.
                    print(f"Skipping file {csv_file}: {e}")
                    continue

                features = np.array(
                    [
                        params['pH'],
                        params['FN'],
                        params['FT'],
                        params['T'],
                        phase_code,
                    ],
                    dtype=np.float32,
                )
                X_list.append(features)
                y_list.append(counts)
                damage_list.append({'D0': D0, 'lambda': lambda_coef})
                if reference_angles is None:
                    reference_angles = angles

        if not X_list:
            raise ValueError("No data loaded successfully!")

        X = np.array(X_list)
        y_length = len(y_list[0])
        y = np.array(
            [self._fit_to_length(sample, y_length) for sample in y_list]
        )
        angle_bins = reference_angles[:y_length]

        totals = y.sum(axis=1)
        print("\nData loading complete:")
        print(f" Samples: {X.shape[0]}")
        print(f" Input features: {X.shape[1]} (pH, FN, FT, T, phase)")
        print(f" Output dimension: {y.shape[1]} (angle bins)")
        print(f" Angle range: {angle_bins[0]:.1f} - {angle_bins[-1]:.1f}")
        print(f" Total cracks range: {totals.min():.0f} - {totals.max():.0f}")
        return X, y, angle_bins, damage_list

    def create_synthetic_data(self, n_samples=100, output_dim=72, seed=None):
        """Generate random Gaussian-shaped crack distributions.

        Each sample draws its parameters from fixed value grids, builds a
        bell curve over ``output_dim`` angles in ``[0, 175]`` whose width
        and height grow with the computed damage, adds Gaussian noise and
        clamps negative values to zero.

        Args:
            n_samples: Number of samples to generate.
            output_dim: Number of angle bins per sample.
            seed: Optional seed for reproducible output.  With ``None`` the
                global ``np.random`` state is used, as before.

        Returns:
            Tuple ``(X, y, angle_bins)``; ``X`` and ``y`` are float32.
        """
        rng = np.random if seed is None else np.random.RandomState(seed)

        pH_values = [1, 3, 5, 7]
        FN_values = [5, 10, 20, 40]
        FT_values = [10, 20, 30, 40]
        T_values = [25, 300, 600, 900]
        phase_values = [0, 1]

        # The angle grid does not depend on the sample; build it once.
        angle_bins = np.linspace(0, 175, output_dim)
        is_major = self.stress_type == "major"

        X_list = []
        y_list = []
        for _ in range(n_samples):
            # The order of the random draws is part of the behaviour: it
            # determines the output for a given RNG state.
            pH = rng.choice(pH_values)
            FN = rng.choice(FN_values)
            FT = rng.choice(FT_values)
            T = rng.choice(T_values)
            phase = rng.choice(phase_values)
            D0 = DamageCalculator.compute_total_damage(pH, FN, FT, T)

            if is_major:
                peak_angle = 90.0 + rng.normal(0, 10)
                spread = 15.0 + D0 * 20.0
            else:
                peak_angle = 45.0 + rng.normal(0, 15)
                spread = 20.0 + D0 * 25.0

            distribution = np.exp(
                -0.5 * ((angle_bins - peak_angle) / spread) ** 2
            )
            distribution = distribution * (100 + D0 * 200) * (1 + 0.5 * phase)
            distribution = distribution + rng.normal(0, 5, output_dim)
            distribution = np.maximum(distribution, 0)

            X_list.append([pH, FN, FT, T, phase])
            y_list.append(distribution)

        X = np.array(X_list, dtype=np.float32)
        y = np.array(y_list, dtype=np.float32)
        return X, y, angle_bins

    def normalize_data(self, X_train, y_train, X_test=None, y_test=None):
        """Fit the scalers on the training data and standardise the inputs.

        Returns:
            ``(X_train_norm, y_train_norm)``, or the 4-tuple
            ``(X_train_norm, y_train_norm, X_test_norm, y_test_norm)`` when
            both ``X_test`` and ``y_test`` are given.  If only one of the
            test arrays is supplied it is ignored.
        """
        X_train_norm = self.scaler_X.fit_transform(X_train)
        y_train_norm = self.scaler_y.fit_transform(y_train)
        if X_test is None or y_test is None:
            return X_train_norm, y_train_norm

        # Test data reuses the statistics fitted on the training data.
        X_test_norm = self.scaler_X.transform(X_test)
        y_test_norm = self.scaler_y.transform(y_test)
        return X_train_norm, y_train_norm, X_test_norm, y_test_norm

    def denormalize_output(self, y_norm):
        """Map standardised targets back to the original scale."""
        return self.scaler_y.inverse_transform(y_norm)

    def get_statistics(self, X, y):
        """Summarise a data set.

        Args:
            X: 2-D array whose first four columns are pH, FN, FT and T.
            y: 2-D array of crack counts, one row per sample.

        Returns:
            Dict with sample/feature counts, per-feature ``(min, max)``
            ranges, statistics of the per-sample crack totals and the range
            and mean of the total damage computed per sample.
        """
        totals = y.sum(axis=1)
        stats = {
            'n_samples': X.shape[0],
            'input_dim': X.shape[1],
            'output_dim': y.shape[1],
            'pH_range': (X[:, 0].min(), X[:, 0].max()),
            'FN_range': (X[:, 1].min(), X[:, 1].max()),
            'FT_range': (X[:, 2].min(), X[:, 2].max()),
            'T_range': (X[:, 3].min(), X[:, 3].max()),
            'total_cracks_range': (totals.min(), totals.max()),
            'total_cracks_mean': totals.mean(),
            'total_cracks_std': totals.std(),
        }
        D0_values = [
            DamageCalculator.compute_total_damage(
                row[0], row[1], row[2], row[3]
            )
            for row in X
        ]
        stats['D0_range'] = (min(D0_values), max(D0_values))
        stats['D0_mean'] = np.mean(D0_values)
        return stats