|
|
from typing import Dict, Any, List, Optional, BinaryIO |
|
|
from ...core.base import LatticeComponent, LatticeError |
|
|
from pydantic import BaseModel |
|
|
import fitz |
|
|
from docx import Document as DocxDocument |
|
|
import pandas as pd |
|
|
import hashlib |
|
|
from pathlib import Path |
|
|
import magic |
|
|
import logging |
|
|
from datetime import datetime |
|
|
|
|
|
class DocumentConfig(BaseModel):
    """Configuration for document processing.

    Controls what is extracted from each document and how the extracted
    text is split into chunks (see DocumentProcessor).
    """

    extract_text: bool = True       # extract the textual content of the document
    extract_metadata: bool = True   # extract format-specific metadata (title, author, counts, ...)
    extract_images: bool = False    # declared but not read anywhere in this file — TODO confirm intended use
    chunk_size: int = 500           # target chunk length, in characters
    chunk_overlap: int = 50         # characters shared between consecutive chunks
    encoding: str = 'utf-8'         # decoding used for plain-text files
    ocr_enabled: bool = False       # when True, pytesseract is imported during initialize()
|
|
|
|
|
class ProcessedChunk(BaseModel):
    """A single chunk of extracted document text."""

    content: str              # the chunk's text
    start_index: int          # start offset of the chunk within the full document text
    end_index: int            # end offset (exclusive) within the full document text
    metadata: Dict[str, Any]  # per-chunk info; populated with 'chunk_size' and 'position'
|
|
|
|
|
class ProcessedDocument(BaseModel):
    """Result of processing one document."""

    doc_id: str                    # 16-hex-char id derived from the SHA-256 of the file bytes
    content: str                   # full extracted text
    chunks: List[ProcessedChunk]   # overlapping chunks of `content`
    metadata: Dict[str, Any]       # format-specific metadata from the handler
    file_type: str                 # one of 'pdf' | 'docx' | 'txt' | 'csv'
    timestamp: datetime            # time of processing (naive datetime.now())
