# File: agentbee / src /agent /llm_client.py
# Uploaded by: mangubee
# Last commit: "fix: correct author name formatting in multiple files"
# Commit hash: e7b4937
"""
LLM Client Module - Multi-Provider LLM Integration
Author: @mangubee
Date: 2026-01-02
Handles all LLM calls for:
- Planning (question analysis and execution plan generation)
- Tool selection (function calling)
- Answer synthesis (factoid answer generation from evidence)
- Conflict resolution (evaluating contradictory information)
Based on Level 5 decision: Gemini 2.0 Flash (primary/free) + Claude Sonnet 4.5 (fallback/paid)
Based on Level 6 decision: LLM function calling for tool selection
Pattern: Matches Stage 2 tools (Gemini primary, Claude fallback)
"""
import os
import logging
import time
import datetime
from pathlib import Path
from typing import List, Dict, Optional, Any, Callable
from anthropic import Anthropic
import google.generativeai as genai
from huggingface_hub import InferenceClient
from groq import Groq
# ============================================================================
# CONFIG
# ============================================================================
# Claude Configuration: model ID sent with every Anthropic messages.create call.
CLAUDE_MODEL = "claude-sonnet-4-5-20250929"
# Gemini Configuration: model name handed to genai.GenerativeModel.
GEMINI_MODEL = "gemini-2.0-flash-exp"
# HuggingFace Configuration: model handed to InferenceClient.
# NOTE(review): the ":scaleway" suffix presumably selects the inference
# provider - confirm against the huggingface_hub documentation.
HF_MODEL = "openai/gpt-oss-120b:scaleway" # OpenAI's 120B open source model, strong reasoning
# Previous: "meta-llama/Llama-3.3-70B-Instruct:scaleway" (failed synthesis)
# Previous: "Qwen/Qwen2.5-72B-Instruct" (weaker at handling transcription errors)
# Groq Configuration: model ID sent with every Groq chat.completions.create call.
GROQ_MODEL = "openai/gpt-oss-120b"
# Alternatives: "llama-3.1-8b-instant", "mixtral-8x7b-32768"
# Shared Configuration (used by the planning and tool-selection calls of all
# providers; the Claude and Gemini synthesis calls override the token limit
# with 256).
TEMPERATURE = 0 # Deterministic for factoid answers
MAX_TOKENS = 4096
# LLM Provider Selection (removed - now read at runtime for UI flexibility):
# the provider is taken from the LLM_PROVIDER environment variable on each
# call in _call_with_fallback.
# ============================================================================
# Logging Setup
# ============================================================================
logger = logging.getLogger(__name__)
# ============================================================================
# Session Log File Management (Single file per evaluation run)
# ============================================================================
# Cached path of the current session log; set lazily by get_session_log_file()
# and cleared by reset_session_log().
_SESSION_LOG_FILE = None
# Flag cleared by reset_session_log() and declared global in
# synthesize_answer_hf. NOTE(review): presumably tracks whether the system
# prompt has already been written to the session log - confirm in that function.
_SYSTEM_PROMPT_WRITTEN = False
def get_session_log_file() -> Path:
    """
    Return the session-wide LLM synthesis log path, creating the file on first use.

    A single Markdown file is shared by every question of a session so the
    ``_log`` directory is not filled with per-question files. The path is
    cached in the module-level ``_SESSION_LOG_FILE``; later calls return the
    cached path without touching the filesystem.

    Returns:
        Path: Session log file path
    """
    global _SESSION_LOG_FILE

    # Already created earlier in this session: nothing to do.
    if _SESSION_LOG_FILE is not None:
        return _SESSION_LOG_FILE

    session_dir = Path("_log")
    session_dir.mkdir(exist_ok=True)

    # Timestamped filename; .md because the log is written as Markdown.
    stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    _SESSION_LOG_FILE = session_dir / f"llm_session_{stamp}.md"

    # Mode "w" truncates, so the header is always the first content in the file.
    with open(_SESSION_LOG_FILE, "w", encoding="utf-8") as handle:
        handle.write("# LLM Synthesis Session Log\n\n")
        handle.write(f"**Session Start:** {datetime.datetime.now().isoformat()}\n\n")

    return _SESSION_LOG_FILE
def reset_session_log():
    """Forget the cached session log so the next run starts a fresh file.

    Clears both module-level session markers (log path and system-prompt flag).
    Intended for tests or for starting a new evaluation run.
    """
    global _SESSION_LOG_FILE, _SYSTEM_PROMPT_WRITTEN
    _SESSION_LOG_FILE, _SYSTEM_PROMPT_WRITTEN = None, False
# ============================================================================
# Retry Logic with Exponential Backoff
# ============================================================================
def retry_with_backoff(func: Callable, max_retries: int = 3) -> Any:
    """
    Call ``func`` and retry with exponential backoff on quota/rate-limit errors.

    An exception counts as a quota error when its message contains "429",
    "quota", "rate limit" or "too many requests" (the last three matched
    case-insensitively). Any other exception is re-raised immediately.
    The delay between attempts is 1s, 2s, 4s, ... (``2**attempt``); no
    ``retry_after`` hint from the provider is consulted.

    Args:
        func: Function to retry (should be a lambda or callable with no args)
        max_retries: Maximum number of attempts (default: 3). Values below 1
            are treated as 1 so that ``func`` is always called at least once.

    Returns:
        Result of successful function call

    Raises:
        Exception: If all retries exhausted or non-quota error encountered
    """
    # Guard against max_retries <= 0, which previously skipped the loop and
    # silently returned None without ever calling func.
    attempts = max(1, max_retries)
    quota_markers = ("quota", "rate limit", "too many requests")

    for attempt in range(attempts):
        try:
            return func()
        except Exception as e:
            message = str(e)
            lowered = message.lower()
            is_quota_error = "429" in message or any(
                marker in lowered for marker in quota_markers
            )

            # Not a quota error, or no attempts left: propagate unchanged.
            if not is_quota_error or attempt >= attempts - 1:
                raise

            # Exponential backoff: 1s, 2s, 4s
            wait_time = 2**attempt
            logger.warning(
                f"Quota/rate limit error (attempt {attempt + 1}/{attempts}): {e}. "
                f"Retrying in {wait_time}s..."
            )
            time.sleep(wait_time)
