voxforge-world / engine /sim /local_material_map.py
peiti's picture
Upload folder using huggingface_hub
b154e4c verified
"""noctilith/sim/local_material_map.py — M13.1: per-cell material parameter arrays."""
from __future__ import annotations
import numpy as np
from dataclasses import dataclass, field
from typing import Any, Dict, Optional, Tuple
from engine.sim.material_profiles import MaterialPipelineProfile, PROFILE_CACHE, clamp
SCHEMA_VERSION = "noctilith.schema.v1"
MODULE_NAME = "noctilith.sim.local_material_map"
_DEFAULT_PARAMS = {
"temp_weight": 0.45, "entropy_weight": 0.40, "fatigue_exponent": 1.35,
"risk_gain": 0.65, "fracture_threshold": 0.80, "propagation_gain": 0.35,
"damage_gain": 0.65, "debris_threshold": 0.88, "entropy_gain": 0.40,
"thermal_gain": 0.15, "fragmentation_threshold": 0.50, "fragmentation_gain": 0.40,
}
@dataclass
class LocalMaterialMap:
shape: Tuple[int, ...]
temp_weight: np.ndarray = field(repr=False)
entropy_weight: np.ndarray = field(repr=False)
fatigue_exponent: np.ndarray = field(repr=False)
risk_gain: np.ndarray = field(repr=False)
fracture_threshold: np.ndarray = field(repr=False)
propagation_gain: np.ndarray = field(repr=False)
damage_gain: np.ndarray = field(repr=False)
debris_threshold: np.ndarray = field(repr=False)
entropy_gain: np.ndarray = field(repr=False)
thermal_gain: np.ndarray = field(repr=False)
fragmentation_threshold: np.ndarray = field(repr=False)
fragmentation_gain: np.ndarray = field(repr=False)
unique_material_ids: Tuple[int, ...] = field(default_factory=tuple)
material_diversity: float = 0.0
@classmethod
def from_voxel_ids(cls, voxel_ids: np.ndarray, *, registry=None,
cache=None) -> "LocalMaterialMap":
if cache is None: cache = PROFILE_CACHE
shape = voxel_ids.shape
unique_ids = np.unique(voxel_ids).tolist()
param_names = list(_DEFAULT_PARAMS.keys())
max_id = max(unique_ids) if unique_ids else 0
lut = np.zeros((max_id + 1, len(param_names)), dtype=np.float32)
for vid in unique_ids:
profile = cache.get(int(vid), registry)
if profile is not None:
lut[vid] = _extract_params(profile)
else:
lut[vid] = [float(_DEFAULT_PARAMS[k]) for k in param_names]
ids_flat = np.clip(voxel_ids.ravel().astype(np.int32), 0, max_id)
params_flat = lut[ids_flat]
def _arr(i: int) -> np.ndarray:
return params_flat[:, i].reshape(shape).astype(np.float32)
idx = {name: i for i, name in enumerate(param_names)}
counts = np.bincount(ids_flat, minlength=max_id+1).astype(np.float64)
total = counts.sum() + 1e-12
probs = counts[counts > 0] / total
h = -np.sum(probs * np.log2(probs + 1e-12))
max_h = np.log2(len(unique_ids) + 1e-12)
diversity = float(clamp(h / (max_h + 1e-12), 0.0, 1.0))
return cls(
shape=shape,
temp_weight =_arr(idx["temp_weight"]),
entropy_weight =_arr(idx["entropy_weight"]),
fatigue_exponent =_arr(idx["fatigue_exponent"]),
risk_gain =_arr(idx["risk_gain"]),
fracture_threshold=_arr(idx["fracture_threshold"]),
propagation_gain =_arr(idx["propagation_gain"]),
damage_gain =_arr(idx["damage_gain"]),
debris_threshold =_arr(idx["debris_threshold"]),
entropy_gain =_arr(idx["entropy_gain"]),
thermal_gain =_arr(idx["thermal_gain"]),
fragmentation_threshold=_arr(idx["fragmentation_threshold"]),
fragmentation_gain =_arr(idx["fragmentation_gain"]),
unique_material_ids=tuple(int(v) for v in unique_ids),
material_diversity =diversity,
)
@classmethod
def uniform(cls, material_id: int, shape: Tuple[int,...],
*, registry=None, cache=None) -> "LocalMaterialMap":
ids = np.full(shape, material_id, dtype=np.int16)
return cls.from_voxel_ids(ids, registry=registry, cache=cache)
def compute_fatigue_vectorized(self, T: np.ndarray, S: np.ndarray,
phi_rms_total: float = 0.0, *,
ambient_temperature: float = 293.15,
temperature_scale_K: float = 500.0,
entropy_scale: float = 1.0,
phi_rms_scale: float = 1.0,
max_fatigue_multiplier: float = 100.0) -> np.ndarray:
dT = np.maximum(0.0, T - ambient_temperature) / max(temperature_scale_K, 1e-8)
s_term = np.maximum(0.0, S) / max(entropy_scale, 1e-8)
rms = max(0.0, phi_rms_total) / max(phi_rms_scale, 1e-8)
rms_w = np.clip(1.0 - self.temp_weight - self.entropy_weight, 0.0, 1.0)
raw = self.temp_weight * dT + self.entropy_weight * s_term + rms_w * rms
fatigue= 1.0 + np.power(np.maximum(0.0, raw), self.fatigue_exponent.astype(np.float64))
return np.clip(fatigue, 1.0, max_fatigue_multiplier)
def get_fracture_threshold_field(self) -> np.ndarray:
return self.fracture_threshold.astype(np.float64)
def summary(self) -> Dict[str, Any]:
return {
"schema_version": SCHEMA_VERSION,
"module_name": MODULE_NAME,
"shape": self.shape,
"unique_materials": list(self.unique_material_ids),
"n_materials": len(self.unique_material_ids),
"material_diversity": float(self.material_diversity),
"mean_fracture_threshold": float(self.fracture_threshold.mean()),
"mean_entropy_gain": float(self.entropy_gain.mean()),
"mean_fatigue_exponent": float(self.fatigue_exponent.mean()),
}
def _extract_params(profile: MaterialPipelineProfile) -> list:
d = profile.degradation; f = profile.fracture
w = profile.world_reaction; c = profile.coupling
return [
float(d.temp_weight), float(d.entropy_weight), float(d.fatigue_exponent),
float(d.risk_gain), float(f.fracture_risk_threshold), float(f.propagation_gain),
float(w.damage_gain), float(w.debris_threshold), float(c.entropy_gain),
float(c.thermal_gain), float(c.fragmentation_strength_threshold),
float(c.fragmentation_strength_gain),
]
def evaluate_degradation_local(thermal_grid: Any, entropy_field: Any,
material_map: LocalMaterialMap, *,
phi_rms_total: float = 0.0,
ambient_temperature: float = 293.15,
temperature_scale_K: float = 500.0,
entropy_scale: float = 1.0,
risk_bias: float = 0.0) -> Dict[str, Any]:
T = _resolve_arr(thermal_grid)
S = _resolve_arr(entropy_field)
fatigue = material_map.compute_fatigue_vectorized(
T, S, phi_rms_total, ambient_temperature=ambient_temperature,
temperature_scale_K=temperature_scale_K, entropy_scale=entropy_scale)
x = np.maximum(0.0, fatigue - 1.0)
risk = 1.0 - np.exp(-(material_map.risk_gain * x + risk_bias))
risk = np.clip(risk, 0.0, 1.0)
return {
"schema_version": SCHEMA_VERSION, "module_name": MODULE_NAME,
"risk_field": risk.astype(np.float32, copy=True),
"risk_max": float(np.max(risk)), "risk_mean": float(np.mean(risk)),
"fatigue_max": float(np.max(fatigue)), "fatigue_mean": float(np.mean(fatigue)),
"material_diversity": float(material_map.material_diversity),
}
def _resolve_arr(obj: Any) -> np.ndarray:
if isinstance(obj, np.ndarray): return obj.astype(np.float64, copy=False)
for name in ("T", "S", "values", "temperatures", "entropy", "E"):
arr = getattr(obj, name, None)
if arr is not None and isinstance(arr, np.ndarray):
return arr.astype(np.float64, copy=False)
raise AttributeError(f"Cannot resolve array from {type(obj)}")