"""noctilith/sim/local_material_map.py — M13.1: per-cell material parameter arrays.

Expands a grid of integer material IDs into one float32 array per material
parameter, and evaluates a per-cell degradation risk from those arrays.
"""
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any, Dict, Optional, Tuple

import numpy as np

from engine.sim.material_profiles import MaterialPipelineProfile, PROFILE_CACHE, clamp

SCHEMA_VERSION = "noctilith.schema.v1"
MODULE_NAME = "noctilith.sim.local_material_map"

# Fallback parameter values used for any material ID whose profile lookup
# returns None.  The key order defines the column order of the lookup table
# built in ``LocalMaterialMap.from_voxel_ids`` and must match the order of the
# list returned by ``_extract_params``.
_DEFAULT_PARAMS = {
    "temp_weight": 0.45,
    "entropy_weight": 0.40,
    "fatigue_exponent": 1.35,
    "risk_gain": 0.65,
    "fracture_threshold": 0.80,
    "propagation_gain": 0.35,
    "damage_gain": 0.65,
    "debris_threshold": 0.88,
    "entropy_gain": 0.40,
    "thermal_gain": 0.15,
    "fragmentation_threshold": 0.50,
    "fragmentation_gain": 0.40,
}

# Column order of the per-material lookup table (one column per parameter).
_PARAM_NAMES: Tuple[str, ...] = tuple(_DEFAULT_PARAMS)


