""" Dynamic Simulation Module - Real-time fire spread and sensor updates Simulates how fire conditions change over time and updates evacuation recommendations """ import random import time from typing import Dict, List, Tuple from .floor_plan import FloorPlan from .sensor_system import SensorSystem, SensorReading from .pathfinding import PathFinder, RiskAssessment class FireSpreadSimulator: """Simulates dynamic fire spread and environmental changes""" def __init__(self, floor_plan: FloorPlan, sensor_system: SensorSystem): self.floor_plan = floor_plan self.sensor_system = sensor_system self.time_step = 0 self.fire_sources = [] # List of rooms where fire started def initialize_fire(self, fire_locations: List[str]): """Initialize fire at specific locations with all real-world factors""" self.fire_sources = fire_locations for location in fire_locations: if location in self.sensor_system.sensors: # Generate comprehensive mock data for fire location self.sensor_system.update_sensor( location, # Basic factors fire_detected=True, smoke_level=random.uniform(0.7, 0.9), temperature=random.uniform(150, 250), oxygen_level=random.uniform(14, 16), visibility=random.uniform(5, 15), structural_integrity=random.uniform(60, 80), # Fire-specific factors fire_growth_rate=random.uniform(5, 15), # m²/min flashover_risk=random.uniform(0.3, 0.6), backdraft_risk=random.uniform(0.2, 0.4), heat_radiation=random.uniform(3, 8), # kW/m² fire_type=random.choice(["wood", "electrical", "chemical"]), # Toxic gases (high in fire areas) carbon_monoxide=random.uniform(50, 200), # ppm carbon_dioxide=random.uniform(5000, 15000), # ppm hydrogen_cyanide=random.uniform(10, 50), # ppm hydrogen_chloride=random.uniform(5, 20), # ppm # Environmental (affected by fire) wind_direction=random.uniform(0, 360), wind_speed=random.uniform(2, 8), # m/s air_pressure=random.uniform(1000, 1020), humidity=random.uniform(30, 60), # Human factors occupancy_density=random.uniform(0.3, 0.7), mobility_limitations=random.randint(0, 3), panic_level=random.uniform(0.6, 0.9), evacuation_progress=0.0, # Infrastructure (may fail near fire) sprinkler_active=random.choice([True, False]), emergency_lighting=random.choice([True, False]), elevator_available=False, # Elevators disabled in fire stairwell_clear=random.choice([True, False]), exit_accessible=random.choice([True, False]), exit_capacity=random.randint(50, 150), ventilation_active=random.choice([True, False]), # Time-based time_since_fire_start=0, estimated_time_to_exit=random.randint(60, 300), # Communication emergency_comm_working=random.choice([True, False]), wifi_signal_strength=random.uniform(40, 80), # External weather_temperature=random.uniform(15, 25), weather_rain=random.choice([True, False]), time_of_day=random.randint(8, 18), day_of_week=random.randint(0, 6) ) def update_simulation(self, intensity: float = 1.0): """ Update fire conditions for one time step Args: intensity: Fire spread intensity (0.5 = slow, 1.0 = normal, 2.0 = fast) """ self.time_step += 1 # Update time for all sensors for sensor in self.sensor_system.sensors.values(): sensor.time_since_fire_start = self.time_step * 30 # 30 seconds per step # 1. Intensify existing fires self._intensify_fires(intensity) # 2. Spread fire to adjacent rooms self._spread_fire(intensity) # 3. Spread smoke further self._spread_smoke(intensity) # 4. Update structural integrity self._update_structures(intensity) # 5. Update evacuation progress self._update_evacuation_progress(intensity) def _intensify_fires(self, intensity: float): """Make existing fires worse over time with all factors""" for location_id, sensor in self.sensor_system.sensors.items(): if sensor.fire_detected: # Fire gets hotter temp_increase = random.uniform(5, 15) * intensity new_temp = min(sensor.temperature + temp_increase, 300) # More smoke smoke_increase = random.uniform(0.02, 0.05) * intensity new_smoke = min(sensor.smoke_level + smoke_increase, 1.0) # Less oxygen oxygen_decrease = random.uniform(0.2, 0.5) * intensity new_oxygen = max(sensor.oxygen_level - oxygen_decrease, 10.0) # Worse visibility visibility_decrease = random.uniform(1, 3) * intensity new_visibility = max(sensor.visibility - visibility_decrease, 0.0) # Structural damage integrity_decrease = random.uniform(1, 3) * intensity new_integrity = max(sensor.structural_integrity - integrity_decrease, 30.0) # Update all fire-related factors self.sensor_system.update_sensor( location_id, temperature=new_temp, smoke_level=new_smoke, oxygen_level=new_oxygen, visibility=new_visibility, structural_integrity=new_integrity, # Fire growth increases fire_growth_rate=min(sensor.fire_growth_rate + random.uniform(0.5, 2) * intensity, 20), flashover_risk=min(sensor.flashover_risk + random.uniform(0.02, 0.05) * intensity, 1.0), backdraft_risk=min(sensor.backdraft_risk + random.uniform(0.01, 0.03) * intensity, 1.0), heat_radiation=min(sensor.heat_radiation + random.uniform(0.2, 0.5) * intensity, 15), # Toxic gases increase carbon_monoxide=min(sensor.carbon_monoxide + random.uniform(5, 15) * intensity, 500), carbon_dioxide=min(sensor.carbon_dioxide + random.uniform(200, 500) * intensity, 20000), hydrogen_cyanide=min(sensor.hydrogen_cyanide + random.uniform(1, 3) * intensity, 100), hydrogen_chloride=min(sensor.hydrogen_chloride + random.uniform(0.5, 2) * intensity, 50), # Time increases time_since_fire_start=sensor.time_since_fire_start + 30, # 30 seconds per step # Panic increases panic_level=min(sensor.panic_level + random.uniform(0.01, 0.03) * intensity, 1.0), # Infrastructure may fail emergency_lighting=random.random() > 0.1, # 10% chance of failure exit_accessible=random.random() > 0.15, # 15% chance of blockage stairwell_clear=random.random() > 0.2 # 20% chance of blockage ) def _spread_fire(self, intensity: float): """Fire spreads to adjacent rooms based on conditions""" new_fires = [] for location_id, sensor in self.sensor_system.sensors.items(): if sensor.fire_detected: # Get adjacent rooms neighbors = self.floor_plan.get_neighbors(location_id) for neighbor_id, _ in neighbors: neighbor_sensor = self.sensor_system.get_sensor_reading(neighbor_id) if neighbor_sensor and not neighbor_sensor.fire_detected: # Chance of fire spreading based on conditions spread_chance = 0.15 * intensity # Base 15% chance per time step # Higher chance if already hot or smoky if neighbor_sensor.temperature > 80: spread_chance += 0.2 if neighbor_sensor.smoke_level > 0.5: spread_chance += 0.15 # Check if oxygen cylinder present (explosion!) room = self.floor_plan.get_room(neighbor_id) if room and room.has_oxygen_cylinder and neighbor_sensor.temperature > 60: spread_chance += 0.4 # Much higher chance! if random.random() < spread_chance: new_fires.append(neighbor_id) # Apply new fires for location_id in new_fires: self.sensor_system.update_sensor( location_id, fire_detected=True, smoke_level=random.uniform(0.6, 0.8), temperature=random.uniform(120, 180), oxygen_level=random.uniform(15, 17), visibility=random.uniform(15, 30) ) self.fire_sources.append(location_id) def _spread_smoke(self, intensity: float): """Smoke and toxic gases spread to all connected areas""" for location_id, sensor in self.sensor_system.sensors.items(): # Rooms with fire or high smoke affect neighbors if sensor.fire_detected or sensor.smoke_level > 0.3: neighbors = self.floor_plan.get_neighbors(location_id) for neighbor_id, _ in neighbors: neighbor_sensor = self.sensor_system.get_sensor_reading(neighbor_id) if neighbor_sensor and not neighbor_sensor.fire_detected: # Smoke drifts to adjacent areas smoke_increase = random.uniform(0.03, 0.08) * intensity new_smoke = min(neighbor_sensor.smoke_level + smoke_increase, 1.0) # Temperature rises slightly temp_increase = random.uniform(2, 8) * intensity new_temp = min(neighbor_sensor.temperature + temp_increase, 100) # Oxygen decreases oxygen_decrease = random.uniform(0.1, 0.3) * intensity new_oxygen = max(neighbor_sensor.oxygen_level - oxygen_decrease, 16.0) # Visibility decreases visibility_decrease = random.uniform(2, 5) * intensity new_visibility = max(neighbor_sensor.visibility - visibility_decrease, 10.0) # Toxic gases spread (reduced concentration) co_spread = sensor.carbon_monoxide * 0.1 * intensity co2_spread = sensor.carbon_dioxide * 0.05 * intensity hcn_spread = sensor.hydrogen_cyanide * 0.15 * intensity hcl_spread = sensor.hydrogen_chloride * 0.2 * intensity # Heat radiation spreads heat_spread = sensor.heat_radiation * 0.3 * intensity self.sensor_system.update_sensor( neighbor_id, smoke_level=new_smoke, temperature=new_temp, oxygen_level=new_oxygen, visibility=new_visibility, # Toxic gases (accumulate) carbon_monoxide=min(neighbor_sensor.carbon_monoxide + co_spread, 200), carbon_dioxide=min(neighbor_sensor.carbon_dioxide + co2_spread, 10000), hydrogen_cyanide=min(neighbor_sensor.hydrogen_cyanide + hcn_spread, 50), hydrogen_chloride=min(neighbor_sensor.hydrogen_chloride + hcl_spread, 30), # Heat radiation heat_radiation=min(neighbor_sensor.heat_radiation + heat_spread, 5), # Flashover risk increases slightly flashover_risk=min(neighbor_sensor.flashover_risk + random.uniform(0.01, 0.02) * intensity, 0.5), # Occupancy may increase (people moving away) occupancy_density=max(neighbor_sensor.occupancy_density - random.uniform(0.05, 0.15), 0.0), panic_level=min(neighbor_sensor.panic_level + random.uniform(0.02, 0.05) * intensity, 1.0) ) def _update_structures(self, intensity: float): """Update structural integrity based on fire exposure""" for location_id, sensor in self.sensor_system.sensors.items(): if sensor.fire_detected or sensor.temperature > 100: # Structural damage from heat damage = random.uniform(0.5, 2.0) * intensity new_integrity = max(sensor.structural_integrity - damage, 20.0) self.sensor_system.update_sensor( location_id, structural_integrity=new_integrity ) def _update_evacuation_progress(self, intensity: float): """Update evacuation progress and occupancy""" for location_id, sensor in self.sensor_system.sensors.items(): # People evacuate from rooms (occupancy decreases) if sensor.occupancy_density > 0: evacuation_rate = random.uniform(0.05, 0.15) * intensity new_occupancy = max(sensor.occupancy_density - evacuation_rate, 0.0) new_progress = min(sensor.evacuation_progress + evacuation_rate * 100, 100.0) self.sensor_system.update_sensor( location_id, occupancy_density=new_occupancy, evacuation_progress=new_progress ) # Exits may get more crowded as people arrive room = self.floor_plan.get_room(location_id) if room and room.room_type == "exit": # Exits get more crowded as people evacuate if sensor.occupancy_density < 0.8: arrival_rate = random.uniform(0.02, 0.08) * intensity new_occupancy = min(sensor.occupancy_density + arrival_rate, 0.9) self.sensor_system.update_sensor( location_id, occupancy_density=new_occupancy ) class DynamicEvacuationSystem: """Manages dynamic evacuation with changing conditions""" def __init__(self, floor_plan: FloorPlan): self.floor_plan = floor_plan self.sensor_system = SensorSystem(floor_plan) self.simulator = FireSpreadSimulator(floor_plan, self.sensor_system) self.pathfinder = PathFinder(floor_plan, self.sensor_system) self.current_recommendation = None self.recommendation_history = [] def initialize_scenario(self, fire_locations: List[str], affected_areas: Dict[str, Dict] = None): """Initialize the fire scenario""" # Set up initial conditions if affected_areas: for location, values in affected_areas.items(): if location in self.sensor_system.sensors: self.sensor_system.update_sensor(location, **values) # Initialize fires self.simulator.initialize_fire(fire_locations) # Get initial recommendation self._update_recommendation() def step(self, intensity: float = 1.0, start_location: str = "R1"): """ Advance simulation by one time step Args: intensity: Fire spread intensity start_location: Where person is evacuating from Returns: (time_step, routes, recommended_route, route_changed) """ # Update fire conditions self.simulator.update_simulation(intensity) # Recalculate best route previous_rec = self.current_recommendation self._update_recommendation(start_location) # Check if recommendation changed route_changed = False if previous_rec and self.current_recommendation: prev_path = previous_rec[0] if previous_rec else None curr_path = self.current_recommendation[0] if self.current_recommendation else None route_changed = (prev_path != curr_path) # Get all routes routes = self.pathfinder.find_all_evacuation_routes(start_location) return self.simulator.time_step, routes, self.current_recommendation, route_changed def _update_recommendation(self, start_location: str = "R1"): """Update the evacuation recommendation""" routes = self.pathfinder.find_all_evacuation_routes(start_location) if routes: recommended = RiskAssessment.recommend_path( [(path, risk) for _, path, risk in routes] ) self.current_recommendation = recommended # Track history if recommended: rec_path, rec_risk = recommended rec_exit = [e for e, p, _ in routes if p == rec_path][0] self.recommendation_history.append({ 'time_step': self.simulator.time_step, 'exit': rec_exit, 'path': rec_path, 'danger': rec_risk['avg_danger'] }) else: self.current_recommendation = None def get_status_summary(self, start_location: str = "R1"): """Get current status summary""" routes = self.pathfinder.find_all_evacuation_routes(start_location) summary = { 'time_step': self.simulator.time_step, 'total_fires': sum(1 for s in self.sensor_system.sensors.values() if s.fire_detected), 'passable_routes': sum(1 for _, _, r in routes if r['passable']), 'total_routes': len(routes), 'avg_danger': sum(r['avg_danger'] for _, _, r in routes) / len(routes) if routes else 100, } if self.current_recommendation: rec_path, rec_risk = self.current_recommendation rec_exit = [e for e, p, _ in routes if p == rec_path][0] summary['recommended_exit'] = rec_exit summary['recommended_danger'] = rec_risk['avg_danger'] else: summary['recommended_exit'] = None summary['recommended_danger'] = 100.0 return summary