# Source: Psiska — "Testing" (commit a410403)
# References:
# https://docs.crewai.com/introduction
# https://ai.google.dev/gemini-api/docs
import os
from crewai import Agent, Crew, Task
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.project import CrewBase, agent, crew, task
from google import genai
from openinference.instrumentation.crewai import CrewAIInstrumentor
from phoenix.otel import register
from tools.ai_tools import AITools
from tools.arithmetic_tools import ArithmeticTools
from typing import List
from utils import read_file_json, is_ext
## LLMs
# Model driving the manager agent (orchestration / delegation).
MANAGER_MODEL = "gpt-4.5-preview"
# Model driving every worker agent.
AGENT_MODEL = "gpt-4.1-mini"
# Gemini model used to normalize the crew's output into the final short answer.
FINAL_ANSWER_MODEL = "gemini-2.5-pro-preview-03-25"
# LLM evaluation
# Phoenix (Arize) tracing setup. NOTE: raises KeyError at import time if
# PHOENIX_API_KEY is not set in the environment — intentional fail-fast.
PHOENIX_API_KEY = os.environ["PHOENIX_API_KEY"]
os.environ["PHOENIX_CLIENT_HEADERS"] = f"api_key={PHOENIX_API_KEY}"
os.environ["PHOENIX_COLLECTOR_ENDPOINT"] = "https://app.phoenix.arize.com"
# Register an OpenTelemetry tracer provider for the "gaia" Phoenix project.
tracer_provider = register(
    auto_instrument=True,
    project_name="gaia"
)
## Tools
# Tool bundles handed to agents by get_tools_for() based on the agent's name.
# Document handling: analysis, summarization, translation.
DOCUMENT_TOOLS = [
    AITools.document_analysis_tool,
    AITools.summarize_tool,
    AITools.translate_tool
]
# Media handling: images, audio, video, and YouTube links.
MEDIA_TOOLS = [
    AITools.image_analysis_tool,
    AITools.audio_analysis_tool,
    AITools.video_analysis_tool,
    AITools.youtube_analysis_tool
]
# Web access: search and page browsing.
WEB_TOOLS = [
    AITools.web_search_tool,
    AITools.web_browser_tool
]
# Basic arithmetic operations.
ARITHMETIC_TOOLS = [
    ArithmeticTools.add,
    ArithmeticTools.subtract,
    ArithmeticTools.multiply,
    ArithmeticTools.divide,
    ArithmeticTools.modulus
]
# Code generation and execution.
CODE_TOOLS = [
    AITools.code_generation_tool,
    AITools.code_execution_tool
]
# Route an agent name to its tool bundle.
def get_tools_for(agent_name: str):
    """Return the tool list appropriate for an agent, keyed off its name.

    Matching is by substring, so e.g. "document_agent" gets DOCUMENT_TOOLS.
    Manager agents deliberately receive no tools (they delegate), and
    unrecognized names also get an empty list.

    Args:
        agent_name: The agent's YAML config key (e.g. "web_search_agent").

    Returns:
        One of the module-level tool lists, or [] for manager/unknown agents.
    """
    if any(k in agent_name for k in ("document", "translation", "summarization")):
        return DOCUMENT_TOOLS
    if any(k in agent_name for k in ("image", "audio", "video", "youtube")):
        return MEDIA_TOOLS
    if "web_search" in agent_name or "web_browser" in agent_name:
        return WEB_TOOLS
    if "code_generation" in agent_name or "code_execution" in agent_name:
        return CODE_TOOLS
    if "arithmetic" in agent_name:
        return ARITHMETIC_TOOLS
    # The original had a separate "manager" branch returning [] identical to
    # the else branch — collapsed into one fall-through.
    return []
