AlessandroMasala commited on
Commit
136b7ef
·
verified ·
1 Parent(s): ee6023f

Update tools.py

Browse files
Files changed (1) hide show
  1. tools.py +644 -425
tools.py CHANGED
@@ -1,478 +1,697 @@
1
- from langchain.tools import DuckDuckGoSearchResults, WikipediaQueryRun
2
- from langchain.utilities import WikipediaAPIWrapper
3
- from PIL import Image
4
  import re
5
- import time
6
- import json
7
- import pandas as pd
8
  from pathlib import Path
9
- from typing import List, Dict, Optional, Union
10
- from tabulate import tabulate
11
- import whisper
 
 
 
 
 
 
 
 
12
 
13
- import numpy as np
14
- import os
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
15
 
16
- # ----------- Enhanced Search Functionality -----------
17
- class EnhancedSearchTool:
18
- """Enhanced web search with intelligent query processing and result filtering"""
19
-
20
- def __init__(self, max_results: int = 10):
21
- self.base_tool = DuckDuckGoSearchResults(num_results=max_results)
22
- self.max_results = max_results
23
-
24
- def _extract_key_terms(self, question: str) -> List[str]:
25
- """Extract key search terms from the question using LLM"""
26
- try:
27
- extract_prompt = f"""
28
- Extract the most important search terms from this question for web search:
29
- Question: {question}
30
-
31
- Return ONLY a comma-separated list of key terms, no explanations.
32
- Focus on: proper nouns, specific concepts, technical terms, dates, numbers.
33
- Avoid: common words like 'what', 'how', 'when', 'the', 'is', 'are'.
34
-
35
- Example: "What is the population of Tokyo in 2023?" -> "Tokyo population 2023"
36
- """
37
-
38
- response = llm.invoke(extract_prompt).content.strip()
39
- return [term.strip() for term in response.split(',')]
40
- except Exception:
41
- # Fallback to simple keyword extraction
42
- return self._simple_keyword_extraction(question)
43
-
44
- def _simple_keyword_extraction(self, question: str) -> List[str]:
45
- """Fallback keyword extraction using regex"""
46
- # Remove common question words
47
- stop_words = {'what', 'how', 'when', 'where', 'why', 'who', 'which', 'the', 'is', 'are', 'was', 'were', 'do', 'does', 'did', 'can', 'could', 'should', 'would'}
48
- words = re.findall(r'\b[A-Za-z]+\b', question.lower())
49
- return [word for word in words if word not in stop_words and len(word) > 2]
50
-
51
- def _generate_search_queries(self, question: str) -> List[str]:
52
- """Generate multiple search queries for comprehensive results"""
53
- key_terms = self._extract_key_terms(question)
54
-
55
- queries = []
56
-
57
- # Original question (cleaned)
58
- cleaned_question = re.sub(r'[^\w\s]', ' ', question).strip()
59
- queries.append(cleaned_question)
60
-
61
- # Key terms combined
62
- if key_terms:
63
- queries.append(' '.join(key_terms[:5])) # Top 5 terms
64
-
65
- # Specific query patterns based on question type
66
- if any(word in question.lower() for word in ['latest', 'recent', 'current', 'new']):
67
- queries.append(f"{' '.join(key_terms[:3])} 2024 2025")
68
-
69
- if any(word in question.lower() for word in ['statistics', 'data', 'number', 'count']):
70
- queries.append(f"{' '.join(key_terms[:3])} statistics data")
71
-
72
- if any(word in question.lower() for word in ['definition', 'what is', 'meaning']):
73
- queries.append(f"{' '.join(key_terms[:2])} definition meaning")
74
-
75
- return list(dict.fromkeys(queries)) # Remove duplicates while preserving order
76
-
77
- def _filter_and_rank_results(self, results: List[Dict], question: str) -> List[Dict]:
78
- """Filter and rank search results based on relevance"""
79
- if not results:
80
- return results
81
-
82
- key_terms = self._extract_key_terms(question)
83
- key_terms_lower = [term.lower() for term in key_terms]
84
-
85
- scored_results = []
86
- for result in results:
87
- score = 0
88
- text_content = (result.get('snippet', '') + ' ' + result.get('title', '')).lower()
89
-
90
- # Score based on key term matches
91
- for term in key_terms_lower:
92
- if term in text_content:
93
- score += text_content.count(term)
94
-
95
- # Bonus for recent dates
96
- if any(year in text_content for year in ['2024', '2025', '2023']):
97
- score += 2
98
-
99
- # Penalty for very short snippets
100
- if len(result.get('snippet', '')) < 50:
101
- score -= 1
102
-
103
- scored_results.append((score, result))
104
-
105
- # Sort by score and return top results
106
- scored_results.sort(key=lambda x: x[0], reverse=True)
107
- return [result for score, result in scored_results[:self.max_results]]
108
-
109
- def run(self, question: str) -> str:
110
- """Enhanced search execution with multiple queries and result filtering"""
111
- try:
112
- search_queries = self._generate_search_queries(question)
113
- all_results = []
114
-
115
- for query in search_queries[:3]: # Limit to 3 queries to avoid rate limits
116
- try:
117
- results = self.base_tool.run(query)
118
- if isinstance(results, str):
119
- # Parse string results if needed
120
- try:
121
- results = json.loads(results) if results.startswith('[') else [{'snippet': results, 'title': 'Search Result'}]
122
- except:
123
- results = [{'snippet': results, 'title': 'Search Result'}]
124
-
125
- if isinstance(results, list):
126
- all_results.extend(results)
127
-
128
- time.sleep(0.5) # Rate limiting
129
- except Exception as e:
130
- print(f"Search query failed: {query} - {e}")
131
- continue
132
-
133
- if not all_results:
134
- return "No search results found."
135
-
136
- # Filter and rank results
137
- filtered_results = self._filter_and_rank_results(all_results, question)
138
-
139
- # Format results
140
- formatted_results = []
141
- for i, result in enumerate(filtered_results[:5], 1):
142
- title = result.get('title', 'No title')
143
- snippet = result.get('snippet', 'No description')
144
- link = result.get('link', '')
145
-
146
- formatted_results.append(f"{i}. {title}\n {snippet}\n Source: {link}\n")
147
-
148
- return "ENHANCED SEARCH RESULTS:\n" + "\n".join(formatted_results)
149
-
150
- except Exception as e:
151
- return f"Enhanced search error: {str(e)}"
152
 
