"""Agent tool collection: arXiv search, web search (Tavily), Playwright-based
page extraction with cookie-banner stripping, image retrieval, OCR, Wikipedia
lookup, PDF text extraction, and a simple LLM-output security checker.
"""

from typing import Callable, Dict, List, Any
import os
import random
import re
import time
import xml.etree.ElementTree as ET
from datetime import datetime
from io import BytesIO

import requests
import wikipedia
import pytesseract
from bs4 import BeautifulSoup, Comment  # Comment was used below but never imported
from googlesearch import search
from PIL import Image
from playwright.sync_api import sync_playwright
from PyPDF2 import PdfReader
from tavily import TavilyClient


def tool(func: Callable) -> Callable:
    """Decorator to mark functions as tools (sets ``func.is_tool = True``)."""
    func.is_tool = True
    return func


@tool
def ArxivPaperSearcher(topic: str, max_results: int = 5):
    """Search arXiv for papers.

    Args:
        topic (str): Search term(s), e.g. "machine learning".
        max_results (int): Number of results to fetch.

    Returns:
        list[dict]: Paper info dicts with keys "title", "authors",
        "summary", "url".

    Raises:
        requests.HTTPError: If the arXiv API responds with an error status.
    """
    base_url = "http://export.arxiv.org/api/query"
    timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
    print(f"[{timestamp}] Searching arXiv for: {topic}")

    params = {
        "search_query": f"all:{topic}",
        "start": 0,
        "max_results": max_results,
    }
    # Timeout prevents the tool from hanging indefinitely on a stalled socket.
    response = requests.get(base_url, params=params, timeout=30)
    response.raise_for_status()

    # The arXiv API returns an Atom XML feed; entries live in the Atom namespace.
    root = ET.fromstring(response.text)
    ns = {"atom": "http://www.w3.org/2005/Atom"}

    results = []
    for entry in root.findall("atom:entry", ns):
        results.append({
            "title": entry.find("atom:title", ns).text.strip(),
            "authors": [a.find("atom:name", ns).text
                        for a in entry.findall("atom:author", ns)],
            "summary": entry.find("atom:summary", ns).text.strip(),
            "url": entry.find("atom:id", ns).text.strip(),
        })

    print(f"Found {len(results)} results.")
    return results


# --- heuristics for cookie/consent elements ---
# Attribute tokens used by common consent-management platforms.
_COOKIE_ATTR_RE = re.compile(
    r"(cookie|consent|gdpr|cmp|onetrust|ot-sdk|osano|iubenda|didomi|trustarc|truste|"
    r"quantcast|qc-cmp|axeptio|sp_message|sp-cc|privacy|manage-choices|preferences)",
    re.I,
)
# Visible banner phrasing ("accept all", "we use cookies", ...).
_COOKIE_TEXT_RE = re.compile(
    r"(cookies?|cookie settings|we (use|value) your privacy|consent|gdpr|"
    r"manage (cookies|choices)|your choices|accept all|reject all|use of cookies|"
    r"privacy (policy|preferences))",
    re.I,
)


def _normalize_text(txt: str) -> str:
    """Collapse text into neat paragraphs: strip each line, drop blanks."""
    lines = [line.strip() for line in txt.splitlines()]
    lines = [ln for ln in lines if ln]
    return "\n".join(lines)


def _strip_noise_and_cookies(soup: BeautifulSoup) -> None:
    """Remove scripts, styles, HTML comments and cookie/consent banners.

    Mutates *soup* in place; returns None.
    """
    # Remove obvious non-content tags.
    for tag in soup(["script", "style", "noscript", "template", "svg", "iframe"]):
        tag.decompose()
    # FIX: bs4.Comment was referenced here without being imported (NameError).
    for c in soup.find_all(string=lambda t: isinstance(t, Comment)):
        c.extract()

    def looks_like_cookie(el) -> bool:
        """Heuristic: does this element look like a cookie/consent banner?"""
        # Collect identifying attributes into one searchable string.
        attrs = []
        for k in ("id", "class", "data-component", "data-testid", "aria-label"):
            v = el.get(k)
            if isinstance(v, list):
                v = " ".join(v)
            if v:
                attrs.append(v)
        attr_str = " ".join(attrs)

        text = el.get_text(" ", strip=True)
        style = (el.get("style") or "")
        role = (el.get("role") or "").lower()

        if _COOKIE_ATTR_RE.search(attr_str):
            return True
        if role in ("dialog", "alert", "banner") and (
            _COOKIE_ATTR_RE.search(attr_str) or _COOKIE_TEXT_RE.search(text)
        ):
            return True
        # Shorter text is typical for banners; don't nuke long articles accidentally.
        if text and len(text) < 1200 and _COOKIE_TEXT_RE.search(text):
            return True
        style_l = style.replace(" ", "").lower()
        if ("position:fixed" in style_l or "position:sticky" in style_l) and (
            _COOKIE_TEXT_RE.search(text) or _COOKIE_ATTR_RE.search(attr_str)
        ):
            return True
        return False

    # Remove cookie/consent blocks and their fixed-position parents (up to 3 levels).
    for el in soup.find_all(["div", "section", "aside", "form", "nav",
                             "footer", "header", "dialog"]):
        if looks_like_cookie(el):
            target = el
            for _ in range(3):
                p = target.parent
                if not p or p.name in ("body", "html"):
                    break
                p_style = (p.get("style") or "").replace(" ", "").lower()
                p_attrs = " ".join(
                    filter(
                        None,
                        [
                            p.get("id") or "",
                            " ".join(p.get("class", []))
                            if isinstance(p.get("class"), list)
                            else (p.get("class") or ""),
                        ],
                    )
                )
                if "position:fixed" in p_style or _COOKIE_ATTR_RE.search(p_attrs):
                    target = p
                else:
                    break
            target.decompose()

    # Kill generic fixed overlays/backdrops with almost no text (safety net).
    for el in soup.find_all(style=True):
        s = el["style"].replace(" ", "").lower()
        if "position:fixed" in s and ("width:100%" in s or "inset:" in s or "top:0" in s):
            if len(el.get_text(strip=True)) < 200:
                el.decompose()


