|
|
from __future__ import annotations |
|
|
|
|
|
from collections.abc import Callable, Coroutine |
|
|
from typing import TYPE_CHECKING, Any |
|
|
|
|
|
from loguru import logger |
|
|
|
|
|
from langflow.services.base import Service |
|
|
from langflow.services.task.backends.anyio import AnyIOBackend |
|
|
from langflow.services.task.utils import get_celery_worker_status |
|
|
|
|
|
if TYPE_CHECKING: |
|
|
from langflow.services.settings.service import SettingsService |
|
|
from langflow.services.task.backends.base import TaskBackend |
|
|
|
|
|
|
|
|
def check_celery_availability(): |
|
|
try: |
|
|
from langflow.worker import celery_app |
|
|
|
|
|
status = get_celery_worker_status(celery_app) |
|
|
logger.debug(f"Celery status: {status}") |
|
|
except Exception: |
|
|
logger.opt(exception=True).debug("Celery not available") |
|
|
status = {"availability": None} |
|
|
return status |
|
|
|
|
|
|
|
|
class TaskService(Service): |
|
|
name = "task_service" |
|
|
|
|
|
def __init__(self, settings_service: SettingsService): |
|
|
self.settings_service = settings_service |
|
|
try: |
|
|
if self.settings_service.settings.celery_enabled: |
|
|
status = check_celery_availability() |
|
|
|
|
|
use_celery = status.get("availability") is not None |
|
|
else: |
|
|
use_celery = False |
|
|
except ImportError: |
|
|
use_celery = False |
|
|
|
|
|
self.use_celery = use_celery |
|
|
self.backend = self.get_backend() |
|
|
|
|
|
@property |
|
|
def backend_name(self) -> str: |
|
|
return self.backend.name |
|
|
|
|
|
def get_backend(self) -> TaskBackend: |
|
|
if self.use_celery: |
|
|
from langflow.services.task.backends.celery import CeleryBackend |
|
|
|
|
|
logger.debug("Using Celery backend") |
|
|
return CeleryBackend() |
|
|
logger.debug("Using AnyIO backend") |
|
|
return AnyIOBackend() |
|
|
|
|
|
|
|
|
async def launch_and_await_task( |
|
|
self, |
|
|
task_func: Callable[..., Any], |
|
|
*args: Any, |
|
|
**kwargs: Any, |
|
|
) -> Any: |
|
|
if not self.use_celery: |
|
|
return None, await task_func(*args, **kwargs) |
|
|
if not hasattr(task_func, "apply"): |
|
|
msg = f"Task function {task_func} does not have an apply method" |
|
|
raise ValueError(msg) |
|
|
task = task_func.apply(args=args, kwargs=kwargs) |
|
|
|
|
|
result = task.get() |
|
|
|
|
|
if isinstance(result, Coroutine): |
|
|
result = await result |
|
|
return task.id, result |
|
|
|
|
|
async def launch_task(self, task_func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any: |
|
|
logger.debug(f"Launching task {task_func} with args {args} and kwargs {kwargs}") |
|
|
logger.debug(f"Using backend {self.backend}") |
|
|
task = self.backend.launch_task(task_func, *args, **kwargs) |
|
|
return await task if isinstance(task, Coroutine) else task |
|
|
|
|
|
def get_task(self, task_id: str) -> Any: |
|
|
return self.backend.get_task(task_id) |
|
|
|