153
- # ----------- Enhanced Wikipedia Tool -----------
154
- class EnhancedWikipediaTool:
155
- """Enhanced Wikipedia search with intelligent query processing and content extraction"""
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
156
 
157
- def __init__(self):
158
- self.base_wrapper = WikipediaAPIWrapper(
159
- top_k_results=3,
160
- doc_content_chars_max=3000,
161
- load_all_available_meta=True
162
- )
163
- self.base_tool = WikipediaQueryRun(api_wrapper=self.base_wrapper)
164
-
165
- def _extract_entities(self, question: str) -> List[str]:
166
- """Extract named entities for Wikipedia search"""
167
- try:
168
- entity_prompt = f"""
169
- Extract named entities (people, places, organizations, concepts) from this question for Wikipedia search:
170
- Question: {question}
171
-
172
- Return ONLY a comma-separated list of the most important entities.
173
- Focus on: proper nouns, specific names, places, organizations, historical events, scientific concepts.
174
-
175
- Example: "Tell me about Einstein's theory of relativity" -> "Albert Einstein, theory of relativity, relativity"
176
- """
177
-
178
- response = llm.invoke(entity_prompt).content.strip()
179
- entities = [entity.strip() for entity in response.split(',')]
180
- return [e for e in entities if len(e) > 2]
181
- except Exception:
182
- # Fallback: extract capitalized words and phrases
183
- return self._extract_capitalized_terms(question)
184
-
185
- def _extract_capitalized_terms(self, question: str) -> List[str]:
186
- """Fallback: extract capitalized terms as potential entities"""
187
- # Find capitalized words and phrases
188
- capitalized_words = re.findall(r'\b[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*\b', question)
189
- # Also look for quoted terms
190
- quoted_terms = re.findall(r'"([^"]+)"', question)
191
- quoted_terms.extend(re.findall(r"'([^']+)'", question))
192
-
193
- return capitalized_words + quoted_terms
194
 
195
- def _search_multiple_terms(self, entities: List[str]) -> Dict[str, str]:
196
- """Search Wikipedia for multiple entities and return best results"""
197
- results = {}
198
-
199
- for entity in entities[:3]: # Limit to avoid too many API calls
200
- try:
201
- result = self.base_tool.run(entity)
202
- if result and "Page:" in result and len(result) > 100:
203
- results[entity] = result
204
- time.sleep(0.5) # Rate limiting
205
- except Exception as e:
206
- print(f"Wikipedia search failed for '{entity}': {e}")
207
- continue
208
-
209
- return results
210
 
211
- def _extract_relevant_sections(self, content: str, question: str) -> str:
212
- """Extract the most relevant sections from Wikipedia content"""
213
- if not content or len(content) < 200:
214
- return content
215
-
216
- # Split content into sections (usually separated by double newlines)
217
- sections = re.split(r'\n\s*\n', content)
218
-
219
- # Score sections based on relevance to question
220
- key_terms = self._extract_entities(question)
221
- key_terms_lower = [term.lower() for term in key_terms]
222
-
223
- scored_sections = []
224
- for section in sections:
225
- if len(section.strip()) < 50:
226
- continue
227
-
228
- score = 0
229
- section_lower = section.lower()
230
-
231
- # Score based on key term matches
232
- for term in key_terms_lower:
233
- score += section_lower.count(term)
234
-
235
- # Bonus for sections with dates, numbers, or specific facts
236
- if re.search(r'\b(19|20)\d{2}\b', section): # Years
237
- score += 1
238
- if re.search(r'\b\d+([.,]\d+)?\s*(million|billion|thousand|percent|%)\b', section):
239
- score += 1
240
-
241
- scored_sections.append((score, section))
242
-
243
- # Sort by relevance and take top sections
244
- scored_sections.sort(key=lambda x: x[0], reverse=True)
245
- top_sections = [section for score, section in scored_sections[:3] if score > 0]
246
-
247
- if not top_sections:
248
- # If no highly relevant sections, take first few sections
249
- top_sections = sections[:2]
250
-
251
- return '\n\n'.join(top_sections)
252
 
