# realadvisor-challenge / tests / test_aggregator.py
# Commit e751d0d by dcrey7: "test: add unit tests for all pipeline modules"
"""Tests for aggregation functions."""
import json
import math
from pathlib import Path
import numpy as np
import polars as pl
import pytest
from src.aggregator import (
LEVEL_TO_COLUMN,
_aggregate_group,
_export_sections,
aggregate_all_levels,
aggregate_all_types,
aggregate_level,
effective_sample_size,
export_json,
weighted_trimmed_mean,
)
# ---- weighted_trimmed_mean ----
def test_wtm_uniform_weights():
    """Equal weights and zero trim reduce the WTM to the plain arithmetic mean."""
    values = np.linspace(100.0, 500.0, 5)  # [100, 200, 300, 400, 500]
    uniform = np.ones_like(values)
    assert weighted_trimmed_mean(values, uniform, trim=0.0) == pytest.approx(300.0)
def test_wtm_trims_extremes():
    """A 20% trim clips both tails, damping the pull of extreme values."""
    prices = np.array([1.0, 100.0, 200.0, 300.0, 10000.0])
    unit_weights = np.ones(5)
    with_trim = weighted_trimmed_mean(prices, unit_weights, trim=0.2)
    without_trim = weighted_trimmed_mean(prices, unit_weights, trim=0.0)
    # The untrimmed mean is dragged up by the 10000 outlier.
    assert with_trim < without_trim
def test_wtm_respects_weight_ordering():
    """The mean should lean toward whichever observation carries more weight."""
    prices = np.array([100.0, 200.0])
    favor_second = weighted_trimmed_mean(prices, np.array([0.1, 10.0]), trim=0.0)
    favor_first = weighted_trimmed_mean(prices, np.array([10.0, 0.1]), trim=0.0)
    assert favor_second > favor_first
def test_wtm_empty_array():
    """An empty sample has no defined mean: expect NaN."""
    outcome = weighted_trimmed_mean(np.array([]), np.array([]), trim=0.2)
    assert math.isnan(outcome)
def test_wtm_single_element():
    """A lone observation must survive trimming and be returned as-is."""
    lone = weighted_trimmed_mean(np.array([5000.0]), np.array([1.0]), trim=0.2)
    assert lone == pytest.approx(5000.0)
def test_wtm_zero_weights():
    """All-zero weights leave nothing to average: expect NaN."""
    outcome = weighted_trimmed_mean(np.array([100.0, 200.0]), np.array([0.0, 0.0]))
    assert math.isnan(outcome)
# ---- effective_sample_size ----
def test_ess_equal_weights():
    """Uniform weights: effective sample size equals the raw count."""
    assert effective_sample_size(np.ones(50)) == pytest.approx(50.0)
def test_ess_unequal_weights():
    """Any weight imbalance shrinks n_eff strictly below n (yet above 1)."""
    n_eff = effective_sample_size(np.array([1.0, 1.0, 1.0, 0.01]))
    assert 1.0 < n_eff < 4.0
def test_ess_single_dominant():
    """One dominating weight drives n_eff toward 1."""
    dominated = np.array([1000.0, 0.001, 0.001, 0.001])
    assert effective_sample_size(dominated) < 2.0
def test_ess_empty():
    """No observations means an effective sample size of zero."""
    empty = np.array([])
    assert effective_sample_size(empty) == 0.0
# ---- _aggregate_group ----
def test_aggregate_group_basic():
    """Sanity-check the summary statistics on a small uniform-weight group."""
    frame = pl.DataFrame({
        "prix_m2": [2000.0, 3000.0, 4000.0, 2500.0, 3500.0],
        "temporal_weight": [1.0] * 5,
    })
    stats = _aggregate_group(frame)
    assert stats["volume"] == 5
    assert stats["median"] == pytest.approx(3000.0, rel=0.01)
    # Quartiles must bracket the median.
    assert stats["q1"] < stats["median"] < stats["q3"]
    # Uniform weights keep n_eff at (approximately) the raw count.
    assert stats["n_eff"] == pytest.approx(5.0, abs=0.1)
    assert 0.0 <= stats["confidence"] <= 1.0
def test_aggregate_group_with_temporal_decay():
    """Down-weighting the cheaper (older) rows should pull the WTM above the median."""
    frame = pl.DataFrame({
        "prix_m2": [2000.0, 2000.0, 2000.0, 5000.0, 5000.0],
        "temporal_weight": [0.5, 0.5, 0.5, 1.0, 1.0],
    })
    result = _aggregate_group(frame)
    # The higher-weighted 5000s drag the weighted mean upward.
    assert result["wtm"] > result["median"]
