"""
APOO Core Engine — Adaptive Platoon Offset Optimizer
====================================================
Core algorithms: Robertson's platoon dispersion (India-calibrated),
dynamic offset optimization, emission modeling, and simulation engine.

Author: APOO Project for MoRTH India

References:
- Robertson (1969): TRANSYT platoon dispersion model
- IRC:106-1990, IRC:SP:41: Indian PCU standards
- ARAI/CPCB BS-VI emission factors
- Kadiyali (2000), Mathew & Krishna Rao (2006): Indian β calibration
"""

import numpy as np
import pandas as pd
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple
import json

# ============================================================
# 1. INDIAN TRAFFIC CONSTANTS
# ============================================================

# Passenger-car-unit equivalents per vehicle class (IRC:106-1990 & IRC:SP:41).
PCU_INDIA = {
    "car": 1.0,
    "two_wheeler": 0.5,
    "auto_rickshaw": 0.6,
    "bus": 3.0,
    "truck": 3.0,
    "lcv": 1.4,
    "bicycle": 0.5,
    "cycle_rickshaw": 1.5,
}

# Typical Indian urban vehicle composition profiles.
# Values are stored as fractions (each profile sums to 1.0), keyed by
# "<city tier>_<peak|offpeak>".
VEHICLE_MIX_PROFILES = {
    "metro_peak": {
        "two_wheeler": 0.55,
        "car": 0.25,
        "auto_rickshaw": 0.08,
        "bus": 0.05,
        "truck": 0.02,
        "lcv": 0.03,
        "bicycle": 0.01,
        "cycle_rickshaw": 0.01,
    },
    "metro_offpeak": {
        "two_wheeler": 0.50,
        "car": 0.22,
        "auto_rickshaw": 0.10,
        "bus": 0.06,
        "truck": 0.04,
        "lcv": 0.04,
        "bicycle": 0.02,
        "cycle_rickshaw": 0.02,
    },
    "tier2_peak": {
        "two_wheeler": 0.60,
        "car": 0.15,
        "auto_rickshaw": 0.10,
        "bus": 0.05,
        "truck": 0.03,
        "lcv": 0.02,
        "bicycle": 0.03,
        "cycle_rickshaw": 0.02,
    },
    "tier2_offpeak": {
        "two_wheeler": 0.55,
        "car": 0.12,
        "auto_rickshaw": 0.12,
        "bus": 0.06,
        "truck": 0.04,
        "lcv": 0.03,
        "bicycle": 0.05,
        "cycle_rickshaw": 0.03,
    },
}

# Average speed (km/h) per vehicle class on Indian urban arterials,
# for three traffic regimes.
VEHICLE_SPEEDS_INDIA = {
    "two_wheeler": {"free_flow": 40, "congested": 20, "saturated": 10},
    "car": {"free_flow": 45, "congested": 18, "saturated": 8},
    "auto_rickshaw": {"free_flow": 35, "congested": 15, "saturated": 7},
    "bus": {"free_flow": 30, "congested": 12, "saturated": 5},
    "truck": {"free_flow": 25, "congested": 10, "saturated": 4},
    "lcv": {"free_flow": 35, "congested": 14, "saturated": 6},
    "bicycle": {"free_flow": 15, "congested": 10, "saturated": 8},
    "cycle_rickshaw": {"free_flow": 12, "congested": 8, "saturated": 5},
}

# Running emission factors (g/km) per vehicle class, ARAI/CPCB BS-VI.
EMISSION_FACTORS = {
    "two_wheeler": {"CO": 0.50, "HC": 0.10, "NOx": 0.06, "PM25": 0.005, "CO2": 55},
    "car": {"CO": 0.50, "HC": 0.05, "NOx": 0.06, "PM25": 0.003, "CO2": 120},
    "auto_rickshaw": {"CO": 0.30, "HC": 0.06, "NOx": 0.06, "PM25": 0.003, "CO2": 48},
    "bus": {"CO": 1.50, "HC": 0.20, "NOx": 0.40, "PM25": 0.010, "CO2": 550},
    "truck": {"CO": 1.80, "HC": 0.25, "NOx": 0.46, "PM25": 0.025, "CO2": 900},
    "lcv": {"CO": 0.50, "HC": 0.06, "NOx": 0.17, "PM25": 0.008, "CO2": 200},
    "bicycle": {"CO": 0, "HC": 0, "NOx": 0, "PM25": 0, "CO2": 0},
    "cycle_rickshaw": {"CO": 0, "HC": 0, "NOx": 0, "PM25": 0, "CO2": 0},
}

# Idle emission rates (g/min) per vehicle class.
# Note: this table carries no PM25 entry, unlike EMISSION_FACTORS.
IDLE_EMISSION_RATES = {
    "two_wheeler": {"CO": 0.10, "HC": 0.025, "NOx": 0.003, "CO2": 2.5},
    "car": {"CO": 0.15, "HC": 0.010, "NOx": 0.005, "CO2": 8.0},
    "auto_rickshaw": {"CO": 0.08, "HC": 0.015, "NOx": 0.004, "CO2": 3.0},
    "bus": {"CO": 0.50, "HC": 0.050, "NOx": 0.080, "CO2": 55.0},
    "truck": {"CO": 0.60, "HC": 0.060, "NOx": 0.090, "CO2": 60.0},
    "lcv": {"CO": 0.20, "HC": 0.015, "NOx": 0.010, "CO2": 12.0},
    "bicycle": {"CO": 0, "HC": 0, "NOx": 0, "CO2": 0},
    "cycle_rickshaw": {"CO": 0, "HC": 0, "NOx": 0, "CO2": 0},
}

# Multiplicative speed factors per weather / visibility condition.
WEATHER_SPEED_FACTORS = {
    "clear": 1.0,
    "light_rain": 0.85,
    "heavy_rain": 0.65,  # Monsoon conditions
    "fog": 0.70,
    "night": 0.90,
}