253
- def run(self, question: str) -> str:
254
- """Enhanced Wikipedia search with entity extraction and content filtering"""
255
- try:
256
- entities = self._extract_entities(question)
257
-
258
- if not entities:
259
- # Fallback to direct search with cleaned question
260
- cleaned_question = re.sub(r'[^\w\s]', ' ', question).strip()
261
- try:
262
- result = self.base_tool.run(cleaned_question)
263
- return self._extract_relevant_sections(result, question) if result else "No Wikipedia results found."
264
- except Exception as e:
265
- return f"Wikipedia search error: {str(e)}"
266
-
267
- # Search for multiple entities
268
- search_results = self._search_multiple_terms(entities)
269
-
270
- if not search_results:
271
- return "No relevant Wikipedia articles found."
272
-
273
- # Combine and format results
274
- formatted_results = []
275
- for entity, content in search_results.items():
276
- relevant_content = self._extract_relevant_sections(content, question)
277
- if relevant_content:
278
- formatted_results.append(f"=== {entity} ===\n{relevant_content}")
279
-
280
- if not formatted_results:
281
- return "No relevant information found in Wikipedia articles."
282
-
283
- return "ENHANCED WIKIPEDIA RESULTS:\n\n" + "\n\n".join(formatted_results)
284
-
285
- except Exception as e:
286
- return f"Enhanced Wikipedia error: {str(e)}"
287
-
288
- # ----------- Enhanced File Processing Tools -----------
289
- def excel_to_markdown(inputs: dict) -> str:
290
- """Enhanced Excel tool with better error handling and data analysis"""
291
- try:
292
- excel_path = inputs["excel_path"]
293
- sheet_name = inputs.get("sheet_name", None)
294
- file_path = Path(excel_path).expanduser().resolve()
295
- if not file_path.is_file():
296
- return f"Error: Excel file not found at {file_path}"
297
-
298
- sheet: Union[str, int] = (
299
- int(sheet_name) if sheet_name and sheet_name.isdigit() else sheet_name or 0
300
- )
301
- df = pd.read_excel(file_path, sheet_name=sheet)
302
-
303
- # Enhanced metadata
304
- metadata = f"EXCEL FILE ANALYSIS:\n"
305
- metadata += f"File: {file_path.name}\n"
306
- metadata += f"Dimensions: {len(df)} rows × {len(df.columns)} columns\n"
307
- metadata += f"Columns: {', '.join(df.columns.tolist())}\n"
308
-
309
- # Data type information
310
- metadata += f"Data types: {dict(df.dtypes)}\n"
311
-
312
- # Basic statistics for numeric columns
313
- numeric_cols = df.select_dtypes(include=['number']).columns
314
- if len(numeric_cols) > 0:
315
- metadata += f"Numeric columns: {list(numeric_cols)}\n"
316
- for col in numeric_cols[:3]: # Limit to first 3 numeric columns
317
- metadata += f" {col}: mean={df[col].mean():.2f}, min={df[col].min()}, max={df[col].max()}\n"
318
-
319
- metadata += "\nSAMPLE DATA (first 10 rows):\n"
320
-
321
- if hasattr(df, "to_markdown"):
322
- sample_data = df.head(10).to_markdown(index=False)
323
- else:
324
- sample_data = tabulate(df.head(10), headers="keys", tablefmt="github", showindex=False)
325
-
326
- return metadata + sample_data + f"\n\n(Showing first 10 rows of {len(df)} total rows)"
327
-
328
- except Exception as e:
329
- return f"Error reading Excel file: {str(e)}"
330
-
331
- def image_file_info(image_path: str, question: str) -> str:
332
- """Enhanced image file analysis using Gemini API"""
333
- try:
334
- from google import genai
335
- from google.genai.types import Part
336
-
337
- client = genai.Client(api_key=os.getenv("GEMINI_API_KEY"))
338
-
339
- # Read content from a local file
340
- with open(image_path, "rb") as f:
341
- img_bytes = f.read()
342
-
343
- response = client.models.generate_content(
344
- model="gemini-2.5-flash-preview-05-20",
345
- contents=[
346
- question,
347
- Part.from_bytes(data=img_bytes, mime_type="image/jpeg")
348
- ],
349
- )
350
- return response.text
351
 
352
- except Exception as e:
353
- return f"Error during image analysis: {e}"
354
-
355
- def audio_file_info(audio_path: str) -> str:
356
- """Returns only the transcription of an audio file."""
357
- try:
358
- model = whisper.load_model("tiny") # Fast + accurate balance
359
- result = model.transcribe(audio_path, fp16=False)
360
- return result['text']
361
- except Exception as e:
362
- return f"Error transcribing audio: {str(e)}"
363
-
364
- def code_file_read(code_path: str) -> str:
365
- """Enhanced code file analysis"""
366
  try:
367
- with open(code_path, "r", encoding="utf-8") as f:
368
- content = f.read()
369
-
370
- file_path = Path(code_path)
371
-
372
- info = f"CODE FILE ANALYSIS:\n"
373
- info += f"File: {file_path.name}\n"
374
- info += f"Extension: {file_path.suffix}\n"
375
- info += f"Size: {len(content)} characters, {len(content.splitlines())} lines\n"
376
-
377
- # Language-specific analysis
378
- if file_path.suffix == '.py':
379
- # Python-specific analysis
380
- import_lines = [line for line in content.splitlines() if line.strip().startswith(('import ', 'from '))]
381
- if import_lines:
382
- info += f"Imports ({len(import_lines)}): {', '.join(import_lines[:5])}\n"
383
-
384
- # Count functions and classes
385
- func_count = len(re.findall(r'^def\s+\w+', content, re.MULTILINE))
386
- class_count = len(re.findall(r'^class\s+\w+', content, re.MULTILINE))
387
- info += f"Functions: {func_count}, Classes: {class_count}\n"
388
-
389
- info += f"\nCODE CONTENT:\n{content}"
390
- return info
391
-
392
  except Exception as e:
393
- return f"Error reading code file: {e}"
 
 
394
 
395
-
396
- import yt_dlp
397
- from pathlib import Path
398
-
399
- def extract_youtube_info(question: str) -> str:
400
- """
401
- Download a YouTube video or audio using yt-dlp without merging.
402
-
403
- Parameters:
404
- - url: str — YouTube URL
405
- - audio_only: bool — if True, downloads audio only; else best single video+audio stream
406
-
407
- Returns:
408
- - str: path to downloaded file or error message
409
- """
410
- pattern = r"(https?://(?:www\.)?(?:youtube\.com/watch\?v=[\w\-]+|youtu\.be/[\w\-]+))"
411
- match = re.search(pattern, question)
412
- youtube_url = match.group(1) if match else None
413
- print(f"Extracting YouTube URL: {youtube_url}")
414
-
415
- match = re.search(r"(?:v=|\/)([a-zA-Z0-9_-]{11})", youtube_url)
416
- video_id = match.group(1) if match else "dummy_id"
417
- file_path = Path(video_id)
418
-
419
- output_dir = Path(file_path).parent
420
- output_dir.mkdir(parents=True, exist_ok=True)
421
-
422
- ydl_opts = {
423
- 'format': 'best[ext=mp4]/best', # best mp4 combined stream or fallback to best available
424
- 'outtmpl': str(file_path),
425
- 'quiet': True,
426
- 'no_warnings': True,
427
- }
428
-
429
- try:
430
- with yt_dlp.YoutubeDL(ydl_opts) as ydl:
431
- ydl.download([youtube_url])
432
- return audio_file_info(str(file_path))
433
- except Exception as e:
434
- return f"Error: {e}"
435
-
436
-
437
-
438
-
439
 
 
 
 
 
440
 
 
 
 
 
 
 
 
 
 
 
 
441
 
 
442
 
 
 
443
 
 
 
 
 
 
 
 
 
444
 
 
 
445
 
 
446
 
 
 
447
 
 
 
448
 
 
 
449
 
 
 
450
 
 
 
451
 
 
 
452
 
 
 
453
 
 
 
454
 
 
 
455
 
 
456
 
 
 
 
 
 
 
 
 
 
 
457
 
 
458
 
459
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
460
 
 
461
 
 
 
 
 
 
 
 
 
