Spaces:
Configuration error
Configuration error
| import os | |
| import numpy as np | |
| import pandas as pd | |
| import matplotlib.pyplot as plt | |
| from scipy.signal import savgol_filter, find_peaks | |
| from scipy.ndimage import gaussian_filter1d | |
| from scipy.spatial.distance import pdist, squareform | |
| from sklearn.preprocessing import StandardScaler | |
| from pymatgen.core import Structure | |
| from pymatgen.analysis.diffraction.xrd import XRDCalculator | |
| import cv2 | |
| from skimage import filters, measure, morphology | |
| from scipy import ndimage | |
| import requests | |
| import re | |
| import tempfile | |
| import json | |
| from typing import Dict, List, Tuple, Optional | |
# Configure matplotlib for headless operation: switch pyplot to the 'Agg'
# backend before any figure is created, so plots can be written to files
# without a display being available.
plt.switch_backend('Agg')
class UniversalFiberBundleAnalyzer:
    """Core analyzer for multi-modal materials data.

    Loads XRD, VSM, UV-Vis, PL and TEM measurements for one sample, runs the
    modality-specific analyzers on them, and renders the outcome as a text
    report and as PNG plots.
    """

    def __init__(self):
        # Kept for backward compatibility; every method below returns its
        # results rather than storing them on the instance.
        self.results = {}

    def process_sample(self, files: Dict[str, str], sample_name: str = "sample") -> Dict:
        """
        Process all available modalities for a sample.

        Each modality is processed independently and on a best-effort basis:
        a failure in one is recorded under ``'<modality>_error'`` (as the
        exception text) and does not prevent the others from running.

        Args:
            files: Dictionary with keys: 'xrd', 'vsm', 'uvvis', 'pl', 'tem'
                mapping to file paths. Missing or empty entries are skipped.
            sample_name: Name for the sample

        Returns:
            Dictionary with analysis results
        """
        results = {"sample_name": sample_name}

        # Process XRD
        if files.get('xrd'):
            try:
                xrd_data = self._load_spectral_data(files['xrd'])
                xrd_analyzer = XRDAnalyzer()
                xrd_invariants = xrd_analyzer.compute_local_invariants(xrd_data['x'], xrd_data['y'])
                xrd_features = xrd_analyzer.extract_global_features(
                    xrd_data['x'], xrd_data['y'], xrd_invariants
                )
                results['xrd'] = {
                    # NOTE: the key is 'wavelength' for historical reasons,
                    # but for XRD the values are used as 2-theta angles.
                    'wavelength': xrd_data['x'],
                    'intensity': xrd_data['y'],
                    'invariants': xrd_invariants,
                    'features': xrd_features,
                }
            except Exception as e:
                results['xrd_error'] = str(e)

        # Process VSM
        if files.get('vsm'):
            try:
                # Keep the acquisition order: sorting a hysteresis loop by
                # field would interleave its two branches, while
                # detect_magnetic_params treats the second half of the data
                # as a single branch.
                vsm_data = self._load_spectral_data(files['vsm'], sort=False)
                vsm_analyzer = VSMAnalyzer()
                vsm_invariants = vsm_analyzer.compute_local_invariants(vsm_data['x'], vsm_data['y'])
                Hc, Mr = vsm_analyzer.detect_magnetic_params(vsm_data['x'], vsm_data['y'])
                results['vsm'] = {
                    'H': vsm_data['x'],
                    'M': vsm_data['y'],
                    'invariants': vsm_invariants,
                    'Hc': Hc,
                    'Mr': Mr,
                }
            except Exception as e:
                results['vsm_error'] = str(e)

        # Process UV-Vis
        if files.get('uvvis'):
            try:
                uvvis_data = self._load_spectral_data(files['uvvis'])
                uvvis_analyzer = UVVisAnalyzer()
                uvvis_invariants = uvvis_analyzer.compute_local_invariants(
                    uvvis_data['x'], uvvis_data['y']
                )
                bandgap = uvvis_analyzer.estimate_bandgap(uvvis_data['x'], uvvis_data['y'])
                results['uvvis'] = {
                    'wavelength': uvvis_data['x'],
                    'absorption': uvvis_data['y'],
                    'invariants': uvvis_invariants,
                    'bandgap_eV': bandgap,
                }
            except Exception as e:
                results['uvvis_error'] = str(e)

        # Process PL
        if files.get('pl'):
            try:
                pl_data = self._load_spectral_data(files['pl'])
                pl_analyzer = PLAnalyzer()
                pl_invariants = pl_analyzer.compute_local_invariants(pl_data['x'], pl_data['y'])
                peaks = pl_analyzer.extract_pl_peaks(pl_data['x'], pl_data['y'])
                results['pl'] = {
                    'wavelength': pl_data['x'],
                    'intensity': pl_data['y'],
                    'invariants': pl_invariants,
                    'peaks': peaks,
                }
            except Exception as e:
                results['pl_error'] = str(e)

        # Process TEM
        if files.get('tem'):
            try:
                results['tem'] = self._analyze_tem_image(files['tem'])
            except Exception as e:
                results['tem_error'] = str(e)

        # Phase identification (requires XRD)
        if 'xrd' in results:
            try:
                results['phases'] = self._identify_phases(
                    results['xrd']['wavelength'], results['xrd']['intensity']
                )
            except Exception as e:
                results['phase_error'] = str(e)

        return results

    def _load_spectral_data(self, file_path: str, sort: bool = True) -> Dict[str, np.ndarray]:
        """Load an (x, y) curve from a CSV file.

        Column detection is case-insensitive and ignores surrounding
        whitespace in the header. The x column is the first match among
        'wavelength', 'energy', '2theta', 'h' (default: first column); the y
        column is the first match among 'intensity', 'm', 'absorption'
        (default: second column). A fallback never selects the column already
        chosen for the other axis.

        Args:
            file_path: Path of the CSV file.
            sort: When True (default) the points are returned in ascending x
                order; when False the file order is preserved.

        Returns:
            ``{'x': ndarray, 'y': ndarray}`` containing only rows where both
            values are finite numbers.

        Raises:
            ValueError: If the file has fewer than two columns or contains no
                numeric rows.
        """
        df = pd.read_csv(file_path)
        columns = list(df.columns)
        if len(columns) < 2:
            raise ValueError("Spectral data file must contain at least two columns")
        normalized = [str(c).strip().lower() for c in columns]

        def find_named(candidates):
            # First candidate name present in the header, else None.
            for name in candidates:
                if name in normalized:
                    return columns[normalized.index(name)]
            return None

        x_col = find_named(('wavelength', 'energy', '2theta', 'h'))
        y_col = find_named(('intensity', 'm', 'absorption'))
        if x_col is None:
            x_col = columns[0] if columns[0] != y_col else columns[1]
        if y_col is None:
            y_col = columns[1] if columns[1] != x_col else columns[0]

        # Non-numeric cells (units rows, stray text) become NaN and are
        # dropped together with genuine missing values.
        x = pd.to_numeric(df[x_col], errors='coerce').to_numpy(dtype=float)
        y = pd.to_numeric(df[y_col], errors='coerce').to_numpy(dtype=float)

        # Remove NaNs / infinities
        valid = np.isfinite(x) & np.isfinite(y)
        x, y = x[valid], y[valid]
        if x.size == 0:
            raise ValueError("Spectral data file contains no numeric rows")

        if sort:
            # Stable sort keeps the file order among equal x values.
            sort_idx = np.argsort(x, kind='stable')
            x, y = x[sort_idx], y[sort_idx]
        return {'x': x, 'y': y}

    def _analyze_tem_image(self, image_path: str, pixel_size_nm: float = 1.0) -> Dict:
        """Analyze a TEM/SEM image for particle size.

        The image is read as grayscale, resized to 1024x1024, blurred, and
        thresholded with Otsu's method; pixels darker than the threshold are
        treated as particles. Each connected region's area is converted to
        the diameter of a circle with the same area.

        Args:
            image_path: Path of the image file.
            pixel_size_nm: Size of one pixel of the *resized* 1024x1024 image
                in nanometres. Defaults to 1.0 (uncalibrated); pass the real
                calibration to obtain physical sizes.

        Returns:
            ``{'particle_count': 0}`` when nothing is detected, otherwise the
            particle count and mean/std/min/max equivalent diameter in nm.

        Raises:
            ValueError: If the image cannot be read.
        """
        img = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
        if img is None:
            raise ValueError("Could not load TEM image")

        # Resize for consistent processing
        img = cv2.resize(img, (1024, 1024))
        img = cv2.GaussianBlur(img, (5, 5), 0)

        # Threshold: particles are assumed darker than the background
        thresh = filters.threshold_otsu(img)
        binary = img < thresh

        # Clean up: drop specks, then close small gaps inside particles
        binary = morphology.remove_small_objects(binary, min_size=50)
        binary = morphology.binary_closing(binary, morphology.disk(2))

        # Label particles
        labeled, _ = ndimage.label(binary)
        props = measure.regionprops(labeled)
        if not props:
            return {"particle_count": 0}

        areas_nm2 = np.array([p.area for p in props], dtype=float) * pixel_size_nm ** 2
        diameters_nm = 2.0 * np.sqrt(areas_nm2 / np.pi)
        return {
            'particle_count': len(props),
            'mean_diameter_nm': float(np.mean(diameters_nm)),
            'std_diameter_nm': float(np.std(diameters_nm)),
            'min_diameter_nm': float(np.min(diameters_nm)),
            'max_diameter_nm': float(np.max(diameters_nm)),
        }

    def _identify_phases(self, two_theta: np.ndarray, intensity: np.ndarray) -> List[Tuple[str, float]]:
        """Identify phases using the COD database.

        For every candidate phase the structure is downloaded, its powder
        pattern is simulated, resampled on the experimental 2-theta grid and
        correlated (Pearson) with the normalized experimental intensity.
        Candidates that cannot be downloaded or simulated are skipped.

        Returns:
            Up to three ``(phase_name, correlation)`` tuples, best first.
        """
        # Common material COD IDs
        candidate_cod_ids = {
            'Fe3O4': '9008470',
            'CoFe2O4': '9008464',
            'γ-Fe2O3': '1011106',
            'α-Fe2O3': '9007397',
            'TiO2_anatase': '9007679',
            'TiO2_rutile': '9007680',
        }
        two_theta = np.asarray(two_theta, dtype=float)
        intensity = np.asarray(intensity, dtype=float)
        if two_theta.size == 0 or intensity.size == 0:
            return []

        calculator = XRDCalculator(wavelength=1.5406)
        exp_norm = intensity / (np.max(intensity) + 1e-8)
        matches = []
        for phase_name, cod_id in candidate_cod_ids.items():
            structure = self._download_cod_structure(cod_id)
            if structure is None:
                continue
            try:
                xrd_pattern = calculator.get_pattern(structure)
                # Interpolate to experimental grid.
                # NOTE(review): the simulated pattern is a list of discrete
                # reflections; linear interpolation between them is a crude
                # stand-in for peak broadening — confirm this is intended.
                sim_interp = np.interp(two_theta, xrd_pattern.x, xrd_pattern.y, left=0, right=0)
                sim_interp = sim_interp / (np.max(sim_interp) + 1e-8)
                # Compute correlation
                correlation = np.corrcoef(exp_norm, sim_interp)[0, 1]
            except Exception:
                # Best effort: one bad candidate must not abort the others.
                # (Unlike a bare ``except`` this lets KeyboardInterrupt and
                # SystemExit propagate.)
                continue
            if not np.isnan(correlation):
                matches.append((phase_name, float(correlation)))

        # Sort by correlation
        matches.sort(key=lambda match: match[1], reverse=True)
        return matches[:3]

    def _download_cod_structure(self, cod_id: str) -> Optional["Structure"]:
        """Download a structure from the Crystallography Open Database.

        Returns:
            The parsed structure, or None when the download fails, the server
            does not answer with HTTP 200, or the CIF cannot be parsed.
        """
        url = f"https://www.crystallography.net/cod/{cod_id}.cif"
        temp_path = None
        try:
            response = requests.get(url, timeout=10)
            if response.status_code != 200:
                return None
            with tempfile.NamedTemporaryFile(
                mode='w', suffix='.cif', delete=False, encoding='utf-8'
            ) as f:
                # Remember the path before writing so the file is removed
                # even if the write itself fails.
                temp_path = f.name
                f.write(response.text)
            return Structure.from_file(temp_path)
        except Exception:
            # Best effort: any network or parsing problem means "no structure".
            return None
        finally:
            # Always remove the temporary file, including when parsing raised.
            if temp_path is not None:
                try:
                    os.unlink(temp_path)
                except OSError:
                    pass

    def generate_report(self, results: Dict) -> str:
        """Generate scientific interpretation report.

        Args:
            results: Dictionary in the format returned by ``process_sample``.
                Sections whose key is absent are omitted from the report.

        Returns:
            The report as a single newline-joined string.
        """
        report = []
        report.append("=" * 60)
        report.append("🔬 MULTI-MODAL MATERIALS ANALYSIS REPORT")
        report.append(f"Sample: {results.get('sample_name', 'Unknown')}")
        report.append("=" * 60)

        # XRD analysis
        if 'xrd' in results:
            features = results['xrd']['features']
            report.append("\n📊 XRD ANALYSIS:")
            report.append(f" • Crystallite size: {features['crystallite_size']:.2f} (rel. units)")
            report.append(f" • Microstrain: {features['microstrain']:.3f}")
            report.append(f" • Amorphous ratio: {features['amorphous_ratio']:.3f}")

        # Phase identification
        if 'phases' in results:
            report.append("\n🧪 PHASE IDENTIFICATION:")
            for rank, (phase, corr) in enumerate(results['phases'], start=1):
                report.append(f" {rank}. {phase} (correlation: {corr:.2f})")

        # VSM analysis
        if 'vsm' in results:
            vsm = results['vsm']
            report.append("\n🧲 VSM ANALYSIS:")
            report.append(f" • Coercivity (Hc): {vsm['Hc']:.1f} Oe")
            report.append(f" • Remanence (Mr): {vsm['Mr']:.3f} (norm.)")

        # UV-Vis analysis
        if 'uvvis' in results:
            report.append("\n🌈 UV-VIS ANALYSIS:")
            report.append(f" • Bandgap: {results['uvvis']['bandgap_eV']:.2f} eV")

        # PL analysis
        if 'pl' in results:
            peaks = results['pl']['peaks']
            report.append("\n💡 PHOTOLUMINESCENCE:")
            if peaks:
                # Peaks arrive ordered by position, so the "main" peak is
                # chosen by intensity. Entries without an intensity compare
                # equal, in which case the first one is reported.
                peak = max(peaks, key=lambda p: p.get('intensity', float('-inf')))
                report.append(f" • Main peak: {peak['wavelength']:.1f} nm")
                report.append(f" • FWHM: {peak['fwhm']:.1f} nm")
            else:
                report.append(" • No significant peaks detected")

        # TEM analysis
        if 'tem' in results:
            tem = results['tem']
            if tem['particle_count'] > 0:
                report.append("\n🔬 TEM ANALYSIS:")
                report.append(f" • Particle count: {tem['particle_count']}")
                report.append(
                    f" • Mean diameter: {tem['mean_diameter_nm']:.1f} ± {tem['std_diameter_nm']:.1f} nm"
                )

        # Cross-modal insights
        report.append("\n🧠 CROSS-MODAL INSIGHTS:")

        # Quantum confinement
        if 'tem' in results and 'uvvis' in results:
            if results['tem']['particle_count'] > 0 and results['uvvis']['bandgap_eV'] > 0:
                report.append(" • Quantum confinement analysis available")

        # Defect correlation
        if 'xrd' in results and 'pl' in results:
            if results['pl']['peaks']:
                report.append(" • XRD disorder and PL FWHM can be correlated for defect analysis")

        report.append("\n💡 RECOMMENDATIONS:")
        report.append("• Validate phase purity with Rietveld refinement")
        report.append("• Correlate particle size with magnetic/optical properties")
        report.append("• For thin films, consider substrate effects")
        report.append("\n" + "=" * 60)
        return "\n".join(report)

    def _save_plot(self, draw, title: str, xlabel: str, ylabel: str, path: str) -> str:
        """Create an 8x5 figure, let ``draw()`` plot into it, label and save it.

        The figure is closed even when drawing or saving fails, so figures do
        not accumulate in pyplot's global state.
        """
        fig = plt.figure(figsize=(8, 5))
        try:
            draw()
            plt.title(title)
            plt.xlabel(xlabel)
            plt.ylabel(ylabel)
            plt.savefig(path, dpi=300, bbox_inches='tight')
        finally:
            plt.close(fig)
        return path

    def generate_plots(self, results: Dict, output_dir: str = ".") -> List[str]:
        """Generate publication-ready plots.

        Writes one PNG per available modality (XRD, VSM, UV-Vis, PL) plus a
        size-vs-bandgap scatter plot when both TEM and UV-Vis produced a
        positive result. Files are named ``<sample_name>_<kind>.png``.

        Args:
            results: Dictionary in the format returned by ``process_sample``.
            output_dir: Directory for the images; created if missing.

        Returns:
            Paths of the files written, in creation order.
        """
        sample_name = results.get('sample_name', 'sample')
        plot_paths = []

        # Create plots directory
        os.makedirs(output_dir, exist_ok=True)

        # (result key, x key, y key, line format, title, xlabel, ylabel, suffix)
        line_plots = [
            ('xrd', 'wavelength', 'intensity', 'b-',
             "XRD Pattern", "2θ (degrees)", "Intensity (a.u.)", "xrd"),
            ('vsm', 'H', 'M', 'r-',
             "VSM Hysteresis Loop", "Magnetic Field H (Oe)", "Magnetization M (norm.)", "vsm"),
            ('uvvis', 'wavelength', 'absorption', 'g-',
             "UV-Vis Absorption", "Wavelength (nm)", "Absorption (a.u.)", "uvvis"),
            ('pl', 'wavelength', 'intensity', 'm-',
             "Photoluminescence", "Wavelength (nm)", "Intensity (a.u.)", "pl"),
        ]
        for key, x_key, y_key, fmt, title, xlabel, ylabel, suffix in line_plots:
            if key not in results:
                continue
            data = results[key]
            path = os.path.join(output_dir, f"{sample_name}_{suffix}.png")
            # Bind the loop values as defaults so the closure is self-contained.
            plot_paths.append(self._save_plot(
                lambda d=data, xk=x_key, yk=y_key, f=fmt: plt.plot(d[xk], d[yk], f),
                f"{title} - {sample_name}", xlabel, ylabel, path,
            ))

        # Correlation plot (if multiple modalities)
        if 'tem' in results and 'uvvis' in results:
            tem = results['tem']
            uvvis = results['uvvis']
            if tem['particle_count'] > 0 and uvvis['bandgap_eV'] > 0:
                corr_path = os.path.join(output_dir, f"{sample_name}_confinement.png")
                plot_paths.append(self._save_plot(
                    lambda: plt.scatter([tem['mean_diameter_nm']], [uvvis['bandgap_eV']], s=100),
                    f"Quantum Confinement - {sample_name}",
                    "Particle Size (nm)", "Bandgap (eV)", corr_path,
                ))
        return plot_paths
