| import os |
| import requests |
| from langchain.agents import create_agent |
| from langchain.tools import tool |
| from dotenv import load_dotenv |
| from langchain_community.document_loaders import ArxivLoader, WikipediaLoader |
| from ddgs import DDGS |
| from bs4 import BeautifulSoup |
|
|
| |
| |
|
|
| |
# Populate os.environ from a local .env file (if one exists) before the keys
# are read; the error below tells the user to put the key there.
# NOTE: python-dotenv does not override variables that are already set.
load_dotenv()

# Optional provider keys; they are read here but not validated.
openai_key = os.getenv("OPENAI_API_KEY")
googleai_key = os.getenv("GOOGLE_API_KEY")

# The OpenRouter key is mandatory: fail fast at import time when it is missing
# or empty.
openrouter_key = os.getenv("OPENROUTER_API_KEY")
if not openrouter_key:
    raise RuntimeError("Set OPENROUTER_API_KEY in your .env (OpenRouter API key)")
|
|
| |
| from langchain_openai import ChatOpenAI |
|
|
# Chat model reached through OpenRouter's OpenAI-compatible endpoint.
# NOTE(review): OpenRouter model ids are usually provider-prefixed
# (e.g. "openai/gpt-4o-mini") - confirm that this bare id resolves.
_model_settings = {
    "model": "gpt-4o-mini",
    "base_url": "https://openrouter.ai/api/v1",
    "api_key": openrouter_key,
    "max_completion_tokens": 10000,
}
model = ChatOpenAI(**_model_settings)
|
|
| |
# Calculator tool: product of two integers.
# The docstring is left verbatim because `@tool` exposes it to the model as
# the tool description.
@tool
def multiply(a: int, b: int) -> int:
    """Multiply two numbers.
    Args:
        a: first int
        b: second int
    """
    product = a * b
    return product
|
|
# Calculator tool: sum of two integers.
# The docstring is left verbatim because `@tool` exposes it to the model as
# the tool description.
@tool
def add(a: int, b: int) -> int:
    """Add two numbers.

    Args:
        a: first int
        b: second int
    """
    total = a + b
    return total
|
|
# Calculator tool: `a` minus `b`.
# The docstring is left verbatim because `@tool` exposes it to the model as
# the tool description.
@tool
def subtract(a: int, b: int) -> int:
    """Subtract two numbers.

    Args:
        a: first int
        b: second int
    """
    difference = a - b
    return difference
|
|
@tool
def divide(a: int, b: int) -> float:
    """Divide two numbers.

    Args:
        a: first int
        b: second int

    Returns:
        The quotient a / b as a float (true division, not floor division).

    Raises:
        ValueError: if b is zero.
    """
    # Reject a zero divisor explicitly so the failure carries a clear message.
    if b == 0:
        raise ValueError("Cannot divide by zero.")
    # `/` always yields a float for int operands, hence the float return type.
    return a / b
|
|
# Calculator tool: remainder of `a` divided by `b` (Python `%` semantics).
# The docstring is left verbatim because `@tool` exposes it to the model as
# the tool description.
@tool
def modulus(a: int, b: int) -> int:
    """Get the modulus of two numbers.

    Args:
        a: first int
        b: second int
    """
    remainder = a % b
    return remainder
|
|
@tool
def wiki_search(query: str) -> str:
    """Search Wikipedia for a query and return maximum 2 results."""
    # Load at most two documents, then render each one as a <Document> entry
    # carrying its "source" and (optional) "page" metadata plus the full text.
    pages = WikipediaLoader(query=query, load_max_docs=2).load()
    rendered = []
    for page in pages:
        meta = page.metadata
        rendered.append(
            f'<Document source="{meta["source"]}" page="{meta.get("page", "")}"/>\n{page.page_content}\n</Document>'
        )
    # Entries are separated by a horizontal-rule style divider.
    return "\n\n---\n\n".join(rendered)
|
|
@tool
def web_search(query: str) -> str:
    """Search DDGS for a query and return maximum 3 results."""
    # Each hit is reduced to its title and body text; at most three hits are
    # requested from the search backend.
    hits = DDGS().text(query, max_results=3)
    entries = []
    for hit in hits:
        entries.append(f'Title:{hit["title"]}\nContent:{hit["body"]}\n--\n')
    return "\n\n---\n\n".join(entries)
|
|
@tool
def arxiv_search(query: str) -> str:
    """Search arXiv for a query and return maximum 3 results."""
    # Load at most three papers; only the first 1000 characters of each
    # paper's text are included in the rendered entry.
    papers = ArxivLoader(query=query, load_max_docs=3).load()
    rendered = []
    for paper in papers:
        meta = paper.metadata
        rendered.append(
            f'<Document source="{meta["source"]}" page="{meta.get("page", "")}"/>\n{paper.page_content[:1000]}\n</Document>'
        )
    return "\n\n---\n\n".join(rendered)
