# Source provenance: uploaded via huggingface_hub (commit 825942f, verified).
# Asynchronous Getting
# 0. Get JSON / READ / TEXT / RESPONSE (sync)
# 1. Toggleable Caching - render_chat.py:get_images_cached(urls)
# 2. Configurable Connection Limit - render_chat.py:get_json_batch(urls, limit=100)
# 3. Toggleable AsyncInfo - Log status url reason counts
# 4. Handle status codes 404 and non-200
# 5. Configurable url2header Function
# 6. POST with payload
from box.timer import Timer # Single Use
timer = Timer()
from box.db import db_init
from box.color import Code
# import aiohttp
# import asyncio
import json
# aiohttp/asyncio are bound lazily by configure_asyncio() so importing this
# module stays cheap when aget() is never called.
aiohttp = None
asyncio = None
# sqlite connection, opened by aget() when caching is enabled.
connection = None
class Config:
    """Module-wide settings; aget() overwrites these before each batch run."""
    cache = True    # persist successful responses in the sqlite cache
    log = True      # print a live status-code summary line
    limit = 100     # max simultaneous TCP connections
    # Per-url hooks returning extra request headers / cookies.
    url2header = staticmethod(lambda url: {})
    url2cookie = staticmethod(lambda url: {})
class Response:
    """Event-loop-free snapshot of an aiohttp response.

    Copies url/status/reason/headers plus the already-awaited body
    (*obj*) so results remain usable after the session is closed.
    See: https://docs.aiohttp.org/en/stable/client_reference.html#response-object
    """

    def __init__(self, response, obj):
        self.url = str(response.url)
        self.status = response.status
        self.ok = response.ok
        self.reason = response.reason
        # Materialize into a plain dict (duplicate header keys keep the last value).
        self.headers = dict(response.headers.items())
        self.obj = obj
        self.total_bytes = response.content.total_bytes

    def __repr__(self):
        return f"Response <{self.status}>"
class AsyncInfo:
    """Running tally of batch results, guarded by an asyncio lock."""

    def __init__(self, lock):
        self.lock = lock          # asyncio.Lock serializing add()+print()
        self.results = []         # [{url, status, reason}, ...] in completion order
        self.total = 0
        self.count_pass = 0       # responses with status == 200
        self.count_fail = 0       # everything else
        self.code_counter = {}    # status code -> occurrence count

    def add(self, response):
        """Fold one finished response into the running totals."""
        status = response.status
        self.results.append({
            'url': response.url,
            'status': status,
            'reason': response.reason,
        })
        self.total += 1
        if status == 200:
            self.count_pass += 1
        else:
            self.count_fail += 1
        self.code_counter[status] = self.code_counter.get(status, 0) + 1

    def print(self):
        """Redraw a one-line summary in place (end="\r", no newline)."""
        parts = []
        for status in sorted(self.code_counter):
            # Green for non-error codes, red for 4xx/5xx.
            tint = Code.GREEN if status < 400 else Code.RED
            parts.append(tint + f"{status}: {self.code_counter[status]}")
        summary = "{ " + ", ".join(parts) + " }"
        print(f"Total: {self.total} Pass: {self.count_pass} Fail: {self.count_fail} Codes: {summary}", end="\r")
def configure_asyncio():
    """Bind asyncio/aiohttp into module globals and reset per-run state.

    Imports are deferred to here so that importing this module stays
    cheap when aget() is never used.  When logging is on, a fresh
    AsyncInfo tally (with its lock) is created for the new run.
    """
    global asyncio, aiohttp
    import asyncio
    import aiohttp
    if Config.log:
        global async_info
        async_info = AsyncInfo(asyncio.Lock())
    import platform
    # NOTE(review): presumably switches off the proactor loop because it
    # misbehaves with aiohttp on Windows — confirm against upstream notes.
    if platform.system() == 'Windows':
        asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
def is_binary(content_type):
    """Return True when *content_type* denotes a non-text payload.

    Text and the text-like application subtypes (json, xml, shell
    scripts, ...) are non-binary; everything else — images, streams,
    and unknown or malformed content types — is treated as binary.
    """
    # partition() tolerates a missing or repeated "/" where the old
    # split()-and-unpack raised ValueError (e.g. an empty content type).
    mime_type, _, subtype = content_type.partition("/")
    if mime_type == "text":
        return False
    if mime_type != "application":
        return True
    return subtype not in ("json", "ld+json", "x-httpd-php", "x-sh", "x-csh", "xhtml+xml", "xml")
