"""
Download DVF (Demandes de Valeurs Foncières) geolocalized data from data.gouv.fr.
Responsibility: Fetch raw CSV files and store them locally.
Handles caching — won't re-download files that already exist.
"""
import gzip
import logging
from pathlib import Path
import requests
from tqdm import tqdm
from src.config import DVF_YEARS, RAW_DIR, dvf_url
logger = logging.getLogger(__name__)

CHUNK_SIZE = 8192  # 8 KB chunks for streaming download


def download_file(url: str, dest: Path, *, force: bool = False) -> Path:
"""
Download a single file with progress bar and caching.
Args:
url: Remote URL to download.
dest: Local path to save the file.
force: If True, re-download even if file exists.
Returns:
Path to the downloaded file.
Raises:
requests.HTTPError: If the server returns a non-2xx status.
"""
if dest.exists() and not force:
logger.info("Cached: %s", dest.name)
return dest
dest.parent.mkdir(parents=True, exist_ok=True)
logger.info("Downloading: %s", url)
response = requests.get(url, stream=True, timeout=300)
response.raise_for_status()
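    # Content-Length may be missing; total then stays 0 and the progress bar
    # cannot show a meaningful percentage, only the bytes received so far.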
total = int(response.headers.get("content-length", 0))
with (
open(dest, "wb") as f,
tqdm(total=total, unit="B", unit_scale=True, desc=dest.name) as bar,
):
for chunk in response.iter_content(chunk_size=CHUNK_SIZE):
f.write(chunk)
bar.update(len(chunk))
logger.info("Saved: %s (%.1f MB)", dest.name, dest.stat().st_size / 1e6)
    return dest


def decompress_gz(gz_path: Path) -> Path:
"""
Decompress a .gz file to .csv in the same directory.
Args:
gz_path: Path to the .gz file.
Returns:
Path to the decompressed .csv file.
"""
csv_path = gz_path.with_suffix("") # removes .gz
if csv_path.exists():
logger.info("Already decompressed: %s", csv_path.name)
return csv_path
logger.info("Decompressing: %s", gz_path.name)
with gzip.open(gz_path, "rb") as f_in, open(csv_path, "wb") as f_out:
while chunk := f_in.read(CHUNK_SIZE * 128):
f_out.write(chunk)
logger.info("Decompressed: %s (%.1f MB)", csv_path.name, csv_path.stat().st_size / 1e6)
    return csv_path


def download_dvf_year(year: int, *, force: bool = False) -> Path:
"""
Download and decompress DVF data for a single year.
Args:
year: Year to download (e.g. 2024).
force: Re-download even if cached.
Returns:
Path to the decompressed CSV file.
"""
url = dvf_url(year)
gz_path = RAW_DIR / f"dvf_{year}.csv.gz"
download_file(url, gz_path, force=force)
    return decompress_gz(gz_path)


def download_all(years: list[int] | None = None, *, force: bool = False) -> list[Path]:
"""
Download DVF data for all configured years.
Args:
years: List of years to download. Defaults to config.DVF_YEARS.
force: Re-download even if cached.
Returns:
List of paths to decompressed CSV files.
"""
    years = DVF_YEARS if years is None else years
paths = []
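    # A failed year is logged and skipped so the remaining years still download.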
for year in years:
try:
path = download_dvf_year(year, force=force)
paths.append(path)
        except requests.RequestException as e:
            logger.error("Failed to download year %d: %s", year, e)
    return paths


if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
download_all()