|
|
from __future__ import annotations |
|
|
|
|
|
from typing import TYPE_CHECKING, Any |
|
|
|
|
|
from asgiref.sync import async_to_sync |
|
|
from celery.exceptions import SoftTimeLimitExceeded |
|
|
|
|
|
from langflow.core.celery_app import celery_app |
|
|
|
|
|
if TYPE_CHECKING: |
|
|
from langflow.graph.vertex.base import Vertex |
|
|
|
|
|
|
|
|
@celery_app.task(acks_late=True) |
|
|
def test_celery(word: str) -> str: |
|
|
return f"test task return {word}" |
|
|
|
|
|
|
|
|
@celery_app.task(bind=True, soft_time_limit=30, max_retries=3) |
|
|
def build_vertex(self, vertex: Vertex) -> Vertex: |
|
|
"""Build a vertex.""" |
|
|
try: |
|
|
vertex.task_id = self.request.id |
|
|
async_to_sync(vertex.build)() |
|
|
except SoftTimeLimitExceeded as e: |
|
|
raise self.retry(exc=SoftTimeLimitExceeded("Task took too long"), countdown=2) from e |
|
|
return vertex |
|
|
|
|
|
|
|
|
@celery_app.task(acks_late=True) |
|
|
def process_graph_cached_task() -> dict[str, Any]: |
|
|
msg = "This task is not implemented yet" |
|
|
raise NotImplementedError(msg) |
|
|
|