""" fourstep_synthetic.py Synthetic four-step travel demand model for a 20-TAZ city. Stage 1: classical model on synthetic data (no AI yet). Author: (Your Name) """ from __future__ import annotations import numpy as np import pandas as pd from dataclasses import dataclass from typing import Dict, Tuple import networkx as nx # ------------------------------------------------- # GLOBAL SETTINGS # ------------------------------------------------- RANDOM_SEED = 42 NUM_ZONES = 20 rng = np.random.default_rng(RANDOM_SEED) # ------------------------------------------------- # 1. SYNTHETIC CITY GENERATOR (TAZ-LEVEL DATA) # ------------------------------------------------- @dataclass class SyntheticCity: taz: pd.DataFrame # zone attributes distance_matrix: pd.DataFrame # minutes between TAZs (symmetric) travel_time_matrix: pd.DataFrame # base car travel time (minutes) def generate_synthetic_city(num_zones: int = NUM_ZONES, seed: int = RANDOM_SEED) -> SyntheticCity: """ Generate synthetic socio-economic and spatial data for a set of TAZs. Returns ------- SyntheticCity """ rng_local = np.random.default_rng(seed) # Create synthetic 2D coordinates for zones (km), roughly a 10x10 km city x = rng_local.uniform(0, 10, size=num_zones) y = rng_local.uniform(0, 10, size=num_zones) # Population and households population = rng_local.normal(loc=25000, scale=5000, size=num_zones) population = np.clip(population, 8000, None).astype(int) households = (population / rng_local.normal(loc=3.2, scale=0.3, size=num_zones)).astype(int) # Workers and students workers = (population * rng_local.uniform(0.35, 0.45, size=num_zones)).astype(int) students = (population * rng_local.uniform(0.2, 0.3, size=num_zones)).astype(int) # Income (monthly, arbitrary units) – lognormal income = rng_local.lognormal(mean=10, sigma=0.4, size=num_zones) # Car ownership rate as sigmoid of income def sigmoid(z): return 1 / (1 + np.exp(-z)) car_ownership_rate = sigmoid(0.00003 * income - 3.0) cars = (car_ownership_rate * households * rng_local.uniform(0.8, 1.2, size=num_zones)).astype(int) # Land-use mix index (0–1) land_use_mix = rng_local.uniform(0.2, 0.9, size=num_zones) # Jobs and floor areas service_jobs = (workers * rng_local.uniform(0.8, 1.4, size=num_zones)).astype(int) industrial_jobs = (workers * rng_local.uniform(0.3, 0.8, size=num_zones)).astype(int) retail_jobs = (workers * rng_local.uniform(0.3, 0.7, size=num_zones)).astype(int) school_capacity = (students * rng_local.uniform(1.1, 1.5, size=num_zones)).astype(int) retail_floor_area = (retail_jobs * rng_local.uniform(20, 40, size=num_zones)) # arbitrary units taz_df = pd.DataFrame({ "TAZ": np.arange(1, num_zones + 1), "x_km": x, "y_km": y, "population": population, "households": households, "workers": workers, "students": students, "income": income, "car_ownership_rate": car_ownership_rate, "cars": cars, "land_use_mix": land_use_mix, "service_jobs": service_jobs, "industrial_jobs": industrial_jobs, "retail_jobs": retail_jobs, "school_capacity": school_capacity, "retail_floor_area": retail_floor_area, }) taz_df.set_index("TAZ", inplace=True) # Distance matrix (Euclidean) and base car travel time (min) coords = taz_df[["x_km", "y_km"]].to_numpy() dx = coords[:, 0][:, None] - coords[:, 0][None, :] dy = coords[:, 1][:, None] - coords[:, 1][None, :] dist_km = np.sqrt(dx ** 2 + dy ** 2) # Assume average car speed ~ 25–35 km/h plus 3–8 minutes terminal time avg_speed_kmh = rng_local.uniform(25, 35) tt_base = (dist_km / avg_speed_kmh) * 60 # minutes tt_matrix = tt_base + rng_local.uniform(3, 8, size=(num_zones, num_zones)) # Ensure diagonal is small (intra-zonal trips) np.fill_diagonal(tt_matrix, rng_local.uniform(3, 5, size=num_zones)) np.fill_diagonal(dist_km, rng_local.uniform(0.2, 0.5, size=num_zones)) distance_df = pd.DataFrame(dist_km, index=taz_df.index, columns=taz_df.index) tt_df = pd.DataFrame(tt_matrix, index=taz_df.index, columns=taz_df.index) return SyntheticCity(taz=taz_df, distance_matrix=distance_df, travel_time_matrix=tt_df) # ------------------------------------------------- # 2. TRIP GENERATION # ------------------------------------------------- PURPOSES = ["HBW", "HBE", "HBS"] def trip_generation(taz: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame]: """ Generate synthetic trip productions and attractions by purpose. Parameters ---------- taz : DataFrame TAZ-level socio-economic attributes. Returns ------- productions : DataFrame (index=TAZ, columns=PURPOSES) attractions : DataFrame (index=TAZ, columns=PURPOSES) """ df = taz # Productions (synthetic "true" equations) P_HBW = 0.8 * df["workers"] + 0.2 * df["cars"] P_HBE = 1.2 * df["students"] P_HBS = 0.4 * df["households"] productions = pd.DataFrame({ "HBW": P_HBW, "HBE": P_HBE, "HBS": P_HBS }, index=df.index) # Attractions (jobs, schools, retail) A_HBW = 0.7 * df["service_jobs"] + 0.3 * df["industrial_jobs"] A_HBE = 1.5 * df["school_capacity"] A_HBS = 1.3 * df["retail_floor_area"] attractions = pd.DataFrame({ "HBW": A_HBW, "HBE": A_HBE, "HBS": A_HBS }, index=df.index) # Balance productions and attractions for each purpose for p in PURPOSES: total_P = productions[p].sum() total_A = attractions[p].sum() if total_A <= 0: continue factor = total_P / total_A attractions[p] *= factor return productions, attractions # ------------------------------------------------- # 3. GRAVITY-BASED TRIP DISTRIBUTION WITH IPF # ------------------------------------------------- def gravity_impedance(travel_time_min: np.ndarray, beta: float = 1.5) -> np.ndarray: """ Simple impedance function f(c_ij) = c_ij^beta. Smaller f => more attractive; will be inverted later. """ c = np.maximum(travel_time_min, 1e-3) return c ** beta def gravity_distribution(productions: pd.Series, attractions: pd.Series, travel_time: pd.DataFrame, beta: float = 1.5, max_iter: int = 1000, tol: float = 1e-4) -> pd.DataFrame: """ Gravity model with iterative proportional fitting (IPF) to match row and column totals. Parameters ---------- productions : Series attractions : Series travel_time : DataFrame beta : float max_iter : int tol : float Returns ------- T : DataFrame (OD matrix) """ zones = productions.index c = travel_time.loc[zones, zones].to_numpy() f = gravity_impedance(c, beta=beta) P = productions.to_numpy() A = attractions.to_numpy() # Initial unbalanced matrix W = np.outer(P, A) / f W[W < 0] = 0.0 T = W.copy() # IPF for _ in range(max_iter): # Row adjustment row_sums = T.sum(axis=1) row_factors = np.divide(P, row_sums, out=np.ones_like(P), where=row_sums > 0) T = (T.T * row_factors).T # Column adjustment col_sums = T.sum(axis=0) col_factors = np.divide(A, col_sums, out=np.ones_like(A), where=col_sums > 0) T = T * col_factors # Convergence check row_err = np.abs(T.sum(axis=1) - P).sum() col_err = np.abs(T.sum(axis=0) - A).sum() if row_err < tol and col_err < tol: break T_df = pd.DataFrame(T, index=zones, columns=zones) return T_df def build_all_od_matrices(productions: pd.DataFrame, attractions: pd.DataFrame, travel_time: pd.DataFrame, beta_by_purpose: Dict[str, float] | None = None ) -> Dict[str, pd.DataFrame]: """ Build OD matrices for each purpose. Returns ------- od_mats : dict[purpose -> DataFrame] """ if beta_by_purpose is None: beta_by_purpose = {"HBW": 1.5, "HBE": 1.6, "HBS": 1.4} od_mats = {} for p in PURPOSES: od_mats[p] = gravity_distribution( productions[p], attractions[p], travel_time=travel_time, beta=beta_by_purpose.get(p, 1.5), ) return od_mats # ------------------------------------------------- # 4. MODE CHOICE (MULTINOMIAL LOGIT) # ------------------------------------------------- MODES = ["car", "metro", "bus"] @dataclass class ModeChoiceResult: probabilities: Dict[str, pd.DataFrame] # mode -> P_ij volumes: Dict[str, pd.DataFrame] # mode -> T_ij^mode total_od: pd.DataFrame # aggregate OD (all purposes) def synthetic_mode_choice_costs(travel_time_car: pd.DataFrame ) -> Tuple[Dict[str, pd.DataFrame], Dict[str, pd.DataFrame]]: """ Given base car travel time, build synthetic time and cost matrices for each mode. Returns ------- time_mats : dict[mode -> DataFrame] cost_mats : dict[mode -> DataFrame] """ tt_car = travel_time_car.copy() zones = tt_car.index # Metro is faster, bus is slower tt_metro = tt_car * 0.8 tt_bus = tt_car * 1.3 # Costs (arbitrary synthetic) dist_factor = tt_car / 60 * 30 # ~ distance proxy (km) cost_car = 2 + 0.12 * dist_factor # fuel + parking etc. cost_metro = 15 + 0.02 * dist_factor # base fare + distance cost_bus = 8 + 0.03 * dist_factor time_mats = { "car": tt_car, "metro": tt_metro, "bus": tt_bus } cost_mats = { "car": cost_car, "metro": cost_metro, "bus": cost_bus } return time_mats, cost_mats def mode_choice(od_mats: Dict[str, pd.DataFrame], taz: pd.DataFrame, travel_time_car: pd.DataFrame, beta_time: float = -0.06, beta_cost: float = -0.03, beta_car_own: float = 0.5 ) -> ModeChoiceResult: """ Multinomial logit mode choice applied to aggregate OD flows (sum over purposes). Parameters ---------- od_mats : dict[purpose -> OD matrix] taz : DataFrame travel_time_car : DataFrame Returns ------- ModeChoiceResult """ zones = travel_time_car.index # Aggregate OD across purposes total_od = sum(od_mats.values()) total_od = total_od.loc[zones, zones] time_mats, cost_mats = synthetic_mode_choice_costs(travel_time_car) # Car ownership by origin car_own = taz["car_ownership_rate"].reindex(zones).to_numpy() n = len(zones) car_own_matrix = np.repeat(car_own[:, None], n, axis=1) utilities = {} for mode in MODES: tt = time_mats[mode].to_numpy() cost = cost_mats[mode].to_numpy() if mode == "car": U = beta_time * tt + beta_cost * cost + beta_car_own * car_own_matrix else: U = beta_time * tt + beta_cost * cost utilities[mode] = U # Compute probabilities exp_U_sum = np.zeros_like(next(iter(utilities.values()))) for U in utilities.values(): exp_U_sum += np.exp(U) probabilities = {} for mode, U in utilities.items(): P = np.exp(U) / np.maximum(exp_U_sum, 1e-12) probabilities[mode] = pd.DataFrame(P, index=zones, columns=zones) # Mode-specific flows volumes = {} total_od_np = total_od.to_numpy() for mode in MODES: volumes[mode] = pd.DataFrame( total_od_np * probabilities[mode].to_numpy(), index=zones, columns=zones ) return ModeChoiceResult( probabilities=probabilities, volumes=volumes, total_od=total_od ) # ------------------------------------------------- # 5. SYNTHETIC NETWORK & AON ROUTE ASSIGNMENT # ------------------------------------------------- @dataclass class Network: G: nx.DiGraph link_df: pd.DataFrame # index: link id, columns: from, to, ff_time, capacity, distance taz_to_node: Dict[int, int] # mapping from TAZ -> nearest node def generate_synthetic_network(taz: pd.DataFrame, avg_speed_kmh: float = 30.0, seed: int = RANDOM_SEED) -> Network: """ Build a synthetic directed network using TAZ centroids plus extra connectors. Strategy: - Use TAZ centroids as main nodes. - Connect each node to its k nearest neighbours (k=3) both directions. Returns ------- Network """ rng_local = np.random.default_rng(seed) coords = taz[["x_km", "y_km"]].to_numpy() zones = taz.index.to_list() n = len(zones) G = nx.DiGraph() for i, z in enumerate(zones): G.add_node(z, x=coords[i, 0], y=coords[i, 1]) # Connect to k nearest neighbours k = 3 link_records = [] link_id = 0 for i, zi in enumerate(zones): xi, yi = coords[i] # distances to others dx = coords[:, 0] - xi dy = coords[:, 1] - yi dist = np.sqrt(dx ** 2 + dy ** 2) order = np.argsort(dist) # take nearest k excluding itself neighbours_idx = [j for j in order if j != i][:k] for j in neighbours_idx: zj = zones[j] d_km = dist[j] if d_km <= 0: continue ff_time = (d_km / avg_speed_kmh) * 60 # minutes # capacity (veh/h) synthetic cap = rng_local.integers(1200, 2400) G.add_edge(zi, zj, length_km=d_km, ff_time=ff_time, capacity=cap) link_records.append({ "link_id": link_id, "from": zi, "to": zj, "distance_km": d_km, "ff_time_min": ff_time, "capacity_vehph": cap }) link_id += 1 link_df = pd.DataFrame(link_records).set_index("link_id") # Map each TAZ directly to its node (here they coincide) taz_to_node = {int(z): int(z) for z in zones} return Network(G=G, link_df=link_df, taz_to_node=taz_to_node) def aon_assignment(od_matrix: pd.DataFrame, network: Network) -> pd.DataFrame: """ All-or-nothing assignment of OD matrix to network links using free-flow travel time as cost. Parameters ---------- od_matrix : DataFrame (TAZ x TAZ) network : Network Returns ------- link_flows : DataFrame (index=link_id, column='flow') """ G = network.G taz_to_node = network.taz_to_node zones = od_matrix.index.to_list() flows = np.zeros(len(network.link_df), dtype=float) # Precompute a mapping from (u,v) to link_id edge_to_link = {} for lid, row in network.link_df.iterrows(): edge_to_link[(row["from"], row["to"])] = lid # Use ff_time as edge weight for (u, v, data) in G.edges(data=True): if "ff_time" not in data: data["ff_time"] = data.get("ff_time_min", 1.0) # For each OD pair, find shortest path and add flow for i, o in enumerate(zones): origin_node = taz_to_node[int(o)] for j, d in enumerate(zones): if i == j: continue dest_node = taz_to_node[int(d)] demand = od_matrix.iat[i, j] if demand <= 0: continue try: path = nx.shortest_path(G, origin_node, dest_node, weight="ff_time") except nx.NetworkXNoPath: continue # accumulate flow on each edge of path for k in range(len(path) - 1): u = path[k] v = path[k + 1] lid = edge_to_link.get((u, v)) if lid is not None: flows[lid] += demand link_flows = network.link_df.copy() link_flows["flow_vehph"] = flows return link_flows # ------------------------------------------------- # 6. QUICK DEMO (RUN THIS FILE DIRECTLY) # ------------------------------------------------- if __name__ == "__main__": # 1. Generate synthetic city city = generate_synthetic_city(num_zones=NUM_ZONES) taz = city.taz print("TAZ sample:\n", taz.head(), "\n") # 2. Trip generation productions, attractions = trip_generation(taz) print("Total productions by purpose:\n", productions.sum(), "\n") print("Total attractions by purpose:\n", attractions.sum(), "\n") # 3. OD matrices by gravity od_mats = build_all_od_matrices(productions, attractions, travel_time=city.travel_time_matrix) for p, od in od_mats.items(): print(f"OD matrix ({p}) total trips: {od.values.sum():.1f}") # 4. Mode choice mc_result = mode_choice(od_mats, taz, city.travel_time_matrix) print("\nMode shares (total trips):") total_trips = mc_result.total_od.values.sum() for m in MODES: trips_m = mc_result.volumes[m].values.sum() print(f" {m}: {trips_m:.1f} ({100 * trips_m / total_trips:.1f} %)") # 5. Network & AON assignment (using car OD only as example) network = generate_synthetic_network(taz) car_od = mc_result.volumes["car"] link_flows = aon_assignment(car_od, network) print("\nLink flows (first 10):\n", link_flows.head(10))