| """ | |
| Document parser for extracting text from various file formats. | |
| Supports PDF, TXT, HTML, and detects document types. | |
| """ | |
| import re | |
| import logging | |
| from pathlib import Path | |
| from typing import Dict, List, Optional, Tuple | |
| import pdfplumber | |
| from datetime import datetime | |
| logger = logging.getLogger(__name__) | |
class DocumentParser:
    """Parse and extract text from various document formats."""

    # Regex patterns used to classify a document from its textual content.
    # Keys are document-type labels; each value is a list of case-insensitive
    # patterns whose match counts are tallied by detect_document_type().
    DOCUMENT_TYPES = {
        'whitepaper': [
            r'whitepaper', r'technical\s+paper', r'protocol\s+specification',
            r'tokenomics', r'blockchain\s+architecture'
        ],
        'regulation': [
            r'regulation\s+\(eu\)', r'securities\s+act', r'guidance\s+note',
            r'consultation\s+paper', r'policy\s+statement', r'final\s+rule'
        ],
        'business_plan': [
            r'business\s+plan', r'executive\s+summary', r'market\s+analysis',
            r'financial\s+projections', r'revenue\s+model'
        ],
        'license_application': [
            r'license\s+application', r'registration\s+form', r'compliance\s+declaration',
            r'fit\s+and\s+proper', r'aml\s+policy'
        ],
        'financial_statement': [
            r'balance\s+sheet', r'income\s+statement', r'cash\s+flow',
            r'financial\s+statements', r'audit\s+report'
        ],
        'legal_contract': [
            r'terms\s+of\s+service', r'user\s+agreement', r'smart\s+contract',
            r'memorandum\s+of\s+understanding', r'partnership\s+agreement'
        ]
    }

    def __init__(self) -> None:
        """Initialize document parser."""
        # File extensions accepted by extract_text_from_file().
        self.supported_formats = {'.pdf', '.txt', '.html', '.md'}
