Spaces:
Runtime error
Runtime error
| # Asynchronous Getting | |
| # 0. Get JSON / READ / TEXT / RESPONSE (sync) | |
| # 1. Toggleable Caching - render_chat.py:get_images_cached(urls) | |
| # 2. Configurable Connection Limit - render_chat.py:get_json_batch(urls, limit=100) | |
| # 3. Toggleable AsyncInfo - Log status url reason counts | |
| # 4. Handle status codes 404 and non-200 | |
| # 5. Configurable url2header Function | |
| # 6. POST with payload | |
| from box.timer import Timer # Single Use | |
| timer = Timer() | |
| from box.db import db_init | |
| from box.color import Code | |
| # import aiohttp | |
| # import asyncio | |
| import json | |
# Lazy placeholders: configure_asyncio() rebinds `asyncio` and `aiohttp` to the
# real modules, and aget() rebinds `connection` when caching is enabled.
aiohttp = asyncio = connection = None
class Config:
    """Module-wide request settings; aget() overwrites every field per call."""

    # Whether aget() goes through the cache database.
    cache = True
    # Whether response status counters are collected and printed.
    log = True
    # Handed to aiohttp.TCPConnector(limit=...) when the session is built.
    limit = 100

    # The two factories below are always invoked through the class itself
    # (Config.url2header(url)), so they take no `self`.
    def url2header(url):
        """Default header factory: no extra request headers for any URL."""
        return {}

    def url2cookie(url):
        """Default cookie factory: no cookies for any URL."""
        return {}
class Response:
    """Plain snapshot of a client response together with its decoded body."""

    def __init__(self, response, obj):
        """Copy the fields of interest off `response`.

        Args:
            response: The live response object (url, status, ok, reason,
                headers and content.total_bytes are read from it).
            obj: The already-decoded body to carry along.
        """
        self.obj = obj
        self.url = str(response.url)
        self.status = response.status
        self.reason = response.reason
        self.ok = response.ok
        # Flatten the (name, value) pairs into a plain dict; when a name
        # repeats, the last pair wins.
        self.headers = dict(response.headers.items())
        self.total_bytes = response.content.total_bytes

    def __repr__(self):
        return "Response <{}>".format(self.status)
class AsyncInfo:
    """Running tally of response outcomes, rendered as one status line."""

    def __init__(self, lock):
        """Start with empty counters; `lock` is stored for callers to hold."""
        self.lock = lock
        self.results = []
        self.total = 0
        self.count_pass = 0
        self.count_fail = 0
        self.code_counter = {}

    def add(self, response):
        """Record one response: its url/status/reason and the per-code count."""
        status = response.status
        self.results.append({'url': response.url, 'status': status, 'reason': response.reason})
        self.total += 1
        # Only an exact 200 counts as a pass; every other status is a failure.
        if status != 200:
            self.count_fail += 1
        else:
            self.count_pass += 1
        self.code_counter.setdefault(status, 0)
        self.code_counter[status] += 1

    def print(self):
        """Print the totals and a colour-coded per-status breakdown."""
        # Codes below 400 are shown in green, 400 and above in red.
        code_strings = [
            (Code.RED if code >= 400 else Code.GREEN) + f"{code}: {count}"
            for code, count in sorted(self.code_counter.items())
        ]
        code_string = "{ " + ", ".join(code_strings) + " }"
        # end="\r" returns the cursor to column 0 so the next call overwrites
        # this line instead of scrolling.
        print(f"Total: {self.total} Pass: {self.count_pass} Fail: {self.count_fail} Codes: {code_string}", end="\r")
def configure_asyncio():
    """Import asyncio/aiohttp on demand and publish them as module globals.

    Also creates the module-level `async_info` tally when logging is enabled,
    and switches to the selector event loop policy on Windows.
    """
    global asyncio, aiohttp, async_info
    import asyncio
    import aiohttp
    import platform

    if Config.log:
        # Fresh counters (and a fresh lock) for every batch.
        async_info = AsyncInfo(asyncio.Lock())

    if platform.system() == 'Windows':
        asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
