|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import os |
|
|
from crewai import Agent, Crew, Task |
|
|
from crewai.agents.agent_builder.base_agent import BaseAgent |
|
|
from crewai.project import CrewBase, agent, crew, task |
|
|
from google import genai |
|
|
from openinference.instrumentation.crewai import CrewAIInstrumentor |
|
|
from phoenix.otel import register |
|
|
from tools.ai_tools import AITools |
|
|
from tools.arithmetic_tools import ArithmeticTools |
|
|
from typing import List |
|
|
from utils import read_file_json, is_ext |
|
|
|
|
|
|
|
|
|
|
|
# --- LLM model identifiers --------------------------------------------------

# Model used by the manager (orchestrating/delegating) agent.
MANAGER_MODEL = "gpt-4.5-preview"

# Model used by all worker agents.
AGENT_MODEL = "gpt-4.1-mini"

# Gemini model used by get_final_answer() to format the final answer.
FINAL_ANSWER_MODEL = "gemini-2.5-pro-preview-03-25"


# --- Phoenix (Arize) tracing setup ------------------------------------------

# Raises KeyError at import time if PHOENIX_API_KEY is not set — tracing is
# mandatory for this module.
PHOENIX_API_KEY = os.environ["PHOENIX_API_KEY"]

# Phoenix reads its credentials/endpoint from these environment variables.
os.environ["PHOENIX_CLIENT_HEADERS"] = f"api_key={PHOENIX_API_KEY}"

os.environ["PHOENIX_COLLECTOR_ENDPOINT"] = "https://app.phoenix.arize.com"

# Register the OpenTelemetry tracer provider; auto_instrument=True activates
# any installed instrumentors (e.g. the imported CrewAIInstrumentor).
tracer_provider = register(
    auto_instrument=True,
    project_name="gaia"
)
|
|
|
|
|
|
|
|
|
|
|
# Tool groups handed to agents by get_tools_for(), keyed off the agent's name.

# Document-centric agents: file analysis, summarization, translation.
DOCUMENT_TOOLS = [
    AITools.document_analysis_tool,
    AITools.summarize_tool,
    AITools.translate_tool
]

# Media agents: image/audio/video files and YouTube content.
MEDIA_TOOLS = [
    AITools.image_analysis_tool,
    AITools.audio_analysis_tool,
    AITools.video_analysis_tool,
    AITools.youtube_analysis_tool
]

# Web agents: search plus page browsing.
WEB_TOOLS = [
    AITools.web_search_tool,
    AITools.web_browser_tool
]

# Basic arithmetic operations for the arithmetic agent.
ARITHMETIC_TOOLS = [
    ArithmeticTools.add,
    ArithmeticTools.subtract,
    ArithmeticTools.multiply,
    ArithmeticTools.divide,
    ArithmeticTools.modulus
]

