File size: 6,023 Bytes
e4a1fe9
 
 
 
 
 
 
 
 
 
e892ea1
e4a1fe9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
import os
import requests
from langchain.agents import create_agent
from langchain.tools import tool
from dotenv import load_dotenv
from langchain_community.document_loaders import ArxivLoader, WikipediaLoader
from ddgs import DDGS
from bs4 import BeautifulSoup

# Load environment variables
# NOTE(review): the load_dotenv() call below is commented out, so no .env file
# is read by this module; the keys must already be set in the process
# environment. The RuntimeError text further down still mentions ".env".
#load_dotenv()

# --- Agent Setup ---
# These two keys are read here but not used anywhere in the visible code.
openai_key = os.getenv("OPENAI_API_KEY")
googleai_key = os.getenv("GOOGLE_API_KEY")

# Use OpenRouter via LangChain's ChatOpenAI
openrouter_key = os.getenv("OPENROUTER_API_KEY")
if not openrouter_key:
    # Fail at import time rather than on the first model call.
    raise RuntimeError("Set OPENROUTER_API_KEY in your .env (OpenRouter API key)")

# Imported only after the key check above, so a missing key is reported before
# this import is attempted. It still runs at module import time.
from langchain_openai import ChatOpenAI

# Chat model handed to the agent; requests are sent to OpenRouter's endpoint
# instead of the default OpenAI base URL.
# NOTE(review): OpenRouter model ids are usually provider-prefixed
# (e.g. "openai/gpt-4o-mini") — confirm that the bare id below resolves.
model = ChatOpenAI(
  api_key=openrouter_key,
  base_url="https://openrouter.ai/api/v1",
  model="gpt-4o-mini",
  max_completion_tokens=10000,
)

# --- Tools Definition ---
@tool
def multiply(a: int, b: int) -> int:
    """Multiply two numbers.
    Args:
        a: first int
        b: second int
    """
    # The docstring is what @tool publishes as the tool description, so its
    # wording is intentionally left exactly as it was.
    product = a * b
    return product

@tool
def add(a: int, b: int) -> int:
    """Add two numbers.
    
    Args:
        a: first int
        b: second int
    """
    # The docstring is what @tool publishes as the tool description, so its
    # wording is intentionally left exactly as it was.
    total = a + b
    return total

@tool
def subtract(a: int, b: int) -> int:
    """Subtract two numbers.
    
    Args:
        a: first int
        b: second int
    """
    # The docstring is what @tool publishes as the tool description, so its
    # wording is intentionally left exactly as it was.
    difference = a - b
    return difference

@tool
def divide(a: int, b: int) -> float:
    """Divide two numbers and return the (possibly fractional) quotient.

    Args:
        a: dividend (int)
        b: divisor (int), must not be 0

    Returns:
        a / b using true division, so the result is a float (7 / 2 -> 3.5).

    Raises:
        ValueError: if b is 0.
    """
    # Guard explicitly so the caller gets a clear message instead of a bare
    # ZeroDivisionError.
    if b == 0:
        raise ValueError("Cannot divide by zero.")
    # "/" is true division: the result is a float even for two ints, which is
    # why the return annotation is float rather than int.
    return a / b

@tool
def modulus(a: int, b: int) -> int:
    """Get the modulus of two numbers.
    
    Args:
        a: first int
        b: second int
    """
    # No zero guard here: b == 0 surfaces Python's own ZeroDivisionError.
    remainder = a % b
    return remainder

@tool
def wiki_search(query: str) -> str:
    """Search Wikipedia for a query and return maximum 2 results."""
    # Load at most two Wikipedia pages matching the query.
    pages = WikipediaLoader(query=query, load_max_docs=2).load()

    # Render each page as one pseudo-XML <Document> entry.
    rendered = []
    for page in pages:
        source = page.metadata["source"]
        page_no = page.metadata.get("page", "")
        rendered.append(
            f'<Document source="{source}" page="{page_no}"/>\n{page.page_content}\n</Document>'
        )

    # Entries are separated by a horizontal-rule style divider.
    return "\n\n---\n\n".join(rendered)

@tool
def web_search(query: str) -> str:
    """Search DDGS for a query and return maximum 3 results (title, URL and snippet)."""
    # Each hit is a dict; "title" and "body" were already relied on, "href" is
    # the result's link.
    hits = DDGS().text(query, max_results=3)

    # Include the link so a result can be followed up with a page fetch;
    # .get() keeps a hit without a link from raising.
    return "\n\n---\n\n".join(
        f'Title:{hit["title"]}\nURL:{hit.get("href", "")}\nContent:{hit["body"]}\n--\n'
        for hit in hits
    )

