File size: 3,469 Bytes
ba08c19
 
 
c7a66d3
ba08c19
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c7a66d3
ba08c19
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c7a66d3
ba08c19
c7a66d3
ba08c19
 
 
 
 
 
c7a66d3
 
 
ba08c19
 
c7a66d3
 
ba08c19
c7a66d3
ba08c19
 
 
 
 
 
 
c7a66d3
 
ba08c19
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
"""
Orchestrate the full DVF data pipeline.

Responsibility: Wire together download -> clean -> aggregate -> export.
This is the main entry point for running the complete pipeline.
"""

import logging
import time

from src.config import AGGREGATED_DIR, PROCESSED_DIR, RAW_DIR
from src.downloader import download_all
from src.cleaner import clean
from src.aggregator import aggregate_all_levels, export_json
from src.top_cities import compute_top_cities, export_top_cities

logger = logging.getLogger(__name__)


def run(
    *,
    years: list[int] | None = None,
    skip_download: bool = False,
    skip_section: bool = False,
) -> None:
    """
    Run the complete pipeline: download -> clean -> aggregate -> export.

    Args:
        years:          Years to process. None = all configured years.
        skip_download:  Skip download step (use existing raw files).
        skip_section:   Skip section-level aggregation (slow, large output).
    """
    t0 = time.time()

    # Step 1: Download
    if not skip_download:
        logger.info("=" * 60)
        logger.info("STEP 1: Downloading DVF data")
        logger.info("=" * 60)
        csv_paths = download_all(years)
    else:
        logger.info("Skipping download, using existing files...")
        csv_paths = sorted(RAW_DIR.glob("dvf_*.csv"))
        csv_paths = [p for p in csv_paths if not p.name.endswith(".gz")]
        logger.info("Found %d raw CSV files", len(csv_paths))

    if not csv_paths:
        logger.error("No CSV files found. Run without --skip-download first.")
        return

    # Step 2: Clean
    logger.info("=" * 60)
    logger.info("STEP 2: Cleaning data")
    logger.info("=" * 60)
    lf = clean(csv_paths)

    # Materialize once and save as parquet for reuse
    logger.info("Materializing cleaned data...")
    PROCESSED_DIR.mkdir(parents=True, exist_ok=True)
    df_clean = lf.collect()
    parquet_path = PROCESSED_DIR / "dvf_clean.parquet"
    df_clean.write_parquet(parquet_path)
    logger.info(
        "Saved: %s (%d rows, %.1f MB)",
        parquet_path.name,
        len(df_clean),
        parquet_path.stat().st_size / 1e6,
    )

    # Step 3: Aggregate (using collected DataFrame for weighted stats)
    logger.info("=" * 60)
    logger.info("STEP 3: Aggregating prices (time-weighted)")
    logger.info("=" * 60)

    if skip_section:
        from src.config import AGGREGATION_LEVELS
        from src.aggregator import LEVEL_TO_COLUMN, aggregate_all_types
        aggregated = {}
        for level in AGGREGATION_LEVELS:
            if level == "section":
                continue
            col = LEVEL_TO_COLUMN[level]
            logger.info("Aggregating: %s", level)
            aggregated[level] = aggregate_all_types(df_clean, col)
            logger.info("  -> %d unique codes", len(aggregated[level]))
    else:
        aggregated = aggregate_all_levels(df_clean)

    export_json(aggregated)

    # Step 4: Top cities
    logger.info("=" * 60)
    logger.info("STEP 4: Top 10 cities")
    logger.info("=" * 60)
    top_cities = compute_top_cities(df_clean)
    export_top_cities(top_cities)

    elapsed = time.time() - t0
    logger.info("=" * 60)
    logger.info("Pipeline complete in %.1f seconds", elapsed)
    logger.info("Output: %s", AGGREGATED_DIR)
    logger.info("=" * 60)


if __name__ == "__main__":
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(name)s: %(message)s",
    )
    run()