# ============================================================================
# LLM Provider Routing
# ============================================================================
def _get_provider_function(function_name: str, provider: str) -> Callable:
    """
    Look up the provider-specific implementation of an LLM operation.

    Args:
        function_name: Base function name ("plan_question", "select_tools", "synthesize_answer")
        provider: Provider name ("gemini", "huggingface", "groq", "claude")

    Returns:
        Callable: Provider-specific function

    Raises:
        ValueError: If the function name or the provider is not recognised
    """
    # Column order of every row in the dispatch table below.
    provider_names = ("gemini", "huggingface", "groq", "claude")
    dispatch = {
        "plan_question": dict(
            zip(
                provider_names,
                (
                    plan_question_gemini,
                    plan_question_hf,
                    plan_question_groq,
                    plan_question_claude,
                ),
            )
        ),
        "select_tools": dict(
            zip(
                provider_names,
                (
                    select_tools_gemini,
                    select_tools_hf,
                    select_tools_groq,
                    select_tools_claude,
                ),
            )
        ),
        "synthesize_answer": dict(
            zip(
                provider_names,
                (
                    synthesize_answer_gemini,
                    synthesize_answer_hf,
                    synthesize_answer_groq,
                    synthesize_answer_claude,
                ),
            )
        ),
    }

    implementations = dispatch.get(function_name)
    if implementations is None:
        raise ValueError(f"Unknown function name: {function_name}")

    chosen = implementations.get(provider)
    if chosen is None:
        raise ValueError(
            f"Unknown provider: {provider}. Valid options: gemini, huggingface, groq, claude"
        )
    return chosen
def _call_with_fallback(function_name: str, *args, **kwargs) -> Any:
    """
    Call an LLM operation on the configured provider.

    NOTE: Despite the name, there is no fallback. The fallback mechanism has
    been archived to reduce complexity; only the provider named by the
    ``LLM_PROVIDER`` environment variable (default "gemini") is used.

    Args:
        function_name: Base function name ("plan_question", "select_tools", "synthesize_answer")
        *args, **kwargs: Arguments to pass to the provider-specific function

    Returns:
        Result from LLM call

    Raises:
        Exception: If the provider is unknown or the provider call fails. The
            original error is attached as ``__cause__``.
    """
    # Read config at runtime for UI flexibility
    primary_provider = os.getenv("LLM_PROVIDER", "gemini").lower()

    # ============================================================================
    # ARCHIVED: Fallback mechanism removed to reduce complexity
    # Original fallback code was at: dev/dev_260112_02_fallback_archived.md
    # To restore: Check git history or archived dev file
    # ============================================================================

    # Try primary provider only (no fallback)
    try:
        primary_func = _get_provider_function(function_name, primary_provider)
        logger.info(f"[{function_name}] Using provider: {primary_provider}")
        # The lambda gives retry_with_backoff the zero-argument callable it expects.
        return retry_with_backoff(lambda: primary_func(*args, **kwargs))
    except Exception as primary_error:
        logger.error(f"[{function_name}] Provider {primary_provider} failed: {primary_error}")
        # Chain explicitly so the provider's traceback stays attached as the cause.
        raise Exception(
            f"{function_name} failed with {primary_provider}: {primary_error}"
        ) from primary_error
# ============================================================================
# Client Initialization
# ============================================================================
def create_claude_client() -> Anthropic:
    """Build an Anthropic client from the ``ANTHROPIC_API_KEY`` environment variable.

    Raises:
        ValueError: If the variable is unset or empty.
    """
    anthropic_key = os.environ.get("ANTHROPIC_API_KEY")
    if anthropic_key:
        logger.info(f"Initializing Anthropic client with model: {CLAUDE_MODEL}")
        return Anthropic(api_key=anthropic_key)
    raise ValueError("ANTHROPIC_API_KEY environment variable not set")
def create_gemini_client():
    """Configure the Gemini SDK from ``GOOGLE_API_KEY`` and return a model handle.

    Returns:
        A ``genai.GenerativeModel`` bound to ``GEMINI_MODEL``.

    Raises:
        ValueError: If the variable is unset or empty.
    """
    google_key = os.environ.get("GOOGLE_API_KEY")
    if google_key:
        # configure() sets the key on the SDK as a whole, not on the returned model.
        genai.configure(api_key=google_key)
        logger.info(f"Initializing Gemini client with model: {GEMINI_MODEL}")
        return genai.GenerativeModel(GEMINI_MODEL)
    raise ValueError("GOOGLE_API_KEY environment variable not set")
def create_hf_client() -> InferenceClient:
    """Build a HuggingFace ``InferenceClient`` for ``HF_MODEL`` using ``HF_TOKEN``.

    Raises:
        ValueError: If the ``HF_TOKEN`` environment variable is unset or empty.
    """
    token = os.environ.get("HF_TOKEN")
    if token:
        logger.info(f"Initializing HuggingFace Inference client with model: {HF_MODEL}")
        return InferenceClient(model=HF_MODEL, token=token)
    raise ValueError("HF_TOKEN environment variable not set")
def create_groq_client() -> Groq:
    """Build a Groq client from the ``GROQ_API_KEY`` environment variable.

    Raises:
        ValueError: If the variable is unset or empty.
    """
    groq_key = os.environ.get("GROQ_API_KEY")
    if groq_key:
        logger.info(f"Initializing Groq client with model: {GROQ_MODEL}")
        return Groq(api_key=groq_key)
    raise ValueError("GROQ_API_KEY environment variable not set")
# ============================================================================
# Planning Functions - Claude Implementation
# ============================================================================
def plan_question_claude(
    question: str,
    available_tools: Dict[str, Dict],
    file_paths: Optional[List[str]] = None,
) -> str:
    """Produce a numbered execution plan for ``question`` using Claude.

    Args:
        question: Question text to plan for
        available_tools: Tool registry; each entry must provide "description"
            and "category"
        file_paths: Optional file paths listed in the prompt as available files

    Returns:
        Text of the first content block of Claude's response
    """
    client = create_claude_client()

    # One bullet per registered tool.
    tools_text = "\n".join(
        f"- {tool_name}: {meta['description']} (Category: {meta['category']})"
        for tool_name, meta in available_tools.items()
    )

    # The file listing is appended directly after the question when files exist.
    file_context = (
        "\n\nAvailable files:\n" + "\n".join(f"- {fp}" for fp in file_paths)
        if file_paths
        else ""
    )

    system_prompt = """You are a planning agent for answering complex questions.
Your task is to analyze the question and create a step-by-step execution plan.
Consider:
1. What information is needed to answer the question?
2. Which tools can provide that information?
3. In what order should tools be executed?
4. What parameters need to be extracted from the question?
Generate a concise plan with numbered steps."""

    user_prompt = f"""Question: {question}{file_context}
Available tools:
{tools_text}
Create an execution plan to answer this question. Format as numbered steps."""

    logger.info("[plan_question_claude] Calling Claude for planning")
    request = {
        "model": CLAUDE_MODEL,
        "max_tokens": MAX_TOKENS,
        "temperature": TEMPERATURE,
        "system": system_prompt,
        "messages": [{"role": "user", "content": user_prompt}],
    }
    response = client.messages.create(**request)

    plan = response.content[0].text
    logger.info(f"[plan_question_claude] Generated plan ({len(plan)} chars)")
    return plan