@tool
def extract_content_with_playwright(url):
    """Render *url* in Chromium and return the fully loaded page HTML.

    Waits for network idle, then tries to dismiss a cookie-consent dialog by
    clicking a button labelled "Accept".
    """
    with sync_playwright() as p:
        # NOTE(review): headless=False opens a visible browser window (kept
        # from the original); headless=True is usually wanted on servers.
        browser = p.chromium.launch(headless=False)
        context = browser.new_context(
            user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                       "AppleWebKit/537.36 (KHTML, like Gecko) "
                       "Chrome/115.0.0.0 Safari/537.36",
            viewport={"width": 1280, "height": 800},
            locale="en-US",
        )
        page = context.new_page()
        response = page.goto(url, wait_until="networkidle", timeout=30000)
        # FIX: page.goto() may return None; guard before reading .status.
        print(f"Page response status: {response.status if response else 'unknown'}")

        # Try to accept cookies if the button is present.
        try:
            page.click('button:has-text("Accept")', timeout=5000)
            page.wait_for_load_state("networkidle")
            print("Clicked Accept on cookie consent.")
        except Exception as e:
            print("No cookie accept button found or clicking failed:", e)

        content = page.content()
        browser.close()
    return content


def extract_webpage_content(url: str) -> str:
    """Fetch *url* via Playwright and return up to 10k chars of the main text.

    Strips scripts/styles/cookie banners, then picks the largest content
    container (>300 chars of text) as the "main" content.
    """
    # Validate URL before doing anything else.
    if not url:
        return "ERROR: Empty URL provided"
    # Fix URL format if the protocol is missing.
    if not url.startswith(("http://", "https://")):
        url = f"https://{url}"

    html = extract_content_with_playwright(url)
    print(html[:1000])  # preview content

    soup = BeautifulSoup(html, "html.parser")
    _strip_noise_and_cookies(soup)  # modifies soup in place

    candidates = [
        c for c in soup.find_all(["article", "main", "section", "div"])
        if len(c.get_text(strip=True)) > 300
    ]
    best = max(candidates, key=lambda c: len(c.get_text()), default=soup.body)
    # FIX: soup.body can be None (no <body>), which previously crashed here.
    if best is None:
        return ""
    return best.get_text(separator="\n", strip=True)[:10000]


def preprocess_search_results(results, n_results=4, min_score=0):
    """Filter, rank and trim raw Tavily results.

    Args:
        results: Raw result dicts with "title", "url", "content", "score".
        n_results: Maximum number of results to return.
        min_score: Minimum relevance score to keep a result.

    Returns:
        Top-N results as dicts with "title", "url", "summary" (score dropped).
    """
    # Filter and keep the score temporarily for sorting.
    filtered = [
        {
            "title": item["title"],
            "url": item["url"],
            "summary": item["content"],
            "score": item["score"],
        }
        for item in results
        if item["score"] >= min_score
    ]
    # Sort by score descending.
    sorted_results = sorted(filtered, key=lambda x: x["score"], reverse=True)
    # Return only the top N, excluding the score from the final output.
    return [
        {"title": item["title"], "url": item["url"], "summary": item["summary"]}
        for item in sorted_results[:n_results]
    ]


@tool
def web_search_tool(query: str) -> str:
    """Performs a web search using and return url, title, and summary."""

    def tavily(q):
        """Run one Tavily search and return preprocessed results.

        FIX: the original swallowed exceptions with a print and then returned
        an unbound local (UnboundLocalError); errors now propagate so the
        caller's retry logic actually works.
        """
        print("Print:🔁 Falling back to Tavily Search...")
        tavily_client = TavilyClient(api_key=os.getenv("TAVILY_API_KEY"))
        response_out = tavily_client.search(q)
        return preprocess_search_results(response_out["results"])

    # One retry after a short pause. The original used a bare `except:` (which
    # also caught KeyboardInterrupt/SystemExit) and carried a dead DuckDuckGo
    # helper that referenced the never-imported DDGS.
    try:
        query_out = tavily(query)
    except Exception as e:
        print(f"Print:❌ tavily search failed: {e}")
        time.sleep(3)
        query_out = tavily(query)
    print("Print Search results:", query_out)
    return query_out