# ============================================================
# 2.
# DATA STRUCTURES
# ============================================================

@dataclass
class RoadLink:
    """Road segment connecting two consecutive signals.

    Attributes:
        link_id: Identifier of the link.
        length_m: Link length in meters.
        num_lanes: Number of lanes.
        speed_limit_kmh: Posted speed limit in km/h.
        gradient_pct: Gradient in percent; positive means uphill.
        side_friction: Side friction in the 0-1 range (vendors, parking);
            typically higher in India.
        saturation_flow_pcu_hr: Saturation flow in PCU/hr/lane
            (Indian default 1500).
    """

    link_id: str
    length_m: float
    num_lanes: int
    speed_limit_kmh: float
    gradient_pct: float = 0.0
    side_friction: float = 0.3
    saturation_flow_pcu_hr: float = 1500


@dataclass
class SignalPhase:
    """Timing configuration of one signal phase.

    Attributes:
        phase_id: Index of the phase within its intersection.
        green_time: Green duration in seconds.
        amber_time: Amber duration in seconds.
        all_red_time: All-red clearance in seconds.
        min_green: Lower bound for the green duration.
        max_green: Upper bound for the green duration.
        is_pedestrian: True when the phase is a pedestrian phase.
    """

    phase_id: int
    green_time: float
    amber_time: float = 3.0
    all_red_time: float = 2.0
    min_green: float = 10.0
    max_green: float = 60.0
    is_pedestrian: bool = False


@dataclass
class Intersection:
    """Signalized intersection of the corridor.

    Attributes:
        intersection_id: Identifier of the intersection.
        name: Human-readable name.
        cycle_length: Signal cycle length in seconds.
        phases: Phases of the signal plan (a fresh list per instance).
        current_offset: Offset from the master clock.
        queue_length_pcu: Standing queue in PCU.
        lat: Latitude.
        lon: Longitude.
    """

    intersection_id: str
    name: str
    cycle_length: float
    phases: List[SignalPhase] = field(default_factory=list)
    current_offset: float = 0.0
    queue_length_pcu: float = 0.0
    lat: float = 0.0
    lon: float = 0.0


@dataclass
class Platoon:
    """Group of vehicles released together from an upstream signal.

    Attributes:
        platoon_id: Identifier of the platoon.
        release_time: Release time in seconds from simulation start.
        size_vehicles: Number of vehicles.
        size_pcu: Size in passenger car units.
        vehicle_composition: Mapping of vehicle type to fraction.
        avg_speed_kmh: Mean speed in km/h.
        speed_std_kmh: Speed standard deviation in km/h.
        head_time: Arrival time of the first vehicle.
        tail_time: Arrival time of the last vehicle.
        centroid_time: Arrival time of the platoon centroid.
    """

    platoon_id: str
    release_time: float
    size_vehicles: int
    size_pcu: float
    vehicle_composition: Dict[str, float]
    avg_speed_kmh: float
    speed_std_kmh: float
    head_time: float = 0.0
    tail_time: float = 0.0
    centroid_time: float = 0.0


@dataclass
class SimulationResult:
    """Aggregated KPIs of one simulation run.

    ``method`` is "fixed" or "apoo"; ``cycle_details`` holds one dict per
    simulated platoon/link event (a fresh list per instance). The remaining
    fields are the totals and averages named by their identifiers.
    """

    method: str
    total_delay_s: float
    avg_delay_per_vehicle_s: float
    total_stops: int
    platoons_on_green: int
    total_platoons: int
    green_arrival_pct: float
    total_fuel_ml: float
    total_co2_g: float
    total_co_g: float
    total_nox_g: float
    total_pm25_g: float
    throughput_veh_hr: float
    avg_speed_kmh: float
    cycle_details: List[dict] = field(default_factory=list)


# ============================================================
# 3.
# ROBERTSON'S PLATOON DISPERSION MODEL (India-Calibrated)
# ============================================================