def is_binary(content_type):
    """Return True when a body of this content type should be read as bytes.

    `text/*` is never binary. `application/*` is textual only for a short
    allow-list of subtypes. Every other top-level type is binary.

    Parameters after `;` (e.g. `; charset=utf-8`) are ignored and the
    comparison is case-insensitive. A value without a `/` is treated as
    binary instead of raising, since bytes are the lossless fallback.

    Args:
        content_type: A MIME type string such as "application/json".

    Returns:
        bool: True for binary content, False for textual content.
    """
    # Drop any parameters and normalise case before comparing.
    essence = content_type.split(";", 1)[0].strip().lower()
    # partition() never raises: a missing "/" just leaves subtype empty.
    mime_type, _, subtype = essence.partition("/")
    if mime_type == "text":
        return False
    if mime_type != "application":
        return True
    return subtype not in ("json", "ld+json", "x-httpd-php", "x-sh", "x-csh", "xhtml+xml", "xml")
| # read text json content_type | |
async def async_response_to_sync_response(response):
    """Read the body off a live aiohttp response and wrap it in a Response.

    The body is decoded according to the response's content type: parsed JSON
    for exactly 'application/json', raw bytes for binary types, text otherwise.
    """
    # https://stackoverflow.com/questions/57565577/how-to-return-aiohttp-like-python-requests
    # https://docs.aiohttp.org/en/stable/client_reference.html#response-object
    content_type = response.content_type
    if content_type == 'application/json':
        payload = await response.json()
    elif not is_binary(content_type):
        payload = await response.text()
    else:
        payload = await response.read()
    return Response(response, payload)
async def log_response(response):
    """Add `response` to the shared tally and reprint the status line.

    Does nothing when Config.log is off.
    """
    if not Config.log:
        return
    # Update and print under the tally's lock so the two happen as one step.
    async with async_info.lock:
        async_info.add(response)
        async_info.print()
async def get_async(url, session):
    """GET one URL through `session` and return its Response snapshot.

    Headers and cookies come from the Config.url2header / Config.url2cookie
    factories, evaluated for this URL.
    """
    response = await session.get(
        url,
        headers=Config.url2header(url),
        cookies=Config.url2cookie(url),
    )
    # The body must be consumed while the response is still open.
    async with response:
        snapshot = await async_response_to_sync_response(response)
        await log_response(response)
    return snapshot
async def batch_get_async(urls):
    """Fetch all `urls` concurrently over one shared session.

    Returns the Response snapshots in the same order as `urls`.
    """
    connector = aiohttp.TCPConnector(limit=Config.limit)
    async with aiohttp.ClientSession(connector=connector, trust_env=True) as session:
        pending = (get_async(url, session) for url in urls)
        responses = await asyncio.gather(*pending)
        if Config.log and len(urls) > 0:
            # The status line is printed with end="\r"; terminate it so later
            # output starts on a fresh line.
            print()
    return responses
def get_nocache(urls):
    """Fetch `urls` from the network, bypassing the cache.

    Returns a list of Response snapshots, one per URL, in input order.
    """
    configure_asyncio()
    return asyncio.run(batch_get_async(urls))
| # -- START: CACHE -- | |
def to_temp_table(cursor, items, table_string):
    """Create a one-column temp table and load `items` into it.

    Args:
        cursor: DB cursor used to run the statements.
        items: Values to insert, one row each.
        table_string: Table name followed by its column definition,
            e.g. "urls (url TEXT UNIQUE)". The name is everything before
            the first space.
    """
    table_name, _, _ = table_string.partition(" ")
    cursor.execute(f"CREATE TEMP TABLE {table_string};")
    # zip() over a single iterable yields the 1-tuples executemany expects.
    rows = list(zip(items))
    cursor.executemany(f"INSERT INTO {table_name} VALUES (?);", rows)
