|
|
""" |
|
|
Pathfinding and Risk Assessment Module |
|
|
Uses A* algorithm with custom risk-based heuristics |
|
|
""" |
|
|
import heapq |
|
|
from typing import Dict, List, Tuple, Optional |
|
|
from .floor_plan import FloorPlan |
|
|
from .sensor_system import SensorSystem, SensorReading |
|
|
|
|
|
|
|
|
class PathNode:
    """Search node used by the A* pathfinder.

    Each node records the room it represents, the accumulated cost from the
    start (``g_cost``), the heuristic estimate to the goal (``h_cost``), their
    sum (``f_cost``) and the node it was reached from (``parent``).

    Ordering (``<``) is by ``f_cost`` so nodes can live in a ``heapq``;
    equality and hashing are by ``room_id`` only.
    """

    def __init__(self, room_id: str, g_cost: float, h_cost: float,
                 parent: Optional["PathNode"] = None):
        """Create a node.

        Args:
            room_id: Identifier of the room this node stands for.
            g_cost: Cost accumulated from the start node to this room.
            h_cost: Heuristic estimate from this room to the goal.
            parent: Node this one was expanded from, or None for the start.
        """
        self.room_id = room_id
        self.g_cost = g_cost
        self.h_cost = h_cost
        # f_cost is computed once at construction; it is not refreshed if
        # g_cost or h_cost are reassigned afterwards.
        self.f_cost = g_cost + h_cost
        self.parent = parent

    def __lt__(self, other):
        """Order nodes by total estimated cost (lower f_cost sorts first)."""
        if not isinstance(other, PathNode):
            return NotImplemented
        return self.f_cost < other.f_cost

    def __eq__(self, other):
        """Two nodes are equal when they refer to the same room."""
        if not isinstance(other, PathNode):
            # Let Python fall back to the other operand / identity instead of
            # failing with AttributeError on a missing ``room_id``.
            return NotImplemented
        return self.room_id == other.room_id

    def __hash__(self):
        """Hash consistently with ``__eq__`` so nodes work in sets and dicts."""
        return hash(self.room_id)

    def __repr__(self):
        return (f"{type(self).__name__}(room_id={self.room_id!r}, "
                f"g_cost={self.g_cost!r}, h_cost={self.h_cost!r})")
