|
|
import os |
|
|
import re |
|
|
from pathlib import Path |
|
|
from typing import Optional, Union, Dict, List, Any |
|
|
from enum import Enum |
|
|
import requests |
|
|
import tempfile |
|
|
import ast |
|
|
|
|
|
from dotenv import load_dotenv |
|
|
from langgraph.graph import StateGraph, END |
|
|
from langchain.tools import Tool as LangTool |
|
|
from langchain_core.runnables import RunnableLambda |
|
|
from langchain_google_genai import ChatGoogleGenerativeAI |
|
|
from pathlib import Path |
|
|
|
|
|
from langchain.tools import StructuredTool |
|
|
from langchain_openai import ChatOpenAI |
|
|
|
|
|
|
|
|
import pandas as pd |
|
|
|
|
|
|
|
|
from tools import ( |
|
|
EnhancedSearchTool, |
|
|
EnhancedWikipediaTool, |
|
|
excel_to_markdown, |
|
|
image_file_info, |
|
|
audio_file_info, |
|
|
code_file_read, |
|
|
extract_youtube_info) |
|
|
|
|
|
|
|
|
# Load environment variables from a local .env file (API keys, model names).
load_dotenv()


# Scoring-service endpoints for the GAIA agents course.
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
QUESTIONS_URL = f"{DEFAULT_API_URL}/questions"
SUBMIT_URL = f"{DEFAULT_API_URL}/submit"
FILE_PATH = f"{DEFAULT_API_URL}/files/"


# Mirror the OpenAI key under a lowercase alias only when it is actually set.
# Bug fix: the original unconditional assignment raised TypeError (os.environ
# values must be str) whenever OPENAI_API_KEY was missing.
_openai_key = os.environ.get("OPENAI_API_KEY")
if _openai_key is not None:
    os.environ["openai_api_key"] = _openai_key


# Shared Gemini chat model used by every graph node below.
llm = ChatGoogleGenerativeAI(
    model=os.getenv("GEMINI_MODEL", "gemini-pro"),
    google_api_key=os.getenv("google_api_key"),
)

# Security fix: never print the raw API key to the console/logs; report only
# whether it is configured.
print("google_api_key set:", bool(os.getenv("google_api_key")))
|
|
|
|
|
|
|
|
|
|
|
from typing import TypedDict |
|
|
|
|
|
class AgentState(TypedDict):
    """Enhanced state tracking for the agent - using TypedDict for LangGraph compatibility"""

    # Working question text; process_file() may append downloaded-file info
    # before the graph runs.
    question: str
    # The question exactly as received; used by the error-recovery fallback.
    original_question: str
    # Ordered log of {"role": ..., "content": ...} entries from each node.
    conversation_history: List[Dict[str, str]]
    # Keys into AVAILABLE_TOOLS chosen by select_tools().
    selected_tools: List[str]
    # Raw output (or error message string) per executed tool name.
    tool_results: Dict[str, Any]
    # Answer produced by synthesize_answer() or error_recovery().
    final_answer: str
    # One of the AgentStep values; drives the graph's conditional edges.
    current_step: str
    # Errors seen so far, and the budget before the agent gives up.
    error_count: int
    max_errors: int
|
|
|
|
|
class AgentStep(Enum):
    """Named pipeline stages.

    The .value strings are stored in AgentState["current_step"] and compared
    by the graph's conditional edges to decide routing.
    """

    ANALYZE_QUESTION = "analyze_question"
    SELECT_TOOLS = "select_tools"
    EXECUTE_TOOLS = "execute_tools"
    SYNTHESIZE_ANSWER = "synthesize_answer"
    ERROR_RECOVERY = "error_recovery"
    COMPLETE = "complete"
