File size: 8,780 Bytes
825942f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265

# Asynchronous Getting
# 0. Get JSON / READ / TEXT / RESPONSE (sync)
# 1. Toggleable Caching - render_chat.py:get_images_cached(urls)
# 2. Configurable Connection Limit - render_chat.py:get_json_batch(urls, limit=100)
# 3. Toggleable AsyncInfo - Log status url reason counts
# 4. Handle status codes 404 and non-200
# 5. Configurable url2header Function
# 6. POST with payload

from box.timer import Timer   # Single Use
timer = Timer()

from box.db import db_init
from box.color import Code

# aiohttp and asyncio are not imported at module load; configure_asyncio()
# imports them on demand and rebinds the placeholders below.
# import aiohttp
# import asyncio
import json

# Placeholders rebound to the real modules by configure_asyncio().
aiohttp = None
asyncio = None
# Rebound by aget() to the result of db_init() when caching is enabled.
connection = None

class Config:
    """Module-wide request settings; aget() overwrites every field per call."""

    # Value passed as `limit` to the TCP connector in batch_get_async().
    limit = 100

    # Per-URL hooks returning the cookies / headers sent with each GET.
    url2cookie = lambda url: {}
    url2header = lambda url: {}

    # Feature toggles: progress logging and the response cache.
    log = True
    cache = True

class Response:
    """Plain snapshot of a finished HTTP response together with its body.

    The constructor copies the fields it needs off the live response object,
    so the snapshot does not keep reading from it afterwards.

    Args:
        response: object exposing ``url``, ``status``, ``ok``, ``reason``,
            ``headers`` (with ``.items()``) and ``content.total_bytes``.
        obj: the already-decoded body (parsed JSON, text or bytes).
    """

    def __init__(self, response, obj):
        self.url = str(response.url)
        self.status, self.ok, self.reason = (
            response.status,
            response.ok,
            response.reason,
        )
        # Flatten the header mapping into an ordinary dict.
        self.headers = dict(response.headers.items())
        self.obj = obj
        self.total_bytes = response.content.total_bytes

    def __repr__(self):
        return "Response <{}>".format(self.status)


class AsyncInfo:
    """Running tally of completed requests, shown as a one-line status.

    Args:
        lock: lock object stored on ``self.lock``; log_response() holds it
            while calling add() and print().
    """

    def __init__(self, lock):
        self.lock = lock
        self.results = []
        self.code_counter = {}
        self.total = self.count_pass = self.count_fail = 0

    def add(self, response):
        """Record one response: append a summary row and bump the counters."""
        status = response.status
        self.results.append(
            {'url': response.url, 'status': status, 'reason': response.reason}
        )
        self.total += 1
        # Only an exact 200 counts as a pass; every other status is a failure.
        if status != 200:
            self.count_fail += 1
        else:
            self.count_pass += 1
        self.code_counter[status] = 1 + self.code_counter.get(status, 0)

    def print(self):
        """Write the totals and per-status counts, ending with a carriage return."""
        parts = [
            (Code.GREEN if code < 400 else Code.RED) + f"{code}: {self.code_counter[code]}"
            for code in sorted(self.code_counter)
        ]
        code_summary = "{ " + ", ".join(parts) + " }"
        # end="\r" keeps the cursor on the same line so the next call overwrites it.
        print(f"Total: {self.total} Pass: {self.count_pass} Fail: {self.count_fail} Codes: {code_summary}", end="\r")

def configure_asyncio():
    """Import asyncio/aiohttp into the module globals and prepare logging state.

    Rebinds the module-level ``asyncio`` and ``aiohttp`` names to the real
    modules, creates the global ``async_info`` tracker when ``Config.log`` is
    set, and switches the event loop policy on Windows.
    """
    # The `global` declarations make the function-scope imports below rebind
    # the module-level names instead of creating locals.
    global asyncio
    global aiohttp
    import asyncio
    import aiohttp

    if Config.log:
        # A fresh tracker (and lock) is created on every call, so counts do
        # not carry over between batches.
        global async_info
        async_info = AsyncInfo(asyncio.Lock())

    import platform
    if platform.system() == 'Windows':
        # NOTE(review): presumably works around aiohttp issues with the default
        # Windows event loop — confirm before changing.
        asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())


def is_binary(content_type):
    """Return True when a Content-Type value denotes a binary payload.

    ``text/*`` types and a fixed set of textual ``application/*`` subtypes are
    treated as text; everything else is binary.

    Media-type parameters (``; charset=utf-8``), surrounding whitespace and
    letter case are ignored. A missing, empty or slash-less value is treated
    as binary instead of raising.

    Args:
        content_type: media type string such as ``"text/html"`` or
            ``"application/json; charset=utf-8"``; may be None or empty.

    Returns:
        bool: False for textual content, True otherwise.
    """
    if not content_type:
        return True

    # Keep only the "type/subtype" part, without parameters.
    essence = content_type.split(";", 1)[0].strip().lower()
    # partition never raises: a value without "/" yields an empty subtype.
    mime_type, _, subtype = essence.partition("/")

    if mime_type == "text":
        return False
    if mime_type != "application":
        return True
    return subtype not in ("json", "ld+json", "x-httpd-php", "x-sh", "x-csh", "xhtml+xml", "xml")

