| """ |
| AI-Powered Web Scraper - app.py |
| Professional-grade web content extraction and AI summarization tool for Hugging Face Spaces |
| """ |
|
|
import gradio as gr
import requests
from bs4 import BeautifulSoup
from urllib.parse import urlparse
import pandas as pd
from datetime import datetime
import json
import re
from typing import List, Optional, Tuple
import logging
from dataclasses import dataclass
from transformers import pipeline
import nltk
from nltk.tokenize import sent_tokenize
|
|
| |
# Make sure the NLTK Punkt sentence tokenizer data is available
try:
    nltk.data.find('tokenizers/punkt')
except LookupError:
    nltk.download('punkt', quiet=True)
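# Newer NLTK releases (3.8.2+) look for the Punkt data under "punkt_tab"; fetch it
# as well so sent_tokenize works on either version (assumption: the extra download
# is a harmless no-op when the data is already present or the resource is unknown).
try:
    nltk.data.find('tokenizers/punkt_tab')
except LookupError:
    nltk.download('punkt_tab', quiet=True)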
|
|
| |
| logging.basicConfig(level=logging.INFO) |
| logger = logging.getLogger(__name__) |
|
|
| @dataclass |
| class ScrapedContent: |
| """Data class for scraped content with metadata""" |
| url: str |
| title: str |
| content: str |
| summary: str |
| word_count: int |
| reading_time: int |
| extracted_at: str |
| author: Optional[str] = None |
| publish_date: Optional[str] = None |
| meta_description: Optional[str] = None |
    keywords: Optional[List[str]] = None
|
|
| class SecurityValidator: |
| """Security validation for URLs and content""" |
| |
| ALLOWED_SCHEMES = {'http', 'https'} |
| BLOCKED_DOMAINS = { |
| 'localhost', '127.0.0.1', '0.0.0.0', |
| '192.168.', '10.', '172.16.', '172.17.', |
| '172.18.', '172.19.', '172.20.', '172.21.', |
| '172.22.', '172.23.', '172.24.', '172.25.', |
| '172.26.', '172.27.', '172.28.', '172.29.', |
| '172.30.', '172.31.' |
| } |
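    # NOTE: these string prefixes only approximate the loopback/private ranges.
    # A stricter check could use the stdlib ipaddress module (sketch, not wired in):
    #     import ipaddress
    #     ipaddress.ip_address("10.0.0.1").is_private  # -> True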
| |
| @classmethod |
| def validate_url(cls, url: str) -> Tuple[bool, str]: |
| """Validate URL for security concerns""" |
| try: |
| parsed = urlparse(url) |
| |
| |
| if parsed.scheme not in cls.ALLOWED_SCHEMES: |
| return False, f"Invalid scheme: {parsed.scheme}. Only HTTP/HTTPS allowed." |
| |
| |
            # Block loopback and private-network hosts (exact or prefix match)
            hostname = parsed.hostname or ''
            if any(hostname == blocked or hostname.startswith(blocked)
                   for blocked in cls.BLOCKED_DOMAINS):
                return False, "Access to internal/local networks is not allowed."
| |
| |
| if not parsed.netloc: |
| return False, "Invalid URL format." |
| |
| return True, "URL is valid." |
| |
| except Exception as e: |
| return False, f"URL validation error: {str(e)}" |
|
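# Usage sketch: SecurityValidator.validate_url returns an (ok, message) pair, e.g.
#     SecurityValidator.validate_url("https://example.com/post")  -> (True, "URL is valid.")
#     SecurityValidator.validate_url("http://127.0.0.1:8080/")    -> (False, "Access to internal/local networks is not allowed.")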
|
| class RobotsTxtChecker: |
| """Check robots.txt compliance""" |
| |
| @staticmethod |
| def can_fetch(url: str, user_agent: str = "*") -> bool: |
| """Check if URL can be fetched according to robots.txt""" |
| try: |
| parsed_url = urlparse(url) |
| robots_url = f"{parsed_url.scheme}://{parsed_url.netloc}/robots.txt" |
| |
| response = requests.get(robots_url, timeout=5) |
| if response.status_code == 200: |
| |
| lines = response.text.split('\n') |
| user_agent_section = False |
| |
| for line in lines: |
| line = line.strip() |
| if line.startswith('User-agent:'): |
| agent = line.split(':', 1)[1].strip() |
| user_agent_section = agent == '*' or agent.lower() == user_agent.lower() |
                    elif user_agent_section and line.startswith('Disallow:'):
                        disallowed = line.split(':', 1)[1].strip()
                        # Disallow rules are path prefixes, not URL suffixes
                        if disallowed and parsed_url.path.startswith(disallowed):
                            return False
| |
| return True |
| |
| except Exception: |
| |
| return True |
|
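# A sturdier alternative (sketch, not wired in): the standard library's
# urllib.robotparser handles per-agent sections and Allow rules for us.
#
#     from urllib.robotparser import RobotFileParser
#
#     def can_fetch_std(url: str, user_agent: str = "*") -> bool:
#         parsed = urlparse(url)
#         rp = RobotFileParser(f"{parsed.scheme}://{parsed.netloc}/robots.txt")
#         try:
#             rp.read()
#         except Exception:
#             return True  # fail open, mirroring RobotsTxtChecker
#         return rp.can_fetch(user_agent, url)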
|
| class ContentExtractor: |
| """Advanced content extraction with multiple strategies""" |
| |
| def __init__(self): |
| self.session = requests.Session() |
| self.session.headers.update({ |
| 'User-Agent': 'Mozilla/5.0 (compatible; AI-WebScraper/1.0; Research Tool)', |
| 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', |
| 'Accept-Language': 'en-US,en;q=0.5', |
| 'Accept-Encoding': 'gzip, deflate', |
| 'Connection': 'keep-alive', |
| 'Upgrade-Insecure-Requests': '1', |
| }) |
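        # Optional hardening (sketch, not wired in): retry transient failures, e.g.
        #     from requests.adapters import HTTPAdapter
        #     from urllib3.util.retry import Retry
        #     self.session.mount("https://", HTTPAdapter(max_retries=Retry(total=3, backoff_factor=0.5)))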
| |
| def extract_content(self, url: str) -> Optional[ScrapedContent]: |
| """Extract content from URL with robust error handling""" |
| try: |
| |
| is_valid, validation_msg = SecurityValidator.validate_url(url) |
| if not is_valid: |
| raise ValueError(f"Security validation failed: {validation_msg}") |
| |
| |
| if not RobotsTxtChecker.can_fetch(url): |
| raise ValueError("robots.txt disallows scraping this URL") |
| |
| |
| response = self.session.get(url, timeout=15) |
| response.raise_for_status() |
| |
| |
| soup = BeautifulSoup(response.content, 'html.parser') |
| |
| |
| title = self._extract_title(soup) |
| author = self._extract_author(soup) |
| publish_date = self._extract_publish_date(soup) |
| meta_description = self._extract_meta_description(soup) |
| |
| |
| content = self._extract_main_content(soup) |
| |
| if not content or len(content.strip()) < 100: |
| raise ValueError("Insufficient content extracted") |
| |
| |
| word_count = len(content.split()) |
| reading_time = max(1, word_count // 200) |
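            # e.g. a 1,000-word article at ~200 wpm -> max(1, 1000 // 200) = 5 minutes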
| |
| |
| keywords = self._extract_keywords(content) |
| |
| return ScrapedContent( |
| url=url, |
| title=title, |
| content=content, |
| summary="", |
| word_count=word_count, |
| reading_time=reading_time, |
| extracted_at=datetime.now().isoformat(), |
| author=author, |
| publish_date=publish_date, |
| meta_description=meta_description, |
| keywords=keywords |
| ) |
| |
| except Exception as e: |
| logger.error(f"Content extraction failed for {url}: {str(e)}") |
| raise |
| |
| def _extract_title(self, soup: BeautifulSoup) -> str: |
| """Extract page title with fallbacks""" |
| |
| og_title = soup.find('meta', property='og:title') |
| if og_title and og_title.get('content'): |
| return og_title['content'].strip() |
| |
| |
| title_tag = soup.find('title') |
| if title_tag: |
| return title_tag.get_text().strip() |
| |
| |
| h1_tag = soup.find('h1') |
| if h1_tag: |
| return h1_tag.get_text().strip() |
| |
| return "No title found" |
| |
| def _extract_author(self, soup: BeautifulSoup) -> Optional[str]: |
| """Extract author information""" |
| |
| author_selectors = [ |
| 'meta[name="author"]', |
| 'meta[property="article:author"]', |
| '.author', |
| '.byline', |
| '[rel="author"]' |
| ] |
| |
| for selector in author_selectors: |
| element = soup.select_one(selector) |
| if element: |
| if element.name == 'meta': |
| return element.get('content', '').strip() |
| else: |
| return element.get_text().strip() |
| |
| return None |
| |
| def _extract_publish_date(self, soup: BeautifulSoup) -> Optional[str]: |
| """Extract publication date""" |
| date_selectors = [ |
| 'meta[property="article:published_time"]', |
| 'meta[name="publishdate"]', |
| 'time[datetime]', |
| '.publish-date', |
| '.date' |
| ] |
| |
| for selector in date_selectors: |
| element = soup.select_one(selector) |
| if element: |
| if element.name == 'meta': |
| return element.get('content', '').strip() |
| elif element.name == 'time': |
| return element.get('datetime', '').strip() |
| else: |
| return element.get_text().strip() |
| |
| return None |
| |
| def _extract_meta_description(self, soup: BeautifulSoup) -> Optional[str]: |
| """Extract meta description""" |
| meta_desc = soup.find('meta', attrs={'name': 'description'}) |
| if meta_desc: |
| return meta_desc.get('content', '').strip() |
| |
| og_desc = soup.find('meta', property='og:description') |
| if og_desc: |
| return og_desc.get('content', '').strip() |
| |
| return None |
| |
| def _extract_main_content(self, soup: BeautifulSoup) -> str: |
| """Extract main content with multiple strategies""" |
| |
        # Remove non-content tags, then class-based noise such as ads and sidebars
        for element in soup(['script', 'style', 'nav', 'header', 'footer', 'aside']):
            element.decompose()
        for element in soup.select('.ads, .sidebar, .advertisement'):
            element.decompose()
| |
| |
| content_selectors = [ |
| 'article', |
| 'main', |
| '.content', |
| '.post-content', |
| '.entry-content', |
| '.article-body', |
| '#content', |
| '.story-body' |
| ] |
| |
| for selector in content_selectors: |
| element = soup.select_one(selector) |
| if element: |
| text = element.get_text(separator=' ', strip=True) |
| if len(text) > 200: |
| return self._clean_text(text) |
| |
| |
| body = soup.find('body') |
| if body: |
| text = body.get_text(separator=' ', strip=True) |
| return self._clean_text(text) |
| |
| |
| return self._clean_text(soup.get_text(separator=' ', strip=True)) |
| |
| def _clean_text(self, text: str) -> str: |
| """Clean extracted text""" |
| |
| text = re.sub(r'\s+', ' ', text) |
| |
| |
| text = re.sub(r'Subscribe.*?newsletter', '', text, flags=re.IGNORECASE) |
| text = re.sub(r'Click here.*?more', '', text, flags=re.IGNORECASE) |
| text = re.sub(r'Advertisement', '', text, flags=re.IGNORECASE) |
| |
| return text.strip() |
| |
| def _extract_keywords(self, content: str) -> List[str]: |
| """Extract basic keywords from content""" |
| |
| words = re.findall(r'\b[A-Za-z]{4,}\b', content.lower()) |
| word_freq = {} |
| |
| for word in words: |
| if word not in ['that', 'this', 'with', 'from', 'they', 'have', 'been', 'were', 'said']: |
| word_freq[word] = word_freq.get(word, 0) + 1 |
| |
| |
| sorted_words = sorted(word_freq.items(), key=lambda x: x[1], reverse=True) |
| return [word for word, freq in sorted_words[:10]] |
|
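# Usage sketch (hypothetical URL):
#     extractor = ContentExtractor()
#     page = extractor.extract_content("https://example.com/article")
#     print(page.title, page.word_count, page.keywords)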
|
| class AISummarizer: |
| """AI-powered content summarization""" |
| |
| def __init__(self): |
| self.summarizer = None |
| self._load_model() |
| |
| def _load_model(self): |
| """Load summarization model with error handling""" |
| try: |
| self.summarizer = pipeline( |
| "summarization", |
| model="facebook/bart-large-cnn", |
| tokenizer="facebook/bart-large-cnn" |
| ) |
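            # pipeline() runs on CPU by default; on a GPU Space you could pass
            # device=0 (or device_map="auto" with accelerate installed) to speed this up.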
| logger.info("Summarization model loaded successfully") |
| except Exception as e: |
| logger.error(f"Failed to load summarization model: {e}") |
| |
| try: |
| self.summarizer = pipeline( |
| "summarization", |
| model="sshleifer/distilbart-cnn-12-6" |
| ) |
| logger.info("Fallback summarization model loaded") |
| except Exception as e2: |
| logger.error(f"Failed to load fallback model: {e2}") |
| self.summarizer = None |
| |
| def summarize(self, content: str, max_length: int = 300) -> str: |
| """Generate AI summary of content""" |
| if not self.summarizer: |
| return self._extractive_summary(content) |
| |
| try: |
| |
| max_input_length = 1024 |
| chunks = self._split_content(content, max_input_length) |
| |
| summaries = [] |
| for chunk in chunks: |
| if len(chunk.split()) < 20: |
| continue |
| |
                result = self.summarizer(
                    chunk,
                    # keep max_length above min_length for short chunks
                    max_length=max(40, min(max_length, len(chunk.split()) // 2)),
                    min_length=30,
                    do_sample=False
                )
                summaries.append(result[0]['summary_text'])
| |
| |
| combined = ' '.join(summaries) |
| |
| |
| if len(combined.split()) > max_length: |
| result = self.summarizer( |
| combined, |
| max_length=max_length, |
| min_length=50, |
| do_sample=False |
| ) |
| return result[0]['summary_text'] |
| |
| return combined |
| |
| except Exception as e: |
| logger.error(f"AI summarization failed: {e}") |
| return self._extractive_summary(content) |
| |
| def _split_content(self, content: str, max_length: int) -> List[str]: |
| """Split content into manageable chunks""" |
| sentences = sent_tokenize(content) |
| chunks = [] |
| current_chunk = [] |
| current_length = 0 |
| |
| for sentence in sentences: |
| sentence_length = len(sentence.split()) |
| if current_length + sentence_length > max_length and current_chunk: |
| chunks.append(' '.join(current_chunk)) |
| current_chunk = [sentence] |
| current_length = sentence_length |
| else: |
| current_chunk.append(sentence) |
| current_length += sentence_length |
| |
| if current_chunk: |
| chunks.append(' '.join(current_chunk)) |
| |
| return chunks |
| |
| def _extractive_summary(self, content: str) -> str: |
| """Fallback extractive summarization""" |
| sentences = sent_tokenize(content) |
| if len(sentences) <= 3: |
| return content |
| |
| |
| summary_sentences = [ |
| sentences[0], |
| sentences[len(sentences) // 2], |
| sentences[-1] |
| ] |
| |
| return ' '.join(summary_sentences) |
|
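# Usage sketch:
#     summarizer = AISummarizer()
#     short_version = summarizer.summarize(article_text, max_length=200)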
|
| class WebScraperApp: |
| """Main application class""" |
| |
| def __init__(self): |
| self.extractor = ContentExtractor() |
| self.summarizer = AISummarizer() |
| self.scraped_data = [] |
| |
| def process_url(self, url: str, summary_length: int = 300) -> Tuple[str, str, str, str]: |
| """Process a single URL and return results""" |
| try: |
| if not url.strip(): |
| return "β Error", "Please enter a valid URL", "", "" |
| |
| |
            # Default to HTTPS when the user omits the scheme
            if not url.startswith(('http://', 'https://')):
                url = 'https://' + url

            # Extract the page content
            scraped_content = self.extractor.extract_content(url)
| |
| |
| summary = self.summarizer.summarize(scraped_content.content, summary_length) |
| scraped_content.summary = summary |
| |
| |
| self.scraped_data.append(scraped_content) |
| |
| |
| metadata = f""" |
| **π Content Analysis** |
| - **Title:** {scraped_content.title} |
| - **Author:** {scraped_content.author or 'Not found'} |
| - **Published:** {scraped_content.publish_date or 'Not found'} |
| - **Word Count:** {scraped_content.word_count:,} |
| - **Reading Time:** {scraped_content.reading_time} minutes |
| - **Extracted:** {scraped_content.extracted_at} |
| """ |
| |
| keywords_text = f"**π·οΈ Keywords:** {', '.join(scraped_content.keywords[:10])}" if scraped_content.keywords else "" |
| |
| return ( |
| "β
Success", |
| metadata, |
| f"**π AI Summary ({len(summary.split())} words):**\n\n{summary}", |
| keywords_text |
| ) |
| |
| except Exception as e: |
| error_msg = f"Failed to process URL: {str(e)}" |
| logger.error(error_msg) |
| return "β Error", error_msg, "", "" |
| |
| def export_data(self, format_type: str) -> str: |
| """Export scraped data to file""" |
| if not self.scraped_data: |
| return "No data to export" |
| |
| try: |
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") |
| |
| if format_type == "CSV": |
| filename = f"scraped_data_{timestamp}.csv" |
| df = pd.DataFrame([ |
| { |
| 'URL': item.url, |
| 'Title': item.title, |
| 'Author': item.author, |
| 'Published': item.publish_date, |
| 'Word Count': item.word_count, |
| 'Reading Time': item.reading_time, |
| 'Summary': item.summary, |
| 'Keywords': ', '.join(item.keywords) if item.keywords else '', |
| 'Extracted At': item.extracted_at |
| } |
| for item in self.scraped_data |
| ]) |
| df.to_csv(filename, index=False) |
| |
| elif format_type == "JSON": |
| filename = f"scraped_data_{timestamp}.json" |
| data = [ |
| { |
| 'url': item.url, |
| 'title': item.title, |
| 'content': item.content, |
| 'summary': item.summary, |
| 'metadata': { |
| 'author': item.author, |
| 'publish_date': item.publish_date, |
| 'word_count': item.word_count, |
| 'reading_time': item.reading_time, |
| 'keywords': item.keywords, |
| 'extracted_at': item.extracted_at |
| } |
| } |
| for item in self.scraped_data |
| ] |
| with open(filename, 'w', encoding='utf-8') as f: |
| json.dump(data, f, indent=2, ensure_ascii=False) |
| |
| return filename |
| |
| except Exception as e: |
| logger.error(f"Export failed: {e}") |
| return f"Export failed: {str(e)}" |
| |
| def clear_data(self) -> str: |
| """Clear all scraped data""" |
| self.scraped_data.clear() |
| return "Data cleared successfully" |
|
|
| def create_interface(): |
| """Create the Gradio interface""" |
| app = WebScraperApp() |
| |
| |
| custom_css = """ |
| .gradio-container { |
| max-width: 1200px; |
| margin: auto; |
| } |
| .main-header { |
| text-align: center; |
| background: linear-gradient(90deg, #667eea 0%, #764ba2 100%); |
| color: white; |
| padding: 2rem; |
| border-radius: 10px; |
| margin-bottom: 2rem; |
| } |
| .feature-box { |
| background: #f8f9fa; |
| border: 1px solid #e9ecef; |
| border-radius: 8px; |
| padding: 1.5rem; |
| margin: 1rem 0; |
| } |
| .status-success { |
| color: #28a745; |
| font-weight: bold; |
| } |
| .status-error { |
| color: #dc3545; |
| font-weight: bold; |
| } |
| """ |
| |
| with gr.Blocks(css=custom_css, title="AI Web Scraper") as interface: |
| |
| |
| gr.HTML(""" |
| <div class="main-header"> |
| <h1>π€ AI-Powered Web Scraper</h1> |
| <p>Professional content extraction and summarization for journalists, analysts, and researchers</p> |
| </div> |
| """) |
| |
| |
| with gr.Row(): |
| with gr.Column(scale=2): |
| |
| gr.HTML("<div class='feature-box'><h3>π‘ Content Extraction</h3></div>") |
| |
| url_input = gr.Textbox( |
| label="Enter URL to scrape", |
| placeholder="https://example.com/article", |
| lines=1 |
| ) |
| |
| with gr.Row(): |
| summary_length = gr.Slider( |
| minimum=100, |
| maximum=500, |
| value=300, |
| step=50, |
| label="Summary Length (words)" |
| ) |
| |
| scrape_btn = gr.Button("π Extract & Summarize", variant="primary", size="lg") |
| |
| |
| gr.HTML("<div class='feature-box'><h3>π Results</h3></div>") |
| |
| status_output = gr.Textbox(label="Status", lines=1, interactive=False) |
| metadata_output = gr.Markdown(label="Metadata") |
| summary_output = gr.Markdown(label="AI Summary") |
| keywords_output = gr.Markdown(label="Keywords") |
| |
| with gr.Column(scale=1): |
| |
| gr.HTML("<div class='feature-box'><h3>πΎ Export Options</h3></div>") |
| |
| export_format = gr.Radio( |
| choices=["CSV", "JSON"], |
| label="Export Format", |
| value="CSV" |
| ) |
| |
| export_btn = gr.Button("π₯ Export Data", variant="secondary") |
| export_status = gr.Textbox(label="Export Status", lines=2, interactive=False) |
| |
| gr.HTML("<div class='feature-box'><h3>π§Ή Data Management</h3></div>") |
| clear_btn = gr.Button("ποΈ Clear All Data", variant="secondary") |
| clear_status = gr.Textbox(label="Clear Status", lines=1, interactive=False) |
| |
| |
| with gr.Accordion("π Usage Instructions", open=False): |
| gr.Markdown(""" |
| ### How to Use This Tool |
| |
| 1. **Enter URL**: Paste the URL of the article or webpage you want to analyze |
| 2. **Adjust Settings**: Set your preferred summary length |
| 3. **Extract Content**: Click "Extract & Summarize" to process the content |
| 4. **Review Results**: View the extracted metadata, AI summary, and keywords |
| 5. **Export Data**: Save your results in CSV or JSON format |
| |
| ### Features |
- 🛡️ **Security**: Built-in URL validation and robots.txt compliance
- 🤖 **AI Summarization**: Advanced BART model for intelligent summarization
- 📊 **Rich Metadata**: Author, publication date, reading time, and more
- 🏷️ **Keyword Extraction**: Automatic identification of key terms
- 💾 **Export Options**: CSV and JSON formats for further analysis
- 🔄 **Batch Processing**: Process multiple URLs and export all results
| |
| ### Supported Content |
| - News articles and blog posts |
| - Research papers and reports |
| - Documentation and guides |
| - Most HTML-based content |
| |
| ### Limitations |
| - Respects robots.txt restrictions |
| - Cannot access password-protected content |
| - Some dynamic content may not be captured |
| - Processing time varies with content length |
| """) |
| |
| |
| scrape_btn.click( |
| fn=app.process_url, |
| inputs=[url_input, summary_length], |
| outputs=[status_output, metadata_output, summary_output, keywords_output] |
| ) |
| |
| export_btn.click( |
| fn=app.export_data, |
| inputs=[export_format], |
| outputs=[export_status] |
| ) |
| |
| clear_btn.click( |
| fn=app.clear_data, |
| outputs=[clear_status] |
| ) |
| |
| return interface |
|
|
| |
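# Hugging Face Spaces serves Gradio apps on 0.0.0.0:7860, which launch() below is configured for.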
| if __name__ == "__main__": |
| interface = create_interface() |
| interface.launch( |
| server_name="0.0.0.0", |
| server_port=7860, |
| share=False, |
| show_error=True |
| ) |