Spaces:
Sleeping
Sleeping
| # utils/document_processor.py | |
| import pytesseract | |
| from pdf2image import convert_from_path | |
| import docx | |
| import fitz # PyMuPDF | |
| from PIL import Image | |
| import io | |
| from typing import List, Dict, Optional, Union, Any | |
| import re | |
| import tempfile | |
| import os | |
| import streamlit as st | |
class DocumentProcessor:
    """Extract, clean and chunk text from uploaded documents.

    Supported inputs are PDF, DOCX, plain-text and JPG/JPEG/PNG image files.
    Extraction errors are reported through ``st.error`` and turned into an
    empty string so that a failed upload never raises into the caller.
    """

    def __init__(self):
        # Maps a lower-case extension (without the dot) to its extractor.
        self.supported_formats = {
            'pdf': self._process_pdf,
            'docx': self._process_docx,
            'txt': self._process_text,
            'jpg': self._process_image,
            'jpeg': self._process_image,
            'png': self._process_image
        }

    def process_document(self, uploaded_file: Any) -> str:
        """Extract and clean the text of an uploaded file.

        Args:
            uploaded_file: Object exposing ``name`` (the original filename)
                and ``getbuffer()`` (the raw bytes of the upload).

        Returns:
            The cleaned text, or ``""`` when the format is unsupported or
            extraction fails (the error is shown with ``st.error``).
        """
        tmp_path = None
        try:
            # splitext only looks at the final path component, so a file
            # without any extension is not mistaken for a supported type.
            file_extension = os.path.splitext(uploaded_file.name)[1].lstrip('.').lower()
            if file_extension not in self.supported_formats:
                raise ValueError(f"Unsupported file format: {file_extension}")

            # The extractors work on paths, so spill the upload to disk.
            with tempfile.NamedTemporaryFile(delete=False, suffix=f'.{file_extension}') as tmp_file:
                tmp_path = tmp_file.name
                tmp_file.write(uploaded_file.getbuffer())

            # The temp file is closed before it is reopened by name: an open
            # NamedTemporaryFile cannot be opened a second time on Windows.
            processor = self.supported_formats[file_extension]
            text = processor(tmp_path)
            return self._clean_text(text)
        except Exception as e:
            st.error(f"Error processing document: {str(e)}")
            return ""
        finally:
            # Always remove the temp file, including on the error paths.
            if tmp_path is not None:
                try:
                    os.unlink(tmp_path)
                except OSError:
                    # Best-effort cleanup; the file may already be gone.
                    pass

    def _process_pdf(self, file_path: str) -> str:
        """Return the concatenated text layer of every page of a PDF.

        Returns ``""`` (after reporting via ``st.error``) on failure.
        """
        try:
            with fitz.open(file_path) as doc:
                return "".join(page.get_text() for page in doc)
        except Exception as e:
            st.error(f"Error processing PDF: {str(e)}")
            return ""

    def _process_docx(self, file_path: str) -> str:
        """Return the text of a DOCX file: paragraphs first, then table rows.

        Each table row becomes one entry with its cells joined by ``" | "``;
        all entries are separated by a blank line. Returns ``""`` (after
        reporting via ``st.error``) on failure.
        """
        try:
            doc = docx.Document(file_path)
            text = [para.text for para in doc.paragraphs]
            for table in doc.tables:
                text.extend(
                    " | ".join(cell.text for cell in row.cells)
                    for row in table.rows
                )
            return "\n\n".join(text)
        except Exception as e:
            st.error(f"Error processing DOCX: {str(e)}")
            return ""

    def _process_text(self, file_path: str) -> str:
        """Read a text file, trying progressively more permissive encodings.

        Order: UTF-8 (a leading BOM is dropped), then cp1252, then latin-1.
        latin-1 maps every possible byte, so it must come last; placing it
        earlier would make the encodings after it unreachable.

        Returns ``""`` (after reporting via ``st.error``) if the file cannot
        be read at all.
        """
        try:
            for encoding in ('utf-8-sig', 'cp1252'):
                try:
                    with open(file_path, 'r', encoding=encoding) as file:
                        return file.read()
                except UnicodeDecodeError:
                    continue
            with open(file_path, 'r', encoding='latin-1') as file:
                return file.read()
        except Exception as e:
            st.error(f"Error processing text file: {str(e)}")
            return ""

    def _process_image(self, file_path: str) -> str:
        """Run OCR on an image file and return the recognised text.

        Returns ``""`` (after reporting via ``st.error``) on failure.
        """
        try:
            # The context manager releases the underlying file handle.
            with Image.open(file_path) as image:
                rgb_image = image if image.mode == 'RGB' else image.convert('RGB')
                return pytesseract.image_to_string(rgb_image)
        except Exception as e:
            st.error(f"Error processing image: {str(e)}")
            return ""

    def _clean_text(self, text: str) -> str:
        """Normalise extracted text line by line.

        For every line: drop characters other than word characters,
        whitespace and ``. , ! ? -``, collapse whitespace runs to a single
        space and trim the ends. Empty lines are removed and the remaining
        lines are joined with ``"\\n"``.
        """
        if not text:
            return ""
        cleaned_lines = []
        for raw_line in text.splitlines():
            # Strip special characters first so the gaps they leave behind
            # are collapsed by the whitespace pass that follows.
            line = re.sub(r'[^\w\s.,!?-]', '', raw_line)
            line = re.sub(r'\s+', ' ', line).strip()
            if line:
                cleaned_lines.append(line)
        return '\n'.join(cleaned_lines)

    @staticmethod
    def _make_chunk(text: str) -> Dict:
        """Wrap chunk text in the dict layout returned by chunk_document."""
        return {
            "text": text,
            "metadata": {
                "length": len(text),
                "type": "paragraph"
            }
        }

    @staticmethod
    def _split_long_paragraph(paragraph: str, limit: int) -> List[str]:
        """Cut a stripped paragraph into pieces of at most ``limit`` characters.

        Cuts are made at the last space, tab or newline that fits inside the
        limit; a run without such a character is cut at exactly ``limit``.
        """
        pieces = []
        remaining = paragraph
        while len(remaining) > limit:
            cut = max(remaining.rfind(ws, 0, limit + 1) for ws in (' ', '\n', '\t'))
            if cut <= 0:
                cut = limit
            pieces.append(remaining[:cut].rstrip())
            remaining = remaining[cut:].lstrip()
        if remaining:
            pieces.append(remaining)
        return pieces

    def chunk_document(self, text: str, chunk_size: int = 2000) -> List[Dict]:
        """Split text into chunks of at most ``chunk_size`` characters.

        Paragraphs (separated by a blank line) are packed greedily into
        chunks, re-joined with a blank line. A paragraph that is longer than
        ``chunk_size`` on its own is cut into smaller pieces.

        Args:
            text: The text to split.
            chunk_size: Maximum length of a chunk's text; must be >= 1.

        Returns:
            A list of ``{"text": ..., "metadata": {"length": ..., "type":
            "paragraph"}}`` dicts; empty when ``text`` is empty.

        Raises:
            ValueError: If ``chunk_size`` is smaller than 1.
        """
        if not text:
            return []
        if chunk_size < 1:
            raise ValueError("chunk_size must be a positive integer")

        separator = "\n\n"
        paragraphs = [p.strip() for p in text.split(separator) if p.strip()]

        chunks: List[Dict] = []
        current = ""
        for para in paragraphs:
            if len(para) > chunk_size:
                # Too big for any chunk: flush what we have, emit the full
                # pieces, and keep the tail open for the next paragraph.
                if current:
                    chunks.append(self._make_chunk(current))
                *full_pieces, current = self._split_long_paragraph(para, chunk_size)
                chunks.extend(self._make_chunk(piece) for piece in full_pieces)
                continue

            # Count the separator too, otherwise a chunk can overshoot.
            joined_length = len(current) + len(separator) + len(para)
            if current and joined_length > chunk_size:
                chunks.append(self._make_chunk(current))
                current = para
            else:
                current = f"{current}{separator}{para}" if current else para

        if current:
            chunks.append(self._make_chunk(current))
        return chunks

    def get_document_metadata(self, file_path: str) -> Dict:
        """Collect file-system and format-specific metadata for a document.

        Args:
            file_path: Path of the document on disk.

        Returns:
            A dict with ``filename``, ``file_type`` (lower-case extension
            without the dot), ``file_size``, ``created_time`` and
            ``modified_time``; PDFs add ``page_count`` and ``pdf_metadata``,
            DOCX files add ``paragraph_count`` and ``table_count``. On
            failure the dict holds only ``filename`` and ``error``.
        """
        try:
            # splitext ignores dots in directory names.
            file_extension = os.path.splitext(file_path)[1].lstrip('.').lower()
            metadata = {
                "filename": os.path.basename(file_path),
                "file_type": file_extension,
                "file_size": os.path.getsize(file_path),
                "created_time": os.path.getctime(file_path),
                "modified_time": os.path.getmtime(file_path)
            }

            # Add format-specific metadata
            if file_extension == 'pdf':
                # Close the PDF once the values have been read.
                with fitz.open(file_path) as doc:
                    metadata.update({
                        "page_count": doc.page_count,
                        "pdf_metadata": doc.metadata
                    })
            elif file_extension == 'docx':
                doc = docx.Document(file_path)
                metadata.update({
                    "paragraph_count": len(doc.paragraphs),
                    "table_count": len(doc.tables)
                })
            return metadata
        except Exception as e:
            print(f"Error extracting metadata: {str(e)}")
            return {
                "filename": os.path.basename(file_path),
                "error": str(e)
            }