# Modality-specific analyzers
class XRDAnalyzer:
    """Point-wise descriptors and summary features for an XRD pattern."""

    @staticmethod
    def _smooth(values, max_window=21, polyorder=2):
        """Savitzky-Golay smoothing with a window that always fits the data.

        The window is the largest odd length not exceeding
        ``min(max_window, len(values))``. When the data is too short for the
        polynomial order, an unsmoothed float copy is returned instead.
        """
        window = min(max_window, len(values))
        if window % 2 == 0:
            window -= 1
        if window <= polyorder:
            return np.array(values, dtype=float)
        return savgol_filter(values, window_length=window, polyorder=polyorder)

    @staticmethod
    def _peak_fwhm(two_theta, intensity, peak):
        """Width of the peak at half its height, on the sampled grid.

        Walks outward from ``peak`` until the intensity is no longer above
        half the peak height (or the array edge is reached) on each side.
        """
        half_max = intensity[peak] / 2.0
        left = peak
        while left > 0 and intensity[left] > half_max:
            left -= 1
        right = peak
        while right < len(intensity) - 1 and intensity[right] > half_max:
            right += 1
        return two_theta[right] - two_theta[left]

    def compute_local_invariants(self, two_theta, intensity, window_size=10):
        """Compute six descriptors for every point of the pattern.

        Args:
            two_theta: 1-D array of scattering angles.
            intensity: 1-D array of intensities, same length as ``two_theta``.
            window_size: Half-width (in samples) of the neighbourhood used
                for the local variance and skewness; it is truncated at the
                array edges.

        Returns:
            Array of shape ``(len(two_theta), 6)`` with columns: sharpness
            (|second derivative| of the smoothed signal), disorder (local
            variance of the raw signal), asymmetry (|local skewness|),
            stability (1 / disorder), gradient (|first derivative|), and the
            smoothed intensity divided by its maximum.

        Raises:
            ValueError: If fewer than two points are supplied.
        """
        two_theta = np.asarray(two_theta, dtype=float)
        intensity = np.asarray(intensity, dtype=float)
        n_points = len(two_theta)
        if n_points < 2:
            raise ValueError("At least two data points are required")

        smooth = self._smooth(intensity)
        first_deriv = np.gradient(smooth, two_theta)
        second_deriv = np.gradient(first_deriv, two_theta)

        local_var = np.empty(n_points)
        local_skew = np.empty(n_points)
        for i in range(n_points):
            # Slicing clips the upper bound at the end of the array.
            window = intensity[max(0, i - window_size):i + window_size + 1]
            local_var[i] = np.var(window)
            local_skew[i] = np.mean((window - np.mean(window)) ** 3) / (np.std(window) ** 3 + 1e-8)

        # The 1e-8 terms guard against division by zero on flat regions.
        return np.column_stack([
            np.abs(second_deriv),                # sharpness
            local_var,                           # disorder
            np.abs(local_skew),                  # asymmetry
            1.0 / (local_var + 1e-8),            # stability
            np.abs(first_deriv),                 # gradient
            smooth / (np.max(smooth) + 1e-8),    # norm intensity
        ])

    def extract_global_features(self, two_theta, intensity, local_invariants):
        """Summarise the whole pattern from its peaks.

        Peaks are local maxima at least 10% of the global maximum and at
        least 20 samples apart.

        Args:
            two_theta: 1-D array of scattering angles in degrees.
            intensity: 1-D array of intensities.
            local_invariants: Output of ``compute_local_invariants``; only
                its disorder column (index 1) is used.

        Returns:
            Dict with 'crystallite_size' (relative units: inverse of mean
            FWHM times cos(theta) of the first peak), 'microstrain' (relative
            spread of the peak widths), 'amorphous_ratio', 'n_peaks' and
            'avg_disorder'. When no peak is found (or the input is empty) the
            sizes are 0 and 'amorphous_ratio' is 1.0.
        """
        two_theta = np.asarray(two_theta, dtype=float)
        intensity = np.asarray(intensity, dtype=float)
        no_peaks = {
            'crystallite_size': 0,
            'microstrain': 0,
            'amorphous_ratio': 1.0,
            'n_peaks': 0,
            'avg_disorder': 0,
        }
        if intensity.size == 0:
            return no_peaks

        peaks, _ = find_peaks(intensity, height=np.max(intensity) * 0.1, distance=20)
        if len(peaks) == 0:
            return no_peaks

        fwhms = np.array([self._peak_fwhm(two_theta, intensity, p) for p in peaks])
        avg_fwhm = np.mean(fwhms)

        # The Bragg angle is half of the 2-theta position of the first peak.
        theta_bragg = two_theta[peaks[0]] / 2.0
        rel_size = 1.0 / (avg_fwhm * np.cos(np.radians(theta_bragg)) + 1e-8)

        # NOTE(review): a Gaussian blur roughly preserves the mean, so this
        # ratio will usually be close to 1 — confirm the intended definition.
        smooth_bg = gaussian_filter1d(intensity, sigma=50)
        amorphous_ratio = np.mean(smooth_bg) / (np.mean(intensity) + 1e-8)

        microstrain = np.std(fwhms) / (avg_fwhm + 1e-8)
        avg_disorder = np.mean(np.asarray(local_invariants)[:, 1])
        return {
            'crystallite_size': float(rel_size),
            'microstrain': float(microstrain),
            'amorphous_ratio': float(amorphous_ratio),
            'n_peaks': int(len(peaks)),
            'avg_disorder': float(avg_disorder),
        }
