File size: 6,929 Bytes
be305fb
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
# Author: Marco Lustri 2022 - https://github.com/TheLustriVA
# MIT License

"""A script to make downloading the DiffusionDB dataset easier."""
from urllib.error import HTTPError
from urllib.request import urlretrieve
from alive_progress import alive_bar
from os.path import exists

import shutil
import os
import time
import argparse

# Command-line interface. Every option is optional and has a default.
parser = argparse.ArgumentParser(description="Download a file from a URL")
parser.add_argument(
    "-i",
    "--index",
    help="File to download or lower bound of range if -r is set",
    default=1,
    type=int,
)
parser.add_argument(
    "-r",
    "--range",
    help="Upper bound of range if -i is provided",
    default=None,
    type=int,
)
parser.add_argument(
    "-o",
    "--output",
    help="Output directory name",
    default="images",
    type=str,
)
# "store_true" flags: False unless the switch is present on the command line.
parser.add_argument(
    "-z",
    "--unzip",
    action="store_true",
    help="Unzip the file after downloading",
    default=False,
)
parser.add_argument(
    "-l",
    "--large",
    action="store_true",
    help="Download from DiffusionDB Large (14 million images)",
    default=False,
)

args = parser.parse_args()

# Mirror the parsed options into module-level settings. Any falsy value
# (0, an empty string, False, or an option that was not given) is stored as
# None so that later code can simply test the setting for truthiness.
index = args.index or None
range_max = args.range or None
output = args.output or None
unzip = args.unzip or None
large = args.large or None



def _part_url(index, large=False):
    """Return the download URL of DiffusionDB zip archive number ``index``."""
    baseurl = "https://huggingface.co/datasets/poloclub/diffusiondb/resolve/main/"
    if not large:
        return f"{baseurl}images/part-{index:06}.zip"
    # DiffusionDB Large is split over two folders: parts up to 10000 are in
    # the first folder, every later part is in the second.
    if index <= 10000:
        folder = "diffusiondb-large-part-1"
    else:
        folder = "diffusiondb-large-part-2"
    return f"{baseurl}{folder}/part-{index:06}.zip"


def download(index=1, range_index=0, output="", large=False):
    """
    Download one DiffusionDB zip archive, or a range of them, into ``output``.

    :param index: The number of the archive to download, or the first archive
        of the range when ``range_index`` is set, defaults to 1 (optional)
    :param range_index: Exclusive upper bound of the range: archives
        ``index`` .. ``range_index - 1`` are downloaded. 0 means "download
        only ``index``", defaults to 0 (optional)
    :param output: The directory to download the files to. It is created when
        missing; an empty string or None means the current working directory
    :param large: If downloading from DiffusionDB Large (14 million images)
        instead of DiffusionDB 2M (2 million images)
    :return: In range mode, when the module-level ``unzip`` flag is set and at
        least one archive was downloaded, the list of downloaded archive
        paths. In every other case None
    """
    if output is None:
        # Formatting None into the path would create a directory literally
        # named "None"; treat it like "no directory given".
        output = ""
    if output != "":
        # os.makedirs("") raises, so only create a directory when one was
        # actually requested. exist_ok makes the call safe to repeat.
        os.makedirs(output, exist_ok=True)

    if range_index == 0:
        url = _part_url(index, large)
        print("Downloading file: ", url)
        file_path = os.path.join(output, f"part-{index:06}.zip")
        try:
            urlretrieve(url, file_path)
        except HTTPError as e:
            print(f"Encountered an HTTPError downloading file: {url} - {e}")
        else:
            # ``unzip`` is the module-level flag filled in from the command
            # line, not a function; the extraction itself is unzip_file().
            # Only extract when the download succeeded.
            if unzip:
                unzip_file(file_path)
        return None

    files_to_unzip = []
    # It's downloading the files numbered from index to range_index - 1.
    with alive_bar(range_index - index, title="Downloading files") as bar:
        for idx in range(index, range_index):
            url = _part_url(idx, large)
            loop_file_path = os.path.join(output, f"part-{idx:06}.zip")
            try:
                urlretrieve(url, loop_file_path)
            except HTTPError as e:
                print(f"HTTPError downloading file: {url} - {e}")
            else:
                # A failed download leaves no archive behind, so only
                # successful ones are queued for extraction.
                files_to_unzip.append(loop_file_path)
            # The manifest records every URL that was attempted, appended to
            # manifest.txt in the current working directory.
            with open("manifest.txt", "a") as f:
                f.write(url + "\n")
            # Short pause between requests, kept from the original script
            # (presumably to go easy on the server - TODO confirm).
            time.sleep(0.1)
            bar()

    # Extraction is deliberately left to the caller: the download is already
    # lengthy, so the archives are only handed back when unzipping was asked
    # for.
    if unzip and files_to_unzip:
        return files_to_unzip
    return None


