Spaces:
Running
Running
| """ | |
| Aggregate property prices at different geographic levels. | |
| Responsibility: Given cleaned transaction data, compute time-weighted | |
| summary statistics (weighted trimmed mean, median, confidence) grouped | |
| by any geographic column. | |
| """ | |
| import json | |
| import logging | |
| import math | |
| from pathlib import Path | |
| import numpy as np | |
| import polars as pl | |
| from src.config import ( | |
| AGGREGATED_DIR, | |
| AGGREGATION_LEVELS, | |
| SECTIONS_DIR, | |
| TRIM_FRACTION, | |
| TYPE_LOCAL_SHORT, | |
| ) | |
logger = logging.getLogger(__name__)

# Aggregation level name -> DataFrame column whose values identify a group at
# that level. "_country" is synthetic: aggregate_level() materializes it as a
# constant literal column before grouping.
LEVEL_TO_COLUMN: dict[str, str] = dict(
    country="_country",
    region="code_region",
    department="code_departement",
    commune="code_commune",
    postcode="code_postal",
    section="code_section",
)
def weighted_trimmed_mean(
    prices: np.ndarray,
    weights: np.ndarray,
    trim: float = TRIM_FRACTION,
) -> float:
    """
    Compute a weighted trimmed mean.

    Observations are sorted by price and laid end to end on a cumulative
    weight axis, each occupying an interval as wide as its weight. The bottom
    and top ``trim`` fraction of the total weight are cut away, and every
    observation contributes in proportion to how much of its interval lies
    inside the remaining middle window. Observations straddling a cut point
    are kept partially, so both tails are trimmed symmetrically.

    Args:
        prices: Array of price/m2 values.
        weights: Array of temporal weights (same length as prices).
        trim: Fraction to trim from each tail (0.20 = drop bottom/top 20%).

    Returns:
        Weighted trimmed mean, or NaN if ``prices`` is empty or the total
        weight is not positive. If the middle window is empty (trim >= 0.5),
        the weighted median is returned instead.
    """
    prices = np.asarray(prices, dtype=float)
    weights = np.asarray(weights, dtype=float)
    if prices.size == 0:
        return float("nan")

    order = np.argsort(prices, kind="stable")
    prices = prices[order]
    weights = weights[order]

    # Interval of each observation on the cumulative weight axis.
    upper = np.cumsum(weights)
    lower = np.concatenate(([0.0], upper[:-1]))
    total_w = upper[-1]
    if total_w <= 0:
        return float("nan")

    lo = total_w * trim
    hi = total_w * (1.0 - trim)
    # Length of each interval that survives inside [lo, hi]; 0 when disjoint.
    kept = np.clip(np.minimum(upper, hi) - np.maximum(lower, lo), 0.0, None)
    kept_total = kept.sum()
    if kept_total <= 0:
        # Nothing survives trimming: fall back to the weighted median, i.e. the
        # observation whose interval contains the half-weight point.
        idx = int(np.searchsorted(upper, total_w / 2.0))
        return float(prices[min(idx, prices.size - 1)])
    return float(np.dot(prices, kept) / kept_total)
def effective_sample_size(weights: np.ndarray) -> float:
    """
    Return Kish's effective sample size of a set of observation weights.

    Defined as n_eff = (sum w)^2 / sum(w^2). It equals the number of
    observations when all weights are identical. A non-positive weight total
    yields 0.0.
    """
    weight_total = weights.sum()
    if weight_total <= 0:
        return 0.0
    squared_total = np.square(weights).sum()
    return float(weight_total * weight_total / squared_total)
def _aggregate_group(group_df: pl.DataFrame, trim: float = TRIM_FRACTION) -> dict:
    """
    Compute all stats for one geographic group.

    Args:
        group_df: Rows of one group; must contain ``prix_m2`` and
            ``temporal_weight`` columns.
        trim: Tail fraction passed to weighted_trimmed_mean.

    Returns:
        Dict with median, wtm (weighted trimmed mean), q1, q3 (unweighted
        quartiles), volume (row count), n_eff (Kish effective sample size)
        and confidence. An empty group yields all zeros.
    """
    prices = group_df["prix_m2"].to_numpy()
    weights = group_df["temporal_weight"].to_numpy()
    volume = len(prices)
    if volume == 0:
        return {
            "median": 0.0, "wtm": 0.0, "q1": 0.0, "q3": 0.0,
            "volume": 0, "n_eff": 0.0, "confidence": 0.0,
        }

    median = float(np.median(prices))
    q1 = float(np.percentile(prices, 25))
    q3 = float(np.percentile(prices, 75))
    iqr = q3 - q1
    wtm = weighted_trimmed_mean(prices, weights, trim)
    if not math.isfinite(wtm):
        # No usable weight (e.g. every temporal weight is zero). json.dump
        # would write a NaN as a bare `NaN` token, which is not valid JSON,
        # so fall back to the unweighted median.
        wtm = median
    n_eff = effective_sample_size(weights)

    # Confidence = 0.6 * volume term + 0.4 * stability term.
    # Volume: log-scaled n_eff, reaching 1.0 at n_eff = 100.
    # Stability: 1 - IQR/median, floored at 0 (0 when median is not positive).
    conf_volume = min(1.0, math.log1p(n_eff) / math.log1p(100))
    conf_stability = max(0.0, 1.0 - iqr / median) if median > 0 else 0.0
    confidence = round(0.6 * conf_volume + 0.4 * conf_stability, 3)

    return {
        "median": round(median, 1),
        "wtm": round(wtm, 1),
        "q1": round(q1, 1),
        "q3": round(q3, 1),
        "volume": volume,
        "n_eff": round(n_eff, 1),
        "confidence": confidence,
    }