462
 
 
 
 
463
 
 
 
464
 
 
465
 
 
466
 
 
 
467
 
 
468
 
 
 
 
 
 
 
 
469
 
 
470
 
 
471
 
 
472
 
 
 
473
 
 
 
474
 
 
 
 
 
 
 
475
 
 
476
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
477
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
478
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
 
 
2
  import re
 
 
 
3
  from pathlib import Path
4
+ from typing import Optional, Union, Dict, List, Any
5
+ from enum import Enum
6
+ import requests
7
+ import tempfile
8
+ import ast
9
+
10
+ from dotenv import load_dotenv
11
+ from langgraph.graph import StateGraph, END
12
+ from langchain.tools import Tool as LangTool
13
+ from langchain_core.runnables import RunnableLambda
14
+ from pathlib import Path
15
 
16
+ # llm
17
+ from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
18
+ from langchain_community.llms import HuggingFacePipeline
19
+ import torch
20
+
21
+ from langchain.tools import StructuredTool
22
+
23
+ from tools import (
24
+ EnhancedSearchTool,
25
+ EnhancedWikipediaTool,
26
+ excel_to_markdown,
27
+ image_file_info,
28
+ audio_file_info,
29
+ code_file_read,
30
+ extract_youtube_info)
31
+
32
+ # llm_config.py
33
+ from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
34
+ from langchain_community.llms import HuggingFacePipeline
35
+ import torch
36
+
37
+ # Load environment variables
38
+ load_dotenv()
39
+
40
+ # --- Constants ---
41
+ DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
42
+ QUESTIONS_URL = f"{DEFAULT_API_URL}/questions"
43
+ SUBMIT_URL = f"{DEFAULT_API_URL}/submit"
44
+ FILE_PATH = f"{DEFAULT_API_URL}/files/"
45
+
46
+ # Initialize LLM
47
+ llm = ChatGoogleGenerativeAI(
48
+ model=os.getenv("GEMINI_MODEL", "gemini-pro"),
49
+ google_api_key=os.getenv("GEMINI_API_KEY")
50
+ )
51
+
52
+ # ----------- Enhanced State Management -----------
53
+ from typing import TypedDict
54
+
55
+ class AgentState(TypedDict):
56
+ """Enhanced state tracking for the agent - using TypedDict for LangGraph compatibility"""
57
+ question: str
58
+ original_question: str
59
+ conversation_history: List[Dict[str, str]]
60
+ selected_tools: List[str]
61
+ tool_results: Dict[str, Any]
62
+ final_answer: str
63
+ current_step: str
64
+ error_count: int
65
+ max_errors: int
66
+
67
+ class AgentStep(Enum):
68
+ ANALYZE_QUESTION = "analyze_question"
69
+ SELECT_TOOLS = "select_tools"
70
+ EXECUTE_TOOLS = "execute_tools"
71
+ SYNTHESIZE_ANSWER = "synthesize_answer"
72
+ ERROR_RECOVERY = "error_recovery"
73
+ COMPLETE = "complete"
74
+
75
+ # ----------- Helper Functions -----------
76
+ def initialize_state(question: str) -> AgentState:
77
+ """Initialize agent state with default values"""
78
+ return {
79
+ "question": question,
80
+ "original_question": question,
81
+ "conversation_history": [],
82
+ "selected_tools": [],
83
+ "tool_results": {},
84
+ "final_answer": "",
85
+ "current_step": "start",
86
+ "error_count": 0,
87
+ "max_errors": 3
88
+ }
89
 
90
+ # Initialize vanilla tools
91
+ from langchain.tools import DuckDuckGoSearchResults, WikipediaQueryRun
92
+ from langchain.utilities import WikipediaAPIWrapper
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
93
 
94
+ duckduckgo_tool = DuckDuckGoSearchResults()
95
+ wiki_tool = WikipediaQueryRun(api_wrapper=WikipediaAPIWrapper())
96
+
97
+
98
+ # Initialize enhanced tools
99
+ enhanced_search_tool = LangTool.from_function(
100
+ name="enhanced_web_search",
101
+ func=EnhancedSearchTool().run,
102
+ description="Enhanced web search with intelligent query processing, multiple search strategies, and result filtering. Provides comprehensive and relevant search results."
103
+ )
104
+
105
+ enhanced_wiki_tool = LangTool.from_function(
106
+ name="enhanced_wikipedia",
107
+ func=EnhancedWikipediaTool().run,
108
+ description="Enhanced Wikipedia search with entity extraction, multi-term search, and relevant content filtering. Provides detailed encyclopedic information."
109
+ )
110
+
111
+ excel_tool = StructuredTool.from_function(
112
+ name="excel_to_text",
113
+ func=excel_to_markdown,
114
+ description="Enhanced Excel analysis with metadata, statistics, and structured data preview. Inputs: 'excel_path' (str), 'sheet_name' (str, optional).",
115
+ )
116
+
117
+ image_tool = StructuredTool.from_function(
118
+ name="image_file_info",
119
+ func=image_file_info,
120
+ description="Enhanced image file analysis with detailed metadata and properties."
121
+ )
122
+
123
+ audio_tool = LangTool.from_function(
124
+ name="audio_file_info",
125
+ func=audio_file_info,
126
+ description="Enhanced audio processing with transcription, language detection, and timestamped segments."
127
+ )
128
+
129
+ code_tool = LangTool.from_function(
130
+ name="code_file_read",
131
+ func=code_file_read,
132
+ description="Enhanced code file analysis with language-specific insights and structure analysis."
133
+ )
134
+
135
+ youtube_tool = LangTool.from_function(
136
+ name="extract_youtube_info",
137
+ func=extract_youtube_info,
138
+ description="Extracts transcription from the youtube link"
139
+ )
140
+
141
+ # Enhanced tool registry
142
+ AVAILABLE_TOOLS = {
143
+ "excel": excel_tool,
144
+ "search": wiki_tool,
145
+ "wikipedia": duckduckgo_tool,
146
+ "image": image_tool,
147
+ "audio": audio_tool,
148
+ "code": code_tool,
149
+ "youtube": youtube_tool
150
+ }
151
+
152
+ # ----------- Intelligent Tool Selection -----------
153
+ def analyze_question(state: AgentState) -> AgentState:
154
+ """Enhanced question analysis with better tool recommendation"""
155
+ analysis_prompt = f"""
156
+ Analyze this question and determine the best tools and approach:
157
+ Question: {state["question"]}
158
 
159
+ Available enhanced tools:
160
+ 1. excel - Enhanced Excel/CSV analysis with statistics and metadata
161
+ 2. search - Enhanced web search with intelligent query processing and result filtering
162
+ 3. wikipedia - Enhanced Wikipedia search with entity extraction and content filtering
163
+ 4. image - Enhanced image analysis with what the image contains
164
+ 5. audio - Enhanced audio processing with transcription
165
+ 6. code - Enhanced code analysis with language-specific insights
166
+ 7. youtube - Extracts transcription from the youtube link
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
167
 
168
+ Consider:
169
+ - Question type (factual, analytical, current events, technical)
170
+ - Required information sources (files, web, encyclopedic)
171
+ - Time sensitivity (current vs historical information)
172
+ - Complexity level
 
 
 
 
 
 
 
 
 
 
173
 
174
+ Respond with:
175
+ 1. Question type: <type>
176
+ 2. Primary tools needed: <tools>
177
+ 3. Search strategy: <strategy>
178
+ 4. Expected answer format: <format>
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
179
 
180
+ Format: TYPE: <type> | TOOLS: <tools> | STRATEGY: <strategy> | FORMAT: <format>
181
+ """
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
182
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
183
  try:
