File size: 6,451 Bytes
3f25adf |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 |
import requests
import tarfile
import zipfile
import os
import shutil
import sys
import yaml
import argparse
def download_file(url, local_filename):
    """Download a file from *url* to *local_filename* with a progress indicator.

    The response is streamed in 8 KiB chunks so large files never need to
    fit in memory.  When the server sends a usable Content-Length header, a
    one-line percentage progress display is written to stdout.

    Returns True on success, False when the request fails (a warning is
    printed so the user can download the file manually).
    """
    try:
        with requests.get(url, stream=True) as r:
            r.raise_for_status()
            # Total size is only known if the server sends Content-Length.
            total_length = r.headers.get('content-length')
            if total_length is not None:
                total_length = int(total_length)
            downloaded = 0
            with open(local_filename, 'wb') as f:
                for chunk in r.iter_content(chunk_size=8192):
                    f.write(chunk)
                    # Truthiness (not just "is not None") also guards against
                    # a bogus Content-Length of 0 causing ZeroDivisionError.
                    if total_length:
                        downloaded += len(chunk)
                        done_percentage = int(100 * downloaded / total_length)
                        # \r rewrites the same console line on every update.
                        sys.stdout.write(f"\rDownloading {local_filename}: {done_percentage}%")
                        sys.stdout.flush()
            if total_length:
                sys.stdout.write('\n')  # Move past the progress line when done.
            return True
    except requests.RequestException as e:
        print(f"\033[93mWarning: Failed to download {url}. Please download manually.\033[0m")
        print(f"\033[91mError: {e}\033[0m")
        return False
def extract_tar(file_path, extract_to='.'):
    """Extract a tar archive into *extract_to*, then delete the archive file.

    Failures are reported (with the underlying error) instead of raised, so a
    single bad archive does not abort the whole download run.
    """
    try:
        with tarfile.open(file_path) as tar:
            # Use the 'data' filter (Python 3.12+, PEP 706) to guard against
            # path-traversal entries; fall back on older interpreters.
            if hasattr(tarfile, 'data_filter'):
                tar.extractall(path=extract_to, filter='data')
            else:
                tar.extractall(path=extract_to)
        os.remove(file_path)
    except Exception as e:
        print(f"\033[93mWarning: Failed to extract {file_path}.\033[0m")
        print(f"\033[91mError: {e}\033[0m")
def extract_zip(file_path, extract_to='.'):
    """Extract a zip archive into *extract_to*, then delete the archive file.

    Failures are reported (with the underlying error) instead of raised, so a
    single bad archive does not abort the whole download run.
    """
    try:
        with zipfile.ZipFile(file_path, 'r') as zip_ref:
            zip_ref.extractall(extract_to)
        os.remove(file_path)
    except Exception as e:
        print(f"\033[93mWarning: Failed to extract {file_path}.\033[0m")
        print(f"\033[91mError: {e}\033[0m")
def extract_arch(file_path, extract_to='.', archive_type=''):
    """Extract a zip/tar archive into *extract_to*, then delete the original.

    archive_type may be 'zip', 'tar', or '' (default).  With the default the
    type is inferred from the file extension; files that are neither zip nor
    tar archives are simply moved into *extract_to*.
    Failures are reported (with the underlying error) instead of raised.
    """
    if not archive_type:
        _, file_extension = os.path.splitext(file_path)
        if file_extension == '.zip':
            archive_type = 'zip'
        elif file_extension == '.tar':
            archive_type = 'tar'
        # Compressed tarballs: 'x.tar.gz' splits to extension '.gz', so the
        # plain '.tar' test misses them.  tarfile.open auto-detects the
        # compression, and is_tarfile ensures a gzipped non-tar file still
        # falls through to the move branch instead of failing.
        elif file_extension in ('.gz', '.tgz', '.bz2', '.xz') and tarfile.is_tarfile(file_path):
            archive_type = 'tar'
    try:
        if archive_type == 'zip':
            with zipfile.ZipFile(file_path, 'r') as zip_ref:
                zip_ref.extractall(extract_to)
        elif archive_type == 'tar':
            with tarfile.open(file_path) as tar:
                # 'data' filter (Python 3.12+, PEP 706) blocks path traversal.
                if hasattr(tarfile, 'data_filter'):
                    tar.extractall(path=extract_to, filter='data')
                else:
                    tar.extractall(path=extract_to)
        else:
            # Not an archive: copy the file into the target directory.
            destination_file = os.path.join(extract_to, os.path.basename(file_path))
            shutil.copyfile(file_path, destination_file)
        os.remove(file_path)
    except Exception as e:
        print(f"\033[93mWarning: Failed to extract {file_path}.\033[0m")
        print(f"\033[91mError: {e}\033[0m")
