# materialcharacterize / analyzer.py
# qurashiubaid's picture
# Upload 7 files
# 638afcf verified
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy.signal import savgol_filter, find_peaks
from scipy.ndimage import gaussian_filter1d
from scipy.spatial.distance import pdist, squareform
from sklearn.preprocessing import StandardScaler
from pymatgen.core import Structure
from pymatgen.analysis.diffraction.xrd import XRDCalculator
import cv2
from skimage import filters, measure, morphology
from scipy import ndimage
import requests
import re
import tempfile
import json
from typing import Dict, List, Tuple, Optional
# Configure matplotlib for headless operation: switch to the 'Agg' backend at
# import time so figures are rendered to files without needing a display.
plt.switch_backend('Agg')
class UniversalFiberBundleAnalyzer:
    """Core analyzer for multi-modal materials data.

    Loads per-modality measurement files (XRD, VSM, UV-Vis, PL, TEM), hands
    the numerical work to the modality analyzers defined in this module
    (``XRDAnalyzer``, ``VSMAnalyzer``, ``UVVisAnalyzer``, ``PLAnalyzer``) and
    renders a plain-text report and PNG plots from the collected results.
    """

    # Edge length (pixels) every TEM image is resampled to before segmentation.
    _TEM_WORK_SIZE = 1024

    def __init__(self):
        # Kept for backward compatibility; no method of this class writes to it.
        self.results = {}

    def process_sample(self, files: Dict[str, str], sample_name: str = "sample",
                       tem_pixel_size_nm: float = 1.0) -> Dict:
        """Process all available modalities for a sample.

        Each modality is analysed independently and best-effort: a failure in
        one modality is recorded as ``'<modality>_error'`` (the exception
        text) and does not prevent the others from running.

        Args:
            files: Dictionary with keys: 'xrd', 'vsm', 'uvvis', 'pl', 'tem'.
                Missing or empty entries are skipped.
            sample_name: Name for the sample.
            tem_pixel_size_nm: Physical size of one pixel of the original TEM
                image in nanometres, forwarded to ``_analyze_tem_image``.

        Returns:
            Dictionary with analysis results. Contains 'sample_name', one
            entry per successfully analysed modality, 'phases' when phase
            identification on the XRD data succeeded, and '*_error' entries
            for steps that raised.
        """
        results = {"sample_name": sample_name}
        handlers = (
            ('xrd', self._process_xrd),
            ('vsm', self._process_vsm),
            ('uvvis', self._process_uvvis),
            ('pl', self._process_pl),
            ('tem', lambda path: self._analyze_tem_image(path, tem_pixel_size_nm)),
        )
        for modality, handler in handlers:
            path = files.get(modality)
            if not path:
                continue
            try:
                results[modality] = handler(path)
            except Exception as e:
                # Deliberate best-effort boundary: report the failure, keep going.
                results[f'{modality}_error'] = str(e)

        # Phase identification (requires XRD)
        if 'xrd' in results:
            try:
                # The XRD x axis (2-theta) is stored under the 'wavelength' key.
                results['phases'] = self._identify_phases(
                    results['xrd']['wavelength'], results['xrd']['intensity'])
            except Exception as e:
                results['phase_error'] = str(e)
        return results

    def _process_xrd(self, path: str) -> Dict:
        """Load an XRD pattern and compute its local invariants and global features."""
        data = self._load_spectral_data(path)
        analyzer = XRDAnalyzer()
        invariants = analyzer.compute_local_invariants(data['x'], data['y'])
        features = analyzer.extract_global_features(data['x'], data['y'], invariants)
        return {
            'wavelength': data['x'],
            'intensity': data['y'],
            'invariants': invariants,
            'features': features,
        }

    def _process_vsm(self, path: str) -> Dict:
        """Load a VSM loop in measurement order and extract Hc / Mr."""
        # Sorting by H would interleave the two branches of a hysteresis loop;
        # VSMAnalyzer.detect_magnetic_params takes the second half of the data
        # as one branch, so the file order must be preserved here.
        data = self._load_spectral_data(path, sort=False)
        analyzer = VSMAnalyzer()
        invariants = analyzer.compute_local_invariants(data['x'], data['y'])
        Hc, Mr = analyzer.detect_magnetic_params(data['x'], data['y'])
        return {
            'H': data['x'],
            'M': data['y'],
            'invariants': invariants,
            'Hc': Hc,
            'Mr': Mr,
        }

    def _process_uvvis(self, path: str) -> Dict:
        """Load a UV-Vis spectrum and estimate its bandgap."""
        data = self._load_spectral_data(path)
        analyzer = UVVisAnalyzer()
        invariants = analyzer.compute_local_invariants(data['x'], data['y'])
        bandgap = analyzer.estimate_bandgap(data['x'], data['y'])
        return {
            'wavelength': data['x'],
            'absorption': data['y'],
            'invariants': invariants,
            'bandgap_eV': bandgap,
        }

    def _process_pl(self, path: str) -> Dict:
        """Load a PL spectrum and extract its peaks."""
        data = self._load_spectral_data(path)
        analyzer = PLAnalyzer()
        invariants = analyzer.compute_local_invariants(data['x'], data['y'])
        peaks = analyzer.extract_pl_peaks(data['x'], data['y'])
        return {
            'wavelength': data['x'],
            'intensity': data['y'],
            'invariants': invariants,
            'peaks': peaks,
        }

    def _load_spectral_data(self, file_path: str, sort: bool = True) -> Dict[str, np.ndarray]:
        """Load a two-column (x, y) curve from a CSV file.

        Column names are matched case-insensitively after stripping
        surrounding whitespace. x is the first of 'wavelength', 'energy',
        '2theta', 'h' that is present; y is the first of 'intensity', 'm',
        'absorption'. A column that cannot be identified by name falls back
        to the first column not already used for the other axis.

        Args:
            file_path: Path of the CSV file (read with ``pandas.read_csv``).
            sort: When True (default) the points are ordered by ascending x,
                with ties kept in file order. Pass False to keep file order.

        Returns:
            ``{'x': ndarray, 'y': ndarray}`` of floats with every row that has
            a non-finite x or y removed.

        Raises:
            ValueError: If no distinct column is available for x or y, or a
                selected column cannot be converted to float.
        """
        df = pd.read_csv(file_path)
        names = [str(c).strip().lower() for c in df.columns]

        def find_named(candidates):
            for candidate in candidates:
                if candidate in names:
                    return names.index(candidate)
            return None

        def first_other(taken):
            for idx in range(len(names)):
                if idx != taken:
                    return idx
            raise ValueError(
                f"Spectral file '{file_path}' needs at least two columns (x and y)")

        x_idx = find_named(('wavelength', 'energy', '2theta', 'h'))
        y_idx = find_named(('intensity', 'm', 'absorption'))
        # Positional fallbacks must never reuse the other axis' column.
        if x_idx is None:
            x_idx = first_other(y_idx)
        if y_idx is None:
            y_idx = first_other(x_idx)

        x = df.iloc[:, x_idx].values.astype(float)
        y = df.iloc[:, y_idx].values.astype(float)

        # Remove NaNs / infinities
        valid = np.isfinite(x) & np.isfinite(y)
        x, y = x[valid], y[valid]

        if sort:
            order = np.argsort(x, kind='stable')
            x, y = x[order], y[order]
        return {'x': x, 'y': y}

    def _analyze_tem_image(self, image_path: str, pixel_size_nm: float = 1.0) -> Dict:
        """Analyze a TEM/SEM image for particle size.

        The image is read as grayscale, resampled to a fixed square working
        size, blurred, Otsu-thresholded (pixels darker than the threshold are
        treated as particles), cleaned morphologically and labelled. Each
        labelled region is reported as the diameter of the circle with the
        same area.

        Args:
            image_path: Path of the image file.
            pixel_size_nm: Size of one pixel of the *original* image in
                nanometres. The default of 1.0 is a placeholder; the user
                should supply the microscope calibration.

        Returns:
            ``{'particle_count': 0}`` when nothing is segmented, otherwise the
            particle count plus mean / std / min / max equivalent diameter in
            nanometres.

        Raises:
            ValueError: If the image cannot be read.
        """
        img = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
        if img is None:
            raise ValueError("Could not load TEM image")
        orig_h, orig_w = img.shape[:2]

        # Resize for consistent processing
        work = self._TEM_WORK_SIZE
        img = cv2.resize(img, (work, work))
        img = cv2.GaussianBlur(img, (5, 5), 0)

        # Threshold
        thresh = filters.threshold_otsu(img)
        binary = img < thresh

        # Clean up
        binary = morphology.remove_small_objects(binary, min_size=50)
        binary = morphology.binary_closing(binary, morphology.disk(2))

        # Label particles
        labeled, _ = ndimage.label(binary)
        props = measure.regionprops(labeled)
        if not props:
            return {"particle_count": 0}

        # One pixel of the resampled image covers (orig_w/work) x (orig_h/work)
        # original pixels; undo that so sizes refer to the original image.
        area_scale = (orig_w / float(work)) * (orig_h / float(work))
        areas_nm2 = [p.area * area_scale * pixel_size_nm ** 2 for p in props]
        diameters_nm = [2 * np.sqrt(a / np.pi) for a in areas_nm2]
        return {
            'particle_count': len(props),
            'mean_diameter_nm': float(np.mean(diameters_nm)),
            'std_diameter_nm': float(np.std(diameters_nm)),
            'min_diameter_nm': float(np.min(diameters_nm)),
            'max_diameter_nm': float(np.max(diameters_nm)),
        }

    @staticmethod
    def _simulate_profile(two_theta, peak_positions, peak_intensities, fwhm):
        """Sum one Gaussian of width ``fwhm`` per reflection on the ``two_theta`` grid."""
        grid = np.asarray(two_theta, dtype=float)
        sigma = fwhm / (2.0 * np.sqrt(2.0 * np.log(2.0)))
        profile = np.zeros_like(grid)
        # Loop over reflections rather than broadcasting to keep memory at O(len(grid)).
        for position, amplitude in zip(peak_positions, peak_intensities):
            profile += amplitude * np.exp(-0.5 * ((grid - position) / sigma) ** 2)
        return profile

    def _identify_phases(self, two_theta: np.ndarray, intensity: np.ndarray,
                         peak_fwhm: float = 0.5) -> List[Tuple[str, float]]:
        """Identify phases using COD database.

        For every candidate phase the CIF is downloaded, its powder pattern is
        simulated (``XRDCalculator(wavelength=1.5406)``), the simulated
        reflections are broadened onto the experimental 2-theta grid, and the
        Pearson correlation with the experimental intensity is computed.

        Args:
            two_theta: Experimental 2-theta grid in degrees.
            intensity: Experimental intensity on that grid.
            peak_fwhm: Full width at half maximum (degrees) given to every
                simulated reflection. The default is an assumed typical
                width, not derived from the data - tune to the instrument.

        Returns:
            Up to three ``(phase_name, correlation)`` pairs, best match first.
            Candidates that cannot be downloaded or simulated, or whose
            correlation is NaN, are skipped.
        """
        # Common material COD IDs
        candidate_cod_ids = {
            'Fe3O4': '9008470',
            'CoFe2O4': '9008464',
            'γ-Fe2O3': '1011106',
            'α-Fe2O3': '9007397',
            'TiO2_anatase': '9007679',
            'TiO2_rutile': '9007680'
        }
        calculator = XRDCalculator(wavelength=1.5406)
        two_theta = np.asarray(two_theta, dtype=float)
        exp_norm = np.asarray(intensity, dtype=float) / (np.max(intensity) + 1e-8)

        matches = []
        for phase_name, cod_id in candidate_cod_ids.items():
            structure = self._download_cod_structure(cod_id)
            if structure is None:
                continue
            try:
                xrd_pattern = calculator.get_pattern(structure)
                # The simulated pattern is a list of discrete reflections;
                # interpolating between them would draw ramps from one
                # reflection to the next, so each one is broadened instead.
                sim_profile = self._simulate_profile(
                    two_theta, xrd_pattern.x, xrd_pattern.y, peak_fwhm)
                sim_profile = sim_profile / (np.max(sim_profile) + 1e-8)
                correlation = np.corrcoef(exp_norm, sim_profile)[0, 1]
            except Exception:
                # Best-effort: one bad candidate must not abort the search.
                continue
            if not np.isnan(correlation):
                matches.append((phase_name, float(correlation)))

        # Sort by correlation
        matches.sort(key=lambda match: match[1], reverse=True)
        return matches[:3]

    def _download_cod_structure(self, cod_id: str) -> Optional["Structure"]:
        """Download structure from Crystallography Open Database.

        Args:
            cod_id: COD entry identifier used to build the CIF URL.

        Returns:
            The parsed ``Structure``, or None when the request fails, the
            server does not answer with HTTP 200, or the CIF cannot be parsed.
        """
        url = f"https://www.crystallography.net/cod/{cod_id}.cif"
        temp_path = None
        try:
            response = requests.get(url, timeout=10)
            if response.status_code != 200:
                return None
            with tempfile.NamedTemporaryFile(mode='w', suffix='.cif', delete=False,
                                             encoding='utf-8') as f:
                temp_path = f.name
                f.write(response.text)
            return Structure.from_file(temp_path)
        except Exception:
            # Network and parse failures are both treated as "not available".
            return None
        finally:
            # Always remove the temporary CIF, including when parsing raised.
            if temp_path is not None and os.path.exists(temp_path):
                os.unlink(temp_path)

    def generate_report(self, results: Dict) -> str:
        """Generate scientific interpretation report.

        Args:
            results: Dictionary as returned by ``process_sample``. Sections
                are emitted only for the keys that are present.

        Returns:
            The report as a single newline-joined string.
        """
        report = []
        report.append("=" * 60)
        report.append("🔬 MULTI-MODAL MATERIALS ANALYSIS REPORT")
        report.append(f"Sample: {results.get('sample_name', 'Unknown')}")
        report.append("=" * 60)

        # XRD analysis
        if 'xrd' in results:
            features = results['xrd']['features']
            report.append("\n📊 XRD ANALYSIS:")
            report.append(f" • Crystallite size: {features['crystallite_size']:.2f} (rel. units)")
            report.append(f" • Microstrain: {features['microstrain']:.3f}")
            report.append(f" • Amorphous ratio: {features['amorphous_ratio']:.3f}")

        # Phase identification
        if 'phases' in results:
            report.append("\n🧪 PHASE IDENTIFICATION:")
            for i, (phase, corr) in enumerate(results['phases']):
                report.append(f" {i+1}. {phase} (correlation: {corr:.2f})")

        # VSM analysis
        if 'vsm' in results:
            vsm = results['vsm']
            report.append("\n🧲 VSM ANALYSIS:")
            report.append(f" • Coercivity (Hc): {vsm['Hc']:.1f} Oe")
            report.append(f" • Remanence (Mr): {vsm['Mr']:.3f} (norm.)")

        # UV-Vis analysis
        if 'uvvis' in results:
            uvvis = results['uvvis']
            report.append("\n🌈 UV-VIS ANALYSIS:")
            report.append(f" • Bandgap: {uvvis['bandgap_eV']:.2f} eV")

        # PL analysis
        if 'pl' in results:
            peaks = results['pl']['peaks']
            report.append("\n💡 PHOTOLUMINESCENCE:")
            if peaks:
                # Peaks arrive ordered by position; the main peak is the
                # most intense one, not the first one.
                peak = max(peaks, key=lambda p: p['intensity'])
                report.append(f" • Main peak: {peak['wavelength']:.1f} nm")
                report.append(f" • FWHM: {peak['fwhm']:.1f} nm")
            else:
                report.append(" • No significant peaks detected")

        # TEM analysis
        if 'tem' in results:
            tem = results['tem']
            if tem['particle_count'] > 0:
                report.append("\n🔬 TEM ANALYSIS:")
                report.append(f" • Particle count: {tem['particle_count']}")
                report.append(f" • Mean diameter: {tem['mean_diameter_nm']:.1f} ± {tem['std_diameter_nm']:.1f} nm")

        # Cross-modal insights
        report.append("\n🧠 CROSS-MODAL INSIGHTS:")
        # Quantum confinement
        if 'tem' in results and 'uvvis' in results:
            if results['tem']['particle_count'] > 0 and results['uvvis']['bandgap_eV'] > 0:
                report.append(" • Quantum confinement analysis available")
        # Defect correlation
        if 'xrd' in results and 'pl' in results and results['pl']['peaks']:
            report.append(" • XRD disorder and PL FWHM can be correlated for defect analysis")

        report.append("\n💡 RECOMMENDATIONS:")
        report.append("• Validate phase purity with Rietveld refinement")
        report.append("• Correlate particle size with magnetic/optical properties")
        report.append("• For thin films, consider substrate effects")
        report.append("\n" + "=" * 60)
        return "\n".join(report)

    def generate_plots(self, results: Dict, output_dir: str = ".") -> List[str]:
        """Generate publication-ready plots.

        Writes one 300-dpi PNG per available spectral modality, plus a
        particle-size / bandgap scatter plot when both TEM and UV-Vis results
        are usable. Files are named ``<sample_name>_<suffix>.png``.

        Args:
            results: Dictionary as returned by ``process_sample``.
            output_dir: Directory for the PNG files; created if missing.

        Returns:
            Paths of the files written, in the order they were produced.
        """
        sample_name = results.get('sample_name', 'sample')
        plot_paths = []
        os.makedirs(output_dir, exist_ok=True)

        # (results key, x key, y key, line format, title, x label, y label)
        line_plots = (
            ('xrd', 'wavelength', 'intensity', 'b-',
             "XRD Pattern", "2θ (degrees)", "Intensity (a.u.)"),
            ('vsm', 'H', 'M', 'r-',
             "VSM Hysteresis Loop", "Magnetic Field H (Oe)", "Magnetization M (norm.)"),
            ('uvvis', 'wavelength', 'absorption', 'g-',
             "UV-Vis Absorption", "Wavelength (nm)", "Absorption (a.u.)"),
            ('pl', 'wavelength', 'intensity', 'm-',
             "Photoluminescence", "Wavelength (nm)", "Intensity (a.u.)"),
        )
        for key, x_key, y_key, fmt, title, xlabel, ylabel in line_plots:
            if key not in results:
                continue
            data = results[key]
            path = os.path.join(output_dir, f"{sample_name}_{key}.png")
            plt.figure(figsize=(8, 5))
            try:
                plt.plot(data[x_key], data[y_key], fmt)
                plt.title(f"{title} - {sample_name}")
                plt.xlabel(xlabel)
                plt.ylabel(ylabel)
                plt.savefig(path, dpi=300, bbox_inches='tight')
            finally:
                # Close the figure even if drawing or saving failed.
                plt.close()
            plot_paths.append(path)

        # Correlation plot (if multiple modalities)
        if 'tem' in results and 'uvvis' in results:
            tem = results['tem']
            uvvis = results['uvvis']
            if tem['particle_count'] > 0 and uvvis['bandgap_eV'] > 0:
                corr_path = os.path.join(output_dir, f"{sample_name}_confinement.png")
                plt.figure(figsize=(8, 5))
                try:
                    plt.scatter([tem['mean_diameter_nm']], [uvvis['bandgap_eV']], s=100)
                    plt.title(f"Quantum Confinement - {sample_name}")
                    plt.xlabel("Particle Size (nm)")
                    plt.ylabel("Bandgap (eV)")
                    plt.savefig(corr_path, dpi=300, bbox_inches='tight')
                finally:
                    plt.close()
                plot_paths.append(corr_path)
        return plot_paths
