"""noctilith/sim/quantized_field_ops.py — vectorized field operations (M12.2+).""" from __future__ import annotations import numpy as np from typing import List, Tuple SCHEMA_VERSION = "noctilith.schema.v1" MODULE_NAME = "noctilith.sim.quantized_field_ops" def compute_overlap_eta(positions: np.ndarray, epsilon: float = 1e-12) -> float: """η(t) = 1 − |uniq β(𝒫)| / (N+ε) — spatial overlap metric.""" if len(positions) == 0: return 0.0 beta = np.floor(positions).astype(int) unique = len({tuple(r) for r in beta}) N = len(positions) return float(1.0 - unique / (N + epsilon)) def build_radius_kernel(radius: int, smooth: bool = True) -> np.ndarray: """Build ℱ! normalized Manhattan distance kernel of given radius.""" r = max(0, int(radius)) size = 2 * r + 1 kernel = np.zeros((size, size, size), dtype=np.float64) for ix in range(size): for iy in range(size): for iz in range(size): dist = abs(ix-r) + abs(iy-r) + abs(iz-r) if dist <= r: kernel[ix, iy, iz] = 1.0 / (dist + 1.0) if smooth else 1.0 total = kernel.sum() if total > 0: kernel /= total else: kernel[r, r, r] = 1.0 return kernel def scatter_deposit(arr: np.ndarray, center: Tuple[int,int,int], delta: float, kernel: np.ndarray, *, clamp: float = 1e9) -> None: """Vectorized kernel deposition at center voxel.""" r = kernel.shape[0] // 2 nx, ny, nz = arr.shape ix, iy, iz = int(round(center[0])), int(round(center[1])), int(round(center[2])) x0, x1 = max(0, ix-r), min(nx, ix+r+1) y0, y1 = max(0, iy-r), min(ny, iy+r+1) z0, z1 = max(0, iz-r), min(nz, iz+r+1) kx0 = x0 - (ix-r); kx1 = kx0 + (x1-x0) ky0 = y0 - (iy-r); ky1 = ky0 + (y1-y0) kz0 = z0 - (iz-r); kz1 = kz0 + (z1-z0) if x1 > x0 and y1 > y0 and z1 > z0: deposit = delta * kernel[kx0:kx1, ky0:ky1, kz0:kz1] arr[x0:x1, y0:y1, z0:z1] = np.clip( arr[x0:x1, y0:y1, z0:z1] + deposit, 0.0, clamp) def batch_scatter_deposit(arr: np.ndarray, hits: list, kernel: np.ndarray, *, clamp: float = 1e9) -> int: """β-quantize + uniq 𝒫 batch deposition. Returns unique voxel count.""" if not hits: return 0 acc: dict = {} for center, delta in hits: key = (int(round(center[0])), int(round(center[1])), int(round(center[2]))) acc[key] = acc.get(key, 0.0) + float(delta) for key, delta in acc.items(): scatter_deposit(arr, key, delta, kernel=kernel, clamp=clamp) return len(acc) def vectorized_propagate(fracture_risk: np.ndarray, seeds: list, kernel: np.ndarray, *, gain: float = 0.35, clamp: float = 1.0) -> np.ndarray: """Propagate fracture risk from seed candidates using kernel.""" result = fracture_risk.astype(np.float64, copy=True) if not seeds: return result # β-quantize + uniq 𝒫 dedup on seed positions seen = set() unique_seeds = [] for s in seeds: key = (int(s[0]), int(s[1]), int(s[2])) if key not in seen: seen.add(key) unique_seeds.append((key, float(s[3]))) for (ix, iy, iz), risk_val in unique_seeds: scatter_deposit(result, (ix, iy, iz), gain * risk_val, kernel=kernel, clamp=clamp) return result def vectorized_extract_candidates(fracture_risk: np.ndarray, threshold: float, max_candidates: int) -> List[Tuple[int,int,int,float]]: """O(n) β-quantized candidate extraction — no Python dedup loop needed.""" flat = fracture_risk.ravel() flat_idx = np.flatnonzero(flat >= threshold) if flat_idx.size == 0: return [] values = flat[flat_idx] n = len(values) if n <= max_candidates: top_flat = flat_idx[np.argsort(values)[::-1]] else: part = np.argpartition(values, n - max_candidates)[-max_candidates:] top_flat = flat_idx[part[np.argsort(values[part])[::-1]]] top_flat = top_flat[:max_candidates] top_vals = flat[top_flat] ix, iy, iz = np.unravel_index(top_flat, fracture_risk.shape) return list(zip(ix.tolist(), iy.tolist(), iz.tolist(), top_vals.tolist())) def vectorized_extract_candidates_local(fracture_risk: np.ndarray, threshold_field: np.ndarray, max_candidates: int) -> List[Tuple[int,int,int,float]]: """Per-cell threshold variant for LocalMaterialMap integration.""" mask = fracture_risk >= threshold_field flat = fracture_risk.ravel() flat_idx = np.flatnonzero(mask.ravel()) if flat_idx.size == 0: return [] values = flat[flat_idx] n = len(values) if n <= max_candidates: top_flat = flat_idx[np.argsort(values)[::-1]] else: part = np.argpartition(values, n - max_candidates)[-max_candidates:] top_flat = flat_idx[part[np.argsort(values[part])[::-1]]] top_flat = top_flat[:max_candidates] top_vals = flat[top_flat] ix, iy, iz = np.unravel_index(top_flat, fracture_risk.shape) return list(zip(ix.tolist(), iy.tolist(), iz.tolist(), top_vals.tolist()))