# ============================================================================
# Planning Functions - Gemini Implementation
# ============================================================================
def plan_question_gemini(
    question: str,
    available_tools: Dict[str, Dict],
    file_paths: Optional[List[str]] = None,
) -> str:
    """Produce a numbered execution plan for ``question`` using Gemini.

    Args:
        question: Question text to plan for
        available_tools: Tool registry; each entry must provide "description"
            and "category"
        file_paths: Optional file paths listed in the prompt as available files

    Returns:
        The ``text`` of Gemini's response
    """
    model = create_gemini_client()

    # One bullet per registered tool.
    tools_text = "\n".join(
        f"- {tool_name}: {meta['description']} (Category: {meta['category']})"
        for tool_name, meta in available_tools.items()
    )

    file_context = (
        "\n\nAvailable files:\n" + "\n".join(f"- {fp}" for fp in file_paths)
        if file_paths
        else ""
    )

    # Instructions and question are sent as one prompt here (no separate
    # system message as in the Claude variant).
    prompt = f"""You are a planning agent for answering complex questions.
Your task is to analyze the question and create a step-by-step execution plan.
Consider:
1. What information is needed to answer the question?
2. Which tools can provide that information?
3. In what order should tools be executed?
4. What parameters need to be extracted from the question?
Generate a concise plan with numbered steps.
Question: {question}{file_context}
Available tools:
{tools_text}
Create an execution plan to answer this question. Format as numbered steps."""

    logger.info("[plan_question_gemini] Calling Gemini for planning")
    generation_config = genai.types.GenerationConfig(
        temperature=TEMPERATURE, max_output_tokens=MAX_TOKENS
    )
    response = model.generate_content(prompt, generation_config=generation_config)

    plan = response.text
    logger.info(f"[plan_question_gemini] Generated plan ({len(plan)} chars)")
    return plan
# ============================================================================
# Planning Functions - HuggingFace Implementation
# ============================================================================
def plan_question_hf(
    question: str,
    available_tools: Dict[str, Dict],
    file_paths: Optional[List[str]] = None,
) -> str:
    """Produce a numbered execution plan for ``question`` via the HuggingFace Inference API.

    Args:
        question: Question text to plan for
        available_tools: Tool registry; each entry must provide "description"
            and "category"
        file_paths: Optional file paths listed in the prompt as available files

    Returns:
        Message content of the first completion choice
    """
    client = create_hf_client()

    # One bullet per registered tool.
    tools_text = "\n".join(
        f"- {tool_name}: {meta['description']} (Category: {meta['category']})"
        for tool_name, meta in available_tools.items()
    )

    file_context = (
        "\n\nAvailable files:\n" + "\n".join(f"- {fp}" for fp in file_paths)
        if file_paths
        else ""
    )

    # Sent as a system message followed by a user message.
    system_prompt = """You are a planning agent for answering complex questions.
Your task is to analyze the question and create a step-by-step execution plan.
Consider:
1. What information is needed to answer the question?
2. Which tools can provide that information?
3. In what order should tools be executed?
4. What parameters need to be extracted from the question?
Generate a concise plan with numbered steps."""

    user_prompt = f"""Question: {question}{file_context}
Available tools:
{tools_text}
Create an execution plan to answer this question. Format as numbered steps."""

    logger.info(f"[plan_question_hf] Calling HuggingFace ({HF_MODEL}) for planning")

    conversation = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt},
    ]
    response = client.chat_completion(
        messages=conversation, max_tokens=MAX_TOKENS, temperature=TEMPERATURE
    )

    first_choice = response.choices[0]
    plan = first_choice.message.content
    logger.info(f"[plan_question_hf] Generated plan ({len(plan)} chars)")
    return plan
# ============================================================================
# Planning Functions - Groq Implementation
# ============================================================================
def plan_question_groq(
    question: str,
    available_tools: Dict[str, Dict],
    file_paths: Optional[List[str]] = None,
) -> str:
    """Produce a numbered execution plan for ``question`` using Groq.

    Args:
        question: Question text to plan for
        available_tools: Tool registry; each entry must provide "description"
            and "category"
        file_paths: Optional file paths listed in the prompt as available files

    Returns:
        Message content of the first completion choice
    """
    client = create_groq_client()

    # One bullet per registered tool.
    tools_text = "\n".join(
        f"- {tool_name}: {meta['description']} (Category: {meta['category']})"
        for tool_name, meta in available_tools.items()
    )

    file_context = (
        "\n\nAvailable files:\n" + "\n".join(f"- {fp}" for fp in file_paths)
        if file_paths
        else ""
    )

    # Sent as a system message followed by a user message.
    system_prompt = """You are a planning agent for answering complex questions.
Your task is to analyze the question and create a step-by-step execution plan.
Consider:
1. What information is needed to answer the question?
2. Which tools can provide that information?
3. In what order should tools be executed?
4. What parameters need to be extracted from the question?
Generate a concise plan with numbered steps."""

    user_prompt = f"""Question: {question}{file_context}
Available tools:
{tools_text}
Create an execution plan to answer this question. Format as numbered steps."""

    logger.info(f"[plan_question_groq] Calling Groq ({GROQ_MODEL}) for planning")

    # Groq uses an OpenAI-compatible chat completions API.
    conversation = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt},
    ]
    response = client.chat.completions.create(
        model=GROQ_MODEL,
        messages=conversation,
        max_tokens=MAX_TOKENS,
        temperature=TEMPERATURE,
    )

    first_choice = response.choices[0]
    plan = first_choice.message.content
    logger.info(f"[plan_question_groq] Generated plan ({len(plan)} chars)")
    return plan
