|
|
import os |
|
|
import requests |
|
|
from langchain.agents import create_agent |
|
|
from langchain.tools import tool |
|
|
from dotenv import load_dotenv |
|
|
from langchain_community.document_loaders import ArxivLoader, WikipediaLoader |
|
|
from ddgs import DDGS |
|
|
from bs4 import BeautifulSoup |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Load variables from a local .env file into the process environment before
# reading them. Without this call the lookups below only see variables that
# were already exported, even though the error message points users at .env.
# load_dotenv() does not override variables that are already set.
load_dotenv()

# Optional provider keys; read here but not referenced by the visible code.
openai_key = os.getenv("OPENAI_API_KEY")
googleai_key = os.getenv("GOOGLE_API_KEY")

# The chat model is routed through OpenRouter, so this key is mandatory:
# fail fast at import time instead of on the first model call.
openrouter_key = os.getenv("OPENROUTER_API_KEY")
if not openrouter_key:
    raise RuntimeError("Set OPENROUTER_API_KEY in your .env (OpenRouter API key)")
|
|
|
|
|
|
|
|
from langchain_openai import ChatOpenAI |
|
|
|
|
|
# Chat model shared by the agent: an OpenAI-compatible client pointed at the
# OpenRouter endpoint and authenticated with the OpenRouter key.
# NOTE(review): OpenRouter model ids are usually provider-prefixed
# (e.g. "openai/gpt-4o-mini") — confirm this bare id resolves as intended.
model = ChatOpenAI(
    model="gpt-4o-mini",
    base_url="https://openrouter.ai/api/v1",
    api_key=openrouter_key,
    max_completion_tokens=10000,
)
|
|
|
|
|
|
|
|
@tool
def multiply(a: int, b: int) -> int:
    """Multiply two numbers.
    Args:
        a: first int
        b: second int
    """
    # Docstring wording is left as-is: it is presumably what the @tool
    # decorator surfaces to the model as the tool description.
    product = a * b
    return product
|
|
|
|
|
@tool
def add(a: int, b: int) -> int:
    """Add two numbers.

    Args:
        a: first int
        b: second int
    """
    # Calculator tool: plain integer addition, no validation of the operands.
    total = a + b
    return total
|
|
|
|
|
@tool
def subtract(a: int, b: int) -> int:
    """Subtract two numbers.

    Args:
        a: first int
        b: second int
    """
    # Calculator tool: the second operand is taken away from the first.
    difference = a - b
    return difference
|
|
|
|
|
@tool
def divide(a: int, b: int) -> float:
    """Divide two numbers.

    Args:
        a: first int (the dividend)
        b: second int (the divisor); must not be zero

    Returns:
        The quotient a / b as a float.

    Raises:
        ValueError: if b is zero.
    """
    if b == 0:
        raise ValueError("Cannot divide by zero.")
    # "/" is true division and always yields a float (4 / 2 == 2.0), which is
    # why the return annotation is float rather than int.
    return a / b
|
|
|
|
|
@tool
def modulus(a: int, b: int) -> int:
    """Get the modulus of two numbers.

    Args:
        a: first int
        b: second int
    """
    # No zero guard here: b == 0 surfaces as Python's own ZeroDivisionError.
    remainder = a % b
    return remainder
|
|
|
|
|
@tool
def wiki_search(query: str) -> str:
    """Search Wikipedia for a query and return maximum 2 results."""
    loader = WikipediaLoader(query=query, load_max_docs=2)

    # Render each loaded document as a pseudo-XML snippet carrying its source
    # and (when present) page metadata, followed by the full page content.
    rendered = []
    for doc in loader.load():
        source = doc.metadata["source"]
        page = doc.metadata.get("page", "")
        rendered.append(
            f'<Document source="{source}" page="{page}"/>\n{doc.page_content}\n</Document>'
        )

    # Snippets are separated by a horizontal-rule style delimiter.
    return "\n\n---\n\n".join(rendered)
|
|
|
|
|
@tool
def web_search(query: str) -> str:
    """Search DDGS for a query and return maximum 3 results."""
    hits = DDGS().text(query, max_results=3)

    # One "Title/Content" snippet per hit; only the title and body fields of
    # each result are used.
    snippets = []
    for hit in hits:
        title, body = hit["title"], hit["body"]
        snippets.append(f"Title:{title}\nContent:{body}\n--\n")

    return "\n\n---\n\n".join(snippets)
|
|
|
|
|
@tool
def arxiv_search(query: str) -> str:
    """Search arXiv for a query and return maximum 3 results."""
    search_docs = ArxivLoader(query=query, load_max_docs=3).load()

    rendered = []
    for doc in search_docs:
        meta = doc.metadata
        # ArxivLoader metadata is not guaranteed to carry a "source" key (its
        # default fields are Published/Title/Authors/Summary), so indexing it
        # directly could raise KeyError and abort the whole tool call. Prefer
        # "source" when present, then fall back to other identifying fields.
        source = meta.get("source") or meta.get("entry_id") or meta.get("Title", "")
        page = meta.get("page", "")
        # Only the first 1000 characters of each paper are returned to keep
        # the tool output short.
        excerpt = doc.page_content[:1000]
        rendered.append(
            f'<Document source="{source}" page="{page}"/>\n{excerpt}\n</Document>'
        )

    return "\n\n---\n\n".join(rendered)