# Code agents: generation and sandboxed execution.
CODE_TOOLS = [
    AITools.code_generation_tool,
    AITools.code_execution_tool
]
|
|
|
|
|
|
|
|
def get_tools_for(agent_name: str):
    """Return the tool list for an agent, selected by substrings of its name.

    Args:
        agent_name: The agent's configuration key (e.g. "web_search_agent").

    Returns:
        One of the module-level tool lists, or an empty list for the manager
        and any unrecognized agent name.
    """
    # Order matters: the first matching keyword group wins.
    if any(kw in agent_name for kw in ("document", "translation", "summarization")):
        return DOCUMENT_TOOLS
    if any(kw in agent_name for kw in ("image", "audio", "video", "youtube")):
        return MEDIA_TOOLS
    if any(kw in agent_name for kw in ("web_search", "web_browser")):
        return WEB_TOOLS
    if any(kw in agent_name for kw in ("code_generation", "code_execution")):
        return CODE_TOOLS
    if "arithmetic" in agent_name:
        return ARITHMETIC_TOOLS
    # The manager delegates instead of using tools; unknown agents get none.
    # (The original had separate "manager" and else branches that both
    # returned [] — collapsed into one.)
    return []
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class GAIACrew():
    """Builds the GAIA agents and manager task from YAML config and assembles a Crew.

    Agents are built once and cached. The previous implementation rebuilt a
    fresh list of Agent instances on *every* access to the ``agents``
    property, so the manager agent attached to the task was never the same
    object as any agent passed to the Crew; it also located the manager by
    indexing the built list with the key's position in ``agents_config``,
    which drifts out of sync whenever an earlier config entry is None and
    skipped. Caching plus a name->agent map fixes both.
    """

    # Populated by the crewai @task decorator machinery.
    tasks: List[Task]

    def __init__(self):
        self.agents_config = self._load_yaml("config/agents.yaml")
        self.tasks_config = self._load_yaml("config/tasks.yaml")
        self._agents = None         # cached list of built Agent objects
        self._agents_by_name = {}   # config name -> Agent (valid configs only)

    def _load_yaml(self, path):
        """Parse a YAML file (yaml imported lazily to keep module import light)."""
        import yaml
        with open(path, "r") as f:
            return yaml.safe_load(f)

    @property
    def agents(self) -> List[Agent]:
        """Return the cached Agents, building them on first access.

        Entries whose YAML config is None are skipped with a warning, so the
        returned list may be shorter than ``agents_config``.
        """
        if self._agents is not None:
            return self._agents

        built = []
        for name, config in self.agents_config.items():
            if config is None:
                print(f"❌ Agent config for '{name}' is None!")
                continue

            full_config = {**config, "name": name}
            print(f"✅ Creating agent: {name}")

            is_manager = "manager" in name
            agent = Agent(
                config=full_config,
                allow_delegation=is_manager,  # only the manager may delegate
                llm=MANAGER_MODEL if is_manager else AGENT_MODEL,
                max_iter=5 if is_manager else 2,  # manager gets more iterations
                tools=get_tools_for(name),
                verbose=True
            )
            built.append(agent)
            self._agents_by_name[name] = agent

        self._agents = built
        return built

    @task
    def manager_task(self) -> Task:
        """Create the manager task and attach the cached manager agent (if any)."""
        mgr_task = Task(config=self.tasks_config["manager_task"])

        self.agents  # ensure the cache and the name map are populated
        manager = self._agents_by_name.get("manager_agent")
        if manager is not None:
            mgr_task.agent = manager

        return mgr_task

    def get_crew(self) -> Crew:
        """Assemble the Crew from the cached agents and the manager task."""
        return Crew(
            agents=self.agents,
            tasks=[self.manager_task()],
            verbose=True
        )
|
|
|
|
|
def run_crew(question, file_path):
    """
    Orchestrates the GAIA crew to answer a question, optionally with a file.

    Args:
        question (str): The user's question.
        file_path (str): Optional path to a data file to include in the prompt.

    Returns:
        str: The final answer from the manager agent.
    """
    final_question = question
    if file_path:
        # Tabular/structured files are inlined as JSON; anything else is
        # referenced by path so the agents' tools can open it themselves.
        if any(is_ext(file_path, ext)
               for ext in (".csv", ".xls", ".xlsx", ".json", ".jsonl")):
            json_data = read_file_json(file_path)
            final_question = f"{question} JSON data:\n{json_data}."
        else:
            final_question = f"{question} File path: {file_path}."

    crew_instance = GAIACrew()
    crew = crew_instance.get_crew()
    answer = crew.kickoff(inputs={"question": final_question})

    # Normalize the crew's raw answer into GAIA's expected answer format.
    final_answer = get_final_answer(FINAL_ANSWER_MODEL, question, str(answer))

    print(f"=> Initial question: {question}")
    print(f"=> Final question: {final_question}")
    print(f"=> Initial answer: {answer}")
    print(f"=> Final answer: {final_answer}")

    return final_answer