# ============================================================================
# Unified Planning Function (dispatches to the configured provider; fallback archived)
# ============================================================================
def plan_question(
    question: str,
    available_tools: Dict[str, Dict],
    file_paths: Optional[List[str]] = None,
) -> str:
    """
    Analyze question and generate execution plan using LLM.

    Dispatches to the provider named by the LLM_PROVIDER environment variable
    (read at call time). There is no fallback to other providers: the fallback
    mechanism has been archived, so a provider failure is raised to the caller.
    The provider call is wrapped with retry logic (3 attempts with exponential
    backoff on quota/rate-limit errors).

    Args:
        question: GAIA question text
        available_tools: Tool registry (name -> {description, category, parameters})
        file_paths: Optional list of file paths for file-based questions

    Returns:
        Execution plan as structured text

    Raises:
        Exception: If the provider is unknown or the provider call fails
    """
    return _call_with_fallback("plan_question", question, available_tools, file_paths)
# ============================================================================
# Tool Selection - Claude Implementation
# ============================================================================
def select_tools_claude(
    question: str, plan: str, available_tools: Dict[str, Dict], file_paths: Optional[List[str]] = None
) -> List[Dict[str, Any]]:
    """Ask Claude, via tool use, which tools to call and with what parameters.

    Args:
        question: Question text
        plan: Execution plan embedded in the system prompt
        available_tools: Tool registry; "description" is required per tool,
            "parameters" and "required_params" are optional
        file_paths: Optional file paths the model is told to use verbatim

    Returns:
        One dict per ``tool_use`` block in the response, with keys "tool",
        "params" and "id"
    """
    client = create_claude_client()

    # Translate the registry into the tool schema format passed to Claude.
    tool_schemas = [
        {
            "name": tool_name,
            "description": meta["description"],
            "input_schema": {
                "type": "object",
                "properties": meta.get("parameters", {}),
                "required": meta.get("required_params", []),
            },
        }
        for tool_name, meta in available_tools.items()
    ]

    # Tell the model about real file paths so it does not invent placeholders.
    file_context = ""
    if file_paths:
        file_listing = "\n".join(f"- {fp}" for fp in file_paths)
        file_context = f"""
IMPORTANT: These files are available for this question:
{file_listing}
When selecting tools, use the ACTUAL file paths listed above. Do NOT use placeholder paths like "<provided_path>" or "path_to_chess_image.jpg".
For vision tools with images: vision(image_path="<actual_file_path>")
For file parsing tools: parse_file(file_path="<actual_file_path>")"""

    system_prompt = f"""You are a tool selection expert. Based on the question and execution plan, select appropriate tools with correct parameters.
Few-shot examples:
- "How many albums did The Beatles release?" → web_search(query="Beatles discography number of albums")
- "What is 25 * 37 + 100?" → calculator(expression="25 * 37 + 100")
- "Analyze the image at example.com/pic.jpg" → vision(image_path="example.com/pic.jpg")
- "What's in the uploaded Excel file?" → parse_file(file_path="actual_file.xlsx")
Execute the plan step by step. Extract correct parameters from the question.
Use actual file paths when files are provided.{file_context}
Plan:
{plan}"""

    user_prompt = f"""Question: {question}
Select and call the tools needed according to the plan. Use exact parameter names from tool schemas."""

    logger.info(
        f"[select_tools_claude] Calling Claude with function calling for {len(tool_schemas)} tools"
    )
    response = client.messages.create(
        model=CLAUDE_MODEL,
        max_tokens=MAX_TOKENS,
        temperature=TEMPERATURE,
        system=system_prompt,
        messages=[{"role": "user", "content": user_prompt}],
        tools=tool_schemas,
    )

    # Keep only the tool_use blocks; other block types are ignored.
    tool_calls = [
        {"tool": block.name, "params": block.input, "id": block.id}
        for block in response.content
        if block.type == "tool_use"
    ]
    logger.info(f"[select_tools_claude] Claude selected {len(tool_calls)} tool(s)")
    return tool_calls
# ============================================================================
# Tool Selection - Gemini Implementation
# ============================================================================
def select_tools_gemini(
    question: str, plan: str, available_tools: Dict[str, Dict], file_paths: Optional[List[str]] = None
) -> List[Dict[str, Any]]:
    """Ask Gemini, via function calling, which tools to call and with what parameters.

    Args:
        question: Question text
        plan: Execution plan embedded in the prompt
        available_tools: Tool registry; "description" is required per tool,
            "parameters" and "required_params" are optional
        file_paths: Optional file paths the model is told to use verbatim

    Returns:
        One dict per function call in the response, with keys "tool", "params"
        and a synthetic "id" of the form ``gemini_<index>``
    """
    model = create_gemini_client()

    # One Tool (wrapping a single FunctionDeclaration) per registry entry.
    tools = []
    for tool_name, meta in available_tools.items():
        tool_description = meta["description"]
        # Every parameter is declared as STRING; the registry's own "type"
        # field is not consulted here.
        param_schemas = {
            param_name: genai.protos.Schema(
                type=genai.protos.Type.STRING,
                description=param_info.get("description", ""),
            )
            for param_name, param_info in meta.get("parameters", {}).items()
        }
        declaration = genai.protos.FunctionDeclaration(
            name=tool_name,
            description=tool_description,
            parameters=genai.protos.Schema(
                type=genai.protos.Type.OBJECT,
                properties=param_schemas,
                required=meta.get("required_params", []),
            ),
        )
        tools.append(genai.protos.Tool(function_declarations=[declaration]))

    # Tell the model about real file paths so it does not invent placeholders.
    file_context = ""
    if file_paths:
        file_listing = "\n".join(f"- {fp}" for fp in file_paths)
        file_context = f"""
IMPORTANT: These files are available for this question:
{file_listing}
When selecting tools, use the ACTUAL file paths listed above. Do NOT use placeholder paths like "<provided_path>" or "path_to_chess_image.jpg".
For vision tools with images: vision(image_path="<actual_file_path>")
For file parsing tools: parse_file(file_path="<actual_file_path>")"""

    prompt = f"""You are a tool selection expert. Based on the question and execution plan, select appropriate tools with correct parameters.
Few-shot examples:
- "How many albums did The Beatles release?" → web_search(query="Beatles discography number of albums")
- "What is 25 * 37 + 100?" → calculator(expression="25 * 37 + 100")
- "Analyze the image at example.com/pic.jpg" → vision(image_path="example.com/pic.jpg")
- "What's in the uploaded Excel file?" → parse_file(file_path="actual_file.xlsx")
Execute the plan step by step. Extract correct parameters from the question.
Use actual file paths when files are provided.{file_context}
Plan:
{plan}
Question: {question}
Select and call the tools needed according to the plan. Use exact parameter names from tool schemas."""

    logger.info(
        f"[select_tools_gemini] Calling Gemini with function calling for {len(available_tools)} tools"
    )
    generation_config = genai.types.GenerationConfig(
        temperature=TEMPERATURE, max_output_tokens=MAX_TOKENS
    )
    response = model.generate_content(
        prompt,
        tools=tools,
        generation_config=generation_config,
    )

    # Collect the parts that carry a function call, then number them in order
    # because the response supplies no call id of its own.
    function_calls = [
        part.function_call
        for part in response.parts
        if hasattr(part, "function_call") and part.function_call
    ]
    tool_calls = [
        {"tool": fc.name, "params": dict(fc.args), "id": f"gemini_{index}"}
        for index, fc in enumerate(function_calls)
    ]
    logger.info(f"[select_tools_gemini] Gemini selected {len(tool_calls)} tool(s)")
    return tool_calls
