File size: 6,023 Bytes
e4a1fe9 e892ea1 e4a1fe9 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 |
import os
import requests
from langchain.agents import create_agent
from langchain.tools import tool
from dotenv import load_dotenv
from langchain_community.document_loaders import ArxivLoader, WikipediaLoader
from ddgs import DDGS
from bs4 import BeautifulSoup
# Load environment variables
#load_dotenv()
# --- Agent Setup ---
# Provider keys are read from the process environment. Only the OpenRouter key
# is used by the model built below; the other two are not referenced in the
# code visible here.
openai_key = os.environ.get("OPENAI_API_KEY")
googleai_key = os.environ.get("GOOGLE_API_KEY")
openrouter_key = os.environ.get("OPENROUTER_API_KEY")

# Fail fast when the OpenRouter key is missing or empty.
if not openrouter_key:
    raise RuntimeError("Set OPENROUTER_API_KEY in your .env (OpenRouter API key)")

# Imported after the key check, so a missing key is reported before
# langchain_openai is loaded.
from langchain_openai import ChatOpenAI

# ChatOpenAI is pointed at the OpenRouter endpoint through base_url.
model = ChatOpenAI(
    model="gpt-4o-mini",
    base_url="https://openrouter.ai/api/v1",
    api_key=openrouter_key,
    max_completion_tokens=10_000,
)
# --- Tools Definition ---
# NOTE(review): docstring wording kept verbatim -- @tool presumably exposes it
# as the tool description shown to the model; confirm before rewording.
@tool
def multiply(a: int, b: int) -> int:
    """Multiply two numbers.
    Args:
        a: first int
        b: second int
    """
    # Calculator tool: hand back the product of the two operands.
    product = a * b
    return product
# NOTE(review): docstring wording kept verbatim -- @tool presumably exposes it
# as the tool description shown to the model; confirm before rewording.
@tool
def add(a: int, b: int) -> int:
    """Add two numbers.
    Args:
        a: first int
        b: second int
    """
    # Calculator tool: hand back the sum of the two operands.
    total = a + b
    return total
# NOTE(review): docstring wording kept verbatim -- @tool presumably exposes it
# as the tool description shown to the model; confirm before rewording.
@tool
def subtract(a: int, b: int) -> int:
    """Subtract two numbers.
    Args:
        a: first int
        b: second int
    """
    # Calculator tool: b is taken away from a (operand order matters).
    difference = a - b
    return difference
@tool
def divide(a: int, b: int) -> float:
    """Divide two numbers.

    Args:
        a: first int (the dividend)
        b: second int (the divisor)

    Returns:
        The quotient a / b. True division is used, so the result is a float
        even when a is an exact multiple of b.

    Raises:
        ValueError: if b is zero.
    """
    # Reject a zero divisor with a descriptive message instead of letting the
    # bare ZeroDivisionError surface.
    if b == 0:
        raise ValueError("Cannot divide by zero.")
    return a / b
@tool
def modulus(a: int, b: int) -> int:
    """Get the modulus of two numbers.

    Args:
        a: first int (the dividend)
        b: second int (the divisor)

    Returns:
        The remainder a % b.

    Raises:
        ValueError: if b is zero.
    """
    # Same guard as the divide tool: report a zero divisor with a descriptive
    # ValueError rather than a bare ZeroDivisionError.
    if b == 0:
        raise ValueError("Cannot take modulus by zero.")
    return a % b
@tool
def wiki_search(query: str) -> str:
    """Search Wikipedia for a query and return maximum 2 results.

    Args:
        query: text to look up on Wikipedia

    Returns:
        The loaded pages as text, each wrapped in a <Document> element that
        carries its source and page metadata, joined by "---" separator lines.
        An empty string when nothing was loaded.
    """
    search_docs = WikipediaLoader(query=query, load_max_docs=2).load()
    formatted_docs = []
    for doc in search_docs:
        meta = doc.metadata
        # The opening tag is not self-closing: the page text and the closing
        # </Document> follow it. Missing metadata keys render as "".
        formatted_docs.append(
            f'<Document source="{meta.get("source", "")}" '
            f'page="{meta.get("page", "")}">\n'
            f"{doc.page_content}\n</Document>"
        )
    return "\n\n---\n\n".join(formatted_docs)
# NOTE(review): docstring wording kept verbatim -- @tool presumably exposes it
# as the tool description shown to the model; confirm before rewording.
@tool
def web_search(query: str) -> str:
    """Search DDGS for a query and return maximum 3 results."""
    hits = DDGS().text(query, max_results=3)
    # One "Title/Content" entry per hit; entries are then joined with a
    # "---" separator line.
    entries = []
    for hit in hits:
        entries.append(f'Title:{hit["title"]}\nContent:{hit["body"]}\n--\n')
    return "\n\n---\n\n".join(entries)
