# References:
# https://docs.crewai.com/introduction
# https://ai.google.dev/gemini-api/docs
"""GAIA benchmark crew.

Builds a CrewAI multi-agent system from YAML configs, runs it either
hierarchically (`run_crew`) or with workers fanned out in parallel
(`run_parallel_crew`), then normalizes the answer through a final
Gemini pass (`get_final_answer`).
"""
import concurrent.futures
import os
from typing import List

import yaml
from crewai import Agent, Crew, Task
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.project import CrewBase, agent, crew, task
from google import genai
from openinference.instrumentation.crewai import CrewAIInstrumentor
from phoenix.otel import register

from tools.ai_tools import AITools
from tools.arithmetic_tools import ArithmeticTools
from utils import read_file_json, is_ext

## LLMs
MANAGER_MODEL = "gpt-4.5-preview"        # delegation / planning
AGENT_MODEL = "gpt-4.1-mini"             # worker agents
FINAL_ANSWER_MODEL = "gemini-2.5-pro-preview-03-25"  # answer normalization

# LLM evaluation (Arize Phoenix tracing).
# NOTE(review): this raises KeyError at import time if PHOENIX_API_KEY is
# unset, and register() contacts the collector — confirm fail-fast is wanted.
PHOENIX_API_KEY = os.environ["PHOENIX_API_KEY"]
os.environ["PHOENIX_CLIENT_HEADERS"] = f"api_key={PHOENIX_API_KEY}"
os.environ["PHOENIX_COLLECTOR_ENDPOINT"] = "https://app.phoenix.arize.com"

tracer_provider = register(
    auto_instrument=True,
    project_name="gaia"
)

## Tools — grouped by capability; matched to agents by name in get_tools_for().
DOCUMENT_TOOLS = [
    AITools.document_analysis_tool,
    AITools.summarize_tool,
    AITools.translate_tool
]

MEDIA_TOOLS = [
    AITools.image_analysis_tool,
    AITools.audio_analysis_tool,
    AITools.video_analysis_tool,
    AITools.youtube_analysis_tool
]

WEB_TOOLS = [
    AITools.web_search_tool,
    AITools.web_browser_tool
]

ARITHMETIC_TOOLS = [
    ArithmeticTools.add,
    ArithmeticTools.subtract,
    ArithmeticTools.multiply,
    ArithmeticTools.divide,
    ArithmeticTools.modulus
]

CODE_TOOLS = [
    AITools.code_generation_tool,
    AITools.code_execution_tool
]


def get_tools_for(agent_name: str):
    """Return the tool list whose keywords match *agent_name*.

    Matching is by substring on the agent's YAML key. The manager agent
    (and any unmatched name) deliberately receives no tools.
    """
    if "document" in agent_name or "translation" in agent_name or "summarization" in agent_name:
        return DOCUMENT_TOOLS
    if any(keyword in agent_name for keyword in ["image", "audio", "video", "youtube"]):
        return MEDIA_TOOLS
    if "web_search" in agent_name or "web_browser" in agent_name:
        return WEB_TOOLS
    if "code_generation" in agent_name or "code_execution" in agent_name:
        return CODE_TOOLS
    if "arithmetic" in agent_name:
        return ARITHMETIC_TOOLS
    # Manager agent and anything unmatched: no tools.
    return []


#CrewAIInstrumentor().instrument(tracer_provider=tracer_provider)

#@CrewBase
class GAIACrew():
    """Crew factory driven by config/agents.yaml and config/tasks.yaml."""

    tasks: List[Task]

    def __init__(self):
        self.agents_config = self._load_yaml("config/agents.yaml")
        self.tasks_config = self._load_yaml("config/tasks.yaml")
        # Built lazily; see the `agents` property.
        self._agents_cache = None

    def _load_yaml(self, path):
        """Load a YAML file and return its parsed content."""
        with open(path, "r") as f:
            return yaml.safe_load(f)

    def _valid_agent_names(self) -> List[str]:
        """YAML keys whose config is usable, in declaration order.

        Keeps name<->Agent alignment consistent with the `agents` property,
        which skips None configs.
        """
        return [name for name, cfg in self.agents_config.items() if cfg is not None]

    @property
    def agents(self) -> List[Agent]:
        # FIX: the original rebuilt every Agent on each property access, so the
        # Task's agent (set in manager_task) was a different object from every
        # agent handed to the Crew. Cache the list so all callers share one set.
        if self._agents_cache is not None:
            return self._agents_cache
        agents = []
        for name, config in self.agents_config.items():
            if config is None:
                print(f"❌ Agent config for '{name}' is None!")
                continue
            full_config = {**config, "name": name}
            print(f"✅ Creating agent: {name}")
            is_manager = "manager" in name
            agents.append(Agent(
                config=full_config,
                allow_delegation=is_manager,
                llm=MANAGER_MODEL if is_manager else AGENT_MODEL,
                # Manager gets more reasoning iterations than workers.
                max_iter=5 if is_manager else 2,
                tools=get_tools_for(name),
                verbose=True
            ))
        self._agents_cache = agents
        return agents

    @task
    def manager_task(self) -> Task:
        """Build the manager task from YAML and bind the manager agent to it."""
        task = Task(config=self.tasks_config["manager_task"])
        # FIX: the original indexed the agent list with positions taken from
        # ALL config keys, but the agents list skips None configs — a None
        # entry before "manager_agent" bound the wrong agent. Zip only the
        # names that actually produced agents.
        agent_by_name = dict(zip(self._valid_agent_names(), self.agents))
        if "manager_agent" in agent_by_name:
            task.agent = agent_by_name["manager_agent"]
        return task

    def get_crew(self) -> Crew:
        """Assemble the Crew with all agents and the manager task."""
        return Crew(
            agents=self.agents,
            tasks=[self.manager_task()],
            verbose=True
        )