# ============================================================================
# Tool Selection - HuggingFace Implementation
# ============================================================================
def select_tools_hf(
    question: str, plan: str, available_tools: Dict[str, Dict], file_paths: Optional[List[str]] = None
) -> List[Dict[str, Any]]:
    """Use HuggingFace Inference API with function calling to select tools and extract parameters.

    Args:
        question: Question text
        plan: Execution plan embedded in the system prompt
        available_tools: Tool registry; "description" is required per tool,
            "parameters" and "required_params" are optional
        file_paths: Optional file paths the model is told to use verbatim

    Returns:
        One dict per tool call in the first completion choice, with keys
        "tool", "params" (always a parsed mapping) and "id"

    Raises:
        json.JSONDecodeError: If a tool call carries a non-empty argument
            string that is not valid JSON
    """
    # Imported once per call (not per tool call); kept local because this
    # block cannot rely on a module-level json import.
    import json

    client = create_hf_client()

    # Convert tool registry to OpenAI-compatible tool schema (HF uses same format)
    tools = [
        {
            "type": "function",
            "function": {
                "name": tool_name,
                "description": meta["description"],
                "parameters": {
                    "type": "object",
                    "properties": {
                        param_name: {
                            "type": param_info.get("type", "string"),
                            "description": param_info.get("description", ""),
                        }
                        for param_name, param_info in meta.get("parameters", {}).items()
                    },
                    "required": meta.get("required_params", []),
                },
            },
        }
        for tool_name, meta in available_tools.items()
    ]

    # Tell the model about real file paths so it does not invent placeholders.
    file_context = ""
    if file_paths:
        file_context = f"""
IMPORTANT: These files are available for this question:
{chr(10).join(f"- {fp}" for fp in file_paths)}
When selecting tools, use the ACTUAL file paths listed above. Do NOT use placeholder paths like "<provided_path>" or "path_to_chess_image.jpg".
For vision tools with images: vision(image_path="<actual_file_path>")
For file parsing tools: parse_file(file_path="<actual_file_path>")"""

    system_prompt = f"""You are a tool selection expert. Based on the question and execution plan, select appropriate tools with correct parameters.
Few-shot examples:
- "How many albums did The Beatles release?" → web_search(query="Beatles discography number of albums")
- "What is 25 * 37 + 100?" → calculator(expression="25 * 37 + 100")
- "Analyze the image at example.com/pic.jpg" → vision(image_path="example.com/pic.jpg")
- "What's in the uploaded Excel file?" → parse_file(file_path="actual_file.xlsx")
Execute the plan step by step. Extract correct parameters from the question.
Use actual file paths when files are provided.{file_context}
Plan:
{plan}"""

    user_prompt = f"""Question: {question}
Select and call the tools needed according to the plan. Use exact parameter names from tool schemas."""

    logger.info(
        f"[select_tools_hf] Calling HuggingFace with function calling for {len(tools)} tools, file_paths={file_paths}"
    )
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt},
    ]

    # HuggingFace Inference API with tools parameter
    response = client.chat_completion(
        messages=messages, tools=tools, max_tokens=MAX_TOKENS, temperature=TEMPERATURE
    )

    # Extract tool calls from response; a missing or None tool_calls means "no tools".
    message = response.choices[0].message
    tool_calls = []
    for tool_call in getattr(message, "tool_calls", None) or []:
        raw_args = tool_call.function.arguments
        if isinstance(raw_args, str):
            # JSON-encoded arguments; an empty string means "no parameters".
            params = json.loads(raw_args) if raw_args.strip() else {}
        else:
            # The client may hand back arguments already parsed into a mapping.
            params = raw_args if raw_args is not None else {}
        tool_calls.append(
            {
                "tool": tool_call.function.name,
                "params": params,
                "id": tool_call.id,
            }
        )

    logger.info(f"[select_tools_hf] HuggingFace selected {len(tool_calls)} tool(s)")
    return tool_calls
