JustinTX's picture
Add files using upload-large-folder tool
d7b3a74 verified
import asyncio
import threading
__all__ = ["get_async_loop", "run"]
# Create a background event loop thread
class AsyncLoopThread:
def __init__(self):
self.loop = asyncio.new_event_loop()
self._thread = threading.Thread(target=self._start_loop, daemon=True)
self._thread.start()
def _start_loop(self):
asyncio.set_event_loop(self.loop)
self.loop.run_forever()
def run(self, coro):
# Schedule a coroutine onto the loop and block until it's done
return asyncio.run_coroutine_threadsafe(coro, self.loop).result()
# Create one global instance
async_loop = None
def get_async_loop():
global async_loop
if async_loop is None:
async_loop = AsyncLoopThread()
return async_loop
def run(coro):
"""Run a coroutine in the background event loop."""
return get_async_loop().run(coro)