| | from typing import Callable, Dict, List, Any |
| | import time |
| | import wikipedia |
| | from googlesearch import search |
| | from bs4 import BeautifulSoup |
| | from PIL import Image |
| | from io import BytesIO |
| | import pytesseract |
| | import requests |
| | from PyPDF2 import PdfReader |
| | import re |
| | |
| | import random |
| | from tavily import TavilyClient |
| | from playwright.sync_api import sync_playwright |
| | from bs4 import BeautifulSoup |
| | import os |
| | |
def tool(func: Callable) -> Callable:
    """Mark *func* as an agent tool by tagging it with an ``is_tool`` flag."""
    setattr(func, "is_tool", True)
    return func
| | |
| |
|
| | import requests |
| | import xml.etree.ElementTree as ET |
| | from datetime import datetime |
| |
|
@tool
def ArxivPaperSearcher(topic: str, max_results: int = 5):
    """
    Search arXiv for papers.

    Args:
        topic (str): Search term(s), e.g. "machine learning".
        max_results (int): Number of results to fetch.

    Returns:
        List of dicts with paper info (title, authors, summary, url).

    Raises:
        requests.HTTPError: If the arXiv API responds with an error status.
    """
    base_url = "http://export.arxiv.org/api/query"
    timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
    print(f"[{timestamp}] Searching arXiv for: {topic}")

    params = {
        "search_query": f"all:{topic}",
        "start": 0,
        "max_results": max_results,
    }

    # Timeout prevents the tool from hanging indefinitely on a stalled API.
    response = requests.get(base_url, params=params, timeout=30)
    response.raise_for_status()

    root = ET.fromstring(response.text)
    ns = {"atom": "http://www.w3.org/2005/Atom"}

    results = []
    for entry in root.findall("atom:entry", ns):
        title_el = entry.find("atom:title", ns)
        summary_el = entry.find("atom:summary", ns)
        link_el = entry.find("atom:id", ns)
        # Skip malformed entries instead of crashing on `.text` of None.
        if title_el is None or summary_el is None or link_el is None:
            continue
        authors = [
            a.find("atom:name", ns).text
            for a in entry.findall("atom:author", ns)
            if a.find("atom:name", ns) is not None
        ]

        results.append({
            "title": (title_el.text or "").strip(),
            "authors": authors,
            "summary": (summary_el.text or "").strip(),
            "url": (link_el.text or "").strip(),
        })

    print(f"Found {len(results)} results.")
    return results
