Spaces:
Sleeping
Sleeping
File size: 5,949 Bytes
ddabbe4 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 |
"""
Crew setup for NBA data analysis workflow.
"""
import os
import shutil
from typing import Optional

from crewai import Crew, Process

from agents import create_engineer_agent, create_analyst_agent, create_storyteller_agent
from tasks import create_data_engineering_task, create_data_analysis_task, create_custom_analysis_task, create_storyteller_task
def create_crew() -> Crew:
    """
    Build the default two-agent crew: data engineering, then data analysis.

    The analysis task is built from the engineering task (presumably so it
    can consume the engineer's output), and both tasks execute in list order
    under a sequential process.

    Returns:
        Crew: A verbose, sequential CrewAI crew ready to be kicked off.
    """
    engineer = create_engineer_agent()
    analyst = create_analyst_agent()

    engineering = create_data_engineering_task(engineer)
    # The analysis task is handed the engineering task it builds upon.
    analysis = create_data_analysis_task(analyst, engineering)

    crew_agents = [engineer, analyst]
    crew_tasks = [engineering, analysis]
    return Crew(
        agents=crew_agents,
        tasks=crew_tasks,
        process=Process.sequential,
        verbose=True,
    )
def create_crew_with_custom_task(user_query: str, csv_path: Optional[str] = None) -> Crew:
    """
    Create a crew that runs data engineering, a user-defined analysis, and storytelling.

    Tasks execute one after another (``Process.sequential``) in this order:
    data engineering -> custom analysis -> storytelling. The custom analysis
    task is created without an upstream task, and the storyteller task is
    created from the custom analysis task.

    Args:
        user_query: The user's custom analysis query/task.
        csv_path: Optional path to the CSV file. It is forwarded unchanged to
            the engineer/analyst agent factories and to the engineering and
            analysis task factories. What happens for ``None`` (presumably a
            configured default path) is decided by those helpers; verify there.

    Returns:
        Crew: Configured CrewAI crew ready for execution.
    """
    # Agents receive csv_path so their tools can locate the data file.
    engineer_agent = create_engineer_agent(csv_path)
    analyst_agent = create_analyst_agent(csv_path)
    storyteller_agent = create_storyteller_agent()

    # Engineering task: fixed, independent of the user's query.
    data_engineering_task = create_data_engineering_task(engineer_agent, csv_path)
    # The third argument (None) appears to be the upstream task, so the analyst
    # does not receive the engineer's output. NOTE: with Process.sequential this
    # does not make the two tasks run in parallel; they still run in list order.
    custom_analysis_task = create_custom_analysis_task(analyst_agent, user_query, None, csv_path)
    # Storyteller task is built from the analyst's task (its output feeds the story).
    storyteller_task = create_storyteller_task(storyteller_agent, custom_analysis_task)

    return Crew(
        agents=[engineer_agent, analyst_agent, storyteller_agent],
        tasks=[data_engineering_task, custom_analysis_task, storyteller_task],
        process=Process.sequential,
        verbose=True,
    )
def create_flow_crew(user_query: str, csv_path: str) -> Crew:
    """
    Create a single crew with Engineer, Analyst, and Storyteller tasks.

    Despite the name, the tasks are NOT executed in parallel. The crew uses
    ``Process.sequential``, which runs tasks strictly in list order:
    data engineering -> custom analysis -> storytelling. The Engineer and
    Analyst tasks are only independent in their inputs: the analysis task is
    created without the engineering task as an upstream dependency. The
    resulting crew is identical in construction to
    ``create_crew_with_custom_task(user_query, csv_path)``.

    NOTE(review): the original intent was "Parallelize tasks via a Flow; merge
    results at the end." Real concurrency needs CrewAI's
    ``async_execution=True`` on the independent tasks, or a CrewAI ``Flow``
    that runs two crews and merges their outputs. Also, nothing here merges
    the Engineer's result into the final answer, because the storyteller is
    built only from the analyst task. Confirm the intended design.

    Args:
        user_query: The user's custom analysis query/task.
        csv_path: Path to the uploaded CSV file, forwarded to the agent and
            task factories.

    Returns:
        Crew: A verbose, sequential crew with the three tasks above.
    """
    # Create all agents; the engineer and analyst tools are pointed at csv_path.
    engineer_agent = create_engineer_agent(csv_path)
    analyst_agent = create_analyst_agent(csv_path)
    storyteller_agent = create_storyteller_agent()

    # Engineering task: takes no upstream task.
    data_engineering_task = create_data_engineering_task(engineer_agent, csv_path)
    # Analysis task: the None argument appears to be the upstream task, so the
    # engineer's output is not fed to the analyst.
    custom_analysis_task = create_custom_analysis_task(analyst_agent, user_query, None, csv_path)
    # Storyteller task: built from the analyst task, so it consumes the analysis.
    storyteller_task = create_storyteller_task(storyteller_agent, custom_analysis_task)

    # Process.sequential executes the tasks one at a time in this list order;
    # CrewAI does not auto-parallelize independent tasks in this mode.
    return Crew(
        agents=[engineer_agent, analyst_agent, storyteller_agent],
        tasks=[data_engineering_task, custom_analysis_task, storyteller_task],
        process=Process.sequential,
        verbose=True,
    )
def create_analysis_only_crew(user_query: str, csv_path: str) -> Crew:
    """
    Build a two-step crew (analysis, then storytelling) that skips data engineering.

    Intended for follow-up questions when the engineering step has already
    been run for this dataset.

    Args:
        user_query: The user's custom analysis query/task.
        csv_path: Path to the uploaded CSV file, forwarded to the analyst
            agent and its task.

    Returns:
        Crew: A verbose, sequential crew containing only the analyst and
        storyteller tasks.
    """
    analyst = create_analyst_agent(csv_path)
    storyteller = create_storyteller_agent()

    analysis = create_custom_analysis_task(analyst, user_query, None, csv_path)
    # The story is written from the analyst's task, so it must come second.
    story = create_storyteller_task(storyteller, analysis)

    crew_config = dict(
        agents=[analyst, storyteller],
        tasks=[analysis, story],
        process=Process.sequential,
        verbose=True,
    )
    return Crew(**crew_config)
def create_analyst_only_crew(user_query: str, csv_path: str) -> Crew:
    """
    Build a minimal crew with a single analyst task and no engineer or storyteller.

    Useful when the user asks a targeted question that only needs analysis.

    Args:
        user_query: The user's custom analysis query/task.
        csv_path: Path to the uploaded CSV file, forwarded to the analyst
            agent and its task.

    Returns:
        Crew: A verbose, sequential crew holding just the analyst task.
    """
    analyst = create_analyst_agent(csv_path)
    # No upstream task is supplied (None) because this crew has nothing before it.
    analysis = create_custom_analysis_task(analyst, user_query, None, csv_path)

    crew_config = dict(
        agents=[analyst],
        tasks=[analysis],
        process=Process.sequential,
        verbose=True,
    )
    return Crew(**crew_config)
|