@tool
def arxiv_search(query: str) -> str:
    """Search arXiv for a query and return maximum 3 results."""
    papers = ArxivLoader(query=query, load_max_docs=3).load()

    entries = []
    for paper in papers:
        meta = paper.metadata
        # Per the LangChain ArxivLoader docs the default metadata keys are
        # Published/Title/Authors/Summary, with no "source" key, so indexing
        # meta["source"] raised KeyError. Fall back to the entry id (present
        # when full metadata is loaded) and then to the paper title.
        source = meta.get("source") or meta.get("entry_id") or meta.get("Title", "")
        page_no = meta.get("page", "")
        # Only the first 1000 characters of each paper are returned to keep
        # the tool output small.
        entries.append(
            f'<Document source="{source}" page="{page_no}"/>\n{paper.page_content[:1000]}\n</Document>'
        )

    return "\n\n---\n\n".join(entries)

@tool
def image_search(query: str) -> str:
    """Searches DDGS for an image query and returns maximum 10 image results"""
    # Pass the limit explicitly so the "maximum 10" promise does not depend on
    # the library's default.
    hits = DDGS().images(query=query, max_results=10)

    # In ddgs image results "image" is the direct image link, while "url" is
    # the page hosting it; prefer the former for the "Image URL" line and fall
    # back to the page link if it is missing.
    return "\n\n---\n\n".join(
        f'Image Title:{hit["title"]}\nImage URL: {hit.get("image") or hit["url"]}'
        for hit in hits
    )

@tool
def fetch_url_content(url: str) -> str:
    """Fetch and return the text content from a webpage URL."""
    try:
        page = requests.get(url, timeout=10)
        page.raise_for_status()
        parsed = BeautifulSoup(page.text, 'html.parser')

        # Remove script and style elements before extracting text.
        for node in parsed(["script", "style"]):
            node.decompose()

        body = parsed.get_text(separator='\n', strip=True)

        # Cap the output at 2000 characters, marking truncation with "...".
        if len(body) > 2000:
            return body[:2000] + "..."
        return body
    except Exception as err:
        # Best effort: any failure is reported as text instead of raising.
        return f"Error fetching URL: {err!s}"

# Tools list: every tool object handed to the agent, in registration order.
tools = [
    # Arithmetic
    multiply,
    add,
    subtract,
    divide,
    modulus,
    # Search
    wiki_search,
    web_search,
    arxiv_search,
    image_search,
    # Page retrieval
    fetch_url_content,
]

# System prompt
# Passed to the agent as its system prompt. The numbered list should name every
# entry of the `tools` list so the model's own description of its abilities
# matches what is actually registered.
sys_prompt = """You are a helpful agent, please provide clear and concise answers to asked questions.
Keep your word limit for answers as minimum as you can. You are equipped with the following tools:
1. [multiply], [add], [subtract], [divide], [modulus] - basic calculator operations.
2. [wiki_search] - search Wikipedia and return up to 2 documents as text.
3. [web_search] - perform a web search and return up to 3 documents as text.
4. [arxiv_search] - search arXiv and return up to 3 documents as text.
5. [image_search] - Searches the internet for an image query and returns maximum 10 image results
6. [fetch_url_content] - fetch a webpage by URL and return up to the first 2000 characters of its text.

Under any circumstances, if you fail to provide the accurate answer expected by the user, you may say the same to the user and provide a similar answer which is approximately the closest. Disregard spelling mistakes and provide answer with results retrieved from the correct spelling.

For every tool you use, append a single line at the end of your response exactly in this format:
[TOOLS USED: (tool_name)]
When no tools are used, append:
[TOOLS USED WERE NONE]
"""

class GAIAAgent:
    """Callable wrapper that sends one question to the agent and returns its answer text."""

    def __init__(self):
        # Build the internal agent from the module-level model, tool list and
        # system prompt. Any construction error propagates unchanged (the
        # previous try/except only re-raised it).
        self.agent = create_agent(model, tools=tools, system_prompt=sys_prompt)

    def __call__(self, question: str) -> str:
        """Run the agent on ``question`` and return the final message as text.

        Args:
            question: the user question, sent as a single user message.

        Returns:
            The content of the last message in the agent result. String
            content is returned as-is. List content is flattened by
            concatenating every text part (plain strings and dicts carrying a
            string ``"text"`` entry). Anything else, including a list with no
            text parts, is returned as ``str(content)``.
        """
        result = self.agent.invoke({"messages": [{"role": "user", "content": question}]})
        raw_content = result["messages"][-1].content

        if isinstance(raw_content, str):
            return raw_content

        if isinstance(raw_content, list):
            # Collect text from ALL parts, not just the first one: an answer
            # may be split over several parts, or be preceded by a non-text
            # part, and reading only index 0 would truncate or miss it.
            text_parts = []
            for part in raw_content:
                if isinstance(part, str):
                    text_parts.append(part)
                elif isinstance(part, dict) and isinstance(part.get("text"), str):
                    text_parts.append(part["text"])
            if text_parts:
                return "".join(text_parts)

        # Unknown shape (or a list without any text): fall back to its repr.
        return str(raw_content)