class RobertsonDispersion:
    """
    Robertson's (1969) platoon dispersion model.
    Calibrated for Indian heterogeneous traffic.

    Recurrence implemented by :meth:`disperse`:
        q'(t) = F * q'(t-1) + (1 - F) * q(t - t_bar)

    Where:
        alpha = 1 / (1 + beta * t_bar)   (t_bar expressed in time steps)
        F = 1 - alpha                    (so the weight 1 - F equals alpha)
        beta = dispersion factor (0.50-0.80 for Indian mixed traffic)
    """

    def __init__(self, beta: float = 0.60):
        """
        Args:
            beta: Dispersion factor.
                  0.25-0.35 for homogeneous (Western cities)
                  0.50-0.80 for Indian heterogeneous traffic
                  Default 0.60 per Kadiyali/Mathew calibration.
        """
        self.beta = beta

    def compute_params(self, t_bar: float, beta: Optional[float] = None) -> Tuple[float, float]:
        """Compute alpha and F from a dispersion factor and mean travel time.

        Args:
            t_bar: Mean travel time, in the same unit as the recurrence step.
            beta: Dispersion factor to use; defaults to ``self.beta``.

        Returns:
            (alpha, F) with alpha = 1 / (1 + beta * t_bar) and F = 1 - alpha.
        """
        if beta is None:
            beta = self.beta
        alpha = 1.0 / (1.0 + beta * t_bar)
        F = 1.0 - alpha
        return alpha, F

    def disperse(
        self,
        departure_profile: np.ndarray,
        link_length_m: float,
        speed_kmh: float,
        dt: float = 1.0,
        vehicle_mix: Optional[Dict[str, float]] = None,
        weather: str = "clear",
        side_friction: float = 0.3,
    ) -> Tuple[np.ndarray, float]:
        """
        Propagate platoon through a link using Robertson's model.

        Args:
            departure_profile: Flow (veh/s) at upstream signal per time step
            link_length_m: Distance between signals (meters)
            speed_kmh: Base free-flow speed
            dt: Time step in seconds (must be positive)
            vehicle_mix: Vehicle composition dict (affects beta)
            weather: Weather condition
            side_friction: Side friction factor (0-1)

        Returns:
            (arrival_profile, effective_travel_time) where the arrival
            profile has ``len(departure_profile) + shift + int(30 / dt)``
            samples and the travel time is in seconds.

        Raises:
            ValueError: If ``dt`` is not positive.
        """
        if dt <= 0:
            raise ValueError("dt must be positive")

        # Adjust speed for conditions (floored at 5 km/h, so never zero)
        effective_speed = self._adjust_speed(speed_kmh, vehicle_mix, weather, side_friction)

        # Mean travel time
        t_bar = link_length_m / (effective_speed / 3.6)  # seconds

        # Adjust beta for vehicle composition (more 2W → higher dispersion)
        effective_beta = self._adjust_beta(vehicle_mix)

        # The smoothing factor is defined on travel time measured in time
        # steps, so convert before computing it (identical for dt == 1).
        _alpha, F = self.compute_params(t_bar / dt, effective_beta)

        # Robertson recurrence
        shift = int(round(t_bar / dt))
        T = len(departure_profile)
        total_len = T + shift + int(30 / dt)  # extra buffer for tail
        arrival = np.zeros(total_len)

        # Start at t = 0 with a zero "previous" value so that the first
        # departure sample is not lost when the shift rounds to 0.
        previous = 0.0
        for t in range(total_len):
            upstream_idx = t - shift
            upstream_flow = departure_profile[upstream_idx] if 0 <= upstream_idx < T else 0.0
            previous = F * previous + (1 - F) * upstream_flow
            arrival[t] = previous

        return arrival, t_bar

    def _adjust_speed(
        self,
        base_speed: float,
        vehicle_mix: Optional[Dict[str, float]],
        weather: str,
        side_friction: float,
    ) -> float:
        """Adjust speed for Indian conditions.

        Applies, in order, the weather factor (1.0 for unknown weather),
        a side-friction reduction of ``0.3 * side_friction`` and, when a
        mix is given, a ``0.15 * heavy_fraction`` reduction. The result is
        never below 5 km/h.
        """
        speed = base_speed

        # Weather factor
        speed *= WEATHER_SPEED_FACTORS.get(weather, 1.0)

        # Side friction (higher friction → lower speed)
        speed *= (1.0 - 0.3 * side_friction)

        # Vehicle mix effect (heavy vehicles slow down flow)
        if vehicle_mix:
            heavy_types = ("bus", "truck", "lcv", "cycle_rickshaw")
            heavy_fraction = sum(vehicle_mix.get(v, 0) for v in heavy_types)
            speed *= (1.0 - 0.15 * heavy_fraction)

        return max(speed, 5.0)  # Minimum 5 km/h

    def _adjust_beta(self, vehicle_mix: Optional[Dict[str, float]]) -> float:
        """Adjust dispersion factor based on vehicle composition.

        Two-wheelers increase dispersion (they filter through traffic).
        Homogeneous traffic (all cars) → lower beta.

        Returns ``self.beta`` unchanged when no mix is given; otherwise the
        scaled value, capped at 0.90.
        """
        if vehicle_mix is None:
            return self.beta

        two_wheeler_frac = vehicle_mix.get("two_wheeler", 0)
        auto_frac = vehicle_mix.get("auto_rickshaw", 0)

        # More 2W/autos → higher dispersion
        mix_factor = 1.0 + 0.3 * two_wheeler_frac + 0.2 * auto_frac
        return min(self.beta * mix_factor, 0.90)


# ============================================================
# 4. EMISSION CALCULATOR
# ============================================================

class EmissionCalculator:
    """Calculate emissions from traffic operations."""

    @staticmethod
    def running_emissions(distance_km: float, vehicle_counts: Dict[str, int]) -> Dict[str, float]:
        """Emissions from vehicles traveling a distance.

        Args:
            distance_km: Distance each counted vehicle travels.
            vehicle_counts: Number of vehicles per vehicle type. Types
                missing from EMISSION_FACTORS use the "car" factors.

        Returns:
            Grams of CO, HC, NOx, PM25 and CO2.
        """
        totals = {"CO": 0, "HC": 0, "NOx": 0, "PM25": 0, "CO2": 0}
        for vtype, count in vehicle_counts.items():
            factors = EMISSION_FACTORS.get(vtype, EMISSION_FACTORS["car"])
            for pollutant in totals:
                totals[pollutant] += count * factors[pollutant] * distance_km
        return totals

    @staticmethod
    def idle_emissions(idle_time_s: float, vehicle_counts: Dict[str, int]) -> Dict[str, float]:
        """Emissions from vehicles idling at a red signal.

        Args:
            idle_time_s: Idle time, in seconds, applied to every counted
                vehicle.
            vehicle_counts: Number of vehicles per vehicle type. Types
                missing from IDLE_EMISSION_RATES use the "car" rates.

        Returns:
            Grams of CO, HC, NOx and CO2 (the idle table has no PM25).
        """
        totals = {"CO": 0, "HC": 0, "NOx": 0, "CO2": 0}
        idle_min = idle_time_s / 60.0  # rates are tabulated in g/min
        for vtype, count in vehicle_counts.items():
            rates = IDLE_EMISSION_RATES.get(vtype, IDLE_EMISSION_RATES.get("car", {}))
            for pollutant in totals:
                if pollutant in rates:
                    totals[pollutant] += count * rates[pollutant] * idle_min
        return totals

    @staticmethod
    def fuel_consumption_ml(idle_time_s: float, distance_km: float,
                            vehicle_counts: Dict[str, int]) -> float:
        """Estimate fuel consumption (mL) using CO2 as proxy.

        Gasoline: ~2.31 kg CO2 per liter. The same factor is applied to
        every vehicle type.
        """
        running = EmissionCalculator.running_emissions(distance_km, vehicle_counts)
        idling = EmissionCalculator.idle_emissions(idle_time_s, vehicle_counts)
        total_co2_g = running["CO2"] + idling.get("CO2", 0)
        fuel_liters = total_co2_g / 2310  # g CO2 → liters gasoline
        return fuel_liters * 1000  # mL


