File size: 5,949 Bytes
ddabbe4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
"""
Crew setup for NBA data analysis workflow.
"""
import os
import shutil
from crewai import Crew, Process
from agents import create_engineer_agent, create_analyst_agent, create_storyteller_agent
from tasks import create_data_engineering_task, create_data_analysis_task, create_custom_analysis_task, create_storyteller_task


def create_crew() -> Crew:
    """
    Build the default crew: a data engineering step followed by a data analysis step.

    Returns:
        Crew: A crew using the sequential process that contains the engineer
        and analyst agents together with their two tasks.
    """
    engineer = create_engineer_agent()
    analyst = create_analyst_agent()

    engineering_task = create_data_engineering_task(engineer)
    # The analysis task receives the engineering task as its second argument.
    analysis_task = create_data_analysis_task(analyst, engineering_task)

    crew_settings = {
        "agents": [engineer, analyst],
        "tasks": [engineering_task, analysis_task],
        "process": Process.sequential,
        "verbose": True,
    }
    return Crew(**crew_settings)


def create_crew_with_custom_task(user_query: str, csv_path: str = None) -> Crew:
    """
    Build a three-task crew: engineering, a user-defined analysis, and storytelling.

    Args:
        user_query: The user's custom analysis query/task
        csv_path: Optional path to a CSV file; it is forwarded unchanged
            (including None) to the agent and task factories

    Returns:
        Crew: A crew using the sequential process with the three tasks in the
        order engineering, custom analysis, storyteller
    """
    engineer = create_engineer_agent(csv_path)
    analyst = create_analyst_agent(csv_path)
    storyteller = create_storyteller_agent()

    engineering_task = create_data_engineering_task(engineer, csv_path)

    # None is passed in the third position so that, per the original author's
    # note, the analysis task carries no dependency on the engineering task.
    analysis_task = create_custom_analysis_task(analyst, user_query, None, csv_path)

    # The storyteller task is built from the analysis task.
    story_task = create_storyteller_task(storyteller, analysis_task)

    crew_settings = {
        "agents": [engineer, analyst, storyteller],
        "tasks": [engineering_task, analysis_task, story_task],
        "process": Process.sequential,
        "verbose": True,
    }
    return Crew(**crew_settings)


def create_flow_crew(user_query: str, csv_path: str) -> Crew:
    """
    Create a single crew holding the engineering, custom analysis and storyteller tasks.

    The engineering and analysis tasks are created without a dependency on each
    other, with the intent of meeting the assignment requirement "Parallelize
    tasks via a Flow; merge results at the end."

    NOTE: ``Process.sequential`` does not parallelize tasks on its own; CrewAI
    runs the tasks in the order they appear in ``tasks``. Tasks only overlap
    when they are created with ``async_execution=True``. Whether the task
    factories called here set that flag is not visible from this function --
    TODO confirm; if they do not, this crew runs engineering, then analysis,
    then storytelling, one after another.

    Args:
        user_query: The user's custom analysis query/task
        csv_path: Path to the uploaded CSV file

    Returns:
        Crew: Single crew containing the engineering, analysis and storyteller tasks
    """
    # Create all agents
    engineer_agent = create_engineer_agent(csv_path)
    analyst_agent = create_analyst_agent(csv_path)
    storyteller_agent = create_storyteller_agent()

    # Engineer task - built independently of the analyst task
    data_engineering_task = create_data_engineering_task(engineer_agent, csv_path)

    # Analyst task - None is passed in the third position so the task is not
    # tied to the engineering task
    custom_analysis_task = create_custom_analysis_task(analyst_agent, user_query, None, csv_path)

    # Storyteller task - built from the analyst task
    storyteller_task = create_storyteller_task(storyteller_agent, custom_analysis_task)

    # Task order in the list is the execution order under Process.sequential.
    # Independence between the first two tasks is a prerequisite for running
    # them concurrently, but it does not by itself cause concurrent execution.
    return Crew(
        agents=[engineer_agent, analyst_agent, storyteller_agent],
        tasks=[data_engineering_task, custom_analysis_task, storyteller_task],
        process=Process.sequential,
        verbose=True,
    )


def create_analysis_only_crew(user_query: str, csv_path: str) -> Crew:
    """
    Build a crew made of the Analyst and Storyteller agents, with no Engineer.

    Intended for follow-up questions when engineer results are already available.

    Args:
        user_query: The user's custom analysis query/task
        csv_path: Path to the uploaded CSV file

    Returns:
        Crew: A crew using the sequential process with the analysis task
        followed by the storyteller task
    """
    analyst = create_analyst_agent(csv_path)
    storyteller = create_storyteller_agent()

    # The third argument is None: no upstream task is supplied to the analysis task.
    analysis_task = create_custom_analysis_task(analyst, user_query, None, csv_path)

    # The storyteller task is built from the analysis task.
    story_task = create_storyteller_task(storyteller, analysis_task)

    crew_settings = {
        "agents": [analyst, storyteller],
        "tasks": [analysis_task, story_task],
        "process": Process.sequential,
        "verbose": True,
    }
    return Crew(**crew_settings)


def create_analyst_only_crew(user_query: str, csv_path: str) -> Crew:
    """
    Build a single-agent crew containing only the Analyst (no Engineer, no Storyteller).

    Intended for specific user questions where only analysis is needed.

    Args:
        user_query: The user's custom analysis query/task
        csv_path: Path to the uploaded CSV file

    Returns:
        Crew: A crew using the sequential process with one analysis task
    """
    analyst = create_analyst_agent(csv_path)

    # The third argument is None: no upstream task is supplied to the analysis task.
    analysis_task = create_custom_analysis_task(analyst, user_query, None, csv_path)

    crew_settings = {
        "agents": [analyst],
        "tasks": [analysis_task],
        "process": Process.sequential,
        "verbose": True,
    }
    return Crew(**crew_settings)