Spaces:
Sleeping
Sleeping
| """ | |
| Download DVF geolocalized data from data.gouv.fr. | |
| Responsibility: Fetch raw CSV files and store them locally. | |
| Handles caching — won't re-download files that already exist. | |
| """ | |
import gzip
import logging
import shutil
from pathlib import Path

import requests
from tqdm import tqdm

from src.config import DVF_YEARS, RAW_DIR, dvf_url
# Module-level logger; handlers/level are configured by the application
# (see the __main__ guard at the bottom of this file).
logger = logging.getLogger(__name__)
CHUNK_SIZE = 8192  # 8 KB chunks for streaming download
def download_file(url: str, dest: Path, *, force: bool = False) -> Path:
    """
    Download a single file with progress bar and caching.

    The payload is streamed into a temporary ``.part`` file and atomically
    renamed into place on success, so an interrupted download can never be
    mistaken for a complete cached file on a later run.

    Args:
        url: Remote URL to download.
        dest: Local path to save the file.
        force: If True, re-download even if file exists.

    Returns:
        Path to the downloaded file.

    Raises:
        requests.HTTPError: If the server returns a non-2xx status.
    """
    if dest.exists() and not force:
        logger.info("Cached: %s", dest.name)
        return dest

    dest.parent.mkdir(parents=True, exist_ok=True)
    tmp = dest.with_name(dest.name + ".part")
    logger.info("Downloading: %s", url)

    # Context manager guarantees the streamed connection is released even
    # if raise_for_status() fires or writing to disk fails partway through.
    with requests.get(url, stream=True, timeout=300) as response:
        response.raise_for_status()
        total = int(response.headers.get("content-length", 0))
        with (
            open(tmp, "wb") as f,
            tqdm(total=total, unit="B", unit_scale=True, desc=dest.name) as bar,
        ):
            for chunk in response.iter_content(chunk_size=CHUNK_SIZE):
                f.write(chunk)
                bar.update(len(chunk))

    # Atomic rename: dest only ever exists as a fully written file.
    tmp.replace(dest)
    logger.info("Saved: %s (%.1f MB)", dest.name, dest.stat().st_size / 1e6)
    return dest
def decompress_gz(gz_path: Path) -> Path:
    """
    Decompress a .gz file to .csv in the same directory.

    Output streams into a temporary ``.part`` file that is atomically
    renamed on success, so an interrupted run never leaves a truncated CSV
    that later runs would accept as already decompressed.

    Args:
        gz_path: Path to the .gz file.

    Returns:
        Path to the decompressed .csv file.
    """
    csv_path = gz_path.with_suffix("")  # strips the trailing .gz
    if csv_path.exists():
        logger.info("Already decompressed: %s", csv_path.name)
        return csv_path

    logger.info("Decompressing: %s", gz_path.name)
    tmp = csv_path.with_name(csv_path.name + ".part")
    with gzip.open(gz_path, "rb") as f_in, open(tmp, "wb") as f_out:
        # copyfileobj streams in fixed-size chunks, keeping memory flat
        # for multi-GB archives.
        shutil.copyfileobj(f_in, f_out)
    tmp.replace(csv_path)  # atomic: csv_path only exists when complete

    logger.info("Decompressed: %s (%.1f MB)", csv_path.name, csv_path.stat().st_size / 1e6)
    return csv_path
def download_dvf_year(year: int, *, force: bool = False) -> Path:
    """
    Fetch the DVF archive for one year and return its decompressed CSV.

    Args:
        year: Year to download (e.g. 2024).
        force: Re-download even if cached.

    Returns:
        Path to the decompressed CSV file.
    """
    archive_path = RAW_DIR / f"dvf_{year}.csv.gz"
    download_file(dvf_url(year), archive_path, force=force)
    return decompress_gz(archive_path)
def download_all(years: list[int] | None = None, *, force: bool = False) -> list[Path]:
    """
    Download DVF data for all configured years.

    A year that fails to download is logged and skipped so one bad year
    cannot abort the rest of the batch.

    Args:
        years: List of years to download. Defaults to config.DVF_YEARS.
            An explicit empty list downloads nothing.
        force: Re-download even if cached.

    Returns:
        List of paths to decompressed CSV files (successful years only).
    """
    # Compare against None (not truthiness) so an explicitly passed empty
    # list is honored instead of silently expanding to every configured year.
    if years is None:
        years = DVF_YEARS
    paths: list[Path] = []
    for year in years:
        try:
            paths.append(download_dvf_year(year, force=force))
        except requests.RequestException as e:
            # RequestException covers HTTPError plus connection/timeout
            # errors, matching the skip-and-continue intent of this loop.
            logger.error("Failed to download year %d: %s", year, e)
    return paths
def _main() -> None:
    """CLI entry point: download every configured DVF year with INFO logging."""
    logging.basicConfig(level=logging.INFO)
    download_all()


if __name__ == "__main__":
    _main()