Spaces:
Running
Running
| """noctilith/sim/local_material_map.py — M13.1: per-cell material parameter arrays.""" | |
| from __future__ import annotations | |
| import numpy as np | |
| from dataclasses import dataclass, field | |
| from typing import Any, Dict, Optional, Tuple | |
| from engine.sim.material_profiles import MaterialPipelineProfile, PROFILE_CACHE, clamp | |
# Identification strings included in the dictionaries returned by
# LocalMaterialMap.summary() and evaluate_degradation_local().
SCHEMA_VERSION = "noctilith.schema.v1"
MODULE_NAME = "noctilith.sim.local_material_map"

# Fallback value for each per-cell material parameter, used when no profile
# is found for a material id. The key order is significant: it defines the
# column order of the lookup table built in LocalMaterialMap.from_voxel_ids
# and lines up positionally with the list returned by _extract_params.
_DEFAULT_PARAMS = {
    "temp_weight": 0.45,
    "entropy_weight": 0.40,
    "fatigue_exponent": 1.35,
    "risk_gain": 0.65,
    "fracture_threshold": 0.80,
    "propagation_gain": 0.35,
    "damage_gain": 0.65,
    "debris_threshold": 0.88,
    "entropy_gain": 0.40,
    "thermal_gain": 0.15,
    "fragmentation_threshold": 0.50,
    "fragmentation_gain": 0.40,
}
@dataclass
class LocalMaterialMap:
    """Per-cell material parameter arrays derived from a voxel-id grid.

    Each parameter attribute holds one value per grid cell. Maps built by
    :meth:`from_voxel_ids` store ``float32`` arrays reshaped to ``shape``.
    """

    # Shape of the voxel grid the parameter arrays were built for.
    shape: Tuple[int, ...]
    # Per-cell parameter arrays; excluded from repr() so it stays short.
    temp_weight: np.ndarray = field(repr=False)
    entropy_weight: np.ndarray = field(repr=False)
    fatigue_exponent: np.ndarray = field(repr=False)
    risk_gain: np.ndarray = field(repr=False)
    fracture_threshold: np.ndarray = field(repr=False)
    propagation_gain: np.ndarray = field(repr=False)
    damage_gain: np.ndarray = field(repr=False)
    debris_threshold: np.ndarray = field(repr=False)
    entropy_gain: np.ndarray = field(repr=False)
    thermal_gain: np.ndarray = field(repr=False)
    fragmentation_threshold: np.ndarray = field(repr=False)
    fragmentation_gain: np.ndarray = field(repr=False)
    # Distinct material ids present in the grid, in ascending order.
    unique_material_ids: Tuple[int, ...] = field(default_factory=tuple)
    # Entropy of the material-id distribution divided by log2(#materials).
    material_diversity: float = 0.0

    @classmethod
    def from_voxel_ids(cls, voxel_ids: np.ndarray, *, registry=None,
                       cache=None) -> "LocalMaterialMap":
        """Build a map by looking up the profile of every material id in a grid.

        Args:
            voxel_ids: Array of material ids, one per cell.
            registry: Forwarded unchanged as the second argument of
                ``cache.get``.
            cache: Object exposing ``get(material_id, registry)``; defaults
                to ``PROFILE_CACHE``. When it returns ``None`` for an id,
                the values of ``_DEFAULT_PARAMS`` are used for that id.

        Returns:
            A map whose parameter arrays have the shape of ``voxel_ids``.
        """
        if cache is None:
            cache = PROFILE_CACHE
        ids = np.asarray(voxel_ids)
        shape = ids.shape
        param_names = list(_DEFAULT_PARAMS.keys())
        defaults = [float(_DEFAULT_PARAMS[k]) for k in param_names]

        # One lookup-table row per *distinct* id, addressed through the
        # inverse index. Indexing the table by the raw id would make negative
        # ids wrap around to other rows and would allocate max_id + 1 rows
        # for sparse, large ids.
        unique_ids, inverse, counts = np.unique(
            ids.ravel(), return_inverse=True, return_counts=True)
        unique_list = unique_ids.tolist()
        lut = np.zeros((len(unique_list), len(param_names)), dtype=np.float32)
        for row, vid in enumerate(unique_list):
            profile = cache.get(int(vid), registry)
            if profile is not None:
                lut[row] = _extract_params(profile)
            else:
                lut[row] = defaults
        params_flat = lut[inverse]

        def _arr(i: int) -> np.ndarray:
            # Column i of the gathered table, reshaped back onto the grid.
            return params_flat[:, i].reshape(shape).astype(np.float32)

        idx = {name: i for i, name in enumerate(param_names)}

        # Shannon entropy (base 2) of the id frequencies, normalised by the
        # entropy of a uniform distribution over the ids that are present.
        # The 1e-12 terms guard against log(0) and division by zero.
        counts = counts.astype(np.float64)
        total = counts.sum() + 1e-12
        probs = counts / total
        h = -np.sum(probs * np.log2(probs + 1e-12))
        max_h = np.log2(len(unique_list) + 1e-12)
        diversity = float(clamp(h / (max_h + 1e-12), 0.0, 1.0))

        return cls(
            shape=shape,
            temp_weight=_arr(idx["temp_weight"]),
            entropy_weight=_arr(idx["entropy_weight"]),
            fatigue_exponent=_arr(idx["fatigue_exponent"]),
            risk_gain=_arr(idx["risk_gain"]),
            fracture_threshold=_arr(idx["fracture_threshold"]),
            propagation_gain=_arr(idx["propagation_gain"]),
            damage_gain=_arr(idx["damage_gain"]),
            debris_threshold=_arr(idx["debris_threshold"]),
            entropy_gain=_arr(idx["entropy_gain"]),
            thermal_gain=_arr(idx["thermal_gain"]),
            fragmentation_threshold=_arr(idx["fragmentation_threshold"]),
            fragmentation_gain=_arr(idx["fragmentation_gain"]),
            unique_material_ids=tuple(int(v) for v in unique_list),
            material_diversity=diversity,
        )

    @classmethod
    def uniform(cls, material_id: int, shape: Tuple[int, ...],
                *, registry=None, cache=None) -> "LocalMaterialMap":
        """Build a map in which every cell of ``shape`` uses ``material_id``.

        ``registry`` and ``cache`` are forwarded to :meth:`from_voxel_ids`.
        """
        # int32 rather than int16 so that ids above 32767 do not overflow.
        ids = np.full(shape, material_id, dtype=np.int32)
        return cls.from_voxel_ids(ids, registry=registry, cache=cache)

    def compute_fatigue_vectorized(self, T: np.ndarray, S: np.ndarray,
                                   phi_rms_total: float = 0.0, *,
                                   ambient_temperature: float = 293.15,
                                   temperature_scale_K: float = 500.0,
                                   entropy_scale: float = 1.0,
                                   phi_rms_scale: float = 1.0,
                                   max_fatigue_multiplier: float = 100.0) -> np.ndarray:
        """Return the per-cell fatigue multiplier.

        The multiplier is ``1 + raw ** fatigue_exponent`` clipped to
        ``[1, max_fatigue_multiplier]``, where ``raw`` is the weighted sum of
        three non-negative, scaled terms: the temperature excess over
        ``ambient_temperature``, ``S``, and ``phi_rms_total``.

        Args:
            T: Temperature array, broadcastable against the parameter arrays.
            S: Entropy array, broadcastable against the parameter arrays.
            phi_rms_total: Scalar RMS term applied to every cell.
            ambient_temperature: Temperatures at or below this add nothing.
            temperature_scale_K: Divisor for the temperature excess.
            entropy_scale: Divisor for ``S``.
            phi_rms_scale: Divisor for ``phi_rms_total``.
            max_fatigue_multiplier: Upper clip bound of the result.
        """
        # Scales are floored at 1e-8 to avoid division by zero.
        dT = np.maximum(0.0, T - ambient_temperature) / max(temperature_scale_K, 1e-8)
        s_term = np.maximum(0.0, S) / max(entropy_scale, 1e-8)
        rms = max(0.0, phi_rms_total) / max(phi_rms_scale, 1e-8)
        # The RMS term gets whatever weight the other two leave, within [0, 1].
        rms_w = np.clip(1.0 - self.temp_weight - self.entropy_weight, 0.0, 1.0)
        raw = self.temp_weight * dT + self.entropy_weight * s_term + rms_w * rms
        fatigue = 1.0 + np.power(np.maximum(0.0, raw),
                                 self.fatigue_exponent.astype(np.float64))
        return np.clip(fatigue, 1.0, max_fatigue_multiplier)

    def get_fracture_threshold_field(self) -> np.ndarray:
        """Return the fracture-threshold array converted to ``float64``."""
        return self.fracture_threshold.astype(np.float64)

    def summary(self) -> Dict[str, Any]:
        """Return a dictionary of identifying fields and aggregate statistics."""
        return {
            "schema_version": SCHEMA_VERSION,
            "module_name": MODULE_NAME,
            "shape": self.shape,
            "unique_materials": list(self.unique_material_ids),
            "n_materials": len(self.unique_material_ids),
            "material_diversity": float(self.material_diversity),
            "mean_fracture_threshold": float(self.fracture_threshold.mean()),
            "mean_entropy_gain": float(self.entropy_gain.mean()),
            "mean_fatigue_exponent": float(self.fatigue_exponent.mean()),
        }
