|
|
"""
|
|
|
Content Parsing Module
|
|
|
Handles extraction of content from PDFs, text, and webpages
|
|
|
"""
|
|
|
|
|
|
import time
from typing import Any, Dict, List, Optional
from urllib.parse import urljoin, urlparse

import requests
from bs4 import BeautifulSoup
from langchain.schema import Document
from langchain_community.document_loaders import PyPDFLoader
|
|
|
|
|
|
|
|
|
class BaseParser:
    """Common interface shared by every content parser.

    Concrete subclasses populate ``supported_formats`` and override
    :meth:`parse`; ``validate_source`` may be overridden when a stricter
    pre-flight check is useful.
    """

    def __init__(self):
        # Subclasses replace this with the formats they can handle.
        self.supported_formats = []

    def parse(self, source: str) -> List[Document]:
        """Extract LangChain Documents from *source* (abstract)."""
        raise NotImplementedError("Subclasses must implement parse method")

    def validate_source(self, source: str) -> bool:
        """Return True when *source* looks processable (default: always)."""
        return True
|
|
|
|
|
|
|
|
|
class PDFParser(BaseParser):
    """Parser for PDF documents backed by ``PyPDFLoader``."""

    def __init__(self):
        super().__init__()
        self.supported_formats = ['.pdf']

    def parse(self, pdf_path: str) -> List[Document]:
        """
        Parse PDF file and return list of Document objects

        Args:
            pdf_path (str): Path to the PDF file

        Returns:
            List[Document]: List of parsed documents with metadata

        Raises:
            Exception: Wraps any loader failure, chained to the cause.
        """
        try:
            loader = PyPDFLoader(pdf_path)
            # NOTE(review): load_and_split() yields text *chunks*, which do
            # not necessarily map one-to-one onto physical pages, so the
            # page_number/total_pages values below are really chunk
            # index/count -- confirm downstream consumers expect this.
            documents = loader.load_and_split()

            for i, doc in enumerate(documents):
                doc.metadata.update({
                    'source_type': 'pdf',
                    'page_number': i + 1,
                    'total_pages': len(documents),
                    'parser': 'PDFParser'
                })

            return documents

        except Exception as e:
            # Chain the original exception so the traceback is preserved.
            raise Exception(f"Error parsing PDF: {str(e)}") from e

    def get_pdf_metadata(self, pdf_path: str) -> Dict[str, Any]:
        """Extract simple statistics (page and word counts) from a PDF file.

        Best-effort: returns a dict with an 'error' key instead of raising
        when the file cannot be read.
        """
        try:
            loader = PyPDFLoader(pdf_path)
            documents = loader.load()

            total_pages = len(documents)
            total_words = sum(len(doc.page_content.split()) for doc in documents)

            return {
                'total_pages': total_pages,
                'total_words': total_words,
                'average_words_per_page': total_words / total_pages if total_pages > 0 else 0,
                'file_type': 'PDF',
                'parser_used': 'PyPDFLoader'
            }

        except Exception as e:
            return {'error': f"Could not extract metadata: {str(e)}"}