|
|
@tool
def image_search(query: str) -> str:
    """Searches DDGS for an image query and returns maximum 10 image results"""
    # Request exactly the 10 results the description promises instead of
    # relying on the library's default cap.
    search_images = DDGS().images(query=query, max_results=10)
    # In DDGS image results, "image" is the direct link to the picture while
    # "url" is the page hosting it. Report the direct link under "Image URL"
    # and fall back to the page link only when no direct link is present.
    return "\n\n---\n\n".join(
        f'Image Title:{image["title"]}\nImage URL: {image.get("image") or image["url"]}'
        for image in search_images
    )
|
|
@tool
def fetch_url_content(url: str) -> str:
    """Fetch and return the text content from a webpage URL."""
    char_limit = 2000  # maximum number of characters of page text returned
    try:
        page = requests.get(url, timeout=10)
        page.raise_for_status()
        parsed = BeautifulSoup(page.text, 'html.parser')
        # Remove <script> and <style> elements so only readable text remains.
        for node in parsed(["script", "style"]):
            node.decompose()
        visible = parsed.get_text(separator='\n', strip=True)
        # Longer pages are cut at the limit and marked with an ellipsis.
        if len(visible) > char_limit:
            return visible[:char_limit] + "..."
        return visible
    except Exception as err:
        # Best effort: report any failure to the caller as text instead of raising.
        return f"Error fetching URL: {err}"
|
|
| |
# Every tool handed to the agent, grouped by purpose.
tools = [
    # basic arithmetic
    multiply,
    add,
    subtract,
    divide,
    modulus,
    # search back-ends
    wiki_search,
    web_search,
    arxiv_search,
    image_search,
    # raw page retrieval
    fetch_url_content,
]
|
|
| |
# System prompt given to the agent. The numbered list names every tool that is
# registered in `tools`, including fetch_url_content.
sys_prompt = """You are a helpful agent, please provide clear and concise answers to asked questions.
Keep your word limit for answers as minimum as you can. You are equipped with the following tools:
1. [multiply], [add], [subtract], [divide], [modulus] - basic calculator operations.
2. [wiki_search] - search Wikipedia and return up to 2 documents as text.
3. [web_search] - perform a web search and return up to 3 documents as text.
4. [arxiv_search] - search arXiv and return up to 3 documents as text.
5. [image_search] - Searches the internet for an image query and returns maximum 10 image results
6. [fetch_url_content] - fetch a webpage URL and return its text content (truncated to 2000 characters).

Under any circumstances, if you fail to provide the accurate answer expected by the user, you may say the same to the user and provide a similar answer which is approximately the closest. Disregard spelling mistakes and provide answer with results retreived from the correct spelling.

For every tool you use, append a single line at the end of your response exactly in this format:
[TOOLS USED: (tool_name)]
When no tools are used, append:
[TOOLS USED WERE NONE]
"""
|
|
class GAIAAgent:
    """Callable wrapper around a tool-using agent.

    The agent is built once from the module-level ``model``, ``tools`` and
    ``sys_prompt``. Calling the instance with a question returns the text of
    the last message the agent produced.
    """

    def __init__(self):
        # Any failure while building the agent propagates to the caller.
        self.agent = create_agent(model, tools=tools, system_prompt=sys_prompt)

    def __call__(self, question: str) -> str:
        """Run the agent on ``question`` and return its final answer as text.

        Args:
            question: the user question, sent as a single user message.

        Returns:
            The textual content of the last message in the agent's result.
        """
        result = self.agent.invoke({"messages": [{"role": "user", "content": question}]})
        return self._content_to_text(result["messages"][-1].content)

    @staticmethod
    def _content_to_text(raw_content) -> str:
        """Flatten message content (a string or a list of blocks) into one string."""
        if isinstance(raw_content, str):
            return raw_content
        if isinstance(raw_content, list):
            # Keep every textual block (plain strings and dicts with a string
            # "text" entry), not just the first one, so multi-part answers are
            # not truncated and a leading non-text block does not hide the text.
            parts = [
                block if isinstance(block, str) else block["text"]
                for block in raw_content
                if isinstance(block, str)
                or (isinstance(block, dict) and isinstance(block.get("text"), str))
            ]
            if parts:
                return "".join(parts)
        # Unrecognised shapes (including an empty list) fall back to their
        # string representation.
        return str(raw_content)
|
|