def _extract_params(profile: MaterialPipelineProfile) -> list:
    """Flatten a material profile into a list of twelve floats.

    Values are read from the profile's ``degradation``, ``fracture``,
    ``world_reaction`` and ``coupling`` sections. Their order lines up
    positionally with the keys of ``_DEFAULT_PARAMS``.
    """
    degradation = profile.degradation
    fracture = profile.fracture
    world = profile.world_reaction
    coupling = profile.coupling
    ordered = (
        degradation.temp_weight,
        degradation.entropy_weight,
        degradation.fatigue_exponent,
        degradation.risk_gain,
        fracture.fracture_risk_threshold,
        fracture.propagation_gain,
        world.damage_gain,
        world.debris_threshold,
        coupling.entropy_gain,
        coupling.thermal_gain,
        coupling.fragmentation_strength_threshold,
        coupling.fragmentation_strength_gain,
    )
    return [float(value) for value in ordered]
def evaluate_degradation_local(thermal_grid: Any, entropy_field: Any,
                               material_map: LocalMaterialMap, *,
                               phi_rms_total: float = 0.0,
                               ambient_temperature: float = 293.15,
                               temperature_scale_K: float = 500.0,
                               entropy_scale: float = 1.0,
                               risk_bias: float = 0.0) -> Dict[str, Any]:
    """Compute a per-cell degradation risk field from thermal and entropy data.

    Risk is ``1 - exp(-(risk_gain * (fatigue - 1) + risk_bias))`` clipped to
    ``[0, 1]``, with ``fatigue`` taken from
    ``material_map.compute_fatigue_vectorized``.

    Args:
        thermal_grid: ndarray, or an object carrying one (see ``_resolve_arr``).
        entropy_field: ndarray, or an object carrying one.
        material_map: Source of the per-cell fatigue and risk-gain parameters.
        phi_rms_total: Scalar RMS term forwarded to the fatigue computation.
        ambient_temperature: Forwarded to the fatigue computation.
        temperature_scale_K: Forwarded to the fatigue computation.
        entropy_scale: Forwarded to the fatigue computation.
        risk_bias: Constant added inside the exponential.

    Returns:
        Dictionary holding the ``float32`` risk field, the max and mean of
        risk and fatigue, the material diversity, and the schema/module
        identifiers.
    """
    temperatures = _resolve_arr(thermal_grid)
    entropies = _resolve_arr(entropy_field)
    fatigue = material_map.compute_fatigue_vectorized(
        temperatures,
        entropies,
        phi_rms_total,
        ambient_temperature=ambient_temperature,
        temperature_scale_K=temperature_scale_K,
        entropy_scale=entropy_scale,
    )
    # Only the fatigue in excess of the neutral multiplier 1.0 drives risk.
    excess = np.maximum(0.0, fatigue - 1.0)
    exponent = material_map.risk_gain * excess + risk_bias
    risk = np.clip(1.0 - np.exp(-exponent), 0.0, 1.0)

    report: Dict[str, Any] = {
        "schema_version": SCHEMA_VERSION,
        "module_name": MODULE_NAME,
    }
    report["risk_field"] = risk.astype(np.float32, copy=True)
    report["risk_max"] = float(np.max(risk))
    report["risk_mean"] = float(np.mean(risk))
    report["fatigue_max"] = float(np.max(fatigue))
    report["fatigue_mean"] = float(np.mean(fatigue))
    report["material_diversity"] = float(material_map.material_diversity)
    return report
def _resolve_arr(obj: Any) -> np.ndarray:
    """Return ``obj``, or the first ndarray attribute found on it, as float64.

    An ndarray is used directly. Otherwise the attributes ``T``, ``S``,
    ``values``, ``temperatures``, ``entropy`` and ``E`` are tried in that
    order and the first one that is an ndarray is used. No copy is made when
    the array is already ``float64``.

    Raises:
        AttributeError: If ``obj`` is not an ndarray and none of the
            candidate attributes holds one.
    """
    if isinstance(obj, np.ndarray):
        found = obj
    else:
        candidate_names = ("T", "S", "values", "temperatures", "entropy", "E")
        # Lazy generator: attributes are read one at a time, in order, and
        # the scan stops at the first ndarray.
        candidates = (getattr(obj, attr, None) for attr in candidate_names)
        found = next((c for c in candidates if isinstance(c, np.ndarray)), None)
    if found is None:
        raise AttributeError(f"Cannot resolve array from {type(obj)}")
    return found.astype(np.float64, copy=False)