@dataclass
class LocalMaterialMap:
    """Per-cell material parameters for a voxel grid.

    Every parameter array has the same shape as the ID grid it was built
    from (``shape``) and dtype float32 when created through
    ``from_voxel_ids`` / ``uniform``.
    """

    shape: Tuple[int, ...]
    # Weights and exponent consumed by ``compute_fatigue_vectorized``.
    temp_weight: np.ndarray = field(repr=False)
    entropy_weight: np.ndarray = field(repr=False)
    fatigue_exponent: np.ndarray = field(repr=False)
    # Gain consumed by ``evaluate_degradation_local``.
    risk_gain: np.ndarray = field(repr=False)
    # Exposed through ``get_fracture_threshold_field`` and ``summary``.
    fracture_threshold: np.ndarray = field(repr=False)
    # The remaining arrays are stored but not read by this module
    # (``entropy_gain`` only appears in ``summary``).
    propagation_gain: np.ndarray = field(repr=False)
    damage_gain: np.ndarray = field(repr=False)
    debris_threshold: np.ndarray = field(repr=False)
    entropy_gain: np.ndarray = field(repr=False)
    thermal_gain: np.ndarray = field(repr=False)
    fragmentation_threshold: np.ndarray = field(repr=False)
    fragmentation_gain: np.ndarray = field(repr=False)
    # Sorted distinct material IDs present in the grid.
    unique_material_ids: Tuple[int, ...] = field(default_factory=tuple)
    # Normalised Shannon entropy of the material distribution, in [0, 1].
    material_diversity: float = 0.0

    @classmethod
    def from_voxel_ids(cls, voxel_ids: np.ndarray, *, registry=None,
                       cache=None) -> "LocalMaterialMap":
        """Build a map by looking up the profile of every distinct material ID.

        Args:
            voxel_ids: Array (or array-like) of material IDs.  Non-integer
                dtypes are cast to int64 before use.
            registry: Passed through unchanged as the second argument of
                ``cache.get``.
            cache: Object exposing ``get(material_id, registry)``; defaults
                to ``PROFILE_CACHE``.  A ``None`` result selects the values
                in ``_DEFAULT_PARAMS`` for that ID.

        Returns:
            A ``LocalMaterialMap`` whose parameter arrays have the shape of
            ``voxel_ids``.
        """
        if cache is None:
            cache = PROFILE_CACHE

        ids = np.asarray(voxel_ids)
        if ids.dtype.kind not in "iu":
            ids = ids.astype(np.int64)
        shape = ids.shape

        # Compact the IDs to 0..k-1 instead of indexing a table by raw ID:
        # negative IDs can no longer wrap onto another material's row, and
        # sparse large IDs no longer force a (max_id + 1)-row table.
        unique_ids, inverse, counts = np.unique(
            ids.ravel(), return_inverse=True, return_counts=True
        )
        inverse = inverse.reshape(-1)

        defaults = [float(_DEFAULT_PARAMS[name]) for name in _PARAM_NAMES]
        lut = np.empty((unique_ids.size, len(_PARAM_NAMES)), dtype=np.float32)
        for row, material_id in enumerate(unique_ids.tolist()):
            profile = cache.get(int(material_id), registry)
            lut[row] = defaults if profile is None else _extract_params(profile)

        params_flat = lut[inverse]
        columns = {
            name: params_flat[:, col].reshape(shape).astype(np.float32)
            for col, name in enumerate(_PARAM_NAMES)
        }

        # Shannon entropy of the ID histogram, normalised by log2(k).
        # With fewer than two materials the diversity is 0 by definition.
        n_unique = int(unique_ids.size)
        if n_unique < 2:
            diversity = 0.0
        else:
            probs = counts.astype(np.float64) / float(counts.sum())
            entropy = -float(np.sum(probs * np.log2(probs)))
            diversity = float(clamp(entropy / float(np.log2(n_unique)), 0.0, 1.0))

        return cls(
            shape=shape,
            unique_material_ids=tuple(int(v) for v in unique_ids.tolist()),
            material_diversity=diversity,
            **columns,
        )

    @classmethod
    def uniform(cls, material_id: int, shape: Tuple[int, ...], *, registry=None,
                cache=None) -> "LocalMaterialMap":
        """Build a map in which every cell has the same ``material_id``.

        Args:
            material_id: ID assigned to every cell.
            shape: Shape of the grid to create.
            registry: Forwarded to ``from_voxel_ids``.
            cache: Forwarded to ``from_voxel_ids``.

        Returns:
            The map produced by ``from_voxel_ids`` for the constant grid.
        """
        # int32 rather than int16 so IDs above 32767 do not overflow.
        ids = np.full(shape, material_id, dtype=np.int32)
        return cls.from_voxel_ids(ids, registry=registry, cache=cache)

    def compute_fatigue_vectorized(self, T: np.ndarray, S: np.ndarray,
                                   phi_rms_total: float = 0.0, *,
                                   ambient_temperature: float = 293.15,
                                   temperature_scale_K: float = 500.0,
                                   entropy_scale: float = 1.0,
                                   phi_rms_scale: float = 1.0,
                                   max_fatigue_multiplier: float = 100.0) -> np.ndarray:
        """Return the per-cell fatigue multiplier, clipped to [1, max].

        The multiplier is ``1 + raw ** fatigue_exponent`` where ``raw`` is a
        weighted sum of three non-negative terms: the temperature excess over
        ambient, the entropy, and the scalar ``phi_rms_total``, each divided
        by its scale.  The third weight is whatever remains of 1 after the
        temperature and entropy weights, clipped to [0, 1].

        Args:
            T: Temperature array, broadcastable against the parameter arrays.
            S: Entropy array, broadcastable against the parameter arrays.
            phi_rms_total: Scalar RMS term shared by all cells.
            ambient_temperature: Temperature subtracted from ``T``; only the
                positive excess contributes.
            temperature_scale_K: Divisor for the temperature excess
                (floored at 1e-8).
            entropy_scale: Divisor for ``S`` (floored at 1e-8).
            phi_rms_scale: Divisor for ``phi_rms_total`` (floored at 1e-8).
            max_fatigue_multiplier: Upper clip bound of the result.

        Returns:
            Array of fatigue multipliers.
        """
        temp_term = np.maximum(0.0, T - ambient_temperature) / max(temperature_scale_K, 1e-8)
        entropy_term = np.maximum(0.0, S) / max(entropy_scale, 1e-8)
        rms_term = max(0.0, phi_rms_total) / max(phi_rms_scale, 1e-8)

        rms_weight = np.clip(1.0 - self.temp_weight - self.entropy_weight, 0.0, 1.0)
        raw = (
            self.temp_weight * temp_term
            + self.entropy_weight * entropy_term
            + rms_weight * rms_term
        )
        fatigue = 1.0 + np.power(
            np.maximum(0.0, raw), self.fatigue_exponent.astype(np.float64)
        )
        return np.clip(fatigue, 1.0, max_fatigue_multiplier)

    def get_fracture_threshold_field(self) -> np.ndarray:
        """Return the fracture-threshold array converted to float64."""
        return self.fracture_threshold.astype(np.float64)

    def summary(self) -> Dict[str, Any]:
        """Return a dict of scalar statistics describing this map."""
        return {
            "schema_version": SCHEMA_VERSION,
            "module_name": MODULE_NAME,
            "shape": self.shape,
            "unique_materials": list(self.unique_material_ids),
            "n_materials": len(self.unique_material_ids),
            "material_diversity": float(self.material_diversity),
            "mean_fracture_threshold": float(self.fracture_threshold.mean()),
            "mean_entropy_gain": float(self.entropy_gain.mean()),
            "mean_fatigue_exponent": float(self.fatigue_exponent.mean()),
        }