def _build_final_question(question, file_path):
    """Combine *question* with optional file content.

    Tabular/JSON files (.csv, .xls, .xlsx, .json, .jsonl) are inlined as JSON
    via read_file_json; any other file is referenced by path only.
    """
    if not file_path:
        return question
    if any(is_ext(file_path, ext) for ext in (".csv", ".xls", ".xlsx", ".json", ".jsonl")):
        json_data = read_file_json(file_path)
        return f"{question} JSON data:\n{json_data}."
    return f"{question} File path: {file_path}."


def run_crew(question, file_path):
    """
    Orchestrates the GAIA crew to answer a question, optionally with a file.

    Args:
        question (str): The user's question.
        file_path (str): Optional path to a data file to include in the prompt.

    Returns:
        str: The final answer from the manager agent.
    """
    # Build the final prompt, including file JSON if needed
    final_question = _build_final_question(question, file_path)

    # Instantiate the crew and kick off the workflow
    crew_instance = GAIACrew()
    crew = crew_instance.get_crew()
    answer = crew.kickoff(inputs={"question": final_question})

    # Post-process through the final-answer model
    final_answer = get_final_answer(FINAL_ANSWER_MODEL, question, str(answer))

    # Debug logging
    print(f"=> Initial question: {question}")
    print(f"=> Final question: {final_question}")
    print(f"=> Initial answer: {answer}")
    print(f"=> Final answer: {final_answer}")

    return final_answer


def run_parallel_crew(question: str, file_path: str):
    """
    1) Prepares the prompt (including file data if any).
    2) Runs every non-manager agent in parallel on that prompt.
    3) Gathers their raw outputs.
    4) Sends a combined prompt to the manager_agent for the final answer.
    """
    # 1) Build the final prompt
    final_question = _build_final_question(question, file_path)

    # 2) Instantiate your crew and split manager vs workers
    crew_instance = GAIACrew()
    agents = crew_instance.agents
    # FIX: zip only names whose configs built an Agent (the original zipped
    # ALL config keys, misaligning names and agents when a config was None).
    names = crew_instance._valid_agent_names()
    pairs = list(zip(names, agents))
    workers = [(n, a) for n, a in pairs if n != "manager_agent"]
    manager_name, manager = next((n, a) for n, a in pairs if n == "manager_agent")

    # 3) Run workers in parallel, giving each the plain-string prompt
    results = {}
    # `or 1` guards ThreadPoolExecutor against max_workers=0 when there are
    # no workers (it raises ValueError).
    with concurrent.futures.ThreadPoolExecutor(max_workers=len(workers) or 1) as pool:
        future_to_name = {
            pool.submit(agent.kickoff, final_question): name
            for name, agent in workers
        }
        for fut in concurrent.futures.as_completed(future_to_name):
            name = future_to_name[fut]
            try:
                results[name] = fut.result()
            except Exception as e:
                # FIX: the original stored f"" here, silently discarding the
                # exception; surface it so the manager (and logs) can see it.
                results[name] = f"Error: {e}"

    # 4) Compose a manager prompt with all the raw outputs
    combined = "\n\n".join(f"--- {n} output ---\n{out}" for n, out in results.items())
    manager_prompt = (
        f"You have received these reports from your coworkers:\n\n"
        f"{combined}\n\n"
        f"Now, based on the original question, provide the final answer.\n"
        f"Original question: {question}"
    )

    # 5) Run the manager agent on the combined prompt
    final = manager.kickoff(manager_prompt)

    # 6) Post-process via your final-answer model
    return get_final_answer(FINAL_ANSWER_MODEL, question, str(final))


def get_final_answer(model, question, answer):
    """Normalize *answer* into GAIA's strict answer format via Gemini.

    Args:
        model (str): Gemini model name.
        question (str): The original user question.
        answer (str): The crew's raw answer.

    Returns:
        str: The model's normalized final answer text.
    """
    prompt_template = """
You are an expert question answering assistant. Given a question and an initial answer, your task is to provide the final answer.
Your final answer must be a number and/or string OR as few words as possible OR a comma-separated list of numbers and/or strings.
If you are asked for a number, don't use comma to write your number neither use units such as USD, $, percent, or % unless specified otherwise.
If you are asked for a string, don't use articles, neither abbreviations (for example cities), and write the digits in plain text unless specified otherwise.
If you are asked for a comma-separated list, apply the above rules depending of whether the element to be put in the list is a number or a string.
If the final answer is a number, use a number not a word.
If the final answer is a string, start with an uppercase character.
If the final answer is a comma-separated list of numbers, use a space character after each comma.
If the final answer is a comma-separated list of strings, use a space character after each comma and start with a lowercase character.
Do not add any content to the final answer that is not in the initial answer.

**Question:**
""" + question + """

**Initial answer:**
""" + answer + """

**Example 1:**
What is the biggest city in California?
Los Angeles

**Example 2:**
How many 'r's are in strawberry?
3

**Example 3:**
What is the opposite of black?
White

**Example 4:**
What are the first 5 numbers in the Fibonacci sequence?
0, 1, 1, 2, 3

**Example 5:**
What is the opposite of bad, worse, worst?
good, better, best

**Final answer:**
"""
    client = genai.Client(api_key=os.environ["GEMINI_API_KEY"])
    response = client.models.generate_content(
        model=model,
        contents=[prompt_template]
    )
    return response.text