184
+ response = llm.invoke(analysis_prompt).content
185
+ state["conversation_history"].append({"role": "analysis", "content": response})
186
+ state["current_step"] = AgentStep.SELECT_TOOLS.value
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
187
  except Exception as e:
188
+ state["error_count"] += 1
189
+ state["conversation_history"].append({"role": "error", "content": f"Analysis failed: {e}"})
190
+ state["current_step"] = AgentStep.ERROR_RECOVERY.value
191
 
192
+ return state
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
193
 
194
+ def select_tools(state: AgentState) -> AgentState:
195
+ """Enhanced tool selection with smarter logic"""
196
+ question = state["question"].lower()
197
+ selected_tools = []
198
 
199
+ # File-based tool selection
200
+ if any(keyword in question for keyword in ["excel", "csv", "spreadsheet", ".xlsx", ".xls"]):
201
+ selected_tools.append("excel")
202
+ if any(keyword in question for keyword in [".png", ".jpg", ".jpeg", ".bmp", ".gif", "image"]):
203
+ selected_tools.append("image")
204
+ if any(keyword in question for keyword in [".mp3", ".wav", ".ogg", "audio", "transcribe"]):
205
+ selected_tools.append("audio")
206
+ if any(keyword in question for keyword in [".py", ".ipynb", "code", "script", "function"]):
207
+ selected_tools.append("code")
208
+ if any(keyword in question for keyword in ["youtube"]):
209
+ selected_tools.append("youtube")
210
 
211
+ print(f"File-based tools selected: {selected_tools}")
212
 
213
+ tools_prompt = f"""
214
+ You are a smart assistant that selects relevant tools based on the user's natural language question.
215
 
216
+ Available tools:
217
+ - "search" → Use for real-time, recent, or broad web information.
218
+ - "wikipedia" → Use for factual or encyclopedic knowledge.
219
+ - "excel" → Use for spreadsheet-related questions (.xlsx, .csv).
220
+ - "image" → Use for image files (.png, .jpg, etc.) or image-based tasks.
221
+ - "audio" → Use for sound files (.mp3, .wav, etc.) or transcription.
222
+ - "code" → Use for programming-related questions or when files like .py are mentioned.
223
+ - "youtube" → Use for questions involving YouTube videos.
224
 
225
+ Return the result as a **Python list of strings**, no explanation. Use only the relevant tools.
226
+ If not relevant tool is found, return an empty list such as [].
227
 
228
+ ### Examples:
229
 
230
+ Q: "Show me recent news about elections in 2025"
231
+ A: ["search"]
232
 
233
+ Q: "Summarize this Wikipedia article about Einstein"
234
+ A: ["wikipedia"]
235
 
236
+ Q: "Analyze this .csv file"
237
+ A: ["excel"]
238
 
239
+ Q: "Transcribe this .wav audio file"
240
+ A: ["audio"]
241
 
242
+ Q: "Generate Python code from this prompt"
243
+ A: ["code"]
244
 
245
+ Q: "Who was the president of USA in 1945?"
246
+ A: ["wikipedia"]
247
 
248
+ Q: "Give me current weather updates"
249
+ A: ["search"]
250
 
251
+ Q: "Look up the history of space exploration"
252
+ A: ["search", "wikipedia"]
253
 
254
+ Q: "What is 2 + 2?"
255
+ A: []
256
 
257
+ ### Now answer:
258
 
259
+ Q: {state["question"]}
260
+ A:
261
+ """
262
+
263
+ llm_tools = ast.literal_eval(llm.invoke(tools_prompt).content.strip())
264
+ if not isinstance(llm_tools, list):
265
+ llm_tools = []
266
+ print(f"LLM suggested tools: {llm_tools}")
267
+ selected_tools.extend(llm_tools)
268
+ selected_tools = list(set(selected_tools)) # Remove duplicates
269
 
270
+ print(f"Final selected tools after LLM suggestion: {selected_tools}")
271
 
272
 