|
|
|
|
|
|
|
|
def initialize_state(question: str) -> AgentState:
    """Build a fresh AgentState for *question* with all bookkeeping reset.

    Both `question` and `original_question` start as the incoming text, and
    the error budget defaults to three strikes.
    """
    fresh: AgentState = dict(
        question=question,
        original_question=question,
        conversation_history=[],
        selected_tools=[],
        tool_results={},
        final_answer="",
        current_step="start",
        error_count=0,
        max_errors=3,
    )
    return fresh
|
|
|
|
|
|
|
|
# Baseline (non-enhanced) search tools from LangChain's integrations.
from langchain.tools import DuckDuckGoSearchResults, WikipediaQueryRun
from langchain.utilities import WikipediaAPIWrapper

# Plain DuckDuckGo web search returning result snippets.
duckduckgo_tool = DuckDuckGoSearchResults()
# Plain Wikipedia lookup backed by the default API wrapper.
wiki_tool = WikipediaQueryRun(api_wrapper=WikipediaAPIWrapper())
|
|
|
|
|
|
|
|
|
|
|
# --- LangChain tool wrappers ------------------------------------------------
# Each wrapper adapts one implementation from tools.py into a LangChain
# Tool/StructuredTool so graph nodes can call `.run(...)` uniformly.

# Web search with query preprocessing and result filtering (see tools.py).
enhanced_search_tool = LangTool.from_function(
    name="enhanced_web_search",
    func=EnhancedSearchTool().run,
    description="Enhanced web search with intelligent query processing, multiple search strategies, and result filtering. Provides comprehensive and relevant search results."
)

# Wikipedia lookup with entity extraction and content filtering.
enhanced_wiki_tool = LangTool.from_function(
    name="enhanced_wikipedia",
    func=EnhancedWikipediaTool().run,
    description="Enhanced Wikipedia search with entity extraction, multi-term search, and relevant content filtering. Provides detailed encyclopedic information."
)

# StructuredTool because excel_to_markdown takes named arguments
# (excel_path, sheet_name) rather than a single string input.
excel_tool = StructuredTool.from_function(
    name="excel_to_text",
    func=excel_to_markdown,
    description="Enhanced Excel analysis with metadata, statistics, and structured data preview. Inputs: 'excel_path' (str), 'sheet_name' (str, optional).",
)

# StructuredTool: invoked with {"image_path": ..., "question": ...} in
# execute_tools().
image_tool = StructuredTool.from_function(
    name="image_file_info",
    func=image_file_info,
    description="Enhanced image file analysis with detailed metadata and properties."
)

# The remaining tools take a single string argument, so the plain Tool
# wrapper suffices.
audio_tool = LangTool.from_function(
    name="audio_file_info",
    func=audio_file_info,
    description="Enhanced audio processing with transcription, language detection, and timestamped segments."
)

code_tool = LangTool.from_function(
    name="code_file_read",
    func=code_file_read,
    description="Enhanced code file analysis with language-specific insights and structure analysis."
)

