# Source: TripAI / modules / utils.py
# Uploaded by mahbubchula in commit a58f1b7 ("Upload 8 files", verified).
# modules/utils.py
# TripAI – Utility functions (IPF, etc.)
from __future__ import annotations
import numpy as np
import pandas as pd
def iterative_proportional_fitting(
    T_init: np.ndarray,
    row_targets: np.ndarray,
    col_targets: np.ndarray,
    max_iter: int = 50,
    tol: float = 1e-6,
) -> np.ndarray:
    """
    Adjust a non-negative matrix so its row and column sums match targets.

    Runs Iterative Proportional Fitting (IPF): each iteration rescales every
    row to hit its target sum, then rescales every column to hit its target
    sum, and stops early once both sets of marginals are close to the
    targets. The input matrix is copied and never modified in place.

    Parameters
    ----------
    T_init : np.ndarray
        Initial non-negative 2-D matrix of shape (n_rows, n_cols). Any
        array-like that NumPy can convert to a float matrix is accepted;
        the matrix does not have to be square.
    row_targets : np.ndarray
        Target row sums (length n_rows).
    col_targets : np.ndarray
        Target column sums (length n_cols).
    max_iter : int
        Maximum number of IPF iterations.
    tol : float
        Absolute tolerance passed to ``np.allclose`` for the convergence
        check on row/column marginals. ``np.allclose`` additionally applies
        its default relative tolerance.

    Returns
    -------
    np.ndarray
        Float matrix with approximately matching row/column totals. If the
        loop does not converge within ``max_iter`` iterations, the result of
        the last iteration is returned.

    Raises
    ------
    ValueError
        If ``T_init`` is not 2-D or the target lengths do not match its
        shape.

    Notes
    -----
    Rows or columns whose current sum is not positive cannot be rescaled
    and are left unchanged, so a positive target for such a row/column can
    never be met. Both marginals can only be matched simultaneously when
    ``row_targets`` and ``col_targets`` have the same total.
    """
    # np.array always copies here, so the caller's matrix stays untouched.
    T = np.array(T_init, dtype=float)
    if T.ndim != 2:
        raise ValueError(f"T_init must be a 2-D matrix, got {T.ndim} dimension(s)")

    # Flatten the targets so lists, Series and (N, 1) vectors all behave alike.
    row_targets = np.asarray(row_targets, dtype=float).ravel()
    col_targets = np.asarray(col_targets, dtype=float).ravel()

    n_rows, n_cols = T.shape
    if row_targets.shape[0] != n_rows:
        raise ValueError(
            f"row_targets has length {row_targets.shape[0]}, expected {n_rows}"
        )
    if col_targets.shape[0] != n_cols:
        raise ValueError(
            f"col_targets has length {col_targets.shape[0]}, expected {n_cols}"
        )

    for _ in range(max_iter):
        # Row scaling: factor stays 1 where the row sum is not positive,
        # which avoids dividing by zero.
        row_sums = T.sum(axis=1)
        row_factors = np.divide(
            row_targets, row_sums, out=np.ones(n_rows), where=row_sums > 0
        )
        T *= row_factors[:, None]

        # Column scaling, sized by the number of columns (not rows).
        col_sums = T.sum(axis=0)
        col_factors = np.divide(
            col_targets, col_sums, out=np.ones(n_cols), where=col_sums > 0
        )
        T *= col_factors[None, :]

        # Check convergence on both marginals after the full sweep.
        if np.allclose(T.sum(axis=1), row_targets, atol=tol) and np.allclose(
            T.sum(axis=0), col_targets, atol=tol
        ):
            break

    return T