APOO-Traffic-Optimizer / apoo_core.py
omshrivastava's picture
Add APOO core engine
7ef6f81 verified
"""
APOO Core Engine — Adaptive Platoon Offset Optimizer
====================================================
Core algorithms: Robertson's platoon dispersion (India-calibrated),
dynamic offset optimization, emission modeling, and simulation engine.
Author: APOO Project for MoRTH India
References:
- Robertson (1969): TRANSYT platoon dispersion model
- IRC:106-1990, IRC:SP:41: Indian PCU standards
- ARAI/CPCB BS-VI emission factors
- Kadiyali (2000), Mathew & Krishna Rao (2006): Indian β calibration
"""
import numpy as np
import pandas as pd
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple
import json
# ============================================================
# 1. INDIAN TRAFFIC CONSTANTS
# ============================================================
# IRC:106-1990 & IRC:SP:41 PCU values
PCU_INDIA = {
"car": 1.0,
"two_wheeler": 0.5,
"auto_rickshaw": 0.6,
"bus": 3.0,
"truck": 3.0,
"lcv": 1.4,
"bicycle": 0.5,
"cycle_rickshaw": 1.5,
}
# Typical Indian urban vehicle composition profiles (%)
VEHICLE_MIX_PROFILES = {
"metro_peak": {"two_wheeler": 0.55, "car": 0.25, "auto_rickshaw": 0.08, "bus": 0.05, "truck": 0.02, "lcv": 0.03, "bicycle": 0.01, "cycle_rickshaw": 0.01},
"metro_offpeak": {"two_wheeler": 0.50, "car": 0.22, "auto_rickshaw": 0.10, "bus": 0.06, "truck": 0.04, "lcv": 0.04, "bicycle": 0.02, "cycle_rickshaw": 0.02},
"tier2_peak": {"two_wheeler": 0.60, "car": 0.15, "auto_rickshaw": 0.10, "bus": 0.05, "truck": 0.03, "lcv": 0.02, "bicycle": 0.03, "cycle_rickshaw": 0.02},
"tier2_offpeak": {"two_wheeler": 0.55, "car": 0.12, "auto_rickshaw": 0.12, "bus": 0.06, "truck": 0.04, "lcv": 0.03, "bicycle": 0.05, "cycle_rickshaw": 0.03},
}
# Vehicle type average speeds (km/h) in Indian urban arterials
VEHICLE_SPEEDS_INDIA = {
"two_wheeler": {"free_flow": 40, "congested": 20, "saturated": 10},
"car": {"free_flow": 45, "congested": 18, "saturated": 8},
"auto_rickshaw": {"free_flow": 35, "congested": 15, "saturated": 7},
"bus": {"free_flow": 30, "congested": 12, "saturated": 5},
"truck": {"free_flow": 25, "congested": 10, "saturated": 4},
"lcv": {"free_flow": 35, "congested": 14, "saturated": 6},
"bicycle": {"free_flow": 15, "congested": 10, "saturated": 8},
"cycle_rickshaw": {"free_flow": 12, "congested": 8, "saturated": 5},
}
# ARAI/CPCB BS-VI Emission Factors (g/km)
EMISSION_FACTORS = {
"two_wheeler": {"CO": 0.50, "HC": 0.10, "NOx": 0.06, "PM25": 0.005, "CO2": 55},
"car": {"CO": 0.50, "HC": 0.05, "NOx": 0.06, "PM25": 0.003, "CO2": 120},
"auto_rickshaw": {"CO": 0.30, "HC": 0.06, "NOx": 0.06, "PM25": 0.003, "CO2": 48},
"bus": {"CO": 1.50, "HC": 0.20, "NOx": 0.40, "PM25": 0.010, "CO2": 550},
"truck": {"CO": 1.80, "HC": 0.25, "NOx": 0.46, "PM25": 0.025, "CO2": 900},
"lcv": {"CO": 0.50, "HC": 0.06, "NOx": 0.17, "PM25": 0.008, "CO2": 200},
"bicycle": {"CO": 0, "HC": 0, "NOx": 0, "PM25": 0, "CO2": 0},
"cycle_rickshaw": {"CO": 0, "HC": 0, "NOx": 0, "PM25": 0, "CO2": 0},
}
# Idle emission rates (g/min)
IDLE_EMISSION_RATES = {
"two_wheeler": {"CO": 0.10, "HC": 0.025, "NOx": 0.003, "CO2": 2.5},
"car": {"CO": 0.15, "HC": 0.010, "NOx": 0.005, "CO2": 8.0},
"auto_rickshaw": {"CO": 0.08, "HC": 0.015, "NOx": 0.004, "CO2": 3.0},
"bus": {"CO": 0.50, "HC": 0.050, "NOx": 0.080, "CO2": 55.0},
"truck": {"CO": 0.60, "HC": 0.060, "NOx": 0.090, "CO2": 60.0},
"lcv": {"CO": 0.20, "HC": 0.015, "NOx": 0.010, "CO2": 12.0},
"bicycle": {"CO": 0, "HC": 0, "NOx": 0, "CO2": 0},
"cycle_rickshaw": {"CO": 0, "HC": 0, "NOx": 0, "CO2": 0},
}
# Weather impact factors on speed
WEATHER_SPEED_FACTORS = {
"clear": 1.0,
"light_rain": 0.85,
"heavy_rain": 0.65, # Monsoon conditions
"fog": 0.70,
"night": 0.90,
}
# ============================================================
# 2. DATA STRUCTURES
# ============================================================
@dataclass
class RoadLink:
"""Represents a road link between two signals."""
link_id: str
length_m: float # meters
num_lanes: int
speed_limit_kmh: float
gradient_pct: float = 0.0 # positive = uphill
side_friction: float = 0.3 # 0-1, higher in India (vendors, parking)
saturation_flow_pcu_hr: float = 1500 # PCU/hr/lane (Indian default)
@dataclass
class SignalPhase:
"""Signal phase configuration."""
phase_id: int
green_time: float # seconds
amber_time: float = 3.0
all_red_time: float = 2.0
min_green: float = 10.0
max_green: float = 60.0
is_pedestrian: bool = False
@dataclass
class Intersection:
"""Represents a signalized intersection."""
intersection_id: str
name: str
cycle_length: float # seconds
phases: List[SignalPhase] = field(default_factory=list)
current_offset: float = 0.0 # offset from master clock
queue_length_pcu: float = 0.0
lat: float = 0.0
lon: float = 0.0
@dataclass
class Platoon:
"""Represents a vehicle platoon released from upstream signal."""
platoon_id: str
release_time: float # seconds from simulation start
size_vehicles: int
size_pcu: float
vehicle_composition: Dict[str, float] # type -> fraction
avg_speed_kmh: float
speed_std_kmh: float
head_time: float = 0.0 # arrival time of first vehicle
tail_time: float = 0.0 # arrival time of last vehicle
centroid_time: float = 0.0
@dataclass
class SimulationResult:
"""Results from a simulation run."""
method: str # "fixed" or "apoo"
total_delay_s: float
avg_delay_per_vehicle_s: float
total_stops: int
platoons_on_green: int
total_platoons: int
green_arrival_pct: float
total_fuel_ml: float
total_co2_g: float
total_co_g: float
total_nox_g: float
total_pm25_g: float
throughput_veh_hr: float
avg_speed_kmh: float
cycle_details: List[dict] = field(default_factory=list)
# ============================================================
# 3. ROBERTSON'S PLATOON DISPERSION MODEL (India-Calibrated)
# ============================================================
class RobertsonDispersion:
"""
Robertson's (1969) platoon dispersion model.
Calibrated for Indian heterogeneous traffic.
Core equation: q'(t) = F * q'(t-1) + (1-F) * alpha * q(t - t_bar)
Where:
alpha = 1 / (1 + beta * t_bar)
F = 1 - alpha
beta = dispersion factor (0.50-0.80 for Indian mixed traffic)
"""
def __init__(self, beta: float = 0.60):
"""
Args:
beta: Dispersion factor.
0.25-0.35 for homogeneous (Western cities)
0.50-0.80 for Indian heterogeneous traffic
Default 0.60 per Kadiyali/Mathew calibration.
"""
self.beta = beta
def compute_params(self, t_bar: float) -> Tuple[float, float]:
"""Compute alpha and F from beta and mean travel time."""
alpha = 1.0 / (1.0 + self.beta * t_bar)
F = 1.0 - alpha
return alpha, F
def disperse(
self,
departure_profile: np.ndarray,
link_length_m: float,
speed_kmh: float,
dt: float = 1.0,
vehicle_mix: Optional[Dict[str, float]] = None,
weather: str = "clear",
side_friction: float = 0.3,
) -> Tuple[np.ndarray, float]:
"""
Propagate platoon through a link using Robertson's model.
Args:
departure_profile: Flow (veh/s) at upstream signal per time step
link_length_m: Distance between signals (meters)
speed_kmh: Base free-flow speed
dt: Time step in seconds
vehicle_mix: Vehicle composition dict (affects beta)
weather: Weather condition
side_friction: Side friction factor (0-1)
Returns:
(arrival_profile, effective_travel_time)
"""
# Adjust speed for conditions
effective_speed = self._adjust_speed(speed_kmh, vehicle_mix, weather, side_friction)
# Mean travel time
t_bar = link_length_m / (effective_speed / 3.6) # seconds
# Adjust beta for vehicle composition (more 2W → higher dispersion)
effective_beta = self._adjust_beta(vehicle_mix)
alpha = 1.0 / (1.0 + effective_beta * t_bar)
F = 1.0 - alpha
# Robertson recurrence
shift = int(round(t_bar / dt))
T = len(departure_profile)
total_len = T + shift + int(30 / dt) # extra buffer for tail
arrival = np.zeros(total_len)
for t in range(1, total_len):
upstream_idx = t - shift
upstream_flow = departure_profile[upstream_idx] if 0 <= upstream_idx < T else 0.0
arrival[t] = F * arrival[t - 1] + (1 - F) * upstream_flow
return arrival, t_bar
def _adjust_speed(
self,
base_speed: float,
vehicle_mix: Optional[Dict[str, float]],
weather: str,
side_friction: float,
) -> float:
"""Adjust speed for Indian conditions."""
speed = base_speed
# Weather factor
speed *= WEATHER_SPEED_FACTORS.get(weather, 1.0)
# Side friction (higher friction → lower speed)
speed *= (1.0 - 0.3 * side_friction)
# Vehicle mix effect (heavy vehicles slow down flow)
if vehicle_mix:
heavy_fraction = sum(vehicle_mix.get(v, 0) for v in ["bus", "truck", "lcv", "cycle_rickshaw"])
speed *= (1.0 - 0.15 * heavy_fraction)
return max(speed, 5.0) # Minimum 5 km/h
def _adjust_beta(self, vehicle_mix: Optional[Dict[str, float]]) -> float:
"""Adjust dispersion factor based on vehicle composition.
Two-wheelers increase dispersion (they filter through traffic).
Homogeneous traffic (all cars) → lower beta.
"""
if vehicle_mix is None:
return self.beta
two_wheeler_frac = vehicle_mix.get("two_wheeler", 0)
auto_frac = vehicle_mix.get("auto_rickshaw", 0)
# More 2W/autos → higher dispersion
mix_factor = 1.0 + 0.3 * two_wheeler_frac + 0.2 * auto_frac
return min(self.beta * mix_factor, 0.90)
# ============================================================
# 4. EMISSION CALCULATOR
# ============================================================
class EmissionCalculator:
"""Calculate emissions from traffic operations."""
@staticmethod
def running_emissions(distance_km: float, vehicle_counts: Dict[str, int]) -> Dict[str, float]:
"""Emissions from vehicles traveling a distance."""
totals = {"CO": 0, "HC": 0, "NOx": 0, "PM25": 0, "CO2": 0}
for vtype, count in vehicle_counts.items():
factors = EMISSION_FACTORS.get(vtype, EMISSION_FACTORS["car"])
for pollutant in totals:
totals[pollutant] += count * factors[pollutant] * distance_km
return totals
@staticmethod
def idle_emissions(idle_time_s: float, vehicle_counts: Dict[str, int]) -> Dict[str, float]:
"""Emissions from vehicles idling at a red signal."""
totals = {"CO": 0, "HC": 0, "NOx": 0, "CO2": 0}
idle_min = idle_time_s / 60.0
for vtype, count in vehicle_counts.items():
rates = IDLE_EMISSION_RATES.get(vtype, IDLE_EMISSION_RATES.get("car", {}))
for pollutant in totals:
if pollutant in rates:
totals[pollutant] += count * rates[pollutant] * idle_min
return totals
@staticmethod
def fuel_consumption_ml(idle_time_s: float, distance_km: float,
vehicle_counts: Dict[str, int]) -> float:
"""Estimate fuel consumption (mL) using CO2 as proxy.
Gasoline: ~2.31 kg CO2 per liter.
"""
running = EmissionCalculator.running_emissions(distance_km, vehicle_counts)
idling = EmissionCalculator.idle_emissions(idle_time_s, vehicle_counts)
total_co2_g = running["CO2"] + idling.get("CO2", 0)
fuel_liters = total_co2_g / 2310 # g CO2 → liters gasoline
return fuel_liters * 1000 # mL
# ============================================================
# 5. OFFSET OPTIMIZER
# ============================================================
class OffsetOptimizer:
"""Dynamic platoon-based offset adjustment algorithm."""
def __init__(self, safety_buffer_s: float = 10.0):
"""
Args:
safety_buffer_s: Safety buffer in seconds (higher for Indian conditions).
Recommended: 10-20s for India.
"""
self.safety_buffer = safety_buffer_s
def calculate_ideal_offset(
self,
t_arrive_head: float,
t_arrive_tail: float,
cycle_length: float,
current_green_start: float,
green_duration: float,
min_green: float = 10.0,
prediction_uncertainty: float = 5.0,
) -> Tuple[float, float, bool]:
"""
Calculate the ideal offset to maximize platoon-green overlap.
Args:
t_arrive_head: Predicted arrival time of platoon head (s from cycle start)
t_arrive_tail: Predicted arrival of platoon tail
cycle_length: Signal cycle length (s)
current_green_start: Current green start time within cycle
green_duration: Green phase duration
min_green: Minimum green for cross traffic
prediction_uncertainty: Std dev of prediction (s)
Returns:
(optimal_offset, overlap_fraction, is_feasible)
"""
# Arrival window (within cycle)
arrive_head_mod = (t_arrive_head - self.safety_buffer) % cycle_length
arrive_tail_mod = (t_arrive_tail + prediction_uncertainty) % cycle_length
platoon_window = t_arrive_tail - t_arrive_head + self.safety_buffer + prediction_uncertainty
# Try different offsets and find best overlap
best_offset = current_green_start
best_overlap = 0.0
for trial_offset in np.linspace(0, cycle_length, 100):
green_start = trial_offset % cycle_length
green_end = (green_start + green_duration) % cycle_length
# Calculate overlap between green window and arrival window
overlap = self._calculate_overlap(
arrive_head_mod, arrive_tail_mod,
green_start, green_end,
cycle_length
)
if overlap > best_overlap:
best_overlap = overlap
best_offset = trial_offset
# Check feasibility (respect min cross-traffic green)
remaining_for_cross = cycle_length - green_duration
is_feasible = remaining_for_cross >= min_green
overlap_fraction = best_overlap / max(platoon_window, 1.0)
return best_offset, overlap_fraction, is_feasible
def _calculate_overlap(
self, a_start: float, a_end: float,
g_start: float, g_end: float,
cycle: float
) -> float:
"""Calculate temporal overlap between arrival window and green phase."""
# Handle wrap-around in cycle
if a_end < a_start:
a_end += cycle
if g_end < g_start:
g_end += cycle
overlap_start = max(a_start, g_start)
overlap_end = min(a_end, g_end)
return max(0, overlap_end - overlap_start)
def constrain_offset(
self,
ideal_offset: float,
cycle_length: float,
max_shift: float = None,
) -> float:
"""Apply constraints to keep offset within bounds."""
if max_shift is None:
max_shift = cycle_length * 0.3 # Max 30% cycle shift
# Clamp to valid range
offset = ideal_offset % cycle_length
return offset
# ============================================================
# 6. SYNTHETIC DATA GENERATOR (Indian Conditions)
# ============================================================
class IndianTrafficGenerator:
"""Generate synthetic traffic data calibrated for Indian conditions."""
def __init__(self, seed: int = 42):
self.rng = np.random.RandomState(seed)
def generate_corridor(
self,
n_intersections: int = 5,
base_link_length: float = 300,
city_type: str = "metro",
) -> Tuple[List[Intersection], List[RoadLink]]:
"""Generate a synthetic arterial corridor."""
intersections = []
links = []
base_cycle = 120 if city_type == "metro" else 90
for i in range(n_intersections):
phases = [
SignalPhase(phase_id=0, green_time=40 + self.rng.randint(-5, 10)),
SignalPhase(phase_id=1, green_time=25 + self.rng.randint(-5, 5)),
SignalPhase(phase_id=2, green_time=15, is_pedestrian=True),
]
cycle = sum(p.green_time + p.amber_time + p.all_red_time for p in phases)
intersections.append(Intersection(
intersection_id=f"INT_{i}",
name=f"Intersection {i+1}",
cycle_length=cycle,
phases=phases,
current_offset=i * 20, # Initial fixed offset
lat=23.2599 + i * 0.003, # Bhopal coordinates as example
lon=77.4126 + i * 0.003,
))
for i in range(n_intersections - 1):
length = base_link_length + self.rng.uniform(-50, 100)
links.append(RoadLink(
link_id=f"LINK_{i}_{i+1}",
length_m=length,
num_lanes=2 + self.rng.choice([0, 1]),
speed_limit_kmh=40 + self.rng.choice([-10, 0, 10]),
gradient_pct=self.rng.uniform(-2, 2),
side_friction=0.2 + self.rng.uniform(0, 0.3),
saturation_flow_pcu_hr=1400 + self.rng.randint(-100, 200),
))
return intersections, links
def generate_demand_profile(
self,
duration_hours: float = 2.0,
peak_flow_veh_hr: float = 2000,
profile_type: str = "morning_peak",
city_type: str = "metro",
) -> pd.DataFrame:
"""Generate time-varying demand with Indian characteristics."""
dt_min = 5 # 5-minute intervals
n_steps = int(duration_hours * 60 / dt_min)
times = np.arange(n_steps) * dt_min
# Demand profile shape
if profile_type == "morning_peak":
# Ramp up, peak, slight decline
demand_factor = np.concatenate([
np.linspace(0.3, 1.0, n_steps // 3),
np.ones(n_steps // 3) * 1.0,
np.linspace(1.0, 0.6, n_steps - 2 * (n_steps // 3)),
])
elif profile_type == "evening_peak":
demand_factor = np.concatenate([
np.linspace(0.5, 0.9, n_steps // 4),
np.linspace(0.9, 1.0, n_steps // 4),
np.ones(n_steps // 4) * 1.0,
np.linspace(1.0, 0.4, n_steps - 3 * (n_steps // 4)),
])
else: # off_peak
demand_factor = 0.4 + 0.1 * np.sin(2 * np.pi * times / (duration_hours * 60))
# Add stochastic noise (Indian traffic is highly variable)
noise = 1.0 + self.rng.normal(0, 0.15, n_steps)
noise = np.clip(noise, 0.5, 1.5)
flow = peak_flow_veh_hr * demand_factor * noise
# Vehicle mix (varies with time)
mix_key = f"{city_type}_peak" if profile_type != "off_peak" else f"{city_type}_offpeak"
if mix_key not in VEHICLE_MIX_PROFILES:
mix_key = "metro_peak"
base_mix = VEHICLE_MIX_PROFILES[mix_key]
records = []
for i, t in enumerate(times):
# Slight time variation in mix (more 2W in peak)
mix = dict(base_mix)
if demand_factor[i] > 0.8:
mix["two_wheeler"] = min(mix["two_wheeler"] * 1.1, 0.70)
mix["car"] = mix["car"] * 0.9
# Normalize
total = sum(mix.values())
mix = {k: v / total for k, v in mix.items()}
record = {
"time_min": t,
"flow_veh_hr": flow[i],
"flow_pcu_hr": self._to_pcu_flow(flow[i], mix),
}
for vtype, frac in mix.items():
record[f"pct_{vtype}"] = frac * 100
records.append(record)
return pd.DataFrame(records)
def generate_training_data(
self,
n_samples: int = 5000,
city_type: str = "metro",
) -> pd.DataFrame:
"""Generate training data for ML travel time prediction model."""
records = []
for i in range(n_samples):
# Random link characteristics
link_length = self.rng.uniform(150, 600)
speed_limit = self.rng.choice([30, 40, 50, 60])
num_lanes = self.rng.choice([2, 3, 4])
gradient = self.rng.uniform(-3, 3)
side_friction = self.rng.uniform(0.1, 0.6)
# Vehicle composition
mix_type = self.rng.choice(list(VEHICLE_MIX_PROFILES.keys()))
base_mix = dict(VEHICLE_MIX_PROFILES[mix_type])
# Add noise to mix
for k in base_mix:
base_mix[k] *= (1 + self.rng.normal(0, 0.1))
total = sum(base_mix.values())
base_mix = {k: v / total for k, v in base_mix.items()}
# Traffic conditions
density = self.rng.uniform(10, 80) # veh/km/lane
weather = self.rng.choice(["clear", "clear", "clear", "light_rain", "heavy_rain", "fog"])
time_of_day = self.rng.uniform(0, 24) # hours
is_peak = 1 if (7 <= time_of_day <= 10) or (17 <= time_of_day <= 20) else 0
day_type = self.rng.choice(["weekday", "weekday", "weekday", "weekday", "weekday", "weekend", "weekend"])
# Platoon characteristics
platoon_size = self.rng.randint(5, 40)
platoon_pcu = self._platoon_pcu(platoon_size, base_mix)
# Compute actual travel time using physics + stochastic model
base_speed = self._compute_base_speed(
speed_limit, base_mix, density, weather, side_friction, gradient
)
base_tt = link_length / (base_speed / 3.6) # seconds
# Add realistic noise (higher in India)
noise_factor = 1.0 + self.rng.normal(0, 0.15 + 0.1 * is_peak)
noise_factor = max(noise_factor, 0.6)
actual_tt = base_tt * noise_factor
# Dispersion time (how much platoon spreads)
two_w_pct = base_mix.get("two_wheeler", 0)
dispersion = actual_tt * (0.1 + 0.3 * two_w_pct) # 2W cause more dispersion
records.append({
"link_length_m": link_length,
"speed_limit_kmh": speed_limit,
"num_lanes": num_lanes,
"gradient_pct": gradient,
"side_friction": side_friction,
"pct_two_wheeler": base_mix.get("two_wheeler", 0) * 100,
"pct_car": base_mix.get("car", 0) * 100,
"pct_auto": base_mix.get("auto_rickshaw", 0) * 100,
"pct_bus": base_mix.get("bus", 0) * 100,
"pct_truck": base_mix.get("truck", 0) * 100,
"density_veh_km_lane": density,
"weather_speed_factor": WEATHER_SPEED_FACTORS.get(weather, 1.0),
"time_of_day_sin": np.sin(2 * np.pi * time_of_day / 24),
"time_of_day_cos": np.cos(2 * np.pi * time_of_day / 24),
"is_peak": is_peak,
"is_weekend": 1 if day_type == "weekend" else 0,
"platoon_size": platoon_size,
"platoon_pcu": platoon_pcu,
"upstream_queue_pcu": self.rng.uniform(0, 30),
"downstream_queue_pcu": self.rng.uniform(0, 20),
"actual_travel_time_s": actual_tt,
"platoon_dispersion_s": dispersion,
"weather": weather,
"city_type": city_type,
"mix_type": mix_type,
})
return pd.DataFrame(records)
def _compute_base_speed(
self, speed_limit, vehicle_mix, density, weather, side_friction, gradient
):
"""Compute effective speed from conditions."""
# Start with limit
speed = speed_limit
# Greenshields-like density relationship
jam_density = 150 # veh/km/lane (Indian conditions)
if density < jam_density:
speed *= (1 - (density / jam_density) ** 1.5)
else:
speed = 5.0 # gridlock
# Weather
speed *= WEATHER_SPEED_FACTORS.get(weather, 1.0)
# Side friction
speed *= (1 - 0.25 * side_friction)
# Gradient (uphill slows, downhill speeds up slightly)
speed *= (1 - 0.02 * gradient)
# Heavy vehicle slowdown
heavy_frac = sum(vehicle_mix.get(v, 0) for v in ["bus", "truck", "cycle_rickshaw"])
speed *= (1 - 0.15 * heavy_frac)
return max(speed, 3.0)
def _to_pcu_flow(self, flow_veh_hr, vehicle_mix):
"""Convert vehicle flow to PCU flow."""
pcu_factor = sum(vehicle_mix.get(vtype, 0) * PCU_INDIA.get(vtype, 1.0)
for vtype in vehicle_mix)
return flow_veh_hr * pcu_factor
def _platoon_pcu(self, platoon_size, vehicle_mix):
"""Convert platoon vehicle count to PCU."""
pcu = 0
for vtype, frac in vehicle_mix.items():
count = int(platoon_size * frac)
pcu += count * PCU_INDIA.get(vtype, 1.0)
return pcu
# ============================================================
# 7. CORRIDOR SIMULATION ENGINE
# ============================================================
class CorridorSimulator:
"""
Simulates traffic flow through a corridor of signals.
Compares fixed-time vs. APOO adaptive timing.
"""
def __init__(
self,
intersections: List[Intersection],
links: List[RoadLink],
robertson: RobertsonDispersion = None,
optimizer: OffsetOptimizer = None,
emission_calc: EmissionCalculator = None,
):
self.intersections = intersections
self.links = links
self.robertson = robertson or RobertsonDispersion(beta=0.60)
self.optimizer = optimizer or OffsetOptimizer(safety_buffer_s=12.0)
self.emission_calc = emission_calc or EmissionCalculator()
self.rng = np.random.RandomState(42)
def simulate(
self,
demand_profile: pd.DataFrame,
vehicle_mix: Dict[str, float],
weather: str = "clear",
method: str = "fixed", # "fixed" or "apoo"
ml_model=None,
ml_features_func=None,
) -> SimulationResult:
"""
Run a full corridor simulation.
Args:
demand_profile: DataFrame with time_min and flow_veh_hr
vehicle_mix: Vehicle composition
weather: Weather condition
method: "fixed" (baseline) or "apoo" (adaptive)
ml_model: Trained ML model for APOO travel time prediction
ml_features_func: Function to extract features for ML model
Returns:
SimulationResult
"""
total_delay = 0
total_stops = 0
total_vehicles = 0
platoons_on_green = 0
total_platoons = 0
total_idle_time = 0
cycle_details = []
n_links = len(self.links)
for _, row in demand_profile.iterrows():
time_min = row["time_min"]
flow = row["flow_veh_hr"]
# Generate platoon for this time step
platoon_size = max(1, int(flow * 5 / 3600)) # vehicles in 5-min window
for link_idx in range(n_links):
link = self.links[link_idx]
upstream = self.intersections[link_idx]
downstream = self.intersections[link_idx + 1]
# Create departure profile (uniform discharge during green)
green_time = upstream.phases[0].green_time
discharge_rate = platoon_size / green_time # veh/s
departure = np.zeros(int(upstream.cycle_length))
green_start = int(upstream.current_offset % upstream.cycle_length)
for t in range(green_start, min(green_start + int(green_time), len(departure))):
departure[t] = discharge_rate
# Robertson dispersion
arrival_profile, travel_time = self.robertson.disperse(
departure, link.length_m,
link.speed_limit_kmh,
vehicle_mix=vehicle_mix,
weather=weather,
side_friction=link.side_friction,
)
# Determine if platoon arrives on green
platoon_centroid = travel_time + green_start
if method == "apoo":
# Predict travel time (use ML model if available, else Robertson)
if ml_model is not None and ml_features_func is not None:
features = ml_features_func(link, vehicle_mix, weather, time_min, flow)
predicted_tt = ml_model.predict(features.reshape(1, -1))[0]
uncertainty = abs(predicted_tt - travel_time) * 0.5 + 3.0
else:
predicted_tt = travel_time
uncertainty = travel_time * 0.12 # 12% uncertainty
# Optimize offset
t_arrive_head = green_start + predicted_tt - uncertainty
t_arrive_tail = green_start + predicted_tt + uncertainty + 5
optimal_offset, overlap_frac, feasible = self.optimizer.calculate_ideal_offset(
t_arrive_head, t_arrive_tail,
downstream.cycle_length,
downstream.current_offset,
downstream.phases[0].green_time,
prediction_uncertainty=uncertainty,
)
if feasible:
downstream.current_offset = optimal_offset
# Check green arrival
ds_green_start = downstream.current_offset
ds_green_end = ds_green_start + downstream.phases[0].green_time
centroid_in_cycle = platoon_centroid % downstream.cycle_length
on_green = ds_green_start <= centroid_in_cycle <= ds_green_end
total_platoons += 1
if on_green:
platoons_on_green += 1
delay = self.rng.uniform(2, 8) # Minor delay even on green
else:
# Calculate delay (wait for next green)
if centroid_in_cycle < ds_green_start:
delay = ds_green_start - centroid_in_cycle
else:
delay = downstream.cycle_length - centroid_in_cycle + ds_green_start
delay += self.rng.uniform(0, 5) # queue discharge delay
total_stops += platoon_size
total_delay += delay * platoon_size
total_vehicles += platoon_size
total_idle_time += delay * (0 if on_green else 1) # Only count red-signal idle
cycle_details.append({
"time_min": time_min,
"link_idx": link_idx,
"travel_time_s": travel_time,
"delay_s": delay,
"on_green": on_green,
"platoon_size": platoon_size,
"offset": downstream.current_offset,
"method": method,
})
# Calculate emissions
total_distance_km = sum(l.length_m for l in self.links) / 1000
vehicle_counts = {vtype: max(1, int(total_vehicles * frac))
for vtype, frac in vehicle_mix.items()}
running = self.emission_calc.running_emissions(total_distance_km, vehicle_counts)
idling = self.emission_calc.idle_emissions(total_idle_time, vehicle_counts)
fuel = self.emission_calc.fuel_consumption_ml(total_idle_time, total_distance_km, vehicle_counts)
avg_delay = total_delay / max(total_vehicles, 1)
green_pct = (platoons_on_green / max(total_platoons, 1)) * 100
# Average speed considering delay
total_distance = sum(l.length_m for l in self.links)
avg_travel_time = total_distance / (30 / 3.6) + avg_delay # base + delay
avg_speed = (total_distance / 1000) / (avg_travel_time / 3600) if avg_travel_time > 0 else 0
throughput = total_vehicles / (demand_profile["time_min"].max() / 60) if len(demand_profile) > 0 else 0
return SimulationResult(
method=method,
total_delay_s=total_delay,
avg_delay_per_vehicle_s=avg_delay,
total_stops=total_stops,
platoons_on_green=platoons_on_green,
total_platoons=total_platoons,
green_arrival_pct=green_pct,
total_fuel_ml=fuel,
total_co2_g=running["CO2"] + idling.get("CO2", 0),
total_co_g=running["CO"] + idling.get("CO", 0),
total_nox_g=running["NOx"] + idling.get("NOx", 0),
total_pm25_g=running.get("PM25", 0),
throughput_veh_hr=throughput,
avg_speed_kmh=avg_speed,
cycle_details=cycle_details,
)
# ============================================================
# 8. UTILITY FUNCTIONS
# ============================================================
def compute_pcu_flow(flow_veh_hr: float, vehicle_mix: Dict[str, float]) -> float:
"""Convert vehicle flow to PCU-equivalent flow."""
pcu_factor = sum(frac * PCU_INDIA.get(vtype, 1.0)
for vtype, frac in vehicle_mix.items())
return flow_veh_hr * pcu_factor
def format_kpi_comparison(fixed: SimulationResult, apoo: SimulationResult) -> pd.DataFrame:
"""Create a comparison table of KPIs."""
metrics = [
("Avg Delay per Vehicle (s)", f"{fixed.avg_delay_per_vehicle_s:.1f}",
f"{apoo.avg_delay_per_vehicle_s:.1f}",
f"{((fixed.avg_delay_per_vehicle_s - apoo.avg_delay_per_vehicle_s) / max(fixed.avg_delay_per_vehicle_s, 0.01)) * 100:.1f}%"),
("Platoons Arriving on Green (%)", f"{fixed.green_arrival_pct:.1f}",
f"{apoo.green_arrival_pct:.1f}",
f"+{apoo.green_arrival_pct - fixed.green_arrival_pct:.1f}pp"),
("Total Stops", f"{fixed.total_stops:,}", f"{apoo.total_stops:,}",
f"{((fixed.total_stops - apoo.total_stops) / max(fixed.total_stops, 1)) * 100:.1f}%"),
("Total CO₂ (g)", f"{fixed.total_co2_g:.0f}", f"{apoo.total_co2_g:.0f}",
f"{((fixed.total_co2_g - apoo.total_co2_g) / max(fixed.total_co2_g, 0.01)) * 100:.1f}%"),
("Total CO (g)", f"{fixed.total_co_g:.1f}", f"{apoo.total_co_g:.1f}",
f"{((fixed.total_co_g - apoo.total_co_g) / max(fixed.total_co_g, 0.01)) * 100:.1f}%"),
("Total NOx (g)", f"{fixed.total_nox_g:.1f}", f"{apoo.total_nox_g:.1f}",
f"{((fixed.total_nox_g - apoo.total_nox_g) / max(fixed.total_nox_g, 0.01)) * 100:.1f}%"),
("Fuel Consumption (mL)", f"{fixed.total_fuel_ml:.0f}", f"{apoo.total_fuel_ml:.0f}",
f"{((fixed.total_fuel_ml - apoo.total_fuel_ml) / max(fixed.total_fuel_ml, 0.01)) * 100:.1f}%"),
("Avg Speed (km/h)", f"{fixed.avg_speed_kmh:.1f}", f"{apoo.avg_speed_kmh:.1f}",
f"+{apoo.avg_speed_kmh - fixed.avg_speed_kmh:.1f}"),
("Throughput (veh/hr)", f"{fixed.throughput_veh_hr:.0f}", f"{apoo.throughput_veh_hr:.0f}",
f"+{apoo.throughput_veh_hr - fixed.throughput_veh_hr:.0f}"),
]
return pd.DataFrame(metrics, columns=["KPI", "Fixed-Time", "APOO Adaptive", "Improvement"])
if __name__ == "__main__":
# Quick test
gen = IndianTrafficGenerator(seed=42)
intersections, links = gen.corridor(n_intersections=5)
demand = gen.generate_demand_profile()
training_data = gen.generate_training_data(n_samples=100)
print(f"Generated corridor: {len(intersections)} intersections, {len(links)} links")
print(f"Demand profile: {len(demand)} time steps")
print(f"Training data: {len(training_data)} samples")
print(f"Training columns: {list(training_data.columns)}")