youtube_tool = LangTool.from_function(
    name="extract_youtube_info",
    func=extract_youtube_info,
    description="Extracts transcription from the youtube link"
)
|
|
|
|
|
|
|
|
# Registry mapping the short tool names used throughout the prompts to the
# actual tool instances.
# Bug fix: the original dict had "search" and "wikipedia" swapped ("search"
# pointed at the Wikipedia tool and "wikipedia" at DuckDuckGo), and the
# enhanced_* wrappers built above were never used.  Each key now points at the
# enhanced tool whose behaviour the analysis/selection prompts describe.
AVAILABLE_TOOLS = {
    "excel": excel_tool,
    "search": enhanced_search_tool,
    "wikipedia": enhanced_wiki_tool,
    "image": image_tool,
    "audio": audio_tool,
    "code": code_tool,
    "youtube": youtube_tool,
}
|
|
|
|
|
|
|
|
def analyze_question(state: AgentState) -> AgentState:
    """Ask the LLM to characterise the question and record its analysis.

    On success the analysis is appended to the conversation history and the
    state advances to tool selection; any failure bumps the error counter and
    routes the graph to error recovery instead.
    """
    prompt = f"""
Analyze this question and determine the best tools and approach:
Question: {state["question"]}

Available enhanced tools:
1. excel - Enhanced Excel/CSV analysis with statistics and metadata
2. search - Enhanced web search with intelligent query processing and result filtering
3. wikipedia - Enhanced Wikipedia search with entity extraction and content filtering
4. image - Enhanced image analysis with what the image contains
5. audio - Enhanced audio processing with transcription
6. code - Enhanced code analysis with language-specific insights
7. youtube - Extracts transcription from the youtube link

Consider:
- Question type (factual, analytical, current events, technical)
- Required information sources (files, web, encyclopedic)
- Time sensitivity (current vs historical information)
- Complexity level

Respond with:
1. Question type: <type>
2. Primary tools needed: <tools>
3. Search strategy: <strategy>
4. Expected answer format: <format>

Format: TYPE: <type> | TOOLS: <tools> | STRATEGY: <strategy> | FORMAT: <format>
"""

    try:
        analysis = llm.invoke(prompt).content
    except Exception as exc:
        # A failed LLM call is survivable; note it and let the graph recover.
        state["error_count"] += 1
        state["conversation_history"].append(
            {"role": "error", "content": f"Analysis failed: {exc}"}
        )
        state["current_step"] = AgentStep.ERROR_RECOVERY.value
    else:
        state["conversation_history"].append({"role": "analysis", "content": analysis})
        state["current_step"] = AgentStep.SELECT_TOOLS.value

    return state
|
|
|
|
|
def select_tools(state: AgentState) -> AgentState:
    """Choose which tools to run: keyword heuristics plus an LLM suggestion.

    File-extension keywords deterministically select the file-based tools;
    the LLM then proposes additional tools, whose reply is parsed defensively
    (it is untrusted text).  The union is stored in state["selected_tools"].
    """
    question = state["question"].lower()
    selected_tools = []

    # Deterministic selection from filename/extension hints in the question.
    if any(keyword in question for keyword in ["excel", "csv", "spreadsheet", ".xlsx", ".xls"]):
        selected_tools.append("excel")
    if any(keyword in question for keyword in [".png", ".jpg", ".jpeg", ".bmp", ".gif", "image"]):
        selected_tools.append("image")
    if any(keyword in question for keyword in [".mp3", ".wav", ".ogg", "audio", "transcribe"]):
        selected_tools.append("audio")
    if any(keyword in question for keyword in [".py", ".ipynb", "code", "script", "function"]):
        selected_tools.append("code")
    if any(keyword in question for keyword in ["youtube"]):
        selected_tools.append("youtube")

    print(f"File-based tools selected: {selected_tools}")

    tools_prompt = f"""
You are a smart assistant that selects relevant tools based on the user's natural language question.

Available tools:
- "search" → Use for real-time, recent, or broad web information.
- "wikipedia" → Use for factual or encyclopedic knowledge.
- "excel" → Use for spreadsheet-related questions (.xlsx, .csv).
- "image" → Use for image files (.png, .jpg, etc.) or image-based tasks.
- "audio" → Use for sound files (.mp3, .wav, etc.) or transcription.
- "code" → Use for programming-related questions or when files like .py are mentioned.
- "youtube" → Use for questions involving YouTube videos.

Return the result as a **Python list of strings**, no explanation. Use only the relevant tools.
If not relevant tool is found, return an empty list such as [].

### Examples:

Q: "Show me recent news about elections in 2025"
A: ["search"]

Q: "Summarize this Wikipedia article about Einstein"
A: ["wikipedia"]

Q: "Analyze this .csv file"
A: ["excel"]

Q: "Transcribe this .wav audio file"
A: ["audio"]

Q: "Generate Python code from this prompt"
A: ["code"]

Q: "Who was the president of USA in 1945?"
A: ["wikipedia"]

Q: "Give me current weather updates"
A: ["search"]

Q: "Look up the history of space exploration"
A: ["search", "wikipedia"]

Q: "What is 2 + 2?"
A: []

### Now answer:

Q: {state["question"]}
A:
"""

    # Robustness fix: the original called ast.literal_eval() directly on the
    # raw LLM reply, so any chatty or markdown-fenced answer (e.g.
    # ```python ["search"] ```) raised and crashed this graph node.
    try:
        raw_reply = llm.invoke(tools_prompt).content.strip()
        if raw_reply.startswith("```"):
            # Strip a surrounding code fence and optional language tag.
            raw_reply = raw_reply.strip("`").strip()
            if raw_reply.startswith("python"):
                raw_reply = raw_reply[len("python"):].strip()
        llm_tools = ast.literal_eval(raw_reply)
    except Exception as exc:
        print(f"Could not parse LLM tool suggestion: {exc}")
        llm_tools = []

    if not isinstance(llm_tools, list):
        llm_tools = []
    # Drop anything the LLM hallucinated that is not a registered tool name.
    llm_tools = [t for t in llm_tools if isinstance(t, str) and t in AVAILABLE_TOOLS]

    print(f"LLM suggested tools: {llm_tools}")
    selected_tools.extend(llm_tools)
    selected_tools = list(set(selected_tools))

    print(f"Final selected tools after LLM suggestion: {selected_tools}")

    state["selected_tools"] = selected_tools
    state["current_step"] = AgentStep.EXECUTE_TOOLS.value

    print(f"Inside select tools, result:{state['selected_tools']}")
    print(f"Inside select tools, current step: {state['current_step']}")
    return state