def test_aggregate_group_empty():
    """An empty group yields zero volume and zero confidence."""
    empty_frame = pl.DataFrame({
        "prix_m2": pl.Series([], dtype=pl.Float64),
        "temporal_weight": pl.Series([], dtype=pl.Float64),
    })
    stats = _aggregate_group(empty_frame)
    assert stats["volume"] == 0
    assert stats["confidence"] == 0.0
def test_aggregate_group_returns_all_keys():
    """The stats dict exposes exactly the documented set of keys."""
    stats = _aggregate_group(
        pl.DataFrame({"prix_m2": [3000.0], "temporal_weight": [1.0]})
    )
    assert set(stats) == {"median", "wtm", "q1", "q3", "volume", "n_eff", "confidence"}
# ---- aggregate_level ----
def test_aggregate_level_groups_correctly(sample_clean_df):
    """Grouping by department produces one entry per department code."""
    by_dept = aggregate_level(sample_clean_df, "code_departement")
    assert "75" in by_dept
    assert "69" in by_dept
    # The fixture holds four rows in each of the two departments.
    assert by_dept["75"]["volume"] == 4
    assert by_dept["69"]["volume"] == 4
def test_aggregate_level_with_property_type(sample_clean_df):
    """Filtering on a property type restricts volumes to matching rows only."""
    by_dept = aggregate_level(
        sample_clean_df, "code_departement", property_type="Appartement"
    )
    assert "75" in by_dept
    # Recompute the expected row count straight from the fixture.
    matching = sample_clean_df.filter(
        (pl.col("code_departement") == "75")
        & (pl.col("type_local") == "Appartement")
    )
    assert by_dept["75"]["volume"] == matching.height
def test_aggregate_level_country(sample_clean_df):
    """The synthetic _country level rolls every row up under 'FR'."""
    rolled_up = aggregate_level(sample_clean_df, "_country")
    assert "FR" in rolled_up
    assert rolled_up["FR"]["volume"] == 8
# ---- aggregate_all_types ----
def test_aggregate_all_types_keys(sample_clean_df):
    """Every code carries a 'tous' bucket plus buckets for types present in the data."""
    per_type = aggregate_all_types(sample_clean_df, "code_departement")
    assert all("tous" in per_type[code] for code in per_type)
    # Fixture layout: dept 75 holds only Appartement rows, dept 69 only Maison.
    assert "appartement" in per_type["75"]
    assert "maison" in per_type["69"]
# ---- aggregate_all_levels ----
def test_aggregate_all_levels_keys(sample_clean_df):
    """All six geographic levels appear in the aggregated output."""
    aggregated = aggregate_all_levels(sample_clean_df)
    expected_levels = ("country", "region", "department", "commune", "postcode", "section")
    assert all(name in aggregated for name in expected_levels)
# ---- _export_sections ----
def test_export_sections_splits_by_dept(tmp_path):
    """Sections are bucketed into one JSON file per department prefix."""
    section_data = {
        "7510100001": {"tous": {"median": 5000}},
        "7510100002": {"tous": {"median": 5100}},
        "6938100001": {"tous": {"median": 3000}},
        "2A004000B0": {"tous": {"median": 2000}},
    }
    _export_sections(section_data, tmp_path)
    out_dir = tmp_path / "sections"
    # One file per department, including the Corsican '2A' code.
    for dept in ("75", "69", "2A"):
        assert (out_dir / f"{dept}.json").exists()
    # Both Paris sections land in the same department file.
    data_75 = json.loads((out_dir / "75.json").read_text())
    assert len(data_75) == 2
def test_export_sections_dom_tom(tmp_path):
    """Overseas departments (971-976) are keyed by their 3-digit codes."""
    overseas = {
        "97105000001": {"tous": {"median": 2500}},
        "97205000001": {"tous": {"median": 2600}},
    }
    _export_sections(overseas, tmp_path)
    out_dir = tmp_path / "sections"
    assert (out_dir / "971.json").exists()
    assert (out_dir / "972.json").exists()
# ---- export_json ----
def test_export_json_creates_files(tmp_path, sample_clean_df):
    """One prices_<level>.json per level, plus a per-department sections directory."""
    export_json(aggregate_all_levels(sample_clean_df), tmp_path)
    for level in ("country", "region", "department", "commune", "postcode"):
        assert (tmp_path / f"prices_{level}.json").exists()
    # Section data is sharded into per-department files rather than one JSON.
    assert (tmp_path / "sections").is_dir()
# ---- LEVEL_TO_COLUMN mapping ----
def test_level_to_column_covers_all_levels():
    """Every configured aggregation level has a backing column mapping."""
    from src.config import AGGREGATION_LEVELS

    unmapped = [level for level in AGGREGATION_LEVELS if level not in LEVEL_TO_COLUMN]
    assert not unmapped