# voxforge-world / engine /sim /debris_execution.py
# peiti's picture
# Upload folder using huggingface_hub
# b154e4c verified
"""noctilith/sim/debris_execution.py — DebrisExecutionBridge."""
from __future__ import annotations
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple
# Schema identifier reported under "schema_version" in the telemetry dicts
# built by DebrisExecutionBridge._telemetry_dict.
SCHEMA_VERSION = "noctilith.schema.v1"
# Dotted module name reported under "module_name" in those same dicts.
MODULE_NAME = "noctilith.sim.debris_execution"
@dataclass
class DebrisExecutionConfig:
    """Tunable parameters read by ``DebrisExecutionBridge``.

    Field order is part of the interface (positional construction), so it
    must not be rearranged.
    """

    # Master switch: when False, ``apply`` returns empty telemetry at once.
    enabled: bool = True
    # Upper bound on how many queued requests one ``apply`` call takes.
    request_limit_per_step: int = 16

    # Requests weaker than ``min_strength`` are counted as skipped;
    # stronger ones are capped at ``max_strength`` before use.
    min_strength: float = 0.05
    max_strength: float = 10.0

    # Multipliers applied to the request strength when building a payload.
    velocity_gain: float = 1.2
    mass_gain: float = 0.8

    # Payload lifetime is ``lifetime_base + lifetime_gain * strength``.
    lifetime_base: float = 3.0
    lifetime_gain: float = 2.0

    # When True, taken requests are deleted from the source queue.
    consume_requests: bool = True
    # NOTE(review): not read by the bridge code in this file; presumably
    # meant to gate event-bus emission -- confirm against callers.
    emit_events: bool = True
@dataclass
class DebrisExecutionTelemetry:
    """Mutable per-call counters filled in by ``DebrisExecutionBridge.apply``."""

    # Step number copied from the caller-supplied telemetry mapping.
    step_index: int = 0
    # Number of requests taken from the queue for this call.
    requests_found: int = 0
    # Requests for which the spawn attempt reported success.
    spawned: int = 0
    # Requests rejected because their strength was below the minimum.
    skipped_weak: int = 0
    # NOTE(review): never incremented by the bridge code in this file.
    emitted_events: int = 0
