Spaces:
Paused
Paused
| # References: | |
| # https://docs.crewai.com/introduction | |
| # https://ai.google.dev/gemini-api/docs | |
| import os | |
| from crewai import Agent, Crew, Task | |
| from crewai.agents.agent_builder.base_agent import BaseAgent | |
| from crewai.project import CrewBase, agent, crew, task | |
| from google import genai | |
| from openinference.instrumentation.crewai import CrewAIInstrumentor | |
| from phoenix.otel import register | |
| from tools.ai_tools import AITools | |
| from tools.arithmetic_tools import ArithmeticTools | |
| from typing import List | |
| from utils import read_file_json, is_ext | |
| ## LLMs | |
| MANAGER_MODEL = "gpt-4.5-preview" | |
| AGENT_MODEL = "gpt-4.1-mini" | |
| FINAL_ANSWER_MODEL = "gemini-2.5-pro-preview-03-25" | |
| # LLM evaluation | |
| PHOENIX_API_KEY = os.environ["PHOENIX_API_KEY"] | |
| os.environ["PHOENIX_CLIENT_HEADERS"] = f"api_key={PHOENIX_API_KEY}" | |
| os.environ["PHOENIX_COLLECTOR_ENDPOINT"] = "https://app.phoenix.arize.com" | |
| tracer_provider = register( | |
| auto_instrument=True, | |
| project_name="gaia" | |
| ) | |
| ## Tools | |
| DOCUMENT_TOOLS = [ | |
| AITools.document_analysis_tool, | |
| AITools.summarize_tool, | |
| AITools.translate_tool | |
| ] | |
| MEDIA_TOOLS = [ | |
| AITools.image_analysis_tool, | |
| AITools.audio_analysis_tool, | |
| AITools.video_analysis_tool, | |
| AITools.youtube_analysis_tool | |
| ] | |
| WEB_TOOLS = [ | |
| AITools.web_search_tool, | |
| AITools.web_browser_tool | |
| ] | |
| ARITHMETIC_TOOLS = [ | |
| ArithmeticTools.add, | |
| ArithmeticTools.subtract, | |
| ArithmeticTools.multiply, | |
| ArithmeticTools.divide, | |
| ArithmeticTools.modulus | |
| ] | |
| CODE_TOOLS = [ | |
| AITools.code_generation_tool, | |
| AITools.code_execution_tool | |
| ] | |
| #Get specific tools | |
| def get_tools_for(agent_name: str): | |
| if "document" in agent_name or "translation" in agent_name or "summarization" in agent_name: | |
| return DOCUMENT_TOOLS | |
| elif any(keyword in agent_name for keyword in ["image", "audio", "video", "youtube"]): | |
| return MEDIA_TOOLS | |
| elif "web_search" in agent_name or "web_browser" in agent_name: | |
| return WEB_TOOLS | |
| elif "code_generation" in agent_name or "code_execution" in agent_name: | |
| return CODE_TOOLS | |
| elif "arithmetic" in agent_name: | |
| return ARITHMETIC_TOOLS | |
| elif "manager" in agent_name: | |
| return [] | |
| else: | |
| return [] | |
| #CrewAIInstrumentor().instrument(tracer_provider=tracer_provider) | |
| #@CrewBase | |
| class GAIACrew(): | |
| tasks: List[Task] | |
| def __init__(self): | |
| self.agents_config = self._load_yaml("config/agents.yaml") | |
| self.tasks_config = self._load_yaml("config/tasks.yaml") | |
| def _load_yaml(self, path): | |
| import yaml | |
| with open(path, "r") as f: | |
| return yaml.safe_load(f) | |
| def agents(self) -> List[Agent]: | |
| agents = [] | |
| for name in self.agents_config: | |
| config = self.agents_config[name] | |
| if config is None: | |
| print(f"❌ Agent config for '{name}' is None!") | |
| continue | |
| full_config = {**config, "name": name} | |
| print(f"✅ Creating agent: {name}") | |
| agents.append(Agent( | |
| config=full_config, | |
| allow_delegation=("manager" in name), | |
| llm=MANAGER_MODEL if "manager" in name else AGENT_MODEL, | |
| max_iter=5 if "manager" in name else 2, | |
| tools=get_tools_for(name), | |
| verbose=True | |
| )) | |
| return agents | |
| def manager_task(self) -> Task: | |
| # Build the Task object from your YAML | |
| task = Task(config=self.tasks_config["manager_task"]) | |
| # Find the Agent instance whose YAML key is "manager_agent" | |
| agent_list = self.agents | |
| name_list = list(self.agents_config.keys()) | |
| for idx, name in enumerate(name_list): | |
| if name == "manager_agent": | |
| task.agent = agent_list[idx] | |
| break | |
| return task | |
| def get_crew(self) -> Crew: | |
| return Crew( | |
| agents=self.agents, | |
| tasks=[self.manager_task()], | |
| verbose=True | |
| ) | |
| def run_crew(question, file_path): | |
| """ | |
| Orchestrates the GAIA crew to answer a question, optionally with a file. | |
| Args: | |
| question (str): The user's question. | |
| file_path (str): Optional path to a data file to include in the prompt. | |
| Returns: | |
| str: The final answer from the manager agent. | |
| """ | |
| # Build the final prompt, including file JSON if needed | |
| final_question = question | |
| if file_path: | |
| if is_ext(file_path, ".csv") or is_ext(file_path, ".xls") \ | |
| or is_ext(file_path, ".xlsx") or is_ext(file_path, ".json") \ | |
| or is_ext(file_path, ".jsonl"): | |
| json_data = read_file_json(file_path) | |
| final_question = f"{question} JSON data:\n{json_data}." | |
| else: | |
| final_question = f"{question} File path: {file_path}." | |
| # Instantiate the crew and kick off the workflow | |
| crew_instance = GAIACrew() | |
| crew = crew_instance.get_crew() | |
| answer = crew.kickoff(inputs={"question": final_question}) | |
| # Post-process through the final-answer model | |
| final_answer = get_final_answer(FINAL_ANSWER_MODEL, question, str(answer)) | |
| # Debug logging | |
| print(f"=> Initial question: {question}") | |
| print(f"=> Final question: {final_question}") | |
| print(f"=> Initial answer: {answer}") | |
| print(f"=> Final answer: {final_answer}") | |
| return final_answer | |
| import concurrent.futures | |
| def run_parallel_crew(question: str, file_path: str): | |
| """ | |
| 1) Prepares the prompt (including file data if any). | |
| 2) Runs every non-manager agent in parallel on that prompt. | |
| 3) Gathers their raw outputs. | |
| 4) Sends a combined prompt to the manager_agent for the final answer. | |
| """ | |
| # 1) Build the final prompt | |
| final_question = question | |
| if file_path: | |
| if is_ext(file_path, ".csv") or is_ext(file_path, ".xls") \ | |
| or is_ext(file_path, ".xlsx") or is_ext(file_path, ".json") \ | |
| or is_ext(file_path, ".jsonl"): | |
| json_data = read_file_json(file_path) | |
| final_question = f"{question} JSON data:\n{json_data}." | |
| else: | |
| final_question = f"{question} File path: {file_path}." | |
| # 2) Instantiate your crew and split manager vs workers | |
| crew_instance = GAIACrew() | |
| names = list(crew_instance.agents_config.keys()) | |
| agents = crew_instance.agents | |
| pairs = list(zip(names, agents)) | |
| workers = [(n, a) for n, a in pairs if n != "manager_agent"] | |
| manager_name, manager = next((n, a) for n, a in pairs if n == "manager_agent") | |
| # 3) Run workers in parallel, giving each the plain-string prompt | |
| results = {} | |
| with concurrent.futures.ThreadPoolExecutor(max_workers=len(workers)) as pool: | |
| future_to_name = { | |
| pool.submit(agent.kickoff, final_question): name | |
| for name, agent in workers | |
| } | |
| for fut in concurrent.futures.as_completed(future_to_name): | |
| name = future_to_name[fut] | |
| try: | |
| results[name] = fut.result() | |
| except Exception as e: | |
| results[name] = f"<error: {e}>" | |
| # 4) Compose a manager prompt with all the raw outputs | |
| combined = "\n\n".join(f"--- {n} output ---\n{out}" | |
| for n, out in results.items()) | |
| manager_prompt = ( | |
| f"You have received these reports from your coworkers:\n\n" | |
| f"{combined}\n\n" | |
| f"Now, based on the original question, provide the final answer.\n" | |
| f"Original question: {question}" | |
| ) | |
| # 5) Run the manager agent on the combined prompt | |
| final = manager.kickoff(manager_prompt) | |
| # 6) Post-process via your final-answer model | |
| return get_final_answer(FINAL_ANSWER_MODEL, question, str(final)) | |
| def get_final_answer(model, question, answer): | |
| prompt_template = """ | |
| You are an expert question answering assistant. Given a question and an initial answer, your task is to provide the final answer. | |
| Your final answer must be a number and/or string OR as few words as possible OR a comma-separated list of numbers and/or strings. | |
| If you are asked for a number, don't use comma to write your number neither use units such as USD, $, percent, or % unless specified otherwise. | |
| If you are asked for a string, don't use articles, neither abbreviations (for example cities), and write the digits in plain text unless specified otherwise. | |
| If you are asked for a comma-separated list, apply the above rules depending of whether the element to be put in the list is a number or a string. | |
| If the final answer is a number, use a number not a word. | |
| If the final answer is a string, start with an uppercase character. | |
| If the final answer is a comma-separated list of numbers, use a space character after each comma. | |
| If the final answer is a comma-separated list of strings, use a space character after each comma and start with a lowercase character. | |
| Do not add any content to the final answer that is not in the initial answer. | |
| **Question:** """ + question + """ | |
| **Initial answer:** """ + answer + """ | |
| **Example 1:** What is the biggest city in California? Los Angeles | |
| **Example 2:** How many 'r's are in strawberry? 3 | |
| **Example 3:** What is the opposite of black? White | |
| **Example 4:** What are the first 5 numbers in the Fibonacci sequence? 0, 1, 1, 2, 3 | |
| **Example 5:** What is the opposite of bad, worse, worst? good, better, best | |
| **Final answer:** | |
| """ | |
| client = genai.Client(api_key=os.environ["GEMINI_API_KEY"]) | |
| response = client.models.generate_content( | |
| model=model, | |
| contents=[prompt_template] | |
| ) | |
| return response.text |