File size: 14,896 Bytes
2a5f443
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
"""
GAIA Final Challenge agent for the HF AI Agents course.
Uses Claude Haiku 4.5 with tool use.
"""
import os, sys, json, subprocess, tempfile, traceback, base64, mimetypes
sys.stdout.reconfigure(encoding='utf-8')

import requests
import anthropic

API_BASE = "https://agents-course-unit4-scoring.hf.space"
MODEL = "claude-haiku-4-5"
MAX_TURNS = 12
WORK_DIR = "C:/Users/22678/Downloads/test/test/gaia_work"
os.makedirs(WORK_DIR, exist_ok=True)

client = anthropic.Anthropic()


# ---------- TOOLS ----------

def tool_wikipedia_search(query: str) -> str:
    """Search English Wikipedia and return top result extracts (summary text)."""
    try:
        r = requests.get(
            "https://en.wikipedia.org/w/api.php",
            params={
                "action": "query",
                "list": "search",
                "srsearch": query,
                "format": "json",
                "srlimit": 5,
            },
            timeout=20,
            headers={"User-Agent": "gaia-agent/0.1 (course exercise)"},
        )
        results = r.json().get("query", {}).get("search", [])
        if not results:
            return f"No results for '{query}'."

        out = [f"Top {len(results)} Wikipedia hits for '{query}':"]
        for hit in results:
            title = hit["title"]
            snippet = hit.get("snippet", "").replace('<span class="searchmatch">', '**').replace("</span>", "**")
            url = f"https://en.wikipedia.org/wiki/{title.replace(' ', '_')}"
            out.append(f"\n- **{title}** β€” {url}\n  {snippet}")
        return "\n".join(out)
    except Exception as e:
        return f"Error: {e}"


def tool_fetch_url(url: str, max_chars: int = 8000) -> str:
    """Fetch a URL and return its text content (stripped of HTML)."""
    try:
        r = requests.get(url, timeout=30, headers={"User-Agent": "Mozilla/5.0 gaia-agent"})
        ct = r.headers.get("content-type", "")
        if "html" in ct or url.endswith(".html") or "wikipedia.org" in url:
            from bs4 import BeautifulSoup
            soup = BeautifulSoup(r.text, "html.parser")
            for s in soup(["script", "style", "nav", "footer"]):
                s.decompose()
            text = soup.get_text(separator="\n")
            text = "\n".join(line.strip() for line in text.splitlines() if line.strip())
        else:
            text = r.text
        if len(text) > max_chars:
            text = text[:max_chars] + f"\n[...truncated {len(text)-max_chars} chars]"
        return text
    except Exception as e:
        return f"Error fetching {url}: {e}"


def tool_download_task_file(task_id: str) -> str:
    """Download the file attached to a GAIA task. Returns local file path."""
    try:
        r = requests.get(f"{API_BASE}/files/{task_id}", timeout=60)
        r.raise_for_status()
        # Try to get filename from header
        cd = r.headers.get("content-disposition", "")
        fname = task_id
        if "filename=" in cd:
            fname = cd.split("filename=")[1].strip('"; ')
        local = os.path.join(WORK_DIR, fname)
        with open(local, "wb") as f:
            f.write(r.content)
        return f"Downloaded to {local} ({len(r.content)} bytes)"
    except Exception as e:
        return f"Error: {e}"


def tool_run_python(code: str, working_file: str = "") -> str:
    """Execute Python code. If working_file points to a .py file, just run that file."""
    try:
        if working_file and working_file.endswith(".py"):
            r = subprocess.run(
                [sys.executable, working_file],
                capture_output=True, text=True, timeout=60, cwd=WORK_DIR,
            )
            return f"stdout:\n{r.stdout}\n\nstderr:\n{r.stderr}\nreturncode={r.returncode}"
        with tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False, encoding="utf-8") as f:
            f.write(code)
            tmp = f.name
        try:
            r = subprocess.run(
                [sys.executable, tmp],
                capture_output=True, text=True, timeout=60, cwd=WORK_DIR,
            )
            return f"stdout:\n{r.stdout}\n\nstderr:\n{r.stderr}\nreturncode={r.returncode}"
        finally:
            os.unlink(tmp)
    except subprocess.TimeoutExpired:
        return "Error: Timed out after 60s"
    except Exception as e:
        return f"Error: {e}\n{traceback.format_exc()}"


def tool_youtube_transcript(video_url: str) -> str:
    """Try to fetch YouTube transcript."""
    try:
        from youtube_transcript_api import YouTubeTranscriptApi
        vid = video_url.split("v=")[1].split("&")[0]
        transcript = YouTubeTranscriptApi.get_transcript(vid)
        return "\n".join(f"[{t['start']:.1f}s] {t['text']}" for t in transcript)
    except Exception as e:
        return f"Error: {e}"