|
|
|
|
|
def _extract_downloaded_file_path(question: str) -> Optional[str]:
    """Return the existing local file path advertised in *question*, if any.

    process_file() appends a marker line followed by the saved path; scan for
    that marker and return the first path that actually exists on disk.
    """
    marker = "A file was downloaded for this task and saved locally at:"
    if marker not in question:
        return None
    lines = question.splitlines()
    for i, line in enumerate(lines):
        if marker in line and i + 1 < len(lines):
            candidate = lines[i + 1].strip()
            if Path(candidate).exists():
                print('****')
                print(f"Detected file path: {candidate}")
                print(f"Detected file path type: {type(candidate)}")
                return candidate
    return None


def execute_tools(state: AgentState) -> AgentState:
    """Run every selected tool, collecting outputs (or error messages).

    File-based tools receive the downloaded file path when one is present;
    query-based tools receive the question with the file marker stripped.
    Each tool failure is recorded instead of aborting the whole node.
    """
    results = {}
    downloaded_file_marker = "A file was downloaded for this task and saved locally at:"
    file_path = _extract_downloaded_file_path(state["question"])

    for tool_name in state["selected_tools"]:
        try:
            print(f"Executing tool: {tool_name}")

            if tool_name in ["excel", "image", "audio", "code"] and file_path:
                # Structured tools need their specific argument dicts.
                if tool_name == "excel":
                    result = AVAILABLE_TOOLS["excel"].run({"excel_path": file_path, "sheet_name": None})
                elif tool_name == "image":
                    result = AVAILABLE_TOOLS["image"].run({"image_path": file_path, "question": state["question"]})
                else:
                    # audio / code take the bare path.
                    # Bug fix: the original also had an `elif tool_name ==
                    # "youtube"` arm here, which was unreachable because
                    # "youtube" is not in the guard list above; youtube
                    # questions are handled by the query branch below.
                    result = AVAILABLE_TOOLS[tool_name].run(file_path)
            else:
                # Query-based tools: strip the file-info block so the tool
                # sees only the actual question text.
                clean_query = state["question"]
                if downloaded_file_marker in clean_query:
                    clean_query = clean_query.split(downloaded_file_marker)[0].strip()
                result = AVAILABLE_TOOLS[tool_name].run(clean_query)

            results[tool_name] = result
            print(f"Tool {tool_name} completed successfully.")
            print(f"Output for {tool_name}: {result}")

        except Exception as e:
            error_msg = f"Error using {tool_name}: {str(e)}"
            results[tool_name] = error_msg
            state["error_count"] += 1
            print(error_msg)

    state["tool_results"] = results
    state["current_step"] = AgentStep.SYNTHESIZE_ANSWER.value
    print(f'Inside execute tools, result:{results}')
    print(f"Inside execute tools, current step: {state['current_step']}")

    return state
