Spaces:
Paused
Paused
| # References: | |
| # https://docs.crewai.com/introduction | |
| # https://ai.google.dev/gemini-api/docs | |
| import os | |
| from crewai import Agent, Crew, Task | |
| from crewai.agents.agent_builder.base_agent import BaseAgent | |
| from crewai.project import CrewBase, agent, crew, task | |
| from google import genai | |
| from openinference.instrumentation.crewai import CrewAIInstrumentor | |
| from phoenix.otel import register | |
| from tools.ai_tools import AITools | |
| from tools.arithmetic_tools import ArithmeticTools | |
| from typing import List | |
| from utils import read_file_json, is_ext | |
## LLMs
# Model split: a stronger model plans/delegates (manager), a cheaper one
# runs the worker agents, and a Gemini model formats the final answer.
MANAGER_MODEL = "gpt-4.5-preview"
AGENT_MODEL = "gpt-4.1-mini"
FINAL_ANSWER_MODEL = "gemini-2.5-pro-preview-03-25"
# LLM evaluation
# Arize Phoenix tracing setup. Reading os.environ with [] (not .get) is a
# deliberate fail-fast: import aborts with KeyError if the key is missing.
PHOENIX_API_KEY = os.environ["PHOENIX_API_KEY"]
os.environ["PHOENIX_CLIENT_HEADERS"] = f"api_key={PHOENIX_API_KEY}"
os.environ["PHOENIX_COLLECTOR_ENDPOINT"] = "https://app.phoenix.arize.com"
# Registers an OpenTelemetry tracer provider for the "gaia" project.
tracer_provider = register(
    auto_instrument=True,
    project_name="gaia"
)
## Tools
# Tool groups handed to agents by get_tools_for(), keyed off the agent name.
# Text-document work: analysis, summarization, translation.
DOCUMENT_TOOLS = [
    AITools.document_analysis_tool,
    AITools.summarize_tool,
    AITools.translate_tool
]
# Image / audio / video / YouTube analysis.
MEDIA_TOOLS = [
    AITools.image_analysis_tool,
    AITools.audio_analysis_tool,
    AITools.video_analysis_tool,
    AITools.youtube_analysis_tool
]
# Web search and page browsing.
WEB_TOOLS = [
    AITools.web_search_tool,
    AITools.web_browser_tool
]
# Basic integer/float arithmetic operations.
ARITHMETIC_TOOLS = [
    ArithmeticTools.add,
    ArithmeticTools.subtract,
    ArithmeticTools.multiply,
    ArithmeticTools.divide,
    ArithmeticTools.modulus
]
# Code generation and sandboxed execution.
CODE_TOOLS = [
    AITools.code_generation_tool,
    AITools.code_execution_tool
]
# Get specific tools
def get_tools_for(agent_name: str) -> list:
    """Return the tool group matching an agent's name.

    Matching is substring-based and checked in priority order. Names that
    match nothing — including the manager, which delegates rather than
    using tools — get an empty list. (The original code had a separate
    `"manager"` branch that duplicated the `else` fallback; it is folded
    into the final return.)

    Args:
        agent_name: The agent's YAML key, e.g. "web_search_agent".

    Returns:
        A list of tool callables (possibly empty).
    """
    if any(k in agent_name for k in ("document", "translation", "summarization")):
        return DOCUMENT_TOOLS
    if any(k in agent_name for k in ("image", "audio", "video", "youtube")):
        return MEDIA_TOOLS
    if any(k in agent_name for k in ("web_search", "web_browser")):
        return WEB_TOOLS
    if any(k in agent_name for k in ("code_generation", "code_execution")):
        return CODE_TOOLS
    if "arithmetic" in agent_name:
        return ARITHMETIC_TOOLS
    # Manager and unrecognized agents work without tools.
    return []
| #CrewAIInstrumentor().instrument(tracer_provider=tracer_provider) | |
| #@CrewBase | |
class GAIACrew():
    """Builds the GAIA agents and manager task from YAML configuration.

    NOTE(review): the @CrewBase decorator above is commented out, so the
    framework no longer rewires attribute access. `agents` was a plain
    method, yet every consumer in this file reads it as an attribute
    (`self.agents` is indexed in manager_task, passed to Crew in get_crew,
    and iterated in run_parallel_crew) — which yielded the bound method,
    not the agent list. It is now a cached property so attribute access
    returns the built list.
    """
    tasks: List[Task]

    def __init__(self):
        # Raw YAML dicts keyed by agent/task name.
        self.agents_config = self._load_yaml("config/agents.yaml")
        self.tasks_config = self._load_yaml("config/tasks.yaml")
        self._agents = None  # lazy cache backing the `agents` property

    def _load_yaml(self, path):
        """Load and parse a YAML file with safe_load."""
        import yaml
        with open(path, "r") as f:
            return yaml.safe_load(f)

    @property
    def agents(self) -> List[Agent]:
        """All configured agents, built once on first access and cached."""
        if self._agents is None:
            self._agents = self._build_agents()
        return self._agents

    def _build_agents(self) -> List[Agent]:
        """Instantiate one Agent per non-empty entry in agents.yaml."""
        agents = []
        for name, config in self.agents_config.items():
            if config is None:
                print(f"❌ Agent config for '{name}' is None!")
                continue
            full_config = {**config, "name": name}
            print(f"✅ Creating agent: {name}")
            # The manager gets the stronger model, delegation rights,
            # and a larger iteration budget.
            is_manager = "manager" in name
            agents.append(Agent(
                config=full_config,
                allow_delegation=is_manager,
                llm=MANAGER_MODEL if is_manager else AGENT_MODEL,
                max_iter=5 if is_manager else 2,
                tools=get_tools_for(name),
                verbose=True
            ))
        return agents

    def manager_task(self) -> Task:
        """Build the manager Task from YAML and attach the manager agent."""
        task = Task(config=self.tasks_config["manager_task"])
        # Agents are built in agents_config key order, so zipping pairs
        # each YAML name with its Agent instance.
        for name, agent_obj in zip(self.agents_config, self.agents):
            if name == "manager_agent":
                task.agent = agent_obj
                break
        return task

    def get_crew(self) -> Crew:
        """Assemble a Crew containing all agents and the manager task."""
        return Crew(
            agents=self.agents,
            tasks=[self.manager_task()],
            verbose=True
        )
