Spaces:
Runtime error
Runtime error
"""
Sensor System Module - Simulates and manages sensor data for fire, smoke, temperature, and oxygen.
"""
import random
from typing import Dict, List, Optional

import numpy as np

from .floor_plan import FloorPlan
class SensorReading:
    """Snapshot of every sensor value tracked for a single location.

    All attributes are plain public fields initialised to "normal
    conditions" defaults; callers overwrite them directly. The danger score
    is always derived on demand from the current field values.
    """

    def __init__(self, location_id: str):
        """Create a reading for ``location_id`` with normal-condition defaults."""
        self.location_id = location_id

        # Basic factors
        self.fire_detected = False
        self.smoke_level = 0.0  # 0.0 to 1.0 (0 = no smoke, 1 = heavy smoke)
        self.temperature = 20.0  # Celsius
        self.oxygen_level = 21.0  # Percentage (normal is ~21%)
        self.visibility = 100.0  # Percentage (100 = clear, 0 = no visibility)
        self.structural_integrity = 100.0  # Percentage

        # Fire-specific factors
        self.fire_growth_rate = 0.0  # m²/min
        self.flashover_risk = 0.0  # 0.0 to 1.0
        self.backdraft_risk = 0.0  # 0.0 to 1.0
        self.heat_radiation = 0.0  # kW/m²
        self.fire_type = "none"  # electrical, chemical, wood, etc.

        # Toxic gas detection
        self.carbon_monoxide = 0.0  # ppm (normal < 9)
        self.carbon_dioxide = 400.0  # ppm (normal ~400)
        self.hydrogen_cyanide = 0.0  # ppm
        self.hydrogen_chloride = 0.0  # ppm

        # Environmental factors
        self.wind_direction = 0.0  # degrees
        self.wind_speed = 0.0  # m/s
        self.air_pressure = 1013.25  # hPa (normal)
        self.humidity = 50.0  # Percentage

        # Human factors
        self.occupancy_density = 0.0  # 0.0 to 1.0 (0 = empty, 1 = full)
        self.mobility_limitations = 0  # Count of people with mobility issues
        self.panic_level = 0.0  # 0.0 to 1.0
        self.evacuation_progress = 0.0  # Percentage evacuated

        # Building infrastructure
        self.sprinkler_active = False
        self.emergency_lighting = True
        self.elevator_available = True
        self.stairwell_clear = True
        self.exit_accessible = True
        self.exit_capacity = 100  # people per minute
        self.ventilation_active = True

        # Time-based factors
        self.time_since_fire_start = 0  # seconds
        self.estimated_time_to_exit = 0  # seconds

        # Communication
        self.emergency_comm_working = True
        self.wifi_signal_strength = 100.0  # Percentage

        # External factors
        self.weather_temperature = 20.0  # Celsius
        self.weather_rain = False
        self.time_of_day = 12  # 0-23 hours
        self.day_of_week = 1  # 0=Monday, 6=Sunday

    def calculate_danger_score(self) -> float:
        """Combine the current field values into a single danger score.

        Each hazard contributes an additive penalty; the sum is clamped to
        the range 0.0 (safe) to 100.0 (extremely dangerous).

        Returns:
            The clamped danger score.
        """
        score = 0.0

        # --- Basic factors ---
        # Fire presence carries the largest single penalty.
        if self.fire_detected:
            score += 40.0

        score += self.smoke_level * 20.0

        # Temperature only counts above 60°C, capped at 20 points.
        if self.temperature > 60:
            score += min((self.temperature - 60) / 2, 20.0)

        # Oxygen only counts below 19.5%.
        if self.oxygen_level < 19.5:
            score += (19.5 - self.oxygen_level) * 5.0

        score += (100 - self.visibility) * 0.1

        if self.structural_integrity < 80:
            score += (100 - self.structural_integrity) * 0.2

        # --- Toxic gases ---
        if self.carbon_monoxide > 50:
            # The lower band reaches (50 - 9) * 0.5 at 50 ppm. Never score
            # the high band below that, otherwise a rise in CO just past
            # 50 ppm would *lower* the danger score.
            low_band_peak = (50 - 9) * 0.5
            score += min(max(self.carbon_monoxide / 5, low_band_peak), 30.0)
        elif self.carbon_monoxide > 9:
            score += (self.carbon_monoxide - 9) * 0.5

        if self.carbon_dioxide > 5000:
            score += min((self.carbon_dioxide - 5000) / 200, 20.0)

        if self.hydrogen_cyanide > 20:
            score += min(self.hydrogen_cyanide * 1.5, 25.0)

        if self.hydrogen_chloride > 5:
            score += min(self.hydrogen_chloride * 2, 15.0)

        # --- Flashover risk (stepped penalty) ---
        if self.flashover_risk > 0.7:
            score += 25.0
        elif self.flashover_risk > 0.5:
            score += 15.0
        elif self.flashover_risk > 0.3:
            score += 8.0

        # --- Backdraft risk (stepped penalty) ---
        if self.backdraft_risk > 0.6:
            score += 20.0
        elif self.backdraft_risk > 0.4:
            score += 10.0

        # --- Crowd density ---
        if self.occupancy_density > 0.8:
            score += (self.occupancy_density - 0.8) * 75.0
        elif self.occupancy_density > 0.6:
            score += (self.occupancy_density - 0.6) * 30.0

        # --- Exit blockage ---
        if not self.exit_accessible:
            score += 20.0
        if not self.stairwell_clear:
            score += 15.0

        # --- Time pressure: grows after the first 5 minutes (300 s) ---
        if self.time_since_fire_start > 300:
            score += min((self.time_since_fire_start - 300) / 20, 15.0)

        # --- Infrastructure failures ---
        if not self.emergency_lighting:
            score += 5.0
        if not self.emergency_comm_working:
            score += 3.0

        # --- Fire growth rate (m²/min) ---
        if self.fire_growth_rate > 10:
            score += min(self.fire_growth_rate / 2, 12.0)

        # --- Heat radiation (kW/m²) ---
        if self.heat_radiation > 2.5:
            score += min((self.heat_radiation - 2.5) * 3, 10.0)

        # Clamp both ends: out-of-range inputs (e.g. visibility above 100)
        # would otherwise push the total below zero.
        return max(0.0, min(score, 100.0))

    def is_passable(self) -> bool:
        """Return True when the danger score is below 70 and structural integrity exceeds 50."""
        danger = self.calculate_danger_score()
        return danger < 70.0 and self.structural_integrity > 50.0

    def __repr__(self):
        """Return a short summary: location, fire flag, smoke, temperature and danger score."""
        return (f"SensorReading({self.location_id}: "
                f"Fire={self.fire_detected}, "
                f"Smoke={self.smoke_level:.2f}, "
                f"Temp={self.temperature:.1f}°C, "
                f"Danger={self.calculate_danger_score():.1f})")
