fire_evac / pathfinding.py
ratyim's picture
Upload 6 files
faa740f verified
"""
Pathfinding and Risk Assessment Module
Uses A* algorithm with custom risk-based heuristics
"""
import heapq
from typing import Dict, List, Tuple, Optional
from .floor_plan import FloorPlan
from .sensor_system import SensorSystem, SensorReading
class PathNode:
"""Node for pathfinding algorithm"""
def __init__(self, room_id: str, g_cost: float, h_cost: float, parent=None):
self.room_id = room_id
self.g_cost = g_cost # Cost from start
self.h_cost = h_cost # Heuristic cost to goal
self.f_cost = g_cost + h_cost # Total cost
self.parent = parent
def __lt__(self, other):
return self.f_cost < other.f_cost
def __eq__(self, other):
return self.room_id == other.room_id
class RiskAssessment:
"""Assesses risk for different evacuation routes"""
@staticmethod
def calculate_path_risk(path: List[str], sensor_system: SensorSystem,
floor_plan: FloorPlan) -> Dict:
"""
Calculate comprehensive risk assessment for a path
Returns dict with:
- total_danger: Overall danger score
- max_danger: Maximum danger point
- avg_danger: Average danger
- has_fire: Whether path goes through fire
- has_oxygen_hazard: Whether path has oxygen cylinder
- passable: Whether path is passable
- risk_factors: List of specific risks
"""
if not path:
return {"passable": False, "total_danger": 100.0}
danger_scores = []
risk_factors = []
has_fire = False
has_oxygen_hazard = False
max_danger_location = None
max_danger_score = 0.0
for room_id in path:
sensor = sensor_system.get_sensor_reading(room_id)
room = floor_plan.get_room(room_id)
if sensor:
danger = sensor.calculate_danger_score()
danger_scores.append(danger)
if danger > max_danger_score:
max_danger_score = danger
max_danger_location = room_id
# Check specific hazards
if sensor.fire_detected:
has_fire = True
risk_factors.append(f"Fire detected in {room_id}")
if room and room.has_oxygen_cylinder:
has_oxygen_hazard = True
# Increase danger if there's heat near oxygen
if sensor.temperature > 40 or sensor.fire_detected:
danger_scores[-1] += 15 # Add explosion risk
risk_factors.append(f"Oxygen cylinder explosion risk in {room_id}")
else:
risk_factors.append(f"Oxygen cylinder present in {room_id}")
if sensor.smoke_level > 0.5:
risk_factors.append(f"Heavy smoke in {room_id}")
if sensor.temperature > 60:
risk_factors.append(f"High temperature ({sensor.temperature:.1f}°C) in {room_id}")
if sensor.oxygen_level < 19.5:
risk_factors.append(f"Low oxygen ({sensor.oxygen_level:.1f}%) in {room_id}")
# NEW: Toxic gas warnings
if sensor.carbon_monoxide > 50:
risk_factors.append(f"TOXIC CO ({sensor.carbon_monoxide:.0f} ppm) in {room_id}")
if sensor.carbon_dioxide > 5000:
risk_factors.append(f"High CO2 ({sensor.carbon_dioxide:.0f} ppm) in {room_id}")
if sensor.hydrogen_cyanide > 20:
risk_factors.append(f"TOXIC HCN ({sensor.hydrogen_cyanide:.1f} ppm) in {room_id}")
# NEW: Flashover/backdraft warnings
if sensor.flashover_risk > 0.7:
risk_factors.append(f"CRITICAL: Flashover risk ({sensor.flashover_risk*100:.0f}%) in {room_id}")
if sensor.backdraft_risk > 0.6:
risk_factors.append(f"CRITICAL: Backdraft risk ({sensor.backdraft_risk*100:.0f}%) in {room_id}")
# NEW: Crowd density warnings
if sensor.occupancy_density > 0.8:
risk_factors.append(f"High crowd density ({sensor.occupancy_density*100:.0f}%) in {room_id}")
# NEW: Infrastructure failures
if not sensor.exit_accessible:
risk_factors.append(f"Exit BLOCKED in {room_id}")
if not sensor.stairwell_clear:
risk_factors.append(f"Stairwell BLOCKED in {room_id}")
if not sensor.emergency_lighting:
risk_factors.append(f"No emergency lighting in {room_id}")
# NEW: Time pressure
if sensor.time_since_fire_start > 300:
risk_factors.append(f"Time pressure: {sensor.time_since_fire_start//60} min elapsed")
if not sensor.is_passable():
risk_factors.append(f"Path blocked at {room_id}")
total_danger = sum(danger_scores)
avg_danger = total_danger / len(danger_scores) if danger_scores else 0
# Check if all segments are passable
passable = all(sensor_system.get_sensor_reading(rid).is_passable()
for rid in path if sensor_system.get_sensor_reading(rid))
return {
"total_danger": total_danger,
"max_danger": max_danger_score,
"max_danger_location": max_danger_location,
"avg_danger": avg_danger,
"path_length": len(path),
"has_fire": has_fire,
"has_oxygen_hazard": has_oxygen_hazard,
"passable": passable,
"risk_factors": risk_factors,
"danger_scores": danger_scores
}
@staticmethod
def get_risk_level(avg_danger: float) -> str:
"""Get risk level description"""
if avg_danger < 20:
return "LOW"
elif avg_danger < 40:
return "MODERATE"
elif avg_danger < 60:
return "HIGH"
else:
return "CRITICAL"
@staticmethod
def recommend_path(paths: List[Tuple[List[str], Dict]]) -> Optional[Tuple[List[str], Dict]]:
"""
Recommend the best path based on risk assessment
Args:
paths: List of (path, risk_assessment) tuples
Returns:
Best (path, risk_assessment) tuple or None
"""
if not paths:
return None
# Filter to only passable paths
passable_paths = [(p, r) for p, r in paths if r["passable"]]
if not passable_paths:
# No fully passable paths, return least dangerous
return min(paths, key=lambda x: x[1]["total_danger"])
# Score each path (lower is better)
def score_path(path_info):
path, risk = path_info
score = 0
# Heavily penalize fire
if risk["has_fire"]:
score += 100
# Add oxygen hazard risk (but less than fire)
if risk["has_oxygen_hazard"]:
score += 30
# Add danger scores
score += risk["total_danger"]
# Prefer shorter paths (slight preference)
score += risk["path_length"] * 2
# Penalize high max danger points
score += risk["max_danger"] * 0.5
return score
# Return path with lowest score
return min(passable_paths, key=score_path)
class PathFinder:
"""Find optimal evacuation paths using A* algorithm with risk assessment"""
def __init__(self, floor_plan: FloorPlan, sensor_system: SensorSystem):
self.floor_plan = floor_plan
self.sensor_system = sensor_system
def find_all_evacuation_routes(self, start: str) -> List[Tuple[str, List[str], Dict]]:
"""
Find evacuation routes to all exits
Returns: List of (exit_id, path, risk_assessment) tuples
"""
routes = []
exits = self.floor_plan.get_all_exits()
for exit_id in exits:
path = self.find_path(start, exit_id)
if path:
risk = RiskAssessment.calculate_path_risk(
path, self.sensor_system, self.floor_plan
)
routes.append((exit_id, path, risk))
return routes
def find_path(self, start: str, goal: str) -> Optional[List[str]]:
"""
Find path from start to goal using A* with risk-based costs
Returns: List of room IDs representing the path, or None if no path exists
"""
if start not in self.floor_plan.rooms or goal not in self.floor_plan.rooms:
return None
# Priority queue: (f_cost, node)
open_set = []
closed_set = set()
# Initialize start node
start_node = PathNode(start, 0, self._heuristic(start, goal))
heapq.heappush(open_set, start_node)
# Track best g_cost for each room
g_costs = {start: 0}
while open_set:
current = heapq.heappop(open_set)
# Goal reached
if current.room_id == goal:
return self._reconstruct_path(current)
# Skip if already processed
if current.room_id in closed_set:
continue
closed_set.add(current.room_id)
# Explore neighbors
neighbors = self.floor_plan.get_neighbors(current.room_id)
for neighbor_id, base_distance in neighbors:
if neighbor_id in closed_set:
continue
# Calculate risk-adjusted cost
risk_cost = self._calculate_risk_cost(neighbor_id)
tentative_g = current.g_cost + base_distance + risk_cost
# If this path is better, add to open set
if neighbor_id not in g_costs or tentative_g < g_costs[neighbor_id]:
g_costs[neighbor_id] = tentative_g
h_cost = self._heuristic(neighbor_id, goal)
neighbor_node = PathNode(neighbor_id, tentative_g, h_cost, current)
heapq.heappush(open_set, neighbor_node)
# No path found
return None
def _heuristic(self, room_id1: str, room_id2: str) -> float:
"""
Heuristic function for A* (Manhattan distance between room positions)
"""
room1 = self.floor_plan.get_room(room_id1)
room2 = self.floor_plan.get_room(room_id2)
if not room1 or not room2:
return 0
x1, y1 = room1.position
x2, y2 = room2.position
return abs(x2 - x1) + abs(y2 - y1)
def _calculate_risk_cost(self, room_id: str) -> float:
"""
Calculate risk-based cost for traversing a room with all real-world factors
Higher danger = higher cost
"""
sensor = self.sensor_system.get_sensor_reading(room_id)
if not sensor:
return 0.0
danger_score = sensor.calculate_danger_score()
# Convert danger score to cost multiplier
# Danger 0-20: minimal cost
# Danger 20-50: moderate cost
# Danger 50+: high cost
risk_cost = danger_score / 10.0
# Extra penalty for fire
if sensor.fire_detected:
risk_cost += 20.0
# Extra penalty for oxygen cylinder in dangerous conditions
room = self.floor_plan.get_room(room_id)
if room and room.has_oxygen_cylinder:
if sensor.temperature > 40 or sensor.fire_detected:
risk_cost += 10.0
# NEW: Extra penalties for critical factors
# Toxic gas penalty
if sensor.carbon_monoxide > 50:
risk_cost += 15.0
if sensor.carbon_dioxide > 5000:
risk_cost += 10.0
# Flashover risk penalty
if sensor.flashover_risk > 0.7:
risk_cost += 25.0
elif sensor.flashover_risk > 0.5:
risk_cost += 15.0
# Exit blockage penalty
if not sensor.exit_accessible:
risk_cost += 30.0
if not sensor.stairwell_clear:
risk_cost += 20.0
# Crowd density penalty (slows movement)
if sensor.occupancy_density > 0.8:
risk_cost += sensor.occupancy_density * 20.0
# Infrastructure failure penalty
if not sensor.emergency_lighting:
risk_cost += 5.0
# Make impassable areas very expensive (but not infinite)
if not sensor.is_passable():
risk_cost += 50.0
return risk_cost
def _reconstruct_path(self, node: PathNode) -> List[str]:
"""Reconstruct path from goal node back to start"""
path = []
current = node
while current:
path.append(current.room_id)
current = current.parent
return list(reversed(path))