|
|
|
|
|
def synthesize_answer(state: AgentState) -> AgentState:
    """Derive the final answer via a two-stage chain-of-thought prompt.

    Stage 1 asks the LLM for a step-by-step analysis of the question plus
    tool outputs; stage 2 extracts a terse final answer from that analysis.
    Bug fix: the first llm.invoke() used to run outside the try block, so a
    failure there crashed the node and bypassed error recovery entirely;
    both calls are now guarded.
    """
    tool_results_str = "\n".join(
        [f"=== {tool.upper()} RESULTS ===\n{result}\n" for tool, result in state["tool_results"].items()]
    )

    cot_prompt = f"""You are a precise assistant tasked with analyzing the user's question {"Available tool outputs" if state["tool_results"] else ""}.

Question:
{state["question"]}

{f"Available tool outputs: {tool_results_str}" if state["tool_results"] else ""}

Instructions:
- Think step-by-step to determine the best strategy to answer the question.
- Use only the given information; do not hallucinate or infer from external knowledge.
- If decoding, logical deduction, counting, or interpretation is required, show each step clearly.
- If any part of the tool output is unclear or incomplete, mention it and its impact.
- Do not guess. If the information is insufficient, say so clearly.
- Finish with a clearly marked line: `---END OF ANALYSIS---`

Your step-by-step analysis:"""

    try:
        cot_response = llm.invoke(cot_prompt).content
        print(cot_response)

        final_answer_prompt = f"""You are a precise assistant tasked with deriving the **final answer** from the step-by-step analysis below.

Question:
{state["question"]}

Step-by-step analysis:
{cot_response}

Instructions:
- Read the analysis thoroughly before responding.
- Output ONLY the final answer. Do NOT include any reasoning or explanation.
- Remove any punctuation at the corners of the answer unless it is explicitly mentioned in the question.
- The answer must be concise and factual.
- If the analysis concluded that a definitive answer cannot be determined, respond with: `NA` (exactly).

Final answer:"""

        response = llm.invoke(final_answer_prompt).content
        print(f'Inside Synthesis: {response}')
        state["final_answer"] = response
        state["current_step"] = AgentStep.COMPLETE.value
    except Exception as e:
        state["error_count"] += 1
        state["final_answer"] = f"Error synthesizing answer: {e}"
        state["current_step"] = AgentStep.ERROR_RECOVERY.value

    return state
|
|
|
|
|
def error_recovery(state: AgentState) -> AgentState:
    """Fallback path: answer from the model's own knowledge, or give up.

    Once the error budget is exhausted the agent reports failure instead of
    retrying; otherwise it asks the LLM to answer the original question
    directly, without tools.  Either way this node ends the run.
    """
    budget_exhausted = state["error_count"] >= state["max_errors"]

    if budget_exhausted:
        state["final_answer"] = "I encountered multiple errors and cannot complete this task reliably."
    else:
        fallback_prompt = f"""
Answer this question directly using your knowledge:
{state["original_question"]}

Provide a helpful response even if you cannot access external tools.
Be clear about any limitations in your answer.
"""
        try:
            reply = llm.invoke(fallback_prompt).content
            state["final_answer"] = f"Using available knowledge (some tools unavailable): {reply}"
        except Exception as exc:
            state["final_answer"] = f"All approaches failed. Error: {exc}"

    # Every branch terminates the graph.
    state["current_step"] = AgentStep.COMPLETE.value
    return state
