File size: 3,796 Bytes
10e9b7d
eccf8e4
02721e4
78f7f99
02721e4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b7b1009
02721e4
 
 
 
 
 
 
 
 
 
 
7e4a06b
02721e4
 
 
 
b7b1009
02721e4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
31243f4
02721e4
31243f4
02721e4
 
 
 
 
 
 
 
7d65c66
02721e4
43f8745
02721e4
3c4371f
29fe67c
78f7f99
 
 
 
 
 
43f8745
 
 
 
 
 
78f7f99
02721e4
78f7f99
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
import os
import requests
from duckduckgo_search import DDGS
import gradio as gr

# ===============================
# CONFIG
# ===============================

# Base URL of the scoring API; None when GAIA_API_URL is not set.
BASE_URL = os.getenv("GAIA_API_URL")
# Username reported with the submission (falls back to a fixed default).
HF_USERNAME = os.getenv("SPACE_AUTHOR_NAME", "jatinror")
# Space identifier used to build the link to the agent's code.
SPACE_ID = os.getenv("SPACE_ID", "jatinror/Final_Assignment_Template")
AGENT_CODE_URL = f"https://huggingface.co/spaces/{SPACE_ID}/tree/main"

# Echo the resolved configuration at import time.
print(f"Using SPACE_ID: {SPACE_ID}")
print(f"Agent code URL: {AGENT_CODE_URL}")

# ===============================
# SIMPLE SEARCH TOOL
# ===============================

def web_search(query, max_results=3):
    """Run a DuckDuckGo text search and return the result snippets.

    Args:
        query: Search string passed to ``DDGS.text``.
        max_results: Maximum number of results requested from the search.

    Returns:
        The ``"body"`` field of every result, joined with newlines (an empty
        string when the search yields nothing).
    """
    with DDGS() as client:
        # Consume the results while the client is still open.
        snippets = [hit["body"] for hit in client.text(query, max_results=max_results)]
    return "\n".join(snippets)

# ===============================
# DOWNLOAD FILE IF TASK HAS ONE
# ===============================

def download_task_file(task_id, timeout=30):
    """Download the file attached to a task, if the API serves one.

    Args:
        task_id: Identifier of the task; used both in the request URL and as
            the local file name.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout a stalled connection would block the whole run.

    Returns:
        Path of the saved file under ``/tmp``, or ``None`` when the server
        does not answer with HTTP 200 or the task id yields no usable file
        name.

    Raises:
        requests.RequestException: On network errors or timeout.
        OSError: If the file cannot be written.
    """
    url = f"{BASE_URL}/files/{task_id}"
    response = requests.get(url, timeout=timeout)
    if response.status_code != 200:
        return None

    # task_id comes from a remote API: keep only the final path component so
    # a value such as "../../etc/x" cannot escape /tmp.
    safe_name = os.path.basename(str(task_id))
    if safe_name in ("", ".", ".."):
        return None

    file_path = f"/tmp/{safe_name}"
    with open(file_path, "wb") as f:
        f.write(response.content)
    return file_path

# ===============================
# BASIC REASONING
# ===============================

def solve_question(question, task_id):
    """Produce an answer for one task.

    The text of the task's attached file is used as context when one can be
    downloaded and read; otherwise (no file, unreadable file, or blank
    content) the question is sent to the web search instead. The context is
    then reduced to a short answer by ``extract_answer``.

    Args:
        question: The question text.
        task_id: Identifier of the task, used to look up an attached file.

    Returns:
        The extracted answer string (possibly empty).
    """
    file_path = download_task_file(task_id)
    context = ""
    if file_path and os.path.exists(file_path):
        try:
            # errors="ignore" drops undecodable bytes so binary attachments
            # do not raise a decoding error.
            with open(file_path, "r", errors="ignore") as f:
                context = f.read()
        except OSError as exc:
            # Only I/O failures are expected here; a bare except would also
            # swallow KeyboardInterrupt/SystemExit.
            print("Could not read task file:", exc)
            context = ""

    # Fall back to searching when the file gave us nothing to work with.
    if not context.strip():
        context = web_search(question)
    return extract_answer(context)