class VSMAnalyzer:
    """Point-wise descriptors and magnetic parameters for an M(H) curve."""

    def compute_local_invariants(self, H, M, window_size=5):
        """Compute six descriptors for every point of the M(H) curve.

        Args:
            H: 1-D array of applied field values.
            M: 1-D array of magnetization values, same length as ``H``.
            window_size: Half-width (in samples) of the neighbourhood used
                for the local noise estimate; truncated at the array edges.

        Returns:
            Array of shape ``(len(H), 6)`` with columns: curvature (|d2M/dH2|),
            symmetry breaking (|M(H) + M(-H)|, using the sample whose field is
            closest to -H), sharpness (same value as curvature), noise (local
            standard deviation of M), gradient (|dM/dH|) and stability
            (1 / noise).

        Note:
            The derivatives come from ``np.gradient(M, H)``; repeated or
            reversing field values can therefore yield non-finite entries.
        """
        H = np.asarray(H, dtype=float)
        M = np.asarray(M, dtype=float)
        n_points = len(H)

        dM = np.gradient(M, H)
        d2M = np.gradient(dM, H)

        noise = np.empty(n_points)
        sym_break = np.empty(n_points)
        for i in range(n_points):
            # Slicing clips the upper bound at the end of the array.
            noise[i] = np.std(M[max(0, i - window_size):i + window_size + 1])
            # Symmetry breaking: |M(H) + M(-H)|
            mirror = np.argmin(np.abs(H + H[i]))
            sym_break[i] = abs(M[i] + M[mirror])

        curvature = np.abs(d2M)
        return np.column_stack([
            curvature,                # curvature
            sym_break,                # symmetry breaking
            curvature,                # sharpness
            noise,                    # noise
            np.abs(dM),               # gradient
            1.0 / (noise + 1e-8),     # stability (1e-8 avoids division by zero)
        ])

    def detect_magnetic_params(self, H, M):
        """Estimate coercivity and remanence.

        The second half of the data is treated as one branch of the loop.
        Coercivity is the field at which M first changes sign on that branch,
        linearly interpolated between the two samples around the crossing.
        Remanence is M at the sample whose field is closest to zero.

        Args:
            H: 1-D array of applied field values in acquisition order.
            M: 1-D array of magnetization values, same length as ``H``.

        Returns:
            ``(Hc, Mr)`` as floats. ``Hc`` is 0.0 when no sign change is
            found; both are 0.0 for empty input.
        """
        H = np.asarray(H, dtype=float)
        M = np.asarray(M, dtype=float)
        if H.size == 0 or M.size == 0:
            return 0.0, 0.0

        half = len(H) // 2
        branch_H = H[half:]
        branch_M = M[half:]

        Hc = 0.0
        crossings = np.flatnonzero(np.diff(np.sign(branch_M)))
        if crossings.size > 0:
            k = crossings[0]
            h0, h1 = branch_H[k], branch_H[k + 1]
            m0, m1 = branch_M[k], branch_M[k + 1]
            # m0 != m1 is guaranteed because their signs differ.
            Hc = float(h0 - m0 * (h1 - h0) / (m1 - m0))

        Mr = float(M[np.argmin(np.abs(H))])
        return Hc, Mr
