| """ |
| APOO Core Engine — Adaptive Platoon Offset Optimizer |
| ==================================================== |
| Core algorithms: Robertson's platoon dispersion (India-calibrated), |
| dynamic offset optimization, emission modeling, and simulation engine. |
| |
| Author: APOO Project for MoRTH India |
| References: |
| - Robertson (1969): TRANSYT platoon dispersion model |
| - IRC:106-1990, IRC:SP:41: Indian PCU standards |
| - ARAI/CPCB BS-VI emission factors |
| - Kadiyali (2000), Mathew & Krishna Rao (2006): Indian β calibration |
| """ |
|
|
| import numpy as np |
| import pandas as pd |
| from dataclasses import dataclass, field |
| from typing import Dict, List, Optional, Tuple |
| import json |
|
|
| |
| |
| |
|
|
| |
| PCU_INDIA = { |
| "car": 1.0, |
| "two_wheeler": 0.5, |
| "auto_rickshaw": 0.6, |
| "bus": 3.0, |
| "truck": 3.0, |
| "lcv": 1.4, |
| "bicycle": 0.5, |
| "cycle_rickshaw": 1.5, |
| } |
|
|
| |
| VEHICLE_MIX_PROFILES = { |
| "metro_peak": {"two_wheeler": 0.55, "car": 0.25, "auto_rickshaw": 0.08, "bus": 0.05, "truck": 0.02, "lcv": 0.03, "bicycle": 0.01, "cycle_rickshaw": 0.01}, |
| "metro_offpeak": {"two_wheeler": 0.50, "car": 0.22, "auto_rickshaw": 0.10, "bus": 0.06, "truck": 0.04, "lcv": 0.04, "bicycle": 0.02, "cycle_rickshaw": 0.02}, |
| "tier2_peak": {"two_wheeler": 0.60, "car": 0.15, "auto_rickshaw": 0.10, "bus": 0.05, "truck": 0.03, "lcv": 0.02, "bicycle": 0.03, "cycle_rickshaw": 0.02}, |
| "tier2_offpeak": {"two_wheeler": 0.55, "car": 0.12, "auto_rickshaw": 0.12, "bus": 0.06, "truck": 0.04, "lcv": 0.03, "bicycle": 0.05, "cycle_rickshaw": 0.03}, |
| } |
|
|
| |
| VEHICLE_SPEEDS_INDIA = { |
| "two_wheeler": {"free_flow": 40, "congested": 20, "saturated": 10}, |
| "car": {"free_flow": 45, "congested": 18, "saturated": 8}, |
| "auto_rickshaw": {"free_flow": 35, "congested": 15, "saturated": 7}, |
| "bus": {"free_flow": 30, "congested": 12, "saturated": 5}, |
| "truck": {"free_flow": 25, "congested": 10, "saturated": 4}, |
| "lcv": {"free_flow": 35, "congested": 14, "saturated": 6}, |
| "bicycle": {"free_flow": 15, "congested": 10, "saturated": 8}, |
| "cycle_rickshaw": {"free_flow": 12, "congested": 8, "saturated": 5}, |
| } |
|
|
| |
| EMISSION_FACTORS = { |
| "two_wheeler": {"CO": 0.50, "HC": 0.10, "NOx": 0.06, "PM25": 0.005, "CO2": 55}, |
| "car": {"CO": 0.50, "HC": 0.05, "NOx": 0.06, "PM25": 0.003, "CO2": 120}, |
| "auto_rickshaw": {"CO": 0.30, "HC": 0.06, "NOx": 0.06, "PM25": 0.003, "CO2": 48}, |
| "bus": {"CO": 1.50, "HC": 0.20, "NOx": 0.40, "PM25": 0.010, "CO2": 550}, |
| "truck": {"CO": 1.80, "HC": 0.25, "NOx": 0.46, "PM25": 0.025, "CO2": 900}, |
| "lcv": {"CO": 0.50, "HC": 0.06, "NOx": 0.17, "PM25": 0.008, "CO2": 200}, |
| "bicycle": {"CO": 0, "HC": 0, "NOx": 0, "PM25": 0, "CO2": 0}, |
| "cycle_rickshaw": {"CO": 0, "HC": 0, "NOx": 0, "PM25": 0, "CO2": 0}, |
| } |
|
|
| |
| IDLE_EMISSION_RATES = { |
| "two_wheeler": {"CO": 0.10, "HC": 0.025, "NOx": 0.003, "CO2": 2.5}, |
| "car": {"CO": 0.15, "HC": 0.010, "NOx": 0.005, "CO2": 8.0}, |
| "auto_rickshaw": {"CO": 0.08, "HC": 0.015, "NOx": 0.004, "CO2": 3.0}, |
| "bus": {"CO": 0.50, "HC": 0.050, "NOx": 0.080, "CO2": 55.0}, |
| "truck": {"CO": 0.60, "HC": 0.060, "NOx": 0.090, "CO2": 60.0}, |
| "lcv": {"CO": 0.20, "HC": 0.015, "NOx": 0.010, "CO2": 12.0}, |
| "bicycle": {"CO": 0, "HC": 0, "NOx": 0, "CO2": 0}, |
| "cycle_rickshaw": {"CO": 0, "HC": 0, "NOx": 0, "CO2": 0}, |
| } |
|
|
| |
| WEATHER_SPEED_FACTORS = { |
| "clear": 1.0, |
| "light_rain": 0.85, |
| "heavy_rain": 0.65, |
| "fog": 0.70, |
| "night": 0.90, |
| } |
|
|
|
|
| |
| |
| |
|
|
| @dataclass |
| class RoadLink: |
| """Represents a road link between two signals.""" |
| link_id: str |
| length_m: float |
| num_lanes: int |
| speed_limit_kmh: float |
| gradient_pct: float = 0.0 |
| side_friction: float = 0.3 |
| saturation_flow_pcu_hr: float = 1500 |
|
|
|
|
| @dataclass |
| class SignalPhase: |
| """Signal phase configuration.""" |
| phase_id: int |
| green_time: float |
| amber_time: float = 3.0 |
| all_red_time: float = 2.0 |
| min_green: float = 10.0 |
| max_green: float = 60.0 |
| is_pedestrian: bool = False |
|
|
|
|
| @dataclass |
| class Intersection: |
| """Represents a signalized intersection.""" |
| intersection_id: str |
| name: str |
| cycle_length: float |
| phases: List[SignalPhase] = field(default_factory=list) |
| current_offset: float = 0.0 |
| queue_length_pcu: float = 0.0 |
| lat: float = 0.0 |
| lon: float = 0.0 |
|
|
|
|
| @dataclass |
| class Platoon: |
| """Represents a vehicle platoon released from upstream signal.""" |
| platoon_id: str |
| release_time: float |
| size_vehicles: int |
| size_pcu: float |
| vehicle_composition: Dict[str, float] |
| avg_speed_kmh: float |
| speed_std_kmh: float |
| head_time: float = 0.0 |
| tail_time: float = 0.0 |
| centroid_time: float = 0.0 |
|
|
|
|
| @dataclass |
| class SimulationResult: |
| """Results from a simulation run.""" |
| method: str |
| total_delay_s: float |
| avg_delay_per_vehicle_s: float |
| total_stops: int |
| platoons_on_green: int |
| total_platoons: int |
| green_arrival_pct: float |
| total_fuel_ml: float |
| total_co2_g: float |
| total_co_g: float |
| total_nox_g: float |
| total_pm25_g: float |
| throughput_veh_hr: float |
| avg_speed_kmh: float |
| cycle_details: List[dict] = field(default_factory=list) |
|
|
|
|
| |
| |
| |
|
|
| class RobertsonDispersion: |
| """ |
| Robertson's (1969) platoon dispersion model. |
| Calibrated for Indian heterogeneous traffic. |
| |
| Core equation: q'(t) = F * q'(t-1) + (1-F) * alpha * q(t - t_bar) |
| |
| Where: |
| alpha = 1 / (1 + beta * t_bar) |
| F = 1 - alpha |
| beta = dispersion factor (0.50-0.80 for Indian mixed traffic) |
| """ |
| |
| def __init__(self, beta: float = 0.60): |
| """ |
| Args: |
| beta: Dispersion factor. |
| 0.25-0.35 for homogeneous (Western cities) |
| 0.50-0.80 for Indian heterogeneous traffic |
| Default 0.60 per Kadiyali/Mathew calibration. |
| """ |
| self.beta = beta |
| |
| def compute_params(self, t_bar: float) -> Tuple[float, float]: |
| """Compute alpha and F from beta and mean travel time.""" |
| alpha = 1.0 / (1.0 + self.beta * t_bar) |
| F = 1.0 - alpha |
| return alpha, F |
| |
| def disperse( |
| self, |
| departure_profile: np.ndarray, |
| link_length_m: float, |
| speed_kmh: float, |
| dt: float = 1.0, |
| vehicle_mix: Optional[Dict[str, float]] = None, |
| weather: str = "clear", |
| side_friction: float = 0.3, |
| ) -> Tuple[np.ndarray, float]: |
| """ |
| Propagate platoon through a link using Robertson's model. |
| |
| Args: |
| departure_profile: Flow (veh/s) at upstream signal per time step |
| link_length_m: Distance between signals (meters) |
| speed_kmh: Base free-flow speed |
| dt: Time step in seconds |
| vehicle_mix: Vehicle composition dict (affects beta) |
| weather: Weather condition |
| side_friction: Side friction factor (0-1) |
| |
| Returns: |
| (arrival_profile, effective_travel_time) |
| """ |
| |
| effective_speed = self._adjust_speed(speed_kmh, vehicle_mix, weather, side_friction) |
| |
| |
| t_bar = link_length_m / (effective_speed / 3.6) |
| |
| |
| effective_beta = self._adjust_beta(vehicle_mix) |
| |
| alpha = 1.0 / (1.0 + effective_beta * t_bar) |
| F = 1.0 - alpha |
| |
| |
| shift = int(round(t_bar / dt)) |
| T = len(departure_profile) |
| total_len = T + shift + int(30 / dt) |
| |
| arrival = np.zeros(total_len) |
| for t in range(1, total_len): |
| upstream_idx = t - shift |
| upstream_flow = departure_profile[upstream_idx] if 0 <= upstream_idx < T else 0.0 |
| arrival[t] = F * arrival[t - 1] + (1 - F) * upstream_flow |
| |
| return arrival, t_bar |
| |
| def _adjust_speed( |
| self, |
| base_speed: float, |
| vehicle_mix: Optional[Dict[str, float]], |
| weather: str, |
| side_friction: float, |
| ) -> float: |
| """Adjust speed for Indian conditions.""" |
| speed = base_speed |
| |
| |
| speed *= WEATHER_SPEED_FACTORS.get(weather, 1.0) |
| |
| |
| speed *= (1.0 - 0.3 * side_friction) |
| |
| |
| if vehicle_mix: |
| heavy_fraction = sum(vehicle_mix.get(v, 0) for v in ["bus", "truck", "lcv", "cycle_rickshaw"]) |
| speed *= (1.0 - 0.15 * heavy_fraction) |
| |
| return max(speed, 5.0) |
| |
| def _adjust_beta(self, vehicle_mix: Optional[Dict[str, float]]) -> float: |
| """Adjust dispersion factor based on vehicle composition. |
| |
| Two-wheelers increase dispersion (they filter through traffic). |
| Homogeneous traffic (all cars) → lower beta. |
| """ |
| if vehicle_mix is None: |
| return self.beta |
| |
| two_wheeler_frac = vehicle_mix.get("two_wheeler", 0) |
| auto_frac = vehicle_mix.get("auto_rickshaw", 0) |
| |
| |
| mix_factor = 1.0 + 0.3 * two_wheeler_frac + 0.2 * auto_frac |
| return min(self.beta * mix_factor, 0.90) |
|
|
|
|
| |
| |
| |
|
|
| class EmissionCalculator: |
| """Calculate emissions from traffic operations.""" |
| |
| @staticmethod |
| def running_emissions(distance_km: float, vehicle_counts: Dict[str, int]) -> Dict[str, float]: |
| """Emissions from vehicles traveling a distance.""" |
| totals = {"CO": 0, "HC": 0, "NOx": 0, "PM25": 0, "CO2": 0} |
| for vtype, count in vehicle_counts.items(): |
| factors = EMISSION_FACTORS.get(vtype, EMISSION_FACTORS["car"]) |
| for pollutant in totals: |
| totals[pollutant] += count * factors[pollutant] * distance_km |
| return totals |
| |
| @staticmethod |
| def idle_emissions(idle_time_s: float, vehicle_counts: Dict[str, int]) -> Dict[str, float]: |
| """Emissions from vehicles idling at a red signal.""" |
| totals = {"CO": 0, "HC": 0, "NOx": 0, "CO2": 0} |
| idle_min = idle_time_s / 60.0 |
| for vtype, count in vehicle_counts.items(): |
| rates = IDLE_EMISSION_RATES.get(vtype, IDLE_EMISSION_RATES.get("car", {})) |
| for pollutant in totals: |
| if pollutant in rates: |
| totals[pollutant] += count * rates[pollutant] * idle_min |
| return totals |
| |
| @staticmethod |
| def fuel_consumption_ml(idle_time_s: float, distance_km: float, |
| vehicle_counts: Dict[str, int]) -> float: |
| """Estimate fuel consumption (mL) using CO2 as proxy. |
| Gasoline: ~2.31 kg CO2 per liter. |
| """ |
| running = EmissionCalculator.running_emissions(distance_km, vehicle_counts) |
| idling = EmissionCalculator.idle_emissions(idle_time_s, vehicle_counts) |
| total_co2_g = running["CO2"] + idling.get("CO2", 0) |
| fuel_liters = total_co2_g / 2310 |
| return fuel_liters * 1000 |
|
|
|
|
| |
| |
| |
|
|
| class OffsetOptimizer: |
| """Dynamic platoon-based offset adjustment algorithm.""" |
| |
| def __init__(self, safety_buffer_s: float = 10.0): |
| """ |
| Args: |
| safety_buffer_s: Safety buffer in seconds (higher for Indian conditions). |
| Recommended: 10-20s for India. |
| """ |
| self.safety_buffer = safety_buffer_s |
| |
| def calculate_ideal_offset( |
| self, |
| t_arrive_head: float, |
| t_arrive_tail: float, |
| cycle_length: float, |
| current_green_start: float, |
| green_duration: float, |
| min_green: float = 10.0, |
| prediction_uncertainty: float = 5.0, |
| ) -> Tuple[float, float, bool]: |
| """ |
| Calculate the ideal offset to maximize platoon-green overlap. |
| |
| Args: |
| t_arrive_head: Predicted arrival time of platoon head (s from cycle start) |
| t_arrive_tail: Predicted arrival of platoon tail |
| cycle_length: Signal cycle length (s) |
| current_green_start: Current green start time within cycle |
| green_duration: Green phase duration |
| min_green: Minimum green for cross traffic |
| prediction_uncertainty: Std dev of prediction (s) |
| |
| Returns: |
| (optimal_offset, overlap_fraction, is_feasible) |
| """ |
| |
| arrive_head_mod = (t_arrive_head - self.safety_buffer) % cycle_length |
| arrive_tail_mod = (t_arrive_tail + prediction_uncertainty) % cycle_length |
| |
| platoon_window = t_arrive_tail - t_arrive_head + self.safety_buffer + prediction_uncertainty |
| |
| |
| best_offset = current_green_start |
| best_overlap = 0.0 |
| |
| for trial_offset in np.linspace(0, cycle_length, 100): |
| green_start = trial_offset % cycle_length |
| green_end = (green_start + green_duration) % cycle_length |
| |
| |
| overlap = self._calculate_overlap( |
| arrive_head_mod, arrive_tail_mod, |
| green_start, green_end, |
| cycle_length |
| ) |
| |
| if overlap > best_overlap: |
| best_overlap = overlap |
| best_offset = trial_offset |
| |
| |
| remaining_for_cross = cycle_length - green_duration |
| is_feasible = remaining_for_cross >= min_green |
| |
| overlap_fraction = best_overlap / max(platoon_window, 1.0) |
| |
| return best_offset, overlap_fraction, is_feasible |
| |
| def _calculate_overlap( |
| self, a_start: float, a_end: float, |
| g_start: float, g_end: float, |
| cycle: float |
| ) -> float: |
| """Calculate temporal overlap between arrival window and green phase.""" |
| |
| if a_end < a_start: |
| a_end += cycle |
| if g_end < g_start: |
| g_end += cycle |
| |
| overlap_start = max(a_start, g_start) |
| overlap_end = min(a_end, g_end) |
| |
| return max(0, overlap_end - overlap_start) |
| |
| def constrain_offset( |
| self, |
| ideal_offset: float, |
| cycle_length: float, |
| max_shift: float = None, |
| ) -> float: |
| """Apply constraints to keep offset within bounds.""" |
| if max_shift is None: |
| max_shift = cycle_length * 0.3 |
| |
| |
| offset = ideal_offset % cycle_length |
| return offset |
|
|
|
|
| |
| |
| |
|
|
| class IndianTrafficGenerator: |
| """Generate synthetic traffic data calibrated for Indian conditions.""" |
| |
| def __init__(self, seed: int = 42): |
| self.rng = np.random.RandomState(seed) |
| |
| def generate_corridor( |
| self, |
| n_intersections: int = 5, |
| base_link_length: float = 300, |
| city_type: str = "metro", |
| ) -> Tuple[List[Intersection], List[RoadLink]]: |
| """Generate a synthetic arterial corridor.""" |
| intersections = [] |
| links = [] |
| |
| base_cycle = 120 if city_type == "metro" else 90 |
| |
| for i in range(n_intersections): |
| phases = [ |
| SignalPhase(phase_id=0, green_time=40 + self.rng.randint(-5, 10)), |
| SignalPhase(phase_id=1, green_time=25 + self.rng.randint(-5, 5)), |
| SignalPhase(phase_id=2, green_time=15, is_pedestrian=True), |
| ] |
| cycle = sum(p.green_time + p.amber_time + p.all_red_time for p in phases) |
| |
| intersections.append(Intersection( |
| intersection_id=f"INT_{i}", |
| name=f"Intersection {i+1}", |
| cycle_length=cycle, |
| phases=phases, |
| current_offset=i * 20, |
| lat=23.2599 + i * 0.003, |
| lon=77.4126 + i * 0.003, |
| )) |
| |
| for i in range(n_intersections - 1): |
| length = base_link_length + self.rng.uniform(-50, 100) |
| links.append(RoadLink( |
| link_id=f"LINK_{i}_{i+1}", |
| length_m=length, |
| num_lanes=2 + self.rng.choice([0, 1]), |
| speed_limit_kmh=40 + self.rng.choice([-10, 0, 10]), |
| gradient_pct=self.rng.uniform(-2, 2), |
| side_friction=0.2 + self.rng.uniform(0, 0.3), |
| saturation_flow_pcu_hr=1400 + self.rng.randint(-100, 200), |
| )) |
| |
| return intersections, links |
| |
| def generate_demand_profile( |
| self, |
| duration_hours: float = 2.0, |
| peak_flow_veh_hr: float = 2000, |
| profile_type: str = "morning_peak", |
| city_type: str = "metro", |
| ) -> pd.DataFrame: |
| """Generate time-varying demand with Indian characteristics.""" |
| dt_min = 5 |
| n_steps = int(duration_hours * 60 / dt_min) |
| times = np.arange(n_steps) * dt_min |
| |
| |
| if profile_type == "morning_peak": |
| |
| demand_factor = np.concatenate([ |
| np.linspace(0.3, 1.0, n_steps // 3), |
| np.ones(n_steps // 3) * 1.0, |
| np.linspace(1.0, 0.6, n_steps - 2 * (n_steps // 3)), |
| ]) |
| elif profile_type == "evening_peak": |
| demand_factor = np.concatenate([ |
| np.linspace(0.5, 0.9, n_steps // 4), |
| np.linspace(0.9, 1.0, n_steps // 4), |
| np.ones(n_steps // 4) * 1.0, |
| np.linspace(1.0, 0.4, n_steps - 3 * (n_steps // 4)), |
| ]) |
| else: |
| demand_factor = 0.4 + 0.1 * np.sin(2 * np.pi * times / (duration_hours * 60)) |
| |
| |
| noise = 1.0 + self.rng.normal(0, 0.15, n_steps) |
| noise = np.clip(noise, 0.5, 1.5) |
| |
| flow = peak_flow_veh_hr * demand_factor * noise |
| |
| |
| mix_key = f"{city_type}_peak" if profile_type != "off_peak" else f"{city_type}_offpeak" |
| if mix_key not in VEHICLE_MIX_PROFILES: |
| mix_key = "metro_peak" |
| base_mix = VEHICLE_MIX_PROFILES[mix_key] |
| |
| records = [] |
| for i, t in enumerate(times): |
| |
| mix = dict(base_mix) |
| if demand_factor[i] > 0.8: |
| mix["two_wheeler"] = min(mix["two_wheeler"] * 1.1, 0.70) |
| mix["car"] = mix["car"] * 0.9 |
| |
| |
| total = sum(mix.values()) |
| mix = {k: v / total for k, v in mix.items()} |
| |
| record = { |
| "time_min": t, |
| "flow_veh_hr": flow[i], |
| "flow_pcu_hr": self._to_pcu_flow(flow[i], mix), |
| } |
| for vtype, frac in mix.items(): |
| record[f"pct_{vtype}"] = frac * 100 |
| |
| records.append(record) |
| |
| return pd.DataFrame(records) |
| |
| def generate_training_data( |
| self, |
| n_samples: int = 5000, |
| city_type: str = "metro", |
| ) -> pd.DataFrame: |
| """Generate training data for ML travel time prediction model.""" |
| records = [] |
| |
| for i in range(n_samples): |
| |
| link_length = self.rng.uniform(150, 600) |
| speed_limit = self.rng.choice([30, 40, 50, 60]) |
| num_lanes = self.rng.choice([2, 3, 4]) |
| gradient = self.rng.uniform(-3, 3) |
| side_friction = self.rng.uniform(0.1, 0.6) |
| |
| |
| mix_type = self.rng.choice(list(VEHICLE_MIX_PROFILES.keys())) |
| base_mix = dict(VEHICLE_MIX_PROFILES[mix_type]) |
| |
| for k in base_mix: |
| base_mix[k] *= (1 + self.rng.normal(0, 0.1)) |
| total = sum(base_mix.values()) |
| base_mix = {k: v / total for k, v in base_mix.items()} |
| |
| |
| density = self.rng.uniform(10, 80) |
| weather = self.rng.choice(["clear", "clear", "clear", "light_rain", "heavy_rain", "fog"]) |
| time_of_day = self.rng.uniform(0, 24) |
| is_peak = 1 if (7 <= time_of_day <= 10) or (17 <= time_of_day <= 20) else 0 |
| day_type = self.rng.choice(["weekday", "weekday", "weekday", "weekday", "weekday", "weekend", "weekend"]) |
| |
| |
| platoon_size = self.rng.randint(5, 40) |
| platoon_pcu = self._platoon_pcu(platoon_size, base_mix) |
| |
| |
| base_speed = self._compute_base_speed( |
| speed_limit, base_mix, density, weather, side_friction, gradient |
| ) |
| base_tt = link_length / (base_speed / 3.6) |
| |
| |
| noise_factor = 1.0 + self.rng.normal(0, 0.15 + 0.1 * is_peak) |
| noise_factor = max(noise_factor, 0.6) |
| actual_tt = base_tt * noise_factor |
| |
| |
| two_w_pct = base_mix.get("two_wheeler", 0) |
| dispersion = actual_tt * (0.1 + 0.3 * two_w_pct) |
| |
| records.append({ |
| "link_length_m": link_length, |
| "speed_limit_kmh": speed_limit, |
| "num_lanes": num_lanes, |
| "gradient_pct": gradient, |
| "side_friction": side_friction, |
| "pct_two_wheeler": base_mix.get("two_wheeler", 0) * 100, |
| "pct_car": base_mix.get("car", 0) * 100, |
| "pct_auto": base_mix.get("auto_rickshaw", 0) * 100, |
| "pct_bus": base_mix.get("bus", 0) * 100, |
| "pct_truck": base_mix.get("truck", 0) * 100, |
| "density_veh_km_lane": density, |
| "weather_speed_factor": WEATHER_SPEED_FACTORS.get(weather, 1.0), |
| "time_of_day_sin": np.sin(2 * np.pi * time_of_day / 24), |
| "time_of_day_cos": np.cos(2 * np.pi * time_of_day / 24), |
| "is_peak": is_peak, |
| "is_weekend": 1 if day_type == "weekend" else 0, |
| "platoon_size": platoon_size, |
| "platoon_pcu": platoon_pcu, |
| "upstream_queue_pcu": self.rng.uniform(0, 30), |
| "downstream_queue_pcu": self.rng.uniform(0, 20), |
| "actual_travel_time_s": actual_tt, |
| "platoon_dispersion_s": dispersion, |
| "weather": weather, |
| "city_type": city_type, |
| "mix_type": mix_type, |
| }) |
| |
| return pd.DataFrame(records) |
| |
| def _compute_base_speed( |
| self, speed_limit, vehicle_mix, density, weather, side_friction, gradient |
| ): |
| """Compute effective speed from conditions.""" |
| |
| speed = speed_limit |
| |
| |
| jam_density = 150 |
| if density < jam_density: |
| speed *= (1 - (density / jam_density) ** 1.5) |
| else: |
| speed = 5.0 |
| |
| |
| speed *= WEATHER_SPEED_FACTORS.get(weather, 1.0) |
| |
| |
| speed *= (1 - 0.25 * side_friction) |
| |
| |
| speed *= (1 - 0.02 * gradient) |
| |
| |
| heavy_frac = sum(vehicle_mix.get(v, 0) for v in ["bus", "truck", "cycle_rickshaw"]) |
| speed *= (1 - 0.15 * heavy_frac) |
| |
| return max(speed, 3.0) |
| |
| def _to_pcu_flow(self, flow_veh_hr, vehicle_mix): |
| """Convert vehicle flow to PCU flow.""" |
| pcu_factor = sum(vehicle_mix.get(vtype, 0) * PCU_INDIA.get(vtype, 1.0) |
| for vtype in vehicle_mix) |
| return flow_veh_hr * pcu_factor |
| |
| def _platoon_pcu(self, platoon_size, vehicle_mix): |
| """Convert platoon vehicle count to PCU.""" |
| pcu = 0 |
| for vtype, frac in vehicle_mix.items(): |
| count = int(platoon_size * frac) |
| pcu += count * PCU_INDIA.get(vtype, 1.0) |
| return pcu |
|
|
|
|
| |
| |
| |
|
|
| class CorridorSimulator: |
| """ |
| Simulates traffic flow through a corridor of signals. |
| Compares fixed-time vs. APOO adaptive timing. |
| """ |
| |
| def __init__( |
| self, |
| intersections: List[Intersection], |
| links: List[RoadLink], |
| robertson: RobertsonDispersion = None, |
| optimizer: OffsetOptimizer = None, |
| emission_calc: EmissionCalculator = None, |
| ): |
| self.intersections = intersections |
| self.links = links |
| self.robertson = robertson or RobertsonDispersion(beta=0.60) |
| self.optimizer = optimizer or OffsetOptimizer(safety_buffer_s=12.0) |
| self.emission_calc = emission_calc or EmissionCalculator() |
| self.rng = np.random.RandomState(42) |
| |
| def simulate( |
| self, |
| demand_profile: pd.DataFrame, |
| vehicle_mix: Dict[str, float], |
| weather: str = "clear", |
| method: str = "fixed", |
| ml_model=None, |
| ml_features_func=None, |
| ) -> SimulationResult: |
| """ |
| Run a full corridor simulation. |
| |
| Args: |
| demand_profile: DataFrame with time_min and flow_veh_hr |
| vehicle_mix: Vehicle composition |
| weather: Weather condition |
| method: "fixed" (baseline) or "apoo" (adaptive) |
| ml_model: Trained ML model for APOO travel time prediction |
| ml_features_func: Function to extract features for ML model |
| |
| Returns: |
| SimulationResult |
| """ |
| total_delay = 0 |
| total_stops = 0 |
| total_vehicles = 0 |
| platoons_on_green = 0 |
| total_platoons = 0 |
| total_idle_time = 0 |
| cycle_details = [] |
| |
| n_links = len(self.links) |
| |
| for _, row in demand_profile.iterrows(): |
| time_min = row["time_min"] |
| flow = row["flow_veh_hr"] |
| |
| |
| platoon_size = max(1, int(flow * 5 / 3600)) |
| |
| for link_idx in range(n_links): |
| link = self.links[link_idx] |
| upstream = self.intersections[link_idx] |
| downstream = self.intersections[link_idx + 1] |
| |
| |
| green_time = upstream.phases[0].green_time |
| discharge_rate = platoon_size / green_time |
| departure = np.zeros(int(upstream.cycle_length)) |
| green_start = int(upstream.current_offset % upstream.cycle_length) |
| for t in range(green_start, min(green_start + int(green_time), len(departure))): |
| departure[t] = discharge_rate |
| |
| |
| arrival_profile, travel_time = self.robertson.disperse( |
| departure, link.length_m, |
| link.speed_limit_kmh, |
| vehicle_mix=vehicle_mix, |
| weather=weather, |
| side_friction=link.side_friction, |
| ) |
| |
| |
| platoon_centroid = travel_time + green_start |
| |
| if method == "apoo": |
| |
| if ml_model is not None and ml_features_func is not None: |
| features = ml_features_func(link, vehicle_mix, weather, time_min, flow) |
| predicted_tt = ml_model.predict(features.reshape(1, -1))[0] |
| uncertainty = abs(predicted_tt - travel_time) * 0.5 + 3.0 |
| else: |
| predicted_tt = travel_time |
| uncertainty = travel_time * 0.12 |
| |
| |
| t_arrive_head = green_start + predicted_tt - uncertainty |
| t_arrive_tail = green_start + predicted_tt + uncertainty + 5 |
| |
| optimal_offset, overlap_frac, feasible = self.optimizer.calculate_ideal_offset( |
| t_arrive_head, t_arrive_tail, |
| downstream.cycle_length, |
| downstream.current_offset, |
| downstream.phases[0].green_time, |
| prediction_uncertainty=uncertainty, |
| ) |
| |
| if feasible: |
| downstream.current_offset = optimal_offset |
| |
| |
| ds_green_start = downstream.current_offset |
| ds_green_end = ds_green_start + downstream.phases[0].green_time |
| |
| centroid_in_cycle = platoon_centroid % downstream.cycle_length |
| on_green = ds_green_start <= centroid_in_cycle <= ds_green_end |
| |
| total_platoons += 1 |
| if on_green: |
| platoons_on_green += 1 |
| delay = self.rng.uniform(2, 8) |
| else: |
| |
| if centroid_in_cycle < ds_green_start: |
| delay = ds_green_start - centroid_in_cycle |
| else: |
| delay = downstream.cycle_length - centroid_in_cycle + ds_green_start |
| delay += self.rng.uniform(0, 5) |
| total_stops += platoon_size |
| |
| total_delay += delay * platoon_size |
| total_vehicles += platoon_size |
| total_idle_time += delay * (0 if on_green else 1) |
| |
| cycle_details.append({ |
| "time_min": time_min, |
| "link_idx": link_idx, |
| "travel_time_s": travel_time, |
| "delay_s": delay, |
| "on_green": on_green, |
| "platoon_size": platoon_size, |
| "offset": downstream.current_offset, |
| "method": method, |
| }) |
| |
| |
| total_distance_km = sum(l.length_m for l in self.links) / 1000 |
| vehicle_counts = {vtype: max(1, int(total_vehicles * frac)) |
| for vtype, frac in vehicle_mix.items()} |
| |
| running = self.emission_calc.running_emissions(total_distance_km, vehicle_counts) |
| idling = self.emission_calc.idle_emissions(total_idle_time, vehicle_counts) |
| fuel = self.emission_calc.fuel_consumption_ml(total_idle_time, total_distance_km, vehicle_counts) |
| |
| avg_delay = total_delay / max(total_vehicles, 1) |
| green_pct = (platoons_on_green / max(total_platoons, 1)) * 100 |
| |
| |
| total_distance = sum(l.length_m for l in self.links) |
| avg_travel_time = total_distance / (30 / 3.6) + avg_delay |
| avg_speed = (total_distance / 1000) / (avg_travel_time / 3600) if avg_travel_time > 0 else 0 |
| |
| throughput = total_vehicles / (demand_profile["time_min"].max() / 60) if len(demand_profile) > 0 else 0 |
| |
| return SimulationResult( |
| method=method, |
| total_delay_s=total_delay, |
| avg_delay_per_vehicle_s=avg_delay, |
| total_stops=total_stops, |
| platoons_on_green=platoons_on_green, |
| total_platoons=total_platoons, |
| green_arrival_pct=green_pct, |
| total_fuel_ml=fuel, |
| total_co2_g=running["CO2"] + idling.get("CO2", 0), |
| total_co_g=running["CO"] + idling.get("CO", 0), |
| total_nox_g=running["NOx"] + idling.get("NOx", 0), |
| total_pm25_g=running.get("PM25", 0), |
| throughput_veh_hr=throughput, |
| avg_speed_kmh=avg_speed, |
| cycle_details=cycle_details, |
| ) |
|
|
|
|
| |
| |
| |
|
|
| def compute_pcu_flow(flow_veh_hr: float, vehicle_mix: Dict[str, float]) -> float: |
| """Convert vehicle flow to PCU-equivalent flow.""" |
| pcu_factor = sum(frac * PCU_INDIA.get(vtype, 1.0) |
| for vtype, frac in vehicle_mix.items()) |
| return flow_veh_hr * pcu_factor |
|
|
|
|
| def format_kpi_comparison(fixed: SimulationResult, apoo: SimulationResult) -> pd.DataFrame: |
| """Create a comparison table of KPIs.""" |
| metrics = [ |
| ("Avg Delay per Vehicle (s)", f"{fixed.avg_delay_per_vehicle_s:.1f}", |
| f"{apoo.avg_delay_per_vehicle_s:.1f}", |
| f"{((fixed.avg_delay_per_vehicle_s - apoo.avg_delay_per_vehicle_s) / max(fixed.avg_delay_per_vehicle_s, 0.01)) * 100:.1f}%"), |
| ("Platoons Arriving on Green (%)", f"{fixed.green_arrival_pct:.1f}", |
| f"{apoo.green_arrival_pct:.1f}", |
| f"+{apoo.green_arrival_pct - fixed.green_arrival_pct:.1f}pp"), |
| ("Total Stops", f"{fixed.total_stops:,}", f"{apoo.total_stops:,}", |
| f"{((fixed.total_stops - apoo.total_stops) / max(fixed.total_stops, 1)) * 100:.1f}%"), |
| ("Total CO₂ (g)", f"{fixed.total_co2_g:.0f}", f"{apoo.total_co2_g:.0f}", |
| f"{((fixed.total_co2_g - apoo.total_co2_g) / max(fixed.total_co2_g, 0.01)) * 100:.1f}%"), |
| ("Total CO (g)", f"{fixed.total_co_g:.1f}", f"{apoo.total_co_g:.1f}", |
| f"{((fixed.total_co_g - apoo.total_co_g) / max(fixed.total_co_g, 0.01)) * 100:.1f}%"), |
| ("Total NOx (g)", f"{fixed.total_nox_g:.1f}", f"{apoo.total_nox_g:.1f}", |
| f"{((fixed.total_nox_g - apoo.total_nox_g) / max(fixed.total_nox_g, 0.01)) * 100:.1f}%"), |
| ("Fuel Consumption (mL)", f"{fixed.total_fuel_ml:.0f}", f"{apoo.total_fuel_ml:.0f}", |
| f"{((fixed.total_fuel_ml - apoo.total_fuel_ml) / max(fixed.total_fuel_ml, 0.01)) * 100:.1f}%"), |
| ("Avg Speed (km/h)", f"{fixed.avg_speed_kmh:.1f}", f"{apoo.avg_speed_kmh:.1f}", |
| f"+{apoo.avg_speed_kmh - fixed.avg_speed_kmh:.1f}"), |
| ("Throughput (veh/hr)", f"{fixed.throughput_veh_hr:.0f}", f"{apoo.throughput_veh_hr:.0f}", |
| f"+{apoo.throughput_veh_hr - fixed.throughput_veh_hr:.0f}"), |
| ] |
| |
| return pd.DataFrame(metrics, columns=["KPI", "Fixed-Time", "APOO Adaptive", "Improvement"]) |
|
|
|
|
| if __name__ == "__main__": |
| |
| gen = IndianTrafficGenerator(seed=42) |
| intersections, links = gen.corridor(n_intersections=5) |
| demand = gen.generate_demand_profile() |
| training_data = gen.generate_training_data(n_samples=100) |
| print(f"Generated corridor: {len(intersections)} intersections, {len(links)} links") |
| print(f"Demand profile: {len(demand)} time steps") |
| print(f"Training data: {len(training_data)} samples") |
| print(f"Training columns: {list(training_data.columns)}") |
|
|