# ============================================================
# 5.
# OFFSET OPTIMIZER
# ============================================================

class OffsetOptimizer:
    """Dynamic platoon-based offset adjustment algorithm."""

    def __init__(self, safety_buffer_s: float = 10.0):
        """
        Args:
            safety_buffer_s: Safety buffer in seconds (higher for Indian conditions).
                            Recommended: 10-20s for India.
        """
        self.safety_buffer = safety_buffer_s

    def calculate_ideal_offset(
        self,
        t_arrive_head: float,
        t_arrive_tail: float,
        cycle_length: float,
        current_green_start: float,
        green_duration: float,
        min_green: float = 10.0,
        prediction_uncertainty: float = 5.0,
    ) -> Tuple[float, float, bool]:
        """
        Calculate the ideal offset to maximize platoon-green overlap.

        The arrival window is widened by the safety buffer before the head
        and by the prediction uncertainty after the tail; 100 evenly spaced
        trial offsets over the cycle are then scored by their overlap with
        that window, and the first best one is kept.

        Args:
            t_arrive_head: Predicted arrival time of platoon head (s from cycle start)
            t_arrive_tail: Predicted arrival of platoon tail
            cycle_length: Signal cycle length (s)
            current_green_start: Current green start time within cycle;
                returned unchanged when no trial offset overlaps the window
            green_duration: Green phase duration
            min_green: Minimum green for cross traffic
            prediction_uncertainty: Std dev of prediction (s)

        Returns:
            (optimal_offset, overlap_fraction, is_feasible)
        """
        # Arrival window (within cycle)
        arrive_head_mod = (t_arrive_head - self.safety_buffer) % cycle_length
        arrive_tail_mod = (t_arrive_tail + prediction_uncertainty) % cycle_length
        platoon_window = (
            t_arrive_tail - t_arrive_head + self.safety_buffer + prediction_uncertainty
        )

        # Try different offsets and find best overlap
        best_offset = current_green_start
        best_overlap = 0.0

        for trial_offset in np.linspace(0, cycle_length, 100):
            green_start = trial_offset % cycle_length
            green_end = (green_start + green_duration) % cycle_length

            # Calculate overlap between green window and arrival window
            overlap = self._calculate_overlap(
                arrive_head_mod, arrive_tail_mod, green_start, green_end, cycle_length
            )

            # Strict comparison: ties keep the earliest trial offset
            if overlap > best_overlap:
                best_overlap = overlap
                best_offset = trial_offset

        # Check feasibility (respect min cross-traffic green)
        remaining_for_cross = cycle_length - green_duration
        is_feasible = remaining_for_cross >= min_green

        overlap_fraction = best_overlap / max(platoon_window, 1.0)

        return best_offset, overlap_fraction, is_feasible

    def _calculate_overlap(
        self, a_start: float, a_end: float, g_start: float, g_end: float, cycle: float
    ) -> float:
        """Calculate temporal overlap between arrival window and green phase.

        Both windows live on a cycle: an end that is smaller than its start
        means the window wraps past the cycle boundary. The green window is
        compared at shifts of -cycle, 0 and +cycle so that an overlap is
        found even when only one of the two windows wraps.
        """
        # Unwrap each window onto a straight time axis
        if a_end < a_start:
            a_end += cycle
        if g_end < g_start:
            g_end += cycle

        total = 0.0
        for shift in (-cycle, 0.0, cycle):
            overlap_start = max(a_start, g_start + shift)
            overlap_end = min(a_end, g_end + shift)
            if overlap_end > overlap_start:
                total += overlap_end - overlap_start
        return total

    def constrain_offset(
        self,
        ideal_offset: float,
        cycle_length: float,
        max_shift: Optional[float] = None,
        current_offset: Optional[float] = None,
    ) -> float:
        """Apply constraints to keep offset within bounds.

        Args:
            ideal_offset: Desired offset (any real value).
            cycle_length: Signal cycle length (s).
            max_shift: Largest allowed change from ``current_offset``;
                defaults to 30% of the cycle.
            current_offset: Offset currently in effect. When omitted the
                shift limit cannot be evaluated and the ideal offset is
                only wrapped into ``[0, cycle_length)``.

        Returns:
            The constrained offset in ``[0, cycle_length)``.
        """
        if max_shift is None:
            max_shift = cycle_length * 0.3  # Max 30% cycle shift

        # Clamp to valid range
        offset = ideal_offset % cycle_length
        if current_offset is None:
            return offset

        # Signed shortest move around the cycle, limited to +/- max_shift
        current = current_offset % cycle_length
        half = cycle_length / 2.0
        delta = (offset - current + half) % cycle_length - half
        delta = max(-max_shift, min(max_shift, delta))
        return (current + delta) % cycle_length


# ============================================================
# 6. SYNTHETIC DATA GENERATOR (Indian Conditions)
# ============================================================