# Modal-specific analyzers
class XRDAnalyzer:
    """Per-point descriptors and whole-pattern features for an XRD pattern."""

    def compute_local_invariants(self, two_theta, intensity, window_size=10):
        """Compute six per-point descriptors of the pattern.

        Args:
            two_theta: 1-D array of angles (the x axis).
            intensity: 1-D array of intensities, same length as ``two_theta``.
                At least two points are needed (``np.gradient`` requirement).
            window_size: Half-width, in samples, of the window used for the
                local variance and skewness.

        Returns:
            Array of shape ``(len(two_theta), 6)`` with columns: absolute
            second derivative of the smoothed intensity (sharpness), local
            variance of the raw intensity (disorder), absolute local skewness
            (asymmetry), ``1 / (local variance + 1e-8)`` (stability), absolute
            first derivative (gradient), and smoothed intensity divided by its
            maximum (norm intensity).
        """
        two_theta = np.asarray(two_theta, dtype=float)
        intensity = np.asarray(intensity, dtype=float)
        n = len(intensity)

        # Savitzky-Golay needs an odd window no longer than the data and
        # longer than the polynomial order (2); use the largest odd length
        # <= n, capped at 21, and skip smoothing when fewer than 3 points.
        window = min(21, n if n % 2 else n - 1)
        if window > 2:
            intensity_smooth = savgol_filter(intensity, window_length=window, polyorder=2)
        else:
            intensity_smooth = intensity.copy()

        dI = np.gradient(intensity_smooth, two_theta)
        d2I = np.gradient(dI, two_theta)

        local_var = np.empty(n)
        local_skew = np.empty(n)
        for i in range(n):
            segment = intensity[max(0, i - window_size):min(n, i + window_size + 1)]
            local_var[i] = np.var(segment)
            local_skew[i] = (np.mean((segment - np.mean(segment)) ** 3)
                             / (np.std(segment) ** 3 + 1e-8))

        # The normalisation constant is loop-invariant; compute it once.
        smooth_max = np.max(intensity_smooth)
        return np.column_stack([
            np.abs(d2I),                             # sharpness
            local_var,                               # disorder
            np.abs(local_skew),                      # asymmetry
            1.0 / (local_var + 1e-8),                # stability
            np.abs(dI),                              # gradient
            intensity_smooth / (smooth_max + 1e-8),  # norm intensity
        ])

    @staticmethod
    def _half_max_width(x, y, peak):
        """Width of the peak at index ``peak`` at half its height, walking sample by sample."""
        half_max = y[peak] / 2.0
        left = peak
        while left > 0 and y[left] > half_max:
            left -= 1
        right = peak
        while right < len(y) - 1 and y[right] > half_max:
            right += 1
        return x[right] - x[left]

    def extract_global_features(self, two_theta, intensity, local_invariants):
        """Summarise the whole pattern from its peaks.

        Peaks are found with ``find_peaks`` (height >= 10 % of the maximum,
        at least 20 samples apart).

        Args:
            two_theta: 1-D array of angles in degrees.
            intensity: 1-D array of intensities.
            local_invariants: Output of ``compute_local_invariants``; column 1
                is averaged into ``avg_disorder``.

        Returns:
            Dict with keys 'crystallite_size' (reciprocal of mean FWHM times
            the cosine of half the first peak's angle - relative units),
            'microstrain' (std of the FWHMs over their mean),
            'amorphous_ratio' (mean of the sigma=50 Gaussian-smoothed
            intensity over the mean intensity), 'n_peaks' and 'avg_disorder'.
            With no peaks a fixed all-zero / ratio-1.0 record is returned.
        """
        two_theta = np.asarray(two_theta, dtype=float)
        intensity = np.asarray(intensity, dtype=float)
        peaks, _ = find_peaks(intensity, height=np.max(intensity) * 0.1, distance=20)
        if len(peaks) == 0:
            return {'crystallite_size': 0, 'microstrain': 0, 'amorphous_ratio': 1.0,
                    'n_peaks': 0, 'avg_disorder': 0}

        fwhms = [self._half_max_width(two_theta, intensity, p) for p in peaks]
        avg_fwhm = np.mean(fwhms)
        # Only the first detected peak supplies the Bragg angle.
        theta_bragg = two_theta[peaks[0]] / 2.0
        rel_size = 1.0 / (avg_fwhm * np.cos(np.radians(theta_bragg)) + 1e-8)

        smooth_bg = gaussian_filter1d(intensity, sigma=50)
        amorphous_ratio = np.mean(smooth_bg) / (np.mean(intensity) + 1e-8)
        microstrain = np.std(fwhms) / (avg_fwhm + 1e-8)
        avg_disorder = np.mean(local_invariants[:, 1])
        return {
            'crystallite_size': rel_size,
            'microstrain': microstrain,
            'amorphous_ratio': amorphous_ratio,
            'n_peaks': len(peaks),
            'avg_disorder': avg_disorder
        }
