# climsight / download_data.py
# Downloads and extracts the data sources listed in data_sources.yml.
import requests
import tarfile
import zipfile
import os
import shutil
import sys
import yaml
import argparse
def download_file(url, local_filename):
    """Download a file from *url* to *local_filename*, streaming in chunks.

    A simple percentage progress indicator is printed when the server
    reports a Content-Length header (with chunked transfer encoding the
    total size is unknown, so no progress is shown).

    Parameters
    ----------
    url : str
        Remote URL to fetch.
    local_filename : str
        Path where the downloaded content is written.

    Returns
    -------
    bool
        True on success; False if the request failed (a warning with the
        underlying error is printed, nothing is raised to the caller).
    """
    try:
        with requests.get(url, stream=True) as r:
            r.raise_for_status()
            # Content-Length may be absent; progress is only reported
            # when the total size is known.
            total_length = r.headers.get('content-length')
            if total_length is not None:
                total_length = int(total_length)
            downloaded = 0
            with open(local_filename, 'wb') as f:
                for chunk in r.iter_content(chunk_size=8192):
                    if not chunk:
                        # Skip keep-alive chunks so they don't distort progress.
                        continue
                    f.write(chunk)
                    if total_length:
                        downloaded += len(chunk)
                        done_percentage = int(100 * downloaded / total_length)
                        sys.stdout.write(f"\rDownloading {local_filename}: {done_percentage}%")
                        sys.stdout.flush()
            if total_length is not None:
                sys.stdout.write('\n')  # move past the progress line
        return True
    except requests.RequestException as e:
        print(f"\033[93mWarning: Failed to download {url}. Please download manually.\033[0m")
        print(f"\033[91mError: {e}\033[0m")
        return False
def extract_tar(file_path, extract_to='.'):
    """Extract a tar archive into *extract_to* and delete it on success.

    On failure a warning including the underlying error is printed and
    the archive file is left in place for manual inspection.
    """
    try:
        with tarfile.open(file_path) as tar:
            tar.extractall(path=extract_to)
        # Remove the archive only after the handle is closed and
        # extraction succeeded (removing an open file fails on Windows).
        os.remove(file_path)
    except (tarfile.TarError, OSError) as e:
        print(f"\033[93mWarning: Failed to extract {file_path}.\033[0m")
        print(f"\033[91mError: {e}\033[0m")
def extract_zip(file_path, extract_to='.'):
    """Extract a zip archive into *extract_to* and delete it on success.

    On failure a warning including the underlying error is printed and
    the archive file is left in place for manual inspection.
    """
    try:
        with zipfile.ZipFile(file_path, 'r') as zip_ref:
            zip_ref.extractall(extract_to)
        # Remove the archive only after the handle is closed and
        # extraction succeeded (removing an open file fails on Windows).
        os.remove(file_path)
    except (zipfile.BadZipFile, OSError) as e:
        print(f"\033[93mWarning: Failed to extract {file_path}.\033[0m")
        print(f"\033[91mError: {e}\033[0m")
def extract_arch(file_path, extract_to='.', archive_type=''):
    """Extract (or move) *file_path* into *extract_to*.

    Parameters
    ----------
    file_path : str
        Path of the downloaded file.
    extract_to : str
        Target directory.
    archive_type : {'', 'zip', 'tar'}
        Explicit archive type.  When '' (default) the type is inferred
        from the file extension; files that are not recognised archives
        are moved into *extract_to* unchanged.
    """
    if not archive_type:
        _, file_extension = os.path.splitext(file_path)
        if file_extension == '.zip':
            archive_type = 'zip'
        elif file_extension in ('.tar', '.tgz'):
            # tarfile.open auto-detects compression, so .tgz tarballs
            # are handled by the same branch as plain .tar.
            archive_type = 'tar'
    try:
        if archive_type == 'zip':
            with zipfile.ZipFile(file_path, 'r') as zip_ref:
                zip_ref.extractall(extract_to)
            os.remove(file_path)
        elif archive_type == 'tar':
            with tarfile.open(file_path) as tar:
                tar.extractall(path=extract_to)
            os.remove(file_path)
        else:
            # Not an archive: move the file into the target directory
            # (copy + remove collapsed into a single shutil.move).
            shutil.move(file_path, os.path.join(extract_to, os.path.basename(file_path)))
    except Exception as e:
        print(f"\033[93mWarning: Failed to extract {file_path}.\033[0m")
        print(f"\033[91mError: {e}\033[0m")
def create_dir(path):
    """Ensure *path* exists as a directory, creating parents as needed.

    Does nothing if the directory is already present.
    """
    os.makedirs(path, exist_ok=True)
def remove_dir(path):
    """Delete the directory *path* and all its contents.

    A missing path (or a non-directory path) is silently ignored.
    """
    # isdir() already implies existence, so no separate exists() check
    # is needed.
    if os.path.isdir(path):
        shutil.rmtree(path)
def main():
    """Download and extract every data source listed in data_sources.yml.

    Reads the YAML config from the current directory, creates the target
    sub-directories, downloads each file and unpacks it.  Failures are
    collected and reported at the end with manual-download instructions.
    """
    # Parse command-line argument (--source_files).
    parser = argparse.ArgumentParser(description="Download and extract the raw source files of the RAG.")
    # NOTE: the previous `type=bool` was broken — argparse applies bool()
    # to the raw string, so `--source_files False` evaluated to True.
    # `store_true` gives the intended presence/absence flag semantics.
    parser.add_argument('--source_files', action='store_true',
                        help='Whether to download and extract source files (IPCC text reports).')
    args = parser.parse_args()

    # Load the YAML file describing all data sources.
    with open('data_sources.yml', 'r') as file:
        data_config = yaml.safe_load(file)
    base_path = data_config['base_path']
    sources = data_config['sources']

    # Create every distinct target sub-directory, skipping the current dir.
    subdirs = {entry['subdir'] for entry in sources} - {'.', './'}
    for subdir in subdirs:
        create_dir(os.path.join(base_path, subdir))

    # Download and extract files, collecting failures for a final summary.
    files_downloaded = []
    files_skipped = []
    urls_skipped = []
    subdirs_skipped = []
    for entry in sources:
        file = entry['filename']
        url = entry['url']
        subdir = os.path.join(base_path, entry['subdir'])
        # Source files of the RAG are opt-in via --source_files.
        if file == 'ipcc_text_reports.zip' and not args.source_files:
            print("Skipping IPCC text report download as --source_files flag is not set or False.")
            continue
        if download_file(url, file):
            extract_arch(file, subdir)
            files_downloaded.append(file)
        else:
            files_skipped.append(file)
            urls_skipped.append(url)
            subdirs_skipped.append(subdir)

    if files_skipped:
        print('\n')
        print('----------------------------------------------')
        print("\033[91mFiles not downloaded, please download manually:\033[0m")
        for i, file in enumerate(files_skipped):
            print('--------')
            print("\033[93mFile:\033[0m", file)
            print("\033[93mUrl:\033[0m", urls_skipped[i])
            print("\033[93munpack it into the:\033[0m ", subdirs_skipped[i])
            print('--------')


if __name__ == "__main__":
    main()