273
+ # # Information-based tool selection
274
+ # current_indicators = ["recent", "current", "news", "today", "2025", "now"]
275
+ # encyclopedia_indicators = ["wiki", "wikipedia"]
276
+
277
+ # if any(indicator in question for indicator in current_indicators):
278
+ # selected_tools.append("search")
279
+ # elif any(indicator in question for indicator in encyclopedia_indicators):
280
+ # selected_tools.append("wikipedia")
281
+ # elif any(keyword in question for keyword in ["search", "find", "look up", "information about"]):
282
+ # # Use both for comprehensive coverage
283
+ # selected_tools.extend(["search", "wikipedia"])
284
+
285
+ # # Default fallback
286
+ # if not selected_tools:
287
+ # if any(word in question for word in ["who", "what", "when", "where"]):
288
+ # selected_tools.append("wikipedia")
289
+ # selected_tools.append("search")
290
+
291
+ # # Remove duplicates while preserving order
292
+ # selected_tools = list(dict.fromkeys(selected_tools))
293
+
294
+ state["selected_tools"] = selected_tools
295
+ state["current_step"] = AgentStep.EXECUTE_TOOLS.value
296
+ return state
297
+
298
+ def execute_tools(state: AgentState) -> AgentState:
299
+ """Enhanced tool execution with better error handling"""
300
+ results = {}
301
+
302
+ # Enhanced file detection
303
+ file_path = None
304
+ downloaded_file_marker = "A file was downloaded for this task and saved locally at:"
305
+ if downloaded_file_marker in state["question"]:
306
+ lines = state["question"].splitlines()
307
+ for i, line in enumerate(lines):
308
+ if downloaded_file_marker in line:
309
+ if i + 1 < len(lines):
310
+ file_path_candidate = lines[i + 1].strip()
311
+ if Path(file_path_candidate).exists():
312
+ file_path = file_path_candidate
313
+ print(f"Detected file path: {file_path}")
314
+ break
315
+
316
+ for tool_name in state["selected_tools"]:
317
+ try:
318
+ print(f"Executing tool: {tool_name}")
319
+
320
+ # File-based tools
321
+ if tool_name in ["excel", "image", "audio", "code"] and file_path:
322
+ if tool_name == "excel":
323
+ result = AVAILABLE_TOOLS["excel"].run({"excel_path": file_path, "sheet_name": None})
324
+ elif tool_name == "image":
325
+ result = AVAILABLE_TOOLS["image"].run({"image_path": file_path, "question": state["question"]})
326
+ elif tool_name == "youtube":
327
+ print(f"Running YouTube tool with file path: {file_path}")
328
+ result = AVAILABLE_TOOLS["youtube"].run(state["question"])
329
+ else:
330
+ result = AVAILABLE_TOOLS[tool_name].run(file_path)
331
+ # Information-based tools
332
+ else:
333
+ # Extract clean query for search tools
334
+ clean_query = state["question"]
335
+ if downloaded_file_marker in clean_query:
336
+ clean_query = clean_query.split(downloaded_file_marker)[0].strip()
337
+
338
+ result = AVAILABLE_TOOLS[tool_name].run(clean_query)
339
 
340
+ results[tool_name] = result
341
 
342
+ print(f"Tool {tool_name} completed successfully.")
343
+ print(f"Output for {tool_name}: {result}")
344
+
345
+ except Exception as e:
346
+ error_msg = f"Error using {tool_name}: {str(e)}"
347
+ results[tool_name] = error_msg
348
+ state["error_count"] += 1
349
+ print(error_msg)
350
 
351
+ state["tool_results"] = results
352
+ state["current_step"] = AgentStep.SYNTHESIZE_ANSWER.value
353
+ return state
354
 
355
def synthesize_answer(state: AgentState) -> AgentState:
    """Synthesize the final answer from the question and collected tool results.

    Two-stage LLM pipeline: a chain-of-thought analysis pass, then a
    distillation pass that emits only the bare answer.

    Fix: both LLM calls are now inside the try block. Previously only the
    second call was protected, so a failure during the analysis pass raised
    out of the node instead of routing to error recovery.

    Returns the mutated state with `final_answer` and `current_step` set
    (COMPLETE on success, ERROR_RECOVERY on failure).
    """
    tool_results_str = "\n".join(
        f"=== {tool.upper()} RESULTS ===\n{result}\n"
        for tool, result in state["tool_results"].items()
    )

    cot_prompt = f"""You are a precise assistant tasked with analyzing the user's question{" using the available tool outputs" if state["tool_results"] else ""}.

Question:
{state["question"]}

{f"Available tool outputs: {tool_results_str}" if state["tool_results"] else ""}

Instructions:
- Think step-by-step to determine the best strategy to answer the question.
- Use only the given information; do not hallucinate or infer from external knowledge.
- If decoding, logical deduction, counting, or interpretation is required, show each step clearly.
- If any part of the tool output is unclear or incomplete, mention it and its impact.
- Do not guess. If the information is insufficient, say so clearly.
- Finish with a clearly marked line: `---END OF ANALYSIS---`

Your step-by-step analysis:"""

    try:
        # Stage 1: step-by-step analysis (was unprotected before this fix).
        cot_response = llm.invoke(cot_prompt).content

        final_answer_prompt = f"""You are a precise assistant tasked with deriving the **final answer** from the step-by-step analysis below.

Question:
{state["question"]}

Step-by-step analysis:
{cot_response}

Instructions:
- Read the analysis thoroughly before responding.
- Output ONLY the final answer. Do NOT include any reasoning or explanation.
- Remove any punctuation at the corners of the answer unless it is explicitly mentioned in the question.
- The answer must be concise and factual.
- If the analysis concluded that a definitive answer cannot be determined, respond with: `NA` (exactly).

Final answer:"""

        # Stage 2: distill the analysis into the bare answer.
        state["final_answer"] = llm.invoke(final_answer_prompt).content
        state["current_step"] = AgentStep.COMPLETE.value
    except Exception as e:
        state["error_count"] += 1
        state["final_answer"] = f"Error synthesizing answer: {e}"
        state["current_step"] = AgentStep.ERROR_RECOVERY.value

    return state
407
+
408
def error_recovery(state: AgentState) -> AgentState:
    """Recover from upstream failures.

    Past the error budget we give up with an honest message; otherwise we
    fall back to a direct, tool-free LLM answer. Always routes to COMPLETE.
    """
    if state["error_count"] >= state["max_errors"]:
        # Error budget exhausted — stop retrying.
        state["final_answer"] = "I encountered multiple errors and cannot complete this task reliably."
        state["current_step"] = AgentStep.COMPLETE.value
        return state

    # Fallback strategy: answer from the model's own knowledge, without tools.
    try:
        fallback_prompt = f"""
            Answer this question directly using your knowledge:
            {state["original_question"]}
            
            Provide a helpful response even if you cannot access external tools.
            Be clear about any limitations in your answer.
            """
        fallback_text = llm.invoke(fallback_prompt).content
        state["final_answer"] = f"Using available knowledge (some tools unavailable): {fallback_text}"
    except Exception as exc:
        state["final_answer"] = f"All approaches failed. Error: {exc}"
    # Both outcomes terminate the workflow.
    state["current_step"] = AgentStep.COMPLETE.value
    return state
