# SamyAgent / web_search_tools.py
import os
from dotenv import load_dotenv
from urllib.parse import unquote
import tempfile
import wikipedia
from playwright.sync_api import sync_playwright, TimeoutError
import bs4
import pandas as pd
from langchain_openai import ChatOpenAI
from langchain_community.document_loaders import UnstructuredHTMLLoader
from langchain_google_community import GoogleSearchAPIWrapper
from langchain_community.utilities import ArxivAPIWrapper
from langchain_core.tools import tool
# Load environment variables used by the tools (API keys and model name).
# NOTE: any of these may be None if the corresponding variable is not set
# in the environment or the .env file — callers must handle that.
load_dotenv()
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
OPENAI_API_MODEL = os.getenv("OPENAI_API_MODEL")
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
GOOGLE_CSE_ID = os.getenv("GOOGLE_CSE_ID")
# --- Google Search Tool ---
@tool("google_search_tool")
def google_search_tool(query: str) -> str:
    """
    Performs a Google search and returns the top results.
    Use this for general web searches, finding articles, or recent information.

    Args:
        query (str): The search query to send to Google.

    Returns:
        str: The top results formatted as "title / link / snippet" entries,
        or a message when nothing was found.
    """
    print(f"--- TOOL: Executing Google Search for: '{query}' ---")
    google_search = GoogleSearchAPIWrapper(google_api_key=GOOGLE_API_KEY, google_cse_id=GOOGLE_CSE_ID)
    # FIX: results() returns a list of dicts, but this tool is declared to
    # return str — format the hits into readable text for the agent.
    hits = google_search.results(query, num_results=3)
    if not hits:
        return f"No Google results found for '{query}'."
    formatted = [
        f"{hit.get('title', '')}\n{hit.get('link', '')}\n{hit.get('snippet', '')}"
        for hit in hits
    ]
    return "\n\n".join(formatted)
# --- Wikipedia Search Tool ---
@tool("wikipedia_search_tool")
def wikipedia_search_tool(query_or_url: str, max_results: int = 1) -> str:
    """
    Fetches content from a Wikipedia page. This tool is dual-purpose:
    1. If the input is a search query, it finds the most relevant Wikipedia page and returns its full content.
    2. If the input is a full Wikipedia URL, it directly fetches and returns the content of that page.
    This is the preferred tool for all interactions with Wikipedia.

    Args:
        query_or_url (str): A search query (e.g., "Mercedes Sosa discography") or a full Wikipedia URL.
        max_results (int): Number of candidate pages to request when the input
            is a search query; the top-ranked hit is used. Defaults to 1.

    Returns:
        str: The page content (truncated to 20,000 characters) with title and
        URL, or an "Error: ..." string describing what went wrong.
    """
    print(f"--- WIKIPEDIA TOOL (Dual-Purpose): Input is '{query_or_url}' ---")
    wikipedia.set_lang("en")
    try:
        # --- Decide whether the input is a URL or a search query ---
        if query_or_url.startswith(("http://", "https://")):
            # Case 1: the input is a URL — the last path segment is the title.
            raw_title = query_or_url.split('/')[-1]
            # Decode percent-escapes (e.g. %C4%85 -> ą) and replace
            # underscores with spaces to recover the human-readable title.
            page_title = unquote(raw_title).replace('_', ' ')
            print(f"Input is a URL. Decoded page title: '{page_title}'")
        else:
            # Case 2: the input is a search query — let Wikipedia rank pages.
            print("Input is a search query. Finding best page...")
            # FIX: honour max_results (previously hard-coded to 1, leaving
            # the parameter silently unused).
            search_results = wikipedia.search(query_or_url, results=max(1, max_results))
            if not search_results:
                return f"Error: No Wikipedia page found for query '{query_or_url}'."
            page_title = search_results[0]
        # Both branches resolve to a single page fetch (deduplicated).
        page = wikipedia.page(page_title, auto_suggest=False, redirect=True)
        # --- Fetch the page HTML and parse it with Unstructured ---
        print(f"Fetching HTML for page: '{page.title}'")
        html_content_str = page.html()
        # UnstructuredHTMLLoader reads from a path, so stage the HTML in a
        # temporary file; delete=False lets us close it before the loader opens it.
        with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix=".html", encoding='utf-8') as temp_file:
            temp_file.write(html_content_str)
            temp_filepath = temp_file.name
        try:
            loader = UnstructuredHTMLLoader(temp_filepath, strategy="fast")
            docs = loader.load()
        finally:
            # Always remove the temp file, even if parsing raised.
            os.remove(temp_filepath)
        if not docs:
            return f"Content from Wikipedia page '{page.title}': Could not extract any content."
        page_content = docs[0].page_content
        # Truncate to keep the tool output within a reasonable context budget.
        formatted_output = f"Content from Wikipedia page: '{page.title}'\nURL: {page.url}\n\n{page_content[:20000]}"
        return formatted_output
    except wikipedia.exceptions.DisambiguationError as e:
        return f"Error: Your query '{query_or_url}' is ambiguous. Options: {e.options[:5]}"
    except wikipedia.exceptions.PageError:
        return f"Error: Could not find or load the Wikipedia page for title derived from '{query_or_url}'."
    except Exception as e:
        return f"An unexpected error occurred in the Wikipedia tool: {e}"