| |
|
| |
|
| |
|
| | |
| | |
# Heuristic for cookie-consent UI detection: matches common CMP vendor and
# privacy-related keywords (OneTrust, Osano, Didomi, TrustArc, ...) found in
# element id/class/data-*/aria-label attributes. Case-insensitive.
_COOKIE_ATTR_RE = re.compile(
    r"(cookie|consent|gdpr|cmp|onetrust|ot-sdk|osano|iubenda|didomi|trustarc|truste|"
    r"quantcast|qc-cmp|axeptio|sp_message|sp-cc|privacy|manage-choices|preferences)",
    re.I,
)
# Matches typical consent-banner wording in an element's visible text
# ("accept all", "we use cookies", ...). Case-insensitive.
_COOKIE_TEXT_RE = re.compile(
    r"(cookies?|cookie settings|we (use|value) your privacy|consent|gdpr|"
    r"manage (cookies|choices)|your choices|accept all|reject all|use of cookies|"
    r"privacy (policy|preferences))",
    re.I,
)
| |
|
| | def _normalize_text(txt: str) -> str: |
| | |
| | lines = [line.strip() for line in txt.splitlines()] |
| | lines = [ln for ln in lines if ln] |
| | return "\n".join(lines) |
| |
|
def _strip_noise_and_cookies(soup: BeautifulSoup) -> None:
    """Remove non-content noise from *soup* in place.

    Strips script/style/etc. tags and HTML comments, removes elements that
    look like cookie-consent / privacy banners (via the module-level
    _COOKIE_ATTR_RE / _COOKIE_TEXT_RE heuristics), and drops small
    fixed-position overlays.
    """
    # Local import: the top of this file only imports BeautifulSoup from
    # bs4, so `Comment` was previously an undefined name (NameError).
    from bs4 import Comment

    # Tags that never contain readable article text.
    for tag in soup(["script", "style", "noscript", "template", "svg", "iframe"]):
        tag.decompose()
    # HTML comments.
    for c in soup.find_all(string=lambda t: isinstance(t, Comment)):
        c.extract()

    def looks_like_cookie(el) -> bool:
        """Heuristically decide whether *el* is part of a consent banner."""
        attrs = []
        for k in ("id", "class", "data-component", "data-testid", "aria-label"):
            v = el.get(k)
            if isinstance(v, list):
                v = " ".join(v)
            if v:
                attrs.append(v)
        attr_str = " ".join(attrs)

        text = el.get_text(" ", strip=True)
        style = (el.get("style") or "")
        role = (el.get("role") or "").lower()

        # Attribute keywords alone are a strong signal.
        if _COOKIE_ATTR_RE.search(attr_str):
            return True
        # Dialog/banner roles count only with a supporting keyword.
        if role in ("dialog", "alert", "banner") and (_COOKIE_ATTR_RE.search(attr_str) or _COOKIE_TEXT_RE.search(text)):
            return True

        # Short consent-sounding text blocks; long text is likely an
        # article that merely mentions cookies.
        if text and len(text) < 1200 and _COOKIE_TEXT_RE.search(text):
            return True

        # Fixed/sticky overlays with consent keywords.
        style_l = style.replace(" ", "").lower()
        if ("position:fixed" in style_l or "position:sticky" in style_l) and (
            _COOKIE_TEXT_RE.search(text) or _COOKIE_ATTR_RE.search(attr_str)
        ):
            return True

        return False

    # Remove banner candidates, walking up to 3 ancestors to catch the
    # outermost fixed-position wrapper before decomposing.
    # NOTE(review): elements found before an ancestor is decomposed may be
    # revisited after decomposition; bs4 tolerates this today — confirm on
    # upgrades.
    for el in soup.find_all(["div", "section", "aside", "form", "nav", "footer", "header", "dialog"]):
        if looks_like_cookie(el):
            target = el
            for _ in range(3):
                p = target.parent
                if not p or p.name in ("body", "html"):
                    break
                p_style = (p.get("style") or "").replace(" ", "").lower()
                p_attrs = " ".join(
                    filter(
                        None,
                        [
                            p.get("id") or "",
                            " ".join(p.get("class", [])) if isinstance(p.get("class"), list) else (p.get("class") or ""),
                        ],
                    )
                )
                if "position:fixed" in p_style or _COOKIE_ATTR_RE.search(p_attrs):
                    target = p
                else:
                    break
            target.decompose()

    # Finally, drop small full-width fixed overlays (toolbars, nag bars).
    for el in soup.find_all(style=True):
        s = el["style"].replace(" ", "").lower()
        if "position:fixed" in s and ("width:100%" in s or "inset:" in s or "top:0" in s):
            if len(el.get_text(strip=True)) < 200:
                el.decompose()
| |
|
@tool
def extract_content_with_playwright(url):
    """Render *url* in Chromium and return the fully rendered page HTML.

    Uses Playwright so JavaScript-rendered content is included, and makes a
    best-effort click on an "Accept" cookie-consent button if one appears.

    Args:
        url: Fully-qualified URL to load.

    Returns:
        The page's HTML source as a string.
    """
    with sync_playwright() as p:
        # NOTE(review): headless=False opens a visible browser window; for
        # unattended/server use this likely should be True — confirm.
        browser = p.chromium.launch(headless=False)
        try:
            context = browser.new_context(
                user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                           "AppleWebKit/537.36 (KHTML, like Gecko) "
                           "Chrome/115.0.0.0 Safari/537.36",
                viewport={"width": 1280, "height": 800},
                locale="en-US",
            )
            page = context.new_page()
            response = page.goto(url, wait_until="networkidle", timeout=30000)
            # page.goto may return None (e.g. same-document navigation);
            # guard before dereferencing .status.
            status = response.status if response is not None else "unknown"
            print(f"Page response status: {status}")

            try:
                page.click('button:has-text("Accept")', timeout=5000)
                page.wait_for_load_state("networkidle")
                print("Clicked Accept on cookie consent.")
            except Exception as e:
                print("No cookie accept button found or clicking failed:", e)

            content = page.content()
        finally:
            # Ensure the browser is closed even if navigation fails.
            browser.close()
        return content
