Spaces:
Sleeping
Sleeping
| import os | |
| from fastapi import FastAPI, BackgroundTasks, HTTPException | |
| from pydantic import BaseModel | |
| from tasks import process_video_pipeline | |
| from celery.result import AsyncResult | |
| app = FastAPI(title="DarkMedia-X Local API") | |
| class VideoRequest(BaseModel): | |
| story_slug: str | |
| async def generate_video(request: VideoRequest): | |
| """Lance la génération d'une vidéo via Celery.""" | |
| task = process_video_pipeline.delay(request.story_slug) | |
| return {"task_id": task.id, "status": "queued"} | |
| async def get_status(task_id: str): | |
| """Vérifie l'état d'une tâche Celery.""" | |
| result = AsyncResult(task_id) | |
| return { | |
| "task_id": task_id, | |
| "status": result.status, | |
| "info": result.info if result.status == "PROGRESS" else str(result.result) | |
| } | |
| if __name__ == "__main__": | |
| import uvicorn | |
| uvicorn.run(app, host="0.0.0.0", port=8000) | |