def get_cached_urls(urls):
    """Return the subset of `urls` that already has a row in the cache table.

    Args:
        urls: Candidate URLs. They are loaded into a temp table with a UNIQUE
            column, so duplicates make the insert fail.

    Returns:
        list: The URLs found in `cache`.
    """
    cursor = connection.cursor()
    try:
        to_temp_table(cursor, urls, "urls (url TEXT UNIQUE)")
        rows = cursor.execute("SELECT url FROM urls WHERE url IN (SELECT url FROM cache);").fetchall()
    finally:
        # Always remove the scratch table: if it survived an error, every later
        # call on this connection would fail at CREATE TEMP TABLE.
        cursor.execute("DROP TABLE IF EXISTS urls;")
    return [url for (url,) in rows]
def get_uncached_urls(urls):
    """Return the subset of `urls` that has no row in the cache table yet.

    Args:
        urls: Candidate URLs. They are loaded into a temp table with a UNIQUE
            column, so duplicates make the insert fail.

    Returns:
        list: The URLs not found in `cache`.
    """
    cursor = connection.cursor()
    try:
        to_temp_table(cursor, urls, "urls (url TEXT UNIQUE)")
        rows = cursor.execute("SELECT url FROM urls WHERE url NOT IN (SELECT url FROM cache);").fetchall()
    finally:
        # Always remove the scratch table: if it survived an error, every later
        # call on this connection would fail at CREATE TEMP TABLE.
        cursor.execute("DROP TABLE IF EXISTS urls;")
    return [url for (url,) in rows]
def object_to_type(obj):
    """Classify a decoded body for storage: 'text', 'binary' or 'json'.

    The match is on the exact type: only `str` is 'text' and only `bytes` is
    'binary'. Everything else, subclasses included, is 'json'.
    """
    exact_type_labels = {str: 'text', bytes: 'binary'}
    return exact_type_labels.get(type(obj), 'json')
def cache(urls, objects):
    """Insert one cache row per (url, object) pair and commit.

    `urls` and `objects` are paired positionally. Objects classified as
    'json' are serialised with json.dumps; text and bytes are stored as-is.
    """
    def to_row(url, obj):
        kind = object_to_type(obj)
        payload = json.dumps(obj) if kind == 'json' else obj
        return (url, kind, payload)

    rows = [to_row(url, obj) for url, obj in zip(urls, objects)]
    cursor = connection.cursor()
    cursor.executemany("INSERT INTO cache (url, type, response) VALUES (?, ?, ?);", rows)
    connection.commit()
def cache_get(urls):
    """Load the cached objects for `urls`, in the same order as `urls`.

    Args:
        urls: URLs to look up. They are loaded into a temp table with a UNIQUE
            column, so duplicates make the insert fail.

    Returns:
        list: One object per URL. Rows stored as 'json' are decoded with
        json.loads; other rows are returned as stored.

    Raises:
        KeyError: If any URL has no row in the cache table.
    """
    cursor = connection.cursor()
    try:
        to_temp_table(cursor, urls, "urls (url TEXT UNIQUE)")
        # One pass fetches both the stored type and the payload.
        rows = cursor.execute("SELECT url, type, response FROM cache WHERE url IN (SELECT url FROM urls);").fetchall()
    finally:
        # Always remove the scratch table: if it survived an error, every later
        # call on this connection would fail at CREATE TEMP TABLE.
        cursor.execute("DROP TABLE IF EXISTS urls;")

    stored = {url: (kind, payload) for url, kind, payload in rows}
    objects = []
    for url in urls:
        kind, payload = stored[url]
        objects.append(json.loads(payload) if kind == 'json' else payload)
    return objects
