""" Document Processor Service Handles text extraction from various document types: - PDF (text extraction + OCR fallback) - DOCX (Word documents) - Excel (XLS, XLSX) - Images (via OCR) - Plain text (TXT, MD) """ import os import io from pathlib import Path from typing import Optional import fitz # PyMuPDF from docx import Document from pptx import Presentation from pptx.util import Inches import pandas as pd from PIL import Image from services.ocr_service import ocr_service from config import Config class DocumentProcessor: def __init__(self): self.supported_extensions = Config.ALLOWED_EXTENSIONS def get_file_type(self, filename: str) -> str: """Determine file type from extension""" ext = Path(filename).suffix.lower().lstrip('.') type_map = { 'pdf': 'pdf', 'doc': 'word', 'docx': 'word', 'ppt': 'powerpoint', 'pptx': 'powerpoint', 'xls': 'excel', 'xlsx': 'excel', 'txt': 'text', 'md': 'text', 'png': 'image', 'jpg': 'image', 'jpeg': 'image', 'gif': 'image', 'webp': 'image' } return type_map.get(ext, 'unknown') def is_supported(self, filename: str) -> bool: """Check if file type is supported""" ext = Path(filename).suffix.lower().lstrip('.') return ext in self.supported_extensions def process(self, file_path: str, filename: str) -> dict: """ Process a document and extract text Returns: {"success": bool, "text": str, "method": str, "error": str} """ file_type = self.get_file_type(filename) try: if file_type == 'pdf': return self._process_pdf(file_path) elif file_type == 'word': return self._process_word(file_path) elif file_type == 'powerpoint': return self._process_pptx(file_path) elif file_type == 'excel': return self._process_excel(file_path) elif file_type == 'image': return self._process_image(file_path) elif file_type == 'text': return self._process_text(file_path) else: return { "success": False, "error": f"Unsupported file type: {file_type}" } except Exception as e: return {"success": False, "error": str(e)} def _process_pdf(self, file_path: str) -> dict: """ Process PDF - Always use complete OpenRouter vision OCR for best accuracy """ try: doc = fitz.open(file_path) total_pages = len(doc) doc.close() print(f"Processing {total_pages} page PDF with OpenRouter vision OCR...") # Use OpenRouter vision models for OCR ocr_result = ocr_service.extract_text_from_pdf(file_path) if ocr_result['success']: print(f"PDF OCR successful") return { "success": True, "text": ocr_result['text'], "method": ocr_result.get('model', 'OpenRouter Vision OCR'), "page_count": total_pages } else: return { "success": False, "error": f"OCR failed: {ocr_result['error']}" } except Exception as e: return {"success": False, "error": f"PDF processing error: {str(e)}"} def _process_pdf_hybrid(self, file_path: str, text_pages: list, ocr_needed_pages: list) -> dict: """ Hybrid PDF processing: combine text extraction with OCR for scanned pages only Used as fallback when full PDF OCR fails """ try: doc = fitz.open(file_path) total_pages = len(doc) all_pages = {} # Add already extracted text pages for page_num, text in text_pages: all_pages[page_num] = f"--- Page {page_num + 1} ---\n{text}" # OCR the scanned pages in batches print(f"OCR processing {len(ocr_needed_pages)} scanned pages...") for i, page_num in enumerate(ocr_needed_pages): page = doc[page_num] # Render page to image mat = fitz.Matrix(2, 2) # 2x zoom for better OCR pix = page.get_pixmap(matrix=mat) temp_path = f"{file_path}_page_{page_num}.png" pix.save(temp_path) ocr_result = ocr_service.extract_text(temp_path) # Clean up temp file if os.path.exists(temp_path): os.remove(temp_path) if ocr_result['success']: all_pages[page_num] = f"--- Page {page_num + 1} (OCR) ---\n{ocr_result['text']}" else: all_pages[page_num] = f"--- Page {page_num + 1} ---\n[OCR failed: {ocr_result['error']}]" # Progress logging every 10 pages if (i + 1) % 10 == 0: print(f"OCR progress: {i + 1}/{len(ocr_needed_pages)} pages") doc.close() # Combine all pages in order text_parts = [all_pages[i] for i in sorted(all_pages.keys())] return { "success": True, "text": "\n\n".join(text_parts), "method": "hybrid (text + OCR)", "page_count": total_pages } except Exception as e: return {"success": False, "error": f"Hybrid PDF processing error: {str(e)}"} def _process_word(self, file_path: str) -> dict: """Process Word documents (DOCX)""" try: doc = Document(file_path) text_parts = [] # Extract paragraphs for para in doc.paragraphs: if para.text.strip(): text_parts.append(para.text) # Extract tables for table in doc.tables: table_text = [] for row in table.rows: row_text = [cell.text.strip() for cell in row.cells] table_text.append(" | ".join(row_text)) if table_text: text_parts.append("\n[Table]\n" + "\n".join(table_text)) return { "success": True, "text": "\n\n".join(text_parts), "method": "docx extraction" } except Exception as e: return {"success": False, "error": f"Word processing error: {str(e)}"} def _process_pptx(self, file_path: str) -> dict: """Process PowerPoint files (PPTX) - extracts all text from slides""" try: prs = Presentation(file_path) text_parts = [] slide_count = 0 for slide_num, slide in enumerate(prs.slides, 1): slide_count += 1 slide_text_parts = [] # Extract text from all shapes for shape in slide.shapes: # Text frames (text boxes, titles, etc.) if shape.has_text_frame: for paragraph in shape.text_frame.paragraphs: para_text = "" for run in paragraph.runs: para_text += run.text if para_text.strip(): slide_text_parts.append(para_text.strip()) # Tables in slides if shape.has_table: table = shape.table table_rows = [] for row in table.rows: row_cells = [] for cell in row.cells: cell_text = "" for paragraph in cell.text_frame.paragraphs: for run in paragraph.runs: cell_text += run.text row_cells.append(cell_text.strip()) table_rows.append(" | ".join(row_cells)) if table_rows: slide_text_parts.append("[Table]\n" + "\n".join(table_rows)) # Speaker notes if slide.has_notes_slide: notes_frame = slide.notes_slide.notes_text_frame if notes_frame: notes_text = "" for paragraph in notes_frame.paragraphs: for run in paragraph.runs: notes_text += run.text if notes_text.strip(): slide_text_parts.append(f"[Speaker Notes]\n{notes_text.strip()}") if slide_text_parts: text_parts.append(f"--- Slide {slide_num} ---\n" + "\n".join(slide_text_parts)) if not text_parts: return { "success": False, "error": "No text content found in PowerPoint file" } return { "success": True, "text": "\n\n".join(text_parts), "method": "pptx extraction", "slide_count": slide_count } except Exception as e: return {"success": False, "error": f"PowerPoint processing error: {str(e)}"} def _process_excel(self, file_path: str) -> dict: """Process Excel files""" try: # Read all sheets excel_file = pd.ExcelFile(file_path) text_parts = [] for sheet_name in excel_file.sheet_names: df = pd.read_excel(excel_file, sheet_name=sheet_name) if not df.empty: # Convert to string representation sheet_text = f"=== Sheet: {sheet_name} ===\n" sheet_text += df.to_string(index=False) text_parts.append(sheet_text) return { "success": True, "text": "\n\n".join(text_parts), "method": "excel extraction", "sheet_count": len(excel_file.sheet_names) } except Exception as e: return {"success": False, "error": f"Excel processing error: {str(e)}"} def _process_image(self, file_path: str) -> dict: """Process images using OCR""" result = ocr_service.extract_text(file_path) if result['success']: return { "success": True, "text": result['text'], "method": f"OCR ({result.get('model', 'unknown')})" } else: return {"success": False, "error": result['error']} def _process_text(self, file_path: str) -> dict: """Process plain text files""" try: # Try different encodings encodings = ['utf-8', 'latin-1', 'cp1252'] for encoding in encodings: try: with open(file_path, 'r', encoding=encoding) as f: text = f.read() return { "success": True, "text": text, "method": f"text read ({encoding})" } except UnicodeDecodeError: continue return {"success": False, "error": "Could not decode text file"} except Exception as e: return {"success": False, "error": f"Text processing error: {str(e)}"} # Singleton instance document_processor = DocumentProcessor()