@tool
def arxiv_search(query: str) -> str:
    """Search arXiv for a query and return maximum 3 results.

    Args:
        query: text to search arXiv for

    Returns:
        The loaded papers as text, each wrapped in a <Document> element and
        truncated to its first 1000 characters, joined by "---" separator
        lines. An empty string when nothing was loaded.
    """
    search_docs = ArxivLoader(query=query, load_max_docs=3).load()
    formatted_docs = []
    for doc in search_docs:
        meta = doc.metadata
        # ArxivLoader does not guarantee a "source" key (its default metadata
        # is Published/Title/Authors/Summary), so indexing it directly raised
        # KeyError. Fall back to the entry id, then to the paper title.
        source = meta.get("source") or meta.get("entry_id") or meta.get("Title", "")
        # The opening tag is not self-closing: the text and the closing
        # </Document> follow it.
        formatted_docs.append(
            f'<Document source="{source}" page="{meta.get("page", "")}">\n'
            f"{doc.page_content[:1000]}\n</Document>"
        )
    return "\n\n---\n\n".join(formatted_docs)
@tool
def image_search(query: str) -> str:
    """Searches DDGS for an image query and returns maximum 10 image results

    Args:
        query: text describing the images to look for

    Returns:
        One "Image Title / Image URL" entry per result, joined by "---"
        separator lines.
    """
    # Pass the documented cap explicitly instead of relying on the library's
    # default result count.
    search_images = DDGS().images(query=query, max_results=10)
    entries = []
    for image in search_images:
        # In ddgs image results "image" is the direct image link while "url"
        # is the page hosting it; prefer the direct link and fall back to the
        # page URL when it is absent.
        image_url = image.get("image") or image["url"]
        entries.append(f'Image Title:{image["title"]}\nImage URL: {image_url}')
    return "\n\n---\n\n".join(entries)
# NOTE(review): docstring wording kept verbatim -- @tool presumably exposes it
# as the tool description shown to the model; confirm before rewording.
@tool
def fetch_url_content(url: str) -> str:
    """Fetch and return the text content from a webpage URL."""
    # Best-effort tool: any failure (network, HTTP status, parsing) is reported
    # back as an error string rather than raised.
    try:
        page = requests.get(url, timeout=10)
        page.raise_for_status()
        soup = BeautifulSoup(page.text, 'html.parser')
        # Drop script and style elements so only visible text remains.
        for tag in soup.find_all(["script", "style"]):
            tag.decompose()
        text = soup.get_text(separator='\n', strip=True)
        # Cap the output at 2000 characters, marking truncation with "...".
        if len(text) > 2000:
            return text[:2000] + "..."
        return text
    except Exception as e:
        return f"Error fetching URL: {e}"
# Tools list
tools = [
    # Calculator tools
    multiply,
    add,
    subtract,
    divide,
    modulus,
    # Search tools
    wiki_search,
    web_search,
    arxiv_search,
    image_search,
    # Web page retrieval
    fetch_url_content,
]
# System prompt
# Instructions given to the agent. The numbered list advertises every tool
# registered in `tools`, including fetch_url_content, so the model knows it can
# retrieve page text.
sys_prompt = """You are a helpful agent, please provide clear and concise answers to asked questions.
Keep your word limit for answers as minimum as you can. You are equipped with the following tools:
1. [multiply], [add], [subtract], [divide], [modulus] - basic calculator operations.
2. [wiki_search] - search Wikipedia and return up to 2 documents as text.
3. [web_search] - perform a web search and return up to 3 documents as text.
4. [arxiv_search] - search arXiv and return up to 3 documents as text.
5. [image_search] - Searches the internet for an image query and returns maximum 10 image results
6. [fetch_url_content] - fetch a webpage URL and return its text content (truncated to 2000 characters).
Under any circumstances, if you fail to provide the accurate answer expected by the user, you may say the same to the user and provide a similar answer which is approximately the closest. Disregard spelling mistakes and provide answer with results retreived from the correct spelling.
For every tool you use, append a single line at the end of your response exactly in this format:
[TOOLS USED: (tool_name)]
When no tools are used, append:
[TOOLS USED WERE NONE]
"""
class GAIAAgent:
    """Callable wrapper that sends a question to the agent and returns its answer text."""

    def __init__(self):
        # Build the internal agent from the module-level model, tool list and
        # system prompt. Construction errors propagate to the caller unchanged.
        self.agent = create_agent(model, tools=tools, system_prompt=sys_prompt)

    def __call__(self, question: str) -> str:
        """Run the agent on one question.

        Args:
            question: the user's question, sent as a single user message.

        Returns:
            The text of the last message in the agent's result.
        """
        result = self.agent.invoke({"messages": [{"role": "user", "content": question}]})
        return self._content_to_text(result["messages"][-1].content)

    @staticmethod
    def _content_to_text(raw_content) -> str:
        """Flatten a message's content (a string, a list of blocks, or anything else) to text."""
        if isinstance(raw_content, str):
            return raw_content
        if isinstance(raw_content, list):
            # Gather the text of every block, not just the first one, so that
            # multi-block answers are not truncated and a leading non-text
            # block does not hide the text that follows it.
            parts = []
            for block in raw_content:
                if isinstance(block, str):
                    parts.append(block)
                elif isinstance(block, dict) and "text" in block:
                    parts.append(str(block["text"]))
            if parts:
                return "".join(parts)
        # Nothing textual found (or an unexpected type): fall back to the
        # content's string representation.
        return str(raw_content)
|