Spaces:
Sleeping
Sleeping
#!/usr/bin/env python3
"""
Preprocessing script for experimental images to extract displacement fields
for elastic parameter identification using PINN.

This script performs Digital Image Correlation (DIC) on experimental images
to extract u_x, u_y displacement fields, then computes stress fields.

Usage:
    python preprocess_user_data.py --input /path/to/images/ --output /path/to/output/
        --calibration 0.1 --geometry rectangular
"""
| import os | |
| import argparse | |
| import json | |
| import zipfile | |
| import tempfile | |
| from pathlib import Path | |
| import numpy as np | |
| import cv2 | |
| from scipy import ndimage | |
| from scipy.interpolate import griddata | |
| import warnings | |
| try: | |
| import tifffile | |
| HAS_TIFFILE = True | |
| except ImportError: | |
| HAS_TIFFILE = False | |
| import numpy as np | |
class DICProcessor:
    """
    Digital Image Correlation processor for extracting displacement fields
    from speckle pattern images.

    Subsets of the reference image are located in the deformed image with
    OpenCV template matching; the integer-pixel offset of the best match is
    taken as the displacement at the subset centre.
    """

    def __init__(self, subset_size=64, step=8, corr_method=None, search_margin=50):
        """
        Initialize DIC processor.

        Args:
            subset_size: Size of the subset window for correlation (pixels)
            step: Step size for grid points (pixels)
            corr_method: OpenCV template matching method; None selects
                cv2.TM_CCOEFF_NORMED
            search_margin: Distance (pixels) searched around each subset in
                every direction, i.e. the largest detectable displacement

        Raises:
            ValueError: If subset_size < 2, step < 1 or search_margin < 0
        """
        if subset_size < 2:
            raise ValueError("subset_size must be at least 2 pixels")
        if step < 1:
            raise ValueError("step must be at least 1 pixel")
        if search_margin < 0:
            raise ValueError("search_margin must be non-negative")
        self.subset_size = subset_size
        self.step = step
        self.corr_method = cv2.TM_CCOEFF_NORMED if corr_method is None else corr_method
        self.search_margin = search_margin

    @staticmethod
    def _to_normalized_gray(image):
        """Return image as a zero-mean, unit-variance float32 grayscale array."""
        image = np.asarray(image)
        if image.ndim > 2:
            image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        # cv2.matchTemplate only accepts 8-bit or 32-bit float input; float64
        # arrays make every call raise.
        image = image.astype(np.float32)
        spread = float(image.std())
        if spread == 0.0:
            # Constant image: avoid dividing by zero (result is all zeros).
            spread = 1.0
        return (image - image.mean()) / np.float32(spread)

    @staticmethod
    def _gradient(field, spacing, axis):
        """Differentiate field along axis; zero when the axis has one sample."""
        if field.shape[axis] < 2:
            # np.gradient needs at least two samples along the axis.
            return np.zeros_like(field, dtype=float)
        return np.gradient(field, spacing, axis=axis)

    @staticmethod
    def _standardize(field):
        """Return the z-scored field, leaving constant fields at zero."""
        field = np.asarray(field, dtype=float)
        spread = field.std()
        if spread == 0:
            spread = 1.0
        return (field - field.mean()) / spread

    def extract_displacement_field(self, ref_image, deformed_image, calibration=1.0):
        """
        Extract displacement field between reference and deformed images.

        Args:
            ref_image: Reference (undeformed) image
            deformed_image: Deformed image
            calibration: Pixel to physical unit conversion (mm/pixel)

        Returns:
            dict: Dictionary containing x, y coordinates and u_x, u_y
            displacements (scaled by calibration), the valid_mask of grid
            points where matching succeeded, and the calibration itself

        Raises:
            ValueError: If the two images do not have the same size
        """
        ref_image = self._to_normalized_gray(ref_image)
        deformed_image = self._to_normalized_gray(deformed_image)
        if ref_image.shape != deformed_image.shape:
            raise ValueError(
                "Reference and deformed images must have the same size, got "
                f"{ref_image.shape} and {deformed_image.shape}"
            )

        h, w = ref_image.shape
        half_subset = self.subset_size // 2
        margin = self.search_margin
        y_coords = range(half_subset, h - half_subset, self.step)
        x_coords = range(half_subset, w - half_subset, self.step)

        grid_shape = (len(y_coords), len(x_coords))
        u_x = np.zeros(grid_shape)
        u_y = np.zeros(grid_shape)
        valid_mask = np.zeros(grid_shape, dtype=bool)

        # Squared-difference methods are best at the minimum, all others at
        # the maximum of the correlation surface.
        use_minimum = self.corr_method in (cv2.TM_SQDIFF, cv2.TM_SQDIFF_NORMED)

        for i, y in enumerate(y_coords):
            top = y - half_subset
            bottom = y + half_subset
            search_top = max(0, top - margin)
            search_bottom = min(h, bottom + margin)
            for j, x in enumerate(x_coords):
                left = x - half_subset
                right = x + half_subset
                search_left = max(0, left - margin)
                search_right = min(w, right + margin)

                subset = ref_image[top:bottom, left:right]
                search_region = deformed_image[
                    search_top:search_bottom, search_left:search_right
                ]
                if (
                    search_region.shape[0] < subset.shape[0]
                    or search_region.shape[1] < subset.shape[1]
                ):
                    continue

                try:
                    result = cv2.matchTemplate(search_region, subset, self.corr_method)
                except cv2.error:
                    # Leave this grid point flagged invalid and carry on.
                    continue

                _, _, min_loc, max_loc = cv2.minMaxLoc(result)
                match_x, match_y = min_loc if use_minimum else max_loc

                # The search window is clamped at the image border, so the
                # offset must be measured from the window's real origin rather
                # than from a fixed margin.
                u_y[i, j] = (search_top + match_y) - top
                u_x[i, j] = (search_left + match_x) - left
                valid_mask[i, j] = True

        x_grid = np.asarray(x_coords, dtype=float) * calibration
        y_grid = np.asarray(y_coords, dtype=float) * calibration

        return {
            "x": x_grid,
            "y": y_grid,
            "u_x": u_x * calibration,
            "u_y": u_y * calibration,
            "valid_mask": valid_mask,
            "calibration": calibration,
        }

    def compute_strains(self, disp_data, lambda_val=1.0, mu_val=0.5):
        """
        Compute strain and stress fields from displacement data.

        Strains are finite-difference gradients of the displacement grids
        (rows follow y, columns follow x); stresses follow from the linear
        relation written out below using the two Lamé parameters.

        Args:
            disp_data: Dictionary with x, y, u_x, u_y
            lambda_val: First Lamé parameter (normalized)
            mu_val: Second Lamé parameter (normalized)

        Returns:
            dict: Strain and stress fields
        """
        x = disp_data["x"]
        y = disp_data["y"]
        u_x = np.asarray(disp_data["u_x"], dtype=float)
        u_y = np.asarray(disp_data["u_y"], dtype=float)

        dx = x[1] - x[0] if len(x) > 1 else 1.0
        dy = y[1] - y[0] if len(y) > 1 else 1.0

        epsilon_xx = self._gradient(u_x, dx, axis=1)
        epsilon_yy = self._gradient(u_y, dy, axis=0)
        epsilon_xy = 0.5 * (
            self._gradient(u_x, dy, axis=0) + self._gradient(u_y, dx, axis=1)
        )

        stiffness = lambda_val + 2 * mu_val
        sigma_xx = stiffness * epsilon_xx + lambda_val * epsilon_yy
        sigma_yy = stiffness * epsilon_yy + lambda_val * epsilon_xx
        sigma_xy = 2 * mu_val * epsilon_xy

        return {
            "epsilon_xx": epsilon_xx,
            "epsilon_yy": epsilon_yy,
            "epsilon_xy": epsilon_xy,
            "sigma_xx": sigma_xx,
            "sigma_yy": sigma_yy,
            "sigma_xy": sigma_xy,
        }

    def normalize_to_pinn_format(self, disp_data, stress_data, domain_bounds=None):
        """
        Normalize data to PINN training format (domain [0,1] x [0,1]).

        Coordinates are min-max scaled with the given (or measured) bounds and
        displacements are z-scored; stresses are passed through unchanged.
        Degenerate inputs (zero coordinate span, constant displacement) are
        scaled by 1 instead of dividing by zero.

        Args:
            disp_data: Displacement data dictionary
            stress_data: Stress data dictionary
            domain_bounds: Optional (x_min, x_max, y_min, y_max) for normalization

        Returns:
            dict: Normalized data ready for PINN
        """
        x = disp_data["x"]
        y = disp_data["y"]

        if domain_bounds is None:
            x_min, x_max = x.min(), x.max()
            y_min, y_max = y.min(), y.max()
        else:
            x_min, x_max, y_min, y_max = domain_bounds

        x_span = x_max - x_min
        y_span = y_max - y_min
        x_norm = (x - x_min) / (x_span if x_span != 0 else 1.0)
        y_norm = (y - y_min) / (y_span if y_span != 0 else 1.0)

        return {
            "x_norm": x_norm,
            "y_norm": y_norm,
            "u_x": self._standardize(disp_data["u_x"]),
            "u_y": self._standardize(disp_data["u_y"]),
            "sigma_xx": stress_data["sigma_xx"],
            "sigma_yy": stress_data["sigma_yy"],
            "sigma_xy": stress_data["sigma_xy"],
            "original_bounds": (x_min, x_max, y_min, y_max),
            "calibration": disp_data["calibration"],
        }