# ============================================================================
# Tool Selection - Groq Implementation
# ============================================================================
def select_tools_groq(
    question: str, plan: str, available_tools: Dict[str, Dict], file_paths: Optional[List[str]] = None
) -> List[Dict[str, Any]]:
    """Use Groq with function calling to select tools and extract parameters.

    Args:
        question: Question text
        plan: Execution plan embedded in the system prompt
        available_tools: Tool registry; "description" is required per tool,
            "parameters" and "required_params" are optional
        file_paths: Optional file paths the model is told to use verbatim

    Returns:
        One dict per tool call in the first completion choice, with keys
        "tool", "params" (parsed from the JSON argument string) and "id"

    Raises:
        json.JSONDecodeError: If a tool call carries a non-empty argument
            string that is not valid JSON
    """
    # Imported once per call (not per tool call); kept local because this
    # block cannot rely on a module-level json import.
    import json

    client = create_groq_client()

    # Convert tool registry to OpenAI-compatible tool schema (Groq uses same format)
    tools = [
        {
            "type": "function",
            "function": {
                "name": tool_name,
                "description": meta["description"],
                "parameters": {
                    "type": "object",
                    "properties": {
                        param_name: {
                            "type": param_info.get("type", "string"),
                            "description": param_info.get("description", ""),
                        }
                        for param_name, param_info in meta.get("parameters", {}).items()
                    },
                    "required": meta.get("required_params", []),
                },
            },
        }
        for tool_name, meta in available_tools.items()
    ]

    # Tell the model about real file paths so it does not invent placeholders.
    file_context = ""
    if file_paths:
        file_context = f"""
IMPORTANT: These files are available for this question:
{chr(10).join(f"- {fp}" for fp in file_paths)}
When selecting tools, use the ACTUAL file paths listed above. Do NOT use placeholder paths like "<provided_path>" or "path_to_chess_image.jpg".
For vision tools with images: vision(image_path="<actual_file_path>")
For file parsing tools: parse_file(file_path="<actual_file_path>")"""

    system_prompt = f"""You are a tool selection expert. Based on the question and execution plan, select appropriate tools with correct parameters.
Few-shot examples:
- "How many albums did The Beatles release?" → web_search(query="Beatles discography number of albums")
- "What is 25 * 37 + 100?" → calculator(expression="25 * 37 + 100")
- "Analyze the image at example.com/pic.jpg" → vision(image_path="example.com/pic.jpg")
- "What's in the uploaded Excel file?" → parse_file(file_path="actual_file.xlsx")
Execute the plan step by step. Extract correct parameters from the question.
Use actual file paths when files are provided.{file_context}
Plan:
{plan}"""

    user_prompt = f"""Question: {question}
Select and call the tools needed according to the plan. Use exact parameter names from tool schemas."""

    logger.info(
        f"[select_tools_groq] Calling Groq with function calling for {len(tools)} tools"
    )
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt},
    ]

    # Groq function calling
    response = client.chat.completions.create(
        model=GROQ_MODEL,
        messages=messages,
        tools=tools,
        max_tokens=MAX_TOKENS,
        temperature=TEMPERATURE,
    )

    # Extract tool calls from response; a missing or None tool_calls means "no tools".
    message = response.choices[0].message
    tool_calls = []
    for tool_call in getattr(message, "tool_calls", None) or []:
        raw_args = tool_call.function.arguments
        # Empty or missing arguments mean "no parameters" rather than a
        # JSON parse failure.
        params = json.loads(raw_args) if raw_args else {}
        tool_calls.append(
            {
                "tool": tool_call.function.name,
                "params": params,
                "id": tool_call.id,
            }
        )

    logger.info(f"[select_tools_groq] Groq selected {len(tool_calls)} tool(s)")
    return tool_calls
# ============================================================================
# Unified Tool Selection (dispatches to the configured provider; fallback archived)
# ============================================================================
def select_tools_with_function_calling(
    question: str, plan: str, available_tools: Dict[str, Dict], file_paths: Optional[List[str]] = None
) -> List[Dict[str, Any]]:
    """
    Use LLM function calling to dynamically select tools and extract parameters.

    Dispatches to the provider named by the LLM_PROVIDER environment variable
    (read at call time). There is no fallback to other providers: the fallback
    mechanism has been archived, so a provider failure is raised to the caller.
    The provider call is wrapped with retry logic (3 attempts with exponential
    backoff on quota/rate-limit errors).

    Args:
        question: GAIA question text
        plan: Execution plan from planning phase
        available_tools: Tool registry
        file_paths: Optional list of downloaded file paths for file-based questions

    Returns:
        List of tool calls with extracted parameters

    Raises:
        Exception: If the provider is unknown or the provider call fails
    """
    return _call_with_fallback("select_tools", question, plan, available_tools, file_paths)
# ============================================================================
# Answer Synthesis - Claude Implementation
# ============================================================================
def synthesize_answer_claude(question: str, evidence: List[str]) -> str:
    """Ask Claude to derive a factoid answer for ``question`` from ``evidence``.

    Args:
        question: Question text
        evidence: Evidence snippets, numbered from 1 in the prompt

    Returns:
        The stripped text of the first content block of the response. The
        text is returned as-is; any REASONING / FINAL ANSWER sections the
        model emits are not parsed here.
    """
    client = create_claude_client()

    # Number the evidence items starting at 1, separated by blank lines.
    evidence_text = "\n\n".join(
        f"Evidence {number}:\n{item}" for number, item in enumerate(evidence, start=1)
    )

    system_prompt = """You are an answer synthesis agent for the GAIA benchmark.
Your task is to extract a factoid answer from the provided evidence.
CRITICAL - Response format (two parts):
1. **REASONING** - Show your step-by-step thought process:
- What information is in the evidence?
- What is the question asking for?
- How do you extract the answer from the evidence?
- Any ambiguities or uncertainties?
2. **FINAL ANSWER** - The factoid answer only:
- A number, a few words, or a comma-separated list
- No explanations, just the answer
- If evidence is insufficient, state "Unable to answer"
Response format:
REASONING: [Your step-by-step thought process here]
FINAL ANSWER: [The factoid answer]
Examples:
REASONING: The evidence mentions the population of Tokyo is 13.9 million. The question asks for the city with highest population. Tokyo is listed as the highest.
FINAL ANSWER: Tokyo
REASONING: The transcript mentions "giant petrel", "emperor", and "adelie" (with typo "deli"). These are three different bird species present in the same scene.
FINAL ANSWER: 3
"""

    user_prompt = f"""Question: {question}
{evidence_text}
Extract the factoid answer from the evidence above. Return only the factoid, nothing else."""

    logger.info("[synthesize_answer_claude] Calling Claude for answer synthesis")
    request = {
        "model": CLAUDE_MODEL,
        "max_tokens": 256,  # Factoid answers are short
        "temperature": TEMPERATURE,
        "system": system_prompt,
        "messages": [{"role": "user", "content": user_prompt}],
    }
    response = client.messages.create(**request)

    answer = response.content[0].text.strip()
    logger.info(f"[synthesize_answer_claude] Generated answer: {answer}")
    return answer
