from typing import Dict, Any, List, Optional, BinaryIO
from datetime import datetime
from pathlib import Path
import hashlib
import logging

import fitz  # PyMuPDF
import magic
import pandas as pd
from docx import Document as DocxDocument
from pydantic import BaseModel

from ...core.base import LatticeComponent, LatticeError


class DocumentConfig(BaseModel):
    """Document processing configuration."""

    extract_text: bool = True
    extract_metadata: bool = True
    extract_images: bool = False
    chunk_size: int = 500       # target characters per chunk
    chunk_overlap: int = 50     # characters shared between consecutive chunks
    encoding: str = 'utf-8'     # used only for plain-text files
    ocr_enabled: bool = False


class ProcessedChunk(BaseModel):
    """A single chunk of processed document content."""

    content: str
    start_index: int   # offset of the chunk start within the full content
    end_index: int     # offset one past the chunk end within the full content
    metadata: Dict[str, Any]


class ProcessedDocument(BaseModel):
    """Result of processing one document."""

    doc_id: str
    content: str
    chunks: List[ProcessedChunk]
    metadata: Dict[str, Any]
    file_type: str
    timestamp: datetime


class DocumentProcessor(LatticeComponent):
    """Extracts text, metadata and overlapping chunks from uploaded files.

    Supported formats: PDF, DOCX, plain text and CSV. The concrete format is
    detected from magic numbers when not supplied by the caller.
    """

    SUPPORTED_TYPES = {
        'pdf': ['application/pdf'],
        'docx': ['application/vnd.openxmlformats-officedocument.wordprocessingml.document'],
        'txt': ['text/plain'],
        'csv': ['text/csv', 'application/csv'],
    }

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        super().__init__(config)
        # Validate/normalize the raw config dict once, up front.
        self.doc_config = DocumentConfig(**(config or {}))

    async def initialize(self) -> None:
        """Initialize the processor, loading the OCR engine if enabled.

        Raises:
            LatticeError: if initialization (e.g. the pytesseract import) fails.
        """
        try:
            if self.doc_config.ocr_enabled:
                # Imported lazily so the dependency is only required when used.
                import pytesseract
                self.ocr = pytesseract
            self._initialized = True
        except Exception as e:
            raise LatticeError(f"Failed to initialize document processor: {str(e)}") from e

    async def validate_config(self) -> bool:
        """Return True if the raw config parses into a valid DocumentConfig."""
        try:
            DocumentConfig(**(self.config or {}))
            return True
        except Exception as e:
            self.logger.error(f"Invalid configuration: {str(e)}")
            return False

    def get_file_type(self, file: BinaryIO) -> str:
        """Determine the logical file type from the stream's magic numbers.

        The stream position is restored to the start before returning.

        Raises:
            LatticeError: if the detected MIME type is not supported.
        """
        mime = magic.from_buffer(file.read(2048), mime=True)
        file.seek(0)
        for file_type, mime_types in self.SUPPORTED_TYPES.items():
            if mime in mime_types:
                return file_type
        raise LatticeError(f"Unsupported file type: {mime}")

    async def process_document(
        self,
        file: BinaryIO,
        file_type: Optional[str] = None
    ) -> ProcessedDocument:
        """Process a document stream into content, metadata and chunks.

        Args:
            file: binary stream positioned at the start of the document.
            file_type: one of SUPPORTED_TYPES keys; auto-detected when None.

        Raises:
            LatticeError: on unsupported types or any processing failure.
        """
        self.ensure_initialized()
        try:
            if not file_type:
                file_type = self.get_file_type(file)

            doc_id = self._generate_doc_id(file)

            # Dispatch table keeps the format handlers uniform and extensible.
            handlers = {
                'pdf': self._process_pdf,
                'docx': self._process_docx,
                'txt': self._process_text,
                'csv': self._process_csv,
            }
            handler = handlers.get(file_type)
            if handler is None:
                raise LatticeError(f"Unsupported file type: {file_type}")
            content, metadata = handler(file)

            chunks = self._create_chunks(content)

            return ProcessedDocument(
                doc_id=doc_id,
                content=content,
                chunks=chunks,
                metadata=metadata,
                file_type=file_type,
                timestamp=datetime.now()
            )
        except LatticeError:
            # Already a domain error (e.g. unsupported type) — don't double-wrap.
            raise
        except Exception as e:
            self.logger.error(f"Error processing document: {str(e)}")
            raise LatticeError(f"Document processing failed: {str(e)}") from e

    def _generate_doc_id(self, file: BinaryIO) -> str:
        """Return a 16-hex-char SHA-256 prefix of the stream's contents.

        The stream position is restored to the start before returning.
        """
        file_hash = hashlib.sha256()
        for chunk in iter(lambda: file.read(4096), b""):
            file_hash.update(chunk)
        file.seek(0)
        return file_hash.hexdigest()[:16]

    def _process_pdf(self, file: BinaryIO) -> tuple[str, Dict[str, Any]]:
        """Extract text and metadata from a PDF stream."""
        data = file.read()
        pdf = fitz.open(stream=data)
        try:
            text = ""
            if self.doc_config.extract_text:
                # join avoids quadratic string concatenation on large PDFs
                text = "".join(page.get_text() for page in pdf)

            metadata: Dict[str, Any] = {}
            if self.doc_config.extract_metadata:
                pdf_meta = pdf.metadata or {}  # fitz may return None
                metadata = {
                    'title': pdf_meta.get('title'),
                    'author': pdf_meta.get('author'),
                    'subject': pdf_meta.get('subject'),
                    'keywords': pdf_meta.get('keywords'),
                    'page_count': len(pdf),
                    'file_size': len(data),
                }
            return text, metadata
        finally:
            pdf.close()  # release the underlying document resources

    def _process_docx(self, file: BinaryIO) -> tuple[str, Dict[str, Any]]:
        """Extract text and core properties from a DOCX stream."""
        doc = DocxDocument(file)

        text = ""
        if self.doc_config.extract_text:
            text = "".join(para.text + "\n" for para in doc.paragraphs)

        metadata: Dict[str, Any] = {}
        if self.doc_config.extract_metadata:
            core_props = doc.core_properties
            # Determine the true size by seeking to the end; docx reading may
            # leave the stream at an arbitrary position.
            file.seek(0, 2)
            metadata = {
                'title': core_props.title,
                'author': core_props.author,
                'created': core_props.created.isoformat() if core_props.created else None,
                'modified': core_props.modified.isoformat() if core_props.modified else None,
                'paragraph_count': len(doc.paragraphs),
                'file_size': file.tell(),
            }
        return text, metadata

    def _process_text(self, file: BinaryIO) -> tuple[str, Dict[str, Any]]:
        """Decode a plain-text stream using the configured encoding."""
        raw = file.read()
        content = raw.decode(self.doc_config.encoding)
        metadata = {
            'file_size': len(raw),
            'encoding': self.doc_config.encoding,
            'line_count': content.count('\n') + 1,
        }
        return content, metadata

    def _process_csv(self, file: BinaryIO) -> tuple[str, Dict[str, Any]]:
        """Parse a CSV stream and render it as a string table."""
        df = pd.read_csv(file)
        content = df.to_string()
        metadata = {
            'file_size': file.tell(),  # read_csv consumed the stream to EOF
            'row_count': len(df),
            'column_count': len(df.columns),
            'columns': df.columns.tolist(),
        }
        return content, metadata

    def _create_chunks(self, content: str) -> List[ProcessedChunk]:
        """Split content into overlapping chunks, preferring word boundaries.

        Fixes over the previous version:
        - str.rfind returning -1 (no space in window) no longer collapses the
          chunk to ``content[start:0]`` and loop forever; we fall back to a
          hard cut instead.
        - ``end_index`` is clamped to ``len(content)`` so the recorded span
          never overruns the actual content.
        - the loop always makes forward progress, even when
          ``chunk_overlap >= chunk_size``.
        """
        chunks: List[ProcessedChunk] = []
        length = len(content)
        start = 0
        while start < length:
            end = min(start + self.doc_config.chunk_size, length)

            # Prefer to break on the last space inside the window; keep the
            # hard cut when there is none (e.g. one very long token).
            if end < length:
                space = content.rfind(' ', start, end)
                if space > start:
                    end = space + 1

            chunk_content = content[start:end]
            chunks.append(
                ProcessedChunk(
                    content=chunk_content,
                    start_index=start,
                    end_index=end,
                    metadata={
                        'chunk_size': len(chunk_content),
                        'position': len(chunks),
                    }
                )
            )

            if end >= length:
                break  # final chunk emitted; avoid a duplicate tail chunk

            # Step back by the overlap, but guarantee forward progress.
            start = max(end - self.doc_config.chunk_overlap, start + 1)
        return chunks