"""Multi-modal materials characterisation analysis.

Loads XRD, VSM, UV-Vis and PL curves from CSV files and TEM/SEM micrographs
from image files, derives per-point "local invariants" and a few global
figures of merit for each modality, and renders a text report and plots.
"""

import json
import logging
import os
import re
import tempfile
from typing import Dict, List, Optional, Tuple

import numpy as np
import pandas as pd
from scipy import ndimage
from scipy.ndimage import gaussian_filter1d
from scipy.signal import find_peaks, savgol_filter
from scipy.spatial.distance import pdist, squareform

# Optional third-party dependencies.  Each one serves a single modality, so a
# missing package disables that modality (reported through the per-modality
# ``*_error`` entries) instead of making the whole module unimportable.  When
# the packages are installed the names are bound exactly as before.
try:
    import matplotlib.pyplot as plt
except ImportError:
    plt = None

try:
    from sklearn.preprocessing import StandardScaler
except ImportError:
    StandardScaler = None

try:
    from pymatgen.core import Structure
    from pymatgen.analysis.diffraction.xrd import XRDCalculator
except ImportError:
    Structure = None
    XRDCalculator = None

try:
    import cv2
except ImportError:
    cv2 = None

try:
    from skimage import filters, measure, morphology
except ImportError:
    filters = measure = morphology = None

try:
    import requests
except ImportError:
    requests = None

# Configure matplotlib for headless operation
if plt is not None:
    plt.switch_backend('Agg')

_logger = logging.getLogger(__name__)

# Guards divisions against zero denominators.
_EPS = 1e-8

# FWHM (degrees 2θ) given to simulated Bragg reflections before they are
# compared with a measured pattern.
# NOTE(review): 0.2° is an assumed typical laboratory-diffractometer width;
# adjust to the real instrument resolution.
_SIM_PEAK_FWHM_DEG = 0.2

# Candidate phases and their Crystallography Open Database entry IDs.
_CANDIDATE_COD_IDS = {
    'Fe3O4': '9008470',
    'CoFe2O4': '9008464',
    'γ-Fe2O3': '1011106',
    'α-Fe2O3': '9007397',
    'TiO2_anatase': '9007679',
    'TiO2_rutile': '9007680',
}


def _savgol_smooth(values: np.ndarray, max_window: int = 21,
                   polyorder: int = 2) -> np.ndarray:
    """Savitzky-Golay smooth ``values`` with a window that always fits.

    The window is the largest odd length not exceeding ``max_window`` or the
    signal length.  Signals too short for the polynomial order are returned
    unsmoothed (as a float copy) instead of raising.
    """
    values = np.asarray(values, dtype=float)
    n = len(values)
    window = min(max_window, n if n % 2 else n - 1)
    if window <= polyorder:
        return values.copy()
    return savgol_filter(values, window_length=window, polyorder=polyorder)


def _gradient(values: np.ndarray, coords: np.ndarray) -> np.ndarray:
    """Return d(values)/d(coords), with non-finite entries replaced by 0.

    Repeated or reversing coordinates (e.g. the turning points of a
    hysteresis loop) make ``np.gradient`` divide by zero; those points are
    reported as zero slope so they cannot poison downstream statistics.
    """
    if len(values) < 2:
        return np.zeros(len(values))
    with np.errstate(divide='ignore', invalid='ignore'):
        grad = np.gradient(values, coords)
    return np.nan_to_num(grad, nan=0.0, posinf=0.0, neginf=0.0)


def _local_moments(values: np.ndarray,
                   window_size: int) -> Tuple[np.ndarray, np.ndarray]:
    """Return the variance and skewness of a sliding window around each point.

    The window spans ``window_size`` samples either side and is truncated at
    the array ends.
    """
    n = len(values)
    variance = np.empty(n)
    skewness = np.empty(n)
    for i in range(n):
        window = values[max(0, i - window_size):i + window_size + 1]
        centred = window - window.mean()
        variance[i] = window.var()
        skewness[i] = np.mean(centred ** 3) / (window.std() ** 3 + _EPS)
    return variance, skewness


