Spaces:
Build error
Build error
| """ | |
| PDF processing service that orchestrates extraction, mapping, and filling. | |
| """ | |
import shutil
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional

from app.config import settings
from app.extractor import PDFExtractor
from app.filler import PDFFiller
from app.mapper import FieldMapper
from app.ocr import OCRProcessor
from app.utils.exceptions import (
    PDFProcessorError,
    PDFExtractionError,
    OCRError,
    MappingError,
    PDFFillingError,
    FileValidationError
)
from app.utils.logging import get_logger
| logger = get_logger(__name__) | |
class ProcessingSession:
    """Represents a single PDF processing session.

    A plain state holder: it records the uploaded file paths, the
    extraction and mapping results, the current status and any
    error/warning messages collected while the session is processed.
    """

    def __init__(self, session_id: str):
        """Create an empty session.

        Args:
            session_id: Identifier of the session. It is also used as the
                name of the session directory under ``settings.upload_dir``.
        """
        self.session_id = session_id
        # Naive UTC timestamp: the same value ``datetime.utcnow()`` returned,
        # obtained without the call that is deprecated since Python 3.12.
        self.created_at = datetime.now(timezone.utc).replace(tzinfo=None)
        self.source_pdf: Optional[Path] = None
        self.template_pdf: Optional[Path] = None
        self.output_pdf: Optional[Path] = None
        self.extracted_data: Optional[dict] = None
        self.mapped_fields: Optional[dict] = None
        self.status: str = "initialized"
        self.errors: list[str] = []
        self.warnings: list[str] = []

    @property
    def session_dir(self) -> Path:
        """Directory holding this session's files (``upload_dir / session_id``).

        Exposed as a property because it is read as an attribute
        (``session.session_dir.mkdir(...)``, ``session.session_dir / name``);
        as a plain method those expressions would operate on a bound method
        and raise.
        """
        return settings.upload_dir / self.session_id