| |
|
def extract_webpage_content(url: str) -> str:
    """Fetch *url* (rendered via Playwright) and return its main text.

    Strips scripts / cookie banners, then picks the text-densest container.

    Args:
        url: Page address; a scheme is prepended when missing.

    Returns:
        Up to 10,000 characters of extracted text, or an error string for
        an empty URL.
    """
    # Validate first — the original only checked emptiness after prefixing.
    if not url:
        return "ERROR: Empty URL provided"
    if not url.startswith(('http://', 'https://')):
        url = f"https://{url}"

    html = extract_content_with_playwright(url)
    print(html[:1000])  # debug preview of the fetched markup

    soup = BeautifulSoup(html, 'html.parser')
    _strip_noise_and_cookies(soup)

    # Prefer the container with the most text, ignoring tiny fragments.
    candidates = soup.find_all(['article', 'main', 'section', 'div'])
    candidates = [c for c in candidates if len(c.get_text(strip=True)) > 300]
    best = max(candidates, key=lambda c: len(c.get_text()), default=soup.body)
    # soup.body is None for malformed/empty documents; fall back to the
    # whole soup instead of crashing on .get_text.
    if best is None:
        best = soup

    return best.get_text(separator="\n", strip=True)[:10000]
| | |
| | |
| | |
| | |
| | |
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | |
| | |
| | |
| | |
| |
|
| | |
| |
|
| | |
| | |
| | |
| |
|
| | |
| | |
| |
|
| | |
| |
|
| | |
| | |
| |
|
def preprocess_search_results(results, n_results=4, min_score=0):
    """Filter raw search hits by score, rank them, and trim the payload.

    Args:
        results: Iterable of dicts with "title", "url", "content", "score".
        n_results: Maximum number of hits to return.
        min_score: Minimum relevance score a hit must have to be kept.

    Returns:
        Up to *n_results* dicts (best score first) with the keys
        "title", "url", "summary".
    """
    # Keep only sufficiently relevant hits, then rank best-first.
    kept = [item for item in results if item["score"] >= min_score]
    kept.sort(key=lambda item: item["score"], reverse=True)

    # Drop the score and rename "content" -> "summary" for the caller.
    return [
        {"title": hit["title"], "url": hit["url"], "summary": hit["content"]}
        for hit in kept[:n_results]
    ]
| |
|
@tool
def web_search_tool(query: str) -> str:
    """Performs a web search using and return url, title, and summary.

    Primary backend is Tavily (retried once after a short back-off); a
    DuckDuckGo helper is kept as an alternative implementation.
    """
    def duck_duck_go(q):
        """Search DuckDuckGo and format the top hits as markdown."""
        print('Print:Tool: Performing DuckDuckGo Search...')
        print('Print:query:', q)
        # NOTE(review): DDGS is never imported in this file — calling this
        # helper raises NameError until `from duckduckgo_search import DDGS`
        # is added. Currently unused by the tool body below.
        try:
            time.sleep(random.uniform(2, 5))  # jitter to avoid rate limits
            with DDGS() as ddgs:
                results = ddgs.text(q, max_results=5)
        except Exception as e:
            print(f"Print:❌ DuckDuckGo search failed: {e}")
            results = None

        output = "## DuckDuckGo Search Results\n\n"
        # Guard: on failure `results` is None; iterating it would crash.
        for i, r in enumerate(results or [], 1):
            output += f"{i}. {r['title']}\n{r['href']}\n\n"
        return output

    def tavily(q):
        """Search via Tavily and return preprocessed results."""
        print("Print:🔁 Falling back to Tavily Search...")
        try:
            tavily_client = TavilyClient(api_key=os.getenv("TAVILY_API_KEY"))
            raw = tavily_client.search(q)
            return preprocess_search_results(raw['results'])
        except Exception as e:
            # Previously a failure here fell through to `return response_out`
            # with the name unbound (UnboundLocalError); re-raise so the
            # caller's retry logic triggers on the real error instead.
            print(f"Print:❌ tavily search failed: {e}")
            raise

    try:
        query_out = tavily(query)
    except Exception:
        # One retry after a short back-off (bare `except:` narrowed).
        time.sleep(3)
        query_out = tavily(query)
    print('Print Search results:', query_out)
    return query_out
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | |
| | |
| | |
| | |
| |
|
| | |
| | |
| |
|
| | |
| | |
| | |
| | |
| |
|
| | |
| |
|
| | |
| | |
| | |
| | |
| |
|
| | |
| |
|
@tool
def retrieve_images_from_url(url: str) -> list:
    """Extract all image source URLs from the page at *url*.

    Returns:
        A list of `src` attribute values, or a single-element list carrying
        a "no images" or error message.
    """
    print('Print:Tool: Retrieving Image')
    try:
        # Timeout keeps the tool from hanging on unresponsive hosts.
        response = requests.get(url, timeout=30)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, 'html.parser')
        images = [img['src'] for img in soup.find_all('img') if img.get('src')]
        return images if images else ["No images found."]
    except Exception as e:
        print(f"Print:Image retrieval error: {e}")
        return [f"Failed to retrieve images. {e}"]
