Spaces:
Running
Running
| from __future__ import annotations | |
| from typing import TYPE_CHECKING, Any | |
| from asgiref.sync import async_to_sync | |
| from celery.exceptions import SoftTimeLimitExceeded | |
| from langflow.core.celery_app import celery_app | |
| if TYPE_CHECKING: | |
| from langflow.graph.vertex.base import Vertex | |
| def test_celery(word: str) -> str: | |
| return f"test task return {word}" | |
| def build_vertex(self, vertex: Vertex) -> Vertex: | |
| """Build a vertex.""" | |
| try: | |
| vertex.task_id = self.request.id | |
| async_to_sync(vertex.build)() | |
| except SoftTimeLimitExceeded as e: | |
| raise self.retry(exc=SoftTimeLimitExceeded("Task took too long"), countdown=2) from e | |
| return vertex | |
| def process_graph_cached_task() -> dict[str, Any]: | |
| msg = "This task is not implemented yet" | |
| raise NotImplementedError(msg) | |