# ===============================
# ANSWER EXTRACTION
# ===============================

def extract_answer(text):
    """Reduce a block of text to a short answer string.

    Args:
        text: Context text to scan.

    Returns:
        The first standalone integer or decimal number found in ``text``;
        if there is none, the first six whitespace-separated words joined by
        single spaces (an empty string for blank input).
    """
    import re

    first_number = re.search(r"\b\d+(?:\.\d+)?\b", text)
    if first_number is not None:
        return first_number.group(0)

    # No number present: fall back to the leading words of the text.
    leading_words = text.split()[:6]
    return " ".join(leading_words).strip()

# ===============================
# FETCH QUESTIONS
# ===============================

def get_questions(timeout=30):
    """Fetch the list of questions from the scoring API.

    Args:
        timeout: Seconds to wait for the server before giving up, so a
            stalled connection cannot hang the run indefinitely.

    Returns:
        The decoded JSON body of ``GET {BASE_URL}/questions``.

    Raises:
        ValueError: If ``BASE_URL`` is not configured.
        requests.HTTPError: If the server answers with an error status.
        requests.RequestException: On network errors or timeout.
    """
    if not BASE_URL:
        # Fail with an actionable message instead of requesting "None/questions".
        raise ValueError("GAIA_API_URL environment variable is not set")
    response = requests.get(f"{BASE_URL}/questions", timeout=timeout)
    response.raise_for_status()
    return response.json()

# ===============================
# SUBMIT ANSWERS
# ===============================

def submit_answers(answers, timeout=60):
    """Post the collected answers to the scoring API.

    Args:
        answers: List of answer entries to send under the ``"answers"`` key.
        timeout: Seconds to wait for the server before giving up.

    Raises:
        requests.HTTPError: If the server rejects the submission (4xx/5xx).
            Without this check a failed submission would be reported to the
            user as a success.
        requests.RequestException: On network errors or timeout.
    """
    payload = {
        "username": HF_USERNAME,
        "agent_code": AGENT_CODE_URL,
        "answers": answers,
    }
    print("Submitting payload...")
    response = requests.post(f"{BASE_URL}/submit", json=payload, timeout=timeout)
    # Log the body first so the server's explanation is visible even when
    # the status check below raises.
    print("Server response:", response.text)
    response.raise_for_status()

# ===============================
# MAIN PIPELINE
# ===============================

def run_agent():
    """Fetch all questions, answer each one, and submit the results.

    A failure while solving a single question is printed and recorded as an
    empty answer so the remaining questions are still processed. Errors from
    fetching the questions or submitting the answers propagate to the caller.
    """

    def _answer_entry(item):
        # Build the submission entry for one question record.
        task_id, question = item["task_id"], item["question"]
        print("Solving:", task_id)
        try:
            outcome = solve_question(question, task_id)
        except Exception as e:
            print("Error:", e)
            outcome = ""
        return {"task_id": task_id, "submitted_answer": outcome.strip()}

    print("Fetching GAIA questions...")
    collected = [_answer_entry(item) for item in get_questions()]
    submit_answers(collected)
    print("Finished submission.")

# ===============================
# GRADIO BLOCKS UI
# ===============================

def run_pipeline():
    """Run the agent and return a status message for the UI.

    Returns:
        A success message when ``run_agent`` completes, otherwise an error
        message containing the text of the exception that was raised.
    """
    try:
        run_agent()
    except Exception as exc:
        return f"❌ Error occurred: {exc}"
    return "✅ GAIA submission completed. Check leaderboard."

# Build the Gradio UI: one button that triggers the pipeline and one textbox
# that displays the status message it returns.
with gr.Blocks() as demo:
    run_button = gr.Button("Run GAIA Agent")
    output_text = gr.Textbox(label="Output", lines=4)
    
    # On click, call run_pipeline (it takes no inputs) and write its return
    # value into output_text.
    run_button.click(fn=run_pipeline, inputs=[], outputs=output_text)

# Start the web server only when this file is executed as a script,
# binding to 0.0.0.0 on port 7860.
if __name__ == "__main__":
    demo.launch(server_name="0.0.0.0", server_port=7860)