# Body decoding dispatch: json -> .json(), binary -> .read(), else -> .text()
async def async_response_to_sync_response(response):
    """Await the body of an aiohttp response and freeze it into a Response.

    Decoding rule: JSON content types are parsed into objects, other
    binary types are read as raw bytes, everything else becomes text.
    # https://stackoverflow.com/questions/57565577/how-to-return-aiohttp-like-python-requests
    # https://docs.aiohttp.org/en/stable/client_reference.html#response-object
    """
    content_type = response.content_type
    if content_type == 'application/json':
        body = await response.json()
    elif is_binary(content_type):
        body = await response.read()
    else:
        body = await response.text()
    return Response(response, body)
async def log_response(response):
    """Fold one response into the shared tally and refresh the status line."""
    if not Config.log:
        return
    # Lock keeps add() and the redraw atomic across concurrent fetches.
    async with async_info.lock:
        async_info.add(response)
        async_info.print()
async def get_async(url, session):
    """GET a single *url* and return it as a detached sync Response."""
    response = await session.get(
        url,
        headers=Config.url2header(url),
        cookies=Config.url2cookie(url),
    )
    # The context manager releases the connection back to the pool.
    async with response:
        sync_response = await async_response_to_sync_response(response)
        await log_response(response)
    return sync_response
async def batch_get_async(urls):
    """Fetch all *urls* concurrently through one pooled ClientSession."""
    connector = aiohttp.TCPConnector(limit=Config.limit)
    async with aiohttp.ClientSession(connector=connector, trust_env=True) as session:
        tasks = [get_async(url, session) for url in urls]
        responses = await asyncio.gather(*tasks)
    # The status line is drawn with end="\r"; finish it with a newline.
    if Config.log and len(urls) > 0:
        print()
    return responses
def get_nocache(urls):
    """Fetch every url over the network; returns a list of Response objects."""
    configure_asyncio()
    return asyncio.run(batch_get_async(urls))
# -- START: CACHE --
def to_temp_table(cursor, items, table_string):
    """Create a single-column TEMP table and fill it with *items*.

    *table_string* is the DDL tail, e.g. "urls (url TEXT UNIQUE)"; the
    word before the parentheses is taken as the table name.
    """
    table_name, _, _ = table_string.partition(" ")
    cursor.execute(f"CREATE TEMP TABLE {table_string};")
    rows = [(item,) for item in items]
    cursor.executemany(f"INSERT INTO {table_name} VALUES (?);", rows)
def get_cached_urls(urls):
    """Return the subset of *urls* already present in the cache table."""
    cursor = connection.cursor()
    to_temp_table(cursor, urls, "urls (url TEXT UNIQUE)")
    query = "SELECT url FROM urls WHERE url IN (SELECT url FROM cache);"
    cached = [url for (url,) in cursor.execute(query).fetchall()]
    cursor.execute("DROP TABLE urls;")
    return cached
def get_uncached_urls(urls):
    """Return the subset of *urls* not yet present in the cache table."""
    cursor = connection.cursor()
    to_temp_table(cursor, urls, "urls (url TEXT UNIQUE)")
    query = "SELECT url FROM urls WHERE url NOT IN (SELECT url FROM cache);"
    missing = [url for (url,) in cursor.execute(query).fetchall()]
    cursor.execute("DROP TABLE urls;")
    return missing
def object_to_type(obj):
    """Classify *obj* for cache storage: 'text', 'binary', or 'json'.

    Anything that is not str/bytes is assumed to be JSON-serializable
    and will be stored via json.dumps() by cache().
    """
    # isinstance (rather than `type(x) is T`) also accepts subclasses.
    if isinstance(obj, str):
        return 'text'
    if isinstance(obj, bytes):
        return 'binary'
    return 'json'
def cache(urls, objects):
    """Insert aligned url→object pairs into the cache table.

    JSON-typed objects are serialized with json.dumps; text/binary are
    stored verbatim.  Caller supplies equal-length, aligned sequences of
    urls not yet present in the table (url column is UNIQUE).
    """
    rows = []
    for url, obj in zip(urls, objects):
        obj_type = object_to_type(obj)
        stored = json.dumps(obj) if obj_type == 'json' else obj
        rows.append((url, obj_type, stored))
    cursor = connection.cursor()
    cursor.executemany("INSERT INTO cache (url, type, response) VALUES (?, ?, ?);", rows)
    connection.commit()
