Spaces:
Sleeping
Sleeping
| """Tests for aggregation functions.""" | |
| import json | |
| import math | |
| from pathlib import Path | |
| import numpy as np | |
| import polars as pl | |
| import pytest | |
| from src.aggregator import ( | |
| LEVEL_TO_COLUMN, | |
| _aggregate_group, | |
| _export_sections, | |
| aggregate_all_levels, | |
| aggregate_all_types, | |
| aggregate_level, | |
| effective_sample_size, | |
| export_json, | |
| weighted_trimmed_mean, | |
| ) | |
# ---- weighted_trimmed_mean ----
def test_wtm_uniform_weights():
    """With equal weights and 0 trim, WTM == simple mean."""
    values = np.array([100.0, 200.0, 300.0, 400.0, 500.0])
    unit_weights = np.ones_like(values)
    assert weighted_trimmed_mean(values, unit_weights, trim=0.0) == pytest.approx(300.0)
def test_wtm_trims_extremes():
    """With 20% trim, bottom and top tails should be clipped."""
    values = np.array([1.0, 100.0, 200.0, 300.0, 10000.0])
    unit_weights = np.ones(5)
    with_trim = weighted_trimmed_mean(values, unit_weights, trim=0.2)
    no_trim = weighted_trimmed_mean(values, unit_weights, trim=0.0)
    # The outliers (1 and 10000) should drag the untrimmed mean further up.
    assert with_trim < no_trim
def test_wtm_respects_weight_ordering():
    """Higher-weighted observations should pull the mean toward them."""
    values = np.array([100.0, 200.0])
    # Mirror-image weight vectors favouring opposite observations.
    favor_second = np.array([0.1, 10.0])
    favor_first = np.array([10.0, 0.1])
    pulled_toward_200 = weighted_trimmed_mean(values, favor_second, trim=0.0)
    pulled_toward_100 = weighted_trimmed_mean(values, favor_first, trim=0.0)
    assert pulled_toward_200 > pulled_toward_100
def test_wtm_empty_array():
    """An empty input yields NaN rather than raising."""
    out = weighted_trimmed_mean(np.array([]), np.array([]), trim=0.2)
    assert math.isnan(out)
def test_wtm_single_element():
    """A lone observation survives trimming and is returned unchanged."""
    out = weighted_trimmed_mean(np.array([5000.0]), np.array([1.0]), trim=0.2)
    assert out == pytest.approx(5000.0)
def test_wtm_zero_weights():
    """All-zero weights leave nothing to average, so the result is NaN."""
    out = weighted_trimmed_mean(np.array([100.0, 200.0]), np.array([0.0, 0.0]))
    assert math.isnan(out)
# ---- effective_sample_size ----
def test_ess_equal_weights():
    """With all equal weights, n_eff == n."""
    assert effective_sample_size(np.ones(50)) == pytest.approx(50.0)
def test_ess_unequal_weights():
    """Unequal weights should give n_eff < n."""
    n_eff = effective_sample_size(np.array([1.0, 1.0, 1.0, 0.01]))
    # Strictly between 1 (total dominance) and 4 (perfectly equal).
    assert 1.0 < n_eff < 4.0
def test_ess_single_dominant():
    """When one weight dominates, n_eff -> 1."""
    dominated = np.array([1000.0, 0.001, 0.001, 0.001])
    assert effective_sample_size(dominated) < 2.0
def test_ess_empty():
    """No weights at all means zero effective sample size."""
    empty_weights = np.array([])
    assert effective_sample_size(empty_weights) == 0.0
# ---- _aggregate_group ----
def test_aggregate_group_basic():
    """Sanity-check the summary stats on a small equally-weighted frame."""
    frame = pl.DataFrame(
        {
            "prix_m2": [2000.0, 3000.0, 4000.0, 2500.0, 3500.0],
            "temporal_weight": [1.0] * 5,
        }
    )
    stats = _aggregate_group(frame)
    assert stats["volume"] == 5
    assert stats["median"] == pytest.approx(3000.0, rel=0.01)
    # Quartiles must bracket the median.
    assert stats["q1"] < stats["median"] < stats["q3"]
    assert stats["n_eff"] == pytest.approx(5.0, abs=0.1)
    assert 0.0 <= stats["confidence"] <= 1.0
def test_aggregate_group_with_temporal_decay():
    """Older transactions should have lower weights, pulling WTM toward recent."""
    frame = pl.DataFrame(
        {
            "prix_m2": [2000.0, 2000.0, 2000.0, 5000.0, 5000.0],
            "temporal_weight": [0.5, 0.5, 0.5, 1.0, 1.0],
        }
    )
    stats = _aggregate_group(frame)
    # The fully-weighted 5000 rows should lift the WTM above the median.
    assert stats["wtm"] > stats["median"]
