# arjitmat's picture
# Upload 72 files
# 13e7acd verified
"""
Document parser for extracting text from various file formats.
Supports PDF, TXT, HTML, and Markdown files, and detects document types.
"""
import re
import logging
from pathlib import Path
from typing import Dict, List, Optional, Tuple
import pdfplumber
from datetime import datetime
logger = logging.getLogger(__name__)
class DocumentParser:
    """Parse and extract text from various document formats.

    Provides text extraction (PDF via pdfplumber, direct reads for the
    text-based formats), text clean-up, keyword-based document type
    detection, file metadata collection and word-based chunking.
    """

    # Document type detection patterns
    DOCUMENT_TYPES = {
        'whitepaper': [
            r'whitepaper', r'technical\s+paper', r'protocol\s+specification',
            r'tokenomics', r'blockchain\s+architecture'
        ],
        'regulation': [
            r'regulation\s+\(eu\)', r'securities\s+act', r'guidance\s+note',
            r'consultation\s+paper', r'policy\s+statement', r'final\s+rule'
        ],
        'business_plan': [
            r'business\s+plan', r'executive\s+summary', r'market\s+analysis',
            r'financial\s+projections', r'revenue\s+model'
        ],
        'license_application': [
            r'license\s+application', r'registration\s+form', r'compliance\s+declaration',
            r'fit\s+and\s+proper', r'aml\s+policy'
        ],
        'financial_statement': [
            r'balance\s+sheet', r'income\s+statement', r'cash\s+flow',
            r'financial\s+statements', r'audit\s+report'
        ],
        'legal_contract': [
            r'terms\s+of\s+service', r'user\s+agreement', r'smart\s+contract',
            r'memorandum\s+of\s+understanding', r'partnership\s+agreement'
        ]
    }

    # Typographic punctuation mapped to ASCII equivalents by clean_text().
    _PUNCTUATION_MAP = str.maketrans({
        '\u2019': "'",  # Right single quote
        '\u2018': "'",  # Left single quote
        '\u201c': '"',  # Left double quote
        '\u201d': '"',  # Right double quote
        '\u2013': '-',  # En dash
        '\u2014': '-',  # Em dash
    })

    def __init__(self):
        """Initialize document parser."""
        self.supported_formats = {'.pdf', '.txt', '.html', '.md'}

    def extract_text_from_pdf(self, file_path: str) -> str:
        """
        Extract text from a PDF file using pdfplumber.

        Pages that yield no text are skipped (with a warning); the text of
        the remaining pages is joined with a blank line between pages.

        Args:
            file_path: Path to PDF file

        Returns:
            Extracted text as string

        Raises:
            FileNotFoundError: If file doesn't exist
            ValueError: If file is not a PDF
        """
        path = Path(file_path)
        if not path.exists():
            raise FileNotFoundError(f"PDF file not found: {file_path}")
        if path.suffix.lower() != '.pdf':
            raise ValueError(f"File is not a PDF: {file_path}")

        try:
            text_content = []
            with pdfplumber.open(file_path) as pdf:
                logger.info(f"Extracting text from PDF: {file_path} ({len(pdf.pages)} pages)")
                for page_num, page in enumerate(pdf.pages, 1):
                    page_text = page.extract_text()
                    if page_text:
                        text_content.append(page_text)
                    else:
                        logger.warning(f"No text extracted from page {page_num}")

            full_text = "\n\n".join(text_content)
            logger.info(f"Successfully extracted {len(full_text)} characters from PDF")
            return full_text
        except Exception as e:
            # Log for context, then let the caller see the original error.
            logger.error(f"Error extracting text from PDF {file_path}: {e}")
            raise

    def extract_text_from_file(self, file_path: str) -> str:
        """
        Extract text from any supported file format.

        PDFs are delegated to extract_text_from_pdf(). Every other supported
        format is read as-is (markup in .html/.md files is not stripped),
        trying UTF-8 first and falling back to latin-1.

        Args:
            file_path: Path to file

        Returns:
            Extracted text

        Raises:
            FileNotFoundError: If file doesn't exist
            ValueError: If file format not supported
        """
        path = Path(file_path)
        if not path.exists():
            raise FileNotFoundError(f"File not found: {file_path}")

        suffix = path.suffix.lower()
        if suffix not in self.supported_formats:
            # Sorted so the message is deterministic (set order is not).
            raise ValueError(
                f"Unsupported file format: {suffix}. "
                f"Supported: {', '.join(sorted(self.supported_formats))}"
            )

        # PDF extraction
        if suffix == '.pdf':
            return self.extract_text_from_pdf(file_path)

        # Text-based formats
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                text = f.read()
            logger.info(f"Extracted {len(text)} characters from {file_path}")
            return text
        except UnicodeDecodeError:
            # latin-1 maps every byte to a character, so this read cannot
            # fail on decoding.
            with open(file_path, 'r', encoding='latin-1') as f:
                text = f.read()
            logger.warning(f"Used latin-1 encoding for {file_path}")
            return text

    def clean_text(self, text: str) -> str:
        """
        Clean and normalize extracted text.

        Line structure is preserved: runs of spaces/tabs collapse to a
        single space, lines consisting only of digits (bare page numbers)
        are dropped, typographic quotes and dashes become ASCII, and runs
        of blank lines are reduced to a single blank line.

        No positional header/footer stripping is attempted: blindly
        dropping the first and last line would discard real content such
        as a title.

        Args:
            text: Raw text

        Returns:
            Cleaned text ("" for empty or None input)
        """
        if not text:
            return ""

        # Normalize line endings so only '\n' separates lines.
        text = text.replace('\r\n', '\n').replace('\r', '\n')

        # Collapse horizontal whitespace only. Collapsing '\n' as well would
        # flatten the text to one line and make every newline-based step
        # below a no-op.
        text = re.sub(r'[^\S\n]+', ' ', text)

        # Remove page numbers: lines that contain nothing but digits.
        kept_lines = []
        for line in text.split('\n'):
            line = line.strip()
            if re.fullmatch(r'\d+', line):
                continue
            kept_lines.append(line)
        text = '\n'.join(kept_lines)

        # Normalize unicode quotes and dashes in a single pass.
        text = text.translate(self._PUNCTUATION_MAP)

        # Remove excessive newlines
        text = re.sub(r'\n{3,}', '\n\n', text)
        return text.strip()

    def detect_document_type(self, text: str) -> Tuple[str, float]:
        """
        Detect the type of document based on content.

        Counts occurrences of each type's patterns from DOCUMENT_TYPES and
        picks the type with the most matches (the first one in
        DOCUMENT_TYPES order on a tie).

        Args:
            text: Document text

        Returns:
            Tuple of (document_type, confidence_score); ("unknown", 0.0)
            when the text is empty or nothing matches. Confidence is in
            [0.0, 1.0].
        """
        if not text:
            return "unknown", 0.0

        text_lower = text.lower()

        # Count matches for each document type
        type_scores = {
            doc_type: sum(
                len(re.findall(pattern, text_lower, re.IGNORECASE))
                for pattern in patterns
            )
            for doc_type, patterns in self.DOCUMENT_TYPES.items()
        }

        # Find type with most matches
        if not any(type_scores.values()):
            return "unknown", 0.0
        doc_type, match_count = max(type_scores.items(), key=lambda x: x[1])

        # Calculate confidence based on match density
        # More matches per 1000 words = higher confidence
        word_count = len(text_lower.split())
        match_density = (match_count / (word_count / 1000)) if word_count > 0 else 0
        confidence = min(match_density / 10, 1.0)  # Cap at 1.0

        logger.info(f"Detected document type: {doc_type} (confidence: {confidence:.2f})")
        return doc_type, confidence

    def extract_metadata(self, file_path: str) -> Dict:
        """
        Extract metadata from document.

        Args:
            file_path: Path to document

        Returns:
            Dictionary with 'filename', 'file_size', 'file_type' and
            'modified_date' (ISO format). For PDFs that can be opened it
            also holds 'page_count' and, when the PDF carries document
            info, 'pdf_metadata'.

        Raises:
            OSError: If the file cannot be stat'ed (e.g. FileNotFoundError
                when it does not exist)
        """
        path = Path(file_path)
        file_stat = path.stat()  # single stat call for size and mtime
        suffix = path.suffix.lower()

        metadata = {
            'filename': path.name,
            'file_size': file_stat.st_size,
            'file_type': suffix,
            'modified_date': datetime.fromtimestamp(file_stat.st_mtime).isoformat()
        }

        # PDF-specific metadata
        if suffix == '.pdf':
            try:
                with pdfplumber.open(file_path) as pdf:
                    metadata['page_count'] = len(pdf.pages)
                    # Extract PDF metadata if available
                    if pdf.metadata:
                        metadata['pdf_metadata'] = {
                            'title': pdf.metadata.get('Title', ''),
                            'author': pdf.metadata.get('Author', ''),
                            'subject': pdf.metadata.get('Subject', ''),
                            'creator': pdf.metadata.get('Creator', ''),
                            'creation_date': pdf.metadata.get('CreationDate', '')
                        }
            except Exception as e:
                # Best effort: PDF details are optional, basic file metadata
                # is still returned.
                logger.warning(f"Could not extract PDF metadata: {e}")

        return metadata

    def parse_document(self, file_path: str) -> Dict:
        """
        Parse a document and extract all information.

        Args:
            file_path: Path to document

        Returns:
            Dictionary containing:
            - text: Cleaned text content
            - document_type: Detected type
            - type_confidence: Type detection confidence
            - metadata: File metadata
            - char_count: Character count of the cleaned text
            - word_count: Word count of the cleaned text
            - extracted_at: ISO timestamp of when parsing ran

        Raises:
            FileNotFoundError: If file doesn't exist
            ValueError: If file format not supported
        """
        logger.info(f"Parsing document: {file_path}")

        # Extract raw text
        raw_text = self.extract_text_from_file(file_path)

        # Clean text
        cleaned_text = self.clean_text(raw_text)

        # Detect document type
        doc_type, confidence = self.detect_document_type(cleaned_text)

        # Extract metadata
        metadata = self.extract_metadata(file_path)

        # Calculate statistics
        char_count = len(cleaned_text)
        word_count = len(cleaned_text.split())

        result = {
            'text': cleaned_text,
            'document_type': doc_type,
            'type_confidence': confidence,
            'metadata': metadata,
            'char_count': char_count,
            'word_count': word_count,
            'extracted_at': datetime.now().isoformat()
        }

        logger.info(
            f"Parsed {metadata['filename']}: {word_count} words, "
            f"type={doc_type} ({confidence:.2f})"
        )
        return result

    def chunk_text(
        self,
        text: str,
        chunk_size: int = 1000,
        overlap: int = 200
    ) -> List[str]:
        """
        Split text into overlapping chunks for processing.

        Useful for handling long documents with LLMs. Text that already
        fits in one chunk is returned unchanged as a single-element list;
        otherwise chunks are rebuilt from whitespace-separated words joined
        by single spaces.

        Args:
            text: Input text
            chunk_size: Maximum words per chunk
            overlap: Number of overlapping words between chunks

        Returns:
            List of text chunks ([] for empty text)

        Raises:
            ValueError: If the text has to be split and chunk_size is not
                positive or overlap is not in the range [0, chunk_size)
        """
        if not text:
            return []

        words = text.split()
        if len(words) <= chunk_size:
            return [text]

        # Validate only when splitting is required, so calls that never
        # needed to split keep working. Without these checks the window
        # would never advance (overlap >= chunk_size) or would silently
        # skip words (negative overlap).
        if chunk_size <= 0:
            raise ValueError(f"chunk_size must be positive, got {chunk_size}")
        if not 0 <= overlap < chunk_size:
            raise ValueError(
                f"overlap must be >= 0 and smaller than chunk_size "
                f"({chunk_size}), got {overlap}"
            )

        step = chunk_size - overlap
        chunks = []
        for start in range(0, len(words), step):
            end = start + chunk_size
            chunks.append(' '.join(words[start:end]))
            # Stop once a chunk reaches the end of the text; a further chunk
            # would only repeat words already covered by this one.
            if end >= len(words):
                break

        logger.info(f"Split text into {len(chunks)} chunks ({chunk_size} words each)")
        return chunks
# Convenience function for quick parsing
def parse_document(file_path: str) -> Dict:
    """
    Parse a single document with a freshly created ``DocumentParser``.

    Shortcut for callers that do not need to keep a parser instance around.

    Args:
        file_path: Path to document

    Returns:
        Parsed document dictionary, as returned by
        ``DocumentParser.parse_document``
    """
    return DocumentParser().parse_document(file_path)
if __name__ == "__main__":
    # Example usage: parse the file named on the command line and print a
    # short summary of the result.
    import sys

    cli_args = sys.argv[1:]
    if not cli_args:
        print("Usage: python document_parser.py <file_path>")
    else:
        parsed = parse_document(cli_args[0])
        summary_lines = [
            f"\nDocument Type: {parsed['document_type']}",
            f"Confidence: {parsed['type_confidence']:.2f}",
            f"Words: {parsed['word_count']}",
            "\nFirst 500 characters:",
            parsed['text'][:500],
        ]
        for summary_line in summary_lines:
            print(summary_line)