def cache_get(urls):
    """Load cached objects for *urls*, preserving the input order.

    JSON-typed rows are deserialized; text/binary come back verbatim.
    Every url must already be cached (KeyError otherwise).
    """
    cursor = connection.cursor()
    to_temp_table(cursor, urls, "urls (url TEXT UNIQUE)")
    rows = cursor.execute("SELECT url, type, response FROM cache WHERE url IN (SELECT url FROM urls);")
    url_to_row = {url: (obj_type, response) for url, obj_type, response in rows}
    objects = []
    for url in urls:
        obj_type, obj = url_to_row[url]
        if obj_type == 'json':
            obj = json.loads(obj)
        objects.append(obj)
    cursor.execute("DROP TABLE urls;")
    return objects
def get_yescache(urls):
    """Resolve *urls* through the sqlite cache, fetching misses.

    Returns one entry per input url (duplicates included): the cached or
    freshly-fetched object for successes, or the failed Response object
    for non-OK fetches.  Failures are NOT cached, so they are retried on
    the next call.
    """
    cursor = connection.cursor()
    if DEBUG:
        cursor.execute("DROP TABLE IF EXISTS cache;")
    cursor.execute("CREATE TABLE IF NOT EXISTS cache (url TEXT UNIQUE, type TEXT, response);")
    unique_urls = list(set(urls))
    uncached_urls = get_uncached_urls(unique_urls)
    responses = get_nocache(uncached_urls) if len(uncached_urls) > 0 else []
    # BUG FIX: only cache (url, object) pairs whose fetch succeeded,
    # keeping each url aligned with its own payload.  The old code zipped
    # ALL uncached urls against only the passing objects, which paired
    # urls with the wrong payloads whenever any fetch failed.
    pass_pairs = [(url, response.obj)
                  for url, response in zip(uncached_urls, responses)
                  if response.ok]
    cache([url for url, _ in pass_pairs], [obj for _, obj in pass_pairs])
    cached_urls = get_cached_urls(unique_urls)
    cached_objects = cache_get(cached_urls)
    url_to_object = {url: obj for url, obj in zip(cached_urls, cached_objects)}
    # Fresh results override; failed fetches map to their Response object.
    url_to_object |= {url: (response.obj if response.ok else response)
                      for url, response in zip(uncached_urls, responses)}
    return [url_to_object[url] for url in urls]
# -- END: CACHE --
def aget(urls, cache=True, log=True, limit=100, url2header=lambda url: {}, url2cookie=lambda url: {}):
    """Batch-GET *urls* asynchronously, optionally through a sqlite cache.

    Returns a list aligned with *urls*: the decoded body (json / text /
    bytes) per url, or — in cache mode — the failed Response object for
    non-OK fetches.

    cache      -- persist successful responses in aget_cache/cache.db
    log        -- print a live status-code summary line
    limit      -- max simultaneous TCP connections
    url2header -- url -> extra request headers
    url2cookie -- url -> extra request cookies
    """
    # -- Configuration --
    global DEBUG
    DEBUG = False
    Config.cache = cache
    Config.log = log
    Config.url2header = url2header
    Config.url2cookie = url2cookie
    Config.limit = limit
    if Config.cache:
        global connection
        connection = db_init("aget_cache/cache.db")
    # -- End configuration --
    if DEBUG:
        print(Code.RED + "Debug: True")
    if not cache:
        return [response.obj for response in get_nocache(urls)]
    return get_yescache(urls)
def run_async_get():
    """Smoke-test aget() with a mix of json/image/html/text/404/dupe urls."""
    urls = [
        "https://api.betterttv.net/3/cached/emotes/global",             # json
        "https://cdn.betterttv.net/emote/56e9f494fff3cc5c35e5287e/3x",  # image
        "https://example.com/",                                         # html
        "https://www.w3.org/TR/PNG/iso_8859-1.txt",                     # text
        "https://cdn.betterttv.net/emote/404/3x",                       # not found
        "https://www.google.com",                                       # dupe
        "https://www.google.com",                                       # dupe
    ]
    objects = aget(urls, cache=True, log=True)
    # NOTE: ic is bound at module scope only when run as a script (__main__).
    ic(objects)
    return objects
if __name__ == "__main__":
from box.ic import ic
from box.error import handler
with handler():
run_async_get()