secureagentrag-api / utils /async_helpers.py
LeomordKaly's picture
deploy: phase 3 BYOK backend (Dockerfile.hf, FastAPI on 7860)
09ed8ca verified
raw
history blame contribute delete
811 Bytes
"""Async helper utilities for bridging sync and async contexts."""
from __future__ import annotations
import asyncio
from typing import Any
def run_async(coro) -> Any:
"""Run an async coroutine from a synchronous context.
Handles the case where an event loop may already be running
(e.g., inside Jupyter, Streamlit, or certain async frameworks).
Uses nest_asyncio when a loop is already running.
Args:
coro: Awaitable coroutine to execute.
Returns:
The result of the coroutine.
"""
try:
loop = asyncio.get_running_loop()
except RuntimeError:
loop = None
if loop and loop.is_running():
import nest_asyncio
nest_asyncio.apply()
return loop.run_until_complete(coro)
else:
return asyncio.run(coro)