"""
Compute price/m2 breakdown for the top 10 French cities.

Responsibility: Produce a clean table of time-weighted median price per m2
by property type for the largest cities.
"""

import json
import logging
from pathlib import Path

import polars as pl

from src.aggregator import _aggregate_group
from src.config import AGGREGATED_DIR, TOP_10_CITIES, TYPE_LOCAL_SHORT

logger = logging.getLogger(__name__)


def compute_top_cities(df: pl.DataFrame) -> dict[str, dict]:
    """
    Compute price statistics for top 10 cities, broken down by property type.

    Uses code_commune_city (with arrondissements mapped to parent city)
    to correctly aggregate Paris, Lyon, and Marseille.

    Cities from TOP_10_CITIES with no matching rows are logged as a warning
    and left out of the result. Property types with no rows for a given city
    are left out of that city's entry.

    Args:
        df: Collected DataFrame with code_commune_city, prix_m2,
            temporal_weight. This function also filters on a ``type_local``
            column.

    Returns:
        Nested dict keyed by city name:
        {"Paris": {"code": "75056", "tous": {...}, ...}, ...}
        where "tous" and each short property-type key hold whatever
        ``_aggregate_group`` returns for the corresponding subset.
    """
    # Narrow the frame once so every per-city filter scans fewer rows.
    city_codes = list(TOP_10_CITIES.keys())
    city_data = df.filter(pl.col("code_commune_city").is_in(city_codes))

    result: dict[str, dict] = {}

    for city_code, city_name in TOP_10_CITIES.items():
        city_df = city_data.filter(pl.col("code_commune_city") == city_code)

        if city_df.is_empty():
            logger.warning("No data for %s (%s)", city_name, city_code)
            continue

        entry: dict = {"code": city_code}

        # All residential combined
        entry["tous"] = _aggregate_group(city_df)

        # Per property type; types without any rows are omitted from the entry.
        for full_name, short_name in TYPE_LOCAL_SHORT.items():
            type_df = city_df.filter(pl.col("type_local") == full_name)
            if not type_df.is_empty():
                entry[short_name] = _aggregate_group(type_df)

        result[city_name] = entry

    return result


def export_top_cities(data: dict[str, dict], output_dir: Path | None = None) -> None:
    """
    Export top cities data to JSON.

    Writes ``top_cities.json`` into the output directory, creating the
    directory (and any missing parents) first.

    Args:
        data: Mapping of city name to its statistics entry, as returned by
            ``compute_top_cities``.
        output_dir: Destination directory. Defaults to AGGREGATED_DIR when
            None.
    """
    if output_dir is None:
        output_dir = AGGREGATED_DIR
    output_dir.mkdir(parents=True, exist_ok=True)

    path = output_dir / "top_cities.json"
    # ensure_ascii=False emits non-ASCII characters verbatim, so the file
    # encoding is pinned to UTF-8 instead of depending on the platform locale.
    with open(path, "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False, indent=2)

    logger.info("Exported: %s (%d cities)", path.name, len(data))