class ImageLoader:
    """
    Handles loading images from various sources (folder, zip, etc.)

    All loaders are static methods and return a list of dictionaries with the
    keys "path", "name" and "data".
    """

    SUPPORTED_FORMATS = {".tif", ".tiff", ".png", ".jpg", ".jpeg", ".bmp"}

    @staticmethod
    def _load_files(image_files):
        """Load each path in order and keep the ones that could be read."""
        images = []
        for img_path in image_files:
            img = ImageLoader.load_image(img_path)
            if img is not None:
                images.append(
                    {"path": str(img_path), "name": img_path.name, "data": img}
                )
        return images

    @staticmethod
    def load_images_from_folder(folder_path, sort_by_name=True):
        """
        Load all images from a folder.

        Only files directly inside the folder are considered; the suffix is
        compared case-insensitively against SUPPORTED_FORMATS.

        Args:
            folder_path: Path to folder containing images
            sort_by_name: Whether to sort images by filename

        Returns:
            list: List of image dictionaries (empty if the folder does not
            exist or holds no readable images)
        """
        folder = Path(folder_path)
        if not folder.is_dir():
            return []

        # One pass with a lower-cased suffix avoids listing a file twice on
        # case-insensitive filesystems and also catches mixed-case suffixes.
        image_files = [
            entry
            for entry in folder.iterdir()
            if entry.is_file()
            and entry.suffix.lower() in ImageLoader.SUPPORTED_FORMATS
        ]
        if sort_by_name:
            image_files.sort()

        return ImageLoader._load_files(image_files)

    @staticmethod
    def load_images_from_zip(zip_path, extract_to=None):
        """
        Load images from a ZIP file, ordered by their name inside the archive.

        Only members with a supported image suffix are extracted, including
        those stored in sub-folders of the archive.

        Args:
            zip_path: Path to ZIP file
            extract_to: Optional folder to extract images; a new temporary
                directory is created when omitted. The directory is not
                removed afterwards because the returned "path" entries point
                into it.

        Returns:
            list: List of image dictionaries
        """
        zip_path = Path(zip_path)
        if extract_to is None:
            extract_to = tempfile.mkdtemp()

        with zipfile.ZipFile(zip_path, "r") as zf:
            members = sorted(
                name
                for name in zf.namelist()
                if not name.endswith("/")
                and Path(name).suffix.lower() in ImageLoader.SUPPORTED_FORMATS
            )
            # ZipFile.extract returns the path actually written.
            extracted = [Path(zf.extract(name, extract_to)) for name in members]

        return ImageLoader._load_files(extracted)

    @staticmethod
    def load_image(path):
        """
        Load a single image from various formats.

        TIFF files are read with tifffile when it is installed, otherwise with
        OpenCV unchanged; every other format is read by OpenCV as grayscale.

        Args:
            path: Path to image file

        Returns:
            numpy array or None
        """
        path = Path(path)
        suffix = path.suffix.lower()
        try:
            if suffix in (".tif", ".tiff"):
                if HAS_TIFFILE:
                    return tifffile.imread(str(path))
                img = cv2.imread(str(path), cv2.IMREAD_UNCHANGED)
            else:
                img = cv2.imread(str(path), cv2.IMREAD_GRAYSCALE)
        except Exception as e:
            # Best effort: one unreadable file must not abort the whole batch.
            print(f"Error loading {path}: {e}")
            return None

        if img is None:
            # cv2.imread reports failure by returning None instead of raising.
            print(f"Warning: could not read image {path}")
        return img