def test_aggregate_group_empty():
    """An empty group reports zero volume and zero confidence."""
    empty_frame = pl.DataFrame(
        {
            "prix_m2": pl.Series([], dtype=pl.Float64),
            "temporal_weight": pl.Series([], dtype=pl.Float64),
        }
    )
    stats = _aggregate_group(empty_frame)
    assert stats["volume"] == 0
    assert stats["confidence"] == 0.0
def test_aggregate_group_returns_all_keys():
    """The stats dict exposes exactly the documented set of keys."""
    stats = _aggregate_group(
        pl.DataFrame({"prix_m2": [3000.0], "temporal_weight": [1.0]})
    )
    assert set(stats) == {"median", "wtm", "q1", "q3", "volume", "n_eff", "confidence"}
# ---- aggregate_level ----
def test_aggregate_level_groups_correctly(sample_clean_df):
    """Rows are bucketed by department code with the expected per-bucket counts."""
    by_dept = aggregate_level(sample_clean_df, "code_departement")
    assert "75" in by_dept
    assert "69" in by_dept
    # The fixture contains four rows for each department.
    assert by_dept["75"]["volume"] == 4
    assert by_dept["69"]["volume"] == 4
def test_aggregate_level_with_property_type(sample_clean_df):
    """Filtering by property type restricts volumes to matching rows."""
    by_dept = aggregate_level(
        sample_clean_df, "code_departement", property_type="Appartement"
    )
    assert "75" in by_dept
    # Count the Appartement rows for dept 75 directly from the fixture.
    mask = (pl.col("code_departement") == "75") & (
        pl.col("type_local") == "Appartement"
    )
    assert by_dept["75"]["volume"] == len(sample_clean_df.filter(mask))
def test_aggregate_level_country(sample_clean_df):
    """The synthetic "_country" level folds every row under a single "FR" key."""
    nationwide = aggregate_level(sample_clean_df, "_country")
    assert "FR" in nationwide
    assert nationwide["FR"]["volume"] == 8
# ---- aggregate_all_types ----
def test_aggregate_all_types_keys(sample_clean_df):
    """Every code carries a "tous" bucket; per-type buckets follow the data."""
    by_type = aggregate_all_types(sample_clean_df, "code_departement")
    assert all("tous" in by_type[code] for code in by_type)
    # Dept 75 has only Appartement in sample, dept 69 has only Maison
    assert "appartement" in by_type["75"]
    assert "maison" in by_type["69"]
# ---- aggregate_all_levels ----
def test_aggregate_all_levels_keys(sample_clean_df):
    """All six aggregation levels appear in the combined result."""
    combined = aggregate_all_levels(sample_clean_df)
    expected_levels = ("country", "region", "department", "commune", "postcode", "section")
    for level in expected_levels:
        assert level in combined
# ---- _export_sections ----
def test_export_sections_splits_by_dept(tmp_path):
    """Section stats are written as one JSON file per department prefix."""
    section_data = {
        "7510100001": {"tous": {"median": 5000}},
        "7510100002": {"tous": {"median": 5100}},
        "6938100001": {"tous": {"median": 3000}},
        "2A004000B0": {"tous": {"median": 2000}},
    }
    _export_sections(section_data, tmp_path)
    out_dir = tmp_path / "sections"
    for dept in ("75", "69", "2A"):
        assert (out_dir / f"{dept}.json").exists()
    # Both 751xx sections must land together in the 75 department file.
    data_75 = json.loads((out_dir / "75.json").read_text())
    assert len(data_75) == 2
def test_export_sections_dom_tom(tmp_path):
    """DOM-TOM departments (971-976) use 3-digit dept codes."""
    _export_sections(
        {
            "97105000001": {"tous": {"median": 2500}},
            "97205000001": {"tous": {"median": 2600}},
        },
        tmp_path,
    )
    out_dir = tmp_path / "sections"
    assert (out_dir / "971.json").exists()
    assert (out_dir / "972.json").exists()
# ---- export_json ----
def test_export_json_creates_files(tmp_path, sample_clean_df):
    """One JSON file per level, plus a per-department sections directory."""
    export_json(aggregate_all_levels(sample_clean_df), tmp_path)
    for level in ("country", "region", "department", "commune", "postcode"):
        assert (tmp_path / f"prices_{level}.json").exists()
    # Section is split into per-dept files
    assert (tmp_path / "sections").is_dir()
# ---- LEVEL_TO_COLUMN mapping ----
def test_level_to_column_covers_all_levels():
    """Every configured aggregation level has a source-column mapping."""
    from src.config import AGGREGATION_LEVELS

    unmapped = [level for level in AGGREGATION_LEVELS if level not in LEVEL_TO_COLUMN]
    assert not unmapped