|
|
|
|
|
import concurrent.futures |
|
|
|
|
|
def run_parallel_crew(question: str, file_path: str):
    """
    Fan-out/fan-in alternative to run_crew().

    1) Prepares the prompt (including file data if any).
    2) Runs every non-manager agent in parallel on that prompt.
    3) Gathers their raw outputs.
    4) Sends a combined prompt to the manager_agent for the final answer.

    Args:
        question: The user's question.
        file_path: Optional path to a data file to include in the prompt.

    Returns:
        str: The formatted final answer.

    Raises:
        ValueError: If no 'manager_agent' entry exists in the agents config.
    """
    final_question = question
    if file_path:
        # Tabular/structured files are inlined as JSON; anything else is
        # referenced by path so the agents' tools can open it themselves.
        if any(is_ext(file_path, ext)
               for ext in (".csv", ".xls", ".xlsx", ".json", ".jsonl")):
            json_data = read_file_json(file_path)
            final_question = f"{question} JSON data:\n{json_data}."
        else:
            final_question = f"{question} File path: {file_path}."

    crew_instance = GAIACrew()
    names = list(crew_instance.agents_config.keys())
    # NOTE(review): this assumes no agent config entry is None — a skipped
    # config would misalign names with built agents. Confirm against config.
    pairs = list(zip(names, crew_instance.agents))

    workers = [(n, a) for n, a in pairs if n != "manager_agent"]
    try:
        # Explicit error instead of an opaque StopIteration from next().
        manager = next(a for n, a in pairs if n == "manager_agent")
    except StopIteration:
        raise ValueError("No 'manager_agent' found in agents config") from None

    results = {}
    # max(1, ...) keeps ThreadPoolExecutor valid even with zero workers.
    with concurrent.futures.ThreadPoolExecutor(
            max_workers=max(1, len(workers))) as pool:
        future_to_name = {
            pool.submit(agent.kickoff, final_question): name
            for name, agent in workers
        }
        for fut in concurrent.futures.as_completed(future_to_name):
            name = future_to_name[fut]
            try:
                results[name] = fut.result()
            except Exception as e:
                # One failed worker must not sink the run; record the error
                # so the manager can see (and report around) it.
                results[name] = f"<error: {e}>"

    combined = "\n\n".join(f"--- {n} output ---\n{out}"
                           for n, out in results.items())
    manager_prompt = (
        f"You have received these reports from your coworkers:\n\n"
        f"{combined}\n\n"
        f"Now, based on the original question, provide the final answer.\n"
        f"Original question: {question}"
    )

    final = manager.kickoff(manager_prompt)

    return get_final_answer(FINAL_ANSWER_MODEL, question, str(final))
|
|
|
|
|
|
|
|
def get_final_answer(model, question, answer):
    """Post-process an initial answer into GAIA's required final-answer format.

    Builds a formatting prompt around *question* and *answer* and sends it to
    a Gemini model via the google-genai client.

    Args:
        model: Gemini model identifier (e.g. FINAL_ANSWER_MODEL).
        question: The original user question.
        answer: The initial answer produced by the crew.

    Returns:
        The model's response text (the formatted final answer).
    """
    # The question and initial answer are spliced into the prompt verbatim.
    prompt_template = """
    You are an expert question answering assistant. Given a question and an initial answer, your task is to provide the final answer.
    Your final answer must be a number and/or string OR as few words as possible OR a comma-separated list of numbers and/or strings.
    If you are asked for a number, don't use comma to write your number neither use units such as USD, $, percent, or % unless specified otherwise.
    If you are asked for a string, don't use articles, neither abbreviations (for example cities), and write the digits in plain text unless specified otherwise.
    If you are asked for a comma-separated list, apply the above rules depending of whether the element to be put in the list is a number or a string.
    If the final answer is a number, use a number not a word.
    If the final answer is a string, start with an uppercase character.
    If the final answer is a comma-separated list of numbers, use a space character after each comma.
    If the final answer is a comma-separated list of strings, use a space character after each comma and start with a lowercase character.
    Do not add any content to the final answer that is not in the initial answer.
    **Question:** """ + question + """

    **Initial answer:** """ + answer + """

    **Example 1:** What is the biggest city in California? Los Angeles
    **Example 2:** How many 'r's are in strawberry? 3
    **Example 3:** What is the opposite of black? White
    **Example 4:** What are the first 5 numbers in the Fibonacci sequence? 0, 1, 1, 2, 3
    **Example 5:** What is the opposite of bad, worse, worst? good, better, best

    **Final answer:**
    """

    # Requires GEMINI_API_KEY in the environment; raises KeyError otherwise.
    client = genai.Client(api_key=os.environ["GEMINI_API_KEY"])

    response = client.models.generate_content(
        model=model,
        contents=[prompt_template]
    )

    return response.text