# read text json content_type
async def async_response_to_sync_response(response):
    """Read the body of a live response and wrap it in a Response snapshot.

    The body is decoded according to the content type: parsed JSON for
    ``application/json``, raw bytes for types is_binary() reports as binary,
    and text for the rest.

    References:
        https://stackoverflow.com/questions/57565577/how-to-return-aiohttp-like-python-requests
        https://docs.aiohttp.org/en/stable/client_reference.html#response-object
    """
    content_type = response.content_type
    if content_type == 'application/json':
        body = await response.json()
    elif not is_binary(content_type):
        body = await response.text()
    else:
        body = await response.read()
    return Response(response, body)

async def log_response(response):
    """Add `response` to the global tracker and redraw the status line.

    Does nothing when ``Config.log`` is off.
    """
    if not Config.log:
        return
    # The tracker's lock is held across both the update and the redraw.
    async with async_info.lock:
        async_info.add(response)
        async_info.print()

async def get_async(url, session):
    """GET one URL on `session` and return it as a Response snapshot.

    Headers and cookies come from the ``Config.url2header`` and
    ``Config.url2cookie`` hooks. The finished response is passed to
    log_response() before returning.
    """
    request_kwargs = {
        "headers": Config.url2header(url),
        "cookies": Config.url2cookie(url),
    }
    response = await session.get(url, **request_kwargs)
    # The body is read inside the context block; logging happens after it exits.
    async with response:
        snapshot = await async_response_to_sync_response(response)
    await log_response(response)

    return snapshot

async def batch_get_async(urls):
    """Fetch every URL concurrently over one shared session.

    The session uses a TCP connector limited to ``Config.limit``. The gathered
    results are returned in the order of `urls`.
    """
    connector = aiohttp.TCPConnector(limit=Config.limit)
    async with aiohttp.ClientSession(connector=connector, trust_env=True) as session:
        responses = await asyncio.gather(*(get_async(url, session) for url in urls))
    # The status line ends in "\r"; emit a newline so later output starts on a fresh line.
    if Config.log and len(urls) > 0:
        print()
    return responses

def get_nocache(urls):
    """Fetch `urls` without touching the cache; returns a list of Response objects."""
    configure_asyncio()
    return asyncio.run(batch_get_async(urls))

# -- START: CACHE --
def to_temp_table(cursor, items, table_string):
    """Create a single-column TEMP table and insert one row per item.

    Args:
        cursor: DB-API cursor using ``?`` placeholders.
        items: values to insert, one per row.
        table_string: table name followed by its column definition, e.g.
            ``"urls (url TEXT UNIQUE)"``; the name is the text before the
            first space.
    """
    name, _, _ = table_string.partition(" ")
    cursor.execute("CREATE TEMP TABLE " + table_string + ";")
    # zip() over a single iterable yields the 1-tuples executemany expects.
    rows = list(zip(items))
    cursor.executemany("INSERT INTO " + name + " VALUES (?);", rows)

def get_cached_urls(urls):
    """Return the subset of `urls` that already has a row in the cache table.

    The URLs are loaded into a temporary ``urls`` table (UNIQUE column) for
    the lookup, and the table is dropped afterwards.
    """
    cursor = connection.cursor()
    to_temp_table(cursor, urls, "urls (url TEXT UNIQUE)")
    rows = cursor.execute("SELECT url FROM urls WHERE url IN (SELECT url FROM cache);").fetchall()
    cursor.execute("DROP TABLE urls;")
    return [row[0] for row in rows]


def get_uncached_urls(urls):
    """Return the subset of `urls` that has no row in the cache table yet.

    The URLs are loaded into a temporary ``urls`` table (UNIQUE column) for
    the lookup, and the table is dropped afterwards.
    """
    cursor = connection.cursor()
    to_temp_table(cursor, urls, "urls (url TEXT UNIQUE)")
    rows = cursor.execute("SELECT url FROM urls WHERE url NOT IN (SELECT url FROM cache);").fetchall()
    cursor.execute("DROP TABLE urls;")
    return [row[0] for row in rows]

def object_to_type(obj):
    """Return the cache type tag for a body: 'text', 'binary' or 'json'.

    The match is on the exact type, so subclasses of str or bytes (and
    everything else) are tagged 'json'.
    """
    tags = {str: 'text', bytes: 'binary'}
    return tags.get(type(obj), 'json')

def cache(urls, objects):
    """Insert (url, type, body) rows into the cache table and commit.

    `urls` and `objects` are paired positionally. Bodies tagged 'json' are
    serialised with json.dumps; text and bytes are stored as they are.
    """
    db_cursor = connection.cursor()

    rows = []
    for url, body in zip(urls, objects):
        kind = object_to_type(body)
        if kind == 'json':
            body = json.dumps(body)
        rows.append((url, kind, body))

    db_cursor.executemany("INSERT INTO cache (url, type, response) VALUES (?, ?, ?);", rows)
    connection.commit()

