Spaces:
Running
Running
File size: 3,002 Bytes
a58f1b7 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 |
# modules/gravity_model.py
# TripAI – Gravity Model + OD Matrix Builder
from __future__ import annotations
import numpy as np
import pandas as pd
from .utils import iterative_proportional_fitting
PURPOSES = ["HBW", "HBE", "HBS"]
def gravity_model_doubly_constrained(
productions: pd.Series,
attractions: pd.Series,
travel_time: pd.DataFrame,
beta: float = -0.1,
max_iter: int = 50,
tol: float = 1e-6,
) -> pd.DataFrame:
"""
Doubly-constrained gravity model with IPF.
T_ij ∝ P_i * A_j * f(c_ij),
where f(c_ij) = exp(beta * c_ij), beta < 0
IPF is used to ensure row sums match productions and column sums
match attractions.
Parameters
----------
productions : pd.Series
Trip productions P_i by origin TAZ.
attractions : pd.Series
Trip attractions A_j by destination TAZ.
travel_time : pd.DataFrame
Impedance matrix c_ij (minutes).
beta : float
Distance-decay parameter (negative).
max_iter : int
Maximum IPF iterations.
tol : float
Tolerance for marginal convergence.
Returns
-------
pd.DataFrame
OD matrix T_ij (index=origins, columns=destinations).
"""
idx = productions.index
P = productions.values.astype(float)
A = attractions.values.astype(float)
c = travel_time.loc[idx, idx].values.astype(float)
# Impedance function
F = np.exp(beta * c)
# Initial gravity estimate
T0 = np.outer(P, A) * F
# Avoid all-zero rows/cols
T0[T0 < 0] = 0.0
T_adj = iterative_proportional_fitting(T0, P, A, max_iter=max_iter, tol=tol)
return pd.DataFrame(T_adj, index=idx, columns=idx)
def build_all_od_matrices(
productions_df: pd.DataFrame,
attractions_df: pd.DataFrame,
travel_time: pd.DataFrame,
beta: float = -0.1,
max_iter: int = 50,
tol: float = 1e-6,
) -> dict[str, pd.DataFrame]:
"""
Build OD matrices for all purposes using the doubly-constrained gravity model.
Parameters
----------
productions_df : pd.DataFrame
Columns = purposes (HBW, HBE, HBS), index = TAZ.
attractions_df : pd.DataFrame
Columns = purposes (HBW, HBE, HBS), index = TAZ.
travel_time : pd.DataFrame
Travel time matrix (minutes), index/cols = TAZ.
beta : float
Distance-decay parameter for all purposes.
max_iter : int
Maximum iterations for IPF.
tol : float
Convergence tolerance.
Returns
-------
dict[str, pd.DataFrame]
Mapping from purpose -> OD matrix (DataFrame).
"""
od_mats: dict[str, pd.DataFrame] = {}
for purpose in PURPOSES:
P = productions_df[purpose]
A = attractions_df[purpose]
od_mats[purpose] = gravity_model_doubly_constrained(
P, A, travel_time, beta=beta, max_iter=max_iter, tol=tol
)
return od_mats
|