""" Utility functions for the GAIA Agent """

import os
import re
import shutil
import urllib.parse

import requests
from bs4 import BeautifulSoup

from config import DEFAULT_API_URL, QUESTION_TYPES

# Compiled once at import time. Matches both two-character escapes
# (ESC + single byte) and CSI sequences (ESC [ params final-byte).
_ANSI_ESCAPE_RE = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')


def clean_ansi_codes(text):
    """Remove ANSI color codes from terminal output.

    Args:
        text: String possibly containing ANSI escape sequences.

    Returns:
        str: The input with all ANSI escape sequences stripped.
    """
    return _ANSI_ESCAPE_RE.sub('', text)


def clean_answer(answer):
    """Clean the agent response by removing unnecessary formatting.

    Strips leading boilerplate ("Final Answer:", "Answer:", "The answer
    is", "Based on ...,") and markdown markers (``` fences, ** bold,
    leading ## headings).

    Args:
        answer: Raw agent output (any type; coerced to str).

    Returns:
        str: The cleaned answer, stripped of surrounding whitespace.
    """
    answer = str(answer).strip()
    # Each pattern is applied exactly once, in order, case-insensitively.
    patterns_to_remove = [
        (r'^Final Answer:\s*', ''),
        (r'^Answer:\s*', ''),
        (r'^The answer is\s*', ''),
        (r'^Based on[^,]*,\s*', ''),
        (r'```', ''),
        (r'\*\*', ''),
        (r'^##\s*', ''),
    ]
    for pattern, replacement in patterns_to_remove:
        answer = re.sub(pattern, replacement, answer, flags=re.IGNORECASE)
    return answer.strip()


def detect_question_type(question, file_name):
    """
    Detect the question type to apply a specific strategy.

    Args:
        question: The question text
        file_name: Name of the attached file (if any)

    Returns:
        str: Question type (see QUESTION_TYPES in config.py)
    """
    q_lower = question.lower()

    # YouTube links take priority over everything else.
    if "youtube.com" in question or "youtu.be" in question:
        return QUESTION_TYPES['YOUTUBE_VIDEO']

    # Attached-file extension decides next, when a file is present.
    if file_name:
        if file_name.endswith(".png"):
            return QUESTION_TYPES['IMAGE_FILE']
        if file_name.endswith(".mp3"):
            return QUESTION_TYPES['AUDIO_FILE']
        if file_name.endswith((".xlsx", ".csv")):
            return QUESTION_TYPES['DATA_FILE']
        if file_name.endswith(".py"):
            return QUESTION_TYPES['CODE_FILE']

    # Fall back to keyword heuristics on the question text.
    if "wikipedia" in q_lower:
        return QUESTION_TYPES['WIKIPEDIA']
    if any(word in q_lower for word in ["how many", "count", "number of"]):
        return QUESTION_TYPES['COUNTING']
    # ".rewsna" is "answer." reversed — a known reversed-text puzzle marker.
    if "reverse" in q_lower or "backwards" in q_lower or ".rewsna" in question:
        return QUESTION_TYPES['TEXT_MANIPULATION']
    return QUESTION_TYPES['GENERAL']


def download_file_for_task(task_id):
    """
    Download the attached file for a task if it exists.

    Args:
        task_id: The task ID

    Returns:
        str: Path to downloaded file or None if no file exists
    """
    file_url = f"{DEFAULT_API_URL}/files/{task_id}"
    try:
        response = requests.get(file_url, stream=True, timeout=30)
        if response.status_code == 200:
            filename = f"file_{task_id}"
            # Get real filename from header
            if "content-disposition" in response.headers:
                cd = response.headers["content-disposition"]
                if "filename=" in cd:
                    # Keep only the filename token; drop any trailing
                    # "; parameter" parts of the header.
                    filename = cd.split("filename=")[1].split(";")[0].strip('"')
            # Ensure correct extension so detect_question_type() can
            # classify the file later.
            if "." not in filename:
                content_type = response.headers.get("content-type", "")
                if "excel" in content_type or "spreadsheet" in content_type:
                    filename += ".xlsx"
                elif "audio" in content_type or "mpeg" in content_type:
                    filename += ".mp3"
                elif "image" in content_type or "png" in content_type:
                    filename += ".png"
                elif "python" in content_type:
                    filename += ".py"
            # iter_content (unlike response.raw) transparently decodes
            # Content-Encoding such as gzip, so the bytes on disk match
            # the actual file contents.
            with open(filename, 'wb') as f:
                for chunk in response.iter_content(chunk_size=65536):
                    f.write(chunk)
            # Fixed: previously printed a literal "(unknown)" placeholder
            # instead of the downloaded filename.
            print(f" ✓ File downloaded: {filename} ({os.path.getsize(filename)} bytes)")
            return filename
    except Exception as e:
        print(f" ✗ Error downloading file: {e}")
    return None


def fetch_and_download_links(url, dest_dir, max_files=20):
    """
    Download linked resources from a URL.

    Args:
        url: URL of the page to scan
        dest_dir: Destination directory for files
        max_files: Maximum number of files to download

    Returns:
        list: List of downloaded file paths
    """
    downloaded = []
    try:
        os.makedirs(dest_dir, exist_ok=True)
        resp = requests.get(url, timeout=20)
        resp.raise_for_status()
        soup = BeautifulSoup(resp.text, "lxml")

        # Collect candidate URLs from both hyperlink and media tags.
        candidates = []
        for tag in soup.find_all(['a', 'link']):
            href = tag.get('href')
            if href:
                candidates.append(href)
        for tag in soup.find_all(['img', 'script', 'source']):
            src = tag.get('src')
            if src:
                candidates.append(src)

        seen = set()
        allowed_exts = {'.png', '.jpg', '.jpeg', '.gif', '.svg', '.pdf',
                        '.zip', '.mp3', '.mp4', '.py', '.txt', '.csv',
                        '.xlsx', '.xls'}
        for candidate in candidates:
            if len(downloaded) >= max_files:
                break
            full = urllib.parse.urljoin(url, candidate)
            if full in seen:
                continue
            seen.add(full)
            path = urllib.parse.urlparse(full).path
            ext = os.path.splitext(path)[1].lower()
            if ext not in allowed_exts:
                continue
            try:
                r = requests.get(full, stream=True, timeout=20)
                r.raise_for_status()
                cd = r.headers.get('content-disposition')
                if cd and 'filename=' in cd:
                    # Keep only the filename token; drop trailing parameters.
                    fname = cd.split('filename=')[1].split(';')[0].strip('"')
                else:
                    fname = os.path.basename(path) or f"resource_{len(downloaded)}{ext}"
                out_path = os.path.join(dest_dir, fname)
                # iter_content decodes Content-Encoding (e.g. gzip),
                # unlike copying response.raw directly.
                with open(out_path, 'wb') as of:
                    for chunk in r.iter_content(chunk_size=65536):
                        of.write(chunk)
                downloaded.append(out_path)
            except Exception:
                # Best effort: skip resources that fail to download.
                continue
    except Exception:
        # Best effort: a failed page fetch yields an empty list.
        pass
    return downloaded