Spaces:
Running
Running
| """noctilith/sim/quantized_field_ops.py — vectorized field operations (M12.2+).""" | |
| from __future__ import annotations | |
| import numpy as np | |
| from typing import List, Tuple | |
# Schema identifier string; no function visible in this part of the file reads it.
| SCHEMA_VERSION = "noctilith.schema.v1" | |
# Dotted module path, mirroring the file path given in the module docstring.
| MODULE_NAME = "noctilith.sim.quantized_field_ops" | |
def compute_overlap_eta(positions: np.ndarray, epsilon: float = 1e-12) -> float:
    """Return the spatial overlap metric η = 1 − |unique cells| / (N + ε).

    Every entry along the first axis of *positions* is quantized to its
    containing integer cell with ``floor``; entries that land in the same cell
    count once.  η is 0 (up to ε) when all N entries occupy distinct cells
    and approaches 1 as more of them collapse into shared cells.

    Args:
        positions: Array-like of N coordinate rows (first axis indexes the
            entries).
        epsilon: Small constant added to N in the denominator.

    Returns:
        ``0.0`` for empty input, otherwise ``1 - unique / (N + epsilon)``.
    """
    points = np.asarray(positions)
    count = len(points)
    if count == 0:
        return 0.0
    cells = np.floor(points).astype(int)
    # Row-wise uniqueness in native code rather than hashing one Python tuple
    # per row; this also handles 1-D input, which the tuple approach could not.
    distinct = len(np.unique(cells, axis=0))
    return float(1.0 - distinct / (count + epsilon))
def build_radius_kernel(radius: int, smooth: bool = True) -> np.ndarray:
    """Build a normalized 3-D kernel over a Manhattan-distance ball.

    The kernel is a cube of side ``2 * r + 1`` (``r = max(0, int(radius))``).
    Cells whose Manhattan distance ``d`` from the centre satisfies ``d <= r``
    get weight ``1 / (d + 1)`` when *smooth* is true and ``1`` otherwise; all
    other cells are zero.  The weights are then scaled to sum to 1.

    Args:
        radius: Ball radius in cells; negative values are treated as 0.
        smooth: Use the ``1 / (d + 1)`` falloff instead of a flat weight.

    Returns:
        A ``float64`` array of shape ``(2r + 1, 2r + 1, 2r + 1)`` summing to 1.
    """
    r = max(0, int(radius))
    offsets = np.abs(np.arange(-r, r + 1))
    # Broadcasting the per-axis offsets yields the Manhattan distance of every
    # cell from the centre without a Python-level triple loop.
    dist = offsets[:, None, None] + offsets[None, :, None] + offsets[None, None, :]
    inside = dist <= r
    if smooth:
        kernel = np.where(inside, 1.0 / (dist + 1.0), 0.0)
    else:
        kernel = inside.astype(np.float64)
    # The centre cell (d == 0) always has weight 1, so the total is >= 1 and
    # the division is safe; no zero-total fallback is needed.
    kernel /= kernel.sum()
    return kernel
def scatter_deposit(arr: np.ndarray, center: Tuple[int, int, int], delta: float,
                    kernel: np.ndarray, *, clamp: float = 1e9) -> None:
    """Add ``delta * kernel`` into *arr* around *center*, in place.

    The kernel is positioned so that its index ``shape // 2`` on each axis
    lands on the (rounded) centre voxel.  The part of the footprint that falls
    outside *arr* is discarded, and the touched window of *arr* is clipped to
    ``[0, clamp]`` after the addition.  Each axis is aligned independently, so
    the kernel does not have to be cubic or odd-sized.

    Args:
        arr: 3-D field that is modified in place.
        center: Target voxel; each coordinate is rounded to the nearest int.
        delta: Scale factor applied to the kernel before it is added.
        kernel: 3-D weight array.
        clamp: Upper bound applied to the touched window after the deposit.

    Returns:
        None. *arr* is left untouched when the footprint lies entirely
        outside it.
    """
    nx, ny, nz = arr.shape
    kx, ky, kz = kernel.shape
    arr_window = []
    kernel_window = []
    for coord, arr_len, k_len in zip(
        (center[0], center[1], center[2]), (nx, ny, nz), (kx, ky, kz)
    ):
        idx = int(round(coord))
        # Array index that kernel index 0 maps to on this axis.
        start = idx - k_len // 2
        lo = max(0, start)
        hi = min(arr_len, start + k_len)
        if hi <= lo:
            return  # footprint misses the array on this axis
        arr_window.append(slice(lo, hi))
        # Trim the kernel by exactly what was cut off the array window.
        kernel_window.append(slice(lo - start, hi - start))
    region = tuple(arr_window)
    deposit = delta * kernel[tuple(kernel_window)]
    arr[region] = np.clip(arr[region] + deposit, 0.0, clamp)
