Spaces:
Runtime error
Runtime error
| """ | |
| Pathfinding and Risk Assessment Module | |
| Uses A* algorithm with custom risk-based heuristics | |
| """ | |
| import heapq | |
| from typing import Dict, List, Tuple, Optional | |
| from .floor_plan import FloorPlan | |
| from .sensor_system import SensorSystem, SensorReading | |
class PathNode:
    """Node for the pathfinding algorithm.

    Attributes:
        room_id: Identifier of the room this node represents.
        g_cost: Cost accumulated from the start node.
        h_cost: Heuristic estimate of the remaining cost to the goal.
        f_cost: ``g_cost + h_cost``, computed once at construction.
        parent: Node this one was reached from, or ``None`` for the start.

    Ordering (``<``) compares ``f_cost`` so nodes can live in a ``heapq``.
    Equality and hashing use ``room_id`` only, so two nodes for the same room
    compare equal even when their costs differ.
    """

    def __init__(self, room_id: str, g_cost: float, h_cost: float,
                 parent: Optional["PathNode"] = None):
        self.room_id = room_id
        self.g_cost = g_cost  # Cost from start
        self.h_cost = h_cost  # Heuristic cost to goal
        self.f_cost = g_cost + h_cost  # Total cost
        self.parent = parent

    def __lt__(self, other):
        if not isinstance(other, PathNode):
            return NotImplemented
        return self.f_cost < other.f_cost

    def __eq__(self, other):
        # Returning NotImplemented (instead of touching other.room_id) keeps
        # comparisons against None or unrelated objects from raising.
        if not isinstance(other, PathNode):
            return NotImplemented
        return self.room_id == other.room_id

    def __hash__(self):
        # Defining __eq__ alone would make the class unhashable; hash on the
        # same field equality uses so equal nodes hash equally.
        return hash(self.room_id)

    def __repr__(self):
        return (f"{type(self).__name__}(room_id={self.room_id!r}, "
                f"g_cost={self.g_cost!r}, h_cost={self.h_cost!r})")