TOOLS = [
    {
        "name": "wikipedia_search",
        "description": "Search English Wikipedia and get top 5 results with snippets and URLs. Use this FIRST for any factual question.",
        "input_schema": {"type": "object", "properties": {"query": {"type": "string"}}, "required": ["query"]},
    },
    {
        "name": "fetch_url",
        "description": "Fetch a URL (usually a Wikipedia page) and return its cleaned text content.",
        "input_schema": {"type": "object", "properties": {"url": {"type": "string"}}, "required": ["url"]},
    },
    {
        "name": "download_task_file",
        "description": "Download the file attached to the current GAIA task. Returns the local file path.",
        "input_schema": {"type": "object", "properties": {"task_id": {"type": "string"}}, "required": ["task_id"]},
    },
    {
        "name": "run_python",
        "description": "Execute Python code OR run an existing .py file. For .xlsx parsing, use pandas. For .py files just pass working_file=<path>.",
        "input_schema": {
            "type": "object",
            "properties": {
                "code": {"type": "string", "description": "Python code to run (ignored if working_file is set)"},
                "working_file": {"type": "string", "description": "Path to a .py file to execute directly"},
            },
            "required": ["code"],
        },
    },
    {
        "name": "youtube_transcript",
        "description": "Fetch transcript of a YouTube video by URL.",
        "input_schema": {"type": "object", "properties": {"video_url": {"type": "string"}}, "required": ["video_url"]},
    },
    {
        "name": "submit_final_answer",
        "description": "Submit your final answer. The `answer` string will be scored via exact match - no preamble, no explanation. Call this exactly once at the end.",
        "input_schema": {"type": "object", "properties": {"answer": {"type": "string", "description": "The final answer string, formatted exactly as the question requests"}}, "required": ["answer"]},
    },
]

TOOL_FNS = {
    "wikipedia_search": lambda i: tool_wikipedia_search(i["query"]),
    "fetch_url": lambda i: tool_fetch_url(i["url"]),
    "download_task_file": lambda i: tool_download_task_file(i["task_id"]),
    "run_python": lambda i: tool_run_python(i.get("code", ""), i.get("working_file", "")),
    "youtube_transcript": lambda i: tool_youtube_transcript(i["video_url"]),
}


SYSTEM = """You are a research agent solving GAIA benchmark questions for an EXACT-MATCH scoring system.

CRITICAL: You MUST end every task by calling the `submit_final_answer` tool with the clean answer string.
The `answer` argument is what gets scored - no preamble, no explanation, exact format only.

Workflow:
1. For ANY factual / lookup question (people, dates, statistics, geography, articles, history, sports, etc.):
   ALWAYS call wikipedia_search FIRST. Do not answer from memory - your memory is often wrong on specifics.
   Then call fetch_url on the most relevant Wikipedia URL to read details.
2. For attached file questions: call download_task_file. If it returns "No file path associated",
   the file is permanently unavailable - just guess in the right format.
3. For pure reasoning (math, logic, reversed text, group theory): you may answer directly, but use run_python to verify.
4. For YouTube questions: try youtube_transcript with the URL.

Format rules (CRITICAL for exact-match):
- "comma-separated list, alphabetical order" β†’ "apple, banana, cherry"  (lowercase, space after comma)
- "first name only" β†’ just one word like "Sarah"
- "IOC country code" β†’ 3 uppercase letters like "USA"
- "USD with two decimal places" β†’ "1234.56" (no $ sign unless asked)
- "just the city name without abbreviations" β†’ "Boston" (full name, no state)
- "last names only, in Roman characters" β†’ "Smith, Jones"
- Numeric β†’ bare number, no unit unless requested
- Never include "FINAL ANSWER:" or quotes
- If you can't determine the answer, still submit your best guess in the correct format

You can use up to 10 tool calls. Then you MUST call submit_final_answer."""


