Spaces:
Runtime error
Runtime error
| """ | |
| Content Parsing Module | |
| Handles extraction of content from PDFs, text, and webpages | |
| """ | |
| import requests | |
| from bs4 import BeautifulSoup | |
| from urllib.parse import urljoin, urlparse | |
| from typing import List, Dict, Any | |
| import time | |
| from langchain_community.document_loaders import PyPDFLoader | |
| from langchain.schema import Document | |
| class BaseParser: | |
| """Base class for all content parsers""" | |
| def __init__(self): | |
| self.supported_formats = [] | |
| def parse(self, source: str) -> List[Document]: | |
| """Parse content from source and return LangChain Documents""" | |
| raise NotImplementedError("Subclasses must implement parse method") | |
| def validate_source(self, source: str) -> bool: | |
| """Validate if the source can be processed""" | |
| return True | |
| class PDFParser(BaseParser): | |
| """Parser for PDF documents""" | |
| def __init__(self): | |
| super().__init__() | |
| self.supported_formats = ['.pdf'] | |
| def parse(self, pdf_path: str) -> List[Document]: | |
| """ | |
| Parse PDF file and return list of Document objects | |
| Args: | |
| pdf_path (str): Path to the PDF file | |
| Returns: | |
| List[Document]: List of parsed documents with metadata | |
| """ | |
| try: | |
| loader = PyPDFLoader(pdf_path) | |
| documents = loader.load_and_split() | |
| # Add additional metadata | |
| for i, doc in enumerate(documents): | |
| doc.metadata.update({ | |
| 'source_type': 'pdf', | |
| 'page_number': i + 1, | |
| 'total_pages': len(documents), | |
| 'parser': 'PDFParser' | |
| }) | |
| return documents | |
| except Exception as e: | |
| raise Exception(f"Error parsing PDF: {str(e)}") | |
| def get_pdf_metadata(self, pdf_path: str) -> Dict[str, Any]: | |
| """Extract metadata from PDF file""" | |
| try: | |
| loader = PyPDFLoader(pdf_path) | |
| documents = loader.load() | |
| total_pages = len(documents) | |
| total_words = sum(len(doc.page_content.split()) for doc in documents) | |
| return { | |
| 'total_pages': total_pages, | |
| 'total_words': total_words, | |
| 'average_words_per_page': total_words / total_pages if total_pages > 0 else 0, | |
| 'file_type': 'PDF', | |
| 'parser_used': 'PyPDFLoader' | |
| } | |
| except Exception as e: | |
| return {'error': f"Could not extract metadata: {str(e)}"} | |
| class TextParser(BaseParser): | |
| """Parser for plain text content""" | |
| def __init__(self): | |
| super().__init__() | |
| self.supported_formats = ['.txt', 'plain_text'] | |
| self.chunk_size = 1000 # Default chunk size for long texts | |
| def parse(self, text_content: str, chunk_size: int = None) -> List[Document]: | |
| """ | |
| Parse text content and return list of Document objects | |
| Args: | |
| text_content (str): Raw text content | |
| chunk_size (int): Optional chunk size for splitting long texts | |
| Returns: | |
| List[Document]: List of documents, potentially chunked | |
| """ | |
| try: | |
| if not text_content.strip(): | |
| raise ValueError("Empty text content provided") | |
| chunk_size = chunk_size or self.chunk_size | |
| # If text is short, return as single document | |
| if len(text_content) <= chunk_size: | |
| doc = Document( | |
| page_content=text_content, | |
| metadata={ | |
| 'source_type': 'text', | |
| 'word_count': len(text_content.split()), | |
| 'char_count': len(text_content), | |
| 'chunk_index': 0, | |
| 'total_chunks': 1, | |
| 'parser': 'TextParser' | |
| } | |
| ) | |
| return [doc] | |
| # Split long text into chunks | |
| chunks = self._split_text_into_chunks(text_content, chunk_size) | |
| documents = [] | |
| for i, chunk in enumerate(chunks): | |
| doc = Document( | |
| page_content=chunk, | |
| metadata={ | |
| 'source_type': 'text', | |
| 'word_count': len(chunk.split()), | |
| 'char_count': len(chunk), | |
| 'chunk_index': i, | |
| 'total_chunks': len(chunks), | |
| 'parser': 'TextParser' | |
| } | |
| ) | |
| documents.append(doc) | |
| return documents | |
| except Exception as e: | |
| raise Exception(f"Error parsing text: {str(e)}") | |
| def _split_text_into_chunks(self, text: str, chunk_size: int) -> List[str]: | |
| """Split text into chunks while preserving sentence boundaries""" | |
| sentences = text.split('. ') | |
| chunks = [] | |
| current_chunk = "" | |
| for sentence in sentences: | |
| # Add sentence to current chunk if it fits | |
| test_chunk = current_chunk + sentence + ". " | |
| if len(test_chunk) <= chunk_size: | |
| current_chunk = test_chunk | |
| else: | |
| # Start new chunk if current chunk has content | |
| if current_chunk.strip(): | |
| chunks.append(current_chunk.strip()) | |
| current_chunk = sentence + ". " | |
| # Add final chunk if it has content | |
| if current_chunk.strip(): | |
| chunks.append(current_chunk.strip()) | |
| return chunks | |
| def analyze_text_structure(self, text_content: str) -> Dict[str, Any]: | |
| """Analyze the structure and characteristics of text content""" | |
| try: | |
| lines = text_content.split('\n') | |
| words = text_content.split() | |
| sentences = text_content.split('.') | |
| # Count different elements | |
| paragraphs = [p.strip() for p in text_content.split('\n\n') if p.strip()] | |
| return { | |
| 'total_words': len(words), | |
| 'total_sentences': len([s for s in sentences if s.strip()]), | |
| 'total_lines': len(lines), | |
| 'total_paragraphs': len(paragraphs), | |
| 'average_words_per_sentence': len(words) / len(sentences) if sentences else 0, | |
| 'average_sentences_per_paragraph': len(sentences) / len(paragraphs) if paragraphs else 0, | |
| 'character_count': len(text_content), | |
| 'reading_time_minutes': len(words) / 200, # Assuming 200 words per minute | |
| 'complexity_score': self._calculate_text_complexity(text_content) | |
| } | |
| except Exception as e: | |
| return {'error': f"Could not analyze text structure: {str(e)}"} | |
| def _calculate_text_complexity(self, text: str) -> float: | |
| """Calculate a simple text complexity score""" | |
| words = text.split() | |
| sentences = [s for s in text.split('.') if s.strip()] | |
| if not sentences: | |
| return 0.0 | |
| # Average words per sentence (higher = more complex) | |
| avg_words_per_sentence = len(words) / len(sentences) | |
| # Average characters per word (higher = more complex) | |
| avg_chars_per_word = sum(len(word) for word in words) / len(words) if words else 0 | |
| # Simple complexity score (normalized to 1-10 scale) | |
| complexity = (avg_words_per_sentence * 0.1) + (avg_chars_per_word * 0.5) | |
| return min(complexity, 10.0) | |
| class WebpageParser(BaseParser): | |
| """Parser for web content""" | |
| def __init__(self): | |
| super().__init__() | |
| self.supported_formats = ['http', 'https'] | |
| self.headers = { | |
| 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36' | |
| } | |
| self.timeout = 10 | |
| self.max_retries = 3 | |
| def parse_website(self, url: str, max_pages: int = 1, include_subpages: bool = False) -> List[Dict[str, Any]]: | |
| """ | |
| Parse website content and return structured data | |
| Args: | |
| url (str): Website URL to parse | |
| max_pages (int): Maximum number of pages to parse | |
| include_subpages (bool): Whether to include subpages | |
| Returns: | |
| List[Dict]: List of page data with content and metadata | |
| """ | |
| try: | |
| pages_data = [] | |
| urls_to_process = [url] | |
| processed_urls = set() | |
| # If including subpages, find additional URLs | |
| if include_subpages and max_pages > 1: | |
| subpage_urls = self._find_subpages(url, max_pages - 1) | |
| urls_to_process.extend(subpage_urls) | |
| # Process each URL | |
| for current_url in urls_to_process[:max_pages]: | |
| if current_url in processed_urls: | |
| continue | |
| page_data = self._parse_single_page(current_url) | |
| if page_data: | |
| pages_data.append(page_data) | |
| processed_urls.add(current_url) | |
| # Add small delay to be respectful | |
| time.sleep(1) | |
| return pages_data | |
| except Exception as e: | |
| raise Exception(f"Error parsing website: {str(e)}") | |
| def _parse_single_page(self, url: str) -> Dict[str, Any]: | |
| """Parse a single webpage and extract content""" | |
| try: | |
| # Make request with retries | |
| response = None | |
| for attempt in range(self.max_retries): | |
| try: | |
| response = requests.get(url, headers=self.headers, timeout=self.timeout) | |
| response.raise_for_status() | |
| break | |
| except requests.RequestException as e: | |
| if attempt == self.max_retries - 1: | |
| raise e | |
| time.sleep(2 ** attempt) # Exponential backoff | |
| if not response: | |
| return None | |
| # Parse HTML content | |
| soup = BeautifulSoup(response.content, 'html.parser') | |
| # Remove unwanted elements | |
| for element in soup(['script', 'style', 'nav', 'footer', 'header', 'aside']): | |
| element.decompose() | |
| # Extract main content | |
| main_content = self._extract_main_content(soup) | |
| # Extract metadata | |
| title = self._extract_title(soup) | |
| description = self._extract_description(soup) | |
| headings = self._extract_headings(soup) | |
| links = self._extract_links(soup, url) | |
| # Clean and process text | |
| cleaned_text = self._clean_text_content(main_content) | |
| return { | |
| 'url': url, | |
| 'title': title, | |
| 'description': description, | |
| 'content': cleaned_text, | |
| 'headings': headings, | |
| 'internal_links': links['internal'], | |
| 'external_links': links['external'], | |
| 'word_count': len(cleaned_text.split()), | |
| 'char_count': len(cleaned_text), | |
| 'meta_keywords': self._extract_meta_keywords(soup), | |
| 'images': self._extract_images(soup, url), | |
| 'parser': 'WebpageParser', | |
| 'parsed_at': time.strftime('%Y-%m-%d %H:%M:%S') | |
| } | |
| except Exception as e: | |
| return {'url': url, 'error': f"Failed to parse page: {str(e)}"} | |
| def _extract_main_content(self, soup: BeautifulSoup) -> str: | |
| """Extract the main content from the page""" | |
| # Try to find main content in order of preference | |
| content_selectors = [ | |
| 'main', | |
| 'article', | |
| '[role="main"]', | |
| '.content', | |
| '.main-content', | |
| '#content', | |
| '#main', | |
| '.post-content', | |
| '.entry-content' | |
| ] | |
| for selector in content_selectors: | |
| element = soup.select_one(selector) | |
| if element: | |
| return element.get_text(separator=' ', strip=True) | |
| # Fallback to body content | |
| body = soup.find('body') | |
| if body: | |
| return body.get_text(separator=' ', strip=True) | |
| return soup.get_text(separator=' ', strip=True) | |
| def _extract_title(self, soup: BeautifulSoup) -> str: | |
| """Extract page title""" | |
| title_tag = soup.find('title') | |
| if title_tag: | |
| return title_tag.get_text().strip() | |
| # Fallback to h1 | |
| h1 = soup.find('h1') | |
| if h1: | |
| return h1.get_text().strip() | |
| return "No Title Found" | |
| def _extract_description(self, soup: BeautifulSoup) -> str: | |
| """Extract meta description""" | |
| meta_desc = soup.find('meta', attrs={'name': 'description'}) | |
| if meta_desc and meta_desc.get('content'): | |
| return meta_desc['content'].strip() | |
| # Fallback to Open Graph description | |
| og_desc = soup.find('meta', attrs={'property': 'og:description'}) | |
| if og_desc and og_desc.get('content'): | |
| return og_desc['content'].strip() | |
| return "No Description Found" | |
| def _extract_headings(self, soup: BeautifulSoup) -> List[Dict[str, Any]]: | |
| """Extract all headings with their hierarchy""" | |
| headings = [] | |
| for i in range(1, 7): # h1 to h6 | |
| for heading in soup.find_all(f'h{i}'): | |
| text = heading.get_text(strip=True) | |
| if text: | |
| headings.append({ | |
| 'level': i, | |
| 'text': text, | |
| 'id': heading.get('id', ''), | |
| 'class': heading.get('class', []) | |
| }) | |
| return headings | |
| def _extract_links(self, soup: BeautifulSoup, base_url: str) -> Dict[str, List[str]]: | |
| """Extract internal and external links""" | |
| internal_links = [] | |
| external_links = [] | |
| base_domain = urlparse(base_url).netloc | |
| for link in soup.find_all('a', href=True): | |
| href = link['href'] | |
| full_url = urljoin(base_url, href) | |
| parsed_url = urlparse(full_url) | |
| if parsed_url.netloc == base_domain: | |
| internal_links.append(full_url) | |
| elif parsed_url.netloc: # External link with domain | |
| external_links.append(full_url) | |
| return { | |
| 'internal': list(set(internal_links)), | |
| 'external': list(set(external_links)) | |
| } | |
| def _extract_meta_keywords(self, soup: BeautifulSoup) -> List[str]: | |
| """Extract meta keywords if available""" | |
| meta_keywords = soup.find('meta', attrs={'name': 'keywords'}) | |
| if meta_keywords and meta_keywords.get('content'): | |
| keywords = meta_keywords['content'].split(',') | |
| return [kw.strip() for kw in keywords if kw.strip()] | |
| return [] | |
| def _extract_images(self, soup: BeautifulSoup, base_url: str) -> List[Dict[str, str]]: | |
| """Extract image information""" | |
| images = [] | |
| for img in soup.find_all('img'): | |
| src = img.get('src') | |
| if src: | |
| full_url = urljoin(base_url, src) | |
| images.append({ | |
| 'src': full_url, | |
| 'alt': img.get('alt', ''), | |
| 'title': img.get('title', '') | |
| }) | |
| return images | |
| def _clean_text_content(self, text: str) -> str: | |
| """Clean and normalize text content""" | |
| if not text: | |
| return "" | |
| # Split into lines and clean each line | |
| lines = text.split('\n') | |
| cleaned_lines = [] | |
| for line in lines: | |
| line = line.strip() | |
| if line and len(line) > 1: # Skip empty lines and single characters | |
| cleaned_lines.append(line) | |
| # Join lines with single spaces | |
| cleaned_text = ' '.join(cleaned_lines) | |
| # Remove multiple spaces | |
| while ' ' in cleaned_text: | |
| cleaned_text = cleaned_text.replace(' ', ' ') | |
| return cleaned_text | |
| def _find_subpages(self, url: str, max_subpages: int) -> List[str]: | |
| """Find subpages from the main page""" | |
| try: | |
| response = requests.get(url, headers=self.headers, timeout=self.timeout) | |
| response.raise_for_status() | |
| soup = BeautifulSoup(response.content, 'html.parser') | |
| base_domain = urlparse(url).netloc | |
| subpages = set() | |
| # Find internal links | |
| for link in soup.find_all('a', href=True): | |
| href = link['href'] | |
| full_url = urljoin(url, href) | |
| parsed_url = urlparse(full_url) | |
| # Only include internal links from same domain | |
| if (parsed_url.netloc == base_domain and | |
| full_url != url and | |
| not any(ext in full_url.lower() for ext in ['.pdf', '.jpg', '.png', '.gif', '.zip'])): | |
| subpages.add(full_url) | |
| if len(subpages) >= max_subpages: | |
| break | |
| return list(subpages)[:max_subpages] | |
| except Exception: | |
| return [] | |
| def validate_url(self, url: str) -> bool: | |
| """Validate if URL is accessible""" | |
| try: | |
| response = requests.head(url, headers=self.headers, timeout=5) | |
| return response.status_code == 200 | |
| except: | |
| return False | |
| def get_website_info(self, url: str) -> Dict[str, Any]: | |
| """Get basic information about a website""" | |
| try: | |
| response = requests.get(url, headers=self.headers, timeout=self.timeout) | |
| response.raise_for_status() | |
| soup = BeautifulSoup(response.content, 'html.parser') | |
| return { | |
| 'url': url, | |
| 'title': self._extract_title(soup), | |
| 'description': self._extract_description(soup), | |
| 'meta_keywords': self._extract_meta_keywords(soup), | |
| 'has_robots_meta': bool(soup.find('meta', attrs={'name': 'robots'})), | |
| 'has_viewport_meta': bool(soup.find('meta', attrs={'name': 'viewport'})), | |
| 'language': soup.get('lang', 'unknown'), | |
| 'status_code': response.status_code, | |
| 'content_type': response.headers.get('content-type', 'unknown'), | |
| 'server': response.headers.get('server', 'unknown') | |
| } | |
| except Exception as e: | |
| return {'url': url, 'error': f"Could not get website info: {str(e)}"} | |
| class ParserFactory: | |
| """Factory class to create appropriate parsers""" | |
| def get_parser(source_type: str): | |
| """Get the appropriate parser for the source type""" | |
| parsers = { | |
| 'pdf': PDFParser(), | |
| 'text': TextParser(), | |
| 'webpage': WebpageParser(), | |
| 'url': WebpageParser() | |
| } | |
| return parsers.get(source_type.lower()) | |
| def detect_source_type(source: str) -> str: | |
| """Detect the type of content source""" | |
| if source.startswith(('http://', 'https://')): | |
| return 'webpage' | |
| elif source.endswith('.pdf'): | |
| return 'pdf' | |
| else: | |
| return 'text' |