|
|
|
|
|
@tool
def image_search(query: str) -> str:
    """Searches DDGS for an image query and returns maximum 10 image results"""
    # Pass the cap explicitly so the documented 10-result limit holds no
    # matter what default the installed ddgs version uses.
    search_images = DDGS().images(query=query, max_results=10)

    # NOTE(review): "url" may be the page hosting the image rather than the
    # image file itself (results also appear to carry an "image" field) —
    # confirm which one the agent should be shown.
    return "\n\n---\n\n".join(
        f'Image Title:{image["title"]}\nImage URL: {image["url"]}'
        for image in search_images
    )
|
|
|
|
|
@tool
def fetch_url_content(url: str) -> str:
    """Fetch and return the text content from a webpage URL."""
    # Best-effort tool: any failure (network, HTTP status, parsing) is turned
    # into an error string for the agent instead of being raised.
    try:
        page = requests.get(url, timeout=10)
        page.raise_for_status()

        soup = BeautifulSoup(page.text, 'html.parser')
        # Remove <script> and <style> nodes so their contents do not end up
        # in the extracted text.
        for node in soup(["script", "style"]):
            node.decompose()

        text = soup.get_text(separator='\n', strip=True)
        # Return at most 2000 characters, marking truncation with an ellipsis.
        suffix = "..." if len(text) > 2000 else ""
        return text[:2000] + suffix
    except Exception as exc:
        return f"Error fetching URL: {exc}"
|
|
|
|
|
|
|
|
# Every tool handed to the agent, in registration order.
tools = [
    # Calculator operations
    multiply,
    add,
    subtract,
    divide,
    modulus,
    # Search back-ends
    wiki_search,
    web_search,
    arxiv_search,
    image_search,
    # Direct page retrieval
    fetch_url_content,
]
|
|
|
|
|
|
|
|
# System prompt handed to the agent. The numbered list should mirror the
# registered tools; entry 6 was missing even though fetch_url_content is
# registered, so the prompt's inventory did not match the agent's real tools.
sys_prompt = """You are a helpful agent, please provide clear and concise answers to asked questions.
Keep your word limit for answers as minimum as you can. You are equipped with the following tools:
1. [multiply], [add], [subtract], [divide], [modulus] - basic calculator operations.
2. [wiki_search] - search Wikipedia and return up to 2 documents as text.
3. [web_search] - perform a web search and return up to 3 documents as text.
4. [arxiv_search] - search arXiv and return up to 3 documents as text.
5. [image_search] - Searches the internet for an image query and returns maximum 10 image results
6. [fetch_url_content] - fetch a webpage URL and return up to the first 2000 characters of its text.

Under any circumstances, if you fail to provide the accurate answer expected by the user, you may say the same to the user and provide a similar answer which is approximately the closest. Disregard spelling mistakes and provide answer with results retreived from the correct spelling.

For every tool you use, append a single line at the end of your response exactly in this format:
[TOOLS USED: (tool_name)]
When no tools are used, append:
[TOOLS USED WERE NONE]
"""
|
|
|
|
|
class GAIAAgent:
    """Callable wrapper around a tool-using agent.

    Construction builds the agent from the module-level ``model``, ``tools``
    and ``sys_prompt``. Calling the instance with a question runs the agent
    and returns the text of its final message.
    """

    def __init__(self):
        # Construction errors propagate unchanged to the caller; the previous
        # try/except that only re-raised added nothing and has been removed.
        self.agent = create_agent(model, tools=tools, system_prompt=sys_prompt)

    def __call__(self, question: str) -> str:
        """Run the agent on ``question`` and return its final answer as text.

        Args:
            question: the user question, sent as a single user message.

        Returns:
            The content of the last message in the agent result, as a string.
        """
        result = self.agent.invoke({"messages": [{"role": "user", "content": question}]})
        return self._content_to_text(result["messages"][-1].content)

    @staticmethod
    def _content_to_text(content) -> str:
        """Flatten a message ``content`` value into a plain string.

        A string is returned unchanged. For a list of content blocks, the
        ``text`` of every dict block is concatenated in order; previously only
        the first block was used and later text blocks were silently dropped.
        Anything else, including a list with no text blocks, falls back to
        ``str(content)``.
        """
        if isinstance(content, str):
            return content
        if isinstance(content, list):
            parts = [
                block["text"]
                for block in content
                if isinstance(block, dict) and isinstance(block.get("text"), str)
            ]
            if parts:
                return "".join(parts)
        return str(content)
|
|
|