Spaces:
Running
Running
File size: 8,897 Bytes
0805c5b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 |
"""
LangGraph Workflow Definition for NeuroAnim Pipeline
This module defines the complete animation generation workflow using LangGraph.
The workflow coordinates multiple agent nodes to transform a STEM topic into
an educational animation with narration.
"""
import logging
import tempfile
from pathlib import Path
from typing import Any, Dict
from langgraph.graph import END, StateGraph
from neuroanim.agents.nodes import AnimationNodes
from neuroanim.graph.state import AnimationState, create_initial_state
logger = logging.getLogger(__name__)
def should_retry_code_generation(state: AnimationState) -> str:
"""
Determine if code generation should be retried.
Args:
state: Current animation state
Returns:
Next node name: "generate_code" for retry, "write_file" to proceed
"""
if (
state.get("previous_code_errors")
and state["code_generation_attempts"] < state["max_retries"]
):
logger.info(
f"Code has errors, retrying (attempt {state['code_generation_attempts']}/{state['max_retries']})"
)
return "generate_code"
return "write_file"
def should_continue_after_error(state: AnimationState) -> str:
"""
Determine if pipeline should continue after errors.
Args:
state: Current animation state
Returns:
Next node name or END
"""
if state["errors"]:
logger.error(f"Pipeline encountered {len(state['errors'])} error(s), stopping")
return "finalize"
return "next"
def create_animation_workflow(nodes: AnimationNodes) -> StateGraph:
"""
Create the LangGraph workflow for animation generation.
The workflow follows this sequence:
1. Initialize - Set up directories and state
2. Plan Concept - Generate animation concept plan
3. Generate Narration - Create narration script
4. Generate Code - Create Manim code (with retry logic)
5. Write File - Save code to file
6. Render Animation - Execute Manim rendering
7. Generate Audio - Create speech audio
8. Merge Video/Audio - Combine into final output
9. Generate Quiz - Create assessment questions
10. Finalize - Compute metadata and complete
Args:
nodes: AnimationNodes instance with all node functions
Returns:
Compiled StateGraph ready for execution
"""
# Create the graph
workflow = StateGraph(AnimationState)
# Add all nodes
workflow.add_node("initialize", nodes.initialize_node)
workflow.add_node("plan_concept", nodes.plan_concept_node)
workflow.add_node("generate_narration", nodes.generate_narration_node)
workflow.add_node("generate_code", nodes.generate_code_node)
workflow.add_node("write_file", nodes.write_file_node)
workflow.add_node("render_animation", nodes.render_animation_node)
workflow.add_node("generate_audio", nodes.generate_audio_node)
workflow.add_node("merge_video_audio", nodes.merge_video_audio_node)
workflow.add_node("generate_quiz", nodes.generate_quiz_node)
workflow.add_node("finalize", nodes.finalize_node)
# Set entry point
workflow.set_entry_point("initialize")
# Define the workflow edges (sequential flow with error checking)
# Initialize -> Plan Concept
workflow.add_edge("initialize", "plan_concept")
# Plan Concept -> Check for errors -> Generate Narration
workflow.add_conditional_edges(
"plan_concept",
lambda state: "generate_narration" if not state["errors"] else "finalize",
)
# Generate Narration -> Check for errors -> Generate Code
workflow.add_conditional_edges(
"generate_narration",
lambda state: "generate_code" if not state["errors"] else "finalize",
)
# Generate Code -> Check syntax -> Retry or Write File
workflow.add_conditional_edges(
"generate_code",
should_retry_code_generation,
)
# Write File -> Check for errors -> Render
workflow.add_conditional_edges(
"write_file",
lambda state: "render_animation" if not state["errors"] else "finalize",
)
# Render -> Check for errors -> Generate Audio
workflow.add_conditional_edges(
"render_animation",
lambda state: "generate_audio" if not state["errors"] else "finalize",
)
# Generate Audio -> Check for errors -> Merge
workflow.add_conditional_edges(
"generate_audio",
lambda state: "merge_video_audio" if not state["errors"] else "finalize",
)
# Merge -> Check for errors -> Generate Quiz
workflow.add_conditional_edges(
"merge_video_audio",
lambda state: "generate_quiz" if not state["errors"] else "finalize",
)
# Generate Quiz -> Finalize (quiz errors are non-critical)
workflow.add_edge("generate_quiz", "finalize")
# Finalize -> END
workflow.add_edge("finalize", END)
# Compile the graph
return workflow.compile()
async def run_animation_pipeline(
mcp_session: Any,
tts_generator: Any,
topic: str,
target_audience: str = "general",
animation_length_minutes: float = 2.0,
output_filename: str = "animation.mp4",
rendering_quality: str = "medium",
max_retries: int = 3,
) -> Dict[str, Any]:
"""
Run the complete animation generation pipeline.
This is the main entry point for generating animations. It creates
the workflow, initializes the state, and executes all steps.
Args:
mcp_session: MCP client session
tts_generator: TTS generator instance
topic: STEM topic to animate
target_audience: Target audience level
animation_length_minutes: Desired animation length
output_filename: Name for output file
rendering_quality: Manim rendering quality
max_retries: Maximum retry attempts
Returns:
Dictionary with pipeline results including:
- success: Whether pipeline completed successfully
- final_output_path: Path to final video
- errors: List of errors encountered
- warnings: List of warnings
- completed_steps: List of completed steps
- metadata: Timing and other metadata
"""
# Create working directories
work_dir = Path(tempfile.mkdtemp(prefix="neuroanim_work_"))
output_dir = Path("outputs")
output_dir.mkdir(exist_ok=True)
logger.info(f"π Working directory: {work_dir}")
logger.info(f"π Output directory: {output_dir}")
# Initialize nodes
nodes = AnimationNodes(
mcp_session=mcp_session,
tts_generator=tts_generator,
work_dir=work_dir,
output_dir=output_dir,
)
# Create workflow
workflow = create_animation_workflow(nodes)
# Create initial state
initial_state = create_initial_state(
topic=topic,
target_audience=target_audience,
animation_length_minutes=animation_length_minutes,
output_filename=output_filename,
rendering_quality=rendering_quality,
max_retries=max_retries,
)
logger.info(f"π¬ Starting animation pipeline for topic: '{topic}'")
try:
# Run the workflow
final_state = await workflow.ainvoke(initial_state)
# Build result summary
result = {
"success": final_state.get("success", False),
"topic": final_state["topic"],
"target_audience": final_state["target_audience"],
"final_output_path": final_state.get("final_output_path"),
"concept_plan": final_state.get("concept_plan"),
"narration": final_state.get("narration_text"),
"manim_code": final_state.get("manim_code"),
"quiz": final_state.get("quiz_content"),
"errors": final_state.get("errors", []),
"warnings": final_state.get("warnings", []),
"completed_steps": final_state.get("completed_steps", []),
"total_duration": final_state.get("total_duration"),
"work_dir": str(work_dir),
"output_dir": str(output_dir),
}
if result["success"]:
logger.info(f"β
Animation pipeline completed successfully!")
logger.info(f"πΉ Output file: {result['final_output_path']}")
logger.info(f"β±οΈ Total time: {result['total_duration']:.2f}s")
else:
logger.error(f"β Animation pipeline failed")
logger.error(f"Errors: {result['errors']}")
return result
except Exception as e:
logger.error(f"Pipeline execution failed: {e}", exc_info=True)
return {
"success": False,
"error": str(e),
"work_dir": str(work_dir),
"output_dir": str(output_dir),
}
finally:
# Note: We don't clean up work_dir here so users can inspect artifacts
logger.info(f"Work directory preserved at: {work_dir}")
|