|
|
import os
|
|
|
import re
|
|
|
import numpy as np
|
|
|
import pandas as pd
|
|
|
from glob import glob
|
|
|
from sklearn.preprocessing import StandardScaler
|
|
|
|
|
|
|
|
|
class DamageCalculator:
    """Closed-form initial-damage model built from three independent terms.

    Every method is a ``@staticmethod``; the class only serves as a namespace.
    The inputs are named ``pH``, ``FN``, ``FT`` and ``T`` throughout the file.
    NOTE(review): presumably acidity, freeze-thaw cycle count, a freeze-thaw
    temperature/duration and a heating temperature -- the units are not
    visible here, confirm against the experiment description.
    """

    @staticmethod
    def compute_freeze_thaw_damage(FN, FT, a1=0.002, b1=1.0, c1=0.02):
        """Return ``a1 * FN**b1 * exp(c1 * FT)``.

        Works element-wise when ``FN`` / ``FT`` are NumPy arrays.
        """
        return a1 * (FN ** b1) * np.exp(c1 * FT)

    @staticmethod
    def compute_chemical_damage(pH, a2=0.01, b2=1.5):
        """Return ``a2 * |pH - 7|**b2``; zero at ``pH == 7``."""
        return a2 * np.abs(pH - 7.0) ** b2

    @staticmethod
    def compute_thermal_damage(T, T0=100.0, a3=0.0003, b3=1.2):
        """Return ``a3 * (T - T0)**b3`` above the threshold ``T0``, else 0.

        Args:
            T: scalar or array-like of values to evaluate.
            T0: threshold below which the damage is exactly ``0.0``.
            a3, b3: scale and exponent of the power law.

        Returns:
            A Python ``float`` for scalar ``T``; an ``ndarray`` of the same
            shape for array-like ``T``.
        """
        temperature = np.asarray(T, dtype=float)
        # Clamp before exponentiating so entries below the threshold never
        # raise a negative base to a fractional power (which would give NaN).
        excess = np.maximum(temperature - T0, 0.0)
        damage = np.where(temperature < T0, 0.0, a3 * excess ** b3)
        return float(damage) if np.ndim(damage) == 0 else damage

    @staticmethod
    def compute_total_damage(pH, FN, FT, T):
        """Combine the three damage terms into one value in ``[0, 0.99]``.

        The terms are merged as ``1 - (1 - D_ft)(1 - D_ch)(1 - D_th)``.
        Each term is first clipped to ``[0, 1]``: the raw formulas are
        unbounded, and two terms above 1 would otherwise produce two negative
        factors whose product flips the result to "no damage".
        """
        D_ft = np.clip(DamageCalculator.compute_freeze_thaw_damage(FN, FT), 0.0, 1.0)
        D_ch = np.clip(DamageCalculator.compute_chemical_damage(pH), 0.0, 1.0)
        D_th = np.clip(DamageCalculator.compute_thermal_damage(T), 0.0, 1.0)

        D_total = 1.0 - (1.0 - D_ft) * (1.0 - D_ch) * (1.0 - D_th)

        # The upper bound is 0.99 rather than 1.0, so ``compute_lambda`` of
        # the result is always strictly positive.
        return np.clip(D_total, 0.0, 0.99)

    @staticmethod
    def compute_lambda(D0):
        """Return the complement ``1 - D0`` of a damage value."""
        return 1.0 - D0
