| """ |
| Observation rendering for Pyre (single-agent). |
| |
| Converts raw server state into: |
| - A first-person narrative string (the LLM's primary input) |
| - Structured fields in PyreObservation |
| |
| Visibility rules: |
| - Base radius: 5 (Manhattan distance) |
| - Moderate smoke in agent's cell: radius 3 |
| - Heavy smoke in agent's cell: radius 2 |
| - Walls block flood-fill propagation |
| |
| Health status labels: |
| 80–100 : Good |
| 50–79 : Moderate |
| 25–49 : Low |
| 0–24 : Critical |
| """ |
|
|
| from typing import Any, Dict, List, Optional, Set, Tuple |
|
|
| from .fire_sim import smoke_level_label, FIRE_BURNING, EXIT_BLOCKED_FIRE_THRESHOLD |
|
|
| FLOOR = 0 |
| WALL = 1 |
| DOOR_OPEN = 2 |
| DOOR_CLOSED = 3 |
| EXIT = 4 |
| OBSTACLE = 5 |
|
|
| _CARDINAL = [(0, -1, "north"), (0, 1, "south"), (-1, 0, "west"), (1, 0, "east")] |
| _DELTA_TO_DIR = {(0, -1): "north", (0, 1): "south", (-1, 0): "west", (1, 0): "east"} |
|
|
|
|
| def _idx(x: int, y: int, w: int) -> int: |
| return y * w + x |
|
|
|
|
| def _in_bounds(x: int, y: int, w: int, h: int) -> bool: |
| return 0 <= x < w and 0 <= y < h |
|
|
|
|
| def _manhattan(x1: int, y1: int, x2: int, y2: int) -> int: |
| return abs(x1 - x2) + abs(y1 - y2) |
|
|
|
|
| def _health_label(health: float) -> str: |
| if health >= 80: |
| return "Good" |
| if health >= 50: |
| return "Moderate" |
| if health >= 25: |
| return "Low" |
| return "Critical" |
|
|
|
|
| |
| |
| |
|
|
| def compute_visible_cells( |
| ax: int, ay: int, |
| cell_grid: List[int], |
| smoke_grid: List[float], |
| w: int, h: int, |
| ) -> Set[Tuple[int, int]]: |
| """BFS flood-fill from agent; walls block propagation.""" |
| agent_smoke = smoke_grid[_idx(ax, ay, w)] |
| label = smoke_level_label(agent_smoke) |
|
|
| if label == "heavy": |
| radius = 2 |
| elif label == "moderate": |
| radius = 3 |
| else: |
| radius = 5 |
|
|
| visible: Set[Tuple[int, int]] = {(ax, ay)} |
| queue = [(ax, ay, 0)] |
|
|
| while queue: |
| x, y, dist = queue.pop(0) |
| if dist >= radius: |
| continue |
| for dx, dy, _ in _CARDINAL: |
| nx, ny = x + dx, y + dy |
| if not _in_bounds(nx, ny, w, h): |
| continue |
| if (nx, ny) in visible: |
| continue |
| ct = cell_grid[_idx(nx, ny, w)] |
| if ct == WALL: |
| continue |
| visible.add((nx, ny)) |
| queue.append((nx, ny, dist + 1)) |
|
|
| return visible |
|
|
|
|
| |
| |
| |
|
|
| def build_narrative_observation( |
| step_count: int, |
| agent_x: int, |
| agent_y: int, |
| agent_alive: bool, |
| agent_evacuated: bool, |
| agent_health: float, |
| cell_grid: List[int], |
| fire_grid: List[float], |
| smoke_grid: List[float], |
| exit_positions: List[List[int]], |
| door_registry: Dict[str, List[int]], |
| zone_map: Dict[str, str], |
| last_action_feedback: str, |
| wind_dir: str, |
| w: int, |
| h: int, |
| visible_override: Optional[Set[Tuple[int, int]]] = None, |
| ) -> Dict[str, Any]: |
| """Build the full observation dict (matches PyreObservation fields).""" |
| if agent_evacuated: |
| return _terminal_obs(step_count, last_action_feedback, |
| narrative="You have safely evacuated the building.", |
| agent_evacuated=True, agent_health=agent_health) |
|
|
| if not agent_alive: |
| return _terminal_obs(step_count, last_action_feedback, |
| narrative="You have been overcome by fire and smoke.", |
| agent_evacuated=False, agent_health=0.0) |
|
|
| visible = visible_override if visible_override is not None else compute_visible_cells( |
| agent_x, agent_y, cell_grid, smoke_grid, w, h |
| ) |
|
|
| |
| agent_smoke = smoke_grid[_idx(agent_x, agent_y, w)] |
| smoke_label = smoke_level_label(agent_smoke) |
| health_label = _health_label(agent_health) |
|
|
| |
| fire_visible = False |
| fire_dir: Optional[str] = None |
| nearest_fire_dist = 999 |
| for vx, vy in visible: |
| if (vx, vy) == (agent_x, agent_y): |
| continue |
| if fire_grid[_idx(vx, vy, w)] >= FIRE_BURNING: |
| fire_visible = True |
| d = _manhattan(agent_x, agent_y, vx, vy) |
| if d < nearest_fire_dist: |
| nearest_fire_dist = d |
| dx = vx - agent_x |
| dy = vy - agent_y |
| if abs(dx) >= abs(dy): |
| fire_dir = "east" if dx > 0 else "west" |
| else: |
| fire_dir = "south" if dy > 0 else "north" |
|
|
| |
| visible_objects: List[Dict[str, Any]] = [] |
| door_pos_to_id = {(v[0], v[1]): k for k, v in door_registry.items()} |
| blocked_exit_ids: List[str] = [] |
|
|
| for vx, vy in visible: |
| ct = cell_grid[_idx(vx, vy, w)] |
| rel = _relative_pos_str(agent_x, agent_y, vx, vy) |
|
|
| if ct in (DOOR_OPEN, DOOR_CLOSED): |
| door_id = door_pos_to_id.get((vx, vy), f"door_{vx}_{vy}") |
| door_state = "open" if ct == DOOR_OPEN else "closed" |
| if fire_grid[_idx(vx, vy, w)] > 0.1: |
| door_state += " (hot)" |
| visible_objects.append({ |
| "id": door_id, "type": "door", |
| "relative_pos": rel, "state": door_state, |
| }) |
|
|
| elif ct == EXIT: |
| fire_at_exit = fire_grid[_idx(vx, vy, w)] |
| exit_id = f"exit_{vx}_{vy}" |
| if fire_at_exit >= EXIT_BLOCKED_FIRE_THRESHOLD: |
| exit_state = "BLOCKED by fire" |
| blocked_exit_ids.append(exit_id) |
| else: |
| exit_state = "open" |
| visible_objects.append({ |
| "id": exit_id, "type": "exit", |
| "relative_pos": rel, "state": exit_state, |
| }) |
|
|
| |
| visible_coords = {(vx, vy) for vx, vy in visible} |
| for ex in exit_positions: |
| ex_id = f"exit_{ex[0]}_{ex[1]}" |
| if (ex[0], ex[1]) not in visible_coords: |
| if fire_grid[_idx(ex[0], ex[1], w)] >= EXIT_BLOCKED_FIRE_THRESHOLD: |
| if ex_id not in blocked_exit_ids: |
| blocked_exit_ids.append(ex_id) |
|
|
| |
| audible: List[str] = [] |
| any_fire = any(fire_grid[i] >= FIRE_BURNING for i in range(w * h)) |
| if any_fire: |
| audible.append("Fire alarm sounding") |
| if smoke_label in ("moderate", "heavy"): |
| audible.append("Smoke detector beeping") |
| if agent_health < 50: |
| audible.append("Your own laboured breathing") |
|
|
| |
| location_label = zone_map.get(f"{agent_x},{agent_y}", "unknown area") |
|
|
| |
| action_hints = _build_action_hints( |
| agent_x, agent_y, cell_grid, visible, |
| visible_objects, door_registry, w, h |
| ) |
|
|
| |
| narrative = _compose_narrative( |
| location_label=location_label, |
| smoke_label=smoke_label, |
| fire_visible=fire_visible, |
| fire_dir=fire_dir, |
| agent_health=agent_health, |
| health_label=health_label, |
| wind_dir=wind_dir, |
| visible_objects=visible_objects, |
| blocked_exit_ids=blocked_exit_ids, |
| audible=audible, |
| last_action_feedback=last_action_feedback, |
| action_hints=action_hints, |
| ) |
|
|
| return { |
| "narrative": narrative, |
| "agent_evacuated": agent_evacuated, |
| "location_label": location_label, |
| "smoke_level": smoke_label, |
| "fire_visible": fire_visible, |
| "fire_direction": fire_dir, |
| "agent_health": agent_health, |
| "health_status": health_label, |
| "wind_dir": wind_dir, |
| "visible_objects": visible_objects, |
| "blocked_exit_ids": blocked_exit_ids, |
| "audible_signals": audible, |
| "elapsed_steps": step_count, |
| "last_action_feedback": last_action_feedback, |
| "available_actions_hint": action_hints, |
| "done": False, |
| "reward": 0.0, |
| } |
|
|
|
|
| |
| |
| |
|
|
| def _relative_pos_str(ax: int, ay: int, tx: int, ty: int) -> str: |
| dx, dy = tx - ax, ty - ay |
| dist = abs(dx) + abs(dy) |
| if dist == 0: |
| return "here" |
| if abs(dx) >= abs(dy): |
| return f"{dist}m {'east' if dx > 0 else 'west'}" |
| else: |
| return f"{dist}m {'south' if dy > 0 else 'north'}" |
|
|
|
|
| def _build_action_hints( |
| ax: int, ay: int, |
| cell_grid: List[int], |
| visible: Set[Tuple[int, int]], |
| visible_objects: List[Dict], |
| door_registry: Dict[str, List[int]], |
| w: int, h: int, |
| ) -> List[str]: |
| hints: List[str] = [] |
|
|
| |
| for dx, dy, dirname in _CARDINAL: |
| nx, ny = ax + dx, ay + dy |
| if _in_bounds(nx, ny, w, h): |
| ct = cell_grid[_idx(nx, ny, w)] |
| if ct in (FLOOR, DOOR_OPEN, EXIT): |
| hints.append(f"move(direction='{dirname}')") |
|
|
| |
| for obj in visible_objects: |
| if obj["type"] == "door": |
| did = obj["id"] |
| if "closed" in obj["state"]: |
| hints.append(f"door(target_id='{did}', door_state='open')") |
| else: |
| hints.append(f"door(target_id='{did}', door_state='close')") |
|
|
| hints.append("wait()") |
| return hints |
|
|
|
|
| def _compose_narrative( |
| location_label: str, |
| smoke_label: str, |
| fire_visible: bool, |
| fire_dir: Optional[str], |
| agent_health: float, |
| health_label: str, |
| wind_dir: str, |
| visible_objects: List[Dict], |
| blocked_exit_ids: List[str], |
| audible: List[str], |
| last_action_feedback: str, |
| action_hints: List[str], |
| ) -> str: |
| lines = [] |
|
|
| |
| lines.append(f"You are in the **{location_label}**. The air is **{smoke_label}**.") |
|
|
| |
| health_bar = _health_bar(agent_health) |
| wind_str = f"Wind: **{wind_dir}**" if wind_dir != "CALM" else "Wind: calm" |
| lines.append(f"Health: {health_bar} ({health_label}) | {wind_str}") |
|
|
| |
| if fire_visible and fire_dir: |
| lines.append(f"Flames are visible to the **{fire_dir}**.") |
| else: |
| lines.append("No fire directly visible.") |
|
|
| |
| exits_vis = [o for o in visible_objects if o["type"] == "exit"] |
| doors_vis = [o for o in visible_objects if o["type"] == "door"] |
|
|
| if exits_vis: |
| exit_descs = [] |
| for o in exits_vis: |
| status = " **[BLOCKED]**" if o["state"].startswith("BLOCKED") else "" |
| exit_descs.append(f"{o['id']}{status} at {o['relative_pos']}") |
| lines.append(f"Exit{'s' if len(exits_vis) > 1 else ''} visible: {', '.join(exit_descs)}.") |
|
|
| if doors_vis: |
| door_descs = [f"{o['id']} ({o['state']}) at {o['relative_pos']}" for o in doors_vis] |
| lines.append(f"Door{'s' if len(doors_vis) > 1 else ''}: {', '.join(door_descs)}.") |
|
|
| if blocked_exit_ids: |
| lines.append(f"WARNING: {len(blocked_exit_ids)} exit(s) blocked by fire — find an alternative route.") |
|
|
| |
| if audible: |
| lines.append(f"You hear: {'; '.join(audible)}.") |
|
|
| |
| if last_action_feedback: |
| lines.append(f"Last action: {last_action_feedback}") |
|
|
| |
| if action_hints: |
| hints_str = " ".join(action_hints[:8]) |
| lines.append(f"Available actions: {hints_str}") |
|
|
| return "\n".join(lines) |
|
|
|
|
| def _health_bar(health: float) -> str: |
| filled = int(health / 10) |
| empty = 10 - filled |
| return "█" * filled + "░" * empty + f" {int(health)}/100" |
|
|
|
|
| |
| |
| |
|
|
| def build_look_result( |
| direction: str, |
| agent_x: int, |
| agent_y: int, |
| cell_grid: List[int], |
| fire_grid: List[float], |
| smoke_grid: List[float], |
| zone_map: Dict[str, str], |
| door_registry: Dict[str, List[int]], |
| w: int, |
| h: int, |
| ) -> str: |
| """Generate a detailed description of cells in one cardinal direction. |
| |
| Scans up to 5 cells from the agent's position in `direction`, stopping |
| at the first wall or out-of-bounds cell. Returns a sentence describing |
| each visible cell — smoke level, fire presence, door/exit status, zone. |
| """ |
| delta = { |
| "north": (0, -1), "south": (0, 1), |
| "west": (-1, 0), "east": (1, 0), |
| }.get(direction) |
| if delta is None: |
| return f"Unknown direction '{direction}'." |
|
|
| dx, dy = delta |
| door_pos_to_id = {(v[0], v[1]): k for k, v in door_registry.items()} |
| lines = [f"Looking **{direction}**:"] |
| nothing_visible = True |
|
|
| for dist in range(1, 6): |
| nx, ny = agent_x + dx * dist, agent_y + dy * dist |
| if not _in_bounds(nx, ny, w, h): |
| lines.append(f" {dist}m — outer wall.") |
| break |
|
|
| ct = cell_grid[_idx(nx, ny, w)] |
| if ct == WALL: |
| lines.append(f" {dist}m — wall.") |
| break |
|
|
| nothing_visible = False |
| parts: List[str] = [] |
|
|
| |
| if ct == EXIT: |
| fire_at = fire_grid[_idx(nx, ny, w)] |
| status = "BLOCKED by fire" if fire_at >= EXIT_BLOCKED_FIRE_THRESHOLD else "clear" |
| parts.append(f"**EXIT** ({status})") |
| elif ct == DOOR_OPEN: |
| door_id = door_pos_to_id.get((nx, ny), "door") |
| parts.append(f"open door [{door_id}]") |
| elif ct == DOOR_CLOSED: |
| door_id = door_pos_to_id.get((nx, ny), "door") |
| parts.append(f"closed door [{door_id}]") |
| elif ct == OBSTACLE: |
| parts.append("burnt rubble (impassable)") |
| else: |
| zone = zone_map.get(f"{nx},{ny}", "") |
| if zone: |
| parts.append(zone.replace("_", " ")) |
| else: |
| parts.append("open floor") |
|
|
| |
| smoke = smoke_grid[_idx(nx, ny, w)] |
| s_label = smoke_level_label(smoke) |
| if s_label != "none": |
| parts.append(f"**{s_label} smoke**") |
|
|
| |
| fire = fire_grid[_idx(nx, ny, w)] |
| if fire >= FIRE_BURNING: |
| parts.append("**actively burning**") |
| elif fire > 0.1: |
| parts.append("smoldering heat") |
|
|
| lines.append(f" {dist}m — {', '.join(parts)}.") |
|
|
| if nothing_visible: |
| lines.append(" Nothing visible in this direction.") |
|
|
| return "\n".join(lines) |
|
|
|
|
| def _terminal_obs( |
| step_count: int, |
| last_action_feedback: str, |
| narrative: str, |
| agent_evacuated: bool = False, |
| agent_health: float = 0.0, |
| ) -> Dict[str, Any]: |
| return { |
| "narrative": narrative, |
| "agent_evacuated": agent_evacuated, |
| "location_label": "", |
| "smoke_level": "none", |
| "fire_visible": False, |
| "fire_direction": None, |
| "agent_health": agent_health, |
| "health_status": _health_label(agent_health), |
| "wind_dir": "CALM", |
| "visible_objects": [], |
| "blocked_exit_ids": [], |
| "audible_signals": [], |
| "elapsed_steps": step_count, |
| "last_action_feedback": last_action_feedback, |
| "available_actions_hint": [], |
| "done": True, |
| "reward": 0.0, |
| } |
|
|