def create_dir(path):
    """Ensure *path* exists as a directory, creating parents as needed.

    Silently does nothing when the directory is already present.
    """
    os.makedirs(path, exist_ok=True)
def remove_dir(path):
    """Delete *path* and everything under it, if it is an existing directory.

    Non-existent paths and regular files are left untouched (isdir() is
    False for both, so no separate existence check is needed).
    """
    if os.path.isdir(path):
        shutil.rmtree(path)
def main():
    """Download every data source listed in data_sources.yml and unpack it.

    Command-line flags:
        --source_files  Also download the raw RAG source files (IPCC text
                        reports); skipped by default.

    Failed downloads are collected and reported at the end so the user can
    fetch them manually.
    """
    parser = argparse.ArgumentParser(description="Download and extract the raw source files of the RAG.")
    # NOTE: the previous type=bool treated ANY non-empty string (even
    # "False") as True; store_true gives a real on/off flag.
    parser.add_argument('--source_files', action='store_true',
                        help='Whether to download and extract source files (IPCC text reports).')
    args = parser.parse_args()

    # Load the list of data sources to fetch.
    with open('data_sources.yml', 'r') as file:
        data_config = yaml.safe_load(file)
    base_path = data_config['base_path']
    sources = data_config['sources']

    # Create every distinct target sub-directory up front; '.' and './'
    # mean "directly under base_path" and need no directory of their own.
    subdirs = {entry['subdir'] for entry in sources} - {'.', './'}
    for subdir in subdirs:
        create_dir(os.path.join(base_path, subdir))

    # Download and extract each source, remembering failures for the summary.
    files_downloaded = []
    files_skipped = []
    urls_skipped = []
    subdirs_skipped = []
    for entry in sources:
        file = entry['filename']
        url = entry['url']
        subdir = os.path.join(base_path, entry['subdir'])
        # The raw RAG source files are large; only fetch on explicit request.
        if file == 'ipcc_text_reports.zip' and not args.source_files:
            print("Skipping IPCC text report download as --source_files flag is not set or False.")
            continue
        if download_file(url, file):
            extract_arch(file, subdir)
            files_downloaded.append(file)
        else:
            files_skipped.append(file)
            urls_skipped.append(url)
            subdirs_skipped.append(subdir)

    # Tell the user which files must be fetched by hand.
    if files_skipped:
        print('\n')
        print('----------------------------------------------')
        print("\033[91mFiles not downloaded, please download manually:\033[0m")
        for i, file in enumerate(files_skipped):
            print('--------')
            print("\033[93mFile:\033[0m", file)
            print("\033[93mUrl:\033[0m", urls_skipped[i])
            print("\033[93munpack it into the:\033[0m ", subdirs_skipped[i])
            print('--------')
    # I would leave it for a while
    #print('\n')
    #print('----------------------------------------------')
    #print("You also need to download the natural hazard data (for which you have to create a free account). Please download the CSV - Disaster Location Centroids [zip file] and unpack it into the 'data/natural_hazards' folder. Your file should automatically be called 'pend-gdis-1960-2018-disasterlocations.csv'. If not, please change the file name accordingly.")
    #print(f"\033[93mhttps://sedac.ciesin.columbia.edu/data/set/pend-gdis-1960-2018/data-download\033[0m")
    #print('-------------------')


if __name__ == "__main__":
    main()
|