class ExperimentalDataProcessor:
    """
    Main class for processing experimental images and preparing data for PINN.
    """

    def __init__(
        self,
        calibration=1.0,
        geometry="rectangular",
        domain_bounds=None,
        subset_size=64,
        step=8,
    ):
        """
        Initialize processor.

        Args:
            calibration: Pixel to mm conversion
            geometry: 'rectangular' or other
            domain_bounds: (x_min, x_max, y_min, y_max) in mm
            subset_size: DIC subset size
            step: DIC step size
        """
        self.calibration = calibration
        self.geometry = geometry
        self.domain_bounds = domain_bounds
        self.dic = DICProcessor(subset_size=subset_size, step=step)

    def process_image_sequence(
        self, images, reference_index=0, lambda_init=1.0, mu_init=0.5
    ):
        """
        Process a sequence of images to extract displacement fields.

        Args:
            images: List of image dictionaries
            reference_index: Index of reference (undeformed) image; negative
                values count from the end of the list
            lambda_init: Initial lambda for stress calculation
            mu_init: Initial mu for stress calculation

        Returns:
            list: List of processed data dictionaries

        Raises:
            ValueError: If fewer than two images are supplied
            IndexError: If reference_index is out of range
        """
        num_images = len(images)
        if num_images < 2:
            raise ValueError("At least 2 images required (reference + deformed)")
        if not -num_images <= reference_index < num_images:
            raise IndexError(
                f"reference_index {reference_index} out of range "
                f"for {num_images} images"
            )
        # Map negative indices to their positive form so the reference image
        # is reliably skipped in the loop below.
        reference_index %= num_images

        ref_img = images[reference_index]["data"]
        results = []

        for i, img_dict in enumerate(images):
            if i == reference_index:
                continue

            disp_data = self.dic.extract_displacement_field(
                ref_img, img_dict["data"], self.calibration
            )
            stress_data = self.dic.compute_strains(disp_data, lambda_init, mu_init)
            normalized = self.dic.normalize_to_pinn_format(
                disp_data, stress_data, self.domain_bounds
            )

            results.append(
                {
                    "image_name": img_dict["name"],
                    "step": i,
                    "displacement": disp_data,
                    "stress": stress_data,
                    "normalized": normalized,
                }
            )
            print(f"Processed: {img_dict['name']} (step {i})")

        return results

    def export_to_csv(self, processed_data, output_path):
        """
        Export processed data to CSV format for PINN training.

        Writes one row per grid point and step with the columns
        x, y, u_x, u_y, sigma_xx, sigma_yy, sigma_xy, step.

        Args:
            processed_data: List of processed data dictionaries
            output_path: Path to output CSV file

        Returns:
            pandas.DataFrame: The table that was written
        """
        import pandas as pd

        columns = ["x", "y", "u_x", "u_y", "sigma_xx", "sigma_yy", "sigma_xy", "step"]
        frames = []
        for data in processed_data:
            norm = data["normalized"]
            # x_norm / y_norm are 1-D axes while the fields are (ny, nx) grids;
            # expand the axes so each field value is paired with its own point.
            x_grid, y_grid = np.meshgrid(norm["x_norm"], norm["y_norm"])
            frames.append(
                pd.DataFrame(
                    {
                        "x": x_grid.ravel(),
                        "y": y_grid.ravel(),
                        "u_x": np.asarray(norm["u_x"]).ravel(),
                        "u_y": np.asarray(norm["u_y"]).ravel(),
                        "sigma_xx": np.asarray(norm["sigma_xx"]).ravel(),
                        "sigma_yy": np.asarray(norm["sigma_yy"]).ravel(),
                        "sigma_xy": np.asarray(norm["sigma_xy"]).ravel(),
                        "step": data["step"],
                    }
                )
            )

        if frames:
            df = pd.concat(frames, ignore_index=True)
        else:
            df = pd.DataFrame(columns=columns)
        df.to_csv(output_path, index=False)
        print(f"Exported to: {output_path}")
        return df

    def export_to_numpy(self, processed_data, output_path):
        """
        Export processed data to numpy format.

        Each saved array has one leading entry per processed step.

        Args:
            processed_data: List of processed data dictionaries
            output_path: Path to output .npz file
        """
        normalized = [data["normalized"] for data in processed_data]

        def stack(key):
            return np.array([entry[key] for entry in normalized])

        # NOTE: when domain_bounds is None it is stored as an object array,
        # which np.load only returns with allow_pickle=True.
        np.savez(
            output_path,
            x=stack("x_norm"),
            y=stack("y_norm"),
            u_x=stack("u_x"),
            u_y=stack("u_y"),
            sigma_xx=stack("sigma_xx"),
            sigma_yy=stack("sigma_yy"),
            sigma_xy=stack("sigma_xy"),
            domain_bounds=self.domain_bounds,
            calibration=self.calibration,
        )
        print(f"Exported to: {output_path}")

    def save_metadata(self, processed_data, output_path, metadata=None):
        """
        Save processing metadata to JSON.

        Args:
            processed_data: List of processed data
            output_path: Path to output JSON
            metadata: Additional metadata dictionary; its entries override the
                generated ones on key clashes
        """
        meta = {
            "num_images": len(processed_data),
            "calibration_mm_per_pixel": self.calibration,
            "geometry": self.geometry,
            "domain_bounds": self.domain_bounds,
            "dic_parameters": {
                "subset_size": self.dic.subset_size,
                "step": self.dic.step,
            },
            "images": [
                {"name": d["image_name"], "step": d["step"]} for d in processed_data
            ],
        }
        if metadata:
            meta.update(metadata)

        with open(output_path, "w", encoding="utf-8") as f:
            json.dump(meta, f, indent=2)
        print(f"Metadata saved to: {output_path}")