#CrewAIInstrumentor().instrument(tracer_provider=tracer_provider)
#@CrewBase
class GAIACrew():
    """Builds agents and tasks from YAML configs and assembles the Crew.

    Agent definitions come from config/agents.yaml and task definitions from
    config/tasks.yaml (schemas assumed to match CrewAI's Agent/Task `config`
    contract — TODO confirm against those files).
    """
    tasks: List[Task]

    def __init__(self):
        self.agents_config = self._load_yaml("config/agents.yaml")
        self.tasks_config = self._load_yaml("config/tasks.yaml")
        # Lazily-built cache so that manager_task() and get_crew() see the
        # SAME Agent instances. The original property rebuilt (and re-printed)
        # every agent on each access, so the manager task's agent was a
        # different object from the agents handed to the Crew.
        self._agents = None

    def _load_yaml(self, path):
        """Load and parse a YAML file; returns whatever safe_load yields."""
        import yaml
        with open(path, "r") as f:
            return yaml.safe_load(f)

    @property
    def agents(self) -> List[Agent]:
        """All configured agents, built once and cached."""
        if self._agents is None:
            self._agents = self._build_agents()
        return self._agents

    def _build_agents(self) -> List[Agent]:
        """Construct one Agent per non-None entry in agents_config."""
        agents = []
        for name, config in self.agents_config.items():
            if config is None:
                print(f"❌ Agent config for '{name}' is None!")
                continue
            full_config = {**config, "name": name}
            print(f"✅ Creating agent: {name}")
            is_manager = "manager" in name
            agents.append(Agent(
                config=full_config,
                # Only the manager may delegate; it also gets the stronger
                # model and a larger iteration budget.
                allow_delegation=is_manager,
                llm=MANAGER_MODEL if is_manager else AGENT_MODEL,
                max_iter=5 if is_manager else 2,
                tools=get_tools_for(name),
                verbose=True
            ))
        return agents

    @task
    def manager_task(self) -> Task:
        """Build the manager task from YAML and attach the manager agent."""
        task = Task(config=self.tasks_config["manager_task"])
        agent_list = self.agents
        # Index against the names of agents that were actually built: entries
        # with a None config are skipped during construction, so indexing by
        # position in the raw config keys (as the original did) could point
        # at the wrong agent or run past the end of the list.
        built_names = [n for n, cfg in self.agents_config.items()
                       if cfg is not None]
        for idx, name in enumerate(built_names):
            if name == "manager_agent":
                task.agent = agent_list[idx]
                break
        return task

    def get_crew(self) -> Crew:
        """Assemble a Crew of all agents driven by the manager task."""
        return Crew(
            agents=self.agents,
            tasks=[self.manager_task()],
            verbose=True
        )
def run_crew(question, file_path):
    """Orchestrate the GAIA crew to answer a question, optionally with a file.

    Args:
        question (str): The user's question.
        file_path (str): Optional path to a data file. Tabular/JSON formats
            are read and inlined into the prompt as JSON; any other file is
            referenced by its path only.

    Returns:
        str: The final answer after normalization by the final-answer model.
    """
    # Build the final prompt, inlining the file as JSON when read_file_json
    # supports its extension.
    final_question = question
    if file_path:
        json_exts = (".csv", ".xls", ".xlsx", ".json", ".jsonl")
        if any(is_ext(file_path, ext) for ext in json_exts):
            json_data = read_file_json(file_path)
            final_question = f"{question} JSON data:\n{json_data}."
        else:
            final_question = f"{question} File path: {file_path}."
    # Instantiate the crew and kick off the workflow.
    crew_instance = GAIACrew()
    crew = crew_instance.get_crew()
    answer = crew.kickoff(inputs={"question": final_question})
    # Post-process through the final-answer model (uses the ORIGINAL question,
    # not the file-augmented one — the file data already shaped `answer`).
    final_answer = get_final_answer(FINAL_ANSWER_MODEL, question, str(answer))
    # Debug logging
    print(f"=> Initial question: {question}")
    print(f"=> Final question: {final_question}")
    print(f"=> Initial answer: {answer}")
    print(f"=> Final answer: {final_answer}")
    return final_answer
