""" Crew setup for NBA data analysis workflow. """ import os import shutil from crewai import Crew, Process from agents import create_engineer_agent, create_analyst_agent, create_storyteller_agent from tasks import create_data_engineering_task, create_data_analysis_task, create_custom_analysis_task, create_storyteller_task def create_crew() -> Crew: """ Create and configure the CrewAI crew with agents and tasks. Returns: Crew: Configured CrewAI crew ready for execution """ # Create agents engineer_agent = create_engineer_agent() analyst_agent = create_analyst_agent() # Create tasks data_engineering_task = create_data_engineering_task(engineer_agent) data_analysis_task = create_data_analysis_task(analyst_agent, data_engineering_task) # Create and return the crew return Crew( agents=[engineer_agent, analyst_agent], tasks=[data_engineering_task, data_analysis_task], process=Process.sequential, verbose=True, ) def create_crew_with_custom_task(user_query: str, csv_path: str = None) -> Crew: """ Create a CrewAI crew with engineering task, custom analyst task, and storyteller task. Args: user_query: The user's custom analysis query/task csv_path: Optional path to CSV file (if None, uses default from config) Returns: Crew: Configured CrewAI crew ready for execution """ # Create agents (they will use the csv_path from tools) engineer_agent = create_engineer_agent(csv_path) analyst_agent = create_analyst_agent(csv_path) storyteller_agent = create_storyteller_agent() # Create engineering task (fixed) data_engineering_task = create_data_engineering_task(engineer_agent, csv_path) # Create custom analyst task from user input (no dependency on engineer task for parallel execution) custom_analysis_task = create_custom_analysis_task(analyst_agent, user_query, None, csv_path) # Create storyteller task that uses the analyst's output storyteller_task = create_storyteller_task(storyteller_agent, custom_analysis_task) # Create and return the crew return Crew( agents=[engineer_agent, analyst_agent, storyteller_agent], tasks=[data_engineering_task, custom_analysis_task, storyteller_task], process=Process.sequential, verbose=True, ) def create_flow_crew(user_query: str, csv_path: str) -> Crew: """ Create a single crew with parallel tasks (Engineer and Analyst) that merge results at the end. This satisfies the assignment requirement: "Parallelize tasks via a Flow; merge results at the end." Args: user_query: The user's custom analysis query/task csv_path: Path to the uploaded CSV file Returns: Crew: Single crew with parallel tasks that will merge results """ # Create all agents engineer_agent = create_engineer_agent(csv_path) analyst_agent = create_analyst_agent(csv_path) storyteller_agent = create_storyteller_agent() # Create tasks WITHOUT dependencies so they can run in parallel # Engineer task - independent data_engineering_task = create_data_engineering_task(engineer_agent, csv_path) # Analyst task - independent (no dependency on engineer for parallel execution) custom_analysis_task = create_custom_analysis_task(analyst_agent, user_query, None, csv_path) # Storyteller task - depends on analyst (runs after analyst completes) storyteller_task = create_storyteller_task(storyteller_agent, custom_analysis_task) # Create a single crew with all tasks # Tasks without dependencies will run in parallel # Storyteller will run after analyst completes return Crew( agents=[engineer_agent, analyst_agent, storyteller_agent], tasks=[data_engineering_task, custom_analysis_task, storyteller_task], process=Process.sequential, # CrewAI will parallelize independent tasks automatically verbose=True, ) def create_analysis_only_crew(user_query: str, csv_path: str) -> Crew: """ Create a crew with only Analyst and Storyteller agents (no Engineer). Used when engineer results are already available and user asks a new question. Args: user_query: The user's custom analysis query/task csv_path: Path to the uploaded CSV file Returns: Crew: Crew with only analyst and storyteller tasks """ # Create only analyst and storyteller agents analyst_agent = create_analyst_agent(csv_path) storyteller_agent = create_storyteller_agent() # Create analyst task with user query custom_analysis_task = create_custom_analysis_task(analyst_agent, user_query, None, csv_path) # Storyteller task depends on analyst storyteller_task = create_storyteller_task(storyteller_agent, custom_analysis_task) return Crew( agents=[analyst_agent, storyteller_agent], tasks=[custom_analysis_task, storyteller_task], process=Process.sequential, verbose=True, ) def create_analyst_only_crew(user_query: str, csv_path: str) -> Crew: """ Create a crew with only Analyst agent (no Engineer, no Storyteller). Used for specific user questions where only analysis is needed. Args: user_query: The user's custom analysis query/task csv_path: Path to the uploaded CSV file Returns: Crew: Crew with only analyst task """ # Create only analyst agent analyst_agent = create_analyst_agent(csv_path) # Create analyst task with user query custom_analysis_task = create_custom_analysis_task(analyst_agent, user_query, None, csv_path) return Crew( agents=[analyst_agent], tasks=[custom_analysis_task], process=Process.sequential, verbose=True, )