class IndianTrafficGenerator:
    """Generate synthetic traffic data calibrated for Indian conditions."""

    def __init__(self, seed: int = 42):
        """Create the generator with its own seeded ``RandomState``."""
        self.rng = np.random.RandomState(seed)

    def generate_corridor(
        self,
        n_intersections: int = 5,
        base_link_length: float = 300,
        city_type: str = "metro",
    ) -> Tuple[List[Intersection], List[RoadLink]]:
        """Generate a synthetic arterial corridor.

        Args:
            n_intersections: Number of signals; ``n_intersections - 1``
                links are created between them.
            base_link_length: Nominal link length (m) before random jitter.
            city_type: Accepted for API symmetry; the corridor geometry and
                timing built here do not depend on it.

        Returns:
            (intersections, links)
        """
        intersections = []
        links = []

        for i in range(n_intersections):
            phases = [
                SignalPhase(phase_id=0, green_time=40 + self.rng.randint(-5, 10)),
                SignalPhase(phase_id=1, green_time=25 + self.rng.randint(-5, 5)),
                SignalPhase(phase_id=2, green_time=15, is_pedestrian=True),
            ]
            # Cycle length is the sum of green + amber + all-red of all phases
            cycle = sum(p.green_time + p.amber_time + p.all_red_time for p in phases)

            intersections.append(Intersection(
                intersection_id=f"INT_{i}",
                name=f"Intersection {i+1}",
                cycle_length=cycle,
                phases=phases,
                current_offset=i * 20,  # Initial fixed offset
                lat=23.2599 + i * 0.003,  # Bhopal coordinates as example
                lon=77.4126 + i * 0.003,
            ))

        for i in range(n_intersections - 1):
            length = base_link_length + self.rng.uniform(-50, 100)
            links.append(RoadLink(
                link_id=f"LINK_{i}_{i+1}",
                length_m=length,
                num_lanes=2 + self.rng.choice([0, 1]),
                speed_limit_kmh=40 + self.rng.choice([-10, 0, 10]),
                gradient_pct=self.rng.uniform(-2, 2),
                side_friction=0.2 + self.rng.uniform(0, 0.3),
                saturation_flow_pcu_hr=1400 + self.rng.randint(-100, 200),
            ))

        return intersections, links

    def generate_demand_profile(
        self,
        duration_hours: float = 2.0,
        peak_flow_veh_hr: float = 2000,
        profile_type: str = "morning_peak",
        city_type: str = "metro",
    ) -> pd.DataFrame:
        """Generate time-varying demand with Indian characteristics.

        Args:
            duration_hours: Length of the profile; sampled every 5 minutes.
            peak_flow_veh_hr: Flow corresponding to a demand factor of 1.0.
            profile_type: "morning_peak", "evening_peak"; any other value
                produces the off-peak shape.
            city_type: Selects the vehicle mix profile; unknown keys fall
                back to "metro_peak".

        Returns:
            DataFrame with ``time_min``, ``flow_veh_hr``, ``flow_pcu_hr``
            and one ``pct_<vehicle type>`` column per vehicle type.
        """
        dt_min = 5  # 5-minute intervals
        n_steps = int(duration_hours * 60 / dt_min)
        times = np.arange(n_steps) * dt_min

        # Demand profile shape
        if profile_type == "morning_peak":
            # Ramp up, peak, slight decline
            third = n_steps // 3
            demand_factor = np.concatenate([
                np.linspace(0.3, 1.0, third),
                np.ones(third) * 1.0,
                np.linspace(1.0, 0.6, n_steps - 2 * third),
            ])
        elif profile_type == "evening_peak":
            quarter = n_steps // 4
            demand_factor = np.concatenate([
                np.linspace(0.5, 0.9, quarter),
                np.linspace(0.9, 1.0, quarter),
                np.ones(quarter) * 1.0,
                np.linspace(1.0, 0.4, n_steps - 3 * quarter),
            ])
        else:  # off_peak
            demand_factor = 0.4 + 0.1 * np.sin(2 * np.pi * times / (duration_hours * 60))

        # Add stochastic noise (Indian traffic is highly variable)
        noise = 1.0 + self.rng.normal(0, 0.15, n_steps)
        noise = np.clip(noise, 0.5, 1.5)

        flow = peak_flow_veh_hr * demand_factor * noise

        # Vehicle mix (varies with time)
        mix_key = f"{city_type}_peak" if profile_type != "off_peak" else f"{city_type}_offpeak"
        if mix_key not in VEHICLE_MIX_PROFILES:
            mix_key = "metro_peak"
        base_mix = VEHICLE_MIX_PROFILES[mix_key]

        records = []
        for i, t in enumerate(times):
            # Slight time variation in mix (more 2W in peak)
            mix = dict(base_mix)
            if demand_factor[i] > 0.8:
                mix["two_wheeler"] = min(mix["two_wheeler"] * 1.1, 0.70)
                mix["car"] = mix["car"] * 0.9
                # Normalize
                total = sum(mix.values())
                mix = {k: v / total for k, v in mix.items()}

            record = {
                "time_min": t,
                "flow_veh_hr": flow[i],
                "flow_pcu_hr": self._to_pcu_flow(flow[i], mix),
            }
            for vtype, frac in mix.items():
                record[f"pct_{vtype}"] = frac * 100
            records.append(record)

        return pd.DataFrame(records)

    def generate_training_data(
        self,
        n_samples: int = 5000,
        city_type: str = "metro",
    ) -> pd.DataFrame:
        """Generate training data for ML travel time prediction model.

        Each sample draws random link, mix, traffic and weather conditions,
        derives a travel time from :meth:`_compute_base_speed` and perturbs
        it with multiplicative noise (wider in peak hours).

        Args:
            n_samples: Number of rows to generate.
            city_type: Stored verbatim in the ``city_type`` column.

        Returns:
            DataFrame with one row per sample; targets are
            ``actual_travel_time_s`` and ``platoon_dispersion_s``.
        """
        records = []

        for _ in range(n_samples):
            # Random link characteristics
            link_length = self.rng.uniform(150, 600)
            speed_limit = self.rng.choice([30, 40, 50, 60])
            num_lanes = self.rng.choice([2, 3, 4])
            gradient = self.rng.uniform(-3, 3)
            side_friction = self.rng.uniform(0.1, 0.6)

            # Vehicle composition
            mix_type = self.rng.choice(list(VEHICLE_MIX_PROFILES.keys()))
            base_mix = dict(VEHICLE_MIX_PROFILES[mix_type])
            # Add noise to mix
            for k in base_mix:
                base_mix[k] *= (1 + self.rng.normal(0, 0.1))
            total = sum(base_mix.values())
            base_mix = {k: v / total for k, v in base_mix.items()}

            # Traffic conditions
            density = self.rng.uniform(10, 80)  # veh/km/lane
            weather = self.rng.choice(
                ["clear", "clear", "clear", "light_rain", "heavy_rain", "fog"]
            )
            time_of_day = self.rng.uniform(0, 24)  # hours
            is_peak = 1 if (7 <= time_of_day <= 10) or (17 <= time_of_day <= 20) else 0
            day_type = self.rng.choice(
                ["weekday", "weekday", "weekday", "weekday", "weekday", "weekend", "weekend"]
            )

            # Platoon characteristics
            platoon_size = self.rng.randint(5, 40)
            platoon_pcu = self._platoon_pcu(platoon_size, base_mix)

            # Compute actual travel time using physics + stochastic model
            base_speed = self._compute_base_speed(
                speed_limit, base_mix, density, weather, side_friction, gradient
            )
            base_tt = link_length / (base_speed / 3.6)  # seconds

            # Add realistic noise (higher in India)
            noise_factor = 1.0 + self.rng.normal(0, 0.15 + 0.1 * is_peak)
            noise_factor = max(noise_factor, 0.6)
            actual_tt = base_tt * noise_factor

            # Dispersion time (how much platoon spreads)
            two_w_pct = base_mix.get("two_wheeler", 0)
            dispersion = actual_tt * (0.1 + 0.3 * two_w_pct)  # 2W cause more dispersion

            records.append({
                "link_length_m": link_length,
                "speed_limit_kmh": speed_limit,
                "num_lanes": num_lanes,
                "gradient_pct": gradient,
                "side_friction": side_friction,
                "pct_two_wheeler": base_mix.get("two_wheeler", 0) * 100,
                "pct_car": base_mix.get("car", 0) * 100,
                "pct_auto": base_mix.get("auto_rickshaw", 0) * 100,
                "pct_bus": base_mix.get("bus", 0) * 100,
                "pct_truck": base_mix.get("truck", 0) * 100,
                "density_veh_km_lane": density,
                "weather_speed_factor": WEATHER_SPEED_FACTORS.get(weather, 1.0),
                "time_of_day_sin": np.sin(2 * np.pi * time_of_day / 24),
                "time_of_day_cos": np.cos(2 * np.pi * time_of_day / 24),
                "is_peak": is_peak,
                "is_weekend": 1 if day_type == "weekend" else 0,
                "platoon_size": platoon_size,
                "platoon_pcu": platoon_pcu,
                "upstream_queue_pcu": self.rng.uniform(0, 30),
                "downstream_queue_pcu": self.rng.uniform(0, 20),
                "actual_travel_time_s": actual_tt,
                "platoon_dispersion_s": dispersion,
                "weather": weather,
                "city_type": city_type,
                "mix_type": mix_type,
            })

        return pd.DataFrame(records)

    def _compute_base_speed(
        self, speed_limit, vehicle_mix, density, weather, side_friction, gradient
    ):
        """Compute effective speed (km/h) from conditions; never below 3."""
        # Start with limit
        speed = speed_limit

        # Greenshields-like density relationship
        jam_density = 150  # veh/km/lane (Indian conditions)
        if density < jam_density:
            speed *= (1 - (density / jam_density) ** 1.5)
        else:
            speed = 5.0  # gridlock

        # Weather
        speed *= WEATHER_SPEED_FACTORS.get(weather, 1.0)

        # Side friction
        speed *= (1 - 0.25 * side_friction)

        # Gradient (uphill slows, downhill speeds up slightly)
        speed *= (1 - 0.02 * gradient)

        # Heavy vehicle slowdown
        heavy_frac = sum(vehicle_mix.get(v, 0) for v in ["bus", "truck", "cycle_rickshaw"])
        speed *= (1 - 0.15 * heavy_frac)

        return max(speed, 3.0)

    def _to_pcu_flow(self, flow_veh_hr, vehicle_mix):
        """Convert vehicle flow to PCU flow (delegates to compute_pcu_flow)."""
        return compute_pcu_flow(flow_veh_hr, vehicle_mix)

    def _platoon_pcu(self, platoon_size, vehicle_mix):
        """Convert platoon vehicle count to PCU.

        Uses fractional per-type counts: truncating each type's count to an
        integer would silently drop vehicles from small platoons.
        """
        pcu_per_vehicle = sum(
            frac * PCU_INDIA.get(vtype, 1.0) for vtype, frac in vehicle_mix.items()
        )
        return platoon_size * pcu_per_vehicle


