Spaces:
Sleeping
Sleeping
"""MotiveGridGame — the shared per-turn game loop for the motive_grid family.

``MotiveGridGame`` subclasses ``proteus.game.engine.ARCBaseGame``. It owns the
turn loop and delegates all scenario-specific behaviour (level construction,
threat motion, elimination, answer keys) to an injected :class:`Scenario`.

One ``perform_action`` call drives exactly one turn (the engine calls
``step()`` repeatedly until ``complete_action()``; this game completes the
action in a single ``step``). The public surface is
:meth:`apply_motive_action`, which accepts a plain action string so callers
(the future ``MotiveGridModule``) never touch the ``GameAction`` enums.

See ``docs/superpowers/specs/2026-06-01-motive-grid-design.md`` §3.
"""
| from __future__ import annotations | |
| import random | |
| from typing import TYPE_CHECKING | |
| import numpy as np | |
| from proteus.game.engine import ( | |
| ARCBaseGame, | |
| ActionInput, | |
| Camera, | |
| GameAction, | |
| GameState, | |
| Sprite, | |
| ) | |
| if TYPE_CHECKING: | |
| from ..scenarios.base import Scenario | |
# Grid delta (dx, dy) applied to the focal sprite for each action string.
# +y points down the screen (arc_grid convention). "interact" shares the zero
# delta with "stay": it never moves the sprite, and its effect is delegated to
# scenario.on_interact.
_DIRECTION_DELTAS: dict[str, tuple[int, int]] = dict(
    up=(0, -1),
    down=(0, 1),
    left=(-1, 0),
    right=(1, 0),
    stay=(0, 0),
    interact=(0, 0),
)

# Engine GameAction for each action string (per spec §3: up/down/left/right =
# ACTION1..4, stay = ACTION5). "interact" is deliberately absent so that the
# inverse map below stays one-to-one.
_ACTION_TO_GAMEACTION: dict[str, GameAction] = dict(
    up=GameAction.ACTION1,
    down=GameAction.ACTION2,
    left=GameAction.ACTION3,
    right=GameAction.ACTION4,
    stay=GameAction.ACTION5,
)

# Inverse lookup: engine GameAction -> action string.
_GAMEACTION_TO_ACTION: dict[GameAction, str] = {
    game_action: name for name, game_action in _ACTION_TO_GAMEACTION.items()
}

