Spaces:
Sleeping
Sleeping
| """Synthetic NYC-style taxi trip duration dataset generator. | |
| Produces a realistic, time-dependent dataset where trip duration depends | |
| non-linearly on distance, time-of-day, and day-of-week. Rush-hour patterns | |
| create temporal non-stationarity. Noise is heteroscedastic. | |
| """ | |
| from __future__ import annotations | |
| import numpy as np | |
| import pandas as pd | |
| from datetime import datetime, timedelta | |
| from typing import Optional | |
| from src.utils.logging_config import get_logger | |
| log = get_logger(__name__) | |
class TaxiDataGenerator:
    """Generate synthetic taxi trip duration data with realistic temporal patterns.

    All randomness comes from a single ``numpy.random.Generator`` stored on
    the instance (``self.rng``). The values produced by any call therefore
    depend on the seed *and* on how many draws earlier calls have consumed.
    """

    # Value pools that ``_generate_features`` samples categorical columns from.
    VENDOR_IDS = [1, 2]
    RATE_CODES = [1, 2, 3, 4, 5]
    PAYMENT_TYPES = [1, 2, 3, 4]
    PU_ZONES = list(range(1, 51))
    DO_ZONES = list(range(1, 51))

    def __init__(self, random_seed: int = 42) -> None:
        """Create the instance's random stream.

        Args:
            random_seed: Seed handed to ``numpy.random.default_rng``.
        """
        self.rng = np.random.default_rng(random_seed)
        log.info("TaxiDataGenerator initialised (seed=%d)", random_seed)

    def generate(
        self,
        n_samples: int,
        start_date: Optional[datetime] = None,
        freq_seconds: int = 60,
    ) -> pd.DataFrame:
        """Generate a DataFrame of taxi trips.

        Args:
            n_samples: Number of trips (rows) to produce; must be >= 0.
            start_date: Pickup time of the first trip. Defaults to
                2023-01-01 00:00:00 when omitted.
            freq_seconds: Spacing, in seconds, between consecutive pickups.

        Returns:
            One row per trip holding the columns built by
            ``_generate_features`` plus ``trip_duration_min``.

        Raises:
            ValueError: If ``n_samples`` is negative.
        """
        # A negative count would otherwise fall through ``range`` as "no rows"
        # and silently return an empty frame; reject it explicitly instead.
        if n_samples < 0:
            raise ValueError(f"n_samples must be non-negative, got {n_samples}")

        if start_date is None:
            start_date = datetime(2023, 1, 1, 0, 0, 0)

        # Evenly spaced pickup times: start, start + freq, start + 2*freq, ...
        timestamps = [
            start_date + timedelta(seconds=i * freq_seconds)
            for i in range(n_samples)
        ]

        df = self._generate_features(timestamps)
        df["trip_duration_min"] = self._compute_duration(df)
        log.info("Generated %d trip samples starting %s", n_samples, start_date)
        return df

    def generate_reference(self, n_samples: int = 5000) -> pd.DataFrame:
        """Generate a stable reference dataset with no drift.

        This is ``generate`` with a fixed start date of 2023-01-01 and the
        default pickup spacing. It draws from the same ``self.rng`` as every
        other method, so the exact values depend on what has already been
        generated with this instance.

        Args:
            n_samples: Number of trips (rows) to produce.

        Returns:
            DataFrame in the same layout as ``generate``.
        """
        return self.generate(n_samples, start_date=datetime(2023, 1, 1))

    def _generate_features(self, timestamps: list[datetime]) -> pd.DataFrame:
        """Build the feature columns for one trip per timestamp.

        Calendar features are derived from ``timestamps``; every other
        column is sampled from ``self.rng``.

        NOTE: the order of the ``self.rng`` calls below fixes the seeded
        output. Reordering them changes the generated data for a given seed.

        Args:
            timestamps: Pickup time of each trip.

        Returns:
            DataFrame with ``len(timestamps)`` rows and the columns
            ``timestamp``, ``vendor_id``, ``passenger_count``,
            ``trip_distance``, ``pickup_hour``, ``pickup_dow``,
            ``pickup_month``, ``pickup_is_weekend``, ``rate_code_id``,
            ``payment_type``, ``pu_location_zone``, ``do_location_zone``.
        """
        n = len(timestamps)
        ts = pd.to_datetime(timestamps)

        # Upper bound is exclusive, so passenger counts fall in 1..6.
        passenger_count = self.rng.integers(1, 7, size=n)
        trip_distance = self._realistic_distance(n)

        pickup_hour = ts.hour.to_numpy()
        pickup_dow = ts.dayofweek.to_numpy()  # Monday=0 ... Sunday=6
        pickup_month = ts.month.to_numpy()
        pickup_is_weekend = (pickup_dow >= 5).astype(int)

        # Rate codes and payment types are skewed towards their first value;
        # zones and vendors are sampled uniformly.
        rate_code_id = self.rng.choice(
            self.RATE_CODES, size=n, p=[0.85, 0.06, 0.04, 0.03, 0.02]
        )
        payment_type = self.rng.choice(
            self.PAYMENT_TYPES, size=n, p=[0.65, 0.30, 0.03, 0.02]
        )
        pu_location_zone = self.rng.choice(self.PU_ZONES, size=n)
        do_location_zone = self.rng.choice(self.DO_ZONES, size=n)
        vendor_id = self.rng.choice(self.VENDOR_IDS, size=n)

        return pd.DataFrame({
            "timestamp": ts,
            "vendor_id": vendor_id,
            "passenger_count": passenger_count,
            "trip_distance": trip_distance,
            "pickup_hour": pickup_hour,
            "pickup_dow": pickup_dow,
            "pickup_month": pickup_month,
            "pickup_is_weekend": pickup_is_weekend,
            "rate_code_id": rate_code_id,
            "payment_type": payment_type,
            "pu_location_zone": pu_location_zone,
            "do_location_zone": do_location_zone,
        })

    def _realistic_distance(self, n: int) -> np.ndarray:
        """Sample ``n`` trip distances with a heavy right tail.

        Distances are log-normal; roughly 5% of them are additionally
        stretched by a uniform factor in [3, 8) to model long trips. The
        result is clipped to the range [0.1, 50.0].

        Args:
            n: Number of distances to draw.

        Returns:
            Float array of length ``n``.
        """
        base = self.rng.lognormal(mean=0.8, sigma=0.7, size=n)
        long_trip_mask = self.rng.random(n) < 0.05
        base[long_trip_mask] *= self.rng.uniform(3, 8, size=long_trip_mask.sum())
        return np.clip(base, 0.1, 50.0)

    def _compute_duration(self, df: pd.DataFrame) -> np.ndarray:
        """Compute a noisy trip duration, in minutes, for every row of ``df``.

        The model is ``base * congestion * speed_factor + rate_effect`` plus
        Gaussian noise whose spread grows with the duration, clipped to
        [1.0, 120.0].

        Args:
            df: Frame providing ``trip_distance``, ``pickup_hour``,
                ``pickup_dow`` and ``rate_code_id``.

        Returns:
            Float array with one duration per row.
        """
        dist = df["trip_distance"].to_numpy()
        hour = df["pickup_hour"].to_numpy()
        dow = df["pickup_dow"].to_numpy()
        rate = df["rate_code_id"].to_numpy()

        # Sub-linear in distance, plus a fixed 2-minute overhead.
        base_duration = 3.5 * np.power(dist, 0.75) + 2.0

        # Rush-hour slowdowns apply on weekdays only: +35% for pickups in
        # hours 7-9, +45% for pickups in hours 16-19 (both bounds inclusive).
        rush_morning = ((hour >= 7) & (hour <= 9)).astype(float)
        rush_evening = ((hour >= 16) & (hour <= 19)).astype(float)
        weekday = (dow < 5).astype(float)
        congestion = (
            1.0 + 0.35 * rush_morning * weekday + 0.45 * rush_evening * weekday
        )

        # Flat surcharge in minutes for rate codes 2 and 3; none otherwise.
        rate_effect = np.where(rate == 2, 8.0, np.where(rate == 3, 12.0, 0.0))

        # Pickups in hours 0-5 are 15% shorter.
        night = ((hour >= 0) & (hour <= 5)).astype(float)
        speed_factor = 1.0 - 0.15 * night

        duration = base_duration * congestion * speed_factor + rate_effect

        # Heteroscedastic noise: standard deviation is 10% of the noiseless
        # duration plus a 0.5-minute floor.
        noise_std = 0.1 * duration + 0.5
        noise = self.rng.normal(0, noise_std)
        return np.clip(duration + noise, 1.0, 120.0)