def batch_scatter_deposit(arr: np.ndarray, hits: list, kernel: np.ndarray,
                          *, clamp: float = 1e9) -> int:
    """Merge hits that round to the same voxel, then deposit each voxel once.

    Each entry of *hits* is a ``(center, delta)`` pair.  Centres are rounded
    to integer voxels and the deltas of hits sharing a voxel are summed before
    a single ``scatter_deposit`` call is made for that voxel.

    Returns:
        The number of distinct voxels deposited into (0 when *hits* is empty).
    """
    if not hits:
        return 0

    merged: dict = {}
    for position, amount in hits:
        voxel = tuple(int(round(position[axis])) for axis in range(3))
        merged[voxel] = merged.get(voxel, 0.0) + float(amount)

    # Dict order is first-seen order, so deposits happen in the same sequence
    # in which each voxel first appeared in *hits*.
    for voxel, total in merged.items():
        scatter_deposit(arr, voxel, total, kernel=kernel, clamp=clamp)
    return len(merged)
def vectorized_propagate(fracture_risk: np.ndarray, seeds: list, kernel: np.ndarray,
                         *, gain: float = 0.35, clamp: float = 1.0) -> np.ndarray:
    """Spread fracture risk outward from seed voxels via kernel deposits.

    Works on a ``float64`` copy of *fracture_risk*; the input is not modified.
    Each seed is indexed as ``(x, y, z, risk)``.  Seeds are de-duplicated on
    their ``int``-truncated coordinates, keeping the risk of the first seed
    seen for a voxel, and each surviving seed deposits ``gain * risk`` through
    ``scatter_deposit`` with the given *clamp*.

    Returns:
        The updated copy of the risk field.
    """
    field = fracture_risk.astype(np.float64, copy=True)
    if not seeds:
        return field

    # Insertion-ordered dict doubles as the "seen" set and the ordered list
    # of unique seeds; only the first risk value per voxel is recorded.
    first_risk = {}
    for seed in seeds:
        voxel = (int(seed[0]), int(seed[1]), int(seed[2]))
        if voxel not in first_risk:
            first_risk[voxel] = float(seed[3])

    for voxel, risk in first_risk.items():
        scatter_deposit(field, voxel, gain * risk, kernel=kernel, clamp=clamp)
    return field
def vectorized_extract_candidates(fracture_risk: np.ndarray, threshold: float,
                                  max_candidates: int) -> List[Tuple[int, int, int, float]]:
    """Return the highest-risk voxels at or above *threshold*.

    Selection is done with NumPy operations on the flattened field: cells
    with ``value >= threshold`` are collected, reduced to the
    *max_candidates* largest with ``argpartition`` when there are more than
    that, and sorted by descending value.

    Args:
        fracture_risk: 3-D risk field.
        threshold: Minimum value (inclusive) for a cell to qualify.
        max_candidates: Maximum number of entries to return; values ``<= 0``
            yield an empty list.

    Returns:
        A list of ``(ix, iy, iz, value)`` tuples, largest value first.
    """
    # Without this guard a non-positive limit made argpartition's kth index
    # run past the end of the candidate array and raise ValueError.
    if max_candidates <= 0:
        return []
    flat = fracture_risk.ravel()
    hit_idx = np.flatnonzero(flat >= threshold)
    if hit_idx.size == 0:
        return []
    hit_vals = flat[hit_idx]
    if hit_vals.size > max_candidates:
        # Partial selection keeps only the top-k before the (smaller) sort.
        keep = np.argpartition(hit_vals, hit_vals.size - max_candidates)[-max_candidates:]
        hit_idx = hit_idx[keep]
        hit_vals = hit_vals[keep]
    order = np.argsort(hit_vals)[::-1]
    top_flat = hit_idx[order]
    top_vals = hit_vals[order]
    ix, iy, iz = np.unravel_index(top_flat, fracture_risk.shape)
    return list(zip(ix.tolist(), iy.tolist(), iz.tolist(), top_vals.tolist()))
def vectorized_extract_candidates_local(fracture_risk: np.ndarray,
                                        threshold_field: np.ndarray,
                                        max_candidates: int) -> List[Tuple[int, int, int, float]]:
    """Return the highest-risk voxels that meet their own per-cell threshold.

    Same selection as the scalar-threshold variant, except a cell qualifies
    when ``fracture_risk >= threshold_field`` holds at that cell.

    Args:
        fracture_risk: 3-D risk field.
        threshold_field: Per-cell minimum (inclusive), compared element-wise
            against *fracture_risk*.
        max_candidates: Maximum number of entries to return; values ``<= 0``
            yield an empty list.

    Returns:
        A list of ``(ix, iy, iz, value)`` tuples, largest value first.
    """
    # Without this guard a non-positive limit made argpartition's kth index
    # run past the end of the candidate array and raise ValueError.
    if max_candidates <= 0:
        return []
    mask = fracture_risk >= threshold_field
    flat = fracture_risk.ravel()
    hit_idx = np.flatnonzero(mask.ravel())
    if hit_idx.size == 0:
        return []
    hit_vals = flat[hit_idx]
    if hit_vals.size > max_candidates:
        # Partial selection keeps only the top-k before the (smaller) sort.
        keep = np.argpartition(hit_vals, hit_vals.size - max_candidates)[-max_candidates:]
        hit_idx = hit_idx[keep]
        hit_vals = hit_vals[keep]
    order = np.argsort(hit_vals)[::-1]
    top_flat = hit_idx[order]
    top_vals = hit_vals[order]
    ix, iy, iz = np.unravel_index(top_flat, fracture_risk.shape)
    return list(zip(ix.tolist(), iy.tolist(), iz.tolist(), top_vals.tolist()))