class UVVisAnalyzer:
    """Point-wise descriptors and band-gap estimate for a UV-Vis spectrum."""

    @staticmethod
    def _smooth(values, max_window=21, polyorder=2):
        """Savitzky-Golay smoothing with a window that always fits the data.

        The window is the largest odd length not exceeding
        ``min(max_window, len(values))``. When the data is too short for the
        polynomial order, an unsmoothed float copy is returned instead.
        """
        window = min(max_window, len(values))
        if window % 2 == 0:
            window -= 1
        if window <= polyorder:
            return np.array(values, dtype=float)
        return savgol_filter(values, window_length=window, polyorder=polyorder)

    def compute_local_invariants(self, wavelength, absorption, window_size=10):
        """Compute six descriptors for every point of the spectrum.

        Args:
            wavelength: 1-D array of wavelengths.
            absorption: 1-D array of absorption values, same length.
            window_size: Half-width (in samples) of the neighbourhood used
                for the local variance and skewness; truncated at the edges.

        Returns:
            Array of shape ``(len(wavelength), 6)`` with columns: edge
            sharpness (|second derivative| of the smoothed signal), disorder
            (local variance of the raw signal), asymmetry (|local skewness|),
            stability (1 / disorder), gradient (|first derivative|), and the
            smoothed absorption. The last column is the smoothed signal
            as-is; it is not rescaled.

        Raises:
            ValueError: If fewer than two points are supplied.
        """
        wavelength = np.asarray(wavelength, dtype=float)
        absorption = np.asarray(absorption, dtype=float)
        n_points = len(wavelength)
        if n_points < 2:
            raise ValueError("At least two data points are required")

        smooth = self._smooth(absorption)
        first_deriv = np.gradient(smooth, wavelength)
        second_deriv = np.gradient(first_deriv, wavelength)

        local_var = np.empty(n_points)
        local_skew = np.empty(n_points)
        for i in range(n_points):
            # Slicing clips the upper bound at the end of the array.
            window = absorption[max(0, i - window_size):i + window_size + 1]
            local_var[i] = np.var(window)
            local_skew[i] = np.mean((window - np.mean(window)) ** 3) / (np.std(window) ** 3 + 1e-8)

        # The 1e-8 terms guard against division by zero on flat regions.
        return np.column_stack([
            np.abs(second_deriv),        # edge sharpness
            local_var,                   # disorder
            np.abs(local_skew),          # asymmetry
            1.0 / (local_var + 1e-8),    # stability
            np.abs(first_deriv),         # gradient
            smooth,                      # smoothed absorption (not rescaled)
        ])

    def estimate_bandgap(self, wavelength, absorption):
        """Estimate Tauc bandgap for direct semiconductors.

        The spectrum is converted to photon energy (``1240 / wavelength``,
        eV for nm) and ordered by ascending energy, so the result does not
        depend on the order of the input. The absorption edge is the
        lowest-energy sample whose absorption exceeds half of the maximum;
        a straight line is fitted to ``(absorption * energy)**2`` over up to
        20 samples on either side of it and its energy-axis intercept is
        returned.

        Args:
            wavelength: 1-D array of wavelengths in nm.
            absorption: 1-D array of absorption values, same length.

        Returns:
            The band gap in eV as a float, never negative. 0.0 when no edge
            can be located (too few usable points, absorption already above
            half maximum at the lowest energy, or a degenerate fit).
        """
        wavelength = np.asarray(wavelength, dtype=float)
        absorption = np.asarray(absorption, dtype=float)

        # Non-positive wavelengths have no meaningful photon energy.
        usable = np.isfinite(wavelength) & np.isfinite(absorption) & (wavelength > 0)
        wavelength = wavelength[usable]
        absorption = absorption[usable]
        if wavelength.size < 5:
            return 0.0

        energy = 1240 / wavelength  # eV (for nm)
        order = np.argsort(energy, kind='stable')
        energy = energy[order]
        absorption = absorption[order]
        alpha_hv_sq = (absorption * energy) ** 2

        # Find absorption edge: first half-maximum crossing coming from the
        # low-energy (transparent) side.
        edge_idx = int(np.argmax(absorption > 0.5 * np.max(absorption)))
        if edge_idx == 0:
            return 0.0
        start = max(0, edge_idx - 20)
        end = min(len(energy), edge_idx + 20)
        if end - start < 5:
            return 0.0

        # Linear fit in band edge region
        try:
            slope, intercept = np.polyfit(energy[start:end], alpha_hv_sq[start:end], 1)
        except (np.linalg.LinAlgError, ValueError, TypeError):
            return 0.0
        if slope == 0 or not np.isfinite(slope) or not np.isfinite(intercept):
            return 0.0
        return max(0.0, float(-intercept / slope))