|
|
|
|
|
class DocumentProcessor(LatticeComponent):
    """Process uploaded documents (PDF, DOCX, TXT, CSV) into text chunks.

    Detects the file type via libmagic, extracts text and metadata with a
    per-format handler, and splits the text into overlapping chunks for
    downstream use.
    """

    # Maps the short type key used internally to the MIME types libmagic
    # may report for that format.
    SUPPORTED_TYPES = {
        'pdf': ['application/pdf'],
        'docx': ['application/vnd.openxmlformats-officedocument.wordprocessingml.document'],
        'txt': ['text/plain'],
        'csv': ['text/csv', 'application/csv']
    }

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """Create the processor.

        Args:
            config: Optional raw configuration dict, validated into a
                DocumentConfig (invalid values raise pydantic errors).
        """
        super().__init__(config)
        self.doc_config = DocumentConfig(**(config or {}))

    async def initialize(self) -> None:
        """Initialize the processor, loading the OCR engine if enabled.

        Raises:
            LatticeError: if OCR is enabled but pytesseract cannot be imported.
        """
        try:
            if self.doc_config.ocr_enabled:
                # Imported lazily so pytesseract is only required when OCR is on.
                import pytesseract
                self.ocr = pytesseract

            self._initialized = True

        except Exception as e:
            # Chain the cause for debuggability.
            raise LatticeError(f"Failed to initialize document processor: {str(e)}") from e

    async def validate_config(self) -> bool:
        """Return True when the raw config parses into a valid DocumentConfig."""
        try:
            DocumentConfig(**(self.config or {}))
            return True
        except Exception as e:
            self.logger.error(f"Invalid configuration: {str(e)}")
            return False

    def get_file_type(self, file: BinaryIO) -> str:
        """Determine the document type from the file's magic bytes.

        The stream is rewound before and after sniffing, so callers may hand
        over a file at any position.

        Raises:
            LatticeError: if the detected MIME type is not supported.
        """
        file.seek(0)  # sniff from the start regardless of current position
        mime = magic.from_buffer(file.read(2048), mime=True)
        file.seek(0)

        for file_type, mime_types in self.SUPPORTED_TYPES.items():
            if mime in mime_types:
                return file_type

        raise LatticeError(f"Unsupported file type: {mime}")

    async def process_document(
        self,
        file: BinaryIO,
        file_type: Optional[str] = None
    ) -> ProcessedDocument:
        """Process a document stream into a ProcessedDocument.

        Args:
            file: Binary stream containing the whole document.
            file_type: Optional explicit type ('pdf'|'docx'|'txt'|'csv');
                detected from magic bytes when omitted.

        Raises:
            LatticeError: for unsupported types or any processing failure.
        """
        self.ensure_initialized()

        try:
            if not file_type:
                file_type = self.get_file_type(file)

            doc_id = self._generate_doc_id(file)

            # Dispatch to the per-format handler.
            handlers = {
                'pdf': self._process_pdf,
                'docx': self._process_docx,
                'txt': self._process_text,
                'csv': self._process_csv,
            }
            handler = handlers.get(file_type)
            if handler is None:
                raise LatticeError(f"Unsupported file type: {file_type}")
            content, metadata = handler(file)

            chunks = self._create_chunks(content)

            return ProcessedDocument(
                doc_id=doc_id,
                content=content,
                chunks=chunks,
                metadata=metadata,
                file_type=file_type,
                timestamp=datetime.now()
            )

        except LatticeError:
            # Already a domain error (e.g. unsupported type) — re-raise
            # without double-wrapping the message.
            raise
        except Exception as e:
            self.logger.error(f"Error processing document: {str(e)}")
            raise LatticeError(f"Document processing failed: {str(e)}") from e

    def _generate_doc_id(self, file: BinaryIO) -> str:
        """Return a 16-hex-char ID from the SHA-256 of the full file bytes.

        Rewinds the stream before and after hashing so the subsequent
        format handler reads from the beginning.
        """
        file.seek(0)
        file_hash = hashlib.sha256()
        for chunk in iter(lambda: file.read(4096), b""):
            file_hash.update(chunk)
        file.seek(0)
        return file_hash.hexdigest()[:16]

    def _process_pdf(self, file: BinaryIO) -> tuple[str, Dict[str, Any]]:
        """Extract text and metadata from a PDF stream.

        Returns:
            (text, metadata) per the extract_text / extract_metadata flags.
        """
        data = file.read()
        # filetype must be given when opening from a memory stream; the
        # context manager closes the document (the original leaked it).
        with fitz.open(stream=data, filetype="pdf") as pdf:
            text = ""
            if self.doc_config.extract_text:
                # join avoids quadratic += concatenation on large PDFs
                text = "".join(page.get_text() for page in pdf)

            metadata = {}
            if self.doc_config.extract_metadata:
                metadata = {
                    'title': pdf.metadata.get('title'),
                    'author': pdf.metadata.get('author'),
                    'subject': pdf.metadata.get('subject'),
                    'keywords': pdf.metadata.get('keywords'),
                    'page_count': len(pdf),
                    'file_size': len(data)
                }

        return text, metadata

    def _process_docx(self, file: BinaryIO) -> tuple[str, Dict[str, Any]]:
        """Extract text and metadata from a DOCX stream."""
        doc = DocxDocument(file)

        text = ""
        if self.doc_config.extract_text:
            # One newline-terminated line per paragraph.
            text = "".join(para.text + "\n" for para in doc.paragraphs)

        metadata = {}
        if self.doc_config.extract_metadata:
            core_props = doc.core_properties
            metadata = {
                'title': core_props.title,
                'author': core_props.author,
                'created': core_props.created.isoformat() if core_props.created else None,
                'modified': core_props.modified.isoformat() if core_props.modified else None,
                'paragraph_count': len(doc.paragraphs),
                'file_size': file.tell()
            }

        return text, metadata

    def _process_text(self, file: BinaryIO) -> tuple[str, Dict[str, Any]]:
        """Decode a plain-text stream using the configured encoding."""
        content = file.read().decode(self.doc_config.encoding)

        metadata = {
            'file_size': file.tell(),  # stream is at EOF after read()
            'encoding': self.doc_config.encoding,
            'line_count': content.count('\n') + 1
        }

        return content, metadata

    def _process_csv(self, file: BinaryIO) -> tuple[str, Dict[str, Any]]:
        """Parse a CSV stream with pandas and render it as a string table."""
        df = pd.read_csv(file)

        content = df.to_string()

        metadata = {
            'file_size': file.tell(),
            'row_count': len(df),
            'column_count': len(df.columns),
            'columns': df.columns.tolist()
        }

        return content, metadata

    def _create_chunks(self, content: str) -> List[ProcessedChunk]:
        """Split content into overlapping chunks, preferring space boundaries.

        Fixes two infinite-loop bugs in the previous version: a window with
        no space made rfind() return -1 (end became 0, start went negative),
        and an overlap >= chunk length cancelled all forward progress.
        """
        chunks: List[ProcessedChunk] = []
        start = 0
        size = self.doc_config.chunk_size
        overlap = self.doc_config.chunk_overlap
        length = len(content)

        while start < length:
            end = min(start + size, length)

            # Break at the last space inside the window, but only when that
            # still leaves a non-empty chunk.
            if end < length:
                space = content.rfind(' ', start, end)
                if space > start:
                    end = space + 1

            chunk_content = content[start:end]
            chunks.append(
                ProcessedChunk(
                    content=chunk_content,
                    start_index=start,
                    end_index=end,
                    metadata={
                        'chunk_size': len(chunk_content),
                        'position': len(chunks)
                    }
                )
            )

            # Step back by the overlap, but always move strictly forward.
            start = max(end - overlap, start + 1)

        return chunks