# DIY_Agent / ai_tool.py
# (HuggingFace page header preserved as comments so the file parses:)
# bwilkie's picture
# Update ai_tool.py
# aa39e68 verified
from typing import Callable, Dict, List, Any
import time
import wikipedia
from googlesearch import search
from bs4 import BeautifulSoup
from PIL import Image
from io import BytesIO
import pytesseract
import requests
from PyPDF2 import PdfReader
import re
#from duckduckgo_search import DDGS
import random
from tavily import TavilyClient
from playwright.sync_api import sync_playwright
from bs4 import BeautifulSoup
import os
# Tool definitions
def tool(func: Callable) -> Callable:
    """Decorator that tags *func* as an agent tool.

    Sets the attribute ``is_tool = True`` on the function and returns the
    very same function object (no wrapping, signature untouched).
    """
    setattr(func, "is_tool", True)
    return func
###########################
import requests
import xml.etree.ElementTree as ET
from datetime import datetime
@tool
def ArxivPaperSearcher(topic: str, max_results: int = 5):
    """
    Search arXiv for papers on a topic.

    Args:
        topic (str): Search term(s), e.g. "machine learning".
        max_results (int): Maximum number of results to fetch.

    Returns:
        list[dict]: One dict per paper with keys "title", "authors",
        "summary", and "url".

    Raises:
        requests.HTTPError: If the arXiv API responds with an error status.
    """
    base_url = "http://export.arxiv.org/api/query"
    timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
    print(f"[{timestamp}] Searching arXiv for: {topic}")
    # Build query
    params = {
        "search_query": f"all:{topic}",
        "start": 0,
        "max_results": max_results,
    }
    # Timeout keeps the agent from hanging forever on a stalled connection.
    response = requests.get(base_url, params=params, timeout=30)
    response.raise_for_status()
    # Parse the Atom XML feed: one <entry> element per paper.
    root = ET.fromstring(response.text)
    ns = {"atom": "http://www.w3.org/2005/Atom"}
    results = []
    for entry in root.findall("atom:entry", ns):
        # Guard against malformed entries missing expected child elements
        # (the original dereferenced .text unconditionally and could raise
        # AttributeError on a None find() result).
        title_el = entry.find("atom:title", ns)
        summary_el = entry.find("atom:summary", ns)
        link_el = entry.find("atom:id", ns)
        if title_el is None or summary_el is None or link_el is None:
            continue
        authors = [
            a.find("atom:name", ns).text
            for a in entry.findall("atom:author", ns)
            if a.find("atom:name", ns) is not None
        ]
        results.append({
            "title": title_el.text.strip(),
            "authors": authors,
            "summary": summary_el.text.strip(),
            "url": link_el.text.strip(),
        })
    print(f"Found {len(results)} results.")
    return results