def get_yescache(urls):
    """Resolve `urls` through the cache, fetching only what is missing.

    Args:
        urls: URLs to resolve; duplicates are allowed.

    Returns:
        list: One entry per input URL, in input order. An entry is the decoded
        body, or the Response itself when a fresh fetch did not succeed
        (`response.ok` is false). Failed fetches are never cached.
    """
    # -> [ obj | Response ]
    cursor = connection.cursor()
    if DEBUG:
        cursor.execute("DROP TABLE IF EXISTS cache;")
    cursor.execute("CREATE TABLE IF NOT EXISTS cache (url TEXT UNIQUE, type TEXT, response);")

    unique_urls = list(set(urls))
    uncached_urls = get_uncached_urls(unique_urls)
    responses = get_nocache(uncached_urls) if len(uncached_urls) > 0 else []

    # Pair every URL with its own response BEFORE filtering. Filtering the
    # objects alone and zipping them against the full URL list would shift
    # bodies onto the wrong URLs as soon as one fetch fails.
    fetched = list(zip(uncached_urls, responses))
    ok_pairs = [(url, response.obj) for url, response in fetched if response.ok]
    cache([url for url, _ in ok_pairs], [obj for _, obj in ok_pairs])

    cached_urls = get_cached_urls(unique_urls)
    cached_objects = cache_get(cached_urls)
    url_to_object = dict(zip(cached_urls, cached_objects))
    # Fresh results win over what was just read back, and failed fetches
    # surface as their Response so the caller can inspect the status.
    url_to_object.update(
        (url, response.obj if response.ok else response) for url, response in fetched
    )
    return [url_to_object[url] for url in urls]
| # -- END: CACHE -- | |
def aget(urls, cache=True, log=True, limit=100, url2header=lambda url: {}, url2cookie=lambda url: {}):
    """Fetch many URLs concurrently and return their decoded bodies.

    Args:
        urls: URLs to GET.
        cache: When true, go through the on-disk cache ("aget_cache/cache.db").
        log: When true, print live status counters while fetching.
        limit: Connection limit for the HTTP connector.
        url2header: Callable mapping a URL to its request headers.
        url2cookie: Callable mapping a URL to its request cookies.

    Returns:
        list: One entry per URL, in input order. With caching on, this is
        whatever get_yescache() returns. With caching off, it is the `.obj`
        of every response, successful or not.
    """
    # START: Configuration
    global DEBUG, connection
    DEBUG = False
    Config.cache = cache
    Config.log = log
    Config.url2header = url2header
    Config.url2cookie = url2cookie
    Config.limit = limit
    if Config.cache:
        connection = db_init("aget_cache/cache.db")
    # END: Configuration
    if DEBUG:
        print(Code.RED + "Debug: True")

    # NOTE: inside this function `cache` is the boolean parameter, not the
    # module-level cache() function.
    if not cache:
        return [response.obj for response in get_nocache(urls)]
    return get_yescache(urls)
def run_async_get():
    """Demo run: fetch a mixed bag of sample URLs through aget() and dump them.

    NOTE: `ic` is only imported under the `__main__` guard, so this function
    relies on the module being run as a script.
    """
    dupe_url = "https://www.google.com"
    urls = [
        "https://api.betterttv.net/3/cached/emotes/global",  # JSON
        "https://cdn.betterttv.net/emote/56e9f494fff3cc5c35e5287e/3x",  # image
        "https://example.com/",  # HTML
        "https://www.w3.org/TR/PNG/iso_8859-1.txt",  # plain text
        "https://cdn.betterttv.net/emote/404/3x",  # meant to be a not-found URL
        # The same URL twice, to exercise duplicate handling.
        dupe_url,
        dupe_url,
    ]
    objects = aget(urls, cache=True, log=True)
    ic(objects)
    return objects
if __name__ == "__main__":
    # Script entry point. `ic` is imported here and used by run_async_get().
    from box.error import handler
    from box.ic import ic

    # Run the demo inside box.error's handler() context manager
    # (presumably for error reporting; confirm against box.error).
    with handler():
        run_async_get()