manim-mcp / neuroanim /graph /workflow.py
bhaveshgoel07's picture
Complete NeuroAnim HF Spaces deployment - all source files
0805c5b
raw
history blame
8.9 kB
"""
LangGraph Workflow Definition for NeuroAnim Pipeline
This module defines the complete animation generation workflow using LangGraph.
The workflow coordinates multiple agent nodes to transform a STEM topic into
an educational animation with narration.
"""
import logging
import tempfile
from pathlib import Path
from typing import Any, Dict
from langgraph.graph import END, StateGraph
from neuroanim.agents.nodes import AnimationNodes
from neuroanim.graph.state import AnimationState, create_initial_state
logger = logging.getLogger(__name__)
def should_retry_code_generation(state: AnimationState) -> str:
"""
Determine if code generation should be retried.
Args:
state: Current animation state
Returns:
Next node name: "generate_code" for retry, "write_file" to proceed
"""
if (
state.get("previous_code_errors")
and state["code_generation_attempts"] < state["max_retries"]
):
logger.info(
f"Code has errors, retrying (attempt {state['code_generation_attempts']}/{state['max_retries']})"
)
return "generate_code"
return "write_file"
def should_continue_after_error(state: AnimationState) -> str:
"""
Determine if pipeline should continue after errors.
Args:
state: Current animation state
Returns:
Next node name or END
"""
if state["errors"]:
logger.error(f"Pipeline encountered {len(state['errors'])} error(s), stopping")
return "finalize"
return "next"
def create_animation_workflow(nodes: AnimationNodes) -> StateGraph:
"""
Create the LangGraph workflow for animation generation.
The workflow follows this sequence:
1. Initialize - Set up directories and state
2. Plan Concept - Generate animation concept plan
3. Generate Narration - Create narration script
4. Generate Code - Create Manim code (with retry logic)
5. Write File - Save code to file
6. Render Animation - Execute Manim rendering
7. Generate Audio - Create speech audio
8. Merge Video/Audio - Combine into final output
9. Generate Quiz - Create assessment questions
10. Finalize - Compute metadata and complete
Args:
nodes: AnimationNodes instance with all node functions
Returns:
Compiled StateGraph ready for execution
"""
# Create the graph
workflow = StateGraph(AnimationState)
# Add all nodes
workflow.add_node("initialize", nodes.initialize_node)
workflow.add_node("plan_concept", nodes.plan_concept_node)
workflow.add_node("generate_narration", nodes.generate_narration_node)
workflow.add_node("generate_code", nodes.generate_code_node)
workflow.add_node("write_file", nodes.write_file_node)
workflow.add_node("render_animation", nodes.render_animation_node)
workflow.add_node("generate_audio", nodes.generate_audio_node)
workflow.add_node("merge_video_audio", nodes.merge_video_audio_node)
workflow.add_node("generate_quiz", nodes.generate_quiz_node)
workflow.add_node("finalize", nodes.finalize_node)
# Set entry point
workflow.set_entry_point("initialize")
# Define the workflow edges (sequential flow with error checking)
# Initialize -> Plan Concept
workflow.add_edge("initialize", "plan_concept")
# Plan Concept -> Check for errors -> Generate Narration
workflow.add_conditional_edges(
"plan_concept",
lambda state: "generate_narration" if not state["errors"] else "finalize",
)
# Generate Narration -> Check for errors -> Generate Code
workflow.add_conditional_edges(
"generate_narration",
lambda state: "generate_code" if not state["errors"] else "finalize",
)
# Generate Code -> Check syntax -> Retry or Write File
workflow.add_conditional_edges(
"generate_code",
should_retry_code_generation,
)
# Write File -> Check for errors -> Render
workflow.add_conditional_edges(
"write_file",
lambda state: "render_animation" if not state["errors"] else "finalize",
)
# Render -> Check for errors -> Generate Audio
workflow.add_conditional_edges(
"render_animation",
lambda state: "generate_audio" if not state["errors"] else "finalize",
)
# Generate Audio -> Check for errors -> Merge
workflow.add_conditional_edges(
"generate_audio",
lambda state: "merge_video_audio" if not state["errors"] else "finalize",
)
# Merge -> Check for errors -> Generate Quiz
workflow.add_conditional_edges(
"merge_video_audio",
lambda state: "generate_quiz" if not state["errors"] else "finalize",
)
# Generate Quiz -> Finalize (quiz errors are non-critical)
workflow.add_edge("generate_quiz", "finalize")
# Finalize -> END
workflow.add_edge("finalize", END)
# Compile the graph
return workflow.compile()
async def run_animation_pipeline(
mcp_session: Any,
tts_generator: Any,
topic: str,
target_audience: str = "general",
animation_length_minutes: float = 2.0,
output_filename: str = "animation.mp4",
rendering_quality: str = "medium",
max_retries: int = 3,
) -> Dict[str, Any]:
"""
Run the complete animation generation pipeline.
This is the main entry point for generating animations. It creates
the workflow, initializes the state, and executes all steps.
Args:
mcp_session: MCP client session
tts_generator: TTS generator instance
topic: STEM topic to animate
target_audience: Target audience level
animation_length_minutes: Desired animation length
output_filename: Name for output file
rendering_quality: Manim rendering quality
max_retries: Maximum retry attempts
Returns:
Dictionary with pipeline results including:
- success: Whether pipeline completed successfully
- final_output_path: Path to final video
- errors: List of errors encountered
- warnings: List of warnings
- completed_steps: List of completed steps
- metadata: Timing and other metadata
"""
# Create working directories
work_dir = Path(tempfile.mkdtemp(prefix="neuroanim_work_"))
output_dir = Path("outputs")
output_dir.mkdir(exist_ok=True)
logger.info(f"πŸ“ Working directory: {work_dir}")
logger.info(f"πŸ“ Output directory: {output_dir}")
# Initialize nodes
nodes = AnimationNodes(
mcp_session=mcp_session,
tts_generator=tts_generator,
work_dir=work_dir,
output_dir=output_dir,
)
# Create workflow
workflow = create_animation_workflow(nodes)
# Create initial state
initial_state = create_initial_state(
topic=topic,
target_audience=target_audience,
animation_length_minutes=animation_length_minutes,
output_filename=output_filename,
rendering_quality=rendering_quality,
max_retries=max_retries,
)
logger.info(f"🎬 Starting animation pipeline for topic: '{topic}'")
try:
# Run the workflow
final_state = await workflow.ainvoke(initial_state)
# Build result summary
result = {
"success": final_state.get("success", False),
"topic": final_state["topic"],
"target_audience": final_state["target_audience"],
"final_output_path": final_state.get("final_output_path"),
"concept_plan": final_state.get("concept_plan"),
"narration": final_state.get("narration_text"),
"manim_code": final_state.get("manim_code"),
"quiz": final_state.get("quiz_content"),
"errors": final_state.get("errors", []),
"warnings": final_state.get("warnings", []),
"completed_steps": final_state.get("completed_steps", []),
"total_duration": final_state.get("total_duration"),
"work_dir": str(work_dir),
"output_dir": str(output_dir),
}
if result["success"]:
logger.info(f"βœ… Animation pipeline completed successfully!")
logger.info(f"πŸ“Ή Output file: {result['final_output_path']}")
logger.info(f"⏱️ Total time: {result['total_duration']:.2f}s")
else:
logger.error(f"❌ Animation pipeline failed")
logger.error(f"Errors: {result['errors']}")
return result
except Exception as e:
logger.error(f"Pipeline execution failed: {e}", exc_info=True)
return {
"success": False,
"error": str(e),
"work_dir": str(work_dir),
"output_dir": str(output_dir),
}
finally:
# Note: We don't clean up work_dir here so users can inspect artifacts
logger.info(f"Work directory preserved at: {work_dir}")