class DebrisExecutionBridge:
    """Turn queued debris requests into spawned debris entities.

    Requests are read from a ``_debris_requests`` list on the world (or
    target) object, converted into payload dicts, and handed to the target's
    ``spawn_entity`` callable or appended to its ``_spawned_entities`` list.
    """

    def __init__(self, config: Optional[DebrisExecutionConfig] = None):
        """Store *config*, falling back to a default ``DebrisExecutionConfig``."""
        self.config = config or DebrisExecutionConfig()

    def _make_payload(self, *, index: Tuple[int, int, int], strength: float,
                      step_index: int) -> Dict[str, Any]:
        """Build the spawn payload for one request.

        Args:
            index: Three-component grid index of the request.
            strength: Already-clamped request strength.
            step_index: Step number recorded in the payload.

        Returns:
            A dict with entity type, origin index, position, velocity,
            strength, mass, lifetime and spawn step.
        """
        cfg = self.config
        ix, iy, iz = index
        # Velocity is derived deterministically from the index: x/z get a
        # -0.25 / 0 / +0.25 factor from ``index % 3``, y is always positive.
        vx = cfg.velocity_gain * (((ix % 3) - 1) * 0.25) * strength
        vy = cfg.velocity_gain * (0.35 + ((iy % 5) * 0.03)) * strength
        vz = cfg.velocity_gain * (((iz % 3) - 1) * 0.25) * strength
        return {
            "entity_type": "debris",
            "origin_index": (int(ix), int(iy), int(iz)),
            "position": (float(ix), float(iy), float(iz)),
            "velocity": (float(vx), float(vy), float(vz)),
            "strength": float(strength),
            "mass": float(cfg.mass_gain * strength),
            "lifetime": float(cfg.lifetime_base + cfg.lifetime_gain * strength),
            "spawn_step_index": int(step_index),
        }

    def _consume(self, target_obj: Any) -> List[Dict[str, Any]]:
        """Take up to ``request_limit_per_step`` requests from *target_obj*.

        Reads ``target_obj._debris_requests``; a missing or empty queue
        yields ``[]``. When ``consume_requests`` is set, the taken requests
        are deleted from the front of the queue in place.
        """
        requests = getattr(target_obj, "_debris_requests", [])
        if not requests:
            return []
        # A negative limit would slice from the tail (``requests[:-n]``) and
        # take/delete the wrong items, so treat it as "take nothing".
        limit = max(0, self.config.request_limit_per_step)
        result = list(requests[:limit])
        if self.config.consume_requests:
            del requests[:len(result)]
        return result

    def _usable_strength(self, raw: Any) -> Optional[float]:
        """Return the clamped strength, or ``None`` if the request is too weak.

        NaN is rejected as well: every comparison with NaN is False, so the
        test is written as ``not (x >= min)`` rather than ``x < min``.
        """
        cfg = self.config
        strength = float(raw)
        if not strength >= cfg.min_strength:
            return None
        return min(strength, cfg.max_strength)

    def _spawn(self, target_obj: Any, payload: Dict[str, Any]) -> bool:
        """Hand *payload* to *target_obj*; return whether it was accepted.

        Prefers a callable ``spawn_entity("debris", payload)``; otherwise
        appends to ``_spawned_entities`` when that attribute exists.
        """
        fn = getattr(target_obj, "spawn_entity", None)
        if callable(fn):
            fn("debris", payload)
            return True
        lst = getattr(target_obj, "_spawned_entities", None)
        if lst is not None:
            lst.append({"kind": "debris", "payload": payload})
            return True
        return False

    def apply(self, *, target_obj: Any, world_obj: Any,
              telemetry: Optional[Dict[str, Any]] = None,
              event_bus: Optional[Any] = None) -> Dict[str, Any]:
        """Process one batch of debris requests.

        Args:
            target_obj: Object that receives the spawned debris; also the
                request source when *world_obj* is ``None``.
            world_obj: Object whose ``_debris_requests`` queue is read.
            telemetry: Optional mapping; only ``"step_index"`` is read.
            event_bus: Accepted for interface compatibility.

        Returns:
            The telemetry dict produced by ``_telemetry_dict``.
        """
        # NOTE(review): ``event_bus`` and ``cfg.emit_events`` are not used
        # here, so ``emitted_events`` is always reported as 0 -- confirm
        # whether event emission is still expected from this bridge.
        cfg = self.config
        out = DebrisExecutionTelemetry()
        if not cfg.enabled:
            return self._telemetry_dict(out)

        # ``or 0`` also covers an explicit ``"step_index": None`` entry.
        out.step_index = int((telemetry or {}).get("step_index") or 0)

        source = world_obj if world_obj is not None else target_obj
        requests = self._consume(source)
        out.requests_found = len(requests)
        for req in requests:
            idx = req.get("index", (0, 0, 0))
            strength = self._usable_strength(req.get("strength", 1.0))
            if strength is None:
                out.skipped_weak += 1
                continue
            payload = self._make_payload(
                index=tuple(idx),
                strength=strength,
                step_index=out.step_index,
            )
            if self._spawn(target_obj, payload):
                out.spawned += 1
        return self._telemetry_dict(out)

    def _telemetry_dict(self, out: DebrisExecutionTelemetry) -> Dict[str, Any]:
        """Serialize *out* into a plain dict tagged with schema and module."""
        return {
            "schema_version": SCHEMA_VERSION,
            "module_name": MODULE_NAME,
            "step_index": int(out.step_index),
            "requests_found": int(out.requests_found),
            "spawned": int(out.spawned),
            "skipped_weak": int(out.skipped_weak),
            "emitted_events": int(out.emitted_events),
        }