# ============================================================
# 7. CORRIDOR SIMULATION ENGINE
# ============================================================

class CorridorSimulator:
    """
    Simulates traffic flow through a corridor of signals.
    Compares fixed-time vs. APOO adaptive timing.
    """

    def __init__(
        self,
        intersections: List[Intersection],
        links: List[RoadLink],
        robertson: RobertsonDispersion = None,
        optimizer: OffsetOptimizer = None,
        emission_calc: EmissionCalculator = None,
    ):
        """
        Args:
            intersections: Signals in corridor order; link ``i`` runs from
                ``intersections[i]`` to ``intersections[i + 1]``.
            links: Road links in corridor order.
            robertson: Dispersion model; defaults to beta = 0.60.
            optimizer: Offset optimizer; defaults to a 12 s safety buffer.
            emission_calc: Emission calculator; defaults to a new instance.
        """
        self.intersections = intersections
        self.links = links
        self.robertson = robertson or RobertsonDispersion(beta=0.60)
        self.optimizer = optimizer or OffsetOptimizer(safety_buffer_s=12.0)
        self.emission_calc = emission_calc or EmissionCalculator()
        # Shared across simulate() calls, so consecutive runs draw
        # different random delays.
        self.rng = np.random.RandomState(42)

    def simulate(
        self,
        demand_profile: pd.DataFrame,
        vehicle_mix: Dict[str, float],
        weather: str = "clear",
        method: str = "fixed",  # "fixed" or "apoo"
        ml_model=None,
        ml_features_func=None,
    ) -> SimulationResult:
        """
        Run a full corridor simulation.

        Side effects: with ``method="apoo"`` every feasible optimization
        overwrites ``current_offset`` of the downstream intersection object,
        and the change persists after the call. Random delays consume
        ``self.rng``.

        Args:
            demand_profile: DataFrame with time_min and flow_veh_hr
            vehicle_mix: Vehicle composition
            weather: Weather condition
            method: "fixed" (baseline) or "apoo" (adaptive)
            ml_model: Trained ML model for APOO travel time prediction
            ml_features_func: Function to extract features for ML model

        Returns:
            SimulationResult
        """
        total_delay = 0
        total_stops = 0
        total_vehicles = 0
        platoons_on_green = 0
        total_platoons = 0
        idle_vehicle_s = 0.0  # vehicle-seconds spent waiting at red
        vehicle_km = 0.0      # vehicle-kilometres actually travelled
        cycle_details = []

        for _, row in demand_profile.iterrows():
            time_min = row["time_min"]
            flow = row["flow_veh_hr"]

            # Generate platoon for this time step.
            # NOTE(review): flow * 5 / 3600 is the vehicle count of 5 seconds
            # of flow; a 5-minute window would be flow * 5 / 60 — confirm the
            # intended platoon size before changing.
            platoon_size = max(1, int(flow * 5 / 3600))

            for link_idx, link in enumerate(self.links):
                upstream = self.intersections[link_idx]
                downstream = self.intersections[link_idx + 1]

                # Create departure profile (uniform discharge during green);
                # the slice is clipped at the end of the cycle array.
                green_time = upstream.phases[0].green_time
                discharge_rate = platoon_size / green_time  # veh/s
                departure = np.zeros(int(upstream.cycle_length))
                green_start = int(upstream.current_offset % upstream.cycle_length)
                departure[green_start:green_start + int(green_time)] = discharge_rate

                # Robertson dispersion (only the travel time is used below)
                _arrival_profile, travel_time = self.robertson.disperse(
                    departure, link.length_m, link.speed_limit_kmh,
                    vehicle_mix=vehicle_mix, weather=weather,
                    side_friction=link.side_friction,
                )

                # Determine if platoon arrives on green
                platoon_centroid = travel_time + green_start

                if method == "apoo":
                    # Predict travel time (use ML model if available, else Robertson)
                    if ml_model is not None and ml_features_func is not None:
                        features = ml_features_func(link, vehicle_mix, weather, time_min, flow)
                        predicted_tt = ml_model.predict(features.reshape(1, -1))[0]
                        uncertainty = abs(predicted_tt - travel_time) * 0.5 + 3.0
                    else:
                        predicted_tt = travel_time
                        uncertainty = travel_time * 0.12  # 12% uncertainty

                    # Optimize offset
                    t_arrive_head = green_start + predicted_tt - uncertainty
                    t_arrive_tail = green_start + predicted_tt + uncertainty + 5

                    optimal_offset, overlap_frac, feasible = self.optimizer.calculate_ideal_offset(
                        t_arrive_head, t_arrive_tail,
                        downstream.cycle_length,
                        downstream.current_offset,
                        downstream.phases[0].green_time,
                        prediction_uncertainty=uncertainty,
                    )

                    if feasible:
                        downstream.current_offset = optimal_offset

                # Check green arrival on the cycle, so that a green window
                # wrapping past the end of the cycle is still recognised.
                ds_cycle = downstream.cycle_length
                ds_green_time = downstream.phases[0].green_time
                since_green_start = (platoon_centroid - downstream.current_offset) % ds_cycle
                on_green = bool(since_green_start <= ds_green_time)

                total_platoons += 1
                if on_green:
                    platoons_on_green += 1
                    delay = self.rng.uniform(2, 8)  # Minor delay even on green
                else:
                    # Calculate delay (wait for next green)
                    delay = ds_cycle - since_green_start
                    delay += self.rng.uniform(0, 5)  # queue discharge delay
                    total_stops += platoon_size
                    idle_vehicle_s += delay * platoon_size  # Only count red-signal idle

                total_delay += delay * platoon_size
                total_vehicles += platoon_size
                vehicle_km += platoon_size * link.length_m / 1000

                cycle_details.append({
                    "time_min": time_min,
                    "link_idx": link_idx,
                    "travel_time_s": travel_time,
                    "delay_s": delay,
                    "on_green": on_green,
                    "platoon_size": platoon_size,
                    "offset": downstream.current_offset,
                    "method": method,
                })

        # Calculate emissions.
        # Vehicles are counted once per link crossed, so each counted vehicle
        # is charged its average link distance and average idle time rather
        # than the whole corridor length and the idle time of every platoon.
        vehicle_counts = {vtype: max(1, int(total_vehicles * frac))
                          for vtype, frac in vehicle_mix.items()}
        if total_vehicles > 0:
            avg_distance_km = vehicle_km / total_vehicles
            avg_idle_s = idle_vehicle_s / total_vehicles
        else:
            avg_distance_km = 0.0
            avg_idle_s = 0.0

        running = self.emission_calc.running_emissions(avg_distance_km, vehicle_counts)
        idling = self.emission_calc.idle_emissions(avg_idle_s, vehicle_counts)
        fuel = self.emission_calc.fuel_consumption_ml(avg_idle_s, avg_distance_km, vehicle_counts)

        avg_delay = total_delay / max(total_vehicles, 1)
        green_pct = (platoons_on_green / max(total_platoons, 1)) * 100

        # Average speed considering delay: the base travel time assumes a
        # fixed 30 km/h cruise over the whole corridor.
        total_distance = sum(l.length_m for l in self.links)
        avg_travel_time = total_distance / (30 / 3.6) + avg_delay  # base + delay
        avg_speed = (total_distance / 1000) / (avg_travel_time / 3600) if avg_travel_time > 0 else 0

        # Guard against a zero-length horizon (e.g. a single-row profile
        # whose only time stamp is 0).
        throughput = 0
        if len(demand_profile) > 0:
            duration_hr = demand_profile["time_min"].max() / 60
            if duration_hr > 0:
                throughput = total_vehicles / duration_hr

        return SimulationResult(
            method=method,
            total_delay_s=total_delay,
            avg_delay_per_vehicle_s=avg_delay,
            total_stops=total_stops,
            platoons_on_green=platoons_on_green,
            total_platoons=total_platoons,
            green_arrival_pct=green_pct,
            total_fuel_ml=fuel,
            total_co2_g=running["CO2"] + idling.get("CO2", 0),
            total_co_g=running["CO"] + idling.get("CO", 0),
            total_nox_g=running["NOx"] + idling.get("NOx", 0),
            total_pm25_g=running.get("PM25", 0),
            throughput_veh_hr=throughput,
            avg_speed_kmh=avg_speed,
            cycle_details=cycle_details,
        )