|
|
|
|
|
|
|
|
class RiskAssessment: |
|
|
"""Assesses risk for different evacuation routes""" |
|
|
|
|
|
@staticmethod |
|
|
def calculate_path_risk(path: List[str], sensor_system: SensorSystem, |
|
|
floor_plan: FloorPlan) -> Dict: |
|
|
""" |
|
|
Calculate comprehensive risk assessment for a path |
|
|
|
|
|
Returns dict with: |
|
|
- total_danger: Overall danger score |
|
|
- max_danger: Maximum danger point |
|
|
- avg_danger: Average danger |
|
|
- has_fire: Whether path goes through fire |
|
|
- has_oxygen_hazard: Whether path has oxygen cylinder |
|
|
- passable: Whether path is passable |
|
|
- risk_factors: List of specific risks |
|
|
""" |
|
|
if not path: |
|
|
return {"passable": False, "total_danger": 100.0} |
|
|
|
|
|
danger_scores = [] |
|
|
risk_factors = [] |
|
|
has_fire = False |
|
|
has_oxygen_hazard = False |
|
|
max_danger_location = None |
|
|
max_danger_score = 0.0 |
|
|
|
|
|
for room_id in path: |
|
|
sensor = sensor_system.get_sensor_reading(room_id) |
|
|
room = floor_plan.get_room(room_id) |
|
|
|
|
|
if sensor: |
|
|
danger = sensor.calculate_danger_score() |
|
|
danger_scores.append(danger) |
|
|
|
|
|
if danger > max_danger_score: |
|
|
max_danger_score = danger |
|
|
max_danger_location = room_id |
|
|
|
|
|
|
|
|
if sensor.fire_detected: |
|
|
has_fire = True |
|
|
risk_factors.append(f"Fire detected in {room_id}") |
|
|
|
|
|
if room and room.has_oxygen_cylinder: |
|
|
has_oxygen_hazard = True |
|
|
|
|
|
if sensor.temperature > 40 or sensor.fire_detected: |
|
|
danger_scores[-1] += 15 |
|
|
risk_factors.append(f"Oxygen cylinder explosion risk in {room_id}") |
|
|
else: |
|
|
risk_factors.append(f"Oxygen cylinder present in {room_id}") |
|
|
|
|
|
if sensor.smoke_level > 0.5: |
|
|
risk_factors.append(f"Heavy smoke in {room_id}") |
|
|
|
|
|
if sensor.temperature > 60: |
|
|
risk_factors.append(f"High temperature ({sensor.temperature:.1f}°C) in {room_id}") |
|
|
|
|
|
if sensor.oxygen_level < 19.5: |
|
|
risk_factors.append(f"Low oxygen ({sensor.oxygen_level:.1f}%) in {room_id}") |
|
|
|
|
|
|
|
|
if sensor.carbon_monoxide > 50: |
|
|
risk_factors.append(f"TOXIC CO ({sensor.carbon_monoxide:.0f} ppm) in {room_id}") |
|
|
if sensor.carbon_dioxide > 5000: |
|
|
risk_factors.append(f"High CO2 ({sensor.carbon_dioxide:.0f} ppm) in {room_id}") |
|
|
if sensor.hydrogen_cyanide > 20: |
|
|
risk_factors.append(f"TOXIC HCN ({sensor.hydrogen_cyanide:.1f} ppm) in {room_id}") |
|
|
|
|
|
|
|
|
if sensor.flashover_risk > 0.7: |
|
|
risk_factors.append(f"CRITICAL: Flashover risk ({sensor.flashover_risk*100:.0f}%) in {room_id}") |
|
|
if sensor.backdraft_risk > 0.6: |
|
|
risk_factors.append(f"CRITICAL: Backdraft risk ({sensor.backdraft_risk*100:.0f}%) in {room_id}") |
|
|
|
|
|
|
|
|
if sensor.occupancy_density > 0.8: |
|
|
risk_factors.append(f"High crowd density ({sensor.occupancy_density*100:.0f}%) in {room_id}") |
|
|
|
|
|
|
|
|
if not sensor.exit_accessible: |
|
|
risk_factors.append(f"Exit BLOCKED in {room_id}") |
|
|
if not sensor.stairwell_clear: |
|
|
risk_factors.append(f"Stairwell BLOCKED in {room_id}") |
|
|
if not sensor.emergency_lighting: |
|
|
risk_factors.append(f"No emergency lighting in {room_id}") |
|
|
|
|
|
|
|
|
if sensor.time_since_fire_start > 300: |
|
|
risk_factors.append(f"Time pressure: {sensor.time_since_fire_start//60} min elapsed") |
|
|
|
|
|
if not sensor.is_passable(): |
|
|
risk_factors.append(f"Path blocked at {room_id}") |
|
|
|
|
|
total_danger = sum(danger_scores) |
|
|
avg_danger = total_danger / len(danger_scores) if danger_scores else 0 |
|
|
|
|
|
|
|
|
passable = all(sensor_system.get_sensor_reading(rid).is_passable() |
|
|
for rid in path if sensor_system.get_sensor_reading(rid)) |
|
|
|
|
|
return { |
|
|
"total_danger": total_danger, |
|
|
"max_danger": max_danger_score, |
|
|
"max_danger_location": max_danger_location, |
|
|
"avg_danger": avg_danger, |
|
|
"path_length": len(path), |
|
|
"has_fire": has_fire, |
|
|
"has_oxygen_hazard": has_oxygen_hazard, |
|
|
"passable": passable, |
|
|
"risk_factors": risk_factors, |
|
|
"danger_scores": danger_scores |
|
|
} |
|
|
|
|
|
@staticmethod |
|
|
def get_risk_level(avg_danger: float) -> str: |
|
|
"""Get risk level description""" |
|
|
if avg_danger < 20: |
|
|
return "LOW" |
|
|
elif avg_danger < 40: |
|
|
return "MODERATE" |
|
|
elif avg_danger < 60: |
|
|
return "HIGH" |
|
|
else: |
|
|
return "CRITICAL" |
|
|
|
|
|
@staticmethod |
|
|
def recommend_path(paths: List[Tuple[List[str], Dict]]) -> Optional[Tuple[List[str], Dict]]: |
|
|
""" |
|
|
Recommend the best path based on risk assessment |
|
|
|
|
|
Args: |
|
|
paths: List of (path, risk_assessment) tuples |
|
|
|
|
|
Returns: |
|
|
Best (path, risk_assessment) tuple or None |
|
|
""" |
|
|
if not paths: |
|
|
return None |
|
|
|
|
|
|
|
|
passable_paths = [(p, r) for p, r in paths if r["passable"]] |
|
|
|
|
|
if not passable_paths: |
|
|
|
|
|
return min(paths, key=lambda x: x[1]["total_danger"]) |
|
|
|
|
|
|
|
|
def score_path(path_info): |
|
|
path, risk = path_info |
|
|
score = 0 |
|
|
|
|
|
|
|
|
if risk["has_fire"]: |
|
|
score += 100 |
|
|
|
|
|
|
|
|
if risk["has_oxygen_hazard"]: |
|
|
score += 30 |
|
|
|
|
|
|
|
|
score += risk["total_danger"] |
|
|
|
|
|
|
|
|
score += risk["path_length"] * 2 |
|
|
|
|
|
|
|
|
score += risk["max_danger"] * 0.5 |
|
|
|
|
|
return score |
|
|
|
|
|
|
|
|
return min(passable_paths, key=score_path) |
|
|
|
|
|
|
|
|
class PathFinder:
    """A* route finder whose edge costs combine distance with sensor-derived risk."""

    def __init__(self, floor_plan: FloorPlan, sensor_system: SensorSystem):
        """Keep references to the floor plan (graph) and the sensor system (risk data)."""
        self.floor_plan = floor_plan
        self.sensor_system = sensor_system

    def find_all_evacuation_routes(self, start: str) -> List[Tuple[str, List[str], Dict]]:
        """
        Search for a route from ``start`` to every exit of the floor plan.

        Exits for which no path is found are left out of the result.

        Returns: List of (exit_id, path, risk_assessment) tuples
        """
        found = []

        for exit_id in self.floor_plan.get_all_exits():
            route = self.find_path(start, exit_id)
            if not route:
                continue
            assessment = RiskAssessment.calculate_path_risk(
                route, self.sensor_system, self.floor_plan
            )
            found.append((exit_id, route, assessment))

        return found

    def find_path(self, start: str, goal: str) -> Optional[List[str]]:
        """
        Run A* from ``start`` to ``goal``; each step costs the edge distance
        plus the risk cost of the room being entered.

        Returns: List of room IDs representing the path, or None if either
        endpoint is unknown to the floor plan or no path exists
        """
        if not (start in self.floor_plan.rooms and goal in self.floor_plan.rooms):
            return None

        frontier = []       # heap of PathNode, ordered by the nodes' own comparison
        expanded = set()    # room IDs that have already been expanded
        best_g = {start: 0}

        heapq.heappush(frontier, PathNode(start, 0, self._heuristic(start, goal)))

        while frontier:
            node = heapq.heappop(frontier)

            if node.room_id == goal:
                return self._reconstruct_path(node)

            # A room can sit in the heap several times; only the first pop counts.
            if node.room_id in expanded:
                continue
            expanded.add(node.room_id)

            for next_id, step_length in self.floor_plan.get_neighbors(node.room_id):
                if next_id in expanded:
                    continue

                room_risk = self._calculate_risk_cost(next_id)
                candidate_g = node.g_cost + step_length + room_risk

                # Skip unless this is the first or a strictly cheaper way in.
                if next_id in best_g and not candidate_g < best_g[next_id]:
                    continue

                best_g[next_id] = candidate_g
                estimate = self._heuristic(next_id, goal)
                heapq.heappush(
                    frontier, PathNode(next_id, candidate_g, estimate, node)
                )

        return None

    def _heuristic(self, room_id1: str, room_id2: str) -> float:
        """
        A* heuristic: Manhattan distance between the two rooms' positions,
        or 0 when either room cannot be looked up.
        """
        first = self.floor_plan.get_room(room_id1)
        second = self.floor_plan.get_room(room_id2)

        if not (first and second):
            return 0

        x1, y1 = first.position
        x2, y2 = second.position
        dx = abs(x2 - x1)
        dy = abs(y2 - y1)
        return dx + dy

    def _calculate_risk_cost(self, room_id: str) -> float:
        """
        Extra traversal cost for entering a room, derived from its sensor reading.

        Starts from a tenth of the reading's danger score and adds a fixed
        penalty for each hazard condition that holds. Rooms without a reading
        cost 0.0.
        """
        sensor = self.sensor_system.get_sensor_reading(room_id)
        if not sensor:
            return 0.0

        cost = sensor.calculate_danger_score() / 10.0

        room = self.floor_plan.get_room(room_id)
        cylinder_at_risk = (
            room
            and room.has_oxygen_cylinder
            and (sensor.temperature > 40 or sensor.fire_detected)
        )

        # (condition, penalty) pairs, applied in this order.
        penalties = [
            (sensor.fire_detected, 20.0),
            (cylinder_at_risk, 10.0),
            (sensor.carbon_monoxide > 50, 15.0),
            (sensor.carbon_dioxide > 5000, 10.0),
            (sensor.flashover_risk > 0.7, 25.0),
            # Lower flashover tier applies only when the higher one does not.
            (not sensor.flashover_risk > 0.7 and sensor.flashover_risk > 0.5, 15.0),
            (not sensor.exit_accessible, 30.0),
            (not sensor.stairwell_clear, 20.0),
            (sensor.occupancy_density > 0.8, sensor.occupancy_density * 20.0),
            (not sensor.emergency_lighting, 5.0),
            (not sensor.is_passable(), 50.0),
        ]
        for applies, penalty in penalties:
            if applies:
                cost += penalty

        return cost

    def _reconstruct_path(self, node: PathNode) -> List[str]:
        """Follow ``parent`` links back from ``node`` and return the room IDs start-first."""
        trail = []
        cursor = node
        while cursor:
            trail.append(cursor.room_id)
            cursor = cursor.parent
        trail.reverse()
        return trail
|
|
|
|
|
|