| class RiskAssessment: | |
| """Assesses risk for different evacuation routes""" | |
| def calculate_path_risk(path: List[str], sensor_system: SensorSystem, | |
| floor_plan: FloorPlan) -> Dict: | |
| """ | |
| Calculate comprehensive risk assessment for a path | |
| Returns dict with: | |
| - total_danger: Overall danger score | |
| - max_danger: Maximum danger point | |
| - avg_danger: Average danger | |
| - has_fire: Whether path goes through fire | |
| - has_oxygen_hazard: Whether path has oxygen cylinder | |
| - passable: Whether path is passable | |
| - risk_factors: List of specific risks | |
| """ | |
| if not path: | |
| return {"passable": False, "total_danger": 100.0} | |
| danger_scores = [] | |
| risk_factors = [] | |
| has_fire = False | |
| has_oxygen_hazard = False | |
| max_danger_location = None | |
| max_danger_score = 0.0 | |
| for room_id in path: | |
| sensor = sensor_system.get_sensor_reading(room_id) | |
| room = floor_plan.get_room(room_id) | |
| if sensor: | |
| danger = sensor.calculate_danger_score() | |
| danger_scores.append(danger) | |
| if danger > max_danger_score: | |
| max_danger_score = danger | |
| max_danger_location = room_id | |
| # Check specific hazards | |
| if sensor.fire_detected: | |
| has_fire = True | |
| risk_factors.append(f"Fire detected in {room_id}") | |
| if room and room.has_oxygen_cylinder: | |
| has_oxygen_hazard = True | |
| # Increase danger if there's heat near oxygen | |
| if sensor.temperature > 40 or sensor.fire_detected: | |
| danger_scores[-1] += 15 # Add explosion risk | |
| risk_factors.append(f"Oxygen cylinder explosion risk in {room_id}") | |
| else: | |
| risk_factors.append(f"Oxygen cylinder present in {room_id}") | |
| if sensor.smoke_level > 0.5: | |
| risk_factors.append(f"Heavy smoke in {room_id}") | |
| if sensor.temperature > 60: | |
| risk_factors.append(f"High temperature ({sensor.temperature:.1f}°C) in {room_id}") | |
| if sensor.oxygen_level < 19.5: | |
| risk_factors.append(f"Low oxygen ({sensor.oxygen_level:.1f}%) in {room_id}") | |
| # NEW: Toxic gas warnings | |
| if sensor.carbon_monoxide > 50: | |
| risk_factors.append(f"TOXIC CO ({sensor.carbon_monoxide:.0f} ppm) in {room_id}") | |
| if sensor.carbon_dioxide > 5000: | |
| risk_factors.append(f"High CO2 ({sensor.carbon_dioxide:.0f} ppm) in {room_id}") | |
| if sensor.hydrogen_cyanide > 20: | |
| risk_factors.append(f"TOXIC HCN ({sensor.hydrogen_cyanide:.1f} ppm) in {room_id}") | |
| # NEW: Flashover/backdraft warnings | |
| if sensor.flashover_risk > 0.7: | |
| risk_factors.append(f"CRITICAL: Flashover risk ({sensor.flashover_risk*100:.0f}%) in {room_id}") | |
| if sensor.backdraft_risk > 0.6: | |
| risk_factors.append(f"CRITICAL: Backdraft risk ({sensor.backdraft_risk*100:.0f}%) in {room_id}") | |
| # NEW: Crowd density warnings | |
| if sensor.occupancy_density > 0.8: | |
| risk_factors.append(f"High crowd density ({sensor.occupancy_density*100:.0f}%) in {room_id}") | |
| # NEW: Infrastructure failures | |
| if not sensor.exit_accessible: | |
| risk_factors.append(f"Exit BLOCKED in {room_id}") | |
| if not sensor.stairwell_clear: | |
| risk_factors.append(f"Stairwell BLOCKED in {room_id}") | |
| if not sensor.emergency_lighting: | |
| risk_factors.append(f"No emergency lighting in {room_id}") | |
| # NEW: Time pressure | |
| if sensor.time_since_fire_start > 300: | |
| risk_factors.append(f"Time pressure: {sensor.time_since_fire_start//60} min elapsed") | |
| if not sensor.is_passable(): | |
| risk_factors.append(f"Path blocked at {room_id}") | |
| total_danger = sum(danger_scores) | |
| avg_danger = total_danger / len(danger_scores) if danger_scores else 0 | |
| # Check if all segments are passable | |
| passable = all(sensor_system.get_sensor_reading(rid).is_passable() | |
| for rid in path if sensor_system.get_sensor_reading(rid)) | |
| return { | |
| "total_danger": total_danger, | |
| "max_danger": max_danger_score, | |
| "max_danger_location": max_danger_location, | |
| "avg_danger": avg_danger, | |
| "path_length": len(path), | |
| "has_fire": has_fire, | |
| "has_oxygen_hazard": has_oxygen_hazard, | |
| "passable": passable, | |
| "risk_factors": risk_factors, | |
| "danger_scores": danger_scores | |
| } | |
| def get_risk_level(avg_danger: float) -> str: | |
| """Get risk level description""" | |
| if avg_danger < 20: | |
| return "LOW" | |
| elif avg_danger < 40: | |
| return "MODERATE" | |
| elif avg_danger < 60: | |
| return "HIGH" | |
| else: | |
| return "CRITICAL" | |
| def recommend_path(paths: List[Tuple[List[str], Dict]]) -> Optional[Tuple[List[str], Dict]]: | |
| """ | |
| Recommend the best path based on risk assessment | |
| Args: | |
| paths: List of (path, risk_assessment) tuples | |
| Returns: | |
| Best (path, risk_assessment) tuple or None | |
| """ | |
| if not paths: | |
| return None | |
| # Filter to only passable paths | |
| passable_paths = [(p, r) for p, r in paths if r["passable"]] | |
| if not passable_paths: | |
| # No fully passable paths, return least dangerous | |
| return min(paths, key=lambda x: x[1]["total_danger"]) | |
| # Score each path (lower is better) | |
| def score_path(path_info): | |
| path, risk = path_info | |
| score = 0 | |
| # Heavily penalize fire | |
| if risk["has_fire"]: | |
| score += 100 | |
| # Add oxygen hazard risk (but less than fire) | |
| if risk["has_oxygen_hazard"]: | |
| score += 30 | |
| # Add danger scores | |
| score += risk["total_danger"] | |
| # Prefer shorter paths (slight preference) | |
| score += risk["path_length"] * 2 | |
| # Penalize high max danger points | |
| score += risk["max_danger"] * 0.5 | |
| return score | |
| # Return path with lowest score | |
| return min(passable_paths, key=score_path) | |
class PathFinder:
    """Searches a floor plan for evacuation routes with risk-weighted A*."""

    def __init__(self, floor_plan: FloorPlan, sensor_system: SensorSystem):
        self.sensor_system = sensor_system
        self.floor_plan = floor_plan

    def find_all_evacuation_routes(self, start: str) -> List[Tuple[str, List[str], Dict]]:
        """
        Search for a route from ``start`` to every exit of the floor plan.

        Exits for which no path is found are left out of the result.

        Returns: List of (exit_id, path, risk_assessment) tuples
        """
        found = []
        for exit_id in self.floor_plan.get_all_exits():
            route = self.find_path(start, exit_id)
            if not route:
                continue
            assessment = RiskAssessment.calculate_path_risk(
                route, self.sensor_system, self.floor_plan
            )
            found.append((exit_id, route, assessment))
        return found

    def find_path(self, start: str, goal: str) -> Optional[List[str]]:
        """
        Run A* from ``start`` to ``goal``; step cost is the edge distance
        plus the risk cost of the room being entered.

        Returns: List of room IDs representing the path, or None if either
        room is unknown to the floor plan or no path exists
        """
        known_rooms = self.floor_plan.rooms
        if not (start in known_rooms and goal in known_rooms):
            return None

        # Min-heap of PathNode objects, ordered by their f_cost.
        frontier = []
        heapq.heappush(frontier, PathNode(start, 0, self._heuristic(start, goal)))
        visited = set()
        # Cheapest g_cost discovered so far for each room.
        best_g = {start: 0}

        while frontier:
            node = heapq.heappop(frontier)

            if node.room_id == goal:
                return self._reconstruct_path(node)

            # A room can sit in the heap several times; expand it only once.
            if node.room_id in visited:
                continue
            visited.add(node.room_id)

            for next_id, step_distance in self.floor_plan.get_neighbors(node.room_id):
                if next_id in visited:
                    continue

                entry_risk = self._calculate_risk_cost(next_id)
                candidate_g = node.g_cost + step_distance + entry_risk

                # Ignore this route unless it strictly beats the known best.
                known_g = best_g.get(next_id)
                if known_g is not None and not candidate_g < known_g:
                    continue

                best_g[next_id] = candidate_g
                estimate = self._heuristic(next_id, goal)
                heapq.heappush(
                    frontier, PathNode(next_id, candidate_g, estimate, node)
                )

        # Frontier exhausted without reaching the goal.
        return None

    def _heuristic(self, room_id1: str, room_id2: str) -> float:
        """
        A* heuristic: Manhattan distance between the two rooms' positions.

        Returns 0 when either room cannot be looked up.
        """
        origin = self.floor_plan.get_room(room_id1)
        target = self.floor_plan.get_room(room_id2)
        if origin and target:
            ox, oy = origin.position
            tx, ty = target.position
            return abs(tx - ox) + abs(ty - oy)
        return 0

    def _calculate_risk_cost(self, room_id: str) -> float:
        """
        Extra traversal cost for entering a room, derived from its sensor
        reading. Higher danger = higher cost; a room without a reading
        costs 0.0.
        """
        reading = self.sensor_system.get_sensor_reading(room_id)
        if not reading:
            return 0.0

        # Base cost scales linearly with the danger score.
        cost = reading.calculate_danger_score() / 10.0

        # Fire
        if reading.fire_detected:
            cost += 20.0

        # Oxygen cylinder exposed to heat or fire
        room = self.floor_plan.get_room(room_id)
        if room and room.has_oxygen_cylinder and (
            reading.temperature > 40 or reading.fire_detected
        ):
            cost += 10.0

        # Toxic gases
        if reading.carbon_monoxide > 50:
            cost += 15.0
        if reading.carbon_dioxide > 5000:
            cost += 10.0

        # Flashover risk, two tiers
        if reading.flashover_risk > 0.7:
            cost += 25.0
        elif reading.flashover_risk > 0.5:
            cost += 15.0

        # Blocked exit / stairwell
        if not reading.exit_accessible:
            cost += 30.0
        if not reading.stairwell_clear:
            cost += 20.0

        # Crowding slows movement
        if reading.occupancy_density > 0.8:
            cost += reading.occupancy_density * 20.0

        # Missing emergency lighting
        if not reading.emergency_lighting:
            cost += 5.0

        # Impassable rooms are made very expensive, but not infinite, so a
        # route through them can still be returned as a last resort.
        if not reading.is_passable():
            cost += 50.0

        return cost

    def _reconstruct_path(self, node: PathNode) -> List[str]:
        """Follow parent links from ``node`` back to the start and return
        the room IDs in start-to-goal order."""
        trail = []
        cursor = node
        while cursor:
            trail.append(cursor.room_id)
            cursor = cursor.parent
        trail.reverse()
        return trail