class VSMAnalyzer:
    """Per-point descriptors and coercivity / remanence for an M(H) curve."""

    def compute_local_invariants(self, H, M, window_size=5):
        """Compute six per-point descriptors of the magnetisation curve.

        Args:
            H: 1-D array of applied field values.
            M: 1-D array of magnetisation values, same length as ``H``.
            window_size: Half-width, in samples, of the window used for the
                local standard deviation.

        Returns:
            Array of shape ``(len(H), 6)`` with columns: absolute second
            derivative dM2/dH2 (curvature), ``|M(H) + M(H')|`` where ``H'`` is
            the sample whose field is closest to ``-H`` (symmetry breaking),
            absolute second derivative again (sharpness), local standard
            deviation of M (noise), absolute first derivative (gradient), and
            ``1 / (local std + 1e-8)`` (stability).
        """
        H = np.asarray(H, dtype=float)
        M = np.asarray(M, dtype=float)
        n = len(H)

        dM = np.gradient(M, H)
        d2M = np.gradient(dM, H)

        local_std = np.empty(n)
        sym_break = np.empty(n)
        for i in range(n):
            local_std[i] = np.std(M[max(0, i - window_size):min(n, i + window_size + 1)])
            # Symmetry breaking: |M(H) + M(-H)|, using the nearest sample to -H.
            mirror = np.argmin(np.abs(H + H[i]))
            sym_break[i] = abs(M[i] + M[mirror])

        curvature = np.abs(d2M)
        return np.column_stack([
            curvature,                  # curvature
            sym_break,                  # symmetry breaking
            # NOTE(review): identical to the curvature column, as in the
            # original - confirm whether a different quantity was intended.
            curvature,                  # sharpness
            local_std,                  # noise
            np.abs(dM),                 # gradient
            1.0 / (local_std + 1e-8),   # stability
        ])

    def detect_magnetic_params(self, H, M):
        """Estimate coercivity and remanence.

        The second half of the samples is treated as one branch of the loop.
        Hc is the field at which M first changes sign on that branch, linearly
        interpolated between the two samples that bracket the sign change
        (0.0 when M never changes sign there). Mr is M at the first sample
        whose field is closest to zero.

        Args:
            H: 1-D array of applied field values.
            M: 1-D array of magnetisation values, same length as ``H``.

        Returns:
            ``(Hc, Mr)`` as Python floats.
        """
        H = np.asarray(H, dtype=float)
        M = np.asarray(M, dtype=float)

        half = len(H) // 2
        branch_H, branch_M = H[half:], M[half:]
        crossings = np.flatnonzero(np.diff(np.sign(branch_M)))
        if crossings.size:
            k = crossings[0]
            h0, h1 = branch_H[k], branch_H[k + 1]
            m0, m1 = branch_M[k], branch_M[k + 1]
            # The signs differ, so m0 != m1 and the division is safe.
            Hc = h0 + (h1 - h0) * m0 / (m0 - m1)
        else:
            Hc = 0.0
        Mr = M[np.argmin(np.abs(H))]
        return float(Hc), float(Mr)
