Spaces:
Running
Running
"""
Document Processor Service

Handles text extraction from various document types:
- PDF (text extraction + OCR fallback)
- DOCX (Word documents)
- PPTX (PowerPoint presentations)
- Excel (XLS, XLSX)
- Images (via OCR)
- Plain text (TXT, MD)
"""
| import os | |
| import io | |
| from pathlib import Path | |
| from typing import Optional | |
| import fitz # PyMuPDF | |
| from docx import Document | |
| from pptx import Presentation | |
| from pptx.util import Inches | |
| import pandas as pd | |
| from PIL import Image | |
| from services.ocr_service import ocr_service | |
| from config import Config | |
| class DocumentProcessor: | |
| def __init__(self): | |
| self.supported_extensions = Config.ALLOWED_EXTENSIONS | |
| def get_file_type(self, filename: str) -> str: | |
| """Determine file type from extension""" | |
| ext = Path(filename).suffix.lower().lstrip('.') | |
| type_map = { | |
| 'pdf': 'pdf', | |
| 'doc': 'word', | |
| 'docx': 'word', | |
| 'ppt': 'powerpoint', | |
| 'pptx': 'powerpoint', | |
| 'xls': 'excel', | |
| 'xlsx': 'excel', | |
| 'txt': 'text', | |
| 'md': 'text', | |
| 'png': 'image', | |
| 'jpg': 'image', | |
| 'jpeg': 'image', | |
| 'gif': 'image', | |
| 'webp': 'image' | |
| } | |
| return type_map.get(ext, 'unknown') | |
| def is_supported(self, filename: str) -> bool: | |
| """Check if file type is supported""" | |
| ext = Path(filename).suffix.lower().lstrip('.') | |
| return ext in self.supported_extensions | |
| def process(self, file_path: str, filename: str) -> dict: | |
| """ | |
| Process a document and extract text | |
| Returns: {"success": bool, "text": str, "method": str, "error": str} | |
| """ | |
| file_type = self.get_file_type(filename) | |
| try: | |
| if file_type == 'pdf': | |
| return self._process_pdf(file_path) | |
| elif file_type == 'word': | |
| return self._process_word(file_path) | |
| elif file_type == 'powerpoint': | |
| return self._process_pptx(file_path) | |
| elif file_type == 'excel': | |
| return self._process_excel(file_path) | |
| elif file_type == 'image': | |
| return self._process_image(file_path) | |
| elif file_type == 'text': | |
| return self._process_text(file_path) | |
| else: | |
| return { | |
| "success": False, | |
| "error": f"Unsupported file type: {file_type}" | |
| } | |
| except Exception as e: | |
| return {"success": False, "error": str(e)} | |
| def _process_pdf(self, file_path: str) -> dict: | |
| """ | |
| Process PDF - Always use complete OpenRouter vision OCR for best accuracy | |
| """ | |
| try: | |
| doc = fitz.open(file_path) | |
| total_pages = len(doc) | |
| doc.close() | |
| print(f"Processing {total_pages} page PDF with OpenRouter vision OCR...") | |
| # Use OpenRouter vision models for OCR | |
| ocr_result = ocr_service.extract_text_from_pdf(file_path) | |
| if ocr_result['success']: | |
| print(f"PDF OCR successful") | |
| return { | |
| "success": True, | |
| "text": ocr_result['text'], | |
| "method": ocr_result.get('model', 'OpenRouter Vision OCR'), | |
| "page_count": total_pages | |
| } | |
| else: | |
| return { | |
| "success": False, | |
| "error": f"OCR failed: {ocr_result['error']}" | |
| } | |
| except Exception as e: | |
| return {"success": False, "error": f"PDF processing error: {str(e)}"} | |
| def _process_pdf_hybrid(self, file_path: str, text_pages: list, ocr_needed_pages: list) -> dict: | |
| """ | |
| Hybrid PDF processing: combine text extraction with OCR for scanned pages only | |
| Used as fallback when full PDF OCR fails | |
| """ | |
| try: | |
| doc = fitz.open(file_path) | |
| total_pages = len(doc) | |
| all_pages = {} | |
| # Add already extracted text pages | |
| for page_num, text in text_pages: | |
| all_pages[page_num] = f"--- Page {page_num + 1} ---\n{text}" | |
| # OCR the scanned pages in batches | |
| print(f"OCR processing {len(ocr_needed_pages)} scanned pages...") | |
| for i, page_num in enumerate(ocr_needed_pages): | |
| page = doc[page_num] | |
| # Render page to image | |
| mat = fitz.Matrix(2, 2) # 2x zoom for better OCR | |
| pix = page.get_pixmap(matrix=mat) | |
| temp_path = f"{file_path}_page_{page_num}.png" | |
| pix.save(temp_path) | |
| ocr_result = ocr_service.extract_text(temp_path) | |
| # Clean up temp file | |
| if os.path.exists(temp_path): | |
| os.remove(temp_path) | |
| if ocr_result['success']: | |
| all_pages[page_num] = f"--- Page {page_num + 1} (OCR) ---\n{ocr_result['text']}" | |
| else: | |
| all_pages[page_num] = f"--- Page {page_num + 1} ---\n[OCR failed: {ocr_result['error']}]" | |
| # Progress logging every 10 pages | |
| if (i + 1) % 10 == 0: | |
| print(f"OCR progress: {i + 1}/{len(ocr_needed_pages)} pages") | |
| doc.close() | |
| # Combine all pages in order | |
| text_parts = [all_pages[i] for i in sorted(all_pages.keys())] | |
| return { | |
| "success": True, | |
| "text": "\n\n".join(text_parts), | |
| "method": "hybrid (text + OCR)", | |
| "page_count": total_pages | |
| } | |
| except Exception as e: | |
| return {"success": False, "error": f"Hybrid PDF processing error: {str(e)}"} | |
| def _process_word(self, file_path: str) -> dict: | |
| """Process Word documents (DOCX)""" | |
| try: | |
| doc = Document(file_path) | |
| text_parts = [] | |
| # Extract paragraphs | |
| for para in doc.paragraphs: | |
| if para.text.strip(): | |
| text_parts.append(para.text) | |
| # Extract tables | |
| for table in doc.tables: | |
| table_text = [] | |
| for row in table.rows: | |
| row_text = [cell.text.strip() for cell in row.cells] | |
| table_text.append(" | ".join(row_text)) | |
| if table_text: | |
| text_parts.append("\n[Table]\n" + "\n".join(table_text)) | |
| return { | |
| "success": True, | |
| "text": "\n\n".join(text_parts), | |
| "method": "docx extraction" | |
| } | |
| except Exception as e: | |
| return {"success": False, "error": f"Word processing error: {str(e)}"} | |
| def _process_pptx(self, file_path: str) -> dict: | |
| """Process PowerPoint files (PPTX) - extracts all text from slides""" | |
| try: | |
| prs = Presentation(file_path) | |
| text_parts = [] | |
| slide_count = 0 | |
| for slide_num, slide in enumerate(prs.slides, 1): | |
| slide_count += 1 | |
| slide_text_parts = [] | |
| # Extract text from all shapes | |
| for shape in slide.shapes: | |
| # Text frames (text boxes, titles, etc.) | |
| if shape.has_text_frame: | |
| for paragraph in shape.text_frame.paragraphs: | |
| para_text = "" | |
| for run in paragraph.runs: | |
| para_text += run.text | |
| if para_text.strip(): | |
| slide_text_parts.append(para_text.strip()) | |
| # Tables in slides | |
| if shape.has_table: | |
| table = shape.table | |
| table_rows = [] | |
| for row in table.rows: | |
| row_cells = [] | |
| for cell in row.cells: | |
| cell_text = "" | |
| for paragraph in cell.text_frame.paragraphs: | |
| for run in paragraph.runs: | |
| cell_text += run.text | |
| row_cells.append(cell_text.strip()) | |
| table_rows.append(" | ".join(row_cells)) | |
| if table_rows: | |
| slide_text_parts.append("[Table]\n" + "\n".join(table_rows)) | |
| # Speaker notes | |
| if slide.has_notes_slide: | |
| notes_frame = slide.notes_slide.notes_text_frame | |
| if notes_frame: | |
| notes_text = "" | |
| for paragraph in notes_frame.paragraphs: | |
| for run in paragraph.runs: | |
| notes_text += run.text | |
| if notes_text.strip(): | |
| slide_text_parts.append(f"[Speaker Notes]\n{notes_text.strip()}") | |
| if slide_text_parts: | |
| text_parts.append(f"--- Slide {slide_num} ---\n" + "\n".join(slide_text_parts)) | |
| if not text_parts: | |
| return { | |
| "success": False, | |
| "error": "No text content found in PowerPoint file" | |
| } | |
| return { | |
| "success": True, | |
| "text": "\n\n".join(text_parts), | |
| "method": "pptx extraction", | |
| "slide_count": slide_count | |
| } | |
| except Exception as e: | |
| return {"success": False, "error": f"PowerPoint processing error: {str(e)}"} | |
| def _process_excel(self, file_path: str) -> dict: | |
| """Process Excel files""" | |
| try: | |
| # Read all sheets | |
| excel_file = pd.ExcelFile(file_path) | |
| text_parts = [] | |
| for sheet_name in excel_file.sheet_names: | |
| df = pd.read_excel(excel_file, sheet_name=sheet_name) | |
| if not df.empty: | |
| # Convert to string representation | |
| sheet_text = f"=== Sheet: {sheet_name} ===\n" | |
| sheet_text += df.to_string(index=False) | |
| text_parts.append(sheet_text) | |
| return { | |
| "success": True, | |
| "text": "\n\n".join(text_parts), | |
| "method": "excel extraction", | |
| "sheet_count": len(excel_file.sheet_names) | |
| } | |
| except Exception as e: | |
| return {"success": False, "error": f"Excel processing error: {str(e)}"} | |
| def _process_image(self, file_path: str) -> dict: | |
| """Process images using OCR""" | |
| result = ocr_service.extract_text(file_path) | |
| if result['success']: | |
| return { | |
| "success": True, | |
| "text": result['text'], | |
| "method": f"OCR ({result.get('model', 'unknown')})" | |
| } | |
| else: | |
| return {"success": False, "error": result['error']} | |
| def _process_text(self, file_path: str) -> dict: | |
| """Process plain text files""" | |
| try: | |
| # Try different encodings | |
| encodings = ['utf-8', 'latin-1', 'cp1252'] | |
| for encoding in encodings: | |
| try: | |
| with open(file_path, 'r', encoding=encoding) as f: | |
| text = f.read() | |
| return { | |
| "success": True, | |
| "text": text, | |
| "method": f"text read ({encoding})" | |
| } | |
| except UnicodeDecodeError: | |
| continue | |
| return {"success": False, "error": "Could not decode text file"} | |
| except Exception as e: | |
| return {"success": False, "error": f"Text processing error: {str(e)}"} | |
# Singleton instance — shared module-level processor imported by the rest of the app.
document_processor = DocumentProcessor()