#!/usr/bin/env python3
"""
Script to download all images from the dataset locally.

This file downloads all images from URLs in the dataset CSV and saves them
locally to speed up training by avoiding repeated downloads. It uses parallel
processing to download multiple images simultaneously and updates the CSV with
local paths of downloaded images.
"""

import concurrent.futures
import hashlib
import time
from io import BytesIO
from pathlib import Path
from threading import Lock, get_ident

import pandas as pd
import requests
from PIL import Image
from tqdm import tqdm

import config

# NOTE(review): the emoji prefixes in the console messages below look
# mis-encoded (mojibake). They are kept exactly as found; confirm the intended
# glyphs and the file encoding before changing them.


class ImageDownloader:
    """Download the images referenced by a DataFrame into a local directory.

    Each URL found in the ``config.column_url_image`` column is fetched,
    converted to RGB and stored as a JPEG named after the MD5 hash of the URL.
    Downloads run in a thread pool; counters are kept in ``self.stats``.
    """

    def __init__(self, df, images_dir=config.images_dir, max_workers=8, timeout=10):
        """
        Initialize the image downloader.

        Args:
            df: DataFrame whose ``config.column_url_image`` column holds the URLs
            images_dir: Directory to save the images
            max_workers: Number of threads for parallel download
            timeout: Timeout for HTTP requests (seconds)
        """
        self.df = df
        self.images_dir = Path(images_dir)
        self.max_workers = max_workers
        self.timeout = timeout

        # Create the images directory if it doesn't exist
        self.images_dir.mkdir(parents=True, exist_ok=True)

        # Statistics (updated from worker threads, hence the lock)
        self.stats = {
            'downloaded': 0,
            'skipped': 0,
            'failed': 0,
            'total': 0
        }
        self.stats_lock = Lock()

    def _bump(self, key):
        """Increment the ``key`` counter of ``self.stats`` under the stats lock."""
        with self.stats_lock:
            self.stats[key] += 1

    def url_to_filename(self, url):
        """Convert a URL to a secure filename."""
        # Use MD5 hash of the URL to avoid character issues
        url_hash = hashlib.md5(url.encode()).hexdigest()
        return f"{url_hash}.jpg"

    def download_single_image(self, row):
        """
        Download a single image.

        Args:
            row: Tuple (index, pandas.Series) containing the row data

        Returns:
            tuple: (success, index, message)
        """
        idx, data = row
        url = data[config.column_url_image]

        # A missing URL (NaN/None/empty) cannot be hashed or fetched; report it
        # as a regular failure instead of letting the worker raise.
        if not isinstance(url, str) or not url:
            self._bump('failed')
            return False, idx, f"Invalid URL: {url!r}"

        # Filename based on the URL
        filename = self.url_to_filename(url)
        filepath = self.images_dir / filename

        # Check if the image already exists
        if filepath.exists():
            self._bump('skipped')
            return True, idx, f"Skipped (already exists): {filename}"

        try:
            # stream=True lets us inspect the headers before pulling the body;
            # the context manager releases the connection on every exit path.
            with requests.get(url, timeout=self.timeout, stream=True) as response:
                response.raise_for_status()

                # Check the content type
                content_type = response.headers.get('content-type', '')
                if not content_type.startswith('image/'):
                    self._bump('failed')
                    return False, idx, f"Not an image: {content_type}"

                payload = response.content
        except requests.exceptions.RequestException as e:
            self._bump('failed')
            return False, idx, f"Download error: {str(e)}"
        except Exception as e:
            self._bump('failed')
            return False, idx, f"Unexpected error: {str(e)}"

        # Save through a per-thread temporary file and rename it into place, so
        # a failed or interrupted save never leaves a truncated file that a
        # later run would treat as "already exists".
        tmp_path = filepath.with_name(f"{filename}.{get_ident()}.part")
        try:
            image = Image.open(BytesIO(payload)).convert("RGB")
            image.save(tmp_path, "JPEG", quality=85, optimize=True)
            tmp_path.replace(filepath)
        except Exception as img_error:
            if tmp_path.exists():
                tmp_path.unlink()
            self._bump('failed')
            return False, idx, f"Image processing error: {str(img_error)}"

        self._bump('downloaded')
        return True, idx, f"Downloaded: {filename}"

    def download_all_images(self):
        """Download all images from the dataset.

        Runs ``download_single_image`` for every row in a thread pool, prints
        a summary, and writes a copy of the DataFrame (with the
        ``config.column_local_image_path`` and ``download_success`` columns
        added) to ``config.local_dataset_path``.

        Returns:
            pandas.DataFrame: the copy of the dataset with the added columns
        """
        total = len(self.df)
        print(f"šŸ“Š Loading dataset ({total} rows, {len(self.df.columns)} columns)")
        self.stats['total'] = total

        print(f"šŸ” Found {total} images to download")
        print(f"šŸ“ Saving in: {self.images_dir}")
        print(f"šŸ”§ Using {self.max_workers} threads")

        # Create a new DataFrame with local paths
        df_local = self.df.copy()
        df_local[config.column_local_image_path] = ""
        df_local['download_success'] = False

        start_time = time.time()

        # Parallel download
        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # Submit all tasks
            future_to_row = {
                executor.submit(self.download_single_image, row): row
                for row in self.df.iterrows()
            }

            # Process the results with a progress bar
            with tqdm(total=total, desc="šŸ“„ Downloading", unit="img") as pbar:
                for future in concurrent.futures.as_completed(future_to_row):
                    idx, data = future_to_row[future]
                    try:
                        success, _, _ = future.result()
                    except Exception as e:
                        print(f"āŒ Unexpected error for index {idx}: {e}")
                        self._bump('failed')
                        success = False

                    if success:
                        # Add the local path to the DataFrame
                        filename = self.url_to_filename(data[config.column_url_image])
                        df_local.loc[idx, config.column_local_image_path] = str(self.images_dir / filename)
                        df_local.loc[idx, 'download_success'] = True

                    # Update the progress bar
                    pbar.set_postfix({
                        'OK': self.stats['downloaded'],
                        'Skip': self.stats['skipped'],
                        'Fail': self.stats['failed']
                    })
                    pbar.update(1)

        elapsed_time = time.time() - start_time

        # Final statistics
        print("\n" + "="*60)
        print("šŸ“Š DOWNLOAD STATISTICS")
        print("="*60)
        print(f"āœ… Downloaded: {self.stats['downloaded']}")
        print(f"ā­ļø Skipped (already present): {self.stats['skipped']}")
        print(f"āŒ Failed: {self.stats['failed']}")
        print(f"šŸ“Š Total: {self.stats['total']}")
        print(f"ā±ļø Time elapsed: {elapsed_time:.1f}s")

        # An empty dataset has nothing to succeed at; avoid dividing by zero.
        if self.stats['total']:
            succeeded = self.stats['downloaded'] + self.stats['skipped']
            success_rate = succeeded / self.stats['total'] * 100
        else:
            success_rate = 0.0
        print(f"šŸŽÆ Success rate: {success_rate:.1f}%")

        if self.stats['downloaded'] > 0:
            avg_time = elapsed_time / self.stats['downloaded']
            print(f"⚔ Average time per image: {avg_time:.2f}s")

        # Save the updated DataFrame
        output_path = config.local_dataset_path
        df_local.to_csv(output_path, index=False)
        print(f"šŸ’¾ Updated dataset saved: {output_path}")

        return df_local


def main():
    """Main function: load the dataset CSV and download every listed image."""
    print("šŸš€ STARTING IMAGE DOWNLOADER")
    print("="*60)

    # Configuration
    df = pd.read_csv(config.local_dataset_path)
    # Rows whose 'color' value is 'unknown' are excluded from the download.
    df = df[df['color'] != 'unknown']

    # Create the downloader
    downloader = ImageDownloader(
        df=df,
        images_dir=config.images_dir,
        max_workers=8,
        timeout=10
    )

    # Download all images
    downloader.download_all_images()

    print("\nšŸŽ‰ DOWNLOAD COMPLETED!")
    print("šŸ’” You can now use the local images for training.")


if __name__ == "__main__":
    main()