# Diego-Fco — clean project structure with English comments (commit b712b2b)
"""
Utility functions for the GAIA Agent
"""
import os
import re
import shutil
import urllib.parse
import requests
from bs4 import BeautifulSoup
from config import DEFAULT_API_URL, QUESTION_TYPES
def clean_ansi_codes(text):
    """Strip ANSI escape sequences (colors, cursor control) from *text*."""
    # Covers both two-byte escapes (ESC @ .. ESC _) and CSI sequences
    # (ESC [ params intermediates final-byte).
    pattern = r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])'
    return re.sub(pattern, '', text)
def clean_answer(answer):
    """Strip common prefixes and markdown artifacts from an agent answer.

    Removes lead-ins like "Final Answer:" / "The answer is", code fences,
    bold markers, and heading hashes, then trims whitespace.
    """
    text = str(answer).strip()
    # Each regex is deleted (case-insensitively) from the answer text.
    noise_patterns = (
        r'^Final Answer:\s*',
        r'^Answer:\s*',
        r'^The answer is\s*',
        r'^Based on[^,]*,\s*',
        r'```',
        r'\*\*',
        r'^##\s*',
    )
    for noise in noise_patterns:
        text = re.sub(noise, '', text, flags=re.IGNORECASE)
    return text.strip()
def detect_question_type(question, file_name):
    """
    Detect the question type to apply a specific strategy.

    Args:
        question: The question text
        file_name: Name of the attached file (if any)

    Returns:
        str: Question type (see QUESTION_TYPES in config.py)
    """
    lowered = question.lower()

    # URL-based detection runs first (case-sensitive on the raw question).
    if "youtube.com" in question or "youtu.be" in question:
        return QUESTION_TYPES['YOUTUBE_VIDEO']

    # Attachment extension takes priority over keyword heuristics.
    if file_name:
        if file_name.endswith(".png"):
            return QUESTION_TYPES['IMAGE_FILE']
        if file_name.endswith(".mp3"):
            return QUESTION_TYPES['AUDIO_FILE']
        if file_name.endswith((".xlsx", ".csv")):
            return QUESTION_TYPES['DATA_FILE']
        if file_name.endswith(".py"):
            return QUESTION_TYPES['CODE_FILE']

    # Keyword heuristics on the lowercased question text.
    if "wikipedia" in lowered:
        return QUESTION_TYPES['WIKIPEDIA']
    if any(marker in lowered for marker in ("how many", "count", "number of")):
        return QUESTION_TYPES['COUNTING']
    # ".rewsna" is "answer." reversed — checked against the raw question.
    if "reverse" in lowered or "backwards" in lowered or ".rewsna" in question:
        return QUESTION_TYPES['TEXT_MANIPULATION']

    return QUESTION_TYPES['GENERAL']
def download_file_for_task(task_id):
    """
    Download the attached file for a task if it exists.

    Args:
        task_id: The task ID

    Returns:
        str: Path to downloaded file or None if no file exists
    """
    file_url = f"{DEFAULT_API_URL}/files/{task_id}"
    try:
        response = requests.get(file_url, stream=True, timeout=30)
        if response.status_code == 200:
            filename = f"file_{task_id}"
            # Get real filename from the Content-Disposition header
            if "content-disposition" in response.headers:
                cd = response.headers["content-disposition"]
                if "filename=" in cd:
                    # Stop at any trailing header parameter (e.g. '; size=...'),
                    # drop surrounding quotes, and basename() the result so a
                    # hostile header cannot write outside the current directory.
                    raw_name = cd.split("filename=")[1].split(";")[0].strip().strip('"')
                    if raw_name:
                        filename = os.path.basename(raw_name)
            # Ensure correct extension when the server gave no usable name
            if "." not in filename:
                content_type = response.headers.get("content-type", "")
                if "excel" in content_type or "spreadsheet" in content_type:
                    filename += ".xlsx"
                elif "audio" in content_type or "mpeg" in content_type:
                    filename += ".mp3"
                elif "image" in content_type or "png" in content_type:
                    filename += ".png"
                elif "python" in content_type:
                    filename += ".py"
            with open(filename, 'wb') as f:
                shutil.copyfileobj(response.raw, f)
            # Report the actual filename (the original printed a placeholder).
            print(f" ✓ File downloaded: {filename} ({os.path.getsize(filename)} bytes)")
            return filename
    except Exception as e:
        print(f" ✗ Error downloading file: {e}")
    return None
def fetch_and_download_links(url, dest_dir, max_files=20):
    """
    Download linked resources from a URL.

    Args:
        url: URL of the page to scan
        dest_dir: Destination directory for files
        max_files: Maximum number of files to download

    Returns:
        list: List of downloaded file paths
    """
    downloaded = []
    try:
        os.makedirs(dest_dir, exist_ok=True)
        resp = requests.get(url, timeout=20)
        resp.raise_for_status()
        soup = BeautifulSoup(resp.text, "lxml")

        # Collect candidate URLs from href-bearing and src-bearing tags.
        candidates = []
        for tag in soup.find_all(['a', 'link']):
            href = tag.get('href')
            if href:
                candidates.append(href)
        for tag in soup.find_all(['img', 'script', 'source']):
            src = tag.get('src')
            if src:
                candidates.append(src)

        seen = set()
        allowed_exts = {'.png', '.jpg', '.jpeg', '.gif', '.svg', '.pdf', '.zip',
                        '.mp3', '.mp4', '.py', '.txt', '.csv', '.xlsx', '.xls'}
        for c in candidates:
            if len(downloaded) >= max_files:
                break
            full = urllib.parse.urljoin(url, c)
            if full in seen:
                continue
            seen.add(full)
            path = urllib.parse.urlparse(full).path
            ext = os.path.splitext(path)[1].lower()
            if ext not in allowed_exts:
                continue
            try:
                r = requests.get(full, stream=True, timeout=20)
                r.raise_for_status()
                cd = r.headers.get('content-disposition')
                if cd and 'filename=' in cd:
                    # Stop at trailing header parameters, strip quotes, and
                    # basename() the result so a hostile Content-Disposition
                    # header cannot escape dest_dir via '../' components.
                    fname = cd.split('filename=')[1].split(';')[0].strip().strip('"')
                    fname = os.path.basename(fname)
                else:
                    fname = os.path.basename(path)
                if not fname:
                    fname = f"resource_{len(downloaded)}{ext}"
                out_path = os.path.join(dest_dir, fname)
                with open(out_path, 'wb') as of:
                    shutil.copyfileobj(r.raw, of)
                downloaded.append(out_path)
            except Exception:
                # Best-effort: skip any individual resource that fails.
                continue
    except Exception:
        # A failed page fetch yields an empty list rather than raising.
        pass
    return downloaded