pyre_env / server /narrative.py
Akshaykumarbm's picture
Upload folder using huggingface_hub
443c22e verified
"""
Observation rendering for Pyre (single-agent).
Converts raw server state into:
- A first-person narrative string (the LLM's primary input)
- Structured fields in PyreObservation
Visibility rules:
- Base radius: 5 (Manhattan distance)
- Moderate smoke in agent's cell: radius 3
- Heavy smoke in agent's cell: radius 2
- Walls block flood-fill propagation
Health status labels:
80–100 : Good
50–79 : Moderate
25–49 : Low
0–24 : Critical
"""
from typing import Any, Dict, List, Optional, Set, Tuple
from .fire_sim import smoke_level_label, FIRE_BURNING, EXIT_BLOCKED_FIRE_THRESHOLD
FLOOR = 0
WALL = 1
DOOR_OPEN = 2
DOOR_CLOSED = 3
EXIT = 4
OBSTACLE = 5
_CARDINAL = [(0, -1, "north"), (0, 1, "south"), (-1, 0, "west"), (1, 0, "east")]
_DELTA_TO_DIR = {(0, -1): "north", (0, 1): "south", (-1, 0): "west", (1, 0): "east"}
def _idx(x: int, y: int, w: int) -> int:
return y * w + x
def _in_bounds(x: int, y: int, w: int, h: int) -> bool:
return 0 <= x < w and 0 <= y < h
def _manhattan(x1: int, y1: int, x2: int, y2: int) -> int:
return abs(x1 - x2) + abs(y1 - y2)
def _health_label(health: float) -> str:
if health >= 80:
return "Good"
if health >= 50:
return "Moderate"
if health >= 25:
return "Low"
return "Critical"
# ---------------------------------------------------------------------------
# Visibility computation
# ---------------------------------------------------------------------------
def compute_visible_cells(
ax: int, ay: int,
cell_grid: List[int],
smoke_grid: List[float],
w: int, h: int,
) -> Set[Tuple[int, int]]:
"""BFS flood-fill from agent; walls block propagation."""
agent_smoke = smoke_grid[_idx(ax, ay, w)]
label = smoke_level_label(agent_smoke)
if label == "heavy":
radius = 2
elif label == "moderate":
radius = 3
else:
radius = 5
visible: Set[Tuple[int, int]] = {(ax, ay)}
queue = [(ax, ay, 0)]
while queue:
x, y, dist = queue.pop(0)
if dist >= radius:
continue
for dx, dy, _ in _CARDINAL:
nx, ny = x + dx, y + dy
if not _in_bounds(nx, ny, w, h):
continue
if (nx, ny) in visible:
continue
ct = cell_grid[_idx(nx, ny, w)]
if ct == WALL:
continue
visible.add((nx, ny))
queue.append((nx, ny, dist + 1))
return visible
# ---------------------------------------------------------------------------
# Main builder
# ---------------------------------------------------------------------------
def build_narrative_observation(
step_count: int,
agent_x: int,
agent_y: int,
agent_alive: bool,
agent_evacuated: bool,
agent_health: float,
cell_grid: List[int],
fire_grid: List[float],
smoke_grid: List[float],
exit_positions: List[List[int]],
door_registry: Dict[str, List[int]],
zone_map: Dict[str, str],
last_action_feedback: str,
wind_dir: str,
w: int,
h: int,
) -> Dict[str, Any]:
"""Build the full observation dict (matches PyreObservation fields)."""
if agent_evacuated:
return _terminal_obs(step_count, last_action_feedback,
narrative="You have safely evacuated the building.",
agent_evacuated=True, agent_health=agent_health)
if not agent_alive:
return _terminal_obs(step_count, last_action_feedback,
narrative="You have been overcome by fire and smoke.",
agent_evacuated=False, agent_health=0.0)
visible = compute_visible_cells(agent_x, agent_y, cell_grid, smoke_grid, w, h)
# --- Agent cell conditions ---
agent_smoke = smoke_grid[_idx(agent_x, agent_y, w)]
smoke_label = smoke_level_label(agent_smoke)
health_label = _health_label(agent_health)
# --- Fire visibility ---
fire_visible = False
fire_dir: Optional[str] = None
nearest_fire_dist = 999
for vx, vy in visible:
if (vx, vy) == (agent_x, agent_y):
continue
if fire_grid[_idx(vx, vy, w)] >= FIRE_BURNING:
fire_visible = True
d = _manhattan(agent_x, agent_y, vx, vy)
if d < nearest_fire_dist:
nearest_fire_dist = d
dx = vx - agent_x
dy = vy - agent_y
if abs(dx) >= abs(dy):
fire_dir = "east" if dx > 0 else "west"
else:
fire_dir = "south" if dy > 0 else "north"
# --- Visible objects (doors and exits) ---
visible_objects: List[Dict[str, Any]] = []
door_pos_to_id = {(v[0], v[1]): k for k, v in door_registry.items()}
blocked_exit_ids: List[str] = []
for vx, vy in visible:
ct = cell_grid[_idx(vx, vy, w)]
rel = _relative_pos_str(agent_x, agent_y, vx, vy)
if ct in (DOOR_OPEN, DOOR_CLOSED):
door_id = door_pos_to_id.get((vx, vy), f"door_{vx}_{vy}")
door_state = "open" if ct == DOOR_OPEN else "closed"
if fire_grid[_idx(vx, vy, w)] > 0.1:
door_state += " (hot)"
visible_objects.append({
"id": door_id, "type": "door",
"relative_pos": rel, "state": door_state,
})
elif ct == EXIT:
fire_at_exit = fire_grid[_idx(vx, vy, w)]
exit_id = f"exit_{vx}_{vy}"
if fire_at_exit >= EXIT_BLOCKED_FIRE_THRESHOLD:
exit_state = "BLOCKED by fire"
blocked_exit_ids.append(exit_id)
else:
exit_state = "open"
visible_objects.append({
"id": exit_id, "type": "exit",
"relative_pos": rel, "state": exit_state,
})
# Exits not visible — still flag blocked ones from known positions
visible_coords = {(vx, vy) for vx, vy in visible}
for ex in exit_positions:
ex_id = f"exit_{ex[0]}_{ex[1]}"
if (ex[0], ex[1]) not in visible_coords:
if fire_grid[_idx(ex[0], ex[1], w)] >= EXIT_BLOCKED_FIRE_THRESHOLD:
if ex_id not in blocked_exit_ids:
blocked_exit_ids.append(ex_id)
# --- Audible signals ---
audible: List[str] = []
any_fire = any(fire_grid[i] >= FIRE_BURNING for i in range(w * h))
if any_fire:
audible.append("Fire alarm sounding")
if smoke_label in ("moderate", "heavy"):
audible.append("Smoke detector beeping")
if agent_health < 50:
audible.append("Your own laboured breathing")
# --- Zone label ---
location_label = zone_map.get(f"{agent_x},{agent_y}", "unknown area")
# --- Action hints ---
action_hints = _build_action_hints(
agent_x, agent_y, cell_grid, visible,
visible_objects, door_registry, w, h
)
# --- Narrative ---
narrative = _compose_narrative(
location_label=location_label,
smoke_label=smoke_label,
fire_visible=fire_visible,
fire_dir=fire_dir,
agent_health=agent_health,
health_label=health_label,
wind_dir=wind_dir,
visible_objects=visible_objects,
blocked_exit_ids=blocked_exit_ids,
audible=audible,
last_action_feedback=last_action_feedback,
action_hints=action_hints,
)
return {
"narrative": narrative,
"agent_evacuated": agent_evacuated,
"location_label": location_label,
"smoke_level": smoke_label,
"fire_visible": fire_visible,
"fire_direction": fire_dir,
"agent_health": agent_health,
"health_status": health_label,
"wind_dir": wind_dir,
"visible_objects": visible_objects,
"blocked_exit_ids": blocked_exit_ids,
"audible_signals": audible,
"elapsed_steps": step_count,
"last_action_feedback": last_action_feedback,
"available_actions_hint": action_hints,
"done": False,
"reward": 0.0,
}
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _relative_pos_str(ax: int, ay: int, tx: int, ty: int) -> str:
dx, dy = tx - ax, ty - ay
dist = abs(dx) + abs(dy)
if dist == 0:
return "here"
if abs(dx) >= abs(dy):
return f"{dist}m {'east' if dx > 0 else 'west'}"
else:
return f"{dist}m {'south' if dy > 0 else 'north'}"
def _build_action_hints(
ax: int, ay: int,
cell_grid: List[int],
visible: Set[Tuple[int, int]],
visible_objects: List[Dict],
door_registry: Dict[str, List[int]],
w: int, h: int,
) -> List[str]:
hints: List[str] = []
# Movement and look hints per direction
for dx, dy, dirname in _CARDINAL:
nx, ny = ax + dx, ay + dy
if _in_bounds(nx, ny, w, h):
ct = cell_grid[_idx(nx, ny, w)]
if ct in (FLOOR, DOOR_OPEN, EXIT):
hints.append(f"move(direction='{dirname}')")
# Suggest look for any non-wall direction (including closed doors/obstacles)
if ct != WALL:
hints.append(f"look(direction='{dirname}')")
# Door actions
for obj in visible_objects:
if obj["type"] == "door":
did = obj["id"]
if "closed" in obj["state"]:
hints.append(f"door(target_id='{did}', door_state='open')")
else:
hints.append(f"door(target_id='{did}', door_state='close')")
hints.append("wait()")
return hints
def _compose_narrative(
location_label: str,
smoke_label: str,
fire_visible: bool,
fire_dir: Optional[str],
agent_health: float,
health_label: str,
wind_dir: str,
visible_objects: List[Dict],
blocked_exit_ids: List[str],
audible: List[str],
last_action_feedback: str,
action_hints: List[str],
) -> str:
lines = []
# Location + atmosphere
lines.append(f"You are in the **{location_label}**. The air is **{smoke_label}**.")
# Health + wind
health_bar = _health_bar(agent_health)
wind_str = f"Wind: **{wind_dir}**" if wind_dir != "CALM" else "Wind: calm"
lines.append(f"Health: {health_bar} ({health_label}) | {wind_str}")
# Fire
if fire_visible and fire_dir:
lines.append(f"Flames are visible to the **{fire_dir}**.")
else:
lines.append("No fire directly visible.")
# Objects (exits and doors)
exits_vis = [o for o in visible_objects if o["type"] == "exit"]
doors_vis = [o for o in visible_objects if o["type"] == "door"]
if exits_vis:
exit_descs = []
for o in exits_vis:
status = " **[BLOCKED]**" if o["state"].startswith("BLOCKED") else ""
exit_descs.append(f"{o['id']}{status} at {o['relative_pos']}")
lines.append(f"Exit{'s' if len(exits_vis) > 1 else ''} visible: {', '.join(exit_descs)}.")
if doors_vis:
door_descs = [f"{o['id']} ({o['state']}) at {o['relative_pos']}" for o in doors_vis]
lines.append(f"Door{'s' if len(doors_vis) > 1 else ''}: {', '.join(door_descs)}.")
if blocked_exit_ids:
lines.append(f"WARNING: {len(blocked_exit_ids)} exit(s) blocked by fire — find an alternative route.")
# Sound
if audible:
lines.append(f"You hear: {'; '.join(audible)}.")
# Last action
if last_action_feedback:
lines.append(f"Last action: {last_action_feedback}")
# Available actions
if action_hints:
hints_str = " ".join(action_hints[:8])
lines.append(f"Available actions: {hints_str}")
return "\n".join(lines)
def _health_bar(health: float) -> str:
filled = int(health / 10)
empty = 10 - filled
return "█" * filled + "░" * empty + f" {int(health)}/100"
# ---------------------------------------------------------------------------
# Look action support
# ---------------------------------------------------------------------------
def build_look_result(
direction: str,
agent_x: int,
agent_y: int,
cell_grid: List[int],
fire_grid: List[float],
smoke_grid: List[float],
zone_map: Dict[str, str],
door_registry: Dict[str, List[int]],
w: int,
h: int,
) -> str:
"""Generate a detailed description of cells in one cardinal direction.
Scans up to 5 cells from the agent's position in `direction`, stopping
at the first wall or out-of-bounds cell. Returns a sentence describing
each visible cell — smoke level, fire presence, door/exit status, zone.
"""
delta = {
"north": (0, -1), "south": (0, 1),
"west": (-1, 0), "east": (1, 0),
}.get(direction)
if delta is None:
return f"Unknown direction '{direction}'."
dx, dy = delta
door_pos_to_id = {(v[0], v[1]): k for k, v in door_registry.items()}
lines = [f"Looking **{direction}**:"]
nothing_visible = True
for dist in range(1, 6):
nx, ny = agent_x + dx * dist, agent_y + dy * dist
if not _in_bounds(nx, ny, w, h):
lines.append(f" {dist}m — outer wall.")
break
ct = cell_grid[_idx(nx, ny, w)]
if ct == WALL:
lines.append(f" {dist}m — wall.")
break
nothing_visible = False
parts: List[str] = []
# Cell type label
if ct == EXIT:
fire_at = fire_grid[_idx(nx, ny, w)]
status = "BLOCKED by fire" if fire_at >= EXIT_BLOCKED_FIRE_THRESHOLD else "clear"
parts.append(f"**EXIT** ({status})")
elif ct == DOOR_OPEN:
door_id = door_pos_to_id.get((nx, ny), "door")
parts.append(f"open door [{door_id}]")
elif ct == DOOR_CLOSED:
door_id = door_pos_to_id.get((nx, ny), "door")
parts.append(f"closed door [{door_id}]")
elif ct == OBSTACLE:
parts.append("burnt rubble (impassable)")
else:
zone = zone_map.get(f"{nx},{ny}", "")
if zone:
parts.append(zone.replace("_", " "))
else:
parts.append("open floor")
# Smoke
smoke = smoke_grid[_idx(nx, ny, w)]
s_label = smoke_level_label(smoke)
if s_label != "none":
parts.append(f"**{s_label} smoke**")
# Fire
fire = fire_grid[_idx(nx, ny, w)]
if fire >= FIRE_BURNING:
parts.append("**actively burning**")
elif fire > 0.1:
parts.append("smoldering heat")
lines.append(f" {dist}m — {', '.join(parts)}.")
if nothing_visible:
lines.append(" Nothing visible in this direction.")
return "\n".join(lines)
def _terminal_obs(
step_count: int,
last_action_feedback: str,
narrative: str,
agent_evacuated: bool = False,
agent_health: float = 0.0,
) -> Dict[str, Any]:
return {
"narrative": narrative,
"agent_evacuated": agent_evacuated,
"location_label": "",
"smoke_level": "none",
"fire_visible": False,
"fire_direction": None,
"agent_health": agent_health,
"health_status": _health_label(agent_health),
"wind_dir": "CALM",
"visible_objects": [],
"blocked_exit_ids": [],
"audible_signals": [],
"elapsed_steps": step_count,
"last_action_feedback": last_action_feedback,
"available_actions_hint": [],
"done": True,
"reward": 0.0,
}