def _spectral_invariants(x, y, window_size: int, normalize: bool) -> np.ndarray:
    """Compute the six per-point invariants shared by XRD, UV-Vis and PL.

    Columns: |second derivative|, local variance, |local skewness|,
    1 / local variance, |first derivative|, smoothed signal (divided by its
    maximum when ``normalize`` is true).  Derivatives are taken on the
    Savitzky-Golay smoothed signal; the moments use the raw signal.
    """
    x = np.asarray(x, dtype=float)
    y = np.asarray(y, dtype=float)
    if len(y) == 0:
        return np.zeros((0, 6))

    smooth = _savgol_smooth(y)
    first = _gradient(smooth, x)
    second = _gradient(first, x)
    variance, skewness = _local_moments(y, window_size)
    level = smooth / (np.max(smooth) + _EPS) if normalize else smooth

    return np.column_stack([
        np.abs(second),             # sharpness
        variance,                   # disorder
        np.abs(skewness),           # asymmetry
        1.0 / (variance + _EPS),    # stability
        np.abs(first),              # gradient
        level,                      # smoothed (optionally normalised) signal
    ])


def _peak_fwhm(x: np.ndarray, y: np.ndarray, peak: int) -> float:
    """Return the full width at half maximum of the peak at index ``peak``.

    Walks outwards from the peak until the signal drops to half the peak
    height (or the array ends) and returns the x distance between the two
    stopping points.
    """
    half_max = y[peak] / 2.0
    left = peak
    while left > 0 and y[left] > half_max:
        left -= 1
    right = peak
    while right < len(y) - 1 and y[right] > half_max:
        right += 1
    return x[right] - x[left]


def _broadened_pattern(grid, positions, intensities,
                       fwhm: float = _SIM_PEAK_FWHM_DEG) -> np.ndarray:
    """Render a stick pattern onto ``grid`` as a sum of Gaussian peaks.

    A calculated diffraction pattern is a list of discrete reflections;
    interpolating linearly between them would invent intensity between
    peaks, so each reflection is broadened to a Gaussian of width ``fwhm``.
    """
    grid = np.asarray(grid, dtype=float)
    positions = np.asarray(positions, dtype=float)
    intensities = np.asarray(intensities, dtype=float)
    sigma = fwhm / (2.0 * np.sqrt(2.0 * np.log(2.0)))
    offsets = (grid[:, None] - positions[None, :]) / sigma
    return np.exp(-0.5 * offsets ** 2) @ intensities