|
|
|
|
|
|
|
|
def route_next_step(state: AgentState) -> str:
    """Map the state's current step to the next graph node (or END)."""
    current = state["current_step"]

    # The initial pseudo-step always leads into analysis.
    if current == "start":
        return AgentStep.ANALYZE_QUESTION.value

    # Linear happy path; recovery and completion both terminate.
    transitions = {
        AgentStep.ANALYZE_QUESTION.value: AgentStep.SELECT_TOOLS.value,
        AgentStep.SELECT_TOOLS.value: AgentStep.EXECUTE_TOOLS.value,
        AgentStep.EXECUTE_TOOLS.value: AgentStep.SYNTHESIZE_ANSWER.value,
        AgentStep.SYNTHESIZE_ANSWER.value: AgentStep.COMPLETE.value,
        AgentStep.ERROR_RECOVERY.value: AgentStep.COMPLETE.value,
        AgentStep.COMPLETE.value: END,
    }
    return transitions.get(current, END)
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# LangGraph wiring: analyze -> select -> execute -> synthesize, with each
# stage able to divert to error_recovery when its node flags a failure.
workflow = StateGraph(AgentState)

# Each node wraps one of the plain state-transition functions above as a
# runnable so LangGraph can invoke it.
workflow.add_node("analyze_question", RunnableLambda(analyze_question))
workflow.add_node("select_tools", RunnableLambda(select_tools))
workflow.add_node("execute_tools", RunnableLambda(execute_tools))
workflow.add_node("synthesize_answer", RunnableLambda(synthesize_answer))
workflow.add_node("error_recovery", RunnableLambda(error_recovery))

workflow.set_entry_point("analyze_question")

# Nodes signal success by advancing current_step; anything else is treated
# as failure and routed to recovery.
workflow.add_conditional_edges(
    "analyze_question",
    lambda state: "select_tools" if state["current_step"] == AgentStep.SELECT_TOOLS.value else "error_recovery"
)
workflow.add_edge("select_tools", "execute_tools")
workflow.add_conditional_edges(
    "execute_tools",
    lambda state: "synthesize_answer" if state["current_step"] == AgentStep.SYNTHESIZE_ANSWER.value else "error_recovery"
)
workflow.add_conditional_edges(
    "synthesize_answer",
    lambda state: END if state["current_step"] == AgentStep.COMPLETE.value else "error_recovery"
)
workflow.add_edge("error_recovery", END)