##############################
# --- heuristics for cookie/consent elements ---
# Matches id/class/data-*/aria-label attribute values typical of cookie and
# consent widgets, including the major consent-management platforms
# (OneTrust, Osano, Iubenda, Didomi, TrustArc, Quantcast, Axeptio, ...).
_COOKIE_ATTR_RE = re.compile(
    r"(cookie|consent|gdpr|cmp|onetrust|ot-sdk|osano|iubenda|didomi|trustarc|truste|"
    r"quantcast|qc-cmp|axeptio|sp_message|sp-cc|privacy|manage-choices|preferences)",
    re.I,
)
# Matches visible banner text such as "Accept all", "Reject all",
# "Manage cookies", "We value your privacy", etc. (case-insensitive).
_COOKIE_TEXT_RE = re.compile(
    r"(cookies?|cookie settings|we (use|value) your privacy|consent|gdpr|"
    r"manage (cookies|choices)|your choices|accept all|reject all|use of cookies|"
    r"privacy (policy|preferences))",
    re.I,
)
def _normalize_text(txt: str) -> str:
# neat paragraphs, no extra blanks
lines = [line.strip() for line in txt.splitlines()]
lines = [ln for ln in lines if ln]
return "\n".join(lines)
def _strip_noise_and_cookies(soup: BeautifulSoup) -> None:
    """Remove non-content markup and cookie/consent UI from *soup* in place.

    Drops scripts/styles/iframes and HTML comments, removes elements that
    look like cookie-consent banners (matched by attribute, text, ARIA role,
    or fixed/sticky positioning) together with up to three fixed-position
    ancestors, and finally deletes near-empty fixed overlays/backdrops.

    Args:
        soup (BeautifulSoup): Parsed document; mutated in place.

    Returns:
        None
    """
    # BUG FIX: `Comment` was referenced below but never imported anywhere in
    # the file, which raised NameError at runtime. Import it locally.
    from bs4 import Comment

    # Remove obvious non-content
    for tag in soup(["script", "style", "noscript", "template", "svg", "iframe"]):
        tag.decompose()
    for c in soup.find_all(string=lambda t: isinstance(t, Comment)):
        c.extract()

    def looks_like_cookie(el) -> bool:
        # Heuristic: does this element look like a cookie/consent widget?
        # attributes
        attrs = []
        for k in ("id", "class", "data-component", "data-testid", "aria-label"):
            v = el.get(k)
            if isinstance(v, list):
                v = " ".join(v)
            if v:
                attrs.append(v)
        attr_str = " ".join(attrs)
        # text
        text = el.get_text(" ", strip=True)
        style = (el.get("style") or "")
        role = (el.get("role") or "").lower()
        if _COOKIE_ATTR_RE.search(attr_str):
            return True
        if role in ("dialog", "alert", "banner") and (_COOKIE_ATTR_RE.search(attr_str) or _COOKIE_TEXT_RE.search(text)):
            return True
        # shorter text is typical for banners; don't nuke long articles accidentally
        if text and len(text) < 1200 and _COOKIE_TEXT_RE.search(text):
            return True
        style_l = style.replace(" ", "").lower()
        if ("position:fixed" in style_l or "position:sticky" in style_l) and (
            _COOKIE_TEXT_RE.search(text) or _COOKIE_ATTR_RE.search(attr_str)
        ):
            return True
        return False

    # Remove cookie/consent blocks and their fixed-position parents (up to 3 levels)
    for el in soup.find_all(["div", "section", "aside", "form", "nav", "footer", "header", "dialog"]):
        if looks_like_cookie(el):
            target = el
            for _ in range(3):
                p = target.parent
                if not p or p.name in ("body", "html"):
                    break
                p_style = (p.get("style") or "").replace(" ", "").lower()
                p_attrs = " ".join(
                    filter(
                        None,
                        [
                            p.get("id") or "",
                            " ".join(p.get("class", [])) if isinstance(p.get("class"), list) else (p.get("class") or ""),
                        ],
                    )
                )
                # Climb only while the parent itself looks like part of the
                # banner (fixed-position wrapper or consent-named attrs).
                if "position:fixed" in p_style or _COOKIE_ATTR_RE.search(p_attrs):
                    target = p
                else:
                    break
            target.decompose()
    # Kill generic fixed overlays/backdrops with almost no text (safety net)
    for el in soup.find_all(style=True):
        s = el["style"].replace(" ", "").lower()
        if "position:fixed" in s and ("width:100%" in s or "inset:" in s or "top:0" in s):
            if len(el.get_text(strip=True)) < 200:
                el.decompose()