# ============================================================================
# Answer Synthesis - Gemini Implementation
# ============================================================================
def synthesize_answer_gemini(question: str, evidence: List[str]) -> str:
    """
    Produce a short factoid answer for a question with Gemini.

    The evidence items are numbered ("Evidence 1", "Evidence 2", ...) and
    embedded, together with the question, in a single prompt that tells the
    model to reply with the bare factoid only.

    Args:
        question: The question to answer.
        evidence: Evidence strings gathered for the question.

    Returns:
        The model's reply text with surrounding whitespace removed.
    """
    gemini = create_gemini_client()

    # Number the evidence items starting at 1, separated by blank lines.
    numbered_evidence = "\n\n".join(
        f"Evidence {idx}:\n{item}" for idx, item in enumerate(evidence, start=1)
    )

    # Static instructions; the question and evidence are appended below.
    instructions = """You are an answer synthesis agent for the GAIA benchmark.
Your task is to extract a factoid answer from the provided evidence.
CRITICAL - Answer format requirements:
1. Answers must be factoids: a number, a few words, or a comma-separated list
2. Be concise - no explanations, just the answer
3. If evidence conflicts, evaluate source credibility and recency
4. If evidence is insufficient, state "Unable to answer"
Examples of good factoid answers:
- "42"
- "Paris"
- "Albert Einstein"
- "red, blue, green"
- "1969-07-20"
Examples of bad answers (too verbose):
- "The answer is 42 because..."
- "Based on the evidence, it appears that..."
"""
    prompt = f"""{instructions}Question: {question}
{numbered_evidence}
Extract the factoid answer from the evidence above. Return only the factoid, nothing else."""

    logger.info("[synthesize_answer_gemini] Calling Gemini for answer synthesis")

    # A small output budget is enough: the reply is expected to be a factoid.
    generation_settings = genai.types.GenerationConfig(
        temperature=TEMPERATURE,
        max_output_tokens=256,
    )
    reply = gemini.generate_content(prompt, generation_config=generation_settings)

    answer = reply.text.strip()
    logger.info(f"[synthesize_answer_gemini] Generated answer: {answer}")
    return answer
# ============================================================================
# Answer Synthesis - HuggingFace Implementation
# ============================================================================
def synthesize_answer_hf(question: str, evidence: List[str]) -> str:
    """
    Synthesize factoid answer from evidence using HuggingFace Inference API.

    The model is asked to reply as "REASONING: ... FINAL ANSWER: ...". The
    text after the last "FINAL ANSWER:" marker is returned; if the marker is
    missing, the whole reply is returned instead.

    Side effect: one markdown block (question, prompt, raw reply, extracted
    answer) is appended to the file returned by ``get_session_log_file()``.
    The system prompt is included in that block only while the module-level
    flag ``_SYSTEM_PROMPT_WRITTEN`` is still false.

    Args:
        question: The question to answer.
        evidence: Evidence strings gathered for the question.

    Returns:
        The extracted factoid answer.
    """
    global _SYSTEM_PROMPT_WRITTEN

    client = create_hf_client()

    # Format evidence as numbered items separated by blank lines.
    evidence_text = "\n\n".join(
        f"Evidence {i}:\n{e}" for i, e in enumerate(evidence, start=1)
    )

    system_prompt = """You are an answer synthesis agent for the GAIA benchmark.
Your task is to extract a factoid answer from the provided evidence.
CRITICAL - Response format (two parts):
1. **REASONING** - Show your step-by-step thought process:
- What information is in the evidence?
- What is the question asking for?
- How do you extract the answer from the evidence?
- Any ambiguities or uncertainties?
2. **FINAL ANSWER** - The factoid answer only:
- A number, a few words, or a comma-separated list
- No explanations, just the answer
- If evidence is insufficient, state "Unable to answer"
Response format:
REASONING: [Your step-by-step thought process here]
FINAL ANSWER: [The factoid answer]
Examples:
REASONING: The evidence mentions the population of Tokyo is 13.9 million. The question asks for the city with highest population. Tokyo is listed as the highest.
FINAL ANSWER: Tokyo
REASONING: The transcript mentions "giant petrel", "emperor", and "adelie" (with typo "deli"). These are three different bird species present in the same scene.
FINAL ANSWER: 3
"""
    user_prompt = f"""Question: {question}
{evidence_text}
Extract the factoid answer from the evidence above. Return only the factoid, nothing else."""

    # ------------------------------------------------------------------
    # Buffer the question context; it is written in one go after the
    # response arrives so a failed call leaves no half-written block.
    # ------------------------------------------------------------------
    context_file = get_session_log_file()
    question_timestamp = datetime.datetime.now().isoformat()

    # Include the (static) system prompt only until it has been logged once.
    include_system_prompt = not _SYSTEM_PROMPT_WRITTEN
    system_prompt_section = ""
    if include_system_prompt:
        system_prompt_section = f"""
## System Prompt (static - used for all questions)
```text
{system_prompt}
```
"""

    question_header = f"""
## Question [{question_timestamp}]
**Question:** {question}
**Evidence items:** {len(evidence)}
{system_prompt_section}
### Evidence & Prompt
```text
{user_prompt}
```
"""

    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt},
    ]
    response = client.chat_completion(
        messages=messages,
        max_tokens=1024,  # Room for the reasoning part plus the final answer
        temperature=TEMPERATURE,
    )
    full_response = response.choices[0].message.content.strip()

    # Keep only the text after the last "FINAL ANSWER:" marker; fall back to
    # the whole reply when the model did not follow the format.
    _, marker, tail = full_response.rpartition("FINAL ANSWER:")
    answer = tail.strip() if marker else full_response

    logger.info(f"[synthesize_answer_hf] Answer: {answer}")

    # ------------------------------------------------------------------
    # Write the complete question block (header + response + answer).
    # ------------------------------------------------------------------
    complete_block = f"""{question_header}
### LLM Response
```text
{full_response}
```
**Extracted Answer:** `{answer}`
"""
    with open(context_file, "a", encoding="utf-8") as f:
        f.write(complete_block)

    # Flip the flag only once the block is on disk. Setting it earlier would
    # drop the system prompt from the log for good if the API call or the
    # write failed on the first attempt.
    if include_system_prompt:
        _SYSTEM_PROMPT_WRITTEN = True

    return answer
