Spaces:
Running
Running
| """noctilith/sim/debris_execution.py — DebrisExecutionBridge.""" | |
| from __future__ import annotations | |
| from dataclasses import dataclass | |
| from typing import Any, Dict, List, Optional, Tuple | |
# Schema identifier reported under "schema_version" in every telemetry dict
# this module returns.
SCHEMA_VERSION = "noctilith.schema.v1"
# Dotted module path reported under "module_name" in those telemetry dicts.
MODULE_NAME = "noctilith.sim.debris_execution"
@dataclass
class DebrisExecutionConfig:
    """Tunable settings for ``DebrisExecutionBridge``.

    Declared as a dataclass so individual settings can be overridden at
    construction time, e.g. ``DebrisExecutionConfig(enabled=False)``;
    calling it with no arguments still yields the defaults below.
    """

    # When False, DebrisExecutionBridge.apply() returns zeroed telemetry
    # without touching any request queue.
    enabled: bool = True
    # Upper bound on how many queued requests are taken per apply() call.
    request_limit_per_step: int = 16
    # Requests whose strength is below this value are counted as skipped.
    min_strength: float = 0.05
    # Strengths above this value are clamped down to it before spawning.
    max_strength: float = 10.0
    # Multiplier applied to every component of the spawned velocity.
    velocity_gain: float = 1.2
    # Spawned mass is mass_gain * strength.
    mass_gain: float = 0.8
    # Spawned lifetime is lifetime_base + lifetime_gain * strength.
    lifetime_base: float = 3.0
    lifetime_gain: float = 2.0
    # When True, requests that were taken are removed from the source queue.
    consume_requests: bool = True
    # NOTE(review): not read by any code visible in this module; presumably
    # gates event publication on an event bus — confirm with the owner.
    emit_events: bool = True
@dataclass
class DebrisExecutionTelemetry:
    """Mutable per-call counters filled in by ``DebrisExecutionBridge.apply``.

    Declared as a dataclass so each instance owns its counters, can be
    built with keyword overrides, and has a readable ``repr``.
    """

    # Simulation step the counters belong to (copied from the caller's
    # telemetry mapping).
    step_index: int = 0
    # Number of requests taken from the queue for this call.
    requests_found: int = 0
    # Number of requests for which a debris entity was handed to the target.
    spawned: int = 0
    # Number of requests dropped for being below the minimum strength.
    skipped_weak: int = 0
    # NOTE(review): never incremented by the code visible in this module.
    emitted_events: int = 0
