File size: 6,026 Bytes
602a16c
b712b2b
602a16c
 
 
 
 
 
 
 
 
 
 
 
b712b2b
602a16c
 
 
 
 
b712b2b
602a16c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b712b2b
602a16c
 
b712b2b
 
602a16c
 
b712b2b
602a16c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b712b2b
602a16c
 
b712b2b
602a16c
 
b712b2b
602a16c
 
 
 
 
 
 
b712b2b
602a16c
 
 
 
 
b712b2b
602a16c
 
 
 
 
 
 
 
 
 
 
 
 
 
b712b2b
602a16c
 
b712b2b
602a16c
 
 
 
 
b712b2b
602a16c
 
b712b2b
 
 
602a16c
 
b712b2b
602a16c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
"""
Utility functions for the GAIA Agent
"""
import os
import re
import shutil
import urllib.parse
import requests
from bs4 import BeautifulSoup

from config import DEFAULT_API_URL, QUESTION_TYPES


def clean_ansi_codes(text):
    """Strip ANSI escape sequences (colors, cursor movement) from terminal output."""
    return re.sub(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])', '', text)


def clean_answer(answer):
    """Strip common formatting artifacts from an agent response.

    Removes leading boilerplate ("Final Answer:", "Answer:", "The answer is",
    "Based on ...,"), markdown code fences, bold markers and heading prefixes,
    then trims surrounding whitespace.
    """
    text = str(answer).strip()

    # All patterns are simply deleted; matching is case-insensitive.
    removals = (
        r'^Final Answer:\s*',
        r'^Answer:\s*',
        r'^The answer is\s*',
        r'^Based on[^,]*,\s*',
        r'```',
        r'\*\*',
        r'^##\s*',
    )

    for pattern in removals:
        text = re.sub(pattern, '', text, flags=re.IGNORECASE)

    return text.strip()


def detect_question_type(question, file_name):
    """
    Classify a question so the agent can apply a matching strategy.

    Args:
        question: The question text.
        file_name: Name of the attached file, or a falsy value if none.

    Returns:
        str: One of the QUESTION_TYPES values (see config.py).
    """
    lowered = question.lower()

    # URL-based detection comes first: a YouTube link wins over any file.
    if "youtube.com" in question or "youtu.be" in question:
        return QUESTION_TYPES['YOUTUBE_VIDEO']

    # Attached-file detection, keyed on the file extension.
    if file_name:
        extension_rules = (
            (('.png',), 'IMAGE_FILE'),
            (('.mp3',), 'AUDIO_FILE'),
            (('.xlsx', '.csv'), 'DATA_FILE'),
            (('.py',), 'CODE_FILE'),
        )
        for suffixes, type_key in extension_rules:
            if file_name.endswith(suffixes):
                return QUESTION_TYPES[type_key]

    # Keyword-based detection on the question text itself.
    if "wikipedia" in lowered:
        return QUESTION_TYPES['WIKIPEDIA']
    if any(marker in lowered for marker in ("how many", "count", "number of")):
        return QUESTION_TYPES['COUNTING']
    # ".rewsna" is "answer." reversed — a tell-tale of reversed-text puzzles.
    if "reverse" in lowered or "backwards" in lowered or ".rewsna" in question:
        return QUESTION_TYPES['TEXT_MANIPULATION']

    return QUESTION_TYPES['GENERAL']


def download_file_for_task(task_id):
    """
    Download the attached file for a task if it exists.

    Args:
        task_id: The task ID whose file should be fetched from the API.

    Returns:
        str: Path to the downloaded file, or None if the task has no file
        or the download failed.
    """
    file_url = f"{DEFAULT_API_URL}/files/{task_id}"
    try:
        response = requests.get(file_url, stream=True, timeout=30)
        if response.status_code == 200:
            # Fallback name if no filename can be recovered from headers.
            filename = f"file_{task_id}"

            # Prefer the real filename from the Content-Disposition header.
            if "content-disposition" in response.headers:
                cd = response.headers["content-disposition"]
                if "filename=" in cd:
                    filename = cd.split("filename=")[1].strip('"')

            # If the name still has no extension, guess one from the MIME
            # type so extension-based handling downstream keeps working.
            if "." not in filename:
                content_type = response.headers.get("content-type", "")
                if "excel" in content_type or "spreadsheet" in content_type:
                    filename += ".xlsx"
                elif "audio" in content_type or "mpeg" in content_type:
                    filename += ".mp3"
                elif "image" in content_type or "png" in content_type:
                    filename += ".png"
                elif "python" in content_type:
                    filename += ".py"

            # Stream the raw body to disk without buffering it in memory.
            with open(filename, 'wb') as f:
                shutil.copyfileobj(response.raw, f)

            # BUG FIX: the log line previously printed the literal text
            # "(unknown)" instead of the actual filename.
            print(f"      ✓ File downloaded: {filename} ({os.path.getsize(filename)} bytes)")
            return filename
    except Exception as e:
        # Best-effort download: report the failure and fall through to None.
        print(f"      ✗ Error downloading file: {e}")
    return None


def fetch_and_download_links(url, dest_dir, max_files=20):
    """
    Scan a web page and download linked resources with known file extensions.

    Args:
        url: URL of the page to scan
        dest_dir: Destination directory for files (created if missing)
        max_files: Maximum number of files to download

    Returns:
        list: List of downloaded file paths
    """
    saved = []
    try:
        os.makedirs(dest_dir, exist_ok=True)
        page = requests.get(url, timeout=20)
        page.raise_for_status()
        soup = BeautifulSoup(page.text, "lxml")

        # Gather candidate URLs: hyperlinks first, then embedded media sources.
        links = [tag.get('href') for tag in soup.find_all(['a', 'link'])
                 if tag.get('href')]
        links += [tag.get('src') for tag in soup.find_all(['img', 'script', 'source'])
                  if tag.get('src')]

        wanted = {'.png', '.jpg', '.jpeg', '.gif', '.svg', '.pdf', '.zip',
                  '.mp3', '.mp4', '.py', '.txt', '.csv', '.xlsx', '.xls'}
        visited = set()

        for link in links:
            if len(saved) >= max_files:
                break
            absolute = urllib.parse.urljoin(url, link)
            if absolute in visited:
                continue
            visited.add(absolute)

            url_path = urllib.parse.urlparse(absolute).path
            extension = os.path.splitext(url_path)[1].lower()
            if extension not in wanted:
                continue

            try:
                resource = requests.get(absolute, stream=True, timeout=20)
                resource.raise_for_status()
                # Prefer the server-declared filename; fall back to the URL
                # basename, then to a synthetic name.
                cd = resource.headers.get('content-disposition')
                if cd and 'filename=' in cd:
                    name = cd.split('filename=')[1].strip('"')
                else:
                    name = os.path.basename(url_path) or f"resource_{len(saved)}{extension}"
                target = os.path.join(dest_dir, name)
                with open(target, 'wb') as out:
                    shutil.copyfileobj(resource.raw, out)
                saved.append(target)
            except Exception:
                # Best-effort: skip any resource that fails to download.
                continue

    except Exception:
        # Best-effort: a page-level failure returns whatever was saved so far.
        pass

    return saved