import asyncio import threading __all__ = ["get_async_loop", "run"] # Create a background event loop thread class AsyncLoopThread: def __init__(self): self.loop = asyncio.new_event_loop() self._thread = threading.Thread(target=self._start_loop, daemon=True) self._thread.start() def _start_loop(self): asyncio.set_event_loop(self.loop) self.loop.run_forever() def run(self, coro): # Schedule a coroutine onto the loop and block until it's done return asyncio.run_coroutine_threadsafe(coro, self.loop).result() # Create one global instance async_loop = None def get_async_loop(): global async_loop if async_loop is None: async_loop = AsyncLoopThread() return async_loop def run(coro): """Run a coroutine in the background event loop.""" return get_async_loop().run(coro)