|
|
""" |
|
|
Main document processing pipeline |
|
|
Orchestrates extraction, cleaning, and chunking of legal documents |
|
|
""" |
|
|
|
|
|
import logging |
|
|
import time |
|
|
from pathlib import Path |
|
|
from typing import List |
|
|
|
|
|
from .config import LAW_DIR, CHUNKS_OUTPUT_FILE, LOG_LEVEL, LOG_FORMAT |
|
|
from .extractors import PDFExtractor |
|
|
from .cleaners import TextCleaner |
|
|
from .chunkers import LegalDocumentChunker |
|
|
from .storage import ChunkStorage |
|
|
from .models import DocumentChunk, ProcessingStats |
|
|
|
|
|
|
|
|
# Configure root logging once at import time so every pipeline module
# shares the same level/format from config.
logging.basicConfig(level=LOG_LEVEL, format=LOG_FORMAT)

# Module-level logger, stdlib convention: named after this module.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
class DocumentProcessor:
    """Main pipeline for processing legal documents.

    Orchestrates the three pipeline stages (PDF extraction, text cleaning,
    chunking) and persists the resulting chunks via ChunkStorage.
    """

    def __init__(self):
        """Initialize processor with all components."""
        self.extractor = PDFExtractor()
        self.cleaner = TextCleaner()
        self.chunker = LegalDocumentChunker()
        self.storage = ChunkStorage(CHUNKS_OUTPUT_FILE)

    def process_all_documents(self) -> ProcessingStats:
        """
        Process all PDF documents in the law directory.

        Documents that fail are logged and skipped; only successfully
        processed files are counted in the returned statistics
        (``total_documents`` / ``documents_processed``).

        Returns:
            Processing statistics for the run.

        Raises:
            FileNotFoundError: If no PDF files exist in LAW_DIR.
        """
        logger.info("=" * 80)
        logger.info("Starting document processing pipeline")
        logger.info("=" * 80)

        start_time = time.time()

        # sorted() makes the processing order deterministic; glob() order
        # is filesystem-dependent.
        pdf_files = sorted(LAW_DIR.glob("*.pdf"))
        logger.info(f"Found {len(pdf_files)} PDF files to process")

        if not pdf_files:
            raise FileNotFoundError(f"No PDF files found in {LAW_DIR}")

        all_chunks: List[DocumentChunk] = []
        total_words = 0
        # Track only the files that made it through the pipeline, so the
        # stats don't misreport failed documents as processed.
        processed_files: List[str] = []

        for pdf_file in pdf_files:
            logger.info(f"\n{'=' * 80}")
            logger.info(f"Processing: {pdf_file.name}")
            logger.info(f"{'=' * 80}")

            try:
                chunks = self.process_single_document(pdf_file)
                all_chunks.extend(chunks)

                doc_words = sum(chunk.metadata.word_count for chunk in chunks)
                total_words += doc_words
                processed_files.append(pdf_file.name)

                logger.info(f"β Created {len(chunks)} chunks ({doc_words} words) from {pdf_file.name}")

            except Exception as e:
                # Best-effort batch: log the failure and move on so one bad
                # PDF doesn't abort the whole run.
                logger.error(f"β Failed to process {pdf_file.name}: {e}")
                continue

        processing_time = time.time() - start_time
        # Guard against division by zero when every document failed.
        avg_chunk_size = total_words / len(all_chunks) if all_chunks else 0

        stats = ProcessingStats(
            total_documents=len(processed_files),
            total_chunks=len(all_chunks),
            total_words=total_words,
            avg_chunk_size=avg_chunk_size,
            processing_time_seconds=processing_time,
            documents_processed=processed_files
        )

        logger.info(f"\n{'=' * 80}")
        logger.info("Validating and saving chunks...")
        logger.info(f"{'=' * 80}")

        self.storage.validate_chunks(all_chunks)
        self.storage.save_chunks(all_chunks, stats)

        self._print_summary(stats)

        return stats

    def process_single_document(self, pdf_path: Path) -> List[DocumentChunk]:
        """
        Process a single PDF document through extract → clean → chunk.

        Args:
            pdf_path: Path to the PDF file.

        Returns:
            List of chunks from this document.

        Raises:
            ValueError: If extraction or cleaning yields no text.
        """
        logger.info("Step 1: Extracting text from PDF...")
        pages_data = self.extractor.extract_from_file(pdf_path)

        if not pages_data:
            raise ValueError(f"No text extracted from {pdf_path.name}")

        logger.info("Step 2: Cleaning extracted text...")
        cleaned_text = self.cleaner.clean_pages(pages_data)

        if not cleaned_text:
            raise ValueError(f"No text remaining after cleaning {pdf_path.name}")

        logger.info("Step 3: Chunking text into meaningful pieces...")
        chunks = self.chunker.chunk_document(
            text=cleaned_text,
            source_file=pdf_path.name,
            pages_data=pages_data
        )

        return chunks

    def _print_summary(self, stats: ProcessingStats):
        """Log a human-readable summary of the processing run."""
        logger.info(f"\n{'=' * 80}")
        logger.info("PROCESSING COMPLETE!")
        logger.info(f"{'=' * 80}")
        logger.info(f"Documents Processed: {stats.total_documents}")
        logger.info(f"Total Chunks Created: {stats.total_chunks}")
        logger.info(f"Total Words: {stats.total_words:,}")
        logger.info(f"Average Chunk Size: {stats.avg_chunk_size:.1f} words")
        logger.info(f"Processing Time: {stats.processing_time_seconds:.2f} seconds")
        logger.info(f"\nOutput saved to: {CHUNKS_OUTPUT_FILE}")
        logger.info(f"Summary saved to: {CHUNKS_OUTPUT_FILE.parent / 'chunks_summary.txt'}")
        logger.info(f"{'=' * 80}\n")
|
|
|
|
|
|
|
|
def main():
    """Main entry point"""
    try:
        stats = DocumentProcessor().process_all_documents()

        print("\nβ Processing completed successfully!")
        print(f"β Created {stats.total_chunks} chunks from {stats.total_documents} documents")
        print(f"β Output: {CHUNKS_OUTPUT_FILE}")
    except Exception as exc:
        # Top-level boundary: log with traceback, report to the user, and
        # signal failure via a nonzero exit status.
        logger.error(f"Processing failed: {exc}", exc_info=True)
        print(f"\nβ Processing failed: {exc}")
        return 1
    return 0
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Raise SystemExit rather than calling the builtin exit(): exit() is a
    # site-module convenience that may be absent (e.g. under `python -S`),
    # while SystemExit always propagates main()'s return value as the
    # process exit status.
    raise SystemExit(main())
|
|
|