def unzip_file(file: str, extract_to: str = None):
    """
    > This function takes a zip file and unpacks it to specified directory

    :param file: str - path to zip file
    :param extract_to: str - directory to extract to (default: the path of
        the zip file with its trailing extension removed)
    :return: A message naming the archive and the directory it was
        extracted to
    """
    if extract_to is None:
        # Drop only the trailing extension. A plain str.replace('.zip', '')
        # would also mangle any ".zip" found earlier in the path, e.g. in a
        # parent directory name.
        extract_to = os.path.splitext(file)[0]

    shutil.unpack_archive(file, extract_to)
    return f"File: {file} has been unzipped to {extract_to}"


def unzip_all(files: list, extract_to: str = "/home/user/app/images"):
    """
    > Unzip all files in a list of files

    :param files: list of paths of the archives to extract. None or an empty
        list is accepted and means there is nothing to do
    :type files: list
    :param extract_to: directory every archive is extracted into, defaults to
        "/home/user/app/images". Pass None to let each archive be extracted
        into its own directory chosen by unzip_file
    :type extract_to: str
    """
    if not files:
        # download() hands back None when there is nothing to extract;
        # len(None) would raise, so stop here.
        return
    with alive_bar(len(files), title="Unzipping files") as bar:
        for file in files:
            unzip_file(file, extract_to)
            time.sleep(0.1)
            bar()


def main(index=None, range_max=None, output=None, unzip=None, large=None):
    """
    Download one archive, or a range of archives, and optionally unzip them.

    With only ``index`` set, that single archive is downloaded. With both
    ``index`` and ``range_max`` set, the range is downloaded; for ranges of
    1999 archives or more the user is first asked to confirm that enough disk
    space is available, and anything but "y" aborts. When ``unzip`` is true
    the archives returned by the range download are extracted afterwards.

    :param index: The index of the file you want to download, or the first
        file of the range. A falsy value (None or 0) means no index was given
    :param range_max: Upper bound of the range handed to ``download``; a
        falsy value selects the single-file download
    :param output: The directory to download the files to
    :param unzip: If you want to unzip the files after downloading them, set
        this to True
    :param large: If you want to download from DiffusionDB Large (14 million
        images) instead of DiffusionDB 2M (2 million images)
    :return: None
    """
    if not index:
        print("No index provided")
        return

    if not range_max:
        download(index, output=output, large=large)
        return

    if range_max - index >= 1999:
        confirmation = input("Do you have at least 1.7Tb free: (y/n)")
        if confirmation != "y":
            return

    files = download(index, range_max, output, large)
    # download() returns None when it has no archives to hand back, so only
    # start unzipping when there is an actual list to work on.
    if unzip and files:
        unzip_all(files)

# Entry-point guard: the download only starts when this file is executed as a
# script. Importing it from another module or an interactive session does not
# call main().
if __name__ == "__main__":
    main(
        index=index,
        range_max=range_max,
        output=output,
        unzip=unzip,
        large=large,
    )