class PDFProcessorService:
    """
    Main service class that coordinates PDF processing.

    It owns the extractor, optional OCR processor, field mapper and form
    filler, and keeps an in-memory registry of processing sessions keyed by
    session id.
    """

    # Accepted upload kinds, mapped to the session attribute that records
    # the saved path.
    _FILE_TYPE_ATTRS = {"source": "source_pdf", "template": "template_pdf"}

    def __init__(self, mapping_config_path: Optional[Path] = None):
        """Build the processing components.

        Args:
            mapping_config_path: Optional path handed to ``FieldMapper`` as
                ``config_path``.
        """
        self.extractor = PDFExtractor()
        self.filler = PDFFiller()
        self.mapper = FieldMapper(config_path=mapping_config_path)

        # OCR is optional: any failure while setting it up only disables it.
        try:
            self.ocr = OCRProcessor()
            self.ocr_available = self.ocr.is_available()
        except Exception as e:
            logger.warning(f"OCR not available: {e}")
            self.ocr = None
            self.ocr_available = False

        # Track active sessions
        self.sessions: dict[str, ProcessingSession] = {}

    @staticmethod
    def _is_valid_session_id(session_id: str) -> bool:
        """Return True only for canonical UUID strings, as issued by create_session."""
        try:
            return str(uuid.UUID(session_id)) == session_id
        except (ValueError, TypeError, AttributeError):
            return False

    @staticmethod
    def _sanitize_filename(filename: str) -> str:
        """Reduce a client-supplied filename to its final path component.

        Backslashes are treated as separators too, so Windows-style paths
        are stripped as well. Returns "" when nothing usable remains.
        """
        name = Path(filename.replace("\\", "/")).name
        return "" if name == ".." else name

    @staticmethod
    def _session_dir(session_id: str) -> Path:
        """Return the upload directory that belongs to ``session_id``."""
        return settings.upload_dir / session_id

    def create_session(self) -> ProcessingSession:
        """Create a new processing session and its upload directory."""
        session_id = str(uuid.uuid4())
        session = ProcessingSession(session_id)
        self._session_dir(session_id).mkdir(parents=True, exist_ok=True)
        self.sessions[session_id] = session
        logger.info(f"Created new session: {session_id}")
        return session

    def get_session(self, session_id: str) -> ProcessingSession:
        """Get an existing session.

        In-memory sessions are returned directly. Otherwise, if a directory
        for the id exists on disk, a fresh session object is registered for
        it; that object carries no file references or results.

        Raises:
            FileValidationError: If the session is unknown.
        """
        session = self.sessions.get(session_id)
        if session is not None:
            return session

        # Only canonical UUIDs may reach the filesystem: the id is joined
        # onto the upload directory, so a value such as ".." would otherwise
        # resolve outside it and could later be removed by cleanup_session.
        if self._is_valid_session_id(session_id) and self._session_dir(session_id).is_dir():
            session = ProcessingSession(session_id)
            self.sessions[session_id] = session
            return session

        raise FileValidationError(f"Session not found: {session_id}")

    def save_uploaded_file(
        self,
        session: ProcessingSession,
        file_content: bytes,
        filename: str,
        file_type: str
    ) -> Path:
        """
        Save an uploaded file to the session directory.

        Args:
            session: Processing session.
            file_content: File content as bytes.
            filename: Original filename; only its final path component is used.
            file_type: 'source' or 'template'.

        Returns:
            Path to saved file.

        Raises:
            FileValidationError: If the file type, filename, extension or
                size is not acceptable.
        """
        # Validate file type (it becomes part of the stored filename)
        target_attr = self._FILE_TYPE_ATTRS.get(file_type)
        if target_attr is None:
            raise FileValidationError(
                f"Invalid file type: {file_type}. Allowed: {sorted(self._FILE_TYPE_ATTRS)}"
            )

        # Drop directory components so the file cannot escape the session dir
        base_name = self._sanitize_filename(filename)

        # Validate extension
        ext = Path(base_name).suffix.lower()
        if ext not in settings.allowed_extensions:
            raise FileValidationError(
                f"Invalid file extension: {ext}. Allowed: {settings.allowed_extensions}"
            )
        if not base_name:
            raise FileValidationError(f"Invalid filename: {filename}")

        # Validate file size
        size_mb = len(file_content) / (1024 * 1024)
        if size_mb > settings.max_file_size_mb:
            raise FileValidationError(
                f"File too large: {size_mb:.2f}MB. Max: {settings.max_file_size_mb}MB"
            )

        # Save file
        safe_filename = f"{file_type}_{base_name}"
        file_path = self._session_dir(session.session_id) / safe_filename
        file_path.write_bytes(file_content)
        setattr(session, target_attr, file_path)

        logger.info(f"Saved {file_type} file: {file_path}")
        return file_path

    def process(
        self,
        session: ProcessingSession,
        line_numbers: Optional[list[str]] = None,
        use_ocr: Optional[bool] = None,
        flatten: bool = False
    ) -> dict:
        """
        Process the uploaded PDFs.

        Args:
            session: Processing session.
            line_numbers: Specific line numbers to extract.
            use_ocr: Force OCR processing.
            flatten: Flatten output PDF form.

        Returns:
            Processing result dictionary with the keys ``status``,
            ``extracted_data``, ``mapped_fields`` and ``output_filename``.

        Raises:
            FileValidationError: If the source or template PDF is missing.
            Exception: Any error raised by a processing step is recorded on
                the session and re-raised unchanged.
        """
        if not session.source_pdf:
            raise FileValidationError("No source PDF uploaded")
        if not session.template_pdf:
            raise FileValidationError("No template PDF uploaded")

        session.status = "processing"
        try:
            # Step 1: Extract data from source PDF
            extracted = self._extract_data(session, line_numbers, use_ocr)
            session.extracted_data = extracted

            # Step 2: Map extracted values to form fields
            mapped = self.mapper.map_values(extracted["line_values"])
            session.mapped_fields = mapped

            # Step 3: Fill the template PDF
            output_filename = f"filled_{session.session_id}.pdf"
            output_path = settings.output_dir / output_filename
            # Make sure the destination exists, as is done for session dirs.
            output_path.parent.mkdir(parents=True, exist_ok=True)
            self.filler.fill_form(
                template_path=session.template_pdf,
                output_path=output_path,
                field_values=mapped,
                flatten=flatten
            )
            session.output_pdf = output_path
            session.status = "completed"
            logger.info(f"Processing completed for session {session.session_id}")
            return {
                "status": "success",
                "extracted_data": extracted,
                "mapped_fields": mapped,
                "output_filename": output_filename
            }
        except Exception as e:
            # Record the failure on the session, then let the caller see it.
            session.status = "failed"
            session.errors.append(str(e))
            logger.error(f"Processing failed for session {session.session_id}: {e}")
            raise

    def _extract_data(
        self,
        session: ProcessingSession,
        line_numbers: Optional[list[str]] = None,
        use_ocr: Optional[bool] = None
    ) -> dict:
        """Extract data from source PDF.

        Text extraction is used when the PDF has text and OCR is not
        requested; OCR is used otherwise when available; if OCR is not
        available, text extraction runs as a fallback and a warning is
        added to the session. A ``text_preview`` (first 500 characters) is
        added whenever the result contains text.
        """
        has_text = self.extractor.has_text_content(session.source_pdf)

        if has_text and not use_ocr:
            # Use text extraction
            result = self.extractor.extract_all_data(session.source_pdf, line_numbers)
            result["extraction_method"] = "text"
        elif self.ocr_available:
            # Use OCR
            logger.info("Using OCR for extraction")
            text = self.ocr.process_pdf(session.source_pdf)
            line_values = self.extractor.extract_line_values(text, line_numbers)
            result = {
                "text": text,
                "line_values": line_values,
                "has_text": False,
                "extraction_method": "ocr"
            }
        else:
            # OCR is unavailable. Say why we fell back: either OCR was
            # requested for a PDF that has text, or the PDF has no text.
            if has_text:
                session.warnings.append(
                    "OCR was requested but is not available; used text extraction instead"
                )
            else:
                session.warnings.append("PDF appears to be scanned but OCR is not available")
            result = self.extractor.extract_all_data(session.source_pdf, line_numbers)
            result["extraction_method"] = "text_fallback"

        # Add text preview (first 500 chars)
        if result.get("text"):
            result["text_preview"] = result["text"][:500]
        return result

    def get_output_path(self, session: ProcessingSession) -> Optional[Path]:
        """Get the output PDF path for a session, or None if it does not exist."""
        if session.output_pdf and session.output_pdf.exists():
            return session.output_pdf
        return None

    def get_template_fields(self, session: ProcessingSession) -> dict:
        """Get form fields from the template PDF.

        Raises:
            FileValidationError: If no template PDF has been uploaded.
        """
        if not session.template_pdf:
            raise FileValidationError("No template PDF uploaded")
        return self.filler.get_form_fields(session.template_pdf)

    def update_mapping(self, line_number: str, field_name: str, description: str = "") -> None:
        """Update field mapping (delegates to the mapper's ``add_mapping``)."""
        self.mapper.add_mapping(line_number, field_name, description)

    def get_all_mappings(self) -> dict:
        """Get all current mappings."""
        return self.mapper.get_all_mappings()

    def save_mapping_config(self, path: Optional[Path] = None) -> None:
        """Save current mappings to config file.

        Args:
            path: Destination file; defaults to
                ``settings.config_dir / settings.default_mapping_file``.
        """
        if path is None:
            path = settings.config_dir / settings.default_mapping_file
        self.mapper.save_config(path)

    def cleanup_session(self, session_id: str) -> None:
        """Clean up session files and forget the session.

        Does nothing when the session is not registered in memory.
        """
        session = self.sessions.get(session_id)
        if session is None:
            return
        session_dir = self._session_dir(session.session_id)
        if session_dir.exists():
            shutil.rmtree(session_dir)
        del self.sessions[session_id]
        logger.info(f"Cleaned up session: {session_id}")
# Global service instance, constructed when this module is imported.
# No mapping_config_path is passed, so the constructor default (None) is used.
processor_service = PDFProcessorService()