| def extract_text_from_pdf(self, file_path: str) -> str: | |
| """ | |
| Extract text from a PDF file using pdfplumber. | |
| Args: | |
| file_path: Path to PDF file | |
| Returns: | |
| Extracted text as string | |
| Raises: | |
| FileNotFoundError: If file doesn't exist | |
| ValueError: If file is not a PDF | |
| """ | |
| path = Path(file_path) | |
| if not path.exists(): | |
| raise FileNotFoundError(f"PDF file not found: {file_path}") | |
| if path.suffix.lower() != '.pdf': | |
| raise ValueError(f"File is not a PDF: {file_path}") | |
| try: | |
| text_content = [] | |
| with pdfplumber.open(file_path) as pdf: | |
| logger.info(f"Extracting text from PDF: {file_path} ({len(pdf.pages)} pages)") | |
| for page_num, page in enumerate(pdf.pages, 1): | |
| page_text = page.extract_text() | |
| if page_text: | |
| text_content.append(page_text) | |
| else: | |
| logger.warning(f"No text extracted from page {page_num}") | |
| full_text = "\n\n".join(text_content) | |
| logger.info(f"Successfully extracted {len(full_text)} characters from PDF") | |
| return full_text | |
| except Exception as e: | |
| logger.error(f"Error extracting text from PDF {file_path}: {e}") | |
| raise | |
| def extract_text_from_file(self, file_path: str) -> str: | |
| """ | |
| Extract text from any supported file format. | |
| Args: | |
| file_path: Path to file | |
| Returns: | |
| Extracted text | |
| Raises: | |
| ValueError: If file format not supported | |
| """ | |
| path = Path(file_path) | |
| if not path.exists(): | |
| raise FileNotFoundError(f"File not found: {file_path}") | |
| suffix = path.suffix.lower() | |
| if suffix not in self.supported_formats: | |
| raise ValueError( | |
| f"Unsupported file format: {suffix}. " | |
| f"Supported: {', '.join(self.supported_formats)}" | |
| ) | |
| # PDF extraction | |
| if suffix == '.pdf': | |
| return self.extract_text_from_pdf(file_path) | |
| # Text-based formats | |
| try: | |
| with open(file_path, 'r', encoding='utf-8') as f: | |
| text = f.read() | |
| logger.info(f"Extracted {len(text)} characters from {file_path}") | |
| return text | |
| except UnicodeDecodeError: | |
| # Try with different encoding | |
| with open(file_path, 'r', encoding='latin-1') as f: | |
| text = f.read() | |
| logger.warning(f"Used latin-1 encoding for {file_path}") | |
| return text | |
| def clean_text(self, text: str) -> str: | |
| """ | |
| Clean and normalize extracted text. | |
| Args: | |
| text: Raw text | |
| Returns: | |
| Cleaned text | |
| """ | |
| if not text: | |
| return "" | |
| # Remove excessive whitespace | |
| text = re.sub(r'\s+', ' ', text) | |
| # Remove page numbers (common patterns) | |
| text = re.sub(r'\n\s*\d+\s*\n', '\n', text) | |
| # Remove headers/footers (repeated patterns) | |
| lines = text.split('\n') | |
| if len(lines) > 10: | |
| # Remove first/last lines if they appear to be headers/footers | |
| text = '\n'.join(lines[1:-1]) | |
| # Normalize unicode characters | |
| text = text.replace('\u2019', "'") # Smart quote | |
| text = text.replace('\u2018', "'") | |
| text = text.replace('\u201c', '"') | |
| text = text.replace('\u201d', '"') | |
| text = text.replace('\u2013', '-') # En dash | |
| text = text.replace('\u2014', '-') # Em dash | |
| # Remove excessive newlines | |
| text = re.sub(r'\n{3,}', '\n\n', text) | |
| return text.strip() | |
| def detect_document_type(self, text: str) -> Tuple[str, float]: | |
| """ | |
| Detect the type of document based on content. | |
| Args: | |
| text: Document text | |
| Returns: | |
| Tuple of (document_type, confidence_score) | |
| """ | |
| if not text: | |
| return "unknown", 0.0 | |
| text_lower = text.lower() | |
| # Count matches for each document type | |
| type_scores = {} | |
| for doc_type, patterns in self.DOCUMENT_TYPES.items(): | |
| matches = 0 | |
| for pattern in patterns: | |
| matches += len(re.findall(pattern, text_lower, re.IGNORECASE)) | |
| type_scores[doc_type] = matches | |
| # Find type with most matches | |
| if not any(type_scores.values()): | |
| return "unknown", 0.0 | |
| best_type = max(type_scores.items(), key=lambda x: x[1]) | |
| doc_type, match_count = best_type | |
| # Calculate confidence based on match density | |
| # More matches per 1000 words = higher confidence | |
| word_count = len(text_lower.split()) | |
| match_density = (match_count / (word_count / 1000)) if word_count > 0 else 0 | |
| confidence = min(match_density / 10, 1.0) # Cap at 1.0 | |
| logger.info(f"Detected document type: {doc_type} (confidence: {confidence:.2f})") | |
| return doc_type, confidence | |
| def extract_metadata(self, file_path: str) -> Dict: | |
| """ | |
| Extract metadata from document. | |
| Args: | |
| file_path: Path to document | |
| Returns: | |
| Dictionary of metadata | |
| """ | |
| path = Path(file_path) | |
| metadata = { | |
| 'filename': path.name, | |
| 'file_size': path.stat().st_size, | |
| 'file_type': path.suffix.lower(), | |
| 'modified_date': datetime.fromtimestamp(path.stat().st_mtime).isoformat() | |
| } | |
| # PDF-specific metadata | |
| if path.suffix.lower() == '.pdf': | |
| try: | |
| with pdfplumber.open(file_path) as pdf: | |
| metadata['page_count'] = len(pdf.pages) | |
| # Extract PDF metadata if available | |
| if pdf.metadata: | |
| metadata['pdf_metadata'] = { | |
| 'title': pdf.metadata.get('Title', ''), | |
| 'author': pdf.metadata.get('Author', ''), | |
| 'subject': pdf.metadata.get('Subject', ''), | |
| 'creator': pdf.metadata.get('Creator', ''), | |
| 'creation_date': pdf.metadata.get('CreationDate', '') | |
| } | |
| except Exception as e: | |
| logger.warning(f"Could not extract PDF metadata: {e}") | |
| return metadata | |
| def parse_document(self, file_path: str) -> Dict: | |
| """ | |
| Parse a document and extract all information. | |
| Args: | |
| file_path: Path to document | |
| Returns: | |
| Dictionary containing: | |
| - text: Cleaned text content | |
| - document_type: Detected type | |
| - confidence: Type detection confidence | |
| - metadata: File metadata | |
| - char_count: Character count | |
| - word_count: Word count | |
| """ | |
| logger.info(f"Parsing document: {file_path}") | |
| # Extract raw text | |
| raw_text = self.extract_text_from_file(file_path) | |
| # Clean text | |
| cleaned_text = self.clean_text(raw_text) | |
| # Detect document type | |
| doc_type, confidence = self.detect_document_type(cleaned_text) | |
| # Extract metadata | |
| metadata = self.extract_metadata(file_path) | |
| # Calculate statistics | |
| char_count = len(cleaned_text) | |
| word_count = len(cleaned_text.split()) | |
| result = { | |
| 'text': cleaned_text, | |
| 'document_type': doc_type, | |
| 'type_confidence': confidence, | |
| 'metadata': metadata, | |
| 'char_count': char_count, | |
| 'word_count': word_count, | |
| 'extracted_at': datetime.now().isoformat() | |
| } | |
| logger.info( | |
| f"Parsed {metadata['filename']}: {word_count} words, " | |
| f"type={doc_type} ({confidence:.2f})" | |
| ) | |
| return result | |
| def chunk_text( | |
| self, | |
| text: str, | |
| chunk_size: int = 1000, | |
| overlap: int = 200 | |
| ) -> List[str]: | |
| """ | |
| Split text into overlapping chunks for processing. | |
| Useful for handling long documents with LLMs. | |
| Args: | |
| text: Input text | |
| chunk_size: Maximum words per chunk | |
| overlap: Number of overlapping words between chunks | |
| Returns: | |
| List of text chunks | |
| """ | |
| if not text: | |
| return [] | |
| words = text.split() | |
| chunks = [] | |
| if len(words) <= chunk_size: | |
| return [text] | |
| start = 0 | |
| while start < len(words): | |
| end = start + chunk_size | |
| chunk_words = words[start:end] | |
| chunks.append(' '.join(chunk_words)) | |
| # Move start forward, accounting for overlap | |
| start = end - overlap | |
| if start < 0: | |
| start = 0 | |
| logger.info(f"Split text into {len(chunks)} chunks ({chunk_size} words each)") | |
| return chunks | |
# Convenience function for quick parsing
def parse_document(file_path: str) -> Dict:
    """
    Quick parse a document.

    Builds a throwaway DocumentParser and delegates to its parse_document.

    Args:
        file_path: Path to document

    Returns:
        Parsed document dictionary
    """
    return DocumentParser().parse_document(file_path)
if __name__ == "__main__":
    # Example usage: parse the file given on the command line.
    import sys

    if len(sys.argv) < 2:
        print("Usage: python document_parser.py <file_path>")
    else:
        parsed = parse_document(sys.argv[1])
        print(f"\nDocument Type: {parsed['document_type']}")
        print(f"Confidence: {parsed['type_confidence']:.2f}")
        print(f"Words: {parsed['word_count']}")
        print(f"\nFirst 500 characters:")
        print(parsed['text'][:500])