|
|
|
|
|
|
|
|
|
class TextParser(BaseParser):
    """Parser for plain text content with sentence-aware chunking."""

    def __init__(self):
        super().__init__()
        self.supported_formats = ['.txt', 'plain_text']
        # Default maximum characters per chunk when splitting long texts.
        self.chunk_size = 1000

    def parse(self, text_content: str, chunk_size: Optional[int] = None) -> List[Document]:
        """
        Parse text content and return list of Document objects

        Args:
            text_content (str): Raw text content
            chunk_size (Optional[int]): Chunk size for splitting long texts;
                falls back to ``self.chunk_size`` when None

        Returns:
            List[Document]: List of documents, potentially chunked

        Raises:
            Exception: Wraps any failure (including empty input), chained
                to the original error.
        """
        try:
            if not text_content.strip():
                raise ValueError("Empty text content provided")

            chunk_size = chunk_size or self.chunk_size

            # Short texts fit in a single document -- no chunking needed.
            if len(text_content) <= chunk_size:
                doc = Document(
                    page_content=text_content,
                    metadata={
                        'source_type': 'text',
                        'word_count': len(text_content.split()),
                        'char_count': len(text_content),
                        'chunk_index': 0,
                        'total_chunks': 1,
                        'parser': 'TextParser'
                    }
                )
                return [doc]

            chunks = self._split_text_into_chunks(text_content, chunk_size)
            documents = []

            for i, chunk in enumerate(chunks):
                doc = Document(
                    page_content=chunk,
                    metadata={
                        'source_type': 'text',
                        'word_count': len(chunk.split()),
                        'char_count': len(chunk),
                        'chunk_index': i,
                        'total_chunks': len(chunks),
                        'parser': 'TextParser'
                    }
                )
                documents.append(doc)

            return documents

        except Exception as e:
            # Chain so the original traceback is preserved.
            raise Exception(f"Error parsing text: {str(e)}") from e

    def _split_text_into_chunks(self, text: str, chunk_size: int) -> List[str]:
        """Split text into chunks while preserving sentence boundaries."""
        sentences = text.split('. ')
        chunks: List[str] = []
        current_chunk = ""
        last = len(sentences) - 1

        for i, sentence in enumerate(sentences):
            # Restore the '. ' separator that split() removed, but only
            # *between* sentences: the final sentence keeps its original
            # ending, so no spurious period is appended to the text
            # (previously every input gained a trailing '. ').
            piece = sentence + ". " if i < last else sentence
            test_chunk = current_chunk + piece

            if len(test_chunk) <= chunk_size:
                current_chunk = test_chunk
            else:
                # Current chunk is full: flush it and start a new one.
                if current_chunk.strip():
                    chunks.append(current_chunk.strip())
                current_chunk = piece

        if current_chunk.strip():
            chunks.append(current_chunk.strip())

        return chunks

    def analyze_text_structure(self, text_content: str) -> Dict[str, Any]:
        """Analyze the structure and characteristics of text content.

        Best-effort: returns a dict with an 'error' key instead of raising.
        """
        try:
            lines = text_content.split('\n')
            words = text_content.split()
            # Drop the empty fragments a trailing '.' produces so the
            # totals and the averages below use the same sentence count
            # (previously the average divided by the unfiltered count).
            sentences = [s for s in text_content.split('.') if s.strip()]

            paragraphs = [p.strip() for p in text_content.split('\n\n') if p.strip()]

            return {
                'total_words': len(words),
                'total_sentences': len(sentences),
                'total_lines': len(lines),
                'total_paragraphs': len(paragraphs),
                'average_words_per_sentence': len(words) / len(sentences) if sentences else 0,
                'average_sentences_per_paragraph': len(sentences) / len(paragraphs) if paragraphs else 0,
                'character_count': len(text_content),
                # Assumes an average reading speed of ~200 words/minute.
                'reading_time_minutes': len(words) / 200,
                'complexity_score': self._calculate_text_complexity(text_content)
            }

        except Exception as e:
            return {'error': f"Could not analyze text structure: {str(e)}"}

    def _calculate_text_complexity(self, text: str) -> float:
        """Calculate a simple text complexity score in [0.0, 10.0].

        Combines average sentence length and average word length; longer
        sentences and longer words raise the score.
        """
        words = text.split()
        sentences = [s for s in text.split('.') if s.strip()]

        if not sentences:
            return 0.0

        avg_words_per_sentence = len(words) / len(sentences)

        avg_chars_per_word = sum(len(word) for word in words) / len(words) if words else 0

        # Weights are ad hoc; the cap keeps the score bounded.
        complexity = (avg_words_per_sentence * 0.1) + (avg_chars_per_word * 0.5)
        return min(complexity, 10.0)