# ============================================================================
# Answer Synthesis - Groq Implementation
# ============================================================================
def synthesize_answer_groq(question: str, evidence: List[str]) -> str:
    """
    Synthesize factoid answer from evidence using Groq.

    The system prompt asks the model to reply as
    "REASONING: ... FINAL ANSWER: ...". The text after the last
    "FINAL ANSWER:" marker is returned; if the marker is missing, the whole
    reply is returned instead.

    Args:
        question: The question to answer.
        evidence: Evidence strings gathered for the question.

    Returns:
        The extracted factoid answer.
    """
    client = create_groq_client()

    # Format evidence as numbered items separated by blank lines.
    evidence_text = "\n\n".join(
        f"Evidence {i}:\n{e}" for i, e in enumerate(evidence, start=1)
    )

    system_prompt = """You are an answer synthesis agent for the GAIA benchmark.
Your task is to extract a factoid answer from the provided evidence.
CRITICAL - Response format (two parts):
1. **REASONING** - Show your step-by-step thought process:
- What information is in the evidence?
- What is the question asking for?
- How do you extract the answer from the evidence?
- Any ambiguities or uncertainties?
2. **FINAL ANSWER** - The factoid answer only:
- A number, a few words, or a comma-separated list
- No explanations, just the answer
- If evidence is insufficient, state "Unable to answer"
Response format:
REASONING: [Your step-by-step thought process here]
FINAL ANSWER: [The factoid answer]
Examples:
REASONING: The evidence mentions the population of Tokyo is 13.9 million. The question asks for the city with highest population. Tokyo is listed as the highest.
FINAL ANSWER: Tokyo
REASONING: The transcript mentions "giant petrel", "emperor", and "adelie" (with typo "deli"). These are three different bird species present in the same scene.
FINAL ANSWER: 3
"""
    user_prompt = f"""Question: {question}
{evidence_text}
Extract the factoid answer from the evidence above. Return only the factoid, nothing else."""

    logger.info("[synthesize_answer_groq] Calling Groq for answer synthesis")

    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt},
    ]
    response = client.chat.completions.create(
        model=GROQ_MODEL,
        messages=messages,
        # The reply carries a reasoning section before the answer, so a
        # 256-token budget can cut it off before "FINAL ANSWER:" is emitted.
        max_tokens=1024,
        temperature=TEMPERATURE,
    )
    full_response = response.choices[0].message.content.strip()

    # Return just the factoid: the text after the last "FINAL ANSWER:"
    # marker, or the whole reply when the model did not follow the format.
    _, marker, tail = full_response.rpartition("FINAL ANSWER:")
    answer = tail.strip() if marker else full_response

    logger.info(f"[synthesize_answer_groq] Generated answer: {answer}")
    return answer
# ============================================================================
# Unified Answer Synthesis with Fallback Chain
# ============================================================================
def synthesize_answer(question: str, evidence: List[str]) -> str:
    """
    Produce a factoid answer for a GAIA question from gathered evidence.

    This is the provider-agnostic entry point: it hands the request to
    _call_with_fallback under the operation name "synthesize_answer".
    Provider choice follows the LLM_PROVIDER config; with
    ENABLE_LLM_FALLBACK=true other providers are tried on failure, and each
    provider call is retried (3 attempts with exponential backoff).

    Args:
        question: Original GAIA question
        evidence: List of evidence strings from tool executions

    Returns:
        Factoid answer string
    """
    answer = _call_with_fallback("synthesize_answer", question, evidence)
    return answer
# ============================================================================
# Conflict Resolution Functions
# ============================================================================
def resolve_conflicts(evidence: List[str]) -> Dict[str, Any]:
    """
    Detect and resolve conflicts in evidence using LLM reasoning.

    Gemini is tried first; if that attempt raises, the same analysis is
    requested from Claude. The model is asked to answer in JSON, and the
    reply is parsed into the returned dictionary. When the reply contains no
    parseable JSON object, the raw reply text is returned under
    "resolution" with "has_conflicts" False and an empty "conflicts" list.

    Args:
        evidence: List of evidence strings that may conflict

    Returns:
        Dictionary with keys "has_conflicts" (bool), "conflicts" (list of
        str) and "resolution" (str).
    """
    # Number the evidence items starting at 1, separated by blank lines.
    evidence_text = "\n\n".join(
        f"Evidence {i}:\n{e}" for i, e in enumerate(evidence, start=1)
    )

    system_prompt = """You are a conflict detection agent.
Analyze the provided evidence and identify any contradictions or conflicts.
Evaluate:
1. Are there contradictory facts?
2. Which sources are more credible?
3. Which information is more recent?
4. How should conflicts be resolved?"""
    user_prompt = f"""Analyze this evidence for conflicts:
{evidence_text}
Respond in JSON format:
{{
"has_conflicts": true/false,
"conflicts": ["description of conflict 1", ...],
"resolution": "recommended resolution strategy"
}}"""

    try:
        # Try Gemini first; it takes instructions and request as one prompt.
        model = create_gemini_client()
        logger.info("[resolve_conflicts] Analyzing with Gemini")
        response = model.generate_content(f"{system_prompt}\n{user_prompt}")
        return _parse_conflict_response(response.text)
    except Exception as gemini_error:
        logger.warning(
            f"[resolve_conflicts] Gemini failed: {gemini_error}, trying Claude"
        )

    # Fallback to Claude
    client = create_claude_client()
    response = client.messages.create(
        model=CLAUDE_MODEL,
        max_tokens=MAX_TOKENS,
        temperature=TEMPERATURE,
        system=system_prompt,
        messages=[{"role": "user", "content": user_prompt}],
    )
    return _parse_conflict_response(response.content[0].text)


def _parse_conflict_response(raw_text: str) -> Dict[str, Any]:
    """Convert the model's reply into the conflict-analysis dictionary."""
    import json  # local import: the module does not import json at the top

    # Shape returned when the reply cannot be parsed: raw text as resolution.
    unparsed = {"has_conflicts": False, "conflicts": [], "resolution": raw_text}

    # Models often wrap JSON in prose or code fences, so parse only the span
    # from the first "{" to the last "}".
    start = raw_text.find("{")
    end = raw_text.rfind("}")
    if start == -1 or end < start:
        return unparsed
    try:
        parsed = json.loads(raw_text[start : end + 1])
    except json.JSONDecodeError:
        return unparsed

    conflicts = parsed.get("conflicts") or []
    if not isinstance(conflicts, list):
        conflicts = [conflicts]

    # Trust an explicit boolean; otherwise infer it from the conflict list.
    has_conflicts = parsed.get("has_conflicts")
    if not isinstance(has_conflicts, bool):
        has_conflicts = bool(conflicts)

    resolution = parsed.get("resolution")
    if not isinstance(resolution, str) or not resolution:
        resolution = raw_text

    return {
        "has_conflicts": has_conflicts,
        "conflicts": [str(item) for item in conflicts],
        "resolution": resolution,
    }