File size: 3,981 Bytes
9bed109
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
from __future__ import annotations

import logging
from uuid import UUID, uuid4

from fastapi import APIRouter, Depends, Header, Request, status
from fastapi.responses import JSONResponse
from redis.exceptions import RedisError
from shared.pipeline_log import celery_trace_headers, pipeline_event
from shared.schemas.render_api import RenderEnqueueBody, RenderEnqueueResponse
from worker.tasks import render_manim_scene

from backend.api.access import project_readable_by_user
from backend.api.deps import get_content_store, get_job_store, get_request_user_id
from backend.core.config import settings
from backend.core.correlation import get_request_id
from backend.core.limiter import limiter
from backend.db.base import ContentStore
from backend.services.job_store import RedisRenderJobStore

logger = logging.getLogger(__name__)

router = APIRouter(tags=["render"])


@router.post("/{project_id}/render", summary="Enqueue Manim render job")
@limiter.limit("5/minute")
def enqueue_render(
    project_id: UUID,
    body: RenderEnqueueBody,
    request: Request,  # Needed for slowapi
    user_id: UUID = Depends(get_request_user_id),  # noqa: B008
    content: ContentStore = Depends(get_content_store),  # noqa: B008
    store: RedisRenderJobStore = Depends(get_job_store),  # noqa: B008
    x_idempotency_key: str | None = Header(None, alias="X-Idempotency-Key"),
) -> JSONResponse:
    """Create a render job in Redis and enqueue Celery (Manim runs in worker only)."""
    project_readable_by_user(content, project_id, user_id)

    # 1. Idempotency Check
    if x_idempotency_key:
        existing_job_id = store.get_idempotent_job_id(x_idempotency_key)
        if existing_job_id:
            logger.info("Idempotency hit for key: %s", x_idempotency_key)
            payload = RenderEnqueueResponse(job_id=existing_job_id).model_dump(mode="json")
            return JSONResponse(status_code=status.HTTP_200_OK, content=payload)

    job_id = uuid4()
    trace_id = get_request_id()
    webhook = str(body.webhook_url) if body.webhook_url is not None else None
    try:
        store.create_queued_job(
            job_id=job_id,
            project_id=project_id,
            scene_id=body.scene_id,
            job_type=body.render_type,
            render_quality=body.quality,
            webhook_url=webhook,
            docker_image_tag=settings.worker_image_tag,
        )
        if x_idempotency_key:
            store.set_idempotent_job_id(x_idempotency_key, job_id)

        pipeline_event(
            "api.render",
            "job_queued",
            "Render job stored in Redis",
            trace_id=trace_id,
            job_id=str(job_id),
            project_id=str(project_id),
            scene_id=str(body.scene_id),
            details={"render_type": body.render_type, "quality": body.quality},
        )
    except RedisError:
        logger.exception("Redis failure while creating render job")
        return JSONResponse(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            content={"error": {"code": "redis_unavailable", "message": "Job store unavailable"}},
        )

    try:
        render_manim_scene.apply_async(
            args=[str(job_id)],
            headers=celery_trace_headers(trace_id),
        )
        pipeline_event(
            "api.render",
            "celery_enqueued",
            "render_manim_scene dispatched",
            trace_id=trace_id,
            job_id=str(job_id),
        )
    except Exception:  # noqa: BLE001 — broker/kombu can raise varied connection errors
        logger.exception("Failed to enqueue Celery task (broker/redis)")
        return JSONResponse(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            content={"error": {"code": "broker_unavailable", "message": "Queue unavailable"}},
        )

    payload = RenderEnqueueResponse(job_id=job_id).model_dump(mode="json")
    return JSONResponse(status_code=status.HTTP_202_ACCEPTED, content=payload)