import concurrent.futures
def run_parallel_crew(question: str, file_path: str):
    """Fan the question out to every worker agent, then let the manager merge.

    1) Prepares the prompt (including file data if any).
    2) Runs every non-manager agent in parallel on that prompt.
    3) Gathers their raw outputs (a failed worker becomes "<error: ...>").
    4) Sends a combined prompt to manager_agent for the final answer.

    Raises:
        ValueError: If the agents config defines no "manager_agent".
    """
    # 1) Build the final prompt
    final_question = question
    if file_path:
        json_exts = (".csv", ".xls", ".xlsx", ".json", ".jsonl")
        if any(is_ext(file_path, ext) for ext in json_exts):
            json_data = read_file_json(file_path)
            final_question = f"{question} JSON data:\n{json_data}."
        else:
            final_question = f"{question} File path: {file_path}."
    # 2) Instantiate the crew and split manager vs workers. Pair names with
    # agents using only non-None config entries: agents with a None config are
    # never built, so zipping against the raw config keys (as the original
    # did) could pair names with the wrong Agent objects.
    crew_instance = GAIACrew()
    built_names = [n for n, cfg in crew_instance.agents_config.items()
                   if cfg is not None]
    pairs = list(zip(built_names, crew_instance.agents))
    workers = [(n, a) for n, a in pairs if n != "manager_agent"]
    try:
        manager = next(a for n, a in pairs if n == "manager_agent")
    except StopIteration:
        # Surface a clear error instead of a bare StopIteration.
        raise ValueError("agents config defines no 'manager_agent'") from None
    # 3) Run workers in parallel, giving each the plain-string prompt.
    results = {}
    if workers:  # ThreadPoolExecutor rejects max_workers=0
        with concurrent.futures.ThreadPoolExecutor(max_workers=len(workers)) as pool:
            future_to_name = {
                pool.submit(agent.kickoff, final_question): name
                for name, agent in workers
            }
            for fut in concurrent.futures.as_completed(future_to_name):
                name = future_to_name[fut]
                try:
                    results[name] = fut.result()
                except Exception as e:
                    # One failed worker should not sink the whole run.
                    results[name] = f"<error: {e}>"
    # 4) Compose a manager prompt with all the raw outputs
    combined = "\n\n".join(f"--- {n} output ---\n{out}"
                           for n, out in results.items())
    manager_prompt = (
        f"You have received these reports from your coworkers:\n\n"
        f"{combined}\n\n"
        f"Now, based on the original question, provide the final answer.\n"
        f"Original question: {question}"
    )
    # 5) Run the manager agent on the combined prompt
    final = manager.kickoff(manager_prompt)
    # 6) Post-process via the final-answer model
    return get_final_answer(FINAL_ANSWER_MODEL, question, str(final))
def get_final_answer(model, question, answer):
    """Normalize *answer* into the strict GAIA short-answer format via Gemini.

    Sends the question plus the crew's initial answer to a Gemini model along
    with formatting rules (bare numbers, no articles, comma-separated lists).

    Args:
        model (str): Gemini model identifier (e.g. FINAL_ANSWER_MODEL).
        question (str): The original user question.
        answer (str): The initial answer produced by the crew.

    Returns:
        str: The model's normalized final answer text.

    Raises:
        KeyError: If GEMINI_API_KEY is not set in the environment.
    """
    # Single f-string instead of the original's fragile triple-quote
    # concatenation; the rendered prompt text is the same.
    prompt = f"""
You are an expert question answering assistant. Given a question and an initial answer, your task is to provide the final answer.
Your final answer must be a number and/or string OR as few words as possible OR a comma-separated list of numbers and/or strings.
If you are asked for a number, don't use comma to write your number neither use units such as USD, $, percent, or % unless specified otherwise.
If you are asked for a string, don't use articles, neither abbreviations (for example cities), and write the digits in plain text unless specified otherwise.
If you are asked for a comma-separated list, apply the above rules depending of whether the element to be put in the list is a number or a string.
If the final answer is a number, use a number not a word.
If the final answer is a string, start with an uppercase character.
If the final answer is a comma-separated list of numbers, use a space character after each comma.
If the final answer is a comma-separated list of strings, use a space character after each comma and start with a lowercase character.
Do not add any content to the final answer that is not in the initial answer.
**Question:** {question}
**Initial answer:** {answer}
**Example 1:** What is the biggest city in California? Los Angeles
**Example 2:** How many 'r's are in strawberry? 3
**Example 3:** What is the opposite of black? White
**Example 4:** What are the first 5 numbers in the Fibonacci sequence? 0, 1, 1, 2, 3
**Example 5:** What is the opposite of bad, worse, worst? good, better, best
**Final answer:**
"""
    client = genai.Client(api_key=os.environ["GEMINI_API_KEY"])
    response = client.models.generate_content(
        model=model,
        contents=[prompt]
    )
    return response.text