# --- Web Browsing Tool ---
@tool("browse_web_page_tool")
def browse_web_page_tool(url: str) -> str:
    """
    Navigates a web page using a headless browser, then uses Unstructured to extract
    the full, clean content, including text and tables.

    Args:
        url (str): The full URL of the page to browse and extract content from.

    Returns:
        str: The extracted page content (truncated to 20,000 characters), or
        an "Error ..." string if the page timed out or extraction failed.
    """
    print(f"--- TOOL: Browsing and extracting from: {url} ---")
    try:
        # 1. Render the page with Playwright to obtain the full HTML
        #    (including content produced by client-side JavaScript).
        with sync_playwright() as p:
            browser = p.chromium.launch(headless=True)
            try:
                page = browser.new_page()
                page.goto(url, timeout=30000, wait_until="domcontentloaded")
                html_content = page.content()
            finally:
                # FIX: close the browser even when goto()/content() raises;
                # previously a timeout skipped browser.close() and the browser
                # lingered until the sync_playwright context exited.
                browser.close()
        # 2. Stage the HTML in a temporary file on disk for Unstructured.
        #    (The original comment claimed this was in-memory; NamedTemporaryFile
        #    creates a real file — it is removed in the finally below.)
        with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix=".html", encoding='utf-8') as temp_file:
            temp_file.write(html_content)
            temp_filepath = temp_file.name
        try:
            # 3. Load and parse the HTML with UnstructuredHTMLLoader.
            loader = UnstructuredHTMLLoader(temp_filepath, strategy="fast")
            docs = loader.load()
        finally:
            # Always delete the temporary file, even if parsing raised.
            os.remove(temp_filepath)
        # 4. Format the output.
        if not docs:
            return f"Content from URL '{url}': Could not extract any content using Unstructured."
        # Unstructured typically returns everything in a single document.
        page_content = docs[0].page_content
        formatted_output = f"Content from URL: '{url}'\n\n{page_content[:20000]}"
        return formatted_output
    except TimeoutError:
        # NOTE: this is playwright's TimeoutError (imported at file top),
        # which shadows the builtin of the same name.
        return f"Error browsing '{url}': The page took too long to load and timed out."
    except Exception as e:
        return f"An unexpected error occurred while browsing '{url}': {e}"
# --- Web Content Analysis Tool ---
@tool("text_analyzer_tool")
def text_analyzer_tool(text_to_analyze: str, question: str) -> str:
    """
    Analyzes a given text to answer a specific question or extract information.
    Use this tool when you have already gathered content (e.g., from browsing a page)
    and need to find a specific answer within that text.

    Args:
        text_to_analyze (str): The text content to be analyzed.
        question (str): The specific question to answer based on the text.

    Returns:
        str: The LLM's concise answer, or an "Error: ..." string if the
        analyzer model is not configured.
    """
    print(f"--- TOOL: Analyzing text to answer: '{question}' ---")
    # FIX: fail fast with an actionable message when the model env var is
    # unset, instead of passing model=None to ChatOpenAI and getting an
    # opaque downstream error.
    if not OPENAI_API_MODEL:
        return "Error: OPENAI_API_MODEL environment variable is not set; cannot run text analysis."
    # Use an LLM for the analysis; temperature=0 keeps extraction deterministic.
    analyzer_llm = ChatOpenAI(model=OPENAI_API_MODEL, temperature=0)
    prompt = f"""
    You are a text analysis expert. Your task is to carefully read the provided text and answer the user's question based ONLY on that text.
    Provide a concise and direct answer.
    **Text to Analyze:**
    ---
    {text_to_analyze}
    ---
    **Question to Answer:**
    "{question}"
    Your concise answer:
    """
    response = analyzer_llm.invoke(prompt)
    return response.content