class UniversalFiberBundleAnalyzer:
    """Core analyzer for multi-modal materials data"""

    # (results key, x key, y key, line style, title, x label, y label)
    _LINE_PLOTS = (
        ('xrd', 'wavelength', 'intensity', 'b-', "XRD Pattern",
         "2θ (degrees)", "Intensity (a.u.)"),
        ('vsm', 'H', 'M', 'r-', "VSM Hysteresis Loop",
         "Magnetic Field H (Oe)", "Magnetization M (norm.)"),
        ('uvvis', 'wavelength', 'absorption', 'g-', "UV-Vis Absorption",
         "Wavelength (nm)", "Absorption (a.u.)"),
        ('pl', 'wavelength', 'intensity', 'm-', "Photoluminescence",
         "Wavelength (nm)", "Intensity (a.u.)"),
    )

    def __init__(self):
        self.results = {}

    def process_sample(self, files: Dict[str, str], sample_name: str = "sample") -> Dict:
        """
        Process all available modalities for a sample

        Each modality is analysed independently; a failure is recorded under
        ``'<modality>_error'`` and does not stop the remaining modalities.

        Args:
            files: Dictionary with keys: 'xrd', 'vsm', 'uvvis', 'pl', 'tem'
            sample_name: Name for the sample

        Returns:
            Dictionary with analysis results
        """
        results = {"sample_name": sample_name}

        # Process XRD
        if files.get('xrd'):
            try:
                xrd_data = self._load_spectral_data(files['xrd'])
                xrd_analyzer = XRDAnalyzer()
                xrd_invariants = xrd_analyzer.compute_local_invariants(
                    xrd_data['x'], xrd_data['y'])
                xrd_features = xrd_analyzer.extract_global_features(
                    xrd_data['x'], xrd_data['y'], xrd_invariants)
                results['xrd'] = {
                    # Key name kept for compatibility; the values are 2θ.
                    'wavelength': xrd_data['x'],
                    'intensity': xrd_data['y'],
                    'invariants': xrd_invariants,
                    'features': xrd_features,
                }
            except Exception as e:
                results['xrd_error'] = str(e)

        # Process VSM
        if files.get('vsm'):
            try:
                # Keep acquisition order: sorting by field would interleave
                # the two branches of the hysteresis loop.
                vsm_data = self._load_spectral_data(files['vsm'], sort_by_x=False)
                vsm_analyzer = VSMAnalyzer()
                vsm_invariants = vsm_analyzer.compute_local_invariants(
                    vsm_data['x'], vsm_data['y'])
                Hc, Mr = vsm_analyzer.detect_magnetic_params(
                    vsm_data['x'], vsm_data['y'])
                results['vsm'] = {
                    'H': vsm_data['x'],
                    'M': vsm_data['y'],
                    'invariants': vsm_invariants,
                    'Hc': Hc,
                    'Mr': Mr,
                }
            except Exception as e:
                results['vsm_error'] = str(e)

        # Process UV-Vis
        if files.get('uvvis'):
            try:
                uvvis_data = self._load_spectral_data(files['uvvis'])
                uvvis_analyzer = UVVisAnalyzer()
                uvvis_invariants = uvvis_analyzer.compute_local_invariants(
                    uvvis_data['x'], uvvis_data['y'])
                bandgap = uvvis_analyzer.estimate_bandgap(
                    uvvis_data['x'], uvvis_data['y'])
                results['uvvis'] = {
                    'wavelength': uvvis_data['x'],
                    'absorption': uvvis_data['y'],
                    'invariants': uvvis_invariants,
                    'bandgap_eV': bandgap,
                }
            except Exception as e:
                results['uvvis_error'] = str(e)

        # Process PL
        if files.get('pl'):
            try:
                pl_data = self._load_spectral_data(files['pl'])
                pl_analyzer = PLAnalyzer()
                pl_invariants = pl_analyzer.compute_local_invariants(
                    pl_data['x'], pl_data['y'])
                peaks = pl_analyzer.extract_pl_peaks(pl_data['x'], pl_data['y'])
                results['pl'] = {
                    'wavelength': pl_data['x'],
                    'intensity': pl_data['y'],
                    'invariants': pl_invariants,
                    'peaks': peaks,
                }
            except Exception as e:
                results['pl_error'] = str(e)

        # Process TEM
        if files.get('tem'):
            try:
                results['tem'] = self._analyze_tem_image(files['tem'])
            except Exception as e:
                results['tem_error'] = str(e)

        # Phase identification (requires XRD)
        if 'xrd' in results:
            try:
                results['phases'] = self._identify_phases(
                    results['xrd']['wavelength'], results['xrd']['intensity'])
            except Exception as e:
                results['phase_error'] = str(e)

        return results

    def _load_spectral_data(self, file_path: str,
                            sort_by_x: bool = True) -> Dict[str, np.ndarray]:
        """Load an x/y curve from a CSV file.

        The x column is the first of 'wavelength', 'energy', '2theta', 'h'
        found in the header (case-insensitive); the y column is the first of
        'intensity', 'm', 'absorption'.  When a name is not found the first
        column not already used for the other axis is taken.  Rows where
        either value is missing or non-numeric are dropped.

        Args:
            file_path: Path of the CSV file.
            sort_by_x: Sort the rows by ascending x.  Pass ``False`` for
                data whose acquisition order matters (hysteresis loops).

        Returns:
            ``{'x': ndarray, 'y': ndarray}`` of equal length.

        Raises:
            ValueError: If the file has fewer than two columns or contains
                no row with two numeric values.
        """
        df = pd.read_csv(file_path)
        if df.shape[1] < 2:
            raise ValueError(
                f"Expected at least two columns in {file_path!r}, found {df.shape[1]}")

        lowered = [str(c).strip().lower() for c in df.columns]

        def find_column(candidates):
            for name in candidates:
                if name in lowered:
                    return df.columns[lowered.index(name)]
            return None

        x_col = find_column(('wavelength', 'energy', '2theta', 'h'))
        y_col = find_column(('intensity', 'm', 'absorption'))
        # Positional fallbacks never reuse the column chosen for the other axis.
        if x_col is None:
            x_col = next(c for c in df.columns if c != y_col)
        if y_col is None:
            y_col = next(c for c in df.columns if c != x_col)

        x = pd.to_numeric(df[x_col], errors='coerce').to_numpy(dtype=float)
        y = pd.to_numeric(df[y_col], errors='coerce').to_numpy(dtype=float)

        # Remove NaNs
        valid = np.isfinite(x) & np.isfinite(y)
        x, y = x[valid], y[valid]
        if len(x) == 0:
            raise ValueError(f"No numeric data rows found in {file_path!r}")

        if sort_by_x:
            order = np.argsort(x, kind='stable')
            x, y = x[order], y[order]

        return {'x': x, 'y': y}

    def _analyze_tem_image(self, image_path: str, pixel_size_nm: float = 1.0) -> Dict:
        """Analyze TEM/SEM image for particle size

        Pixels darker than the Otsu threshold are treated as particles.
        Diameters are equivalent-circle diameters computed from the particle
        areas.

        Args:
            image_path: Path of the micrograph.
            pixel_size_nm: Size of one pixel of the *resized* 1024x1024
                image in nanometres.  The default of 1.0 is a placeholder;
                the caller should supply the calibrated value.

        Returns:
            ``{'particle_count': 0}`` when nothing is detected, otherwise
            the count and the mean/std/min/max diameter in nanometres.

        Raises:
            ImportError: If OpenCV or scikit-image is not installed.
            ValueError: If the image cannot be read.
        """
        if cv2 is None or filters is None:
            raise ImportError(
                "opencv-python and scikit-image are required for TEM image analysis")

        img = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
        if img is None:
            raise ValueError("Could not load TEM image")

        # Resize for consistent processing
        img = cv2.resize(img, (1024, 1024))
        img = cv2.GaussianBlur(img, (5, 5), 0)

        # Threshold
        thresh = filters.threshold_otsu(img)
        binary = img < thresh

        # Clean up
        binary = morphology.remove_small_objects(binary, min_size=50)
        binary = morphology.binary_closing(binary, morphology.disk(2))

        # Label particles
        labeled, _ = ndimage.label(binary)
        props = measure.regionprops(labeled)
        if not props:
            return {"particle_count": 0}

        areas_nm2 = np.array([p.area for p in props], dtype=float) * pixel_size_nm ** 2
        diameters_nm = 2.0 * np.sqrt(areas_nm2 / np.pi)

        return {
            'particle_count': len(props),
            'mean_diameter_nm': float(np.mean(diameters_nm)),
            'std_diameter_nm': float(np.std(diameters_nm)),
            'min_diameter_nm': float(np.min(diameters_nm)),
            'max_diameter_nm': float(np.max(diameters_nm)),
        }

    def _identify_phases(self, two_theta: np.ndarray,
                         intensity: np.ndarray) -> List[Tuple[str, float]]:
        """Identify phases using COD database

        For every candidate phase a pattern is calculated from its COD
        structure, broadened onto the measured 2θ grid and compared with the
        measurement by Pearson correlation.  Candidates whose structure
        cannot be downloaded or simulated are skipped.

        Returns:
            Up to three ``(phase name, correlation)`` pairs, best first.

        Raises:
            ImportError: If pymatgen is not installed.
        """
        if XRDCalculator is None:
            raise ImportError("pymatgen is required for phase identification")

        two_theta = np.asarray(two_theta, dtype=float)
        intensity = np.asarray(intensity, dtype=float)
        if len(two_theta) == 0:
            return []

        calculator = XRDCalculator(wavelength=1.5406)
        exp_norm = intensity / (np.max(intensity) + _EPS)
        matches = []

        for phase_name, cod_id in _CANDIDATE_COD_IDS.items():
            structure = self._download_cod_structure(cod_id)
            if structure is None:
                continue

            try:
                pattern = calculator.get_pattern(structure)
                simulated = _broadened_pattern(two_theta, pattern.x, pattern.y)
                simulated = simulated / (np.max(simulated) + _EPS)
                correlation = np.corrcoef(exp_norm, simulated)[0, 1]
            except Exception as exc:
                # Best effort: one unusable candidate must not abort the rest.
                _logger.warning("Could not simulate pattern for %s: %s", phase_name, exc)
                continue

            if not np.isnan(correlation):
                matches.append((phase_name, float(correlation)))

        # Sort by correlation
        matches.sort(key=lambda match: match[1], reverse=True)
        return matches[:3]

    def _download_cod_structure(self, cod_id: str) -> Optional[Structure]:
        """Download structure from Crystallography Open Database

        Returns:
            The parsed structure, or ``None`` when a required package is
            missing, the request fails, the server does not answer with
            HTTP 200, or the CIF cannot be parsed.
        """
        if requests is None or Structure is None:
            _logger.warning("requests and pymatgen are required to fetch COD entry %s", cod_id)
            return None

        url = f"https://www.crystallography.net/cod/{cod_id}.cif"
        temp_path = None
        try:
            response = requests.get(url, timeout=10)
            if response.status_code != 200:
                _logger.warning("COD entry %s returned HTTP %s", cod_id, response.status_code)
                return None

            with tempfile.NamedTemporaryFile(mode='w', suffix='.cif', delete=False,
                                             encoding='utf-8') as f:
                temp_path = f.name
                f.write(response.text)
            return Structure.from_file(temp_path)
        except Exception as exc:
            # Network and CIF-parsing failures are expected here; the lookup
            # is best effort and the caller skips missing candidates.
            _logger.warning("Could not load COD entry %s: %s", cod_id, exc)
            return None
        finally:
            # Always remove the temporary CIF, even when parsing failed.
            if temp_path is not None:
                try:
                    os.unlink(temp_path)
                except OSError:
                    pass

    def generate_report(self, results: Dict) -> str:
        """Generate scientific interpretation report

        Args:
            results: Dictionary as returned by :meth:`process_sample`.

        Returns:
            The report as one newline-joined string.  Sections are emitted
            only for modalities present in ``results``.
        """
        report = []
        report.append("=" * 60)
        report.append("🔬 MULTI-MODAL MATERIALS ANALYSIS REPORT")
        report.append(f"Sample: {results.get('sample_name', 'Unknown')}")
        report.append("=" * 60)

        # XRD analysis
        if 'xrd' in results:
            features = results['xrd']['features']
            report.append("\n📊 XRD ANALYSIS:")
            report.append(f" • Crystallite size: {features['crystallite_size']:.2f} (rel. units)")
            report.append(f" • Microstrain: {features['microstrain']:.3f}")
            report.append(f" • Amorphous ratio: {features['amorphous_ratio']:.3f}")

        # Phase identification
        if 'phases' in results:
            report.append("\n🧪 PHASE IDENTIFICATION:")
            for i, (phase, corr) in enumerate(results['phases']):
                report.append(f" {i+1}. {phase} (correlation: {corr:.2f})")

        # VSM analysis
        if 'vsm' in results:
            vsm = results['vsm']
            report.append("\n🧲 VSM ANALYSIS:")
            report.append(f" • Coercivity (Hc): {vsm['Hc']:.1f} Oe")
            report.append(f" • Remanence (Mr): {vsm['Mr']:.3f} (norm.)")

        # UV-Vis analysis
        if 'uvvis' in results:
            uvvis = results['uvvis']
            report.append("\n🌈 UV-VIS ANALYSIS:")
            report.append(f" • Bandgap: {uvvis['bandgap_eV']:.2f} eV")

        # PL analysis
        if 'pl' in results:
            pl = results['pl']
            report.append("\n💡 PHOTOLUMINESCENCE:")
            if pl['peaks']:
                # Peaks are listed by position, so pick the strongest one
                # explicitly rather than the first.
                peak = max(pl['peaks'], key=lambda p: p['intensity'])
                report.append(f" • Main peak: {peak['wavelength']:.1f} nm")
                report.append(f" • FWHM: {peak['fwhm']:.1f} nm")
            else:
                report.append(" • No significant peaks detected")

        # TEM analysis
        if 'tem' in results:
            tem = results['tem']
            if tem['particle_count'] > 0:
                report.append("\n🔬 TEM ANALYSIS:")
                report.append(f" • Particle count: {tem['particle_count']}")
                report.append(
                    f" • Mean diameter: {tem['mean_diameter_nm']:.1f} ± {tem['std_diameter_nm']:.1f} nm")

        # Cross-modal insights
        report.append("\n🧠 CROSS-MODAL INSIGHTS:")

        # Quantum confinement
        if 'tem' in results and 'uvvis' in results:
            if results['tem']['particle_count'] > 0 and results['uvvis']['bandgap_eV'] > 0:
                report.append(" • Quantum confinement analysis available")

        # Defect correlation
        if 'xrd' in results and 'pl' in results and results['pl']['peaks']:
            report.append(" • XRD disorder and PL FWHM can be correlated for defect analysis")

        report.append("\n💡 RECOMMENDATIONS:")
        report.append("• Validate phase purity with Rietveld refinement")
        report.append("• Correlate particle size with magnetic/optical properties")
        report.append("• For thin films, consider substrate effects")

        report.append("\n" + "=" * 60)
        return "\n".join(report)

    def _save_figure(self, title: str, xlabel: str, ylabel: str, path: str) -> None:
        """Label the current matplotlib figure, write it to ``path``, close it."""
        try:
            plt.title(title)
            plt.xlabel(xlabel)
            plt.ylabel(ylabel)
            plt.savefig(path, dpi=300, bbox_inches='tight')
        finally:
            # Close even when saving fails so figures do not accumulate.
            plt.close()

    def generate_plots(self, results: Dict, output_dir: str = ".") -> List[str]:
        """Generate publication-ready plots

        Writes one PNG per available curve modality, plus a single-point
        particle-size / bandgap scatter when both TEM and UV-Vis results are
        usable.

        Args:
            results: Dictionary as returned by :meth:`process_sample`.
            output_dir: Directory for the PNG files (created if missing).

        Returns:
            Paths of the files written, in creation order.

        Raises:
            ImportError: If matplotlib is not installed.
        """
        if plt is None:
            raise ImportError("matplotlib is required to generate plots")

        sample_name = results.get('sample_name', 'sample')
        plot_paths = []

        # Create plots directory
        os.makedirs(output_dir, exist_ok=True)

        for key, x_key, y_key, style, title, xlabel, ylabel in self._LINE_PLOTS:
            if key not in results:
                continue
            path = os.path.join(output_dir, f"{sample_name}_{key}.png")
            plt.figure(figsize=(8, 5))
            plt.plot(results[key][x_key], results[key][y_key], style)
            self._save_figure(f"{title} - {sample_name}", xlabel, ylabel, path)
            plot_paths.append(path)

        # Correlation plot (if multiple modalities)
        if 'tem' in results and 'uvvis' in results:
            tem = results['tem']
            uvvis = results['uvvis']
            if tem['particle_count'] > 0 and uvvis['bandgap_eV'] > 0:
                path = os.path.join(output_dir, f"{sample_name}_confinement.png")
                plt.figure(figsize=(8, 5))
                plt.scatter([tem['mean_diameter_nm']], [uvvis['bandgap_eV']], s=100)
                self._save_figure(f"Quantum Confinement - {sample_name}",
                                  "Particle Size (nm)", "Bandgap (eV)", path)
                plot_paths.append(path)

        return plot_paths


