File size: 7,734 Bytes
158ed3d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 |
import os
from dotenv import load_dotenv
from urllib.parse import unquote
import tempfile
import wikipedia
from playwright.sync_api import sync_playwright, TimeoutError
import bs4
import pandas as pd
from langchain_openai import ChatOpenAI
from langchain_community.document_loaders import UnstructuredHTMLLoader
from langchain_google_community import GoogleSearchAPIWrapper
from langchain_community.utilities import ArxivAPIWrapper
from langchain_core.tools import tool
# Load the environment variables used by the tools below (.env file, if present).
load_dotenv()
# NOTE(review): any of these may be None when the .env file or the variables
# are missing; the tools below do not guard against that — verify at startup.
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
OPENAI_API_MODEL = os.getenv("OPENAI_API_MODEL")
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
GOOGLE_CSE_ID = os.getenv("GOOGLE_CSE_ID")
# --- Google Search Tool ---
@tool("google_search_tool")
def google_search_tool(query: str) -> str:
    """
    Performs a Google search and returns the top results.
    Use this for general web searches, finding articles, or recent information.

    Args:
        query (str): The search query to send to Google.

    Returns:
        str: Up to 3 results formatted as "Title / Snippet / Link" paragraphs,
             or a not-found message when the search returns nothing.
    """
    print(f"--- TOOL: Executing Google Search for: '{query}' ---")
    google_search = GoogleSearchAPIWrapper(google_api_key=GOOGLE_API_KEY, google_cse_id=GOOGLE_CSE_ID)
    # BUG FIX: .results() returns a list of dicts, but the declared return
    # type (and the LLM consuming this tool) expects a string — format it.
    results = google_search.results(query, num_results=3)
    if not results:
        return f"No Google results found for query '{query}'."
    formatted = [
        f"Title: {r.get('title', 'N/A')}\n"
        f"Snippet: {r.get('snippet', 'N/A')}\n"
        f"Link: {r.get('link', 'N/A')}"
        for r in results
    ]
    return "\n\n".join(formatted)
# --- Wikipedia Search Tool ---
@tool("wikipedia_search_tool")
def wikipedia_search_tool(query_or_url: str, max_results: int = 1) -> str:
    """
    Fetches content from a Wikipedia page. This tool is dual-purpose:
    1. If the input is a search query, it finds the most relevant Wikipedia page and returns its full content.
    2. If the input is a full Wikipedia URL, it directly fetches and returns the content of that page.
    This is the preferred tool for all interactions with Wikipedia.

    Args:
        query_or_url (str): A search query (e.g., "Mercedes Sosa discography") or a full Wikipedia URL.
        max_results (int): How many candidate pages to request when the input is
            a search query; the best (first) match is loaded. Defaults to 1.

    Returns:
        str: The page title, URL and extracted content (capped at 20k chars),
             or an "Error: ..." string on failure.
    """
    print(f"--- WIKIPEDIA TOOL (Dual-Purpose): Input is '{query_or_url}' ---")
    wikipedia.set_lang("en")
    page_title = ""
    try:
        # --- Decide how to interpret the input ---
        if query_or_url.startswith("http://") or query_or_url.startswith("https://"):
            # Case 1: the input is a URL — take the last path segment as the title.
            raw_title = query_or_url.split('/')[-1]
            # Decode percent-escapes (e.g. %C4%85 -> ą) and turn underscores into spaces.
            page_title = unquote(raw_title).replace('_', ' ')
            print(f"Input is a URL. Decoded page title: '{page_title}'")
        else:
            # Case 2: the input is a search query — resolve it to the best page title.
            print("Input is a search query. Finding best page...")
            # BUG FIX: `max_results` was accepted but never used; it now controls
            # how many candidates the search returns (the first is still used).
            search_results = wikipedia.search(query_or_url, results=max_results)
            if not search_results:
                return f"Error: No Wikipedia page found for query '{query_or_url}'."
            page_title = search_results[0]
        page = wikipedia.page(page_title, auto_suggest=False, redirect=True)
        # --- HTML extraction and parsing ---
        print(f"Fetching HTML for page: '{page.title}'")
        html_content_str = page.html()
        # Unstructured's HTML loader is file-based, so stage the HTML in a
        # temporary file and make sure it is always removed afterwards.
        with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix=".html", encoding='utf-8') as temp_file:
            temp_file.write(html_content_str)
            temp_filepath = temp_file.name
        try:
            loader = UnstructuredHTMLLoader(temp_filepath, strategy="fast")
            docs = loader.load()
        finally:
            os.remove(temp_filepath)
        if not docs:
            return f"Content from Wikipedia page '{page.title}': Could not extract any content."
        page_content = docs[0].page_content
        # Cap the payload at 20k characters to keep the LLM context manageable.
        formatted_output = f"Content from Wikipedia page: '{page.title}'\nURL: {page.url}\n\n{page_content[:20000]}"
        return formatted_output
    except wikipedia.exceptions.DisambiguationError as e:
        return f"Error: Your query '{query_or_url}' is ambiguous. Options: {e.options[:5]}"
    except wikipedia.exceptions.PageError:
        return f"Error: Could not find or load the Wikipedia page for title derived from '{query_or_url}'."
    except Exception as e:
        return f"An unexpected error occurred in the Wikipedia tool: {e}"