# Sprite names the scenario contract relies on.
FOCAL_NAME = "focal"
PREDATOR_NAME = "predator"
class MotiveGridGame(ARCBaseGame):
    """Shared turn loop for motive_grid scenarios.

    The game is constructed with a concrete :class:`Scenario`, builds its world
    via ``scenario.build_level``, and runs each turn through :meth:`step`:

    1. Move the focal sprite (internal-wall collision or an off-grid target
       keeps it in place) and call ``scenario.advance_threat``;
       ``scenario.turn_order`` decides which of the two happens first (focal
       first by default).
    2. ``scenario.on_step_effects``, then ``scenario.on_interact`` when the
       committed action was ``"interact"``.
    3. If ``scenario.check_elimination`` -> :meth:`lose`.
    4. Else if ``scenario.check_success`` -> :meth:`win` (early win).
    5. Else if ``scenario.turn_limited`` and the step budget is exhausted ->
       :meth:`win`.
    6. ``complete_action`` ends the turn.

    Boundary handling: arc_grid enforces no world border, so the framework
    guards the grid edge for the focal agent itself. Every scenario uses
    ``camera == grid_size`` with no panning, so a focal move whose target cell
    falls outside ``[0, width) x [0, height)`` is rejected (see
    :meth:`within_bounds`). Internal walls (dead-ends, obstacles) are still
    handled by ``try_move_sprite``. Scenarios therefore need not ring the grid
    with perimeter walls, but DO own their own threat sprites' bounds.
    """

    def __init__(
        self,
        scenario: Scenario,
        rng: random.Random,
        difficulty,
        max_steps: int,
    ) -> None:
        """Initialize the game from a scenario.

        Args:
            scenario: The injected scenario driving world/threat/answer-key
                behaviour.
            rng: Seeded RNG, threaded into level construction and threat
                tie-breaks for determinism.
            difficulty: Session difficulty (passed through to the scenario).
            max_steps: Survival budget; surviving this many played turns wins.
        """
        # The level is built before the engine base class is initialised
        # because the base constructor receives it via ``levels``.
        level = scenario.build_level(rng, difficulty)
        width, height = scenario.grid_size
        camera = Camera(width=width, height=height)
        # Direction actions (1..5) only; placeable/click action 6 is unused.
        super().__init__(
            game_id=scenario.task_name,
            levels=[level],
            camera=camera,
            available_actions=[1, 2, 3, 4, 5],
        )
        self.scenario = scenario
        self.rng = rng
        self.difficulty = difficulty
        self.max_steps = max_steps
        self.step_count = 0
        # Health is opt-in: scenarios that don't use it return 0 (inert).
        self.health = scenario.health_start(difficulty)
        # The plain action string committed this turn (read by step()); set by
        # apply_motive_action. "interact" cannot round-trip through GameAction
        # (ACTION6 is a ComplexAction), so we track the string directly.
        self._last_motive_action = "stay"

    # ------------------------------------------------------------------ #
    # Public turn surface
    # ------------------------------------------------------------------ #
    def apply_motive_action(self, action: str):
        """Play one turn from a plain action string.

        Records the action string for :meth:`step`, wraps the matching engine
        action in an ``ActionInput`` and drives one turn via the engine's
        ``perform_action`` (which calls :meth:`step`). This is the public
        surface so callers never build ``GameAction`` enums themselves.

        Args:
            action: One of ``"up"``, ``"down"``, ``"left"``, ``"right"``,
                ``"stay"`` or ``"interact"``.

        Returns:
            The ``FrameData`` returned by ``perform_action``.

        Raises:
            ValueError: If *action* is not a recognised action string.
        """
        if action == "interact":
            # Engine no-move (ACTION5); the effect is applied in step() via
            # scenario.on_interact. "interact" is kept out of
            # _ACTION_TO_GAMEACTION so the GameAction reverse-map stays
            # unambiguous.
            game_action = _ACTION_TO_GAMEACTION["stay"]
        elif action in _ACTION_TO_GAMEACTION:
            game_action = _ACTION_TO_GAMEACTION[action]
        else:
            valid = ", ".join([*_ACTION_TO_GAMEACTION, "interact"])
            raise ValueError(f"Unknown action '{action}'. Valid: {valid}")
        # Must be set before perform_action(): step() reads it during the call.
        self._last_motive_action = action
        return self.perform_action(ActionInput(id=game_action))

    def damage(self, amount: int) -> None:
        """Reduce ``health`` by *amount*, clamped at 0 (never negative).

        Inert for scenarios that start at 0 health and never check it; HP
        scenarios call this from ``scenario.on_step_effects`` / ``on_interact``.

        Args:
            amount: Health points to subtract.
        """
        self.health = max(0, self.health - amount)

    def step(self) -> None:
        """Resolve a single turn, honouring ``scenario.turn_order``.

        focal_first (default): move focal, then advance threat (chasing the new
        focal cell). predator_first: advance threat first (chasing the current
        focal cell), then move focal. Capture/survival are checked once, after
        both have moved. Always completes the action so the engine loop ends.
        """
        action = self._last_motive_action
        dx, dy = _DIRECTION_DELTAS.get(action, (0, 0))
        # Resolve the sprite through get_sprite(): ``focal_sprite`` is defined
        # as a plain method, so the bare attribute ``self.focal_sprite`` would
        # be a bound method (never None, no ``.x``/``.y``), not the sprite.
        focal = self.get_sprite(FOCAL_NAME)
        predator_first = (
            getattr(self.scenario, "turn_order", "focal_first") == "predator_first"
        )

        if predator_first:
            self.scenario.advance_threat(self)
        # Zero-delta actions ("stay", "interact") never attempt a move; a move
        # that would push any footprint cell off-grid is silently rejected.
        if (
            focal is not None
            and (dx != 0 or dy != 0)
            and self._footprint_in_bounds(focal, dx, dy)
        ):
            self.try_move_sprite(focal, dx, dy)
        if not predator_first:
            self.scenario.advance_threat(self)

        self.step_count += 1

        # Per-turn hooks: environmental effects, then the focal's interact
        # effect, then terminal checks (elimination > success > curfew).
        self.scenario.on_step_effects(self)
        if action == "interact":
            self.scenario.on_interact(self)

        if self.scenario.check_elimination(self):
            self.lose()
        elif self.scenario.check_success(self):
            self.win()
        elif self.scenario.turn_limited and self.step_count >= self.max_steps:
            self.win()
        self.complete_action()

    # ------------------------------------------------------------------ #
    # Sprite / state accessors for scenarios
    # ------------------------------------------------------------------ #
    def within_bounds(self, x: int, y: int) -> bool:
        """Return True if cell ``(x, y)`` lies inside the scenario grid.

        Every motive_grid scenario uses ``camera == grid_size`` with no
        panning, so the grid is the world. A move whose target cell falls
        outside ``[0, width) x [0, height)`` is rejected, keeping sprites
        on-screen without each scenario needing a perimeter wall. Public so a
        scenario's ``advance_threat`` hook can reuse it to keep its own threat
        sprites on-grid (the framework only guards the focal agent).

        Args:
            x: Grid column (cell x-coordinate).
            y: Grid row (cell y-coordinate).
        """
        width, height = self.scenario.grid_size
        return 0 <= x < width and 0 <= y < height

    def _footprint_in_bounds(self, sprite, dx: int, dy: int) -> bool:
        """Return True if *sprite*'s full footprint stays on-grid after (dx, dy).

        For a 1x1 sprite this is exactly ``within_bounds(x+dx, y+dy)`` (so the
        single-cell scenarios are unaffected); for a multi-cell sprite it checks
        every footprint cell.

        Args:
            sprite: Object exposing ``x``, ``y``, ``width`` and ``height``.
            dx: Proposed horizontal displacement in cells.
            dy: Proposed vertical displacement in cells.
        """
        nx, ny = sprite.x + dx, sprite.y + dy
        # all() short-circuits on the first off-grid cell and is True for an
        # empty footprint, matching the explicit nested-loop formulation.
        return all(
            self.within_bounds(nx + i, ny + j)
            for j in range(sprite.height)
            for i in range(sprite.width)
        )

    def get_sprite(self, name: str) -> Sprite | None:
        """Return the first sprite with *name*, or ``None`` if absent.

        Args:
            name: The sprite name to look up on the current level.

        Returns:
            The matching :class:`~proteus.game.engine.Sprite`, or ``None``.
        """
        sprites = self.current_level.get_sprites_by_name(name)
        return sprites[0] if sprites else None

    def focal_sprite(self) -> Sprite | None:
        """Return the model-controlled focal sprite (``name="focal"``), if any."""
        return self.get_sprite(FOCAL_NAME)

    def predator_sprite(self) -> Sprite | None:
        """Return the threat/predator sprite (``name="predator"``), if any."""
        return self.get_sprite(PREDATOR_NAME)

    def eliminated(self) -> bool:
        """Return whether the focal agent has been eliminated (game over)."""
        return self._state == GameState.GAME_OVER

    def survived(self) -> bool:
        """Return whether the focal agent won.

        A win is either surviving the step budget or achieving an early goal.
        """
        return self._state == GameState.WIN

    def current_grid(self) -> np.ndarray:
        """Render the current world at native resolution (for ASCII).

        Returns one palette index per grid cell (e.g. an ``(8, 8)`` array for
        an 8x8 grid), which ``ascii_view`` converts to text. This is distinct
        from :meth:`current_frame`, which returns the 64x64 upscaled render
        used for PNG/debug output.

        We deliberately call the arc_grid-internal ``camera._raw_render`` here:
        it is the correct native-resolution accessor (``render`` always upscales
        to 64x64). No public native-resolution accessor exists.

        Returns:
            A ``(height, width)`` numpy array of palette indices at native grid
            resolution.
        """
        return self.camera._raw_render(self.current_level.get_sprites())

    def current_frame(self) -> np.ndarray:
        """Render the current world as a 64x64 numpy palette frame.

        The ASCII conversion (palette index -> symbol via ``scenario.legend``)
        lives in the later ``ascii_view`` module; this exposes the raw frame
        for it to consume.

        Returns:
            A ``(64, 64)`` numpy array of palette indices.
        """
        return self.camera.render(self.current_level.get_sprites())