@tool
def extract_content_with_playwright(url):
    """Fetch *url* with a real Chromium browser and return the rendered HTML.

    Uses Playwright so JavaScript-rendered content is included, and tries to
    dismiss cookie-consent dialogs by clicking an "Accept" button if present.

    Args:
        url (str): Fully-qualified URL to load.

    Returns:
        str: The rendered HTML of the page.
    """
    with sync_playwright() as p:
        # headless=True: a visible browser window cannot open on a
        # display-less server/CI host and would crash there.
        browser = p.chromium.launch(headless=True)
        try:
            context = browser.new_context(
                user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                           "AppleWebKit/537.36 (KHTML, like Gecko) "
                           "Chrome/115.0.0.0 Safari/537.36",
                viewport={"width": 1280, "height": 800},
                locale="en-US",
            )
            page = context.new_page()
            response = page.goto(url, wait_until="networkidle", timeout=30000)
            # page.goto can return None (e.g. same-document navigation);
            # guard before dereferencing .status.
            if response is not None:
                print(f"Page response status: {response.status}")
            # Try to accept cookies if the button is present
            try:
                page.click('button:has-text("Accept")', timeout=5000)
                page.wait_for_load_state("networkidle")
                print("Clicked Accept on cookie consent.")
            except Exception as e:
                print("No cookie accept button found or clicking failed:", e)
            content = page.content()
        finally:
            # Always release the browser, even if navigation fails.
            browser.close()
    return content
def extract_webpage_content(url: str) -> str:
    """Fetch *url* (rendering JS via Playwright) and return its main text.

    Normalizes the URL scheme, strips scripts/cookie banners from the DOM,
    then returns the text of the largest content container, capped at
    10,000 characters.

    Args:
        url (str): Page address; "https://" is prepended if missing.

    Returns:
        str: Extracted text, or an "ERROR: ..." string for an empty URL.
    """
    # Validate first so None/"" short-circuits before any string methods.
    if not url:
        return "ERROR: Empty URL provided"
    # Fix URL format if missing protocol
    if not url.startswith(('http://', 'https://')):
        url = f"https://{url}"
    html = extract_content_with_playwright(url)
    soup = BeautifulSoup(html, 'html.parser')
    _strip_noise_and_cookies(soup)  # modifies soup in-place
    # Candidate containers: anything with a meaningful amount of text.
    candidates = soup.find_all(['article', 'main', 'section', 'div'])
    candidates = [c for c in candidates if len(c.get_text(strip=True)) > 300]
    best = max(candidates, key=lambda c: len(c.get_text()), default=soup.body)
    # soup.body can be None (fragment / malformed HTML); the original would
    # have raised AttributeError here.
    if best is None:
        return ""
    return best.get_text(separator="\n", strip=True)[:10000]
# @tool
# def extract_webpage_content(url: str) -> str:
# """Extracts the main visible content from a webpage, omitting headers, footers, and nav bars."""
# import requests
# from bs4 import BeautifulSoup
# headers = {
# "User-Agent": (
# "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
# "AppleWebKit/537.36 (KHTML, like Gecko) "
# "Chrome/115.0.0.0 Safari/537.36"
# ),
# "Accept-Language": "en-US,en;q=0.9",
# "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
# "Referer": "https://www.google.com/",
# "Connection": "keep-alive",
# }
# try:
# session = requests.Session()
# response = session.get(url, headers=headers, timeout=10)
# response.raise_for_status()
# soup = BeautifulSoup(response.text, 'html.parser')
# # Find likely main content containers
# candidates = soup.find_all(['article', 'main', 'section', 'div'])
# candidates = [c for c in candidates if len(c.get_text(strip=True)) > 300]
# # Pick the largest one
# best = max(candidates, key=lambda c: len(c.get_text()), default=soup.body)
# return best.get_text(separator="\n", strip=True)[:10000]
# except Exception as e:
# return f"[Error extracting content] {e}"
def preprocess_search_results(results, n_results=4, min_score=0):
    """Filter, rank, and trim raw search hits.

    Keeps entries whose "score" is at least *min_score*, orders them by
    score (highest first, stable), and returns the top *n_results* as dicts
    holding only "title", "url", and "summary" (taken from "content").
    """
    # Keep only sufficiently-scored hits; sort is stable, so equal scores
    # preserve their original order.
    kept = [hit for hit in results if hit["score"] >= min_score]
    kept.sort(key=lambda hit: hit["score"], reverse=True)
    # Project down to the public fields, dropping the score.
    return [
        {"title": hit["title"], "url": hit["url"], "summary": hit["content"]}
        for hit in kept[:n_results]
    ]
