Spaces:
Sleeping
Sleeping
| """ | |
| Observation rendering for Pyre (single-agent). | |
| Converts raw server state into: | |
| - A first-person narrative string (the LLM's primary input) | |
| - Structured fields in PyreObservation | |
| Visibility rules: | |
| - Base radius: 5 (Manhattan distance) | |
| - Moderate smoke in agent's cell: radius 3 | |
| - Heavy smoke in agent's cell: radius 2 | |
| - Walls block flood-fill propagation | |
| Health status labels: | |
| 80–100 : Good | |
| 50–79 : Moderate | |
| 25–49 : Low | |
| 0–24 : Critical | |
| """ | |
| from typing import Any, Dict, List, Optional, Set, Tuple | |
| from .fire_sim import smoke_level_label, FIRE_BURNING, EXIT_BLOCKED_FIRE_THRESHOLD | |
| FLOOR = 0 | |
| WALL = 1 | |
| DOOR_OPEN = 2 | |
| DOOR_CLOSED = 3 | |
| EXIT = 4 | |
| OBSTACLE = 5 | |
| _CARDINAL = [(0, -1, "north"), (0, 1, "south"), (-1, 0, "west"), (1, 0, "east")] | |
| _DELTA_TO_DIR = {(0, -1): "north", (0, 1): "south", (-1, 0): "west", (1, 0): "east"} | |
| def _idx(x: int, y: int, w: int) -> int: | |
| return y * w + x | |
| def _in_bounds(x: int, y: int, w: int, h: int) -> bool: | |
| return 0 <= x < w and 0 <= y < h | |
| def _manhattan(x1: int, y1: int, x2: int, y2: int) -> int: | |
| return abs(x1 - x2) + abs(y1 - y2) | |
| def _health_label(health: float) -> str: | |
| if health >= 80: | |
| return "Good" | |
| if health >= 50: | |
| return "Moderate" | |
| if health >= 25: | |
| return "Low" | |
| return "Critical" | |
| # --------------------------------------------------------------------------- | |
| # Visibility computation | |
| # --------------------------------------------------------------------------- | |
| def compute_visible_cells( | |
| ax: int, ay: int, | |
| cell_grid: List[int], | |
| smoke_grid: List[float], | |
| w: int, h: int, | |
| ) -> Set[Tuple[int, int]]: | |
| """BFS flood-fill from agent; walls block propagation.""" | |
| agent_smoke = smoke_grid[_idx(ax, ay, w)] | |
| label = smoke_level_label(agent_smoke) | |
| if label == "heavy": | |
| radius = 2 | |
| elif label == "moderate": | |
| radius = 3 | |
| else: | |
| radius = 5 | |
| visible: Set[Tuple[int, int]] = {(ax, ay)} | |
| queue = [(ax, ay, 0)] | |
| while queue: | |
| x, y, dist = queue.pop(0) | |
| if dist >= radius: | |
| continue | |
| for dx, dy, _ in _CARDINAL: | |
| nx, ny = x + dx, y + dy | |
| if not _in_bounds(nx, ny, w, h): | |
| continue | |
| if (nx, ny) in visible: | |
| continue | |
| ct = cell_grid[_idx(nx, ny, w)] | |
| if ct == WALL: | |
| continue | |
| visible.add((nx, ny)) | |
| queue.append((nx, ny, dist + 1)) | |
| return visible | |
| # --------------------------------------------------------------------------- | |
| # Main builder | |
| # --------------------------------------------------------------------------- | |
| def build_narrative_observation( | |
| step_count: int, | |
| agent_x: int, | |
| agent_y: int, | |
| agent_alive: bool, | |
| agent_evacuated: bool, | |
| agent_health: float, | |
| cell_grid: List[int], | |
| fire_grid: List[float], | |
| smoke_grid: List[float], | |
| exit_positions: List[List[int]], | |
| door_registry: Dict[str, List[int]], | |
| zone_map: Dict[str, str], | |
| last_action_feedback: str, | |
| wind_dir: str, | |
| w: int, | |
| h: int, | |
| ) -> Dict[str, Any]: | |
| """Build the full observation dict (matches PyreObservation fields).""" | |
| if agent_evacuated: | |
| return _terminal_obs(step_count, last_action_feedback, | |
| narrative="You have safely evacuated the building.", | |
| agent_evacuated=True, agent_health=agent_health) | |
| if not agent_alive: | |
| return _terminal_obs(step_count, last_action_feedback, | |
| narrative="You have been overcome by fire and smoke.", | |
| agent_evacuated=False, agent_health=0.0) | |
| visible = compute_visible_cells(agent_x, agent_y, cell_grid, smoke_grid, w, h) | |
| # --- Agent cell conditions --- | |
| agent_smoke = smoke_grid[_idx(agent_x, agent_y, w)] | |
| smoke_label = smoke_level_label(agent_smoke) | |
| health_label = _health_label(agent_health) | |
| # --- Fire visibility --- | |
| fire_visible = False | |
| fire_dir: Optional[str] = None | |
| nearest_fire_dist = 999 | |
| for vx, vy in visible: | |
| if (vx, vy) == (agent_x, agent_y): | |
| continue | |
| if fire_grid[_idx(vx, vy, w)] >= FIRE_BURNING: | |
| fire_visible = True | |
| d = _manhattan(agent_x, agent_y, vx, vy) | |
| if d < nearest_fire_dist: | |
| nearest_fire_dist = d | |
| dx = vx - agent_x | |
| dy = vy - agent_y | |
| if abs(dx) >= abs(dy): | |
| fire_dir = "east" if dx > 0 else "west" | |
| else: | |
| fire_dir = "south" if dy > 0 else "north" | |
| # --- Visible objects (doors and exits) --- | |
| visible_objects: List[Dict[str, Any]] = [] | |
| door_pos_to_id = {(v[0], v[1]): k for k, v in door_registry.items()} | |
| blocked_exit_ids: List[str] = [] | |
| for vx, vy in visible: | |
| ct = cell_grid[_idx(vx, vy, w)] | |
| rel = _relative_pos_str(agent_x, agent_y, vx, vy) | |
| if ct in (DOOR_OPEN, DOOR_CLOSED): | |
| door_id = door_pos_to_id.get((vx, vy), f"door_{vx}_{vy}") | |
| door_state = "open" if ct == DOOR_OPEN else "closed" | |
| if fire_grid[_idx(vx, vy, w)] > 0.1: | |
| door_state += " (hot)" | |
| visible_objects.append({ | |
| "id": door_id, "type": "door", | |
| "relative_pos": rel, "state": door_state, | |
| }) | |
| elif ct == EXIT: | |
| fire_at_exit = fire_grid[_idx(vx, vy, w)] | |
| exit_id = f"exit_{vx}_{vy}" | |
| if fire_at_exit >= EXIT_BLOCKED_FIRE_THRESHOLD: | |
| exit_state = "BLOCKED by fire" | |
| blocked_exit_ids.append(exit_id) | |
| else: | |
| exit_state = "open" | |
| visible_objects.append({ | |
| "id": exit_id, "type": "exit", | |
| "relative_pos": rel, "state": exit_state, | |
| }) | |
| # Exits not visible — still flag blocked ones from known positions | |
| visible_coords = {(vx, vy) for vx, vy in visible} | |
| for ex in exit_positions: | |
| ex_id = f"exit_{ex[0]}_{ex[1]}" | |
| if (ex[0], ex[1]) not in visible_coords: | |
| if fire_grid[_idx(ex[0], ex[1], w)] >= EXIT_BLOCKED_FIRE_THRESHOLD: | |
| if ex_id not in blocked_exit_ids: | |
| blocked_exit_ids.append(ex_id) | |
| # --- Audible signals --- | |
| audible: List[str] = [] | |
| any_fire = any(fire_grid[i] >= FIRE_BURNING for i in range(w * h)) | |
| if any_fire: | |
| audible.append("Fire alarm sounding") | |
| if smoke_label in ("moderate", "heavy"): | |
| audible.append("Smoke detector beeping") | |
| if agent_health < 50: | |
| audible.append("Your own laboured breathing") | |
| # --- Zone label --- | |
| location_label = zone_map.get(f"{agent_x},{agent_y}", "unknown area") | |
| # --- Action hints --- | |
| action_hints = _build_action_hints( | |
| agent_x, agent_y, cell_grid, visible, | |
| visible_objects, door_registry, w, h | |
| ) | |
| # --- Narrative --- | |
| narrative = _compose_narrative( | |
| location_label=location_label, | |
| smoke_label=smoke_label, | |
| fire_visible=fire_visible, | |
| fire_dir=fire_dir, | |
| agent_health=agent_health, | |
| health_label=health_label, | |
| wind_dir=wind_dir, | |
| visible_objects=visible_objects, | |
| blocked_exit_ids=blocked_exit_ids, | |
| audible=audible, | |
| last_action_feedback=last_action_feedback, | |
| action_hints=action_hints, | |
| ) | |
| return { | |
| "narrative": narrative, | |
| "agent_evacuated": agent_evacuated, | |
| "location_label": location_label, | |
| "smoke_level": smoke_label, | |
| "fire_visible": fire_visible, | |
| "fire_direction": fire_dir, | |
| "agent_health": agent_health, | |
| "health_status": health_label, | |
| "wind_dir": wind_dir, | |
| "visible_objects": visible_objects, | |
| "blocked_exit_ids": blocked_exit_ids, | |
| "audible_signals": audible, | |
| "elapsed_steps": step_count, | |
| "last_action_feedback": last_action_feedback, | |
| "available_actions_hint": action_hints, | |
| "done": False, | |
| "reward": 0.0, | |
| } | |
| # --------------------------------------------------------------------------- | |
| # Helpers | |
| # --------------------------------------------------------------------------- | |
| def _relative_pos_str(ax: int, ay: int, tx: int, ty: int) -> str: | |
| dx, dy = tx - ax, ty - ay | |
| dist = abs(dx) + abs(dy) | |
| if dist == 0: | |
| return "here" | |
| if abs(dx) >= abs(dy): | |
| return f"{dist}m {'east' if dx > 0 else 'west'}" | |
| else: | |
| return f"{dist}m {'south' if dy > 0 else 'north'}" | |
| def _build_action_hints( | |
| ax: int, ay: int, | |
| cell_grid: List[int], | |
| visible: Set[Tuple[int, int]], | |
| visible_objects: List[Dict], | |
| door_registry: Dict[str, List[int]], | |
| w: int, h: int, | |
| ) -> List[str]: | |
| hints: List[str] = [] | |
| # Movement and look hints per direction | |
| for dx, dy, dirname in _CARDINAL: | |
| nx, ny = ax + dx, ay + dy | |
| if _in_bounds(nx, ny, w, h): | |
| ct = cell_grid[_idx(nx, ny, w)] | |
| if ct in (FLOOR, DOOR_OPEN, EXIT): | |
| hints.append(f"move(direction='{dirname}')") | |
| # Suggest look for any non-wall direction (including closed doors/obstacles) | |
| if ct != WALL: | |
| hints.append(f"look(direction='{dirname}')") | |
| # Door actions | |
| for obj in visible_objects: | |
| if obj["type"] == "door": | |
| did = obj["id"] | |
| if "closed" in obj["state"]: | |
| hints.append(f"door(target_id='{did}', door_state='open')") | |
| else: | |
| hints.append(f"door(target_id='{did}', door_state='close')") | |
| hints.append("wait()") | |
| return hints | |
| def _compose_narrative( | |
| location_label: str, | |
| smoke_label: str, | |
| fire_visible: bool, | |
| fire_dir: Optional[str], | |
| agent_health: float, | |
| health_label: str, | |
| wind_dir: str, | |
| visible_objects: List[Dict], | |
| blocked_exit_ids: List[str], | |
| audible: List[str], | |
| last_action_feedback: str, | |
| action_hints: List[str], | |
| ) -> str: | |
| lines = [] | |
| # Location + atmosphere | |
| lines.append(f"You are in the **{location_label}**. The air is **{smoke_label}**.") | |
| # Health + wind | |
| health_bar = _health_bar(agent_health) | |
| wind_str = f"Wind: **{wind_dir}**" if wind_dir != "CALM" else "Wind: calm" | |
| lines.append(f"Health: {health_bar} ({health_label}) | {wind_str}") | |
| # Fire | |
| if fire_visible and fire_dir: | |
| lines.append(f"Flames are visible to the **{fire_dir}**.") | |
| else: | |
| lines.append("No fire directly visible.") | |
| # Objects (exits and doors) | |
| exits_vis = [o for o in visible_objects if o["type"] == "exit"] | |
| doors_vis = [o for o in visible_objects if o["type"] == "door"] | |
| if exits_vis: | |
| exit_descs = [] | |
| for o in exits_vis: | |
| status = " **[BLOCKED]**" if o["state"].startswith("BLOCKED") else "" | |
| exit_descs.append(f"{o['id']}{status} at {o['relative_pos']}") | |
| lines.append(f"Exit{'s' if len(exits_vis) > 1 else ''} visible: {', '.join(exit_descs)}.") | |
| if doors_vis: | |
| door_descs = [f"{o['id']} ({o['state']}) at {o['relative_pos']}" for o in doors_vis] | |
| lines.append(f"Door{'s' if len(doors_vis) > 1 else ''}: {', '.join(door_descs)}.") | |
| if blocked_exit_ids: | |
| lines.append(f"WARNING: {len(blocked_exit_ids)} exit(s) blocked by fire — find an alternative route.") | |
| # Sound | |
| if audible: | |
| lines.append(f"You hear: {'; '.join(audible)}.") | |
| # Last action | |
| if last_action_feedback: | |
| lines.append(f"Last action: {last_action_feedback}") | |
| # Available actions | |
| if action_hints: | |
| hints_str = " ".join(action_hints[:8]) | |
| lines.append(f"Available actions: {hints_str}") | |
| return "\n".join(lines) | |
| def _health_bar(health: float) -> str: | |
| filled = int(health / 10) | |
| empty = 10 - filled | |
| return "█" * filled + "░" * empty + f" {int(health)}/100" | |
| # --------------------------------------------------------------------------- | |
| # Look action support | |
| # --------------------------------------------------------------------------- | |
| def build_look_result( | |
| direction: str, | |
| agent_x: int, | |
| agent_y: int, | |
| cell_grid: List[int], | |
| fire_grid: List[float], | |
| smoke_grid: List[float], | |
| zone_map: Dict[str, str], | |
| door_registry: Dict[str, List[int]], | |
| w: int, | |
| h: int, | |
| ) -> str: | |
| """Generate a detailed description of cells in one cardinal direction. | |
| Scans up to 5 cells from the agent's position in `direction`, stopping | |
| at the first wall or out-of-bounds cell. Returns a sentence describing | |
| each visible cell — smoke level, fire presence, door/exit status, zone. | |
| """ | |
| delta = { | |
| "north": (0, -1), "south": (0, 1), | |
| "west": (-1, 0), "east": (1, 0), | |
| }.get(direction) | |
| if delta is None: | |
| return f"Unknown direction '{direction}'." | |
| dx, dy = delta | |
| door_pos_to_id = {(v[0], v[1]): k for k, v in door_registry.items()} | |
| lines = [f"Looking **{direction}**:"] | |
| nothing_visible = True | |
| for dist in range(1, 6): | |
| nx, ny = agent_x + dx * dist, agent_y + dy * dist | |
| if not _in_bounds(nx, ny, w, h): | |
| lines.append(f" {dist}m — outer wall.") | |
| break | |
| ct = cell_grid[_idx(nx, ny, w)] | |
| if ct == WALL: | |
| lines.append(f" {dist}m — wall.") | |
| break | |
| nothing_visible = False | |
| parts: List[str] = [] | |
| # Cell type label | |
| if ct == EXIT: | |
| fire_at = fire_grid[_idx(nx, ny, w)] | |
| status = "BLOCKED by fire" if fire_at >= EXIT_BLOCKED_FIRE_THRESHOLD else "clear" | |
| parts.append(f"**EXIT** ({status})") | |
| elif ct == DOOR_OPEN: | |
| door_id = door_pos_to_id.get((nx, ny), "door") | |
| parts.append(f"open door [{door_id}]") | |
| elif ct == DOOR_CLOSED: | |
| door_id = door_pos_to_id.get((nx, ny), "door") | |
| parts.append(f"closed door [{door_id}]") | |
| elif ct == OBSTACLE: | |
| parts.append("burnt rubble (impassable)") | |
| else: | |
| zone = zone_map.get(f"{nx},{ny}", "") | |
| if zone: | |
| parts.append(zone.replace("_", " ")) | |
| else: | |
| parts.append("open floor") | |
| # Smoke | |
| smoke = smoke_grid[_idx(nx, ny, w)] | |
| s_label = smoke_level_label(smoke) | |
| if s_label != "none": | |
| parts.append(f"**{s_label} smoke**") | |
| # Fire | |
| fire = fire_grid[_idx(nx, ny, w)] | |
| if fire >= FIRE_BURNING: | |
| parts.append("**actively burning**") | |
| elif fire > 0.1: | |
| parts.append("smoldering heat") | |
| lines.append(f" {dist}m — {', '.join(parts)}.") | |
| if nothing_visible: | |
| lines.append(" Nothing visible in this direction.") | |
| return "\n".join(lines) | |
| def _terminal_obs( | |
| step_count: int, | |
| last_action_feedback: str, | |
| narrative: str, | |
| agent_evacuated: bool = False, | |
| agent_health: float = 0.0, | |
| ) -> Dict[str, Any]: | |
| return { | |
| "narrative": narrative, | |
| "agent_evacuated": agent_evacuated, | |
| "location_label": "", | |
| "smoke_level": "none", | |
| "fire_visible": False, | |
| "fire_direction": None, | |
| "agent_health": agent_health, | |
| "health_status": _health_label(agent_health), | |
| "wind_dir": "CALM", | |
| "visible_objects": [], | |
| "blocked_exit_ids": [], | |
| "audible_signals": [], | |
| "elapsed_steps": step_count, | |
| "last_action_feedback": last_action_feedback, | |
| "available_actions_hint": [], | |
| "done": True, | |
| "reward": 0.0, | |
| } | |