class SensorSystem:
    """Holds one SensorReading per room of a floor plan and drives simulated fire scenarios."""

    def __init__(self, floor_plan: FloorPlan):
        """Store the floor plan and create a baseline sensor for each of its rooms."""
        self.floor_plan = floor_plan
        self.sensors: Dict[str, SensorReading] = {}
        self._initialize_sensors()

    def _initialize_sensors(self):
        """(Re)create a sensor with randomised baseline mock data for every room.

        Existing entries for the same room ids are overwritten. The order of
        the ``random`` calls is kept stable so seeded runs stay reproducible.
        """
        for room_id in self.floor_plan.rooms:
            sensor = SensorReading(room_id)

            # Baseline conditions
            sensor.fire_detected = False
            sensor.smoke_level = random.uniform(0.0, 0.1)
            sensor.temperature = random.uniform(18, 25)
            sensor.oxygen_level = random.uniform(20.5, 21.0)
            sensor.visibility = random.uniform(95, 100)
            sensor.structural_integrity = 100.0

            # Fire factors (none initially)
            sensor.fire_growth_rate = 0.0
            sensor.flashover_risk = 0.0
            sensor.backdraft_risk = 0.0
            sensor.heat_radiation = 0.0
            sensor.fire_type = "none"

            # Toxic gases (normal levels)
            sensor.carbon_monoxide = random.uniform(0, 5)  # Normal < 9 ppm
            sensor.carbon_dioxide = random.uniform(350, 450)  # Normal ~400 ppm
            sensor.hydrogen_cyanide = 0.0
            sensor.hydrogen_chloride = 0.0

            # Environmental (normal conditions)
            sensor.wind_direction = random.uniform(0, 360)
            sensor.wind_speed = random.uniform(0, 3)  # m/s
            sensor.air_pressure = random.uniform(1010, 1020)
            sensor.humidity = random.uniform(40, 60)

            # Human factors: occupancy depends on the room kind, which is
            # inferred from the id by substring match ("EXIT" first, then "C").
            if "EXIT" in room_id:
                sensor.occupancy_density = random.uniform(0.1, 0.3)  # Exits less crowded
            elif "C" in room_id:  # Corridors
                sensor.occupancy_density = random.uniform(0.2, 0.5)
            else:  # Rooms
                sensor.occupancy_density = random.uniform(0.3, 0.7)
            sensor.mobility_limitations = random.randint(0, 2)
            sensor.panic_level = random.uniform(0.0, 0.2)  # Low initially
            sensor.evacuation_progress = 0.0

            # Infrastructure (all working)
            sensor.sprinkler_active = True
            sensor.emergency_lighting = True
            sensor.elevator_available = True
            sensor.stairwell_clear = True
            sensor.exit_accessible = True
            sensor.exit_capacity = random.randint(80, 120)
            sensor.ventilation_active = True

            # Time-based
            sensor.time_since_fire_start = 0
            sensor.estimated_time_to_exit = random.randint(30, 180)

            # Communication
            sensor.emergency_comm_working = True
            sensor.wifi_signal_strength = random.uniform(70, 100)

            # External
            sensor.weather_temperature = random.uniform(15, 25)
            sensor.weather_rain = random.choice([True, False])
            sensor.time_of_day = random.randint(8, 18)
            sensor.day_of_week = random.randint(0, 6)

            self.sensors[room_id] = sensor

    def update_sensor(self, location_id: str, **kwargs):
        """Overwrite attributes of the sensor at ``location_id``.

        Unknown locations, and keyword names the sensor has no attribute
        for, are ignored silently.
        """
        sensor = self.sensors.get(location_id)
        if sensor is None:
            return
        for key, value in kwargs.items():
            if hasattr(sensor, key):
                setattr(sensor, key, value)

    def get_sensor_reading(self, location_id: str) -> Optional[SensorReading]:
        """Return the current reading for ``location_id``, or None if it has no sensor."""
        return self.sensors.get(location_id)

    def simulate_fire_scenario(self, fire_locations: List[str],
                               affected_areas: Optional[Dict[str, Dict]] = None):
        """
        Simulate a fire scenario with specified fire locations and affected areas.

        All sensors are first reset to fresh baseline values. Fire values are
        then applied, followed by the explicit overrides, and finally smoke
        is spread to the neighbours of each fire location.

        Args:
            fire_locations: List of room IDs where fire started. IDs without
                a sensor are skipped.
            affected_areas: Dict of room_id -> sensor values for affected
                areas. IDs without a sensor are skipped.
        """
        # Reset all sensors
        self._initialize_sensors()

        # Set fire locations. The membership check stays outside
        # update_sensor so no random number is drawn for unknown ids.
        for location in fire_locations:
            if location in self.sensors:
                self.update_sensor(
                    location,
                    fire_detected=True,
                    smoke_level=0.9,
                    temperature=200.0 + random.uniform(-20, 50),
                    oxygen_level=15.0,
                    visibility=10.0,
                    structural_integrity=70.0,
                )

        # Apply affected areas
        if affected_areas:
            for location, values in affected_areas.items():
                if location in self.sensors:
                    self.update_sensor(location, **values)

        # NOTE(review): spread runs after the overrides, so override values
        # on rooms adjacent to a fire are further adjusted — confirm intended.
        self._simulate_smoke_spread(fire_locations)

    def _simulate_smoke_spread(self, fire_locations: List[str]):
        """Add smoke and heat to non-burning neighbours of each fire location.

        A neighbour adjacent to several fire locations is adjusted once per
        adjacent fire. Fire locations without a sensor are skipped, matching
        how simulate_fire_scenario treats them.
        """
        for fire_loc in fire_locations:
            if fire_loc not in self.sensors:
                continue
            for neighbor_id, _ in self.floor_plan.get_neighbors(fire_loc):
                current = self.sensors.get(neighbor_id)
                if current is None or current.fire_detected:
                    continue
                # Smoke is capped at 1.0; visibility and oxygen are floored
                # at 20.0 and 16.0 respectively.
                self.update_sensor(
                    neighbor_id,
                    smoke_level=min(current.smoke_level + random.uniform(0.3, 0.6), 1.0),
                    temperature=current.temperature + random.uniform(20, 40),
                    visibility=max(current.visibility - random.uniform(20, 40), 20.0),
                    oxygen_level=max(current.oxygen_level - random.uniform(1, 3), 16.0),
                )

    def get_all_readings(self) -> Dict[str, SensorReading]:
        """Return the live mapping of location id to sensor reading (not a copy)."""
        return self.sensors

    def print_status(self):
        """Print one status line per sensor, sorted by location id.

        Status is SAFE below a danger score of 30, WARNING below 70,
        and DANGER otherwise.
        """
        print(f"\n{'='*80}")
        print("SENSOR SYSTEM STATUS")
        print(f"{'='*80}")
        for location_id, reading in sorted(self.sensors.items()):
            danger = reading.calculate_danger_score()
            if danger < 30:
                status = "SAFE"
            elif danger < 70:
                status = "WARNING"
            else:
                status = "DANGER"
            print(f"{location_id:10} | {status:7} | {reading}")
