import os import sys from pathlib import Path from celery_app import app # Ajout du parent au path pour importer les modules de l'Engine ENGINE_DIR = Path(__file__).resolve().parent.parent sys.path.append(str(ENGINE_DIR)) @app.task(name="tasks.process_video_pipeline", bind=True) def process_video_pipeline(self, story_name): """ Tâche Celery qui exécute le pipeline complet pour une histoire. """ from asset_generator import main as run_assets from story_to_video import main as run_video import argparse self.update_state(state="PROGRESS", meta={"status": f"Génération des assets pour {story_name}..."}) # Simuler les arguments CLI sys.argv = ["asset_generator.py", "--story", story_name] run_assets() self.update_state(state="PROGRESS", meta={"status": f"Génération de la vidéo pour {story_name}..."}) # Chemin vers le fichier markdown de la story # On assume que story_name est le slug du dossier story_file = None stories_root = ENGINE_DIR.parent / "stories" for p in stories_root.glob(f"**/{story_name}.md"): story_file = p break if not story_file: return {"status": "error", "message": f"Histoire {story_name} non trouvée."} sys.argv = ["story_to_video.py", str(story_file)] run_video() return {"status": "success", "story": story_name}