import concurrent.futures


def run_parallel_crew(question: str, file_path: str):
    """Fan the question out to all worker agents, then synthesize an answer.

    1) Builds the prompt, inlining tabular/JSON file contents when present.
    2) Runs every non-manager agent in parallel on that prompt.
    3) Gathers their raw outputs (per-worker errors are captured, not raised).
    4) Feeds a combined report to the manager agent.
    5) Normalizes the manager's answer via the final-answer model.

    Args:
        question: The original user question.
        file_path: Optional path to an attached file ("" / None for none).

    Returns:
        The formatted final answer string.
    """
    # 1) Build the final prompt
    final_question = question
    if file_path:
        # Tabular/JSON files are read and embedded; anything else is
        # referenced by path for the file-aware tools.
        if any(is_ext(file_path, ext)
               for ext in (".csv", ".xls", ".xlsx", ".json", ".jsonl")):
            json_data = read_file_json(file_path)
            final_question = f"{question} JSON data:\n{json_data}."
        else:
            final_question = f"{question} File path: {file_path}."
    # 2) Instantiate the crew and split manager vs workers
    crew_instance = GAIACrew()
    all_agents = crew_instance.agents
    workers = [a for a in all_agents if a.config.get("name") != "manager_agent"]
    manager = next(a for a in all_agents if a.config.get("name") == "manager_agent")
    # 3) Run workers in parallel
    inputs = {"question": final_question}
    results = {}
    if workers:  # ThreadPoolExecutor raises ValueError on max_workers=0
        with concurrent.futures.ThreadPoolExecutor(max_workers=len(workers)) as pool:
            futures = {
                pool.submit(ag.kickoff, inputs): ag.config["name"]
                for ag in workers
            }
            for fut in concurrent.futures.as_completed(futures):
                name = futures[fut]
                try:
                    results[name] = fut.result()
                except Exception as e:
                    # One failing worker should not sink the whole run;
                    # surface the error text to the manager instead.
                    results[name] = f"<error: {e}>"
    # 4) Compose a manager prompt that includes all worker outputs
    combined = "\n\n".join(f"--- {name} output ---\n{out}"
                           for name, out in results.items())
    manager_prompt = (
        f"You have received these reports from your coworkers:\n\n"
        f"{combined}\n\n"
        f"Now, based on the original question, provide the final answer.\n"
        f"Original question: {question}"
    )
    # 5) Run the manager agent for the final answer
    final = manager.kickoff(inputs={"question": manager_prompt})
    return get_final_answer(FINAL_ANSWER_MODEL, question, str(final))
def get_final_answer(model, question, answer):
    """Normalize a raw answer into GAIA's strict final-answer format.

    Sends the question and the crew's initial answer to a Gemini model,
    which must reply with a bare number, a short string, or a
    comma-separated list per the formatting rules in the prompt.

    Args:
        model: Gemini model identifier.
        question: The original user question.
        answer: The initial answer produced by the crew.

    Returns:
        The model's response text.
    """
    prompt = f"""
You are an expert question answering assistant. Given a question and an initial answer, your task is to provide the final answer.
Your final answer must be a number and/or string OR as few words as possible OR a comma-separated list of numbers and/or strings.
If you are asked for a number, don't use comma to write your number neither use units such as USD, $, percent, or % unless specified otherwise.
If you are asked for a string, don't use articles, neither abbreviations (for example cities), and write the digits in plain text unless specified otherwise.
If you are asked for a comma-separated list, apply the above rules depending of whether the element to be put in the list is a number or a string.
If the final answer is a number, use a number not a word.
If the final answer is a string, start with an uppercase character.
If the final answer is a comma-separated list of numbers, use a space character after each comma.
If the final answer is a comma-separated list of strings, use a space character after each comma and start with a lowercase character.
Do not add any content to the final answer that is not in the initial answer.
**Question:** {question}
**Initial answer:** {answer}
**Example 1:** What is the biggest city in California? Los Angeles
**Example 2:** How many 'r's are in strawberry? 3
**Example 3:** What is the opposite of black? White
**Example 4:** What are the first 5 numbers in the Fibonacci sequence? 0, 1, 1, 2, 3
**Example 5:** What is the opposite of bad, worse, worst? good, better, best
**Final answer:**
"""
    # Fail-fast if the Gemini key is absent, matching the module's style.
    client = genai.Client(api_key=os.environ["GEMINI_API_KEY"])
    response = client.models.generate_content(
        model=model,
        contents=[prompt]
    )
    return response.text