def create_sample_fire_scenario(floor_plan: FloorPlan) -> SensorSystem:
    """
    Build a SensorSystem preloaded with a sample three-route fire scenario.

    Scenario:
    - Route 1 (via C1, C4 to EXIT1): Has oxygen cylinder (explosion risk) but less fire
    - Route 2 (via C2, C5, C6 to EXIT2): Has moderate fire
    - Route 3 (via C7 to EXIT3): Has heavy fire blocking path

    Fire starts in R2, C5 and C7; the remaining rooms listed below receive
    explicit sensor overrides. The oxygen-cylinder risk on Route 1 is
    descriptive only: no sensor field set here encodes it.
    """
    # Column order for every row in the table below.
    columns = (
        "fire_detected",
        "smoke_level",
        "temperature",
        "oxygen_level",
        "visibility",
        "structural_integrity",
    )

    rows = {
        # Route 1 - oxygen cylinder area (explosion risk but less fire)
        "C1": (False, 0.4, 45.0, 20.5, 60.0, 95.0),
        "C4": (False, 0.5, 50.0, 20.0, 50.0, 90.0),
        # Route 2 - moderate fire
        "C2": (False, 0.6, 65.0, 18.5, 40.0, 85.0),
        "C6": (True, 0.8, 150.0, 16.0, 20.0, 75.0),
        # Route 3 - heavy fire (worst option)
        "R3": (False, 0.7, 80.0, 17.5, 30.0, 80.0),
        # Starting point
        "R1": (False, 0.2, 35.0, 20.5, 80.0, 100.0),
        # Exit areas
        "EXIT1": (False, 0.1, 25.0, 21.0, 100.0, 100.0),
        "EXIT2": (False, 0.3, 40.0, 20.0, 70.0, 100.0),
        "EXIT3": (False, 0.4, 45.0, 19.5, 60.0, 100.0),
    }
    overrides = {room: dict(zip(columns, values)) for room, values in rows.items()}

    system = SensorSystem(floor_plan)
    system.simulate_fire_scenario(["R2", "C5", "C7"], overrides)
    return system