"""
Orchestrate the full DVF data pipeline.

Responsibility: Wire together download -> clean -> aggregate -> export.
This is the main entry point for running the complete pipeline. When the
module is executed as a script, command-line flags map onto the keyword
arguments of :func:`run`.
"""

import argparse
import logging
import time

from src.config import AGGREGATED_DIR, PROCESSED_DIR, RAW_DIR
from src.downloader import download_all
from src.cleaner import clean
from src.aggregator import aggregate_all_levels, export_json
from src.top_cities import compute_top_cities, export_top_cities

logger = logging.getLogger(__name__)

_BANNER = "=" * 60


def _log_step(title: str) -> None:
    """Log *title* framed by separator lines so each pipeline step stands out."""
    logger.info(_BANNER)
    logger.info(title)
    logger.info(_BANNER)


def _find_raw_csvs() -> list:
    """Return the raw ``dvf_*.csv`` files already present in RAW_DIR, sorted.

    The glob pattern requires a ``.csv`` suffix, so compressed ``*.csv.gz``
    archives lying next to the CSVs are never picked up.
    """
    csv_paths = sorted(RAW_DIR.glob("dvf_*.csv"))
    logger.info("Found %d raw CSV files", len(csv_paths))
    return csv_paths


def _materialize(lf):
    """Collect the cleaned lazy frame once and persist it as parquet.

    Args:
        lf: The lazy result of :func:`clean`; it must expose ``collect()``
            (presumably a polars LazyFrame -- confirm against src.cleaner).

    Returns:
        The collected DataFrame, reused by every downstream step so the
        cleaning query is only executed once.
    """
    logger.info("Materializing cleaned data...")
    PROCESSED_DIR.mkdir(parents=True, exist_ok=True)
    df_clean = lf.collect()
    parquet_path = PROCESSED_DIR / "dvf_clean.parquet"
    df_clean.write_parquet(parquet_path)
    logger.info(
        "Saved: %s (%d rows, %.1f MB)",
        parquet_path.name,
        len(df_clean),
        parquet_path.stat().st_size / 1e6,
    )
    return df_clean


def _aggregate(df_clean, *, skip_section: bool):
    """Aggregate prices for every configured level.

    Args:
        df_clean: The materialized cleaned DataFrame.
        skip_section: When True, the ``"section"`` level is left out and the
            remaining levels are aggregated one by one; otherwise
            :func:`aggregate_all_levels` handles every level.

    Returns:
        The per-level aggregation mapping passed on to :func:`export_json`.
    """
    if not skip_section:
        return aggregate_all_levels(df_clean)

    # Imported lazily: these names are only used on the skip-section path.
    from src.config import AGGREGATION_LEVELS
    from src.aggregator import LEVEL_TO_COLUMN, aggregate_all_types

    aggregated = {}
    for level in AGGREGATION_LEVELS:
        if level == "section":
            continue
        col = LEVEL_TO_COLUMN[level]
        logger.info("Aggregating: %s", level)
        aggregated[level] = aggregate_all_types(df_clean, col)
        logger.info(" -> %d unique codes", len(aggregated[level]))
    return aggregated


def run(
    *,
    years: list[int] | None = None,
    skip_download: bool = False,
    skip_section: bool = False,
) -> None:
    """
    Run the complete pipeline: download -> clean -> aggregate -> export.

    Args:
        years: Years to process. None = all configured years. Only used by
            the download step; when ``skip_download`` is True every
            ``dvf_*.csv`` file found in RAW_DIR is processed.
        skip_download: Skip download step (use existing raw files).
        skip_section: Skip section-level aggregation (slow, large output).

    Returns early (after logging an error) when there are no CSV files to
    process.
    """
    # perf_counter is monotonic, so clock adjustments cannot skew the timing.
    t0 = time.perf_counter()

    # Step 1: Download (or reuse raw files already on disk)
    if skip_download:
        logger.info("Skipping download, using existing files...")
        if years is not None:
            logger.warning(
                "years=%s is ignored when skip_download=True; "
                "all existing raw files will be processed",
                years,
            )
        csv_paths = _find_raw_csvs()
    else:
        _log_step("STEP 1: Downloading DVF data")
        csv_paths = download_all(years)

    # Guard both branches so an empty input never reaches clean().
    if not csv_paths:
        if skip_download:
            logger.error("No CSV files found. Run without --skip-download first.")
        else:
            logger.error("No CSV files were downloaded; nothing to process.")
        return

    # Step 2: Clean, then materialize once and save as parquet for reuse
    _log_step("STEP 2: Cleaning data")
    df_clean = _materialize(clean(csv_paths))

    # Step 3: Aggregate (using collected DataFrame for weighted stats)
    _log_step("STEP 3: Aggregating prices (time-weighted)")
    aggregated = _aggregate(df_clean, skip_section=skip_section)
    export_json(aggregated)

    # Step 4: Top cities
    _log_step("STEP 4: Top 10 cities")
    top_cities = compute_top_cities(df_clean)
    export_top_cities(top_cities)

    elapsed = time.perf_counter() - t0
    logger.info(_BANNER)
    logger.info("Pipeline complete in %.1f seconds", elapsed)
    logger.info("Output: %s", AGGREGATED_DIR)
    logger.info(_BANNER)


def _parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse command-line flags that mirror the keyword arguments of run()."""
    parser = argparse.ArgumentParser(
        description="Run the full DVF data pipeline: download -> clean -> aggregate -> export.",
    )
    parser.add_argument(
        "--years",
        type=int,
        nargs="+",
        metavar="YEAR",
        default=None,
        help="Years to process (default: all configured years).",
    )
    parser.add_argument(
        "--skip-download",
        action="store_true",
        help="Skip the download step and use existing raw files.",
    )
    parser.add_argument(
        "--skip-section",
        action="store_true",
        help="Skip section-level aggregation (slow, large output).",
    )
    return parser.parse_args(argv)


if __name__ == "__main__":
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(name)s: %(message)s",
    )
    args = _parse_args()
    run(
        years=args.years,
        skip_download=args.skip_download,
        skip_section=args.skip_section,
    )