Spaces:
Sleeping
Sleeping
"""
Orchestrate the full DVF data pipeline.

Responsibility: Wire together download -> clean -> aggregate -> export.
This is the main entry point for running the complete pipeline.
"""
import logging
import time

from src.config import AGGREGATED_DIR, PROCESSED_DIR, RAW_DIR
from src.downloader import download_all
from src.cleaner import clean
from src.aggregator import aggregate_all_levels, export_json
from src.top_cities import compute_top_cities, export_top_cities

logger = logging.getLogger(__name__)
def _banner(title: str) -> None:
    """Log *title* framed by separator rules so pipeline steps stand out."""
    logger.info("=" * 60)
    logger.info(title)
    logger.info("=" * 60)


def run(
    *,
    years: list[int] | None = None,
    skip_download: bool = False,
    skip_section: bool = False,
) -> None:
    """
    Run the complete pipeline: download -> clean -> aggregate -> export.

    Args:
        years: Years to process. None = all configured years.
        skip_download: Skip download step (use existing raw files).
        skip_section: Skip section-level aggregation (slow, large output).
    """
    t0 = time.time()

    # Step 1: Download (or reuse raw files already on disk).
    if not skip_download:
        _banner("STEP 1: Downloading DVF data")
        csv_paths = download_all(years)
    else:
        logger.info("Skipping download, using existing files...")
        # glob("dvf_*.csv") only matches names ending in ".csv", so the old
        # extra filter excluding ".gz" names was dead code and is dropped.
        csv_paths = sorted(RAW_DIR.glob("dvf_*.csv"))
        logger.info("Found %d raw CSV files", len(csv_paths))

    # Guard both branches: download_all may also come back empty.
    if not csv_paths:
        logger.error("No CSV files found. Run without --skip-download first.")
        return

    # Step 2: Clean (lazy frame; nothing is computed yet).
    _banner("STEP 2: Cleaning data")
    lf = clean(csv_paths)

    # Materialize once and save as parquet for reuse.
    logger.info("Materializing cleaned data...")
    PROCESSED_DIR.mkdir(parents=True, exist_ok=True)
    df_clean = lf.collect()
    parquet_path = PROCESSED_DIR / "dvf_clean.parquet"
    df_clean.write_parquet(parquet_path)
    logger.info(
        "Saved: %s (%d rows, %.1f MB)",
        parquet_path.name,
        len(df_clean),
        parquet_path.stat().st_size / 1e6,
    )

    # Step 3: Aggregate (using the collected DataFrame for weighted stats).
    _banner("STEP 3: Aggregating prices (time-weighted)")
    if skip_section:
        # Deferred imports: only needed on this path.
        from src.config import AGGREGATION_LEVELS
        from src.aggregator import LEVEL_TO_COLUMN, aggregate_all_types

        aggregated = {}
        for level in AGGREGATION_LEVELS:
            if level == "section":
                # Section level is the slow, large one being skipped.
                continue
            col = LEVEL_TO_COLUMN[level]
            logger.info("Aggregating: %s", level)
            aggregated[level] = aggregate_all_types(df_clean, col)
            logger.info(" -> %d unique codes", len(aggregated[level]))
    else:
        aggregated = aggregate_all_levels(df_clean)
    export_json(aggregated)

    # Step 4: Top cities.
    _banner("STEP 4: Top 10 cities")
    top_cities = compute_top_cities(df_clean)
    export_top_cities(top_cities)

    elapsed = time.time() - t0
    logger.info("=" * 60)
    logger.info("Pipeline complete in %.1f seconds", elapsed)
    logger.info("Output: %s", AGGREGATED_DIR)
    logger.info("=" * 60)
if __name__ == "__main__":
    # Configure root logging only when executed as a script, so importing
    # this module never alters a host application's logging setup.
    log_format = "%(asctime)s %(levelname)s %(name)s: %(message)s"
    logging.basicConfig(level=logging.INFO, format=log_format)
    run()