class UVVisAnalyzer:
    """Per-point descriptors and Tauc bandgap estimate for a UV-Vis spectrum."""

    def compute_local_invariants(self, wavelength, absorption, window_size=10):
        """Compute six per-point descriptors of the absorption spectrum.

        Args:
            wavelength: 1-D array of wavelengths (the x axis).
            absorption: 1-D array of absorption values, same length. At least
                two points are needed (``np.gradient`` requirement).
            window_size: Half-width, in samples, of the window used for the
                local variance and skewness.

        Returns:
            Array of shape ``(len(wavelength), 6)`` with columns: absolute
            second derivative of the smoothed absorption (edge sharpness),
            local variance of the raw absorption (disorder), absolute local
            skewness (asymmetry), ``1 / (local variance + 1e-8)`` (stability),
            absolute first derivative (gradient), and the smoothed absorption
            itself (not rescaled).
        """
        wavelength = np.asarray(wavelength, dtype=float)
        absorption = np.asarray(absorption, dtype=float)
        n = len(absorption)

        # Savitzky-Golay needs an odd window no longer than the data and
        # longer than the polynomial order (2); use the largest odd length
        # <= n, capped at 21, and skip smoothing when fewer than 3 points.
        window = min(21, n if n % 2 else n - 1)
        if window > 2:
            smooth = savgol_filter(absorption, window_length=window, polyorder=2)
        else:
            smooth = absorption.copy()

        dI = np.gradient(smooth, wavelength)
        d2I = np.gradient(dI, wavelength)

        local_var = np.empty(n)
        local_skew = np.empty(n)
        for i in range(n):
            segment = absorption[max(0, i - window_size):min(n, i + window_size + 1)]
            local_var[i] = np.var(segment)
            local_skew[i] = (np.mean((segment - np.mean(segment)) ** 3)
                             / (np.std(segment) ** 3 + 1e-8))

        return np.column_stack([
            np.abs(d2I),               # edge sharpness
            local_var,                 # disorder
            np.abs(local_skew),        # asymmetry
            1.0 / (local_var + 1e-8),  # stability
            np.abs(dI),                # gradient
            smooth,                    # smoothed absorption (not normalised)
        ])

    def estimate_bandgap(self, wavelength, absorption):
        """Estimate Tauc bandgap for direct semiconductors.

        ``(absorption * energy)**2`` is fitted with a straight line against
        photon energy (``1240 / wavelength``, eV for wavelengths in nm) in a
        window of up to 20 samples either side of the absorption edge; the
        line's zero crossing is the bandgap. The edge is taken as the
        longest-wavelength sample whose absorption still exceeds half of the
        maximum, i.e. the low-energy onset of absorption.

        Args:
            wavelength: 1-D array of wavelengths in nm. Non-positive values
                are ignored; the data need not be pre-sorted.
            absorption: 1-D array of absorption values, same length.

        Returns:
            The bandgap in eV as a float, or 0.0 when no edge lies inside the
            measured range, fewer than 5 points are available for the fit,
            the fitted line does not rise with energy, or the fit fails.
        """
        wavelength = np.asarray(wavelength, dtype=float)
        absorption = np.asarray(absorption, dtype=float)

        # 1240 / wavelength is undefined for non-positive wavelengths.
        usable = wavelength > 0
        wavelength, absorption = wavelength[usable], absorption[usable]
        if wavelength.size == 0:
            return 0.0

        # The fit window is defined in index space, so order by wavelength.
        order = np.argsort(wavelength, kind='stable')
        wavelength, absorption = wavelength[order], absorption[order]

        energy = 1240 / wavelength  # eV (for nm)
        alpha_hv_sq = (absorption * energy) ** 2

        # Find absorption edge: the last (longest-wavelength) point above
        # half maximum. Taking the first such point would select index 0 for
        # any spectrum that absorbs strongly at short wavelengths.
        above = np.flatnonzero(absorption > 0.5 * np.max(absorption))
        if above.size == 0:
            return 0.0
        edge_idx = int(above[-1])
        if edge_idx == len(energy) - 1:
            # Absorption never drops below half maximum: no edge in range.
            return 0.0

        start = max(0, edge_idx - 20)
        end = min(len(energy), edge_idx + 20)
        if end - start < 5:
            return 0.0

        # Linear fit in band edge region
        try:
            slope, intercept = np.polyfit(energy[start:end], alpha_hv_sq[start:end], 1)
        except (np.linalg.LinAlgError, ValueError, TypeError):
            return 0.0
        if not np.isfinite(slope) or slope <= 0:
            # A Tauc line must rise with photon energy to have a meaningful intercept.
            return 0.0
        return float(max(0.0, -intercept / slope))
