"""
Main document processing pipeline
Orchestrates extraction, cleaning, and chunking of legal documents
"""
import logging
import time
from pathlib import Path
from typing import List
from .config import LAW_DIR, CHUNKS_OUTPUT_FILE, LOG_LEVEL, LOG_FORMAT
from .extractors import PDFExtractor
from .cleaners import TextCleaner
from .chunkers import LegalDocumentChunker
from .storage import ChunkStorage
from .models import DocumentChunk, ProcessingStats
# Configure logging
logging.basicConfig(level=LOG_LEVEL, format=LOG_FORMAT)
logger = logging.getLogger(__name__)
class DocumentProcessor:
    """Main pipeline for processing legal documents.

    Orchestrates the extract -> clean -> chunk -> store stages for every
    PDF found in the configured law directory (LAW_DIR).
    """

    def __init__(self):
        """Initialize processor with all pipeline components."""
        self.extractor = PDFExtractor()
        self.cleaner = TextCleaner()
        self.chunker = LegalDocumentChunker()
        self.storage = ChunkStorage(CHUNKS_OUTPUT_FILE)

    def process_all_documents(self) -> ProcessingStats:
        """Process all PDF documents in the law directory.

        Returns:
            Aggregate processing statistics.

        Raises:
            FileNotFoundError: If no PDF files exist in LAW_DIR.
        """
        logger.info("=" * 80)
        logger.info("Starting document processing pipeline")
        logger.info("=" * 80)

        # perf_counter() is monotonic; preferred over time.time() for
        # measuring elapsed durations (immune to wall-clock adjustments).
        start_time = time.perf_counter()

        # Get all PDF files
        pdf_files = list(LAW_DIR.glob("*.pdf"))
        logger.info(f"Found {len(pdf_files)} PDF files to process")
        if not pdf_files:
            raise FileNotFoundError(f"No PDF files found in {LAW_DIR}")

        # Process each document; failures are logged and skipped so one
        # bad file does not abort the whole batch.
        all_chunks: List[DocumentChunk] = []
        processed_files: List[str] = []  # names of successfully processed docs
        total_words = 0
        for pdf_file in pdf_files:
            logger.info(f"\n{'=' * 80}")
            logger.info(f"Processing: {pdf_file.name}")
            logger.info(f"{'=' * 80}")
            try:
                chunks = self.process_single_document(pdf_file)
                all_chunks.extend(chunks)
                processed_files.append(pdf_file.name)
                # Word totals drive the avg_chunk_size statistic below.
                doc_words = sum(chunk.metadata.word_count for chunk in chunks)
                total_words += doc_words
                logger.info(f"✓ Created {len(chunks)} chunks ({doc_words} words) from {pdf_file.name}")
            except Exception as e:
                logger.error(f"✗ Failed to process {pdf_file.name}: {e}")
                continue

        # Calculate statistics
        processing_time = time.perf_counter() - start_time
        avg_chunk_size = total_words / len(all_chunks) if all_chunks else 0

        stats = ProcessingStats(
            total_documents=len(pdf_files),
            total_chunks=len(all_chunks),
            total_words=total_words,
            avg_chunk_size=avg_chunk_size,
            processing_time_seconds=processing_time,
            # BUGFIX: previously listed every discovered file, including
            # ones that failed; now only successfully processed documents.
            documents_processed=processed_files,
        )

        # Validate and save chunks
        logger.info(f"\n{'=' * 80}")
        logger.info("Validating and saving chunks...")
        logger.info(f"{'=' * 80}")
        self.storage.validate_chunks(all_chunks)
        self.storage.save_chunks(all_chunks, stats)

        # Print summary
        self._print_summary(stats)
        return stats

    def process_single_document(self, pdf_path: Path) -> List[DocumentChunk]:
        """Process a single PDF document through extract/clean/chunk.

        Args:
            pdf_path: Path to the PDF file.

        Returns:
            List of chunks produced from this document.

        Raises:
            ValueError: If extraction or cleaning yields no text.
        """
        # Step 1: Extract text from PDF
        logger.info("Step 1: Extracting text from PDF...")
        pages_data = self.extractor.extract_from_file(pdf_path)
        if not pages_data:
            raise ValueError(f"No text extracted from {pdf_path.name}")

        # Step 2: Clean the text
        logger.info("Step 2: Cleaning extracted text...")
        cleaned_text = self.cleaner.clean_pages(pages_data)
        if not cleaned_text:
            raise ValueError(f"No text remaining after cleaning {pdf_path.name}")

        # Step 3: Chunk the text
        logger.info("Step 3: Chunking text into meaningful pieces...")
        chunks = self.chunker.chunk_document(
            text=cleaned_text,
            source_file=pdf_path.name,
            pages_data=pages_data
        )
        return chunks

    def _print_summary(self, stats: ProcessingStats):
        """Log a human-readable summary of the run's statistics."""
        logger.info(f"\n{'=' * 80}")
        logger.info("PROCESSING COMPLETE!")
        logger.info(f"{'=' * 80}")
        logger.info(f"Documents Processed: {stats.total_documents}")
        logger.info(f"Total Chunks Created: {stats.total_chunks}")
        logger.info(f"Total Words: {stats.total_words:,}")
        logger.info(f"Average Chunk Size: {stats.avg_chunk_size:.1f} words")
        logger.info(f"Processing Time: {stats.processing_time_seconds:.2f} seconds")
        logger.info(f"\nOutput saved to: {CHUNKS_OUTPUT_FILE}")
        logger.info(f"Summary saved to: {CHUNKS_OUTPUT_FILE.parent / 'chunks_summary.txt'}")
        logger.info(f"{'=' * 80}\n")
def main():
    """Command-line entry point for the processing pipeline.

    Returns:
        0 on success, 1 on failure (suitable as a process exit code).
    """
    try:
        processor = DocumentProcessor()
        stats = processor.process_all_documents()
        print("\n✓ Processing completed successfully!")
        summary = f"✓ Created {stats.total_chunks} chunks from {stats.total_documents} documents"
        print(summary)
        print(f"✓ Output: {CHUNKS_OUTPUT_FILE}")
    except Exception as err:
        # Full traceback goes to the log; a short message goes to stdout.
        logger.error(f"Processing failed: {err}", exc_info=True)
        print(f"\n✗ Processing failed: {err}")
        return 1
    return 0
if __name__ == "__main__":
    # raise SystemExit instead of calling the exit() builtin: exit() is
    # injected by the `site` module and is absent under `python -S`,
    # whereas SystemExit is a true builtin and always available.
    raise SystemExit(main())