@tool
def retrieve_images_from_url(url: str) -> list:
    """Extract image source URLs from a webpage.

    Returns a list of ``src`` values, or a single-element list with an error
    message on failure (best-effort contract kept from the original).
    """
    print("Print:Tool: Retrieving Image")
    try:
        response = requests.get(url, timeout=30)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, "html.parser")
        images = [img["src"] for img in soup.find_all("img") if img.get("src")]
        return images if images else ["No images found."]
    except Exception as e:
        print(f"Print:Image retrieval error: {e}")
        return [f"Failed to retrieve images. {e}"]


@tool
def perform_ocr_on_image(image_url: str) -> str:
    """Download an image and perform OCR to extract text."""
    print("Print:Tool: Performing OCR")
    try:
        response = requests.get(image_url, timeout=30)
        response.raise_for_status()
        image = Image.open(BytesIO(response.content))
        text = pytesseract.image_to_string(image)
        return text.strip() if text else "No text found in image."
    except Exception as e:
        print(f"Print:OCR failed: {e}")
        return f"OCR failed. {e}"


@tool
def wikipedia_tool(topic: str) -> str:
    """Search Wikipedia for a given topic and return the summary."""
    print("Print:Tool: Performing Wiki Search")
    try:
        page = wikipedia.page(topic, auto_suggest=True)
        return f"Title: {page.title}\n\nSummary:\n{page.summary}"
    except wikipedia.exceptions.DisambiguationError as e:
        return f"Disambiguation required. Options: {e.options[:5]}"
    except wikipedia.exceptions.PageError:
        return "Page not found on Wikipedia."
    except Exception as e:
        print(f"Print:Wikipedia fetch error: {e}")
        return f"Wikipedia lookup failed. {e}"


@tool
def extract_text_from_pdf_url(pdf_url: str) -> str:
    """
    Downloads a PDF from the provided URL and extracts text from the first
    few pages (up to 3), capped at 3000 characters.
    """
    print("Tool: Extracting PDF content")
    try:
        response = requests.get(pdf_url, timeout=30)
        response.raise_for_status()
        pdf_reader = PdfReader(BytesIO(response.content))

        # Read text from the first few pages only — keeps the tool fast.
        text = ""
        for i, page in enumerate(pdf_reader.pages[:3]):
            page_text = page.extract_text()
            if page_text:
                text += page_text
        print("Print:PDF txt", text[:3000])
        return text[:3000] if text else "No text found in PDF."
    except Exception as e:
        print(f"Print:PDF extraction failed: {e}")
        return f"Failed to extract PDF content from {pdf_url}: {e}"


# NOT A REAL TOOL, only for debugging.
def llm_content_checker(llm_input: str):
    """
    Simple security checker that flags dangerous imports and patterns.
    Returns the input unchanged if safe, or a safe error message if dangerous.
    """
    # List of dangerous imports to check for.
    dangerous_imports = [
        'os', 'subprocess', 'shutil', 'sys', 'socket',
        'urllib', 'requests', 'pickle', 'eval', 'exec',
        'compile', '__import__'
    ]

    # List of dangerous patterns.
    dangerous_patterns = [
        r'rm\s+-rf',              # Delete commands
        r'del\s+/[fs]',           # Windows delete
        r'format\s+c:',           # Format drive
        r'shutdown',              # System shutdown
        r'system\s*\(',           # System calls
        r'exec\s*\(',             # Code execution
        r'eval\s*\(',             # Code evaluation
        r'open\s*\([\'"][/\\]',   # File access with absolute paths
        r'\.\./',                 # Directory traversal
        r'DROP\s+TABLE',          # SQL deletion
        r'DELETE\s+FROM',         # SQL deletion
    ]

    # Lower-cased copy for the substring-based import checks.
    content_lower = llm_input.lower()

    # Check for dangerous imports.
    for dangerous_import in dangerous_imports:
        if (f'import {dangerous_import}' in content_lower
                or f'from {dangerous_import}' in content_lower):
            print(f"🚫 Blocked dangerous import: {dangerous_import}")
            return '{"final_answer": "Request blocked due to security restrictions."}'

    # Check for dangerous patterns.
    # FIX: the original searched the lower-cased text with case-sensitive
    # upper-case patterns (DROP\s+TABLE, DELETE\s+FROM), so those could never
    # match. Search the original input case-insensitively instead.
    for pattern in dangerous_patterns:
        if re.search(pattern, llm_input, re.IGNORECASE):
            print(f"🚫 Blocked dangerous pattern: {pattern}")
            return '{"final_answer": "Request blocked due to security restrictions."}'

    # If no dangerous content found, return original input.
    return llm_input