def _extract_params(profile: MaterialPipelineProfile) -> list:
    """Flatten ``profile`` into a list of floats ordered like ``_DEFAULT_PARAMS``.

    The values are read from the profile's ``degradation``, ``fracture``,
    ``world_reaction`` and ``coupling`` sections.
    """
    degradation = profile.degradation
    fracture = profile.fracture
    world = profile.world_reaction
    coupling = profile.coupling
    # Order must stay aligned with the keys of _DEFAULT_PARAMS.
    return [
        float(degradation.temp_weight),
        float(degradation.entropy_weight),
        float(degradation.fatigue_exponent),
        float(degradation.risk_gain),
        float(fracture.fracture_risk_threshold),
        float(fracture.propagation_gain),
        float(world.damage_gain),
        float(world.debris_threshold),
        float(coupling.entropy_gain),
        float(coupling.thermal_gain),
        float(coupling.fragmentation_strength_threshold),
        float(coupling.fragmentation_strength_gain),
    ]


def evaluate_degradation_local(thermal_grid: Any, entropy_field: Any,
                               material_map: LocalMaterialMap, *,
                               phi_rms_total: float = 0.0,
                               ambient_temperature: float = 293.15,
                               temperature_scale_K: float = 500.0,
                               entropy_scale: float = 1.0,
                               risk_bias: float = 0.0,
                               phi_rms_scale: float = 1.0,
                               max_fatigue_multiplier: float = 100.0) -> Dict[str, Any]:
    """Compute a per-cell degradation risk field and its summary statistics.

    Risk is ``1 - exp(-(risk_gain * (fatigue - 1) + risk_bias))`` clipped to
    [0, 1], where ``fatigue`` comes from
    ``material_map.compute_fatigue_vectorized``.

    Args:
        thermal_grid: ndarray, or object from which ``_resolve_arr`` can
            extract one; used as the temperature field.
        entropy_field: ndarray, or object from which ``_resolve_arr`` can
            extract one; used as the entropy field.
        material_map: Per-cell material parameters.
        phi_rms_total: Forwarded to the fatigue computation.
        ambient_temperature: Forwarded to the fatigue computation.
        temperature_scale_K: Forwarded to the fatigue computation.
        entropy_scale: Forwarded to the fatigue computation.
        risk_bias: Constant added inside the exponential.
        phi_rms_scale: Forwarded to the fatigue computation.
        max_fatigue_multiplier: Forwarded to the fatigue computation.

    Returns:
        Dict with the float32 ``risk_field`` plus scalar max/mean of risk
        and fatigue, the map's ``material_diversity`` and schema metadata.

    Raises:
        AttributeError: If an input cannot be resolved to an ndarray.
    """
    T = _resolve_arr(thermal_grid)
    S = _resolve_arr(entropy_field)
    fatigue = material_map.compute_fatigue_vectorized(
        T,
        S,
        phi_rms_total,
        ambient_temperature=ambient_temperature,
        temperature_scale_K=temperature_scale_K,
        entropy_scale=entropy_scale,
        phi_rms_scale=phi_rms_scale,
        max_fatigue_multiplier=max_fatigue_multiplier,
    )
    excess = np.maximum(0.0, fatigue - 1.0)
    risk = 1.0 - np.exp(-(material_map.risk_gain * excess + risk_bias))
    risk = np.clip(risk, 0.0, 1.0)
    return {
        "schema_version": SCHEMA_VERSION,
        "module_name": MODULE_NAME,
        "risk_field": risk.astype(np.float32, copy=True),
        "risk_max": float(np.max(risk)),
        "risk_mean": float(np.mean(risk)),
        "fatigue_max": float(np.max(fatigue)),
        "fatigue_mean": float(np.mean(fatigue)),
        "material_diversity": float(material_map.material_diversity),
    }


def _resolve_arr(obj: Any) -> np.ndarray:
    """Return ``obj`` (or its first ndarray-valued known attribute) as float64.

    Attributes are probed in the order ``T``, ``S``, ``values``,
    ``temperatures``, ``entropy``, ``E``; non-ndarray attributes are skipped.

    Raises:
        AttributeError: If ``obj`` is not an ndarray and none of the probed
            attributes holds one.
    """
    if isinstance(obj, np.ndarray):
        return obj.astype(np.float64, copy=False)
    for name in ("T", "S", "values", "temperatures", "entropy", "E"):
        candidate = getattr(obj, name, None)
        if isinstance(candidate, np.ndarray):
            return candidate.astype(np.float64, copy=False)
    raise AttributeError(f"Cannot resolve array from {type(obj)}")