class PLAnalyzer:
    """Per-point descriptors and peak extraction for a photoluminescence spectrum."""

    def compute_local_invariants(self, wavelength, intensity, window_size=10):
        """Compute six per-point descriptors of the emission spectrum.

        Args:
            wavelength: 1-D array of wavelengths (the x axis).
            intensity: 1-D array of intensities, same length. At least two
                points are needed (``np.gradient`` requirement).
            window_size: Half-width, in samples, of the window used for the
                local variance and skewness.

        Returns:
            Array of shape ``(len(wavelength), 6)`` with columns: absolute
            second derivative of the smoothed intensity (peak sharpness),
            local variance of the raw intensity (disorder), absolute local
            skewness (asymmetry), ``1 / (local variance + 1e-8)`` (stability),
            absolute first derivative (gradient), and the smoothed intensity
            itself (not rescaled).
        """
        wavelength = np.asarray(wavelength, dtype=float)
        intensity = np.asarray(intensity, dtype=float)
        n = len(intensity)

        # Savitzky-Golay needs an odd window no longer than the data and
        # longer than the polynomial order (2); use the largest odd length
        # <= n, capped at 21, and skip smoothing when fewer than 3 points.
        window = min(21, n if n % 2 else n - 1)
        if window > 2:
            smooth = savgol_filter(intensity, window_length=window, polyorder=2)
        else:
            smooth = intensity.copy()

        dI = np.gradient(smooth, wavelength)
        d2I = np.gradient(dI, wavelength)

        local_var = np.empty(n)
        local_skew = np.empty(n)
        for i in range(n):
            segment = intensity[max(0, i - window_size):min(n, i + window_size + 1)]
            local_var[i] = np.var(segment)
            local_skew[i] = (np.mean((segment - np.mean(segment)) ** 3)
                             / (np.std(segment) ** 3 + 1e-8))

        return np.column_stack([
            np.abs(d2I),               # peak sharpness
            local_var,                 # disorder
            np.abs(local_skew),        # asymmetry
            1.0 / (local_var + 1e-8),  # stability
            np.abs(dI),                # gradient
            smooth,                    # smoothed intensity (not normalised)
        ])

    def extract_pl_peaks(self, wavelength, intensity):
        """Extract peak positions, FWHM, intensity.

        Peaks are found with ``find_peaks`` (height >= 10 % of the maximum,
        at least 20 samples apart). The width of each peak is measured by
        stepping outwards sample by sample until the intensity is no longer
        above half of the peak height (or the array ends).

        Args:
            wavelength: 1-D array of wavelengths.
            intensity: 1-D array of intensities, same length.

        Returns:
            List of dicts with float 'wavelength', 'intensity' and 'fwhm',
            ordered by position in the input. Empty when the input is empty
            or no peak is found.
        """
        wavelength = np.asarray(wavelength, dtype=float)
        intensity = np.asarray(intensity, dtype=float)
        if intensity.size == 0:
            # np.max() would raise on an empty spectrum; there are no peaks.
            return []

        peaks, _ = find_peaks(intensity, height=np.max(intensity) * 0.1, distance=20)
        last = len(intensity) - 1
        peak_info = []
        for peak in peaks:
            height = intensity[peak]
            half_max = height / 2.0
            left = peak
            while left > 0 and intensity[left] > half_max:
                left -= 1
            right = peak
            while right < last and intensity[right] > half_max:
                right += 1
            peak_info.append({
                'wavelength': float(wavelength[peak]),
                'intensity': float(height),
                'fwhm': float(wavelength[right] - wavelength[left])
            })
        return peak_info