# Compiled graph shared by every GaiaAgent instance below.
graph = workflow.compile()
|
|
|
|
|
|
|
|
class GaiaAgent:
    """GAIA Agent with tools and intelligent processing.

    Thin callable wrapper around the compiled LangGraph: downloads any task
    attachment, runs the graph, tracks per-tool usage counts, and falls back
    to a direct LLM call if the graph itself blows up.
    """

    def __init__(self):
        # Compiled LangGraph shared across invocations.
        self.graph = graph
        # tool name -> number of times it was selected across calls
        self.tool_usage_stats = {}
        print("Enhanced GAIA Agent initialized with:")
        print("✓ Intelligent multi-query web search")
        print("✓ Entity-aware Wikipedia search")
        print("✓ Enhanced file processing tools")
        print("✓ Advanced error recovery")
        print("✓ Comprehensive result synthesis")

    def get_tool_stats(self) -> Dict[str, int]:
        """Get usage statistics for tools (a defensive copy)."""
        return self.tool_usage_stats.copy()

    def __call__(self, task_id: str, question: str) -> str:
        """Process one task and return the final answer string.

        Downloads the task's attachment (if any), runs the graph on the
        augmented question, logs diagnostics, and returns the answer.
        """
        print(f"\n{'='*60}")
        print(f"[{task_id}] ENHANCED PROCESSING: {question}")

        processed_question = process_file(task_id, question)
        initial_state = initialize_state(processed_question)

        try:
            result = self.graph.invoke(initial_state)

            answer = result.get("final_answer", "No answer generated")
            selected_tools = result.get("selected_tools", [])
            conversation_history = result.get("conversation_history", [])
            tool_results = result.get("tool_results", {})
            error_count = result.get("error_count", 0)

            for tool in selected_tools:
                self.tool_usage_stats[tool] = self.tool_usage_stats.get(tool, 0) + 1

            print(f"[{task_id}] Selected tools: {selected_tools}")
            print(f"[{task_id}] Tools executed: {list(tool_results.keys())}")
            print(f"[{task_id}] Processing steps: {len(conversation_history)}")
            print(f"[{task_id}] Errors encountered: {error_count}")

            # Fix: the original reused `result` as the loop variable here,
            # shadowing the graph output dict above.
            for tool, tool_output in tool_results.items():
                result_size = len(str(tool_output)) if tool_output else 0
                print(f"[{task_id}] {tool} result size: {result_size} chars")

            print(f"[{task_id}] FINAL ANSWER: {answer}")
            print(f"{'='*60}")

            return answer

        except Exception as e:
            error_msg = f"Critical error in enhanced agent execution: {str(e)}"
            print(f"[{task_id}] {error_msg}")

            # Last resort: ask the model directly, without the graph.
            try:
                fallback_response = llm.invoke(f"Please answer this question: {question}").content
                return f"Fallback response: {fallback_response}"
            except Exception:
                # Bug fix: was a bare `except:`, which also swallowed
                # KeyboardInterrupt / SystemExit.
                return error_msg
|
|
|
|
|
|
|
|
def detect_file_type(file_path: str) -> Optional[str]:
    """Classify *file_path* by extension into a broad tool category.

    Returns one of 'excel', 'image', 'audio', 'code', 'text', 'document',
    or None when the (case-insensitive) extension is unrecognised.
    """
    extension_groups = {
        'excel': ('.xlsx', '.xls', '.csv'),
        'image': ('.png', '.jpg', '.jpeg', '.bmp', '.gif', '.tiff', '.webp'),
        'audio': ('.mp3', '.wav', '.ogg', '.flac', '.m4a', '.aac'),
        'code': ('.py', '.ipynb', '.js', '.html', '.css', '.java',
                 '.cpp', '.c', '.sql', '.r', '.json', '.xml'),
        'text': ('.txt', '.md'),
        'document': ('.pdf', '.doc', '.docx'),
    }

    suffix = Path(file_path).suffix.lower()
    for category, extensions in extension_groups.items():
        if suffix in extensions:
            return category
    return None
|
|
|
|
|
def process_file(task_id: str, question_text: str) -> str:
    """Download the task's attachment (if any) and append file info to the question.

    Returns *question_text* unchanged when no file is available; otherwise
    returns the question followed by a marker block containing the saved
    local path and basic metadata, which execute_tools() later parses.
    """
    file_url = f"{FILE_PATH}{task_id}"

    try:
        print(f"[{task_id}] Attempting to download file from: {file_url}")
        response = requests.get(file_url, timeout=30)
        response.raise_for_status()
        print(f"[{task_id}] File download successful. Status: {response.status_code}")
    except requests.exceptions.RequestException as exc:
        # No attachment (or a network problem): proceed with text only.
        print(f"[{task_id}] File download failed: {str(exc)}")
        return question_text

    # Recover the original filename from the Content-Disposition header,
    # falling back to the task id.
    content_disposition = response.headers.get("content-disposition", "")
    filename = task_id
    filename_match = re.search(r'filename[*]?=(?:"([^"]+)"|([^;]+))', content_disposition)
    if filename_match:
        filename = (filename_match.group(1) or filename_match.group(2)).strip()
    # Security: keep only the basename so a hostile header cannot escape the
    # temp directory via path separators or "..".
    filename = Path(filename).name or task_id

    temp_storage_dir = Path(tempfile.gettempdir()) / "gaia_enhanced_files" / task_id
    temp_storage_dir.mkdir(parents=True, exist_ok=True)

    file_path = temp_storage_dir / filename
    file_path.write_bytes(response.content)

    file_size = len(response.content)
    file_type = detect_file_type(filename)

    # Bug fix: this message (and the "- Name:" line below) previously printed
    # the literal text "(unknown)" instead of the actual filename.
    print(f"[{task_id}] File saved: {filename} ({file_size:,} bytes, type: {file_type})")

    # Append a structured block; execute_tools() looks for the marker line
    # and reads the path from the following line.
    enhanced_question = f"{question_text}\n\n"
    enhanced_question += f"{'='*50}\n"
    enhanced_question += f"FILE INFORMATION:\n"
    enhanced_question += f"A file was downloaded for this task and saved locally at:\n"
    enhanced_question += f"{file_path}\n"
    enhanced_question += f"File details:\n"
    enhanced_question += f"- Name: {filename}\n"
    enhanced_question += f"- Size: {file_size:,} bytes ({file_size/1024:.1f} KB)\n"
    enhanced_question += f"- Type: {file_type or 'unknown'}\n"
    enhanced_question += f"{'='*50}\n\n"

    return enhanced_question