class PLAnalyzer:
    """Point-wise descriptors and peak extraction for a PL spectrum."""

    @staticmethod
    def _smooth(values, max_window=21, polyorder=2):
        """Savitzky-Golay smoothing with a window that always fits the data.

        The window is the largest odd length not exceeding
        ``min(max_window, len(values))``. When the data is too short for the
        polynomial order, an unsmoothed float copy is returned instead.
        """
        window = min(max_window, len(values))
        if window % 2 == 0:
            window -= 1
        if window <= polyorder:
            return np.array(values, dtype=float)
        return savgol_filter(values, window_length=window, polyorder=polyorder)

    def compute_local_invariants(self, wavelength, intensity, window_size=10):
        """Compute six descriptors for every point of the spectrum.

        Args:
            wavelength: 1-D array of wavelengths.
            intensity: 1-D array of intensities, same length.
            window_size: Half-width (in samples) of the neighbourhood used
                for the local variance and skewness; truncated at the edges.

        Returns:
            Array of shape ``(len(wavelength), 6)`` with columns: peak
            sharpness (|second derivative| of the smoothed signal), disorder
            (local variance of the raw signal), asymmetry (|local skewness|),
            stability (1 / disorder), gradient (|first derivative|), and the
            smoothed intensity. The last column is the smoothed signal
            as-is; it is not rescaled.

        Raises:
            ValueError: If fewer than two points are supplied.
        """
        wavelength = np.asarray(wavelength, dtype=float)
        intensity = np.asarray(intensity, dtype=float)
        n_points = len(wavelength)
        if n_points < 2:
            raise ValueError("At least two data points are required")

        smooth = self._smooth(intensity)
        first_deriv = np.gradient(smooth, wavelength)
        second_deriv = np.gradient(first_deriv, wavelength)

        local_var = np.empty(n_points)
        local_skew = np.empty(n_points)
        for i in range(n_points):
            # Slicing clips the upper bound at the end of the array.
            window = intensity[max(0, i - window_size):i + window_size + 1]
            local_var[i] = np.var(window)
            local_skew[i] = np.mean((window - np.mean(window)) ** 3) / (np.std(window) ** 3 + 1e-8)

        # The 1e-8 terms guard against division by zero on flat regions.
        return np.column_stack([
            np.abs(second_deriv),        # peak sharpness
            local_var,                   # disorder
            np.abs(local_skew),          # asymmetry
            1.0 / (local_var + 1e-8),    # stability
            np.abs(first_deriv),         # gradient
            smooth,                      # smoothed intensity (not rescaled)
        ])

    def extract_pl_peaks(self, wavelength, intensity):
        """Extract peak positions, FWHM, intensity.

        Peaks are local maxima at least 10% of the global maximum and at
        least 20 samples apart. The width is measured on the sampled grid by
        walking outward from the peak until the intensity is no longer above
        half the peak height (or the array edge is reached).

        Args:
            wavelength: 1-D array of wavelengths.
            intensity: 1-D array of intensities, same length.

        Returns:
            List of ``{'wavelength', 'intensity', 'fwhm'}`` dicts (floats),
            ordered by position in the input. Empty for empty input or when
            no peak is found.
        """
        wavelength = np.asarray(wavelength, dtype=float)
        intensity = np.asarray(intensity, dtype=float)
        if intensity.size == 0:
            return []

        peaks, _ = find_peaks(intensity, height=np.max(intensity) * 0.1, distance=20)
        last = len(intensity) - 1
        peak_info = []
        for peak in peaks:
            height = intensity[peak]
            half_max = height / 2.0
            left = peak
            while left > 0 and intensity[left] > half_max:
                left -= 1
            right = peak
            while right < last and intensity[right] > half_max:
                right += 1
            peak_info.append({
                'wavelength': float(wavelength[peak]),
                'intensity': float(height),
                'fwhm': float(wavelength[right] - wavelength[left]),
            })
        return peak_info