"""
Web Scraper Tool - Fetches and extracts text from policy pages
"""
import requests
from bs4 import BeautifulSoup
from crewai.tools import tool
import time
from utils.validators import validate_url, sanitize_text, truncate_content, validate_content_length
from utils.logger import log_agent_action
# Configuration for outbound HTTP fetches.
REQUEST_TIMEOUT = 30  # seconds before a single fetch attempt is abandoned
MAX_RETRIES = 2       # extra attempts after the first failed request
RETRY_DELAY = 2       # seconds to sleep between retry attempts
# Browser-like request headers; many policy pages reject clients without a UA string.
HEADERS = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
    'Accept-Language': 'en-US,en;q=0.5',
}
def extract_text_from_html(html: str) -> str:
    """Return the readable text of an HTML document, stripped of boilerplate.

    Non-content tags (scripts, navigation, chrome) are removed, the main
    content region is located when one exists, and short/blank lines are
    filtered out of the extracted text.
    """
    soup = BeautifulSoup(html, 'html.parser')

    # Strip tags that never carry policy text.
    for junk in soup(['script', 'style', 'nav', 'header', 'footer', 'aside', 'form', 'button']):
        junk.decompose()

    # Prefer a dedicated content container; fall back to <body>, then the whole tree.
    selectors = ('main', 'article', '[role="main"]', '.content', '.policy-content', '#content')
    region = next(
        (hit for sel in selectors if (hit := soup.select_one(sel))),
        None,
    )
    if not region:
        region = soup.body if soup.body else soup

    raw = region.get_text(separator='\n', strip=True)

    # Keep only lines with meaningful content (more than 2 characters).
    kept = []
    for line in raw.split('\n'):
        trimmed = line.strip()
        if trimmed and len(trimmed) > 2:
            kept.append(trimmed)
    return '\n'.join(kept)
def get_page_title(html: str) -> str:
    """Pull a human-readable title out of an HTML document.

    Prefers the <title> element, falls back to the first <h1>, and returns
    "Unknown Policy" when neither yields text.
    """
    soup = BeautifulSoup(html, 'html.parser')

    # <title> is the most reliable source when present and non-empty.
    title_tag = soup.title
    if title_tag and title_tag.string:
        return title_tag.string.strip()

    # Next best: the page's first top-level heading.
    heading = soup.find('h1')
    if heading:
        return heading.get_text(strip=True)

    return "Unknown Policy"
def _fetch_with_retry(url: str) -> requests.Response:
    """GET *url* with browser-like headers, retrying on any request failure.

    Makes up to MAX_RETRIES + 1 attempts, sleeping RETRY_DELAY seconds
    between them. Raises the final requests.exceptions.RequestException
    (including Timeout / HTTPError) if every attempt fails.
    """
    for attempt in range(MAX_RETRIES + 1):
        try:
            response = requests.get(url, headers=HEADERS, timeout=REQUEST_TIMEOUT)
            response.raise_for_status()
            return response
        except requests.exceptions.RequestException:
            if attempt >= MAX_RETRIES:
                # Bare raise preserves the original traceback (fixes `raise e`).
                raise
            time.sleep(RETRY_DELAY)


@tool("web_scraper")
def web_scraper_tool(url: str) -> str:
    """
    Scrapes text content from a policy webpage.

    Args:
        url: The URL of the policy page to scrape

    Returns:
        Extracted text content from the policy page on success, formatted as
        "TITLE: ...\\nWORD_COUNT: ...\\nCONTENT:\\n...". On any failure
        (invalid URL, network error, HTTP error, empty content) an
        "Error: ..." string is returned instead of raising.
    """
    start_time = time.time()

    # Reject malformed/unsupported URLs before touching the network.
    is_valid, error_msg = validate_url(url)
    if not is_valid:
        log_agent_action("Web Scraper Tool", "URL Validation", "URL provided", f"Failed: {error_msg}",
                         time.time() - start_time, False, error_msg)
        return f"Error: {error_msg}"

    try:
        response = _fetch_with_retry(url)

        # Extract the title and clean body text from the fetched page.
        html = response.text
        title = get_page_title(html)
        content = sanitize_text(extract_text_from_html(html))

        # Guard against empty or suspiciously short pages.
        is_valid, error_msg = validate_content_length(content)
        if not is_valid:
            log_agent_action("Web Scraper Tool", "Content Extraction", "HTML received", error_msg,
                             time.time() - start_time, False, error_msg)
            return f"Error: {error_msg}"

        content = truncate_content(content)
        word_count = len(content.split())
        log_agent_action("Web Scraper Tool", "Page Scraping", "URL fetched",
                         f"Extracted {word_count} words", time.time() - start_time, True)
        return f"TITLE: {title}\nWORD_COUNT: {word_count}\nCONTENT:\n{content}"

    except requests.exceptions.Timeout:
        error_msg = f"Request timed out after {REQUEST_TIMEOUT} seconds"
        log_agent_action("Web Scraper Tool", "Page Fetching", "Attempting fetch", error_msg,
                         time.time() - start_time, False, error_msg)
        return f"Error: {error_msg}"
    except requests.exceptions.HTTPError as e:
        # e.response can be None in edge cases; don't mask the real error
        # with an AttributeError while formatting the message.
        status = e.response.status_code if e.response is not None else "unknown"
        error_msg = f"HTTP error: {status}"
        log_agent_action("Web Scraper Tool", "Page Fetching", "Attempting fetch", error_msg,
                         time.time() - start_time, False, error_msg)
        return f"Error: {error_msg}"
    except Exception as e:
        error_msg = f"Unexpected error: {str(e)}"
        log_agent_action("Web Scraper Tool", "Page Scraping", "Processing", error_msg,
                         time.time() - start_time, False, error_msg)
        return f"Error: {error_msg}"