|
|
|
|
|
|
|
|
def run_enhanced_tests():
    """Run comprehensive tests of the enhanced agent.

    Feeds a small battery of representative questions through a fresh
    GaiaAgent, prints a preview of each answer, and finally dumps the
    tool-usage counters.
    """
    agent = GaiaAgent()

    test_cases = [
        {
            "id": "test_search_1",
            "question": "What are the latest developments in artificial intelligence in 2024?",
            "expected_tools": ["search"],
        },
        {
            "id": "test_wiki_1",
            "question": "Tell me about Albert Einstein's contributions to physics",
            "expected_tools": ["wikipedia"],
        },
        {
            "id": "test_combined_1",
            "question": "What is machine learning and what are recent breakthroughs?",
            "expected_tools": ["wikipedia", "search"],
        },
        {
            "id": "test_excel_1",
            "question": "Analyze the data in the Excel file sales_data.xlsx",
            "expected_tools": ["excel"],
        },
    ]

    print("\n" + "="*80)
    print("RUNNING ENHANCED AGENT TESTS")
    print("="*80)

    for case in test_cases:
        print(f"\nTest Case: {case['id']}")
        print(f"Question: {case['question']}")
        print(f"Expected tools: {case['expected_tools']}")

        try:
            answer = agent(case['id'], case['question'])
        except Exception as exc:
            print(f"Test failed: {exc}")
        else:
            print(f"Result length: {len(answer)} characters")
            print(f"Result preview: {answer[:200]}...")

        print("-" * 60)

    print(f"\nTool Usage Statistics:")
    for name, uses in agent.get_tool_stats().items():
        print(f"  {name}: {uses} times")
|
|
|
|
|
|
|
|
if __name__ == "__main__":

    # Demo entry point: run the agent on one known GAIA level-1 question.
    agent = GaiaAgent()

    # Records mirror the scoring API's question schema
    # (task_id / question / Level / file_name).
    sample_questions = [
        {
            "task_id": "bda648d7-d618-4883-88f4-3466eabd860e",
            "question": "Where were the Vietnamese specimens described by Kuznetzov in Nedoshivina's 2010 paper eventually deposited? Just give me the city name without abbreviations.",
            "Level": "1",
            "file_name": ""
        }
    ]

    print("\n" + "="*80)
    print("ENHANCED GAIA AGENT DEMONSTRATION")
    print("="*80)

    for i, task in enumerate(sample_questions):
        print(f"\nExample {i+1}: {task['question']}")
        result = agent(task['task_id'], task['question'])
        # Truncate long answers for readable console output.
        print(f"Answer: {result[:300]}...")
        print("-" * 60)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|