# ============================================================
# 8. UTILITY FUNCTIONS
# ============================================================

def compute_pcu_flow(flow_veh_hr: float, vehicle_mix: Dict[str, float]) -> float:
    """Convert vehicle flow to PCU-equivalent flow.

    Vehicle types missing from PCU_INDIA count as 1.0 PCU.
    """
    pcu_factor = sum(frac * PCU_INDIA.get(vtype, 1.0) for vtype, frac in vehicle_mix.items())
    return flow_veh_hr * pcu_factor


def _pct_reduction(before: float, after: float, floor: float) -> str:
    """Format the relative reduction from *before* to *after* as 'x.x%'."""
    return f"{((before - after) / max(before, floor)) * 100:.1f}%"


def format_kpi_comparison(fixed: SimulationResult, apoo: SimulationResult) -> pd.DataFrame:
    """Create a comparison table of KPIs.

    "Lower is better" KPIs report the relative reduction achieved by APOO;
    "higher is better" KPIs report the signed absolute change, so a
    regression is shown as e.g. "-3.0pp" rather than "+-3.0pp".

    Returns:
        DataFrame with columns KPI, Fixed-Time, APOO Adaptive, Improvement.
    """
    metrics = [
        ("Avg Delay per Vehicle (s)",
         f"{fixed.avg_delay_per_vehicle_s:.1f}",
         f"{apoo.avg_delay_per_vehicle_s:.1f}",
         _pct_reduction(fixed.avg_delay_per_vehicle_s, apoo.avg_delay_per_vehicle_s, 0.01)),
        ("Platoons Arriving on Green (%)",
         f"{fixed.green_arrival_pct:.1f}",
         f"{apoo.green_arrival_pct:.1f}",
         f"{apoo.green_arrival_pct - fixed.green_arrival_pct:+.1f}pp"),
        ("Total Stops",
         f"{fixed.total_stops:,}",
         f"{apoo.total_stops:,}",
         _pct_reduction(fixed.total_stops, apoo.total_stops, 1)),
        ("Total CO₂ (g)",
         f"{fixed.total_co2_g:.0f}",
         f"{apoo.total_co2_g:.0f}",
         _pct_reduction(fixed.total_co2_g, apoo.total_co2_g, 0.01)),
        ("Total CO (g)",
         f"{fixed.total_co_g:.1f}",
         f"{apoo.total_co_g:.1f}",
         _pct_reduction(fixed.total_co_g, apoo.total_co_g, 0.01)),
        ("Total NOx (g)",
         f"{fixed.total_nox_g:.1f}",
         f"{apoo.total_nox_g:.1f}",
         _pct_reduction(fixed.total_nox_g, apoo.total_nox_g, 0.01)),
        ("Fuel Consumption (mL)",
         f"{fixed.total_fuel_ml:.0f}",
         f"{apoo.total_fuel_ml:.0f}",
         _pct_reduction(fixed.total_fuel_ml, apoo.total_fuel_ml, 0.01)),
        ("Avg Speed (km/h)",
         f"{fixed.avg_speed_kmh:.1f}",
         f"{apoo.avg_speed_kmh:.1f}",
         f"{apoo.avg_speed_kmh - fixed.avg_speed_kmh:+.1f}"),
        ("Throughput (veh/hr)",
         f"{fixed.throughput_veh_hr:.0f}",
         f"{apoo.throughput_veh_hr:.0f}",
         f"{apoo.throughput_veh_hr - fixed.throughput_veh_hr:+.0f}"),
    ]
    return pd.DataFrame(metrics, columns=["KPI", "Fixed-Time", "APOO Adaptive", "Improvement"])


if __name__ == "__main__":
    # Quick test
    gen = IndianTrafficGenerator(seed=42)
    intersections, links = gen.generate_corridor(n_intersections=5)
    demand = gen.generate_demand_profile()
    training_data = gen.generate_training_data(n_samples=100)
    print(f"Generated corridor: {len(intersections)} intersections, {len(links)} links")
    print(f"Demand profile: {len(demand)} time steps")
    print(f"Training data: {len(training_data)} samples")
    print(f"Training columns: {list(training_data.columns)}")