431
+
432
+ # ----------- Enhanced LangGraph Workflow -----------
433
def route_next_step(state: AgentState) -> str:
    """Map the state's current step to the next workflow node.

    Unknown steps fall through to END so a corrupted state cannot loop.
    """
    current = state["current_step"]
    transitions = {
        "start": AgentStep.ANALYZE_QUESTION.value,
        AgentStep.ANALYZE_QUESTION.value: AgentStep.SELECT_TOOLS.value,
        AgentStep.SELECT_TOOLS.value: AgentStep.EXECUTE_TOOLS.value,
        AgentStep.EXECUTE_TOOLS.value: AgentStep.SYNTHESIZE_ANSWER.value,
        AgentStep.SYNTHESIZE_ANSWER.value: AgentStep.COMPLETE.value,
        AgentStep.ERROR_RECOVERY.value: AgentStep.COMPLETE.value,
        AgentStep.COMPLETE.value: END,
    }
    if current in transitions:
        return transitions[current]
    return END
446
+
447
# Create enhanced workflow
workflow = StateGraph(AgentState)

# Register one node per state-transition function.
workflow.add_node("analyze_question", RunnableLambda(analyze_question))
workflow.add_node("select_tools", RunnableLambda(select_tools))
workflow.add_node("execute_tools", RunnableLambda(execute_tools))
workflow.add_node("synthesize_answer", RunnableLambda(synthesize_answer))
workflow.add_node("error_recovery", RunnableLambda(error_recovery))

# The graph always begins with question analysis.
workflow.set_entry_point("analyze_question")

def _after_analysis(state: AgentState) -> str:
    """Proceed to tool selection only when analysis advanced the state."""
    if state["current_step"] == AgentStep.SELECT_TOOLS.value:
        return "select_tools"
    return "error_recovery"

def _after_execution(state: AgentState) -> str:
    """Proceed to synthesis only when tool execution advanced the state."""
    if state["current_step"] == AgentStep.SYNTHESIZE_ANSWER.value:
        return "synthesize_answer"
    return "error_recovery"

def _after_synthesis(state: AgentState):
    """Finish when synthesis completed, otherwise attempt recovery."""
    if state["current_step"] == AgentStep.COMPLETE.value:
        return END
    return "error_recovery"

# Wire the edges (conditional routers above replace the original lambdas).
workflow.add_conditional_edges("analyze_question", _after_analysis)
workflow.add_edge("select_tools", "execute_tools")
workflow.add_conditional_edges("execute_tools", _after_execution)
workflow.add_conditional_edges("synthesize_answer", _after_synthesis)
workflow.add_edge("error_recovery", END)

# Compile the enhanced graph
graph = workflow.compile()
478
+
479
# ----------- Agent Class -----------
class GaiaAgent:
    """GAIA agent that runs questions through the enhanced LangGraph workflow.

    Tracks per-tool usage statistics across calls and falls back to a direct
    LLM answer when the workflow itself fails.

    Fixes vs. the previous revision:
    - the fallback handler used a bare ``except:``, which also swallowed
      KeyboardInterrupt/SystemExit; it now catches ``Exception`` only.
    - the debug loop variable no longer shadows the workflow ``result`` dict.
    """

    def __init__(self):
        self.graph = graph  # compiled LangGraph workflow (module level)
        self.tool_usage_stats: Dict[str, int] = {}  # tool name -> times selected
        print("Enhanced GAIA Agent initialized with:")
        print("✓ Intelligent multi-query web search")
        print("✓ Entity-aware Wikipedia search")
        print("✓ Enhanced file processing tools")
        print("✓ Advanced error recovery")
        print("✓ Comprehensive result synthesis")

    def get_tool_stats(self) -> Dict[str, int]:
        """Return a copy of tool usage statistics (safe for callers to mutate)."""
        return self.tool_usage_stats.copy()

    def __call__(self, task_id: str, question: str) -> str:
        """Process one task: fetch any attached file, run the workflow, return the answer.

        On a workflow-level failure, attempts one direct LLM fallback before
        returning the error message itself.
        """
        print(f"\n{'='*60}")
        print(f"[{task_id}] ENHANCED PROCESSING: {question}")

        # Initialize state; process_file augments the question with file
        # metadata when the task has an attachment.
        processed_question = process_file(task_id, question)
        initial_state = initialize_state(processed_question)

        try:
            # Execute the enhanced workflow
            result = self.graph.invoke(initial_state)

            # Extract results
            answer = result.get("final_answer", "No answer generated")
            selected_tools = result.get("selected_tools", [])
            conversation_history = result.get("conversation_history", [])
            tool_results = result.get("tool_results", {})
            error_count = result.get("error_count", 0)

            # Update tool usage statistics
            for tool in selected_tools:
                self.tool_usage_stats[tool] = self.tool_usage_stats.get(tool, 0) + 1

            # Enhanced logging
            print(f"[{task_id}] Selected tools: {selected_tools}")
            print(f"[{task_id}] Tools executed: {list(tool_results.keys())}")
            print(f"[{task_id}] Processing steps: {len(conversation_history)}")
            print(f"[{task_id}] Errors encountered: {error_count}")

            # Log tool result sizes for debugging. Loop variable renamed so it
            # does not shadow the workflow `result` dict above.
            for tool, tool_output in tool_results.items():
                result_size = len(str(tool_output)) if tool_output else 0
                print(f"[{task_id}] {tool} result size: {result_size} chars")

            print(f"[{task_id}] FINAL ANSWER: {answer}")
            print(f"{'='*60}")

            return answer

        except Exception as e:
            error_msg = f"Critical error in enhanced agent execution: {str(e)}"
            print(f"[{task_id}] {error_msg}")

            # Try fallback direct LLM response. Catch Exception only, so
            # KeyboardInterrupt/SystemExit still propagate (was a bare except).
            try:
                fallback_response = llm.invoke(f"Please answer this question: {question}").content
                return f"Fallback response: {fallback_response}"
            except Exception:
                return error_msg