def cache_get(urls):
    """Load the cached bodies for `urls`, in the same order as `urls`.

    The cache is read with a single query that returns both the type tag and
    the stored body. Rows tagged 'json' are decoded with json.loads; other
    rows are returned as stored.

    Args:
        urls: URLs that must all be present in the cache table.

    Returns:
        list: one body per entry of `urls`.

    Raises:
        KeyError: if a URL has no row in the cache table.
    """
    cursor = connection.cursor()
    to_temp_table(cursor, urls, "urls (url TEXT UNIQUE)")
    try:
        rows = cursor.execute(
            "SELECT url, type, response FROM cache WHERE url IN (SELECT url FROM urls);"
        ).fetchall()
    finally:
        # Always drop the temp table; a leftover one would make the next
        # CREATE TEMP TABLE urls fail.
        cursor.execute("DROP TABLE urls;")

    cached = {url: (object_type, payload) for url, object_type, payload in rows}

    objects = []
    for url in urls:
        object_type, payload = cached[url]
        objects.append(json.loads(payload) if object_type == 'json' else payload)
    return objects

def get_yescache(urls):
    """Return one result per URL, using the cache and fetching what is missing.

    URLs without a cache row are fetched with get_nocache(). Bodies of
    successful (``response.ok``) responses are written to the cache.

    Args:
        urls: list of URLs; duplicates are allowed and fetched only once.

    Returns:
        list: aligned with `urls`. Each entry is the decoded body, or the
        Response object itself when the request was not ok.
    """
    cursor = connection.cursor()
    if DEBUG:
        cursor.execute("DROP TABLE IF EXISTS cache;")
    cursor.execute("CREATE TABLE IF NOT EXISTS cache (url TEXT UNIQUE, type TEXT, response);")

    # De-duplicate while keeping first-seen order (the temp table column is UNIQUE).
    unique_urls = list(dict.fromkeys(urls))
    uncached_urls = get_uncached_urls(unique_urls)
    responses = get_nocache(uncached_urls) if len(uncached_urls) > 0 else []

    # Keep each URL paired with its own response before filtering. Filtering
    # only the bodies would shift them against the URL list after a failure,
    # storing bodies under the wrong URL.
    fetched = list(zip(uncached_urls, responses))
    ok_urls = [url for url, response in fetched if response.ok]
    ok_objects = [response.obj for _, response in fetched if response.ok]
    cache(ok_urls, ok_objects)

    cached_urls = get_cached_urls(unique_urls)
    cached_objects = cache_get(cached_urls)
    from_cache = dict(zip(cached_urls, cached_objects))
    from_network = {url: response.obj if response.ok else response for url, response in fetched}
    # Freshly fetched results take precedence over what was read back from the cache.
    url_to_object_D = from_cache | from_network
    return [url_to_object_D[url] for url in urls]

# -- END: CACHE --

def aget(urls, cache=True, log=True, limit=100, url2header=lambda url: {}, url2cookie=lambda url: {}):
    """Fetch `urls` concurrently and return one result per URL.

    Args:
        urls: list of URLs to GET.
        cache: when true, open ``aget_cache/cache.db`` and go through
            get_yescache(); otherwise fetch everything with get_nocache().
        log: stored in ``Config.log``; enables the progress status line.
        limit: stored in ``Config.limit``; connector connection limit.
        url2header: callable mapping a URL to its request headers.
        url2cookie: callable mapping a URL to its request cookies.

    Returns:
        list: with caching, whatever get_yescache() returns; without caching,
        the ``obj`` of every response.
    """
    global DEBUG
    global connection

    # Publish the call's settings on the shared Config class.
    DEBUG = False
    Config.cache, Config.log, Config.limit = cache, log, limit
    Config.url2header, Config.url2cookie = url2header, url2cookie
    if Config.cache:
        connection = db_init("aget_cache/cache.db")

    if DEBUG:
        print(Code.RED + "Debug: True")

    if not cache:
        return [response.obj for response in get_nocache(urls)]
    return get_yescache(urls)

def run_async_get():
    """Manual smoke run: fetch a mixed set of URLs through aget() and print the result.

    Relies on the module-level name ``ic``, which is only imported in the
    ``__main__`` block.
    """
    google = "https://www.google.com"
    urls = [
        "https://api.betterttv.net/3/cached/emotes/global",           # JSON
        "https://cdn.betterttv.net/emote/56e9f494fff3cc5c35e5287e/3x",  # image
        "https://example.com/",                                       # HTML
        "https://www.w3.org/TR/PNG/iso_8859-1.txt",                   # plain text
        "https://cdn.betterttv.net/emote/404/3x",                     # labelled not-found in the original
        google,                                                       # listed twice to exercise duplicates
        google,
    ]
    objects = aget(urls, cache=True, log=True)
    ic(objects)
    return objects

if __name__ == "__main__":
    # `ic` is bound at module scope here because run_async_get() calls it as a
    # global; it is therefore only defined when the file is run as a script.
    from box.ic import ic
    # NOTE(review): handler() presumably reports or handles uncaught errors
    # raised inside the block — confirm against box.error.
    from box.error import handler
    with handler():
        run_async_get()