# --- Web Browsing Tool ---
@tool("browse_web_page_tool")
def browse_web_page_tool(url: str) -> str:
    """
    Navigates a web page using a headless browser, then uses Unstructured to extract
    the full, clean content, including text and tables.

    Args:
        url (str): The full URL of the page to browse and extract content from.

    Returns:
        str: The extracted page content (capped at 20k characters), or an
             error message if navigation, loading, or extraction failed.
    """
    print(f"--- TOOL: Browsing and extracting from: {url} ---")
    try:
        # 1. Use Playwright to render the page and capture the full HTML.
        with sync_playwright() as p:
            browser = p.chromium.launch(headless=True)
            try:
                page = browser.new_page()
                page.goto(url, timeout=30000, wait_until="domcontentloaded")
                html_content = page.content()
            finally:
                # FIX: close the browser even when goto()/content() raises,
                # so a timeout does not leak a running Chromium instance.
                browser.close()
        # 2. Stage the HTML in a temporary file on disk (Unstructured's loader
        #    is file-based) and always clean it up afterwards.
        #    (Previous comment claimed this was in-memory — it is not.)
        with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix=".html", encoding='utf-8') as temp_file:
            temp_file.write(html_content)
            temp_filepath = temp_file.name
        try:
            # 3. Load and parse the HTML with UnstructuredHTMLLoader.
            loader = UnstructuredHTMLLoader(temp_filepath, strategy="fast")
            docs = loader.load()
        finally:
            # Always delete the temporary file.
            os.remove(temp_filepath)
        # 4. Format the output.
        if not docs:
            return f"Content from URL '{url}': Could not extract any content using Unstructured."
        # Unstructured usually returns everything as a single document.
        page_content = docs[0].page_content
        formatted_output = f"Content from URL: '{url}'\n\n{page_content[:20000]}"
        return formatted_output
    except TimeoutError:
        # Playwright's TimeoutError (imported above), not the builtin.
        return f"Error browsing '{url}': The page took too long to load and timed out."
    except Exception as e:
        return f"An unexpected error occurred while browsing '{url}': {e}"
# --- Web Content Analysis Tool ---
@tool("text_analyzer_tool")
def text_analyzer_tool(text_to_analyze: str, question: str) -> str:
    """
    Analyzes a given text to answer a specific question or extract information.
    Use this tool when you have already gathered content (e.g., from browsing a page)
    and need to find a specific answer within that text.

    Args:
        text_to_analyze (str): The text content to be analyzed.
        question (str): The specific question to answer based on the text.
    """
    print(f"--- TOOL: Analyzing text to answer: '{question}' ---")
    # Delegate the analysis to a deterministic (temperature=0) LLM call.
    analysis_prompt = f"""
    You are a text analysis expert. Your task is to carefully read the provided text and answer the user's question based ONLY on that text.
    Provide a concise and direct answer.
    **Text to Analyze:**
    ---
    {text_to_analyze}
    ---
    **Question to Answer:**
    "{question}"
    Your concise answer:
    """
    llm = ChatOpenAI(model=OPENAI_API_MODEL, temperature=0)
    return llm.invoke(analysis_prompt).content