class DebrisExecutionBridge:
    """Turn queued debris requests into spawned debris entities.

    Requests are read from a ``_debris_requests`` list on the world (or
    target) object, filtered and clamped by strength, converted into a
    payload dict and handed to the target object for spawning.
    """

    def __init__(self, config: Optional["DebrisExecutionConfig"] = None):
        """Store *config*, or a default ``DebrisExecutionConfig`` if omitted."""
        self.config = config or DebrisExecutionConfig()

    def _make_payload(self, *, index: Tuple[int, int, int], strength: float,
                      step_index: int) -> Dict[str, Any]:
        """Build the spawn payload for one debris request.

        Args:
            index: Three-component index of the request; it is used both as
                the origin index and, converted to floats, as the position.
            strength: Already-clamped request strength; scales velocity,
                mass and lifetime.
            step_index: Step number recorded as ``spawn_step_index``.

        Returns:
            A dict describing the debris entity to spawn.
        """
        cfg = self.config
        ix, iy, iz = index
        # Velocity is derived deterministically from the index: x and z take
        # one of -0.25 / 0 / +0.25, y takes 0.35 plus a small index-dependent
        # offset; everything is scaled by velocity_gain and strength.
        vx = cfg.velocity_gain * (((ix % 3) - 1) * 0.25) * strength
        vy = cfg.velocity_gain * (0.35 + ((iy % 5) * 0.03)) * strength
        vz = cfg.velocity_gain * (((iz % 3) - 1) * 0.25) * strength
        return {
            "entity_type": "debris",
            "origin_index": (int(ix), int(iy), int(iz)),
            "position": (float(ix), float(iy), float(iz)),
            "velocity": (float(vx), float(vy), float(vz)),
            "strength": float(strength),
            "mass": float(cfg.mass_gain * strength),
            "lifetime": float(cfg.lifetime_base + cfg.lifetime_gain * strength),
            "spawn_step_index": int(step_index),
        }

    def _consume(self, target_obj: Any) -> List[Dict[str, Any]]:
        """Take up to ``request_limit_per_step`` requests from *target_obj*.

        Reads the ``_debris_requests`` attribute; a missing or empty queue
        yields an empty list. When ``consume_requests`` is set, the taken
        requests are deleted from the front of the queue in place.
        """
        queue = getattr(target_obj, "_debris_requests", [])
        if not queue:
            return []
        # A negative limit would otherwise be read as "all but the last N"
        # by the slice below; treat it as "take nothing" instead.
        limit = max(0, self.config.request_limit_per_step)
        taken = list(queue[:limit])
        if self.config.consume_requests:
            del queue[:len(taken)]
        return taken

    def _resolve_strength(self, raw: Any) -> Optional[float]:
        """Return the clamped strength, or None if the request is too weak.

        NaN is rejected as well, so it can never reach the payload maths.
        """
        strength = float(raw)
        # `not >=` rather than `<`: every comparison with NaN is False, so
        # this form also filters NaN out.
        if not strength >= self.config.min_strength:
            return None
        return min(strength, self.config.max_strength)

    def _spawn(self, target_obj: Any, payload: Dict[str, Any]) -> bool:
        """Hand *payload* to *target_obj*; return whether it was accepted.

        Prefers a callable ``spawn_entity("debris", payload)``; otherwise
        appends to a ``_spawned_entities`` list if one exists.
        """
        spawn_fn = getattr(target_obj, "spawn_entity", None)
        if callable(spawn_fn):
            spawn_fn("debris", payload)
            return True
        sink = getattr(target_obj, "_spawned_entities", None)
        if sink is not None:
            sink.append({"kind": "debris", "payload": payload})
            return True
        return False

    def apply(self, *, target_obj: Any, world_obj: Any,
              telemetry: Optional[Dict[str, Any]] = None,
              event_bus: Optional[Any] = None) -> Dict[str, Any]:
        """Process this step's debris requests and report what happened.

        Args:
            target_obj: Object that receives spawned entities; also the
                request source when *world_obj* is None.
            world_obj: Object whose ``_debris_requests`` queue is read.
            telemetry: Optional mapping; only ``"step_index"`` is read.
            event_bus: Accepted for interface compatibility.
                NOTE(review): not used here, and ``emit_events`` /
                ``emitted_events`` are never touched — confirm whether
                event publication is still to be implemented.

        Returns:
            The telemetry dict built by ``_telemetry_dict``. When the bridge
            is disabled all counters, including ``step_index``, are zero.
        """
        cfg = self.config
        out = DebrisExecutionTelemetry()
        if not cfg.enabled:
            return self._telemetry_dict(out)

        telemetry = telemetry or {}
        out.step_index = int(telemetry.get("step_index", 0))

        source = world_obj if world_obj is not None else target_obj
        requests = self._consume(source)
        out.requests_found = len(requests)

        for req in requests:
            strength = self._resolve_strength(req.get("strength", 1.0))
            if strength is None:
                out.skipped_weak += 1
                continue
            payload = self._make_payload(
                index=tuple(req.get("index", (0, 0, 0))),
                strength=strength,
                step_index=out.step_index,
            )
            if self._spawn(target_obj, payload):
                out.spawned += 1
        return self._telemetry_dict(out)

    def _telemetry_dict(self, out: "DebrisExecutionTelemetry") -> Dict[str, Any]:
        """Flatten *out* into a plain dict tagged with schema and module name."""
        return {
            "schema_version": SCHEMA_VERSION,
            "module_name": MODULE_NAME,
            "step_index": int(out.step_index),
            "requests_found": int(out.requests_found),
            "spawned": int(out.spawned),
            "skipped_weak": int(out.skipped_weak),
            "emitted_events": int(out.emitted_events),
        }