""" environment.py — Garbage Collecting Robot Core RL Environment. Fixes applied: • Battery no longer drains during autonomous CHARGE / UNLOAD_HERE steps. • Recharge guard now checks `not self.done` instead of `self.garbage_positions` so it also fires correctly at episode boundaries. """ from typing import Any, Dict, Optional, List, Tuple from collections import deque from models import Observation, State from scenarios import SCENARIOS # ───────────────────────────────────────────────────────────── # BFS PATHFINDING HELPER # ───────────────────────────────────────────────────────────── def _bfs( start, goal, obstacles, grid_w: int, grid_h: int, ) -> Tuple[Optional[str], float]: """ Breadth-First Search from *start* to *goal* on a rectangular grid. Avoids all cells listed in *obstacles*. Returns: (first_direction, path_length) — the single step that begins the shortest path, and how many steps the full path takes. (None, 0) — start == goal (already there). (None, inf) — goal is unreachable. Directions: "UP" (+y), "DOWN" (−y), "LEFT" (−x), "RIGHT" (+x). """ start = (int(start[0]), int(start[1])) goal = (int(goal[0]), int(goal[1])) if start == goal: return (None, 0) obstacle_set = frozenset((int(o[0]), int(o[1])) for o in obstacles) dirs = [("RIGHT", (1, 0)), ("LEFT", (-1, 0)), ("UP", (0, 1)), ("DOWN", (0, -1))] queue: deque = deque([(start, None, 0)]) # (pos, first_move, depth) visited = {start} while queue: pos, first, depth = queue.popleft() for name, (dx, dy) in dirs: npos = (pos[0] + dx, pos[1] + dy) if not (0 <= npos[0] < grid_w and 0 <= npos[1] < grid_h): continue if npos in obstacle_set or npos in visited: continue move = first if first else name if npos == goal: return (move, depth + 1) visited.add(npos) queue.append((npos, move, depth + 1)) return (None, float("inf")) # ───────────────────────────────────────────────────────────── # ENVIRONMENT # ───────────────────────────────────────────────────────────── class GarbageRobotEnv: """ Core RL Environment for the Garbage Collecting Robot. Robot modes ----------- MODE_NORMAL — agent controls the robot normally. MODE_RECHARGE — battery critically low; robot auto-navigates home, recharges, then switches back to NORMAL. MODE_UNLOAD — storage full; robot auto-navigates to unload_station, empties its bin, then switches back to NORMAL. Autonomous overrides happen *inside* step(): the command the caller sends is silently replaced when the robot is in a non-normal mode. This keeps the external API unchanged while giving the robot self-managing capabilities. FIX: Battery is only decremented for real movement/collection commands, NOT for internal CHARGE or UNLOAD_HERE commands. """ MODE_NORMAL = "normal" MODE_RECHARGE = "recharging" MODE_UNLOAD = "unloading" # Safety margin added on top of BFS distance when deciding to recharge. RECHARGE_BUFFER = 4 def __init__(self): self.current_task_id = None self.grid_size = (0, 0) self.robot_position = [0, 0] self.garbage_positions = [] self.obstacle_positions = [] self.battery_level = 0 self.max_battery = 0 self.inventory_count = 0 # Resource management state self.home_position = [0, 0] self.unload_station = [0, 0] self.storage_capacity = 6 self.current_storage_load = 0 # Episode accounting self.total_reward = 0.0 self.steps_taken = 0 self.done = False # Autonomous navigation mode self._mode = self.MODE_NORMAL # ── Reset ───────────────────────────────────────────────── def reset(self, task_id: str) -> State: if task_id not in SCENARIOS: raise ValueError(f"Task ID '{task_id}' not found in scenarios.") s = SCENARIOS[task_id] self.current_task_id = task_id self.grid_size = tuple(s["grid_size"]) self.robot_position = list(s["robot_start"]) self.garbage_positions = [list(g) for g in s["garbage_starts"]] self.obstacle_positions = [list(o) for o in s["obstacle_starts"]] self.battery_level = s["max_battery"] self.max_battery = s["max_battery"] self.home_position = list(s.get("home_position", s["robot_start"])) self.unload_station = list(s.get("unload_station", [0, self.grid_size[1] - 1])) self.storage_capacity = s.get("storage_capacity", 6) self.current_storage_load = 0 self.inventory_count = 0 self.total_reward = 0.0 self.steps_taken = 0 self.done = False self._mode = self.MODE_NORMAL return self.state() def reset_custom( self, task_id: str = "task_easy", grid_size=None, robot_start=None, garbage_positions=None, obstacle_positions=None, max_battery=None, storage_capacity=None, home_position=None, unload_station=None, ) -> State: """ Dynamic reset: start from a scenario baseline and override any fields. Pass task_id='custom' with all fields supplied to skip scenario lookup. """ if task_id in SCENARIOS: s = SCENARIOS[task_id] base_grid = s["grid_size"] base_robot = s["robot_start"] base_garbage = s["garbage_starts"] base_obstacles = s["obstacle_starts"] base_battery = s["max_battery"] base_home = s.get("home_position", s["robot_start"]) base_unload = s.get("unload_station", [0, s["grid_size"][1] - 1]) base_capacity = s.get("storage_capacity", 5) else: base_grid = (10, 10) base_robot = (0, 0) base_garbage = [] base_obstacles = [] base_battery = 60 base_home = (0, 0) base_unload = (9, 0) base_capacity = 6 self.current_task_id = task_id self.grid_size = tuple(grid_size) if grid_size is not None else tuple(base_grid) self.robot_position = list(robot_start) if robot_start is not None else list(base_robot) self.garbage_positions = [list(g) for g in garbage_positions] if garbage_positions is not None else [list(g) for g in base_garbage] self.obstacle_positions = [list(o) for o in obstacle_positions] if obstacle_positions is not None else [list(o) for o in base_obstacles] self.battery_level = max_battery if max_battery is not None else base_battery self.max_battery = self.battery_level self.home_position = list(home_position) if home_position is not None else list(base_home) self.unload_station = list(unload_station) if unload_station is not None else list(base_unload) self.storage_capacity = storage_capacity if storage_capacity is not None else base_capacity self.current_storage_load = 0 self.inventory_count = 0 self.total_reward = 0.0 self.steps_taken = 0 self.done = False self._mode = self.MODE_NORMAL # Remove any garbage placed on top of an obstacle self.garbage_positions = [ g for g in self.garbage_positions if g not in self.obstacle_positions ] return self.state() # ── Observation & State helpers ─────────────────────────── def _bfs_distance(self, target) -> int: """Return BFS step-count from current robot position to *target*.""" _, dist = _bfs( self.robot_position, target, self.obstacle_positions, self.grid_size[0], self.grid_size[1], ) return int(dist) if dist != float("inf") else -1 def _should_recharge(self) -> bool: """ Return True when the robot must leave immediately to reach home before battery runs out. Threshold = BFS distance to home + RECHARGE_BUFFER. A buffer of 4 gives comfortable headroom for obstacle detours. """ if self.battery_level <= 1: return True dist = self._bfs_distance(self.home_position) if dist < 0: # Home unreachable via BFS — fall back to Manhattan distance dist = (abs(self.robot_position[0] - self.home_position[0]) + abs(self.robot_position[1] - self.home_position[1])) return self.battery_level <= (dist + self.RECHARGE_BUFFER) def _should_unload(self) -> bool: """Return True when the storage bin is at capacity.""" return self.current_storage_load >= self.storage_capacity def get_observation(self, message: str = "") -> Observation: dist_home = self._bfs_distance(self.home_position) if not message: message = ( f"You are at {tuple(self.robot_position)}. " f"Garbage remaining: {len(self.garbage_positions)}. " f"Battery: {self.battery_level}/{self.max_battery}. " f"Storage: {self.current_storage_load}/{self.storage_capacity}. " f"Home (charging): {tuple(self.home_position)} " f"[{dist_home if dist_home >= 0 else 'unreachable'} steps]. " f"Unload station: {tuple(self.unload_station)}. " f"Mode: {self._mode}." ) return Observation( grid_size = self.grid_size, robot_position = tuple(self.robot_position), garbage_positions = [tuple(g) for g in self.garbage_positions], obstacle_positions = [tuple(o) for o in self.obstacle_positions], battery_level = self.battery_level, inventory_count = self.inventory_count, message = message, home_position = tuple(self.home_position), unload_station = tuple(self.unload_station), storage_capacity = self.storage_capacity, current_storage_load = self.current_storage_load, distance_from_home = dist_home, robot_mode = self._mode, ) def state(self) -> State: return State( task_id = self.current_task_id, total_reward = self.total_reward, steps_taken = self.steps_taken, done = self.done, robot_mode = self._mode, current_storage_load = self.current_storage_load, battery_level = self.battery_level, distance_from_home = self._bfs_distance(self.home_position), ) # ── Autonomous command resolver ──────────────────────────── def _resolve_command(self, requested: str) -> Tuple[str, str]: """ Determine the *effective* command for this step. When the robot is in MODE_RECHARGE or MODE_UNLOAD the caller's command is replaced by an autonomously-computed one. Returns ------- (effective_command, mode_message) """ # ── Trigger check (only when in normal mode) ─────────── # FIX: use `not self.done` guard instead of `self.garbage_positions` # so recharge still fires even if all garbage is collected this step. if self._mode == self.MODE_NORMAL: if self._should_recharge() and not self.done: self._mode = self.MODE_RECHARGE elif self._should_unload(): self._mode = self.MODE_UNLOAD # ── Recharging mode ──────────────────────────────────── if self._mode == self.MODE_RECHARGE: if tuple(self.robot_position) == tuple(self.home_position): # Arrived — charge and return to normal self._mode = self.MODE_NORMAL return ( "CHARGE", (f"Reached charging station {tuple(self.home_position)}. " f"Battery fully restored to {self.max_battery}. " f"Resuming garbage collection."), ) else: move, dist = _bfs( self.robot_position, self.home_position, self.obstacle_positions, self.grid_size[0], self.grid_size[1], ) dist_str = f"{int(dist)} steps" if dist != float("inf") else "route blocked" return ( move or "UP", (f"⚡ Battery critical ({self.battery_level}/{self.max_battery}). " f"Auto-navigating to charging station {tuple(self.home_position)} " f"[{dist_str}]."), ) # ── Unloading mode ───────────────────────────────────── if self._mode == self.MODE_UNLOAD: if tuple(self.robot_position) == tuple(self.unload_station): # Arrived — empty the bin and return to normal freed = self.current_storage_load self._mode = self.MODE_NORMAL return ( "UNLOAD_HERE", (f"Reached unload station {tuple(self.unload_station)}. " f"Emptied {freed} item(s) from storage. " f"Resuming garbage collection."), ) else: move, dist = _bfs( self.robot_position, self.unload_station, self.obstacle_positions, self.grid_size[0], self.grid_size[1], ) dist_str = f"{int(dist)} steps" if dist != float("inf") else "route blocked" return ( move or "UP", (f"📦 Storage full ({self.current_storage_load}/{self.storage_capacity}). " f"Auto-navigating to unload station {tuple(self.unload_station)} " f"[{dist_str}]."), ) # ── Normal mode — use caller's command ───────────────── return (requested, "") # ── Step ────────────────────────────────────────────────── def step(self, command: str) -> Dict[str, Any]: if self.done: obs = self.get_observation("Episode already finished.") return {"observation": obs.dict(), "reward": 0.0, "done": True, "info": {}} self.steps_taken += 1 # Resolve autonomous overrides BEFORE battery decrement so that # CHARGE / UNLOAD_HERE commands do NOT consume battery. effective_cmd, mode_message = self._resolve_command(command) # FIX: only drain battery for real movement / collection actions. # Autonomous internal commands (CHARGE, UNLOAD_HERE) are free. if effective_cmd in ("CHARGE", "UNLOAD_HERE"): reward = 0.0 else: self.battery_level -= 1 reward = -0.1 message = mode_message # may be overwritten below # ── CHARGE (internal — issued autonomously at home) ──── if effective_cmd == "CHARGE": self.battery_level = self.max_battery reward += 5.0 # message already set from resolver # ── UNLOAD_HERE (internal — issued autonomously at station) ── elif effective_cmd == "UNLOAD_HERE": freed = self.current_storage_load self.current_storage_load = 0 reward += 2.0 # message already set from resolver # ── COLLECT ─────────────────────────────────────────── elif effective_cmd == "COLLECT": if self.robot_position in self.garbage_positions: self.garbage_positions.remove(self.robot_position) self.inventory_count += 1 self.current_storage_load += 1 reward += 10.0 message = ( f"Collected garbage! " f"Storage: {self.current_storage_load}/{self.storage_capacity}." ) if self._should_unload() and self.garbage_positions: self._mode = self.MODE_UNLOAD message += ( f" Storage full — auto-routing to " f"unload station {tuple(self.unload_station)}." ) else: reward -= 1.0 message = "No garbage to collect here." # ── Movement commands ────────────────────────────────── elif effective_cmd in ("UP", "DOWN", "LEFT", "RIGHT"): new_pos = list(self.robot_position) if effective_cmd == "UP": new_pos[1] += 1 elif effective_cmd == "DOWN": new_pos[1] -= 1 elif effective_cmd == "LEFT": new_pos[0] -= 1 elif effective_cmd == "RIGHT": new_pos[0] += 1 gw, gh = self.grid_size if 0 <= new_pos[0] < gw and 0 <= new_pos[1] < gh: if new_pos in self.obstacle_positions: reward -= 5.0 blocked = [] direction_map = { "UP": [0, 1], "DOWN": [0, -1], "LEFT": [-1, 0], "RIGHT": [1, 0], } for d, delta in direction_map.items(): nb = [self.robot_position[0] + delta[0], self.robot_position[1] + delta[1]] if nb in self.obstacle_positions: blocked.append(d) blocked_str = ", ".join(blocked) if blocked else "none" message = ( f"BLOCKED! {effective_cmd} leads to an obstacle. " f"Blocked directions from here: {blocked_str}. " f"Choose a different direction." ) else: self.robot_position = new_pos if not message: message = f"Moved {effective_cmd}." else: reward -= 1.0 if not message: message = ( f"Hit a wall trying to move {effective_cmd}. " f"Do NOT try {effective_cmd} again from this position." ) # ── Unknown command ──────────────────────────────────── else: reward -= 1.0 message = f"Invalid command: '{effective_cmd}'." # ── Termination checks ───────────────────────────────── if len(self.garbage_positions) == 0: self.done = True reward += 50.0 message += " All garbage collected! Task complete." elif self.battery_level <= 0: self.done = True message += " Battery depleted! Game over." self.total_reward += reward return { "observation": self.get_observation(message).dict(), "reward": reward, "done": self.done, "info": { "inventory_count": self.inventory_count, "steps": self.steps_taken, "current_storage_load": self.current_storage_load, "robot_mode": self._mode, "autonomous_override": effective_cmd != command, "original_command": command, "effective_command": effective_cmd, }, } # ── Grading ─────────────────────────────────────────────── def grade(self, task_id: str) -> float: """Normalised [0.0, 1.0] completion score for the leaderboard.""" if task_id not in SCENARIOS: return 0.0 total = len(SCENARIOS[task_id]["garbage_starts"]) return min(max(self.inventory_count / total, 0.0), 1.0)