@tool
def web_search_tool(query: str) -> str:
    """Performs a web search using Tavily and return url, title, and summary.

    Args:
        query (str): Search query text.

    Returns:
        list[dict] | None: Top results as dicts with "title", "url", and
        "summary" keys, or None if both attempts fail.
    """
    # NOTE: the old duck_duck_go() fallback was removed: it referenced DDGS
    # whose import is commented out at the top of the file (NameError), and
    # iterated over `results = None` after a failure (TypeError).
    def tavily(query):
        print("Print:🔁 Falling back to Tavily Search...")
        try:
            tavily_client = TavilyClient(api_key=os.getenv("TAVILY_API_KEY"))
            response_out = tavily_client.search(query)
            return preprocess_search_results(response_out['results'])
        except Exception as e:
            # BUG FIX: the original fell through to `return response_out`
            # here, raising UnboundLocalError when search() failed.
            print(f"Print:❌ tavily search failed: {e}")
            return None

    query_out = tavily(query)
    if query_out is None:
        # One retry after a short pause in case of a transient failure.
        time.sleep(3)
        query_out = tavily(query)
    print('Print Search results:', query_out)
    return query_out
# @tool
# def web_search_tool(query: str) -> str:
# """Perform a web search using Google Search with retry logic."""
# print('Tool: Performing Search')
# retries = 3
# delay = 3 # seconds
# for attempt in range(retries):
# try:
# # Convert generator to list and limit results
# search_results = list(search(query, num_results=5))
# if not search_results:
# raise Exception("No results found.")
# # Format results nicely
# formatted_results = "## Search Results\n\n"
# for i, url in enumerate(search_results, 1):
# formatted_results += f"{i}. {url}\n"
# return formatted_results
# except Exception as e:
# print(f"[Attempt {attempt+1}] Web search failed: {e}")
# if attempt < retries - 1: # Don't sleep on last attempt
# time.sleep(delay)
# return "Failed to retrieve search results after retries."
@tool
def retrieve_images_from_url(url: str) -> list:
    """Extract image URLs from a webpage.

    Args:
        url (str): Page to scan for <img> tags.

    Returns:
        list: Absolute image URLs; ["No images found."] when the page has
        no images; a single-element error message list on failure.
    """
    from urllib.parse import urljoin  # stdlib; resolves relative src values

    print('Print:Tool: Retrieving Image')
    try:
        # Timeout prevents the agent from hanging on a dead server.
        response = requests.get(url, timeout=15)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, 'html.parser')
        # urljoin turns relative paths (e.g. "/img/x.png") into usable
        # absolute URLs; already-absolute src values pass through unchanged.
        images = [urljoin(url, img['src']) for img in soup.find_all('img') if img.get('src')]
        return images if images else ["No images found."]
    except Exception as e:
        print(f"Print:Image retrieval error: {e}")
        return [f"Failed to retrieve images. {e}"]
@tool
def perform_ocr_on_image(image_url: str) -> str:
    """Download an image and perform OCR to extract text.

    Args:
        image_url (str): Direct URL to an image file.

    Returns:
        str: Extracted text (stripped), a "No text found" notice, or an
        error message string on failure.
    """
    print('Print:Tool: Performing OCR')
    try:
        # Timeout prevents the agent from hanging on a dead server.
        response = requests.get(image_url, timeout=15)
        response.raise_for_status()
        image = Image.open(BytesIO(response.content))
        text = pytesseract.image_to_string(image)
        return text.strip() if text else "No text found in image."
    except Exception as e:
        print(f"Print:OCR failed: {e}")
        return f"OCR failed. {e}"
