#!/usr/bin/env python3
"""
Web Scraper MCP Server

A Model Context Protocol server that provides web scraping tools.
Exposes functions to scrape websites, convert content to markdown, and generate sitemaps.
"""

import gradio as gr
import requests
from bs4 import BeautifulSoup
from markdownify import markdownify as md
from urllib.parse import urljoin, urlparse
from typing import Optional, Tuple
import re
import tempfile
import zipfile
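# Dependencies (inferred from the imports above):
#   pip install "gradio[mcp]" requests beautifulsoup4 markdownify
# The [mcp] extra provides the mcp_server=True launch flag used at the bottom.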

def scrape_website_content(url: str) -> Tuple[str, Optional[str]]:
    """
    Scrape a website and return its main content formatted as markdown and a downloadable file path.

    Args:
        url (str): The URL to scrape (can include or omit http/https protocol)

    Returns:
        Tuple[str, Optional[str]]: The scraped content formatted as markdown, and a file path for download
    """
    try:
        # Validate URL
        if not url.startswith(('http://', 'https://')):
            url = 'https://' + url

        # Create session with proper headers
        session = requests.Session()
        session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        })

        # Make request
        response = session.get(url, timeout=10)
        response.raise_for_status()

        # Parse HTML
        soup = BeautifulSoup(response.content, 'html.parser')

        # Remove unwanted elements
        for element in soup(['script', 'style', 'nav', 'footer', 'header', 'aside']):
            element.decompose()

        # Try to find main content area
        main_content = (
            soup.find('main') or
            soup.find('article') or
            soup.find('div', class_=re.compile(r'content|main|post|article')) or
            soup.find('body')
        )

        if main_content:
            # Convert to markdown
            markdown_text = md(str(main_content), heading_style="ATX")

            # Clean up the markdown
            # Remove excessive newlines
            markdown_text = re.sub(r'\n{3,}', '\n\n', markdown_text)
            # Remove empty links
            markdown_text = re.sub(r'\[\s*\]\([^)]*\)', '', markdown_text)
            # Clean up whitespace
            markdown_text = re.sub(r'[ \t]+', ' ', markdown_text)

            # Add title if available
            title = soup.find('title')
            if title:
                markdown_text = f"# {title.get_text().strip()}\n\n{markdown_text}"

            markdown_text = markdown_text.strip()

            # Write to temp file for download
            with tempfile.NamedTemporaryFile(delete=False, suffix=".md", mode="w", encoding="utf-8") as f:
                f.write(markdown_text)
                temp_path = f.name

            return markdown_text, temp_path

        return "No main content found on the webpage.", None

    except requests.exceptions.RequestException as e:
        return f"Error fetching URL: {str(e)}", None
    except Exception as e:
        return f"Error processing content: {str(e)}", None

def generate_sitemap(url: str, max_links_per_domain: Optional[int] = None) -> Tuple[str, Optional[str]]:
    """
    Generate a sitemap from all links found on a webpage and provide a downloadable file path.

    Args:
        url (str): The URL to analyze for links (can include or omit http/https protocol)
        max_links_per_domain (int, optional): Maximum number of links to display per domain.
            If None, shows all links. Defaults to None.

    Returns:
        Tuple[str, Optional[str]]: A markdown-formatted sitemap of all links found on the page, and a file path for download
    """
    try:
        # Validate URL
        if not url.startswith(('http://', 'https://')):
            url = 'https://' + url

        # Create session with proper headers
        session = requests.Session()
        session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        })

        # Make request
        response = session.get(url, timeout=10)
        response.raise_for_status()

        # Parse HTML
        soup = BeautifulSoup(response.content, 'html.parser')

        # Find all links
        links = soup.find_all('a', href=True)

        # Process links
        sitemap_data = []
        seen_urls = set()

        for link in links:
            href = link.get('href')
            text = link.get_text().strip()

            if not href:
                continue

            # Convert relative URLs to absolute
            full_url = urljoin(url, href)

            # Filter out unwanted links
            if (full_url in seen_urls or
                    href.startswith(('#', 'javascript:', 'mailto:', 'tel:')) or
                    full_url == url):
                continue

            seen_urls.add(full_url)

            # Create link entry
            if not text:
                text = href

            sitemap_data.append({
                'text': text[:100] + '...' if len(text) > 100 else text,
                'url': full_url
            })

        # Generate sitemap markdown
        if not sitemap_data:
            return "No links found on this page.", None

        sitemap_md = "# Sitemap\n\n"
        sitemap_md += f"Found {len(sitemap_data)} links:\n\n"

        # Group by domain for better organization
        domain_groups = {}
        parsed_base = urlparse(url)

        for item in sitemap_data:
            parsed_url = urlparse(item['url'])
            if parsed_url.netloc == parsed_base.netloc:
                domain_key = "Internal Links"
            else:
                domain_key = f"External Links ({parsed_url.netloc})"

            if domain_key not in domain_groups:
                domain_groups[domain_key] = []
            domain_groups[domain_key].append(item)

        # Format sitemap
        for domain, domain_links in domain_groups.items():
            sitemap_md += f"## {domain}\n\n"

            # Use the limit parameter or show all links if None
            if max_links_per_domain is None:
                links_to_show = domain_links
                remaining_links = 0
            else:
                links_to_show = domain_links[:max_links_per_domain]
                remaining_links = max(0, len(domain_links) - max_links_per_domain)

            for link in links_to_show:
                sitemap_md += f"- [{link['text']}]({link['url']})\n"

            if remaining_links > 0:
                sitemap_md += f"- ... and {remaining_links} more links\n"

            sitemap_md += "\n"

        # Write to temp file for download
        with tempfile.NamedTemporaryFile(delete=False, suffix=".md", mode="w", encoding="utf-8") as f:
            f.write(sitemap_md)
            temp_path = f.name

        return sitemap_md, temp_path

    except requests.exceptions.RequestException as e:
        return f"Error fetching URL: {str(e)}", None
    except Exception as e:
        return f"Error processing content: {str(e)}", None

def extract_all_content_as_zip(url: str, max_links: Optional[int] = None) -> Tuple[str, Optional[str]]:
    """
    Extract text content from all links found on a webpage and create a downloadable zip file.

    Args:
        url (str): The URL to analyze for links (can include or omit http/https protocol)
        max_links (int, optional): Maximum number of links to process. If None, processes all links. Defaults to None.

    Returns:
        Tuple[str, Optional[str]]: Status message and zip file path for download
    """
    try:
        # Validate URL
        if not url.startswith(('http://', 'https://')):
            url = 'https://' + url

        # Create session with proper headers
        session = requests.Session()
        session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        })

        # First get the sitemap to find all links
        response = session.get(url, timeout=10)
        response.raise_for_status()

        soup = BeautifulSoup(response.content, 'html.parser')
        links = soup.find_all('a', href=True)

        # Process links to get unique URLs
        unique_urls = set()
        parsed_base = urlparse(url)

        for link in links:
            href = link.get('href')
            if not href:
                continue

            full_url = urljoin(url, href)

            # Filter out unwanted links and focus on same domain for safety
            if (href.startswith(('#', 'javascript:', 'mailto:', 'tel:')) or
                    full_url == url):
                continue

            # Only include internal links to avoid scraping too many external sites
            parsed_url = urlparse(full_url)
            if parsed_url.netloc == parsed_base.netloc:
                unique_urls.add(full_url)

        if not unique_urls:
            return "No internal links found to extract content from.", None

        # Use all URLs or limit if specified
        urls_to_process = list(unique_urls)
        total_links_found = len(urls_to_process)

        # Apply limit if specified
        if max_links is not None:
            urls_to_process = urls_to_process[:max_links]
            limited_message = f" (limited to {max_links} out of {total_links_found})"
        else:
            limited_message = ""

        # Create temporary zip file
        with tempfile.NamedTemporaryFile(delete=False, suffix=".zip") as temp_zip:
            zip_path = temp_zip.name

        successful_extractions = 0
        failed_extractions = 0

        with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zip_file:
            for i, link_url in enumerate(urls_to_process, 1):
                try:
                    # Get content from each link
                    link_response = session.get(link_url, timeout=10)
                    link_response.raise_for_status()

                    # Parse and extract content
                    link_soup = BeautifulSoup(link_response.content, 'html.parser')

                    # Remove unwanted elements
                    for element in link_soup(['script', 'style', 'nav', 'footer', 'header', 'aside']):
                        element.decompose()

                    # Find main content
                    main_content = (
                        link_soup.find('main') or
                        link_soup.find('article') or
                        link_soup.find('div', class_=re.compile(r'content|main|post|article')) or
                        link_soup.find('body')
                    )

                    if main_content:
                        # Convert to markdown
                        markdown_text = md(str(main_content), heading_style="ATX")

                        # Clean up the markdown
                        markdown_text = re.sub(r'\n{3,}', '\n\n', markdown_text)
                        markdown_text = re.sub(r'\[\s*\]\([^)]*\)', '', markdown_text)
                        markdown_text = re.sub(r'[ \t]+', ' ', markdown_text)

                        # Add title if available
                        title = link_soup.find('title')
                        if title:
                            markdown_text = f"# {title.get_text().strip()}\n\n{markdown_text}"

                        markdown_text = markdown_text.strip()

                        # Create safe filename from the URL path
                        parsed_link = urlparse(link_url)
                        safe_filename = re.sub(r'[^\w\-_.]', '_', parsed_link.path or 'index')
                        if not safe_filename.endswith('.md'):
                            safe_filename += '.md'

                        # Fall back to a numbered name when the path yields nothing useful
                        if safe_filename in ('.md', '_.md', 'index.md'):
                            safe_filename = f"page_{i}.md"

                        # Add source URL as header
                        final_content = f"<!-- Source: {link_url} -->\n\n{markdown_text}"

                        # Add to zip
                        zip_file.writestr(safe_filename, final_content)
                        successful_extractions += 1
                    else:
                        failed_extractions += 1

                except Exception:
                    failed_extractions += 1
                    continue

        status_message = f"Successfully extracted content from {successful_extractions} pages{limited_message}"
        if failed_extractions > 0:
            status_message += f", failed to extract from {failed_extractions} pages"
        status_message += f". Created zip file with {successful_extractions} markdown files."

        return status_message, zip_path

    except requests.exceptions.RequestException as e:
        return f"Error fetching URL: {str(e)}", None
    except Exception as e:
        return f"Error processing content: {str(e)}", None

def generate_sitemap_for_ui(url: str) -> Tuple[str, Optional[str]]:
    """
    Wrapper function for the Gradio UI that shows all links without limitation.

    Args:
        url (str): The URL to analyze for links

    Returns:
        Tuple[str, Optional[str]]: A markdown-formatted sitemap of all links found on the page, and a file path for download
    """
    return generate_sitemap(url, max_links_per_domain=None)


def generate_sitemap_with_limit(url: str, max_links_per_domain: int) -> Tuple[str, Optional[str]]:
    """
    Wrapper function for Gradio UI that allows configurable link limits per domain.

    Args:
        url (str): The URL to analyze for links
        max_links_per_domain (int): Maximum number of links to display per domain (0 = show all)

    Returns:
        Tuple[str, Optional[str]]: A markdown-formatted sitemap of all links found on the page, and a file path for download
    """
    # gr.Number delivers floats by default, so cast before the value is used for slicing
    limit = None if max_links_per_domain == 0 else int(max_links_per_domain)
    return generate_sitemap(url, max_links_per_domain=limit)


def extract_all_content_for_ui(url: str) -> Tuple[str, Optional[str]]:
    """
    Wrapper function for the Gradio UI that extracts content from all internal links without limitation.

    Args:
        url (str): The URL to analyze for links

    Returns:
        Tuple[str, Optional[str]]: Status message and zip file path for download
    """
    return extract_all_content_as_zip(url, max_links=None)


def extract_limited_content_as_zip(url: str, max_links: int) -> Tuple[str, Optional[str]]:
    """
    Wrapper function for Gradio UI that allows configurable link limits for bulk extraction.

    Args:
        url (str): The URL to analyze for links
        max_links (int): Maximum number of links to process (0 = process all)

    Returns:
        Tuple[str, Optional[str]]: Status message and zip file path for download
    """
    # gr.Number delivers floats by default, so cast before the value is used for slicing
    limit = None if max_links == 0 else int(max_links)
    return extract_all_content_as_zip(url, max_links=limit)

# Create Gradio interfaces for each function
def create_mcp_interface():
    """Create Gradio interface that exposes web scraping tools as MCP functions."""

    # Create individual interfaces for each tool
    scrape_interface = gr.Interface(
        fn=scrape_website_content,
        inputs=gr.Textbox(
            label="Website URL",
            placeholder="https://example.com or example.com"
        ),
        outputs=[
            gr.Textbox(
                label="Scraped Content",
                lines=20,
                max_lines=50,
                show_copy_button=True,
                container=True
            ),
            gr.File(label="Download Markdown")
        ],
        title="Website Content Scraper",
        description="Extract and format website content as markdown",
        api_name="scrape_content"
    )

    sitemap_interface = gr.Interface(
        fn=generate_sitemap_for_ui,
        inputs=gr.Textbox(
            label="Website URL",
            placeholder="https://example.com or example.com"
        ),
        outputs=[
            gr.Textbox(
                label="Sitemap",
                lines=20,
                max_lines=50,
                show_copy_button=True,
                container=True
            ),
            gr.File(label="Download Sitemap")
        ],
        title="Website Sitemap Generator",
        description="Generate a sitemap of all links found on a webpage",
        api_name="generate_sitemap"
    )

    bulk_extract_interface = gr.Interface(
        fn=extract_all_content_for_ui,
        inputs=gr.Textbox(
            label="Website URL",
            placeholder="https://example.com or example.com"
        ),
        outputs=[
            gr.Textbox(
                label="Extraction Status",
                lines=10,
                max_lines=20,
                show_copy_button=True,
                container=True
            ),
            gr.File(label="Download ZIP Archive")
        ],
        title="Bulk Content Extractor",
        description="Extract text content from all internal links and download as ZIP",
        api_name="extract_all_content"
    )

    # Enhanced sitemap interface with configurable limits
    sitemap_limited_interface = gr.Interface(
        fn=generate_sitemap_with_limit,
        inputs=[
            gr.Textbox(
                label="Website URL",
                placeholder="https://example.com or example.com"
            ),
            gr.Number(
                label="Max Links Per Domain",
                value=0,
                info="Enter 0 to show all links, or a positive number to limit display per domain",
                minimum=0,
                maximum=1000
            )
        ],
        outputs=[
            gr.Textbox(
                label="Sitemap",
                lines=20,
                max_lines=50,
                show_copy_button=True,
                container=True
            ),
            gr.File(label="Download Sitemap")
        ],
        title="Configurable Sitemap Generator",
        description="Generate a sitemap with optional display limits (0 = show all links)",
        api_name="generate_sitemap_limited"
    )

    # Enhanced bulk extract interface with configurable limits
    bulk_limited_interface = gr.Interface(
        fn=extract_limited_content_as_zip,
        inputs=[
            gr.Textbox(
                label="Website URL",
                placeholder="https://example.com or example.com"
            ),
            gr.Number(
                label="Max Pages to Extract",
                value=0,
                info="Enter 0 to process all pages, or a positive number to limit extraction",
                minimum=0,
                maximum=1000
            )
        ],
        outputs=[
            gr.Textbox(
                label="Extraction Status",
                lines=10,
                max_lines=20,
                show_copy_button=True,
                container=True
            ),
            gr.File(label="Download ZIP Archive")
        ],
        title="Limited Bulk Content Extractor",
        description="Extract text content from internal links with optional processing limits (0 = extract all)",
        api_name="extract_limited_content"
    )

    # Combine into tabbed interface
    demo = gr.TabbedInterface(
        [scrape_interface, sitemap_interface, sitemap_limited_interface, bulk_extract_interface, bulk_limited_interface],
        ["Content Scraper", "All Links Sitemap", "Limited Sitemap", "Bulk Extractor", "Limited Bulk Extractor"],
        title="🕷️ Web Scraper MCP Server"
    )

    return demo
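
# Note: when launched with mcp_server=True, Gradio exposes each endpoint above as
# an MCP tool and, per Gradio's MCP documentation, derives the tool descriptions
# from the function docstrings, so keeping those docstrings accurate matters.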
| if __name__ == "__main__": | |
| # Create and launch the MCP server | |
| app = create_mcp_interface() | |
| app.launch( | |
| mcp_server=True | |
| ) | |
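
# Once running, an MCP client can connect over SSE; with Gradio's default port the
# endpoint is typically http://localhost:7860/gradio_api/mcp/sse (path per Gradio's
# MCP docs; adjust host and port to your deployment).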