Spaces:
Runtime error
Runtime error
| """ | |
| Fetch and extract content from web pages. | |
| Includes Wayback Machine fallback for 403 errors. | |
| """ | |
| import httpx | |
| from bs4 import BeautifulSoup | |
| from typing import Optional, Dict | |
| from urllib.parse import urljoin, urlparse | |
| import re | |
| import json | |
| import time | |
def clean_text(text: str) -> str:
    """Collapse all runs of whitespace into single spaces and trim the ends."""
    return re.sub(r'\s+', ' ', text).strip()
def extract_main_content(html: str, url: str, max_chars: int = 120000) -> str:
    """Strip boilerplate from *html* and return the page's main body text.

    Removes scripts/styles, structural chrome (nav/header/footer/aside) and
    elements whose class or id looks like navigation/ads/cookie banners, then
    extracts text from the first matching "main content" container (falling
    back to <body>). The result is whitespace-normalized and capped at
    *max_chars* characters (with a "... [truncated]" marker when cut).
    """
    soup = BeautifulSoup(html, 'lxml')

    # Drop non-content tags outright.
    for tag in soup(['script', 'style', 'noscript']):
        tag.decompose()
    for tag in soup.find_all(['nav', 'header', 'footer', 'aside']):
        tag.decompose()

    # Remove elements whose class or id suggests boilerplate.
    junk_patterns = ['nav', 'navigation', 'menu', 'sidebar', 'footer', 'header', 'cookie', 'banner', 'advertisement', 'ad-', 'social-']
    junk_re = re.compile('|'.join(junk_patterns), re.I)
    for element in soup.find_all(class_=junk_re):
        element.decompose()
    for element in soup.find_all(id=junk_re):
        element.decompose()

    # Prefer an explicit main-content container; otherwise fall back to <body>.
    main_selectors = ['main', 'article', '[role="main"]', '.content', '.main-content', '.post-content', '#content', '#main-content', '#main']
    container = next(
        (hit for hit in (soup.select_one(sel) for sel in main_selectors) if hit),
        None,
    ) or soup.find('body')
    if not container:
        return ""

    text = clean_text(container.get_text(separator=' ', strip=True))
    if len(text) > max_chars:
        text = text[:max_chars] + "... [truncated]"
    return text
def get_wayback_snapshot(url: str, timeout: float = 20.0) -> Optional[str]:
    """Return the newest Wayback Machine snapshot URL for *url*, or None.

    Queries the CDX search API (with a single retry on any failure) and builds
    a ``https://web.archive.org/web/<timestamp>/<url>`` link from the first
    result row. Returns None when no snapshot exists or the API call fails on
    both attempts.
    """
    api_url = f"https://web.archive.org/cdx/search/cdx?url={url}&output=json&limit=1&collapse=urlkey"
    headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36'}
    for attempt in range(2):
        try:
            with httpx.Client(timeout=timeout, headers=headers) as client:
                response = client.get(api_url)
                response.raise_for_status()
                data = response.json()
            # Row 0 is the CDX column-header row; row 1 (if present) is the
            # newest snapshot, whose second field is the capture timestamp.
            if len(data) > 1:
                timestamp = data[1][1]
                return f"https://web.archive.org/web/{timestamp}/{url}"
            return None  # API answered but no snapshot exists
        except httpx.TimeoutException as e:
            if attempt == 0:
                # Fixed: messages were f-strings with no placeholders (F541).
                print("Wayback API timeout, retrying...")
                time.sleep(1)
                continue
            print(f"Error getting Wayback snapshot for {url}: {e}")
            return None
        except Exception as e:
            if attempt == 0:
                print("Wayback API error, retrying...")
                time.sleep(1)
                continue
            print(f"Error getting Wayback snapshot for {url}: {e}")
            return None
    return None