# Modal-specific analyzers
class XRDAnalyzer:
    """Local invariants and global figures of merit for a diffraction pattern."""

    def compute_local_invariants(self, two_theta, intensity, window_size=10):
        """Return an ``(n, 6)`` array of per-point invariants.

        Columns: sharpness (|d²I/dθ²|), disorder (local variance),
        asymmetry (|local skewness|), stability (1 / variance),
        gradient (|dI/dθ|) and the smoothed intensity divided by its maximum.
        """
        return _spectral_invariants(two_theta, intensity, window_size, normalize=True)

    def extract_global_features(self, two_theta, intensity, local_invariants):
        """Summarise a pattern into relative size, strain and disorder figures.

        Peaks are those at least 10 % of the maximum intensity and at least
        20 samples apart.

        Returns:
            Dict with 'crystallite_size' (1 / (mean FWHM * cos θ), relative
            units), 'microstrain' (relative spread of the FWHMs),
            'amorphous_ratio', 'n_peaks' and 'avg_disorder'.  When the
            pattern is empty or has no peak, the sizes are 0 and the
            amorphous ratio is 1.0.
        """
        two_theta = np.asarray(two_theta, dtype=float)
        intensity = np.asarray(intensity, dtype=float)
        no_peaks = {'crystallite_size': 0, 'microstrain': 0,
                    'amorphous_ratio': 1.0, 'n_peaks': 0, 'avg_disorder': 0}
        if len(intensity) == 0:
            return no_peaks

        peaks, _ = find_peaks(intensity, height=np.max(intensity) * 0.1, distance=20)
        if len(peaks) == 0:
            return no_peaks

        fwhms = [_peak_fwhm(two_theta, intensity, p) for p in peaks]
        avg_fwhm = np.mean(fwhms)

        # NOTE(review): the Bragg angle of the first peak is combined with
        # the FWHM averaged over all peaks; kept as in the original heuristic.
        theta_bragg = two_theta[peaks[0]] / 2.0
        rel_size = 1.0 / (avg_fwhm * np.cos(np.radians(theta_bragg)) + _EPS)

        # NOTE(review): a Gaussian filter roughly preserves the mean, so this
        # ratio stays close to 1 regardless of the sample - confirm intent.
        smooth_bg = gaussian_filter1d(intensity, sigma=50)
        amorphous_ratio = np.mean(smooth_bg) / (np.mean(intensity) + _EPS)

        microstrain = np.std(fwhms) / (avg_fwhm + _EPS)
        avg_disorder = np.mean(local_invariants[:, 1])

        return {
            'crystallite_size': float(rel_size),
            'microstrain': float(microstrain),
            'amorphous_ratio': float(amorphous_ratio),
            'n_peaks': int(len(peaks)),
            'avg_disorder': float(avg_disorder),
        }


