Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import logging | |
| from uuid import UUID, uuid4 | |
| from fastapi import APIRouter, Depends, Header, Request, status | |
| from fastapi.responses import JSONResponse | |
| from redis.exceptions import RedisError | |
| from shared.pipeline_log import celery_trace_headers, pipeline_event | |
| from shared.schemas.render_api import RenderEnqueueBody, RenderEnqueueResponse | |
| from worker.tasks import render_manim_scene | |
| from backend.api.access import project_readable_by_user | |
| from backend.api.deps import get_content_store, get_job_store, get_request_user_id | |
| from backend.core.config import settings | |
| from backend.core.correlation import get_request_id | |
| from backend.core.limiter import limiter | |
| from backend.db.base import ContentStore | |
| from backend.services.job_store import RedisRenderJobStore | |
| logger = logging.getLogger(__name__) | |
| router = APIRouter(tags=["render"]) | |
| def enqueue_render( | |
| project_id: UUID, | |
| body: RenderEnqueueBody, | |
| request: Request, # Needed for slowapi | |
| user_id: UUID = Depends(get_request_user_id), # noqa: B008 | |
| content: ContentStore = Depends(get_content_store), # noqa: B008 | |
| store: RedisRenderJobStore = Depends(get_job_store), # noqa: B008 | |
| x_idempotency_key: str | None = Header(None, alias="X-Idempotency-Key"), | |
| ) -> JSONResponse: | |
| """Create a render job in Redis and enqueue Celery (Manim runs in worker only).""" | |
| project_readable_by_user(content, project_id, user_id) | |
| # 1. Idempotency Check | |
| if x_idempotency_key: | |
| existing_job_id = store.get_idempotent_job_id(x_idempotency_key) | |
| if existing_job_id: | |
| logger.info("Idempotency hit for key: %s", x_idempotency_key) | |
| payload = RenderEnqueueResponse(job_id=existing_job_id).model_dump(mode="json") | |
| return JSONResponse(status_code=status.HTTP_200_OK, content=payload) | |
| job_id = uuid4() | |
| trace_id = get_request_id() | |
| webhook = str(body.webhook_url) if body.webhook_url is not None else None | |
| try: | |
| store.create_queued_job( | |
| job_id=job_id, | |
| project_id=project_id, | |
| scene_id=body.scene_id, | |
| job_type=body.render_type, | |
| render_quality=body.quality, | |
| webhook_url=webhook, | |
| docker_image_tag=settings.worker_image_tag, | |
| ) | |
| if x_idempotency_key: | |
| store.set_idempotent_job_id(x_idempotency_key, job_id) | |
| pipeline_event( | |
| "api.render", | |
| "job_queued", | |
| "Render job stored in Redis", | |
| trace_id=trace_id, | |
| job_id=str(job_id), | |
| project_id=str(project_id), | |
| scene_id=str(body.scene_id), | |
| details={"render_type": body.render_type, "quality": body.quality}, | |
| ) | |
| except RedisError: | |
| logger.exception("Redis failure while creating render job") | |
| return JSONResponse( | |
| status_code=status.HTTP_503_SERVICE_UNAVAILABLE, | |
| content={"error": {"code": "redis_unavailable", "message": "Job store unavailable"}}, | |
| ) | |
| try: | |
| render_manim_scene.apply_async( | |
| args=[str(job_id)], | |
| headers=celery_trace_headers(trace_id), | |
| ) | |
| pipeline_event( | |
| "api.render", | |
| "celery_enqueued", | |
| "render_manim_scene dispatched", | |
| trace_id=trace_id, | |
| job_id=str(job_id), | |
| ) | |
| except Exception: # noqa: BLE001 — broker/kombu can raise varied connection errors | |
| logger.exception("Failed to enqueue Celery task (broker/redis)") | |
| return JSONResponse( | |
| status_code=status.HTTP_503_SERVICE_UNAVAILABLE, | |
| content={"error": {"code": "broker_unavailable", "message": "Queue unavailable"}}, | |
| ) | |
| payload = RenderEnqueueResponse(job_id=job_id).model_dump(mode="json") | |
| return JSONResponse(status_code=status.HTTP_202_ACCEPTED, content=payload) | |