Spaces:
Runtime error
Runtime error
| """ | |
| Dynamic Simulation Module - Real-time fire spread and sensor updates | |
| Simulates how fire conditions change over time and updates evacuation recommendations | |
| """ | |
| import random | |
| import time | |
| from typing import Dict, List, Tuple | |
| from .floor_plan import FloorPlan | |
| from .sensor_system import SensorSystem, SensorReading | |
| from .pathfinding import PathFinder, RiskAssessment | |
class FireSpreadSimulator:
    """Simulates dynamic fire spread and environmental changes.

    The simulator reads and writes sensor readings through ``sensor_system``
    (``sensors`` mapping, ``update_sensor`` and ``get_sensor_reading``) and
    queries ``floor_plan`` for adjacency (``get_neighbors``) and room
    metadata (``get_room``).

    All per-step changes are monotonic: a step can only make a reading worse
    (or leave it unchanged); the caps and floors used below never pull a
    reading back toward a safer value.
    """

    # Simulated seconds represented by one call to update_simulation().
    SECONDS_PER_STEP = 30

    def __init__(self, floor_plan: "FloorPlan", sensor_system: "SensorSystem"):
        self.floor_plan = floor_plan
        self.sensor_system = sensor_system
        self.time_step = 0
        self.fire_sources = []  # IDs of locations where fire started or spread to

    @staticmethod
    def _rise(current, amount, ceiling):
        """Return ``current + amount`` capped at ``ceiling``, never below ``current``.

        A plain ``min(current + amount, ceiling)`` would *lower* a reading that
        already exceeds the ceiling; this keeps such a reading unchanged.
        """
        return max(current, min(current + amount, ceiling))

    @staticmethod
    def _fall(current, amount, floor):
        """Return ``current - amount`` limited to ``floor``, never above ``current``.

        A plain ``max(current - amount, floor)`` would *raise* a reading that
        is already below the floor; this keeps such a reading unchanged.
        """
        return min(current, max(current - amount, floor))

    def initialize_fire(self, fire_locations: List[str]):
        """Initialize fire at specific locations with all real-world factors.

        Args:
            fire_locations: IDs of the locations where fire starts. IDs that
                have no sensor are still recorded in ``fire_sources`` but
                receive no sensor update.
        """
        # Copy the list: _spread_fire appends to fire_sources later and must
        # not mutate the caller's argument.
        self.fire_sources = list(fire_locations)
        for location in self.fire_sources:
            if location not in self.sensor_system.sensors:
                continue
            # Generate comprehensive mock data for the fire location
            self.sensor_system.update_sensor(
                location,
                # Basic factors
                fire_detected=True,
                smoke_level=random.uniform(0.7, 0.9),
                temperature=random.uniform(150, 250),
                oxygen_level=random.uniform(14, 16),
                visibility=random.uniform(5, 15),
                structural_integrity=random.uniform(60, 80),
                # Fire-specific factors
                fire_growth_rate=random.uniform(5, 15),  # m²/min
                flashover_risk=random.uniform(0.3, 0.6),
                backdraft_risk=random.uniform(0.2, 0.4),
                heat_radiation=random.uniform(3, 8),  # kW/m²
                fire_type=random.choice(["wood", "electrical", "chemical"]),
                # Toxic gases (high in fire areas)
                carbon_monoxide=random.uniform(50, 200),  # ppm
                carbon_dioxide=random.uniform(5000, 15000),  # ppm
                hydrogen_cyanide=random.uniform(10, 50),  # ppm
                hydrogen_chloride=random.uniform(5, 20),  # ppm
                # Environmental (affected by fire)
                wind_direction=random.uniform(0, 360),
                wind_speed=random.uniform(2, 8),  # m/s
                air_pressure=random.uniform(1000, 1020),
                humidity=random.uniform(30, 60),
                # Human factors
                occupancy_density=random.uniform(0.3, 0.7),
                mobility_limitations=random.randint(0, 3),
                panic_level=random.uniform(0.6, 0.9),
                evacuation_progress=0.0,
                # Infrastructure (may fail near fire)
                sprinkler_active=random.choice([True, False]),
                emergency_lighting=random.choice([True, False]),
                elevator_available=False,  # Elevators disabled in fire
                stairwell_clear=random.choice([True, False]),
                exit_accessible=random.choice([True, False]),
                exit_capacity=random.randint(50, 150),
                ventilation_active=random.choice([True, False]),
                # Time-based
                time_since_fire_start=0,
                estimated_time_to_exit=random.randint(60, 300),
                # Communication
                emergency_comm_working=random.choice([True, False]),
                wifi_signal_strength=random.uniform(40, 80),
                # External
                weather_temperature=random.uniform(15, 25),
                weather_rain=random.choice([True, False]),
                time_of_day=random.randint(8, 18),
                day_of_week=random.randint(0, 6),
            )

    def update_simulation(self, intensity: float = 1.0):
        """Update fire conditions for one time step.

        Args:
            intensity: Fire spread intensity (0.5 = slow, 1.0 = normal,
                2.0 = fast). Expected to be non-negative.
        """
        self.time_step += 1

        # The step counter is the single source of truth for elapsed time;
        # every sensor gets the same clock value.
        elapsed = self.time_step * self.SECONDS_PER_STEP
        for sensor in self.sensor_system.sensors.values():
            sensor.time_since_fire_start = elapsed

        # 1. Intensify existing fires
        self._intensify_fires(intensity)
        # 2. Spread fire to adjacent rooms
        self._spread_fire(intensity)
        # 3. Spread smoke further
        self._spread_smoke(intensity)
        # 4. Update structural integrity
        self._update_structures(intensity)
        # 5. Update evacuation progress
        self._update_evacuation_progress(intensity)

    def _intensify_fires(self, intensity: float):
        """Make existing fires worse over time with all factors.

        ``time_since_fire_start`` is deliberately not touched here: it is set
        for every sensor by update_simulation(), and adding another step on
        top would put burning rooms one step ahead of the rest.
        """
        rise, fall = self._rise, self._fall
        for location_id, sensor in self.sensor_system.sensors.items():
            if not sensor.fire_detected:
                continue

            # Fire gets hotter, smokier, and consumes oxygen
            new_temp = rise(sensor.temperature, random.uniform(5, 15) * intensity, 300)
            new_smoke = rise(sensor.smoke_level, random.uniform(0.02, 0.05) * intensity, 1.0)
            new_oxygen = fall(sensor.oxygen_level, random.uniform(0.2, 0.5) * intensity, 10.0)
            # Worse visibility
            new_visibility = fall(sensor.visibility, random.uniform(1, 3) * intensity, 0.0)
            # Structural damage; the 30.0 floor only limits how far this
            # phase pushes integrity down and never lifts it back up after
            # _update_structures has taken it lower.
            new_integrity = fall(
                sensor.structural_integrity, random.uniform(1, 3) * intensity, 30.0
            )

            self.sensor_system.update_sensor(
                location_id,
                temperature=new_temp,
                smoke_level=new_smoke,
                oxygen_level=new_oxygen,
                visibility=new_visibility,
                structural_integrity=new_integrity,
                # Fire growth increases
                fire_growth_rate=rise(
                    sensor.fire_growth_rate, random.uniform(0.5, 2) * intensity, 20
                ),
                flashover_risk=rise(
                    sensor.flashover_risk, random.uniform(0.02, 0.05) * intensity, 1.0
                ),
                backdraft_risk=rise(
                    sensor.backdraft_risk, random.uniform(0.01, 0.03) * intensity, 1.0
                ),
                heat_radiation=rise(
                    sensor.heat_radiation, random.uniform(0.2, 0.5) * intensity, 15
                ),
                # Toxic gases increase
                carbon_monoxide=rise(
                    sensor.carbon_monoxide, random.uniform(5, 15) * intensity, 500
                ),
                carbon_dioxide=rise(
                    sensor.carbon_dioxide, random.uniform(200, 500) * intensity, 20000
                ),
                hydrogen_cyanide=rise(
                    sensor.hydrogen_cyanide, random.uniform(1, 3) * intensity, 100
                ),
                hydrogen_chloride=rise(
                    sensor.hydrogen_chloride, random.uniform(0.5, 2) * intensity, 50
                ),
                # Panic increases
                panic_level=rise(
                    sensor.panic_level, random.uniform(0.01, 0.03) * intensity, 1.0
                ),
                # Infrastructure may fail (re-rolled every step)
                emergency_lighting=random.random() > 0.1,  # 10% chance of failure
                exit_accessible=random.random() > 0.15,  # 15% chance of blockage
                stairwell_clear=random.random() > 0.2,  # 20% chance of blockage
            )

    def _spread_fire(self, intensity: float):
        """Fire spreads to adjacent rooms based on conditions.

        Ignitions are collected first and applied afterwards, so a room set
        alight in this step does not itself spread fire within the same step.
        """
        # Ordered and de-duplicated: a room next to several burning rooms
        # must be ignited (and recorded in fire_sources) only once.
        ignited = []
        for location_id, sensor in self.sensor_system.sensors.items():
            if not sensor.fire_detected:
                continue
            for neighbor_id, _ in self.floor_plan.get_neighbors(location_id):
                if neighbor_id in ignited:
                    continue
                neighbor_sensor = self.sensor_system.get_sensor_reading(neighbor_id)
                if not neighbor_sensor or neighbor_sensor.fire_detected:
                    continue

                # Base 15% chance per time step
                spread_chance = 0.15 * intensity
                # Higher chance if already hot or smoky
                if neighbor_sensor.temperature > 80:
                    spread_chance += 0.2
                if neighbor_sensor.smoke_level > 0.5:
                    spread_chance += 0.15
                # Oxygen cylinder in a warm room: much higher chance
                room = self.floor_plan.get_room(neighbor_id)
                if room and room.has_oxygen_cylinder and neighbor_sensor.temperature > 60:
                    spread_chance += 0.4

                if random.random() < spread_chance:
                    ignited.append(neighbor_id)

        # Apply new fires
        for location_id in ignited:
            self.sensor_system.update_sensor(
                location_id,
                fire_detected=True,
                smoke_level=random.uniform(0.6, 0.8),
                temperature=random.uniform(120, 180),
                oxygen_level=random.uniform(15, 17),
                visibility=random.uniform(15, 30),
            )
            if location_id not in self.fire_sources:
                self.fire_sources.append(location_id)

    def _spread_smoke(self, intensity: float):
        """Smoke and toxic gases spread to all connected areas.

        Updates are applied immediately, so a neighbour adjacent to several
        smoky rooms is affected once per such room in a single step.
        """
        rise, fall = self._rise, self._fall
        for location_id, sensor in self.sensor_system.sensors.items():
            # Only rooms with fire or high smoke affect their neighbours
            if not (sensor.fire_detected or sensor.smoke_level > 0.3):
                continue
            for neighbor_id, _ in self.floor_plan.get_neighbors(location_id):
                neighbor_sensor = self.sensor_system.get_sensor_reading(neighbor_id)
                if not neighbor_sensor or neighbor_sensor.fire_detected:
                    continue

                # Smoke drifts to adjacent areas
                new_smoke = rise(
                    neighbor_sensor.smoke_level, random.uniform(0.03, 0.08) * intensity, 1.0
                )
                # Temperature rises slightly
                new_temp = rise(
                    neighbor_sensor.temperature, random.uniform(2, 8) * intensity, 100
                )
                # Oxygen decreases
                new_oxygen = fall(
                    neighbor_sensor.oxygen_level, random.uniform(0.1, 0.3) * intensity, 16.0
                )
                # Visibility decreases
                new_visibility = fall(
                    neighbor_sensor.visibility, random.uniform(2, 5) * intensity, 10.0
                )

                # Toxic gases spread (reduced concentration)
                co_spread = sensor.carbon_monoxide * 0.1 * intensity
                co2_spread = sensor.carbon_dioxide * 0.05 * intensity
                hcn_spread = sensor.hydrogen_cyanide * 0.15 * intensity
                hcl_spread = sensor.hydrogen_chloride * 0.2 * intensity
                # Heat radiation spreads
                heat_spread = sensor.heat_radiation * 0.3 * intensity

                self.sensor_system.update_sensor(
                    neighbor_id,
                    smoke_level=new_smoke,
                    temperature=new_temp,
                    oxygen_level=new_oxygen,
                    visibility=new_visibility,
                    # Toxic gases (accumulate)
                    carbon_monoxide=rise(neighbor_sensor.carbon_monoxide, co_spread, 200),
                    carbon_dioxide=rise(neighbor_sensor.carbon_dioxide, co2_spread, 10000),
                    hydrogen_cyanide=rise(neighbor_sensor.hydrogen_cyanide, hcn_spread, 50),
                    hydrogen_chloride=rise(neighbor_sensor.hydrogen_chloride, hcl_spread, 30),
                    # Heat radiation
                    heat_radiation=rise(neighbor_sensor.heat_radiation, heat_spread, 5),
                    # Flashover risk increases slightly
                    flashover_risk=rise(
                        neighbor_sensor.flashover_risk,
                        random.uniform(0.01, 0.02) * intensity,
                        0.5,
                    ),
                    # Occupancy decreases (people moving away); note this
                    # amount is not scaled by intensity.
                    occupancy_density=fall(
                        neighbor_sensor.occupancy_density, random.uniform(0.05, 0.15), 0.0
                    ),
                    panic_level=rise(
                        neighbor_sensor.panic_level,
                        random.uniform(0.02, 0.05) * intensity,
                        1.0,
                    ),
                )

    def _update_structures(self, intensity: float):
        """Update structural integrity based on fire exposure."""
        for location_id, sensor in self.sensor_system.sensors.items():
            if not (sensor.fire_detected or sensor.temperature > 100):
                continue
            # Structural damage from heat
            damage = random.uniform(0.5, 2.0) * intensity
            self.sensor_system.update_sensor(
                location_id,
                structural_integrity=self._fall(sensor.structural_integrity, damage, 20.0),
            )

    def _update_evacuation_progress(self, intensity: float):
        """Update evacuation progress and occupancy."""
        for location_id, sensor in self.sensor_system.sensors.items():
            # Track occupancy locally so the exit logic below always sees the
            # value written in this step, however update_sensor stores it.
            occupancy = sensor.occupancy_density

            # People evacuate from rooms (occupancy decreases)
            if occupancy > 0:
                evacuation_rate = random.uniform(0.05, 0.15) * intensity
                occupancy = max(occupancy - evacuation_rate, 0.0)
                new_progress = min(sensor.evacuation_progress + evacuation_rate * 100, 100.0)
                self.sensor_system.update_sensor(
                    location_id,
                    occupancy_density=occupancy,
                    evacuation_progress=new_progress,
                )

            # Exits get more crowded as people arrive
            room = self.floor_plan.get_room(location_id)
            if room and room.room_type == "exit" and occupancy < 0.8:
                arrival_rate = random.uniform(0.02, 0.08) * intensity
                self.sensor_system.update_sensor(
                    location_id,
                    occupancy_density=min(occupancy + arrival_rate, 0.9),
                )