class VSMAnalyzer:
    """Local invariants and magnetic parameters of an M(H) hysteresis loop."""

    def compute_local_invariants(self, H, M, window_size=5):
        """Return an ``(n, 6)`` array of per-point invariants.

        Columns: curvature (|d²M/dH²|), symmetry breaking (|M(H) + M(-H)|,
        using the sample whose field is closest to -H), sharpness (same
        value as curvature), noise (local standard deviation),
        gradient (|dM/dH|) and stability (1 / noise).
        """
        H = np.asarray(H, dtype=float)
        M = np.asarray(M, dtype=float)
        n = len(H)
        if n == 0:
            return np.zeros((0, 6))

        dM = _gradient(M, H)
        d2M = _gradient(dM, H)
        noise = np.array([
            M[max(0, i - window_size):i + window_size + 1].std() for i in range(n)
        ])

        # Symmetry breaking: |M(H) + M(-H)|
        mirror = np.array([np.argmin(np.abs(H + h)) for h in H])
        sym_break = np.abs(M + M[mirror])

        curvature = np.abs(d2M)
        return np.column_stack([
            curvature,              # curvature
            sym_break,              # symmetry breaking
            curvature,              # sharpness
            noise,                  # noise
            np.abs(dM),             # gradient
            1.0 / (noise + _EPS),   # stability
        ])

    def detect_magnetic_params(self, H, M):
        """Return ``(Hc, Mr)`` for a loop given in acquisition order.

        The second half of the samples is taken as the ascending branch;
        ``Hc`` is the field at its first zero crossing of M, linearly
        interpolated between the two bracketing samples (0.0 when there is
        no crossing).  ``Mr`` is M at the sample whose field is closest to
        zero.  Both are 0.0 for empty input.
        """
        H = np.asarray(H, dtype=float)
        M = np.asarray(M, dtype=float)
        if len(H) == 0:
            return 0.0, 0.0

        half = len(H) // 2
        asc_H, asc_M = H[half:], M[half:]

        Hc = 0.0
        crossings = np.where(np.diff(np.sign(asc_M)))[0]
        if len(crossings) > 0:
            i = crossings[0]
            h0, h1 = asc_H[i], asc_H[i + 1]
            m0, m1 = asc_M[i], asc_M[i + 1]
            # m0 != m1 is guaranteed because their signs differ.
            Hc = h0 - m0 * (h1 - h0) / (m1 - m0)

        Mr = M[np.argmin(np.abs(H))]
        return float(Hc), float(Mr)


