Spaces:
Running
Running
| import traceback | |
| from collections.abc import Callable | |
| from typing import Any | |
| import anyio | |
| from loguru import logger | |
| from langflow.services.task.backends.base import TaskBackend | |
| class AnyIOTaskResult: | |
| def __init__(self, scope) -> None: | |
| self._scope = scope | |
| self._status = "PENDING" | |
| self._result = None | |
| self._exception: Exception | None = None | |
| def status(self) -> str: | |
| if self._status == "DONE": | |
| return "FAILURE" if self._exception is not None else "SUCCESS" | |
| return self._status | |
| def traceback(self) -> str: | |
| if self._traceback is not None: | |
| return "".join(traceback.format_tb(self._traceback)) | |
| return "" | |
| def result(self) -> Any: | |
| return self._result | |
| def ready(self) -> bool: | |
| return self._status == "DONE" | |
| async def run(self, func, *args, **kwargs) -> None: | |
| try: | |
| self._result = await func(*args, **kwargs) | |
| except Exception as e: # noqa: BLE001 | |
| self._exception = e | |
| self._traceback = e.__traceback__ | |
| finally: | |
| self._status = "DONE" | |
| class AnyIOBackend(TaskBackend): | |
| name = "anyio" | |
| def __init__(self) -> None: | |
| self.tasks: dict[str, AnyIOTaskResult] = {} | |
| async def launch_task( | |
| self, task_func: Callable[..., Any], *args: Any, **kwargs: Any | |
| ) -> tuple[str | None, AnyIOTaskResult | None]: | |
| """Launch a new task in an asynchronous manner. | |
| Parameters: | |
| task_func: The asynchronous function to run. | |
| *args: Positional arguments to pass to task_func. | |
| **kwargs: Keyword arguments to pass to task_func. | |
| Returns: | |
| A tuple containing a unique task ID and the task result object. | |
| """ | |
| async with anyio.create_task_group() as tg: | |
| try: | |
| task_result = AnyIOTaskResult(tg) | |
| tg.start_soon(task_result.run, task_func, *args, **kwargs) | |
| except Exception: # noqa: BLE001 | |
| logger.exception("An error occurred while launching the task") | |
| return None, None | |
| task_id = str(id(task_result)) | |
| self.tasks[task_id] = task_result | |
| logger.info(f"Task {task_id} started.") | |
| return task_id, task_result | |
| def get_task(self, task_id: str) -> Any: | |
| return self.tasks.get(task_id) | |