class DynamicEvacuationSystem:
    """Manages dynamic evacuation with changing conditions.

    Wires together a SensorSystem, a FireSpreadSimulator and a PathFinder for
    one floor plan, and keeps the latest recommended route plus a history of
    past recommendations.
    """

    def __init__(self, floor_plan: "FloorPlan"):
        self.floor_plan = floor_plan
        self.sensor_system = SensorSystem(floor_plan)
        self.simulator = FireSpreadSimulator(floor_plan, self.sensor_system)
        self.pathfinder = PathFinder(floor_plan, self.sensor_system)
        # Latest (path, risk) pair from RiskAssessment.recommend_path, or None
        self.current_recommendation = None
        # One dict per recommendation: time_step, exit, path, danger
        self.recommendation_history = []

    @staticmethod
    def _exit_for_path(routes, path):
        """Return the exit of the first route whose path equals ``path``.

        Returns None when no route matches, instead of raising IndexError as
        an unconditional ``[...][0]`` lookup would.
        """
        return next(
            (exit_id for exit_id, route_path, _ in routes if route_path == path),
            None,
        )

    def initialize_scenario(self, fire_locations: List[str],
                            affected_areas: Dict[str, Dict] = None):
        """Initialize the fire scenario.

        Args:
            fire_locations: IDs of the locations where fire starts.
            affected_areas: Optional mapping of location ID to sensor values
                applied before the fires are initialized. Locations without
                a sensor are skipped.
        """
        # Set up initial conditions
        if affected_areas:
            for location, values in affected_areas.items():
                if location in self.sensor_system.sensors:
                    self.sensor_system.update_sensor(location, **values)
        # Initialize fires
        self.simulator.initialize_fire(fire_locations)
        # Get initial recommendation (uses the default start location)
        self._update_recommendation()

    def step(self, intensity: float = 1.0, start_location: str = "R1"):
        """Advance simulation by one time step.

        Args:
            intensity: Fire spread intensity
            start_location: Where person is evacuating from

        Returns:
            (time_step, routes, recommended_route, route_changed).
            ``route_changed`` is True only when both the previous and the new
            recommendation exist and their paths differ; gaining or losing a
            recommendation altogether is reported as False.
        """
        # Update fire conditions
        self.simulator.update_simulation(intensity)

        # Recalculate best route; the routes computed for the recommendation
        # are reused for the return value instead of running the path search
        # a second time on unchanged sensor state.
        previous_rec = self.current_recommendation
        routes = self._update_recommendation(start_location)

        # Check if recommendation changed
        route_changed = False
        if previous_rec and self.current_recommendation:
            route_changed = previous_rec[0] != self.current_recommendation[0]

        return self.simulator.time_step, routes, self.current_recommendation, route_changed

    def _update_recommendation(self, start_location: str = "R1"):
        """Update the evacuation recommendation.

        Args:
            start_location: Where person is evacuating from

        Returns:
            The routes returned by the path finder for ``start_location``,
            so callers can reuse them without recomputing.
        """
        routes = self.pathfinder.find_all_evacuation_routes(start_location)
        if not routes:
            self.current_recommendation = None
            return routes

        recommended = RiskAssessment.recommend_path(
            [(path, risk) for _, path, risk in routes]
        )
        self.current_recommendation = recommended

        # Track history
        if recommended:
            rec_path, rec_risk = recommended
            self.recommendation_history.append({
                'time_step': self.simulator.time_step,
                'exit': self._exit_for_path(routes, rec_path),
                'path': rec_path,
                'danger': rec_risk['avg_danger']
            })
        return routes

    def get_status_summary(self, start_location: str = "R1"):
        """Get current status summary.

        Args:
            start_location: Where person is evacuating from

        Returns:
            Dict with keys ``time_step``, ``total_fires``, ``passable_routes``,
            ``total_routes``, ``avg_danger``, ``recommended_exit`` and
            ``recommended_danger``. ``recommended_exit`` is None when there is
            no current recommendation, or when its path is not among the
            routes computed for ``start_location``.
        """
        routes = self.pathfinder.find_all_evacuation_routes(start_location)
        summary = {
            'time_step': self.simulator.time_step,
            'total_fires': sum(1 for s in self.sensor_system.sensors.values()
                               if s.fire_detected),
            'passable_routes': sum(1 for _, _, r in routes if r['passable']),
            'total_routes': len(routes),
            # With no routes at all, danger is reported as the maximum (100)
            'avg_danger': sum(r['avg_danger'] for _, _, r in routes) / len(routes) if routes else 100,
        }
        if self.current_recommendation:
            rec_path, rec_risk = self.current_recommendation
            # The stored recommendation may have been computed for another
            # start location or under earlier conditions, so its path is not
            # guaranteed to be among the routes just computed.
            summary['recommended_exit'] = self._exit_for_path(routes, rec_path)
            summary['recommended_danger'] = rec_risk['avg_danger']
        else:
            summary['recommended_exit'] = None
            summary['recommended_danger'] = 100.0
        return summary