| |
| import numpy as np |
| import geopandas as gpd |
| from shapely.geometry import Point |
| from geopy.distance import geodesic |
| from sklearn.neighbors import BallTree |
| import pandas as pd |
|
|
| |
# Reference point treated as the centre of London, as a (latitude, longitude)
# pair in degrees; used for the bearing and distance-to-centre features.
| CITY_CENTER = (51.5072, -0.1276) |
# CRS identifier (EPSG:4326, WGS 84 lon/lat) that every loaded dataset is
# re-projected to and that query points are created in.
| EPSG = "EPSG:4326" |
|
|
|
|
class LondonPropertyGeoFeatures:
    """Extract London property geo features for model inference.

    On construction the class loads four geographic layers from ``geo_dir``
    (borough boundaries, noise polygons, fare zones and rail/TfL stations),
    re-projects them to ``EPSG`` and builds a BallTree over the stations in
    an estimated UTM projection for nearest-station queries.

    Attributes set by ``__init__`` / ``load_datasets`` / ``prepare_station_tree``:
        CITY_CENTER: (latitude, longitude) of the reference centre point.
        EPSG: CRS identifier used for all layers and query points.
        geo_dir: directory the layer files are read from.
        london_boundaries, hex_gdf, zone_fares, stations: the loaded layers.
        stations_utm: ``stations`` re-projected to its estimated UTM CRS.
        station_tree: BallTree over the UTM station coordinates.
        station_names, station_tfl, station_rail: per-station arrays taken
            from the ``CommonName``, ``TFL`` and ``RAIL`` columns.
    """

    # Divisor used to convert metres to miles. Kept at the value the code has
    # always used (the exact international mile is 1609.344 m) so that the
    # station-distance outputs are unchanged.
    _METERS_PER_MILE = 1609.34

    def __init__(self, geo_dir):
        """Load all layers from ``geo_dir`` and build the station index."""
        self.CITY_CENTER = CITY_CENTER
        self.EPSG = EPSG
        self.geo_dir = geo_dir
        self.load_datasets()
        self.prepare_station_tree()

    def load_datasets(self):
        """Load the geographic layers and re-project each one to ``self.EPSG``."""
        self.london_boundaries = gpd.read_file(
            f"{self.geo_dir}/london_boroughs.geojson"
        ).to_crs(self.EPSG)
        self.hex_gdf = gpd.read_parquet(
            f"{self.geo_dir}/noize.parquet"
        ).to_crs(self.EPSG)
        self.zone_fares = gpd.read_parquet(
            f"{self.geo_dir}/zone_fares.parquet"
        ).to_crs(self.EPSG)
        self.stations = gpd.read_parquet(
            f"{self.geo_dir}/rail_tfl.parquet"
        ).to_crs(self.EPSG)

    def prepare_station_tree(self):
        """Prepare a BallTree for fast station distance queries.

        Stations are re-projected to their estimated UTM CRS so that the
        Euclidean metric of the tree operates on projected coordinates.
        """
        self.stations_utm = self.stations.to_crs(self.stations.estimate_utm_crs())
        station_coords = np.array([[p.x, p.y] for p in self.stations_utm.geometry])
        self.station_tree = BallTree(station_coords, leaf_size=15, metric='euclidean')
        self.station_names = self.stations_utm['CommonName'].values
        self.station_tfl = self.stations_utm['TFL'].values
        self.station_rail = self.stations_utm['RAIL'].values

    def _create_point_gdf(self, lat, lon):
        """Create a one-row GeoDataFrame for the point (internal helper)."""
        # shapely points are (x, y), i.e. (longitude, latitude).
        point = Point(lon, lat)
        return gpd.GeoDataFrame(geometry=[point], crs=self.EPSG)

    def _joined_value(self, lat, lon, layer, column):
        """Return ``column`` of the first ``layer`` polygon containing the point.

        Uses a left spatial join, so a point that falls inside no polygon
        still yields one row; in that case the returned value is whatever
        missing-value marker the join produced. ``None`` is returned when
        ``column`` does not exist in the joined result.
        """
        prop_gdf = self._create_point_gdf(lat, lon)
        joined = gpd.sjoin(prop_gdf, layer, how="left", predicate="within")
        return joined.iloc[0].get(column, None)

    @staticmethod
    def _parse_zone_label(zone_name):
        """Reduce a raw fare-zone name to its label.

        Strings containing ``"Zone"`` are reduced to their last
        space-separated token (``"Zone 2"`` -> ``"2"``); other strings are
        returned unchanged. Missing values (``None``/NaN/NA) become ``None``
        and any other non-string value is passed through as-is.
        """
        if not isinstance(zone_name, str):
            # An unmatched left join yields NaN, which is truthy and not
            # iterable: testing `"Zone" in zone_name` on it raises TypeError.
            return None if pd.isna(zone_name) else zone_name
        if "Zone" in zone_name:
            return zone_name.split(" ")[-1]
        return zone_name

    @staticmethod
    def _station_feature_columns(nearest_stations, slots=3):
        """Flatten station dicts into ``slots`` numbered feature groups.

        Slot ``i`` (1-based) gets ``distance_to_station{i}`` (rounded to 6
        decimals), ``TFL{i}`` and ``RAIL{i}``. Slots without a station are
        padded with ``None`` / ``False`` / ``False``.
        """
        features = {}
        for slot in range(1, slots + 1):
            if slot <= len(nearest_stations):
                station = nearest_stations[slot - 1]
                features[f'distance_to_station{slot}'] = round(station['distance_miles'], 6)
                features[f'TFL{slot}'] = station['TFL']
                features[f'RAIL{slot}'] = station['RAIL']
            else:
                features[f'distance_to_station{slot}'] = None
                features[f'TFL{slot}'] = False
                features[f'RAIL{slot}'] = False
        return features

    def borough(self, lat, lon):
        """Return the London borough name containing the given coordinates."""
        return self._joined_value(lat, lon, self.london_boundaries, "name")

    def compute_angle(self, lat, lon):
        """Compute angle (in radians) of a point relative to London center.

        This is the initial-bearing formula from ``CITY_CENTER`` to the
        point: 0 means due north, positive values are towards the east, and
        the result lies in ``[-pi, pi]``.
        """
        lat1, lon1 = np.radians(self.CITY_CENTER[0]), np.radians(self.CITY_CENTER[1])
        lat2, lon2 = np.radians(lat), np.radians(lon)

        dlon = lon2 - lon1
        x = np.cos(lat2) * np.sin(dlon)
        y = np.cos(lat1) * np.sin(lat2) - np.sin(lat1) * np.cos(lat2) * np.cos(dlon)
        return np.arctan2(x, y)

    def distance_to_center(self, lat, lon):
        """Return geodesic distance from the city center (in miles)."""
        return geodesic((lat, lon), self.CITY_CENTER).miles

    def noize_class(self, lat, lon):
        """Return the ``NoiseClass`` of the noise polygon containing the point."""
        return self._joined_value(lat, lon, self.hex_gdf, "NoiseClass")

    def zone_fare(self, lat, lon):
        """Return transport fare zone for given coordinates.

        Returns the zone label (e.g. ``"2"`` for a polygon named
        ``"Zone 2"``), the raw name when it does not contain ``"Zone"``, or
        ``None`` when the point lies in no fare-zone polygon.
        """
        raw_name = self._joined_value(lat, lon, self.zone_fares, "Name")
        return self._parse_zone_label(raw_name)

    def find_nearest_stations(self, lat, lon, k=3, max_distance_meters=50000) -> list:
        """Find the k nearest stations with distances and TFL/RAIL flags.

        Args:
            lat, lon: query point in degrees (same CRS as ``self.EPSG``).
            k: maximum number of stations to return; values larger than the
                number of indexed stations are clamped.
            max_distance_meters: stations farther than this (measured in the
                stations' UTM projection) are dropped.

        Returns:
            A list, nearest first, of dicts with keys ``distance_miles``,
            ``name``, ``TFL`` and ``RAIL``. Empty when ``k`` is below 1 or
            no station is within range.
        """
        # BallTree.query raises if k exceeds the number of indexed points,
        # so clamp it and bail out early when nothing can be returned.
        k = min(k, len(self.station_names))
        if k < 1:
            return []

        prop_gdf = self._create_point_gdf(lat, lon)
        prop_utm = prop_gdf.to_crs(self.stations_utm.crs)

        prop_coords = np.array([[p.x, p.y] for p in prop_utm.geometry])
        distances_m, indices = self.station_tree.query(prop_coords, k=k)

        # Only one query point was supplied, hence row 0 of each result.
        return [
            {
                'distance_miles': dist_m / self._METERS_PER_MILE,
                'name': self.station_names[idx],
                'TFL': bool(self.station_tfl[idx]),
                'RAIL': bool(self.station_rail[idx]),
            }
            for dist_m, idx in zip(distances_m[0], indices[0])
            if dist_m <= max_distance_meters
        ]

    def extract_geo_features(self, lat, lon) -> dict:
        """Extract all GEO features for model inference in the required format.

        Returns a dict with ``distance_to_center``, ``angle_from_center``
        (both rounded to 6 decimals), ``zone``, ``borough``, ``NoiseClass``
        and three numbered station groups (``distance_to_station{i}``,
        ``TFL{i}``, ``RAIL{i}`` for i in 1..3).
        """
        borough_name = self.borough(lat, lon)
        angle = self.compute_angle(lat, lon)
        center_distance = self.distance_to_center(lat, lon)
        noise_class = self.noize_class(lat, lon)
        zone = self.zone_fare(lat, lon)

        nearest_stations = self.find_nearest_stations(lat, lon, k=3)
        station_features = self._station_feature_columns(nearest_stations, slots=3)

        return {
            "distance_to_center": round(center_distance, 6),
            "angle_from_center": round(angle, 6),
            "zone": zone,
            "borough": borough_name,
            "NoiseClass": noise_class,
            **station_features,
        }

    def add_features_to_df(self, df: pd.DataFrame) -> pd.DataFrame:
        """Append geo features to every row of ``df``.

        This is a row-wise ``apply`` (one ``extract_geo_features`` call per
        row), not a vectorised computation. ``df`` must provide
        ``latitude`` and ``longitude`` columns. Returns a new DataFrame with
        the feature columns concatenated to the right of the original ones.
        """
        features = df.apply(
            lambda row: pd.Series(self.extract_geo_features(row["latitude"], row["longitude"])),
            axis=1
        )
        return pd.concat([df, features], axis=1)
|
|
|
|
|
|
|
|
|
|