"""
tasks.py — Celery async tasks for video segmentation.
"""
import os
import json
import logging
from celery import Celery
from inference import process_video, VOC_CLASSES
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Redis connection URL; overridable through the REDIS_URL environment
# variable, otherwise a local Redis instance (database 0) is used.
REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0")

# The same Redis URL is used both as the message broker and as the
# result backend.
celery_app = Celery("video_seg", broker=REDIS_URL, backend=REDIS_URL)

celery_app.conf.update(
    {
        # JSON in both directions: task messages and stored results.
        "task_serializer": "json",
        "accept_content": ["json"],
        "result_serializer": "json",
        "timezone": "UTC",
        "enable_utc": True,
        # Report a STARTED state once a worker picks the task up.
        "task_track_started": True,
        "result_expires": 3600,  # results expire in 1 hour
    }
)
@celery_app.task(bind=True, name="tasks.segment_video")
def segment_video_task(self, job_id: str, input_path: str, output_path: str) -> dict:
    """Run video segmentation as a Celery task and publish its progress.

    Progress is written to Celery's result backend through
    ``self.update_state`` using the custom ``PROGRESS`` state, so a client
    that knows the task id (e.g. the FastAPI layer) can poll it.

    Args:
        job_id: Caller-supplied job identifier. The task logic does not
            depend on it; it is only included in the failure log message.
        input_path: Forwarded to ``process_video`` as ``input_path``.
        output_path: Forwarded to ``process_video`` as ``output_path`` and
            echoed back in the result.

    Returns:
        A JSON-serializable dict with the keys ``status`` (``"done"``),
        ``pct`` (``100.0``), ``detected`` (class names) and ``output_path``.

    Raises:
        Exception: Any error raised while processing is logged and
            re-raised unchanged, so that Celery itself records the
            ``FAILURE`` state together with the exception and traceback.
    """

    def on_progress(pct, detected_names):
        # Passed to process_video as its progress callback; forwards each
        # report to the result backend.
        self.update_state(
            state="PROGRESS",
            meta={
                "pct": pct,
                "detected": detected_names,
                "status": "processing",
            },
        )

    try:
        self.update_state(
            state="PROGRESS",
            meta={"pct": 0.0, "detected": [], "status": "starting"},
        )

        detected = process_video(
            input_path=input_path,
            output_path=output_path,
            progress_callback=on_progress,
        )

        # `detected` is used as a collection of integer indices into
        # VOC_CLASSES. Out-of-range ids are dropped; the lower bound keeps a
        # negative id from silently indexing from the end of the list.
        detected_names = [
            VOC_CLASSES[c]
            for c in sorted(detected)
            if 0 <= c < len(VOC_CLASSES)
        ]

        return {
            "status": "done",
            "pct": 100.0,
            "detected": detected_names,
            "output_path": output_path,
        }
    except Exception:
        # Do not store a hand-made FAILURE state here: the result backend
        # expects exception-shaped meta for failure states, and Celery
        # records the failure on its own once the exception propagates.
        logger.exception("Video segmentation failed for job %s", job_id)
        raise
|