irregular6612's picture
feat(errand): no move limit — ends only on reaching the house (analysis) or zero health
bb1f1e7
Raw
History Blame Contribute Delete
12.3 kB
"""MotiveGridGame — the shared per-turn game loop for the motive_grid family.
``MotiveGridGame`` subclasses ``proteus.game.engine.ARCBaseGame``. It owns the
turn loop and delegates all scenario-specific behaviour (level construction,
threat motion, elimination, answer keys) to an injected :class:`Scenario`.
One ``perform_action`` call drives exactly one turn (the engine calls
``step()`` repeatedly until ``complete_action()``; this game completes the
action in a single ``step``). The public surface is
:meth:`apply_motive_action`, which accepts a plain action string so callers
(the future ``MotiveGridModule``) never touch the ``GameAction`` enums.
See ``docs/superpowers/specs/2026-06-01-motive-grid-design.md`` §3.
"""
from __future__ import annotations
import random
from typing import TYPE_CHECKING
import numpy as np
from proteus.game.engine import (
ARCBaseGame,
ActionInput,
Camera,
GameAction,
GameState,
Sprite,
)
if TYPE_CHECKING:
from ..scenarios.base import Scenario
# Action string -> (dx, dy) grid delta. y grows downward (arc_grid convention).
# "interact" is a no-move action whose effect is delegated to scenario.on_interact.
_DIRECTION_DELTAS: dict[str, tuple[int, int]] = {
"up": (0, -1),
"down": (0, 1),
"left": (-1, 0),
"right": (1, 0),
"stay": (0, 0),
"interact": (0, 0),
}
# Action string -> engine GameAction (per spec §3: up/down/left/right =
# ACTION1..4, stay = ACTION5).
_ACTION_TO_GAMEACTION: dict[str, GameAction] = {
"up": GameAction.ACTION1,
"down": GameAction.ACTION2,
"left": GameAction.ACTION3,
"right": GameAction.ACTION4,
"stay": GameAction.ACTION5,
}
# Reverse map: engine GameAction -> action string, for step() dispatch.
_GAMEACTION_TO_ACTION: dict[GameAction, str] = {
v: k for k, v in _ACTION_TO_GAMEACTION.items()
}
# Sprite names the scenario contract relies on.
FOCAL_NAME = "focal"
PREDATOR_NAME = "predator"
class MotiveGridGame(ARCBaseGame):
"""Shared turn loop for motive_grid scenarios.
The game is constructed with a concrete :class:`Scenario`, builds its world
via ``scenario.build_level``, and runs each turn through :meth:`step`:
1. Move the focal sprite (internal-wall collision or an off-grid target
keeps it in place).
2. ``scenario.advance_threat`` moves the predator/NPCs.
3. If ``scenario.check_elimination`` -> :meth:`lose`.
4. Else if ``scenario.check_success`` -> :meth:`win` (early win).
5. Else if the step budget is exhausted -> :meth:`win`.
6. ``complete_action`` ends the turn.
Boundary handling: arc_grid enforces no world border, so the framework
guards the grid edge for the focal agent itself. Every scenario uses
``camera == grid_size`` with no panning, so a focal move whose target cell
falls outside ``[0, width) x [0, height)`` is rejected (see
:meth:`within_bounds`). Internal walls (dead-ends, obstacles) are still
handled by ``try_move_sprite``. Scenarios therefore need not ring the grid
with perimeter walls, but DO own their own threat sprites' bounds.
"""
def __init__(
self,
scenario: Scenario,
rng: random.Random,
difficulty,
max_steps: int,
) -> None:
"""Initialize the game from a scenario.
Args:
scenario: The injected scenario driving world/threat/answer-key
behaviour.
rng: Seeded RNG, threaded into level construction and threat
tie-breaks for determinism.
difficulty: Session difficulty (passed through to the scenario).
max_steps: Survival budget; surviving this many played turns wins.
"""
level = scenario.build_level(rng, difficulty)
width, height = scenario.grid_size
camera = Camera(width=width, height=height)
# Direction actions (1..5) only; placeable/click action 6 is unused.
super().__init__(
game_id=scenario.task_name,
levels=[level],
camera=camera,
available_actions=[1, 2, 3, 4, 5],
)
self.scenario = scenario
self.rng = rng
self.difficulty = difficulty
self.max_steps = max_steps
self.step_count = 0
# Health is opt-in: scenarios that don't use it return 0 (inert).
self.health = scenario.health_start(difficulty)
# The plain action string committed this turn (read by step()); set by
# apply_motive_action. "interact" cannot round-trip through GameAction
# (ACTION6 is a ComplexAction), so we track the string directly.
self._last_motive_action = "stay"
# ------------------------------------------------------------------ #
# Public turn surface
# ------------------------------------------------------------------ #
def apply_motive_action(self, action: str):
"""Play one turn from a plain action string.
Wraps the action string in an ``ActionInput`` and drives one turn via
the engine's ``perform_action`` (which calls :meth:`step`). This is the
public surface so callers never build ``GameAction`` enums themselves.
Args:
action: One of ``"up"``, ``"down"``, ``"left"``, ``"right"``,
``"stay"``.
Returns:
The ``FrameData`` returned by ``perform_action``.
Raises:
ValueError: If *action* is not a recognised direction string.
"""
if action == "interact":
# Engine no-move (ACTION5); the effect is applied in step() via
# scenario.on_interact. We bypass _ACTION_TO_GAMEACTION so the
# GameAction reverse-map stays unambiguous.
self._last_motive_action = "interact"
return self.perform_action(ActionInput(id=_ACTION_TO_GAMEACTION["stay"]))
if action not in _ACTION_TO_GAMEACTION:
valid = ", ".join([*_ACTION_TO_GAMEACTION, "interact"])
raise ValueError(f"Unknown action '{action}'. Valid: {valid}")
self._last_motive_action = action
action_input = ActionInput(id=_ACTION_TO_GAMEACTION[action])
return self.perform_action(action_input)
def damage(self, amount: int) -> None:
"""Reduce ``health`` by *amount*, clamped at 0 (never negative).
Inert for scenarios that start at 0 health and never check it; HP
scenarios call this from ``scenario.on_step_effects`` / ``on_interact``.
"""
self.health = max(0, self.health - amount)
def step(self) -> None:
"""Resolve a single turn, honouring ``scenario.turn_order``.
focal_first (default): move focal, then advance threat (chasing the new
focal cell). predator_first: advance threat first (chasing the current
focal cell), then move focal. Capture/survival are checked once, after
both have moved. Always completes the action so the engine loop ends.
"""
action = self._last_motive_action
dx, dy = _DIRECTION_DELTAS.get(action, (0, 0))
focal = self.focal_sprite
predator_first = (
getattr(self.scenario, "turn_order", "focal_first") == "predator_first"
)
if predator_first:
self.scenario.advance_threat(self)
if (
focal is not None
and (dx != 0 or dy != 0)
and self._footprint_in_bounds(focal, dx, dy)
):
self.try_move_sprite(focal, dx, dy)
if not predator_first:
self.scenario.advance_threat(self)
self.step_count += 1
# New per-turn hooks (default no-op): environmental effects, then the
# focal's interact effect, then terminal checks (elimination > success
# > curfew). Existing scenarios override none of these.
self.scenario.on_step_effects(self)
if action == "interact":
self.scenario.on_interact(self)
if self.scenario.check_elimination(self):
self.lose()
elif self.scenario.check_success(self):
self.win()
elif self.scenario.turn_limited and self.step_count >= self.max_steps:
self.win()
self.complete_action()
# ------------------------------------------------------------------ #
# Sprite / state accessors for scenarios
# ------------------------------------------------------------------ #
def within_bounds(self, x: int, y: int) -> bool:
"""Return True if cell ``(x, y)`` lies inside the scenario grid.
Every motive_grid scenario uses ``camera == grid_size`` with no
panning, so the grid is the world. A move whose target cell falls
outside ``[0, width) x [0, height)`` is rejected, keeping sprites
on-screen without each scenario needing a perimeter wall. Public so a
scenario's ``advance_threat`` hook can reuse it to keep its own threat
sprites on-grid (the framework only guards the focal agent).
Args:
x: Grid column (cell x-coordinate).
y: Grid row (cell y-coordinate).
"""
width, height = self.scenario.grid_size
return 0 <= x < width and 0 <= y < height
def _footprint_in_bounds(self, sprite, dx: int, dy: int) -> bool:
"""Return True if *sprite*'s full footprint stays on-grid after (dx, dy).
For a 1x1 sprite this is exactly ``within_bounds(x+dx, y+dy)`` (so the
single-cell scenarios are unaffected); for a multi-cell sprite it checks
every footprint cell.
"""
nx, ny = sprite.x + dx, sprite.y + dy
for j in range(sprite.height):
for i in range(sprite.width):
if not self.within_bounds(nx + i, ny + j):
return False
return True
def get_sprite(self, name: str) -> Sprite | None:
"""Return the first sprite with *name*, or ``None`` if absent.
Args:
name: The sprite name to look up on the current level.
Returns:
The matching :class:`~proteus.game.engine.Sprite`, or ``None``.
"""
sprites = self.current_level.get_sprites_by_name(name)
return sprites[0] if sprites else None
@property
def focal_sprite(self) -> Sprite | None:
"""The model-controlled focal sprite (``name="focal"``)."""
return self.get_sprite(FOCAL_NAME)
@property
def predator_sprite(self) -> Sprite | None:
"""The threat/predator sprite (``name="predator"``)."""
return self.get_sprite(PREDATOR_NAME)
@property
def eliminated(self) -> bool:
"""Whether the focal agent has been eliminated (game over)."""
return self._state == GameState.GAME_OVER
@property
def survived(self) -> bool:
"""Whether the focal agent won (survived the step budget or achieved an early goal)."""
return self._state == GameState.WIN
def current_grid(self) -> np.ndarray:
"""Render the current world at native resolution (for ASCII).
Returns the native ``width x height`` palette array (e.g. ``(8, 8)`` for
an 8x8 grid), one palette index per cell, which ``ascii_view`` converts
to text. This is distinct from :meth:`current_frame`, which returns the
64x64 upscaled render used for PNG/debug output.
We deliberately call the arc_grid-internal ``camera._raw_render`` here:
it is the correct native-resolution accessor (``render`` always upscales
to 64x64). No public native-resolution accessor exists.
Returns:
A ``(height, width)`` numpy array of palette indices at native grid
resolution.
"""
return self.camera._raw_render(self.current_level.get_sprites())
def current_frame(self) -> np.ndarray:
"""Render the current world as a 64x64 numpy palette frame.
The ASCII conversion (palette index -> symbol via ``scenario.legend``)
lives in the later ``ascii_view`` module; this exposes the raw frame
for it to consume.
Returns:
A ``(64, 64)`` numpy array of palette indices.
"""
return self.camera.render(self.current_level.get_sprites())