def extract_wayback_content(html: str) -> Optional[str]:
    """Pull the archived page's own markup out of a Wayback wrapper page.

    Tries, in order: a div with id="webpage", a div with class="webpage",
    then any div whose id mentions content/main. Failing those, returns
    <body> with Wayback chrome (wm-*/wayback-classed nodes) stripped.
    Returns the input unchanged when nothing can be isolated or parsing
    fails.
    """
    try:
        soup = BeautifulSoup(html, 'html.parser')
        # First non-empty candidate list wins.
        candidates = (
            soup.find_all('div', id='webpage')
            or soup.find_all('div', class_='webpage')
            or soup.find_all('div', {'id': re.compile('content|main', re.I)})
        )
        if candidates:
            return str(candidates[0])
        body = soup.find('body')
        if body:
            for widget in body.find_all(['div', 'script', 'style'], class_=re.compile('wm-|wayback', re.I)):
                widget.decompose()
            return str(body)
        return html
    except Exception as e:
        print(f"Error extracting Wayback content: {e}")
        return html
def _mcp_fallback(url: str, message: str) -> Optional[Dict[str, str]]:
    """Best-effort fetch via the optional MCP browser module.

    Prints *message* only after the import succeeds (matching the original
    control flow); returns None when MCP is unavailable, errors out, or
    yields an empty result.
    """
    try:
        from mcp_fallback import mcp_fetch_url_fallback
        print(message)
        result = mcp_fetch_url_fallback(url)
        return result if result else None
    except ImportError:
        return None  # MCP not available
    except Exception as mcp_error:
        print(f"MCP fallback error: {mcp_error}")
        return None


def fetch_page(url: str, timeout: float = 30.0, use_wayback_fallback: bool = True) -> Optional[Dict[str, str]]:
    """Fetch *url* and return {'url', 'title', 'content'}, or None on failure.

    Fallback chain: on HTTP 403 (when *use_wayback_fallback*) try the Wayback
    Machine, then the optional MCP browser; on other HTTP errors or unexpected
    exceptions, try the MCP browser directly. Non-HTML responses and timeouts
    yield None.
    """
    headers = {
        'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
        'Accept-Language': 'en-US,en;q=0.9',
        'Accept-Encoding': 'gzip, deflate, br',
        'Connection': 'keep-alive',
        'Upgrade-Insecure-Requests': '1',
        'Sec-Fetch-Dest': 'document',
        'Sec-Fetch-Mode': 'navigate',
        'Sec-Fetch-Site': 'none',
        'Cache-Control': 'max-age=0'
    }
    try:
        with httpx.Client(timeout=timeout, follow_redirects=True, headers=headers) as client:
            # Fixed: headers were redundantly passed both to the Client and
            # to client.get(); the Client-level headers are sufficient.
            response = client.get(url)
            response.raise_for_status()
            content_type = response.headers.get('content-type', '').lower()
            if 'text/html' not in content_type:
                print(f"Skipping non-HTML content: {content_type}")
                return None
            html = response.text
            soup = BeautifulSoup(html, 'lxml')
            title_tag = soup.find('title')
            title = title_tag.get_text(strip=True) if title_tag else 'No title'
            content = extract_main_content(html, url)
            if not content or len(content) < 100:
                print(f"Warning: Very little content extracted from {url}")
            return {'url': url, 'title': title, 'content': content}
    except httpx.TimeoutException:
        print(f"Timeout fetching {url}")
        return None
    except httpx.HTTPStatusError as e:
        if e.response.status_code == 403 and use_wayback_fallback:
            print(f"HTTP 403 error fetching {url}, trying Wayback Machine...")
            wayback_result = fetch_from_wayback(url, timeout)
            if wayback_result:
                return wayback_result
            # Wayback also failed; last resort is the MCP browser.
            return _mcp_fallback(url, f"Wayback Machine failed, trying MCP browser fallback for {url}...")
        print(f"HTTP error {e.response.status_code} fetching {url}")
        return _mcp_fallback(url, f"Trying MCP browser fallback for {url}...")
    except Exception as e:
        print(f"Error fetching {url}: {e}")
        return _mcp_fallback(url, f"Trying MCP browser fallback for {url}...")