def aggregate_level(
    df: pl.DataFrame,
    group_col: str,
    *,
    property_type: str | None = None,
) -> dict[str, dict]:
    """
    Compute time-weighted price statistics for one geographic level.

    Args:
        df: Collected DataFrame with prix_m2 and temporal_weight.
        group_col: Column to group by. The special value "_country" groups
            every row under the single code "FR".
        property_type: Filter to this type_local, or None for all.

    Returns:
        Dict of {code: stats_dict}, one entry per non-null value of
        ``group_col``, in order of first appearance in ``df``.
    """
    filtered = df
    if property_type:
        filtered = filtered.filter(pl.col("type_local") == property_type)
    if group_col == "_country":
        filtered = filtered.with_columns(pl.lit("FR").alias("_country"))
    # Rows without a geographic code would otherwise form a null group that
    # gets exported under the bogus key "None".
    filtered = filtered.filter(pl.col(group_col).is_not_null())

    results: dict[str, dict] = {}
    # maintain_order=True makes group order (and hence the exported JSON)
    # deterministic across runs.
    for code, group_df in filtered.group_by(group_col, maintain_order=True):
        # Group keys may arrive as 1-tuples depending on the polars version.
        key = code[0] if isinstance(code, tuple) else code
        results[str(key)] = _aggregate_group(group_df)
    return results
def aggregate_all_types(
    df: pl.DataFrame,
    group_col: str,
) -> dict[str, dict[str, dict]]:
    """
    Aggregate one geographic level for all types combined and for each type.

    Combined statistics are stored under "tous". Each entry of
    TYPE_LOCAL_SHORT adds its short name (e.g. "appartement", "maison") as a
    further key. A code only receives the keys for which aggregate_level
    produced a group.

    Returns:
        Nested dict: {code: {"tous": {...}, "appartement": {...}, "maison": {...}}}
    """
    # (output key, type_local filter); None means no type filter.
    passes: list[tuple[str, str | None]] = [("tous", None)]
    passes.extend((short, full) for full, short in TYPE_LOCAL_SHORT.items())

    by_code: dict[str, dict[str, dict]] = {}
    for label, type_filter in passes:
        level_stats = aggregate_level(df, group_col, property_type=type_filter)
        for code, stats in level_stats.items():
            by_code.setdefault(code, {})[label] = stats
    return by_code
def aggregate_all_levels(df: pl.DataFrame) -> dict[str, dict]:
    """
    Run aggregation for every level listed in AGGREGATION_LEVELS.

    Args:
        df: Collected DataFrame (eager, not lazy).

    Returns:
        Dict mapping level name -> nested price dict as produced by
        aggregate_all_types ({code: {type_key: stats}}).
    """
    per_level: dict[str, dict] = {}
    for level_name in AGGREGATION_LEVELS:
        column = LEVEL_TO_COLUMN[level_name]
        logger.info("Aggregating level: %s (group by %s)", level_name, column)
        nested = aggregate_all_types(df, column)
        per_level[level_name] = nested
        logger.info(" -> %d unique codes", len(nested))
    return per_level
def export_json(
    aggregated: dict[str, dict],
    output_dir: Path | None = None,
) -> None:
    """
    Export aggregated data to JSON files for the frontend.

    The "section" level is split into per-department files under
    ``<output_dir>/sections/``. Every other level is written to a single
    ``prices_<level>.json`` file.

    Args:
        aggregated: Mapping of level name -> {code: stats}, as returned by
            aggregate_all_levels.
        output_dir: Destination directory; defaults to AGGREGATED_DIR.
            Created (with parents) if missing.
    """
    output_dir = output_dir or AGGREGATED_DIR
    output_dir.mkdir(parents=True, exist_ok=True)
    for level, data in aggregated.items():
        if level == "section":
            _export_sections(data, output_dir)
            continue
        path = output_dir / f"prices_{level}.json"
        # Explicit UTF-8: with ensure_ascii=False any non-ASCII text is written
        # raw, and the platform default encoding may not be able to encode it.
        with open(path, "w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False)
        logger.info("Exported: %s (%d entries)", path.name, len(data))
def _export_sections(
    section_data: dict[str, dict],
    output_dir: Path,
) -> None:
    """
    Split section-level data into per-department JSON files.

    Section codes are expected to start with the department code. A code is
    assigned to the department given by its first three characters when they
    are digits forming a number >= 970 (overseas departments: 971, 972, ...).
    Otherwise the first two characters are used, which also covers Corsica
    ("2A", "2B").

    Output: sections/01.json, sections/02.json, ..., sections/2A.json,
    sections/971.json, etc.

    Args:
        section_data: Mapping of section code -> stats.
        output_dir: Parent directory; files go to ``output_dir / "sections"``.
    """
    sections_dir = output_dir / "sections"
    sections_dir.mkdir(parents=True, exist_ok=True)

    by_dept: dict[str, dict] = {}
    for code, stats in section_data.items():
        prefix3 = code[:3]
        if prefix3.isdigit() and int(prefix3) >= 970:
            dept = prefix3  # overseas department: 3-char code
        else:
            dept = code[:2]  # mainland and Corsica (2A/2B): 2-char code
        by_dept.setdefault(dept, {})[code] = stats

    for dept, dept_data in sorted(by_dept.items()):
        path = sections_dir / f"{dept}.json"
        # Explicit UTF-8 so ensure_ascii=False output never depends on the
        # platform default encoding.
        with open(path, "w", encoding="utf-8") as f:
            json.dump(dept_data, f, ensure_ascii=False)
    logger.info(
        "Exported: sections/ (%d departments, %d total sections)",
        len(by_dept),
        len(section_data),
    )