class UVVisAnalyzer:
    """Local invariants and optical bandgap of an absorption spectrum."""

    def compute_local_invariants(self, wavelength, absorption, window_size=10):
        """Return an ``(n, 6)`` array of per-point invariants.

        Columns: edge sharpness (|d²A/dλ²|), disorder (local variance),
        asymmetry (|local skewness|), stability (1 / variance),
        gradient (|dA/dλ|) and the smoothed absorption (not normalised).
        """
        return _spectral_invariants(wavelength, absorption, window_size, normalize=False)

    def estimate_bandgap(self, wavelength, absorption):
        """Estimate Tauc bandgap for direct semiconductors

        Works in ascending photon energy (E = 1240 / λ for λ in nm).  The
        absorption onset is the first energy at which the absorption
        exceeds half its maximum; a straight line is fitted to (A·E)² over
        up to 20 samples either side of it and extrapolated to zero.

        Returns:
            The bandgap in eV, or 0.0 when no onset is found, the fit window
            has fewer than 5 samples, or the fitted slope is not positive.
        """
        wavelength = np.asarray(wavelength, dtype=float)
        absorption = np.asarray(absorption, dtype=float)
        # Non-positive wavelengths have no photon energy.
        usable = np.isfinite(wavelength) & np.isfinite(absorption) & (wavelength > 0)
        wavelength, absorption = wavelength[usable], absorption[usable]
        if len(wavelength) < 5:
            return 0.0

        energy = 1240.0 / wavelength  # eV (for nm)
        order = np.argsort(energy, kind='stable')
        energy, absorption = energy[order], absorption[order]
        alpha_hv_sq = (absorption * energy) ** 2

        # Find absorption edge.  Index 0 means the sample already absorbs at
        # the lowest measured energy (or nothing exceeds the threshold).
        edge_idx = int(np.argmax(absorption > 0.5 * np.max(absorption)))
        if edge_idx == 0:
            return 0.0

        start = max(0, edge_idx - 20)
        end = min(len(energy), edge_idx + 20)
        if end - start < 5:
            return 0.0

        # Linear fit in band edge region
        try:
            slope, intercept = np.polyfit(energy[start:end], alpha_hv_sq[start:end], 1)
        except (np.linalg.LinAlgError, ValueError):
            return 0.0
        if slope <= 0:
            # A falling Tauc line has no meaningful zero crossing.
            return 0.0
        return max(0.0, float(-intercept / slope))