|
|
|
|
|
|
|
|
|
class CrackDataLoader:
    """Load per-specimen crack-angle histograms and prepare them for a model.

    Each CSV file holds two header-less columns (angle, count). The four
    integers in the file name (``pH-FN-FT-T``) plus a phase code form the
    input feature vector; the count column is the target vector.
    """

    # Four dash-separated integer groups anywhere in the file name.
    _FILENAME_PATTERN = re.compile(r'(\d+)-(\d+)-(\d+)-(\d+)')

    def __init__(self, base_path, stress_type="major"):
        """Remember the data location and create the two scalers.

        Args:
            base_path: directory containing the per-stress-type folders.
            stress_type: ``"major"`` selects ``major_principal_stress``;
                any other value selects ``minor_principal_stress``.
        """
        self.base_path = base_path
        self.stress_type = stress_type

        if stress_type == "major":
            folder = "major_principal_stress"
        else:
            folder = "minor_principal_stress"
        self.data_dir = os.path.join(base_path, folder)

        self.scaler_X = StandardScaler()
        self.scaler_y = StandardScaler()
        self.damage_calculator = DamageCalculator()

    def parse_filename(self, filename):
        """Extract ``pH``, ``FN``, ``FT`` and ``T`` from a file name.

        Returns:
            dict with integer values under the keys ``'pH'``, ``'FN'``,
            ``'FT'`` and ``'T'``, taken in that order from the first
            ``d-d-d-d`` group found in ``filename``.

        Raises:
            ValueError: if no such group exists.
        """
        match = self._FILENAME_PATTERN.search(filename)
        if match is None:
            raise ValueError(f"Cannot parse filename: {filename}")

        pH, FN, FT, T = (int(group) for group in match.groups())
        return {
            'pH': pH,
            'FN': FN,
            'FT': FT,
            'T': T
        }

    def load_single_csv(self, csv_path):
        """Read a two-column, header-less CSV and return ``(angles, counts)``."""
        data = pd.read_csv(csv_path, header=None, names=['angle', 'count'])
        return data['angle'].values, data['count'].values

    def load_all_data(self, phase="both"):
        """Load every CSV of the requested phase(s) under ``self.data_dir``.

        Args:
            phase: ``"both"``, ``"early"`` (``unstable_development`` only) or
                ``"peak"`` (``peak_stress`` only).

        Returns:
            ``(X, y, angle_bins, damage_list)`` where ``X`` has one row
            ``[pH, FN, FT, T, phase_code]`` per file, ``y`` holds the count
            vectors padded/truncated to the length of the first loaded file,
            ``angle_bins`` is the angle column of that first file and
            ``damage_list`` holds ``{'D0', 'lambda'}`` dicts per sample.

        Raises:
            ValueError: for an unknown ``phase`` or when nothing was loaded.
        """
        if phase == "both":
            subdirs = ["unstable_development", "peak_stress"]
        elif phase == "early":
            subdirs = ["unstable_development"]
        elif phase == "peak":
            subdirs = ["peak_stress"]
        else:
            raise ValueError(f"Unknown phase: {phase}")

        X_list = []
        y_list = []
        damage_list = []
        # Angle column of the first sample that loaded successfully. It defines
        # the bins, so it must come from the same file that defines y_length.
        reference_angles = None

        for subdir in subdirs:
            subdir_path = os.path.join(self.data_dir, subdir)

            if not os.path.exists(subdir_path):
                print(f"Warning: Directory does not exist {subdir_path}")
                continue

            phase_code = 0 if "unstable" in subdir else 1

            # glob() order is unspecified; sort so that the reference file
            # (and therefore the output length) is reproducible across runs.
            csv_files = sorted(glob(os.path.join(subdir_path, "*.csv")))

            print(f"Loading {len(csv_files)} files from {subdir}...")

            for csv_file in csv_files:
                # Best-effort loading: a bad file is reported and skipped.
                try:
                    params = self.parse_filename(os.path.basename(csv_file))
                    angles, counts = self.load_single_csv(csv_file)
                    D0 = DamageCalculator.compute_total_damage(
                        params['pH'], params['FN'], params['FT'], params['T']
                    )
                    lambda_coef = DamageCalculator.compute_lambda(D0)
                    features = np.array([
                        params['pH'],
                        params['FN'],
                        params['FT'],
                        params['T'],
                        phase_code
                    ], dtype=np.float32)
                except Exception as e:
                    print(f"Skipping file {csv_file}: {e}")
                    continue

                if reference_angles is None:
                    reference_angles = angles

                X_list.append(features)
                y_list.append(counts)
                damage_list.append({'D0': D0, 'lambda': lambda_coef})

        if len(X_list) == 0:
            raise ValueError("No data loaded successfully!")

        X = np.array(X_list)

        # All targets are forced to the length of the first sample:
        # shorter ones are zero-padded at the end, longer ones are cut.
        y_length = len(y_list[0])
        y_padded = []
        for y_sample in y_list:
            if len(y_sample) < y_length:
                y_sample = np.pad(y_sample, (0, y_length - len(y_sample)), 'constant')
            elif len(y_sample) > y_length:
                y_sample = y_sample[:y_length]
            y_padded.append(y_sample)

        y = np.array(y_padded)

        angle_bins = reference_angles[:y_length]

        totals = y.sum(axis=1)
        print("\nData loading complete:")
        print(f" Samples: {X.shape[0]}")
        print(f" Input features: {X.shape[1]} (pH, FN, FT, T, phase)")
        print(f" Output dimension: {y.shape[1]} (angle bins)")
        print(f" Angle range: {angle_bins[0]:.1f} - {angle_bins[-1]:.1f}")
        print(f" Total cracks range: {totals.min():.0f} - {totals.max():.0f}")

        return X, y, angle_bins, damage_list

    def create_synthetic_data(self, n_samples=100, output_dim=72):
        """Generate random Gaussian-shaped histograms for experimentation.

        Each sample draws ``pH``, ``FN``, ``FT``, ``T`` and a phase from fixed
        value lists, then builds a noisy bell curve over ``output_dim`` angles
        in ``[0, 175]`` whose width and height grow with the computed damage.
        Uses the global ``np.random`` state.

        Returns:
            ``(X, y, angle_bins)`` as ``float32`` arrays for ``X`` and ``y``.
        """
        pH_values = [1, 3, 5, 7]
        FN_values = [5, 10, 20, 40]
        FT_values = [10, 20, 30, 40]
        T_values = [25, 300, 600, 900]
        phase_values = [0, 1]

        # Same grid for every sample, so it is built once.
        angles = np.linspace(0, 175, output_dim)

        X_list = []
        y_list = []

        for _ in range(n_samples):
            # The order of the random draws is kept fixed so that seeded runs
            # stay reproducible.
            pH = np.random.choice(pH_values)
            FN = np.random.choice(FN_values)
            FT = np.random.choice(FT_values)
            T = np.random.choice(T_values)
            phase = np.random.choice(phase_values)

            D0 = DamageCalculator.compute_total_damage(pH, FN, FT, T)

            if self.stress_type == "major":
                peak_angle = 90.0 + np.random.normal(0, 10)
                spread = 15.0 + D0 * 20.0
            else:
                peak_angle = 45.0 + np.random.normal(0, 15)
                spread = 20.0 + D0 * 25.0

            distribution = np.exp(-0.5 * ((angles - peak_angle) / spread) ** 2)
            distribution = distribution * (100 + D0 * 200) * (1 + 0.5 * phase)
            distribution = distribution + np.random.normal(0, 5, output_dim)
            # Counts cannot be negative; the noise may push bins below zero.
            distribution = np.maximum(distribution, 0)

            X_list.append([pH, FN, FT, T, phase])
            y_list.append(distribution)

        X = np.array(X_list, dtype=np.float32)
        y = np.array(y_list, dtype=np.float32)
        angle_bins = np.linspace(0, 175, output_dim)

        return X, y, angle_bins

    def normalize_data(self, X_train, y_train, X_test=None, y_test=None):
        """Fit both scalers on the training data and transform the inputs.

        Returns:
            ``(X_train_norm, y_train_norm)``, or, when both ``X_test`` and
            ``y_test`` are given, a 4-tuple that also contains the test sets
            transformed with the scalers fitted on the training data.
        """
        X_train_norm = self.scaler_X.fit_transform(X_train)
        y_train_norm = self.scaler_y.fit_transform(y_train)

        if X_test is not None and y_test is not None:
            X_test_norm = self.scaler_X.transform(X_test)
            y_test_norm = self.scaler_y.transform(y_test)
            return X_train_norm, y_train_norm, X_test_norm, y_test_norm

        return X_train_norm, y_train_norm

    def denormalize_output(self, y_norm):
        """Map scaled targets back to the original scale via ``scaler_y``."""
        return self.scaler_y.inverse_transform(y_norm)

    def get_statistics(self, X, y):
        """Summarise a data set.

        Args:
            X: 2-D array whose first four columns are ``pH, FN, FT, T``.
            y: 2-D array of targets; rows are summed to get totals per sample.

        Returns:
            dict with sample/dimension counts, the min/max of the first four
            feature columns, statistics of the per-sample totals, and the
            range and mean of the damage value computed for every row.
        """
        totals = y.sum(axis=1)

        stats = {
            'n_samples': X.shape[0],
            'input_dim': X.shape[1],
            'output_dim': y.shape[1],
            'pH_range': (X[:, 0].min(), X[:, 0].max()),
            'FN_range': (X[:, 1].min(), X[:, 1].max()),
            'FT_range': (X[:, 2].min(), X[:, 2].max()),
            'T_range': (X[:, 3].min(), X[:, 3].max()),
            'total_cracks_range': (totals.min(), totals.max()),
            'total_cracks_mean': totals.mean(),
            'total_cracks_std': totals.std(),
        }

        # Evaluated row by row because the thermal term is threshold-based.
        D0_values = [
            DamageCalculator.compute_total_damage(row[0], row[1], row[2], row[3])
            for row in X
        ]

        stats['D0_range'] = (min(D0_values), max(D0_values))
        stats['D0_mean'] = np.mean(D0_values)

        return stats
|
|
|
|