def solve_question(q: dict) -> str:
    """Run agent loop for a single question, return final answer string."""
    task_id = q["task_id"]
    question = q["question"]
    file_name = q.get("file_name", "")

    user_content = f"task_id: {task_id}\n\nQuestion:\n{question}"
    if file_name:
        user_content += f"\n\nAttached file: {file_name}  (call download_task_file with the task_id above to get it)"

    # For chess image (Q4), include image in initial message
    image_content = None
    if file_name.lower().endswith((".png", ".jpg", ".jpeg")):
        # Download the image first
        tool_download_task_file(task_id)
        local_img = os.path.join(WORK_DIR, file_name)
        if os.path.exists(local_img):
            with open(local_img, "rb") as f:
                img_data = base64.standard_b64encode(f.read()).decode("utf-8")
            media_type = mimetypes.guess_type(local_img)[0] or "image/png"
            image_content = {"type": "image", "source": {"type": "base64", "media_type": media_type, "data": img_data}}

    messages = [{"role": "user", "content": ([image_content, {"type": "text", "text": user_content}] if image_content else user_content)}]

    final_answer = None
    for turn in range(MAX_TURNS):
        resp = client.messages.create(
            model=MODEL,
            max_tokens=4096,
            system=SYSTEM,
            tools=TOOLS,
            messages=messages,
        )

        if resp.stop_reason == "tool_use":
            messages.append({"role": "assistant", "content": resp.content})
            tool_results = []
            for block in resp.content:
                if block.type == "tool_use":
                    if block.name == "submit_final_answer":
                        final_answer = block.input.get("answer", "").strip()
                        print(f"   [turn {turn}] >>> submit_final_answer: {final_answer!r}")
                        return final_answer
                    print(f"   [turn {turn}] tool: {block.name}({json.dumps(block.input)[:120]})")
                    try:
                        result = TOOL_FNS[block.name](block.input)
                    except Exception as e:
                        result = f"Tool error: {e}"
                    if len(result) > 12000:
                        result = result[:12000] + "\n[truncated]"
                    tool_results.append({"type": "tool_result", "tool_use_id": block.id, "content": result})
            messages.append({"role": "user", "content": tool_results})
            continue

        # Reached end_turn without submitting β€” force a final answer
        text_blocks = [b.text for b in resp.content if b.type == "text"]
        partial_text = " ".join(text_blocks).strip()
        print(f"   [turn {turn}] end_turn without submit, forcing final answer...")
        messages.append({"role": "assistant", "content": resp.content})
        messages.append({"role": "user", "content": "You did not call submit_final_answer. Please call it now with your best answer in the exact format requested."})
        # Loop one more time to force the tool call
        continue

    # Hit max turns - force one more attempt
    if final_answer is None:
        messages.append({"role": "user", "content": "Max turns reached. Call submit_final_answer NOW with your best guess in the right format."})
        try:
            resp = client.messages.create(model=MODEL, max_tokens=512, system=SYSTEM, tools=TOOLS, messages=messages, tool_choice={"type": "tool", "name": "submit_final_answer"})
            for block in resp.content:
                if block.type == "tool_use" and block.name == "submit_final_answer":
                    return block.input.get("answer", "").strip()
        except Exception:
            pass
    return final_answer or "(no answer)"


def extract_clean_answer(question: str, agent_response: str) -> str:
    """Second-pass cleanup: extract just the answer in the exact format requested."""
    if not agent_response.strip():
        return agent_response
    resp = client.messages.create(
        model=MODEL,
        max_tokens=200,
        system=EXTRACTOR_SYSTEM,
        messages=[{
            "role": "user",
            "content": f"QUESTION:\n{question}\n\nAGENT'S REASONING:\n{agent_response}\n\nNow output ONLY the final answer string (no quotes, no preamble):",
        }],
    )
    text = "".join(b.text for b in resp.content if b.type == "text").strip()
    # Strip surrounding quotes
    if (text.startswith('"') and text.endswith('"')) or (text.startswith("'") and text.endswith("'")):
        text = text[1:-1]
    return text


def main():
    with open("C:/Users/22678/Downloads/test/test/gaia_questions.json", "r", encoding="utf-8") as f:
        questions = json.load(f)

    only = sys.argv[1:] if len(sys.argv) > 1 else None
    results = {}
    out_path = "C:/Users/22678/Downloads/test/test/gaia_answers.json"
    if os.path.exists(out_path):
        with open(out_path, "r", encoding="utf-8") as f:
            results = json.load(f)

    for i, q in enumerate(questions):
        tid = q["task_id"]
        if only and tid not in only and str(i+1) not in only and f"Q{i+1}" not in only:
            continue
        if tid in results and not only:
            print(f"Q{i+1} {tid[:8]} already answered, skipping")
            continue
        print(f"\n{'='*60}\nQ{i+1} task_id={tid[:8]}  file={q.get('file_name','')}\n{'='*60}")
        print(f"Q: {q['question'][:200]}")
        try:
            answer = solve_question(q)
            print(f"\n>>> FINAL: {answer}")
            results[tid] = answer
        except Exception as e:
            print(f"\nERROR: {e}")
            traceback.print_exc()
            results[tid] = f"(error: {e})"
        with open(out_path, "w", encoding="utf-8") as f:
            json.dump(results, f, indent=2, ensure_ascii=False)

    print(f"\n\nSaved {len(results)} answers to {out_path}")


if __name__ == "__main__":
    main()