class PLAnalyzer:
    """Local invariants and emission peaks of a photoluminescence spectrum."""

    def compute_local_invariants(self, wavelength, intensity, window_size=10):
        """Return an ``(n, 6)`` array of per-point invariants.

        Columns: peak sharpness (|d²I/dλ²|), disorder (local variance),
        asymmetry (|local skewness|), stability (1 / variance),
        gradient (|dI/dλ|) and the smoothed intensity (not normalised).
        """
        return _spectral_invariants(wavelength, intensity, window_size, normalize=False)

    def extract_pl_peaks(self, wavelength, intensity):
        """Extract peak positions, FWHM, intensity

        Peaks are those at least 10 % of the maximum intensity and at least
        20 samples apart.

        Returns:
            List of ``{'wavelength', 'intensity', 'fwhm'}`` dicts ordered by
            ascending position; empty when the input is empty or has no peak.
        """
        wavelength = np.asarray(wavelength, dtype=float)
        intensity = np.asarray(intensity, dtype=float)
        if len(intensity) == 0:
            return []

        peaks, _ = find_peaks(intensity, height=np.max(intensity) * 0.1, distance=20)
        return [
            {
                'wavelength': float(wavelength[peak]),
                'intensity': float(intensity[peak]),
                'fwhm': float(_peak_fwhm(wavelength, intensity, peak)),
            }
            for peak in peaks
        ]