|
|
|
|
|
|
|
|
|
class WebpageParser(BaseParser):
    """Parser for web content fetched over HTTP(S)."""

    def __init__(self):
        super().__init__()
        self.supported_formats = ['http', 'https']
        # Browser-like UA: some sites reject requests' default User-Agent.
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        }
        self.timeout = 10      # seconds per HTTP request
        self.max_retries = 3   # attempts per page, with exponential backoff

    def parse_website(self, url: str, max_pages: int = 1, include_subpages: bool = False) -> List[Dict[str, Any]]:
        """
        Parse website content and return structured data

        Args:
            url (str): Website URL to parse
            max_pages (int): Maximum number of pages to parse
            include_subpages (bool): Whether to include subpages

        Returns:
            List[Dict]: List of page data with content and metadata

        Raises:
            Exception: Wraps any fetch/parse failure, chained to the cause.
        """
        try:
            pages_data = []
            urls_to_process = [url]
            processed_urls = set()

            # Discover same-domain subpages up front when requested.
            if include_subpages and max_pages > 1:
                subpage_urls = self._find_subpages(url, max_pages - 1)
                urls_to_process.extend(subpage_urls)

            for current_url in urls_to_process[:max_pages]:
                if current_url in processed_urls:
                    continue

                page_data = self._parse_single_page(current_url)
                if page_data:
                    pages_data.append(page_data)
                    processed_urls.add(current_url)

                # Be polite: rate-limit successive requests.
                time.sleep(1)

            return pages_data

        except Exception as e:
            # Chain so the original traceback is preserved.
            raise Exception(f"Error parsing website: {str(e)}") from e

    def _parse_single_page(self, url: str) -> Optional[Dict[str, Any]]:
        """Parse a single webpage and extract content.

        Returns a dict of page data, a dict with an 'error' key on failure,
        or None when no response was obtained.
        """
        try:
            # Fetch with retries and exponential backoff (1s, 2s, ...).
            response = None
            for attempt in range(self.max_retries):
                try:
                    response = requests.get(url, headers=self.headers, timeout=self.timeout)
                    response.raise_for_status()
                    break
                except requests.RequestException as e:
                    if attempt == self.max_retries - 1:
                        raise e
                    time.sleep(2 ** attempt)

            # Explicit None check: requests.Response is falsy for 4xx/5xx
            # statuses, so `if not response:` would not mean "no response".
            if response is None:
                return None

            soup = BeautifulSoup(response.content, 'html.parser')

            # Strip boilerplate elements before extracting text.
            for element in soup(['script', 'style', 'nav', 'footer', 'header', 'aside']):
                element.decompose()

            main_content = self._extract_main_content(soup)

            title = self._extract_title(soup)
            description = self._extract_description(soup)
            headings = self._extract_headings(soup)
            links = self._extract_links(soup, url)

            cleaned_text = self._clean_text_content(main_content)

            return {
                'url': url,
                'title': title,
                'description': description,
                'content': cleaned_text,
                'headings': headings,
                'internal_links': links['internal'],
                'external_links': links['external'],
                'word_count': len(cleaned_text.split()),
                'char_count': len(cleaned_text),
                'meta_keywords': self._extract_meta_keywords(soup),
                'images': self._extract_images(soup, url),
                'parser': 'WebpageParser',
                'parsed_at': time.strftime('%Y-%m-%d %H:%M:%S')
            }

        except Exception as e:
            # Per-page failures are reported, not raised, so one bad page
            # does not abort a multi-page crawl.
            return {'url': url, 'error': f"Failed to parse page: {str(e)}"}

    def _extract_main_content(self, soup: BeautifulSoup) -> str:
        """Extract the main content from the page.

        Tries common "main content" selectors first, then falls back to
        <body>, then to the whole document text.
        """
        content_selectors = [
            'main',
            'article',
            '[role="main"]',
            '.content',
            '.main-content',
            '#content',
            '#main',
            '.post-content',
            '.entry-content'
        ]

        for selector in content_selectors:
            element = soup.select_one(selector)
            if element:
                return element.get_text(separator=' ', strip=True)

        body = soup.find('body')
        if body:
            return body.get_text(separator=' ', strip=True)

        return soup.get_text(separator=' ', strip=True)

    def _extract_title(self, soup: BeautifulSoup) -> str:
        """Extract page title (<title>, falling back to the first <h1>)."""
        title_tag = soup.find('title')
        if title_tag:
            return title_tag.get_text().strip()

        h1 = soup.find('h1')
        if h1:
            return h1.get_text().strip()

        return "No Title Found"

    def _extract_description(self, soup: BeautifulSoup) -> str:
        """Extract meta description (name=description, then og:description)."""
        meta_desc = soup.find('meta', attrs={'name': 'description'})
        if meta_desc and meta_desc.get('content'):
            return meta_desc['content'].strip()

        og_desc = soup.find('meta', attrs={'property': 'og:description'})
        if og_desc and og_desc.get('content'):
            return og_desc['content'].strip()

        return "No Description Found"

    def _extract_headings(self, soup: BeautifulSoup) -> List[Dict[str, Any]]:
        """Extract all h1-h6 headings with their level, id, and classes."""
        headings = []

        for i in range(1, 7):
            for heading in soup.find_all(f'h{i}'):
                text = heading.get_text(strip=True)
                if text:
                    headings.append({
                        'level': i,
                        'text': text,
                        'id': heading.get('id', ''),
                        'class': heading.get('class', [])
                    })

        return headings

    def _extract_links(self, soup: BeautifulSoup, base_url: str) -> Dict[str, List[str]]:
        """Extract deduplicated internal and external absolute links."""
        internal_links = []
        external_links = []
        base_domain = urlparse(base_url).netloc

        for link in soup.find_all('a', href=True):
            href = link['href']
            full_url = urljoin(base_url, href)
            parsed_url = urlparse(full_url)

            if parsed_url.netloc == base_domain:
                internal_links.append(full_url)
            elif parsed_url.netloc:
                # Links with no netloc (mailto:, javascript:, fragments)
                # are skipped entirely.
                external_links.append(full_url)

        return {
            'internal': list(set(internal_links)),
            'external': list(set(external_links))
        }

    def _extract_meta_keywords(self, soup: BeautifulSoup) -> List[str]:
        """Extract meta keywords if available."""
        meta_keywords = soup.find('meta', attrs={'name': 'keywords'})
        if meta_keywords and meta_keywords.get('content'):
            keywords = meta_keywords['content'].split(',')
            return [kw.strip() for kw in keywords if kw.strip()]
        return []

    def _extract_images(self, soup: BeautifulSoup, base_url: str) -> List[Dict[str, str]]:
        """Extract image information (absolute src, alt, title)."""
        images = []

        for img in soup.find_all('img'):
            src = img.get('src')
            if src:
                full_url = urljoin(base_url, src)
                images.append({
                    'src': full_url,
                    'alt': img.get('alt', ''),
                    'title': img.get('title', '')
                })

        return images

    def _clean_text_content(self, text: str) -> str:
        """Clean and normalize text content.

        Drops blank/one-character lines, joins the rest with spaces, and
        collapses runs of spaces to a single space.
        """
        if not text:
            return ""

        lines = text.split('\n')
        cleaned_lines = []

        for line in lines:
            line = line.strip()
            # Single-character lines are treated as noise.
            if line and len(line) > 1:
                cleaned_lines.append(line)

        cleaned_text = ' '.join(cleaned_lines)

        while '  ' in cleaned_text:
            cleaned_text = cleaned_text.replace('  ', ' ')

        return cleaned_text

    def _find_subpages(self, url: str, max_subpages: int) -> List[str]:
        """Find up to *max_subpages* same-domain subpage URLs on *url*.

        Best-effort: returns an empty list on any failure.
        """
        try:
            response = requests.get(url, headers=self.headers, timeout=self.timeout)
            response.raise_for_status()

            soup = BeautifulSoup(response.content, 'html.parser')
            base_domain = urlparse(url).netloc
            subpages = set()

            for link in soup.find_all('a', href=True):
                href = link['href']
                full_url = urljoin(url, href)
                parsed_url = urlparse(full_url)

                # Same-domain pages only; skip the start page itself and
                # obvious non-HTML resources.
                if (parsed_url.netloc == base_domain and
                    full_url != url and
                    not any(ext in full_url.lower() for ext in ['.pdf', '.jpg', '.png', '.gif', '.zip'])):
                    subpages.add(full_url)

                if len(subpages) >= max_subpages:
                    break

            return list(subpages)[:max_subpages]

        except Exception:
            return []

    def validate_url(self, url: str) -> bool:
        """Validate if URL is accessible via a HEAD request."""
        try:
            response = requests.head(url, headers=self.headers, timeout=5)
            # NOTE(review): servers that reject HEAD (405) or answer with a
            # redirect fail this strict 200 check -- confirm that's intended.
            return response.status_code == 200
        except Exception:
            # Narrowed from a bare `except:` so KeyboardInterrupt and
            # SystemExit are no longer swallowed.
            return False

    def get_website_info(self, url: str) -> Dict[str, Any]:
        """Get basic information about a website.

        Best-effort: returns a dict with an 'error' key instead of raising.
        """
        try:
            response = requests.get(url, headers=self.headers, timeout=self.timeout)
            response.raise_for_status()

            soup = BeautifulSoup(response.content, 'html.parser')

            # Read the lang attribute from the <html> tag; the previous
            # soup.get('lang', ...) queried the document root, which has
            # no attributes and always yielded 'unknown'.
            html_tag = soup.find('html')
            language = html_tag.get('lang', 'unknown') if html_tag else 'unknown'

            return {
                'url': url,
                'title': self._extract_title(soup),
                'description': self._extract_description(soup),
                'meta_keywords': self._extract_meta_keywords(soup),
                'has_robots_meta': bool(soup.find('meta', attrs={'name': 'robots'})),
                'has_viewport_meta': bool(soup.find('meta', attrs={'name': 'viewport'})),
                'language': language,
                'status_code': response.status_code,
                'content_type': response.headers.get('content-type', 'unknown'),
                'server': response.headers.get('server', 'unknown')
            }

        except Exception as e:
            return {'url': url, 'error': f"Could not get website info: {str(e)}"}
|
|
|
|
|
|
|
|
|
class ParserFactory:
    """Factory class to create appropriate parsers"""

    @staticmethod
    def get_parser(source_type: str):
        """Get the appropriate parser for the source type.

        Returns a fresh parser instance, or None for unknown types.
        """
        parser_classes = {
            'pdf': PDFParser,
            'text': TextParser,
            'webpage': WebpageParser,
            'url': WebpageParser
        }

        # Instantiate lazily: only the requested parser is constructed
        # (previously all four were built just to return one).
        parser_cls = parser_classes.get(source_type.lower())
        return parser_cls() if parser_cls is not None else None

    @staticmethod
    def detect_source_type(source: str) -> str:
        """Detect the type of content source ('webpage', 'pdf', or 'text')."""
        if source.startswith(('http://', 'https://')):
            return 'webpage'
        # Case-insensitive so 'REPORT.PDF' is detected too.
        elif source.lower().endswith('.pdf'):
            return 'pdf'
        else:
            return 'text'