@tool
def wikipedia_tool(topic: str) -> str:
    """Look up *topic* on Wikipedia and return its title and summary.

    Never raises: returns a disambiguation hint, a not-found notice, or an
    error-message string instead.
    """
    print('Print:Tool: Performing Wiki Search')
    try:
        # auto_suggest lets Wikipedia correct near-miss topic spellings.
        article = wikipedia.page(topic, auto_suggest=True)
        return f"Title: {article.title}\n\nSummary:\n{article.summary}"
    except wikipedia.exceptions.DisambiguationError as e:
        return f"Disambiguation required. Options: {e.options[:5]}"
    except wikipedia.exceptions.PageError:
        return "Page not found on Wikipedia."
    except Exception as e:
        print(f"Print:Wikipedia fetch error: {e}")
        return f"Wikipedia lookup failed. {e}"
@tool
def extract_text_from_pdf_url(pdf_url: str) -> str:
    """
    Downloads a PDF from the provided URL and extracts text from the first few pages.

    Args:
        pdf_url (str): Direct URL to a PDF document.

    Returns:
        str: Up to 3,000 characters of text from the first three pages, a
        "No text found" notice, or an error message string on failure.
    """
    print('Tool: Extracting PDF content')
    try:
        # Timeout prevents the agent from hanging on a dead server.
        response = requests.get(pdf_url, timeout=30)
        response.raise_for_status()
        pdf_reader = PdfReader(BytesIO(response.content))
        # Read text from the first few pages only to bound work and output.
        text = ""
        for i, page in enumerate(pdf_reader.pages[:3]):
            page_text = page.extract_text()
            if page_text:
                text += page_text
        print('Print:PDF txt', text[:3000])
        return text[:3000] if text else "No text found in PDF."
    except Exception as e:
        print(f"Print:PDF extraction failed: {e}")
        return f"Failed to extract PDF content from {pdf_url}: {e}"
#
# NOT A REAL TOOL, only for debuggin
#
import re
# Simple checker for potentially dangerous code
def llm_content_checker(llm_input: str):
    """
    Simple security checker that flags dangerous imports and patterns.

    Args:
        llm_input (str): Raw text (e.g. LLM output) to scan.

    Returns:
        str: *llm_input* unchanged if safe, otherwise a JSON string whose
        "final_answer" states the request was blocked.
    """
    # List of dangerous imports to check for
    dangerous_imports = [
        'os', 'subprocess', 'shutil', 'sys', 'socket', 'urllib',
        'requests', 'pickle', 'eval', 'exec', 'compile', '__import__'
    ]
    # List of dangerous patterns
    dangerous_patterns = [
        r'rm\s+-rf',             # Delete commands
        r'del\s+/[fs]',          # Windows delete
        r'format\s+c:',          # Format drive
        r'shutdown',             # System shutdown
        r'system\s*\(',          # System calls
        r'exec\s*\(',            # Code execution
        r'eval\s*\(',            # Code evaluation
        r'open\s*\([\'"][/\\]',  # File access with absolute paths
        r'\.\./',                # Directory traversal
        r'DROP\s+TABLE',         # SQL deletion
        r'DELETE\s+FROM',        # SQL deletion
    ]
    # Lowercase once for the substring-based import checks.
    content_lower = llm_input.lower()
    # Check for dangerous imports
    for dangerous_import in dangerous_imports:
        if f'import {dangerous_import}' in content_lower or f'from {dangerous_import}' in content_lower:
            print(f"🚫 Blocked dangerous import: {dangerous_import}")
            return '{"final_answer": "Request blocked due to security restrictions."}'
    # BUG FIX: the original searched these patterns in the lowercased text,
    # so the uppercase SQL patterns (DROP TABLE / DELETE FROM) could never
    # match. Search the original text case-insensitively instead.
    for pattern in dangerous_patterns:
        if re.search(pattern, llm_input, re.IGNORECASE):
            print(f"🚫 Blocked dangerous pattern: {pattern}")
            return '{"final_answer": "Request blocked due to security restrictions."}'
    # If no dangerous content found, return original input
    return llm_input