File size: 4,611 Bytes
81ddc8e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
"""
Web Scraper Tool - Fetches and extracts text from policy pages
"""
import requests
from bs4 import BeautifulSoup
from crewai.tools import tool
import time

from utils.validators import validate_url, sanitize_text, truncate_content, validate_content_length
from utils.logger import log_agent_action

# Configuration
# Per-request timeout (seconds) passed to requests.get.
REQUEST_TIMEOUT = 30
# Extra attempts made after the first failed fetch (total tries = MAX_RETRIES + 1).
MAX_RETRIES = 2
# Seconds slept between retry attempts.
RETRY_DELAY = 2

# Browser-like request headers; some policy sites reject bare/unknown clients.
HEADERS = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
    'Accept-Language': 'en-US,en;q=0.5',
}


def extract_text_from_html(html: str) -> str:
    """Pull readable body text out of an HTML document.

    Strips script/style tags and page chrome, prefers a dedicated
    main-content container when one exists, and returns the remaining
    text as newline-joined cleaned lines.
    """
    soup = BeautifulSoup(html, 'html.parser')

    # Drop elements that never contain policy prose.
    boilerplate = ['script', 'style', 'nav', 'header', 'footer', 'aside', 'form', 'button']
    for node in soup(boilerplate):
        node.decompose()

    # Look for a dedicated content region, checking selectors in priority order.
    candidates = ('main', 'article', '[role="main"]', '.content', '.policy-content', '#content')
    region = next(
        (hit for hit in (soup.select_one(sel) for sel in candidates) if hit),
        None,
    )
    if not region:
        # Fall back to the whole body (or the full document if <body> is absent).
        region = soup.body or soup

    raw = region.get_text(separator='\n', strip=True)

    # Keep only lines with real content (more than 2 characters once stripped).
    kept = [ln.strip() for ln in raw.split('\n') if len(ln.strip()) > 2]
    return '\n'.join(kept)


def get_page_title(html: str) -> str:
    """Best-effort page title: <title> text, then the first <h1>, else a fallback."""
    soup = BeautifulSoup(html, 'html.parser')

    title_tag = soup.title
    if title_tag and title_tag.string:
        return title_tag.string.strip()

    heading = soup.find('h1')
    return heading.get_text(strip=True) if heading else "Unknown Policy"


@tool("web_scraper")
def web_scraper_tool(url: str) -> str:
    """
    Scrapes text content from a policy webpage.

    Args:
        url: The URL of the policy page to scrape

    Returns:
        Extracted text content from the policy page, formatted as
        "TITLE: ...\\nWORD_COUNT: ...\\nCONTENT:\\n...", or an
        "Error: ..." string describing why scraping failed.
    """
    start_time = time.time()

    # Validate the URL before touching the network.
    is_valid, error_msg = validate_url(url)
    if not is_valid:
        log_agent_action("Web Scraper Tool", "URL Validation", "URL provided", f"Failed: {error_msg}",
                        time.time() - start_time, False, error_msg)
        return f"Error: {error_msg}"

    try:
        # Fetch with retry: transient failures get MAX_RETRIES extra attempts.
        response = None
        for attempt in range(MAX_RETRIES + 1):
            try:
                response = requests.get(url, headers=HEADERS, timeout=REQUEST_TIMEOUT)
                response.raise_for_status()
                break
            except requests.exceptions.HTTPError as e:
                # 4xx client errors are not transient -- retrying just wastes
                # RETRY_DELAY seconds per attempt, so fail immediately.
                status = e.response.status_code if e.response is not None else None
                if status is not None and 400 <= status < 500:
                    raise
                if attempt < MAX_RETRIES:
                    time.sleep(RETRY_DELAY)
                else:
                    raise
            except requests.exceptions.RequestException:
                if attempt < MAX_RETRIES:
                    time.sleep(RETRY_DELAY)
                else:
                    raise  # bare raise preserves the original traceback

        # Extract title and clean body text from the fetched HTML.
        html = response.text
        title = get_page_title(html)
        content = extract_text_from_html(html)
        content = sanitize_text(content)

        # Reject pages whose extracted text fails the project length checks.
        is_valid, error_msg = validate_content_length(content)
        if not is_valid:
            log_agent_action("Web Scraper Tool", "Content Extraction", "HTML received", error_msg,
                           time.time() - start_time, False, error_msg)
            return f"Error: {error_msg}"

        content = truncate_content(content)
        word_count = len(content.split())

        log_agent_action("Web Scraper Tool", "Page Scraping", "URL fetched",
                        f"Extracted {word_count} words", time.time() - start_time, True)

        return f"TITLE: {title}\nWORD_COUNT: {word_count}\nCONTENT:\n{content}"

    except requests.exceptions.Timeout:
        error_msg = f"Request timed out after {REQUEST_TIMEOUT} seconds"
        log_agent_action("Web Scraper Tool", "Page Fetching", "Attempting fetch", error_msg,
                        time.time() - start_time, False, error_msg)
        return f"Error: {error_msg}"

    except requests.exceptions.HTTPError as e:
        # e.response can be None in rare cases; guard so the handler itself
        # cannot raise AttributeError and mask the real failure.
        status = e.response.status_code if e.response is not None else "unknown"
        error_msg = f"HTTP error: {status}"
        log_agent_action("Web Scraper Tool", "Page Fetching", "Attempting fetch", error_msg,
                        time.time() - start_time, False, error_msg)
        return f"Error: {error_msg}"

    except Exception as e:
        error_msg = f"Unexpected error: {str(e)}"
        log_agent_action("Web Scraper Tool", "Page Scraping", "Processing", error_msg,
                        time.time() - start_time, False, error_msg)
        return f"Error: {error_msg}"