def _build_arg_parser():
    """Create the command-line argument parser used by main()."""
    parser = argparse.ArgumentParser(
        description="Process experimental images for PINN-based elastic parameter identification"
    )
    parser.add_argument(
        "--input",
        "-i",
        required=True,
        help="Input folder or ZIP file containing images",
    )
    parser.add_argument(
        "--output", "-o", required=True, help="Output folder for processed data"
    )
    parser.add_argument(
        "--calibration",
        "-c",
        type=float,
        default=1.0,
        help="Pixel to mm conversion (default: 1.0)",
    )
    parser.add_argument(
        "--geometry",
        "-g",
        default="rectangular",
        choices=["rectangular", "circular", "custom"],
        help="Sample geometry (default: rectangular)",
    )
    parser.add_argument(
        "--bounds",
        nargs=4,
        type=float,
        metavar=("XMIN", "XMAX", "YMIN", "YMAX"),
        help="Domain bounds in mm",
    )
    parser.add_argument(
        "--reference",
        "-r",
        type=int,
        default=0,
        help="Reference image index (default: 0)",
    )
    parser.add_argument(
        "--subset-size",
        type=int,
        default=64,
        help="DIC subset size in pixels (default: 64)",
    )
    parser.add_argument(
        "--step", type=int, default=8, help="DIC step size in pixels (default: 8)"
    )
    parser.add_argument("--zip", action="store_true", help="Input is a ZIP file")
    parser.add_argument(
        "--export-format",
        choices=["csv", "numpy", "both"],
        default="both",
        help="Export format",
    )
    return parser