| |
|
@tool
def perform_ocr_on_image(image_url: str) -> str:
    """Download an image and perform OCR to extract text.

    Returns:
        The extracted text (stripped), or a "no text" / error message.
    """
    print('Print:Tool: Performing OCR')
    try:
        # Timeout keeps the tool from hanging on unresponsive hosts.
        response = requests.get(image_url, timeout=30)
        response.raise_for_status()
        image = Image.open(BytesIO(response.content))
        text = pytesseract.image_to_string(image)
        return text.strip() if text else "No text found in image."
    except Exception as e:
        print(f"Print:OCR failed: {e}")
        return f"OCR failed. {e}"
| |
|
@tool
def wikipedia_tool(topic: str) -> str:
    """Look up *topic* on Wikipedia and return its title and summary."""
    print('Print:Tool: Performing Wiki Search')
    try:
        article = wikipedia.page(topic, auto_suggest=True)
        return f"Title: {article.title}\n\nSummary:\n{article.summary}"
    except wikipedia.exceptions.DisambiguationError as e:
        return f"Disambiguation required. Options: {e.options[:5]}"
    except wikipedia.exceptions.PageError:
        return "Page not found on Wikipedia."
    except Exception as e:
        print(f"Print:Wikipedia fetch error: {e}")
        return f"Wikipedia lookup failed. {e}"
| |
|
| |
|
@tool
def extract_text_from_pdf_url(pdf_url: str) -> str:
    """
    Downloads a PDF from the provided URL and extracts text from the first few pages.

    Args:
        pdf_url: Direct link to a PDF document.

    Returns:
        Up to 3000 characters of text from the first 3 pages, or an
        error / "no text" message.
    """
    print('Tool: Extracting PDF content')
    try:
        # Timeout keeps the tool from hanging on unresponsive hosts.
        response = requests.get(pdf_url, timeout=60)
        response.raise_for_status()
        pdf_reader = PdfReader(BytesIO(response.content))

        # Collect page texts and join once (avoids quadratic `+=`).
        parts = []
        for page in pdf_reader.pages[:3]:
            page_text = page.extract_text()
            if page_text:
                parts.append(page_text)
        text = "".join(parts)
        print('Print:PDF txt', text[:3000])
        return text[:3000] if text else "No text found in PDF."
    except Exception as e:
        print(f"Print:PDF extraction failed: {e}")
        return f"Failed to extract PDF content from {pdf_url}: {e}"
| |
|
| |
|
| | |
| | |
| | |
| |
|
| | import re |
| |
|
| | |
def llm_content_checker(llm_input: str):
    """
    Simple security checker that flags dangerous imports and patterns.

    Args:
        llm_input: Raw text produced by (or destined for) the LLM.

    Returns:
        The input unchanged if safe, or a JSON string with a safe
        "blocked" message if a dangerous import/pattern is found.
    """
    # Module names whose import we refuse to pass through.
    dangerous_imports = [
        'os', 'subprocess', 'shutil', 'sys', 'socket', 'urllib',
        'requests', 'pickle', 'eval', 'exec', 'compile', '__import__'
    ]

    # Shell / SQL / code-execution patterns, matched case-insensitively.
    dangerous_patterns = [
        r'rm\s+-rf',
        r'del\s+/[fs]',
        r'format\s+c:',
        r'shutdown',
        r'system\s*\(',
        r'exec\s*\(',
        r'eval\s*\(',
        r'open\s*\([\'"][/\\]',
        r'\.\./',
        r'DROP\s+TABLE',
        r'DELETE\s+FROM',
    ]

    content_lower = llm_input.lower()

    for dangerous_import in dangerous_imports:
        # Word boundaries avoid false positives like "import oscillator"
        # being blocked for "os" (the old substring check flagged those).
        if re.search(rf'\b(import|from)\s+{re.escape(dangerous_import)}\b', content_lower):
            print(f"🚫 Blocked dangerous import: {dangerous_import}")
            return '{"final_answer": "Request blocked due to security restrictions."}'

    for pattern in dangerous_patterns:
        # Bug fix: the old code searched the LOWERCASED text with the
        # uppercase SQL patterns (DROP TABLE / DELETE FROM), which could
        # never match. Use IGNORECASE on the original input instead.
        if re.search(pattern, llm_input, re.IGNORECASE):
            print(f"🚫 Blocked dangerous pattern: {pattern}")
            return '{"final_answer": "Request blocked due to security restrictions."}'

    # No dangerous content detected — pass the input through unchanged.
    return llm_input
| |
|
| |
|
| |
|
| |
|