# Commit c7a66d3 — feat: add temporal weighting, remove commercial, expand to 2014-2025
"""
Orchestrate the full DVF data pipeline.
Responsibility: Wire together download -> clean -> aggregate -> export.
This is the main entry point for running the complete pipeline.
"""
import logging
import time
from src.config import AGGREGATED_DIR, PROCESSED_DIR, RAW_DIR
from src.downloader import download_all
from src.cleaner import clean
from src.aggregator import aggregate_all_levels, export_json
from src.top_cities import compute_top_cities, export_top_cities
logger = logging.getLogger(__name__)
def _banner(message: str) -> None:
    """Log *message* framed by separator rules, marking a pipeline step."""
    logger.info("=" * 60)
    logger.info(message)
    logger.info("=" * 60)


def run(
    *,
    years: list[int] | None = None,
    skip_download: bool = False,
    skip_section: bool = False,
) -> None:
    """
    Run the complete pipeline: download -> clean -> aggregate -> export.

    Args:
        years: Years to process. None = all configured years.
        skip_download: Skip download step (use existing raw files).
        skip_section: Skip section-level aggregation (slow, large output).
    """
    t0 = time.time()

    # Step 1: Download (or reuse raw CSVs already on disk)
    if not skip_download:
        _banner("STEP 1: Downloading DVF data")
        csv_paths = download_all(years)
    else:
        logger.info("Skipping download, using existing files...")
        csv_paths = sorted(RAW_DIR.glob("dvf_*.csv"))
        # Defensive: the glob pattern already excludes .gz archives,
        # but keep the filter in case the pattern is loosened later.
        csv_paths = [p for p in csv_paths if not p.name.endswith(".gz")]
        logger.info("Found %d raw CSV files", len(csv_paths))

    # Guard both branches: download_all may also come back empty
    # (e.g. every download failed), not just the reuse path.
    if not csv_paths:
        logger.error("No CSV files found. Run without --skip-download first.")
        return

    # Step 2: Clean
    _banner("STEP 2: Cleaning data")
    lf = clean(csv_paths)

    # Materialize once and save as parquet for reuse
    logger.info("Materializing cleaned data...")
    PROCESSED_DIR.mkdir(parents=True, exist_ok=True)
    df_clean = lf.collect()
    parquet_path = PROCESSED_DIR / "dvf_clean.parquet"
    df_clean.write_parquet(parquet_path)
    logger.info(
        "Saved: %s (%d rows, %.1f MB)",
        parquet_path.name,
        len(df_clean),
        parquet_path.stat().st_size / 1e6,
    )

    # Step 3: Aggregate (using collected DataFrame for weighted stats)
    _banner("STEP 3: Aggregating prices (time-weighted)")
    if skip_section:
        # Section-level output is slow and large; aggregate every other
        # configured level individually instead.
        from src.config import AGGREGATION_LEVELS
        from src.aggregator import LEVEL_TO_COLUMN, aggregate_all_types

        aggregated = {}
        for level in AGGREGATION_LEVELS:
            if level == "section":
                continue
            col = LEVEL_TO_COLUMN[level]
            logger.info("Aggregating: %s", level)
            aggregated[level] = aggregate_all_types(df_clean, col)
            logger.info(" -> %d unique codes", len(aggregated[level]))
    else:
        aggregated = aggregate_all_levels(df_clean)
    export_json(aggregated)

    # Step 4: Top cities
    _banner("STEP 4: Top 10 cities")
    top_cities = compute_top_cities(df_clean)
    export_top_cities(top_cities)

    elapsed = time.time() - t0
    logger.info("=" * 60)
    logger.info("Pipeline complete in %.1f seconds", elapsed)
    logger.info("Output: %s", AGGREGATED_DIR)
    logger.info("=" * 60)
if __name__ == "__main__":
    # Configure root logging once for CLI use, then hand off to the pipeline.
    log_format = "%(asctime)s %(levelname)s %(name)s: %(message)s"
    logging.basicConfig(level=logging.INFO, format=log_format)
    run()