File size: 1,941 Bytes
dbced4f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
"""
tasks.py — Celery async tasks for video segmentation.
"""

import os
import json
import logging
from celery import Celery
from inference import process_video, VOC_CLASSES

logger = logging.getLogger(__name__)

# One Redis URL is used for both the broker and the result backend. Set the
# REDIS_URL environment variable to override the local default.
REDIS_URL = os.environ.get("REDIS_URL", "redis://localhost:6379/0")

celery_app = Celery("video_seg", broker=REDIS_URL, backend=REDIS_URL)

celery_app.conf.update(
    # JSON is the only format produced or accepted.
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],
    # Keep all timestamps in UTC.
    enable_utc=True,
    timezone="UTC",
    # Track the "started" state of tasks in addition to the final outcome.
    task_track_started=True,
    # Stored results are dropped after 3600 seconds (one hour).
    result_expires=3600,
)


@celery_app.task(bind=True, name="tasks.segment_video")
def segment_video_task(self, job_id: str, input_path: str, output_path: str):
    """
    Celery task: run video segmentation and publish progress via the backend.

    While the job runs, the task stores a custom ``PROGRESS`` state in
    Celery's result backend so that the API layer (FastAPI) can poll it. The
    state's meta dict carries ``pct``, ``detected`` and ``status``.

    Args:
        job_id: Caller-supplied job identifier; used here only in log output.
        input_path: Forwarded to ``process_video`` as ``input_path``.
        output_path: Forwarded to ``process_video`` as ``output_path`` and
            echoed back in the result.

    Returns:
        A dict with ``status`` ("done"), ``pct`` (100.0), ``detected`` (class
        names looked up in ``VOC_CLASSES``) and ``output_path``.

    Raises:
        Exception: Any error raised during processing is logged and re-raised
            unchanged so that Celery records the task as failed.
    """

    def on_progress(pct, detected_names):
        # Invoked by process_video; forwards its arguments verbatim as the
        # PROGRESS meta payload.
        self.update_state(
            state="PROGRESS",
            meta={
                "pct": pct,
                "detected": detected_names,
                "status": "processing",
            },
        )

    try:
        self.update_state(
            state="PROGRESS",
            meta={"pct": 0.0, "detected": [], "status": "starting"},
        )

        detected = process_video(
            input_path=input_path,
            output_path=output_path,
            progress_callback=on_progress,
        )

        # Keep only ids that are valid positions in VOC_CLASSES. The lower
        # bound stops a negative id from silently indexing from the end.
        detected_names = [
            VOC_CLASSES[c]
            for c in sorted(detected)
            if 0 <= c < len(VOC_CLASSES)
        ]
    except Exception:
        # Do not write state="FAILURE" by hand here. Re-raising lets Celery
        # store the failure in the exception format its backend expects; a
        # hand-made meta dict lacks that format and would be overwritten by
        # Celery's own failure record anyway.
        logger.exception(
            "Video segmentation failed (job_id=%s, input_path=%s)",
            job_id,
            input_path,
        )
        raise

    return {
        "status": "done",
        "pct": 100.0,
        "detected": detected_names,
        "output_path": output_path,
    }