def fetch_from_wayback(url: str, timeout: float = 30.0) -> Optional[Dict[str, str]]:
    """Fetch *url* via its latest Internet Archive snapshot.

    Returns {'url', 'title', 'content', 'source': 'wayback_machine'} on
    success, or None when no snapshot exists or the fetch fails. The title
    is scrubbed of Wayback/Internet Archive branding, falling back to <h1>,
    then og:title, then a title derived from the URL path (or host).
    """
    try:
        snapshot = get_wayback_snapshot(url, timeout=10.0)
        if not snapshot:
            print(f"No Wayback Machine snapshot found for {url}")
            return None
        print(f"Fetching from Wayback Machine: {snapshot}")
        ua = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36'}
        with httpx.Client(timeout=timeout, follow_redirects=True, headers=ua) as client:
            resp = client.get(snapshot)
            resp.raise_for_status()
            html = resp.text
        # Unwrap the archive chrome to get at the original page markup.
        unwrapped = extract_wayback_content(html)
        if unwrapped:
            html = unwrapped
        soup = BeautifulSoup(html, 'lxml')
        title_tag = soup.find('title')
        title = title_tag.get_text(strip=True) if title_tag else ''
        # Strip archive branding from either end of the title.
        for branding in (r'^.*?Wayback Machine\s*[:\-]\s*',
                         r'\s*[:\-]\s*Wayback Machine.*?$',
                         r'^.*?Internet Archive\s*[:\-]\s*'):
            title = re.sub(branding, '', title, flags=re.I)
        if not title or title.lower() in ['wayback machine', 'internet archive']:
            h1 = soup.find('h1')
            if h1:
                title = h1.get_text(strip=True)
            else:
                og = soup.find('meta', property='og:title')
                if og:
                    title = og.get('content', '')
        if not title or title.lower() in ['wayback machine', 'internet archive', 'no title']:
            parsed = urlparse(url)
            # NOTE(review): replacing '-' after inserting ' - ' separators
            # collapses those separators into spaces — preserved as-is.
            title = parsed.path.strip('/').replace('/', ' - ').replace('-', ' ').title()
            if not title:
                title = parsed.netloc.replace('.', ' ').title()
            if not title:
                title = 'No title'
        content = extract_main_content(html, url)
        if not content or len(content) < 100:
            print(f"Warning: Very little content extracted from Wayback snapshot for {url}")
        return {'url': url, 'title': title, 'content': content, 'source': 'wayback_machine'}
    except httpx.TimeoutException:
        print(f"Timeout fetching from Wayback Machine: {url}")
        return None
    except httpx.HTTPStatusError as e:
        print(f"HTTP error {e.response.status_code} fetching from Wayback Machine: {url}")
        return None
    except Exception as e:
        print(f"Error fetching from Wayback Machine {url}: {e}")
        return None
def get_internal_links(html: str, base_url: str, same_domain_only: bool = True) -> list:
    """Collect unique absolute links from anchors in *html*.

    Each href is resolved against *base_url* and normalized to
    scheme://host/path (plus the query string when present, dropping any
    fragment). The base URL itself and duplicates are skipped; when
    *same_domain_only* is True, links to other hosts are filtered out.
    Returns links in document order.
    """
    soup = BeautifulSoup(html, 'lxml')
    base_domain = urlparse(base_url).netloc.lower()
    collected = []
    visited = set()
    for anchor in soup.find_all('a', href=True):
        target = urlparse(urljoin(base_url, anchor['href']))
        normalized = f"{target.scheme}://{target.netloc}{target.path}"
        if target.query:
            normalized = f"{normalized}?{target.query}"
        if same_domain_only and target.netloc.lower() != base_domain:
            continue
        if normalized in visited or normalized == base_url:
            continue
        visited.add(normalized)
        collected.append(normalized)
    return collected