546
+
547
+ # ----------- Enhanced File Processing -----------
548
def detect_file_type(file_path: str) -> Optional[str]:
    """Classify a file by its extension into a broad tool category.

    Returns one of 'excel', 'image', 'audio', 'code', 'text', 'document',
    or None for an unrecognized extension. Matching is case-insensitive.
    """
    categories = {
        # Spreadsheets (CSV is handled by the same tabular tool)
        "excel": (".xlsx", ".xls", ".csv"),
        # Images
        "image": (".png", ".jpg", ".jpeg", ".bmp", ".gif", ".tiff", ".webp"),
        # Audio
        "audio": (".mp3", ".wav", ".ogg", ".flac", ".m4a", ".aac"),
        # Code / structured text
        "code": (".py", ".ipynb", ".js", ".html", ".css", ".java",
                 ".cpp", ".c", ".sql", ".r", ".json", ".xml"),
        # Plain text
        "text": (".txt", ".md"),
        # Rich documents
        "document": (".pdf", ".doc", ".docx"),
    }

    suffix = Path(file_path).suffix.lower()
    for category, extensions in categories.items():
        if suffix in extensions:
            return category
    return None
571
 
572
def process_file(task_id: str, question_text: str) -> str:
    """Download the task's attachment (if any) and augment the question with file info.

    Fetches ``{FILE_PATH}{task_id}``; on any download failure the original
    question text is returned unchanged. On success the file is saved under a
    per-task temp directory and the question is extended with the local path
    plus metadata so downstream tools can locate it.

    Fix: the saved-file log line and the "- Name:" field previously printed a
    literal "(unknown)" placeholder even though the real filename had been
    extracted from the Content-Disposition header; both now use ``filename``.
    """
    file_url = f"{FILE_PATH}{task_id}"

    try:
        print(f"[{task_id}] Attempting to download file from: {file_url}")
        response = requests.get(file_url, timeout=30)
        response.raise_for_status()
        print(f"[{task_id}] File download successful. Status: {response.status_code}")

    except requests.exceptions.RequestException as exc:
        print(f"[{task_id}] File download failed: {str(exc)}")
        return question_text  # No file for this task — keep the question as-is.

    # Extract the filename from the Content-Disposition header, falling back
    # to the task id when the header is absent or unparseable.
    content_disposition = response.headers.get("content-disposition", "")
    filename = task_id  # Default fallback
    filename_match = re.search(r'filename[*]?=(?:"([^"]+)"|([^;]+))', content_disposition)
    if filename_match:
        filename = (filename_match.group(1) or filename_match.group(2)).strip()

    # Save under a per-task directory so concurrent tasks never collide.
    temp_storage_dir = Path(tempfile.gettempdir()) / "gaia_enhanced_files" / task_id
    temp_storage_dir.mkdir(parents=True, exist_ok=True)

    file_path = temp_storage_dir / filename
    file_path.write_bytes(response.content)

    # File metadata for logging and question augmentation.
    file_size = len(response.content)
    file_type = detect_file_type(filename)

    print(f"[{task_id}] File saved: {filename} ({file_size:,} bytes, type: {file_type})")

    # Append a clearly delimited file-information section; downstream tool
    # execution keys off the "A file was downloaded..." marker line to find
    # the local path on the following line.
    enhanced_question = f"{question_text}\n\n"
    enhanced_question += f"{'='*50}\n"
    enhanced_question += f"FILE INFORMATION:\n"
    enhanced_question += f"A file was downloaded for this task and saved locally at:\n"
    enhanced_question += f"{str(file_path)}\n"
    enhanced_question += f"File details:\n"
    enhanced_question += f"- Name: {filename}\n"
    enhanced_question += f"- Size: {file_size:,} bytes ({file_size/1024:.1f} KB)\n"
    enhanced_question += f"- Type: {file_type or 'unknown'}\n"
    enhanced_question += f"{'='*50}\n\n"

    return enhanced_question
622
 
623
+ # ----------- Usage Examples and Testing -----------
624
def run_enhanced_tests():
    """Exercise the enhanced agent against a small fixed suite and print a summary."""
    agent = GaiaAgent()

    # Each case pairs a question with the tools we expect it to trigger.
    test_cases = [
        {
            "id": "test_search_1",
            "question": "What are the latest developments in artificial intelligence in 2024?",
            "expected_tools": ["search"],
        },
        {
            "id": "test_wiki_1",
            "question": "Tell me about Albert Einstein's contributions to physics",
            "expected_tools": ["wikipedia"],
        },
        {
            "id": "test_combined_1",
            "question": "What is machine learning and what are recent breakthroughs?",
            "expected_tools": ["wikipedia", "search"],
        },
        {
            "id": "test_excel_1",
            "question": "Analyze the data in the Excel file sales_data.xlsx",
            "expected_tools": ["excel"],
        },
    ]

    print("\n" + "="*80)
    print("RUNNING ENHANCED AGENT TESTS")
    print("="*80)

    for case in test_cases:
        print(f"\nTest Case: {case['id']}")
        print(f"Question: {case['question']}")
        print(f"Expected tools: {case['expected_tools']}")

        try:
            answer = agent(case['id'], case['question'])
            print(f"Result length: {len(answer)} characters")
            print(f"Result preview: {answer[:200]}...")
        except Exception as e:
            print(f"Test failed: {e}")

        print("-" * 60)

    # Summarize how often each tool was selected across the suite.
    print(f"\nTool Usage Statistics:")
    for tool_name, usage_count in agent.get_tool_stats().items():
        print(f"  {tool_name}: {usage_count} times")
673
+
674
# Usage example
if __name__ == "__main__":
    # Build the agent once and reuse it for every demo question.
    demo_agent = GaiaAgent()

    # Example usage
    sample_questions = [
        "What is the current population of Tokyo and how has it changed recently?",
        "Explain quantum computing and its recent developments",
        "Tell me about the history of machine learning and current AI trends",
    ]

    print("\n" + "="*80)
    print("ENHANCED GAIA AGENT DEMONSTRATION")
    print("="*80)

    for idx, sample in enumerate(sample_questions):
        print(f"\nExample {idx + 1}: {sample}")
        demo_answer = demo_agent(f"demo_{idx}", sample)
        print(f"Answer: {demo_answer[:300]}...")
        print("-" * 60)

    # Uncomment to run comprehensive tests
    # run_enhanced_tests()