def main(argv=None):
    """
    Command-line entry point: load images, run DIC and export training data.

    Writes training_data.csv and/or training_data.npz (depending on
    --export-format) plus processing_metadata.json into the output folder.

    Args:
        argv: Optional list of command-line arguments without the program
            name. None lets argparse read them from sys.argv.

    Raises:
        SystemExit: Status 1 when fewer than two images were loaded or the
            reference index is out of range; argparse exits with status 2 on
            invalid arguments.
    """
    args = _build_arg_parser().parse_args(argv)

    os.makedirs(args.output, exist_ok=True)

    print(f"Loading images from: {args.input}")
    # Compare the suffix case-insensitively so "DATA.ZIP" is recognised too.
    if args.zip or str(args.input).lower().endswith(".zip"):
        images = ImageLoader.load_images_from_zip(args.input)
    else:
        images = ImageLoader.load_images_from_folder(args.input)
    print(f"Loaded {len(images)} images")

    if len(images) < 2:
        print("Error: Need at least 2 images")
        # Signal the failure to the calling shell instead of exiting with 0.
        raise SystemExit(1)
    if not -len(images) <= args.reference < len(images):
        print(
            f"Error: Reference index {args.reference} is out of range "
            f"for {len(images)} images"
        )
        raise SystemExit(1)

    domain_bounds = tuple(args.bounds) if args.bounds else None
    processor = ExperimentalDataProcessor(
        calibration=args.calibration,
        geometry=args.geometry,
        domain_bounds=domain_bounds,
        subset_size=args.subset_size,
        step=args.step,
    )

    print("Processing image sequence...")
    processed_data = processor.process_image_sequence(
        images, reference_index=args.reference
    )

    print("Exporting data...")
    if args.export_format in ("csv", "both"):
        csv_path = os.path.join(args.output, "training_data.csv")
        processor.export_to_csv(processed_data, csv_path)
    if args.export_format in ("numpy", "both"):
        npz_path = os.path.join(args.output, "training_data.npz")
        processor.export_to_numpy(processed_data, npz_path)

    meta_path = os.path.join(args.output, "processing_metadata.json")
    processor.save_metadata(processed_data, meta_path)

    print(f"\nProcessing complete! Output in: {args.output}")
# Run the command-line entry point only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == "__main__":
    main()