# utils/document_processor.py
"""Text extraction, cleaning and chunking for uploaded documents."""

import io
import os
import re
import tempfile
from typing import List, Dict, Optional, Union, Any

import docx
import fitz  # PyMuPDF
import pytesseract
import streamlit as st
from pdf2image import convert_from_path
from PIL import Image


class DocumentProcessor:
    """Extract plain text from PDF, DOCX, TXT and image files.

    Errors raised while processing are reported through ``st.error`` and the
    affected call returns an empty string instead of propagating.
    """

    # Encodings tried in order when reading text files. ``latin-1`` maps every
    # possible byte, so it must come last: anything listed after it would
    # never be reached.
    _TEXT_ENCODINGS = ('utf-8', 'cp1252', 'latin-1')

    # Everything except word characters, whitespace and basic punctuation.
    _SPECIAL_CHARS_RE = re.compile(r'[^\w\s.,!?-]')

    # One or more blank (or whitespace-only) lines, i.e. a paragraph break.
    _PARAGRAPH_BREAK_RE = re.compile(r'\n\s*\n')

    # Separator used between paragraphs in cleaned text and in chunks.
    _PARAGRAPH_SEPARATOR = "\n\n"

    def __init__(self):
        # Maps a lower-case file extension to the method that extracts text
        # from a file on disk with that format.
        self.supported_formats = {
            'pdf': self._process_pdf,
            'docx': self._process_docx,
            'txt': self._process_text,
            'jpg': self._process_image,
            'jpeg': self._process_image,
            'png': self._process_image
        }

    def process_document(self, uploaded_file: Any) -> str:
        """Extract and clean the text of an uploaded file.

        The upload is written to a temporary file, handed to the processor
        registered for its extension, and the temporary file is removed again
        whether or not processing succeeded.

        Args:
            uploaded_file: Object exposing ``name`` (file name including the
                extension) and ``getbuffer()`` (the file content as bytes).

        Returns:
            The cleaned text, or ``""`` if the format is unsupported or any
            step failed (the error is shown via ``st.error``).
        """
        tmp_path = None
        try:
            # Get file extension
            file_extension = uploaded_file.name.split('.')[-1].lower()

            if file_extension not in self.supported_formats:
                raise ValueError(f"Unsupported file format: {file_extension}")

            # Write the upload to disk. The handle is closed before the
            # processor runs so the data is fully flushed and the path can be
            # reopened and later deleted on every platform.
            with tempfile.NamedTemporaryFile(
                delete=False, suffix=f'.{file_extension}'
            ) as tmp_file:
                tmp_path = tmp_file.name
                tmp_file.write(uploaded_file.getbuffer())

            processor = self.supported_formats[file_extension]
            text = processor(tmp_path)

            return self._clean_text(text)

        except Exception as e:
            st.error(f"Error processing document: {str(e)}")
            return ""

        finally:
            # Always remove the temporary file, including on failure.
            if tmp_path is not None:
                try:
                    os.unlink(tmp_path)
                except OSError:
                    # Best-effort cleanup: a file that is already gone or
                    # cannot be removed must not mask the real result.
                    pass

    def _process_pdf(self, file_path: str) -> str:
        """Return the concatenated text of every page of a PDF file.

        Returns ``""`` and reports via ``st.error`` if the file cannot be read.
        """
        try:
            with fitz.open(file_path) as doc:
                return "".join(page.get_text() for page in doc)
        except Exception as e:
            st.error(f"Error processing PDF: {str(e)}")
            return ""

    def _process_docx(self, file_path: str) -> str:
        """Return the text of a DOCX file.

        Paragraphs come first, followed by one line per table row with the
        cell texts joined by ``" | "``. Entries are separated by blank lines.
        Returns ``""`` and reports via ``st.error`` on failure.
        """
        try:
            doc = docx.Document(file_path)

            # Get paragraphs
            text = [para.text for para in doc.paragraphs]

            # Get tables
            for table in doc.tables:
                for row in table.rows:
                    text.append(" | ".join(cell.text for cell in row.cells))

            return "\n\n".join(text)

        except Exception as e:
            st.error(f"Error processing DOCX: {str(e)}")
            return ""

    def _process_text(self, file_path: str) -> str:
        """Read a text file, trying several encodings in turn.

        UTF-8 is tried first, then ``cp1252``, then ``latin-1`` as a fallback
        that accepts any byte sequence. Only decoding errors move on to the
        next encoding; any other failure is reported via ``st.error`` and
        yields ``""``.
        """
        try:
            for encoding in self._TEXT_ENCODINGS:
                try:
                    with open(file_path, 'r', encoding=encoding) as file:
                        return file.read()
                except UnicodeDecodeError:
                    continue
            return ""
        except Exception as e:
            st.error(f"Error processing text file: {str(e)}")
            return ""

    def _process_image(self, file_path: str) -> str:
        """Run OCR on an image file and return the recognised text.

        The image is converted to RGB before being passed to Tesseract.
        Returns ``""`` and reports via ``st.error`` on failure.
        """
        try:
            # The context manager releases the underlying file handle.
            with Image.open(file_path) as image:
                if image.mode != 'RGB':
                    image = image.convert('RGB')
                return pytesseract.image_to_string(image)
        except Exception as e:
            st.error(f"Error processing image: {str(e)}")
            return ""

    def _clean_text(self, text: str) -> str:
        """Clean and normalize extracted text.

        Removes characters other than word characters, whitespace and
        ``.,!?-``; collapses runs of whitespace inside a line to one space;
        drops empty lines. Line breaks are kept, and paragraph breaks (one or
        more blank lines) are normalised to exactly one blank line so that
        ``chunk_document`` can still split the result into paragraphs.

        Args:
            text: Raw extracted text; may be empty or ``None``.

        Returns:
            The cleaned text, or ``""`` when there is nothing left.
        """
        if not text:
            return ""

        # Normalise line endings so the paragraph/line logic only sees "\n".
        text = text.replace('\r\n', '\n').replace('\r', '\n')

        # Remove special characters but keep basic punctuation. Done before
        # whitespace collapsing so removed characters leave no double spaces.
        text = self._SPECIAL_CHARS_RE.sub('', text)

        paragraphs = []
        for block in self._PARAGRAPH_BREAK_RE.split(text):
            # " ".join(line.split()) trims the line and collapses inner
            # whitespace runs without touching the line break itself.
            lines = [' '.join(line.split()) for line in block.split('\n')]
            lines = [line for line in lines if line]
            if lines:
                paragraphs.append('\n'.join(lines))

        return self._PARAGRAPH_SEPARATOR.join(paragraphs)

    @staticmethod
    def _split_oversized(paragraph: str, limit: int) -> List[str]:
        """Split *paragraph* into pieces of at most *limit* characters.

        Cuts are made at the last space or newline that fits; text without
        any usable whitespace is cut hard at *limit*. *limit* must be >= 1.
        """
        pieces = []
        remaining = paragraph.strip()
        while len(remaining) > limit:
            cut = max(
                remaining.rfind(' ', 0, limit + 1),
                remaining.rfind('\n', 0, limit + 1),
            )
            if cut <= 0:
                cut = limit
            pieces.append(remaining[:cut].rstrip())
            remaining = remaining[cut:].lstrip()
        if remaining:
            pieces.append(remaining)
        return pieces

    def chunk_document(self, text: str, chunk_size: int = 2000) -> List[Dict]:
        """Split text into chunks of roughly ``chunk_size`` characters.

        The text is split into paragraphs on blank lines and consecutive
        paragraphs are packed into a chunk, joined by a blank line, for as
        long as the chunk (separator included) stays within ``chunk_size``.
        A paragraph that is longer than ``chunk_size`` on its own is split at
        whitespace so that no chunk exceeds the limit.

        Args:
            text: The document text.
            chunk_size: Maximum chunk length in characters. For values below
                1 no limit can be honoured; each paragraph then becomes its
                own chunk.

        Returns:
            A list of dicts with keys ``"text"`` and ``"metadata"``; the
            metadata holds ``"length"`` and ``"type"`` (always
            ``"paragraph"``). Empty input yields ``[]``.
        """
        if not text:
            return []

        separator = self._PARAGRAPH_SEPARATOR

        # Split into paragraphs, breaking up any that cannot fit in a chunk.
        units = []
        for para in (p.strip() for p in text.split(separator)):
            if not para:
                continue
            if chunk_size > 0 and len(para) > chunk_size:
                units.extend(self._split_oversized(para, chunk_size))
            else:
                units.append(para)

        def make_chunk(chunk_text: str) -> Dict:
            return {
                "text": chunk_text,
                "metadata": {
                    "length": len(chunk_text),
                    "type": "paragraph"
                }
            }

        chunks = []
        current_chunk = ""

        for unit in units:
            if not current_chunk:
                current_chunk = unit
            elif len(current_chunk) + len(separator) + len(unit) > chunk_size:
                chunks.append(make_chunk(current_chunk))
                current_chunk = unit
            else:
                current_chunk += separator + unit

        if current_chunk:
            chunks.append(make_chunk(current_chunk))

        return chunks

    def get_document_metadata(self, file_path: str) -> Dict:
        """Extract metadata from a document on disk.

        Args:
            file_path: Path to the file.

        Returns:
            A dict with ``filename``, ``file_type`` (lower-case extension
            without the dot), ``file_size`` and the ``created_time`` /
            ``modified_time`` timestamps. PDFs add ``page_count`` and
            ``pdf_metadata``; DOCX files add ``paragraph_count`` and
            ``table_count``. On failure the error is printed and a dict with
            ``filename`` and ``error`` is returned instead.
        """
        try:
            # splitext only looks at the final path component, so dots in
            # directory names cannot be mistaken for the extension.
            file_extension = os.path.splitext(file_path)[1].lstrip('.').lower()
            file_size = os.path.getsize(file_path)
            # NOTE: getctime is the creation time on Windows but the last
            # metadata change on Unix.
            created_time = os.path.getctime(file_path)
            modified_time = os.path.getmtime(file_path)

            metadata = {
                "filename": os.path.basename(file_path),
                "file_type": file_extension,
                "file_size": file_size,
                "created_time": created_time,
                "modified_time": modified_time
            }

            # Add format-specific metadata
            if file_extension == 'pdf':
                # Context manager closes the PDF handle once the values are read.
                with fitz.open(file_path) as doc:
                    metadata.update({
                        "page_count": doc.page_count,
                        "pdf_metadata": doc.metadata
                    })

            elif file_extension == 'docx':
                doc = docx.Document(file_path)
                metadata.update({
                    "paragraph_count": len(doc.paragraphs),
                    "table_count": len(doc.tables)
                })

            return metadata

        except Exception as e:
            print(f"Error extracting metadata: {str(e)}")
            return {
                "filename": os.path.basename(file_path),
                "error": str(e)
            }