Spaces:
Running
Running
| from typing import TYPE_CHECKING | |
| if TYPE_CHECKING: | |
| import contextlib | |
| with contextlib.suppress(ImportError): | |
| from celery import Celery | |
| def get_celery_worker_status(app: "Celery"): | |
| i = app.control.inspect() | |
| availability = app.control.ping() | |
| stats = i.stats() | |
| registered_tasks = i.registered() | |
| active_tasks = i.active() | |
| scheduled_tasks = i.scheduled() | |
| return { | |
| "availability": availability, | |
| "stats": stats, | |
| "registered_tasks": registered_tasks, | |
| "active_tasks": active_tasks, | |
| "scheduled_tasks": scheduled_tasks, | |
| } | |