# SuoMoto.AI / utils/document_processor.py
# cryogenic22's picture
# Update utils/document_processor.py
# 289cd09 verified
# utils/document_processor.py
import pytesseract
from pdf2image import convert_from_path
import docx
import fitz # PyMuPDF
from PIL import Image
import io
from typing import List, Dict, Optional, Union, Any
import re
import tempfile
import os
import streamlit as st
class DocumentProcessor:
    """Extract, clean, and chunk text from uploaded documents.

    Supported inputs are selected by file extension: PDF (PyMuPDF), DOCX
    (python-docx), plain text, and JPG/JPEG/PNG images (Tesseract OCR).
    Extraction errors are reported through ``st.error`` and yield an empty
    string instead of propagating to the caller.
    """

    def __init__(self):
        # Dispatch table: lower-case extension (no dot) -> extractor method.
        # Every extractor takes a filesystem path and returns the raw text.
        self.supported_formats = {
            'pdf': self._process_pdf,
            'docx': self._process_docx,
            'txt': self._process_text,
            'jpg': self._process_image,
            'jpeg': self._process_image,
            'png': self._process_image
        }

    @staticmethod
    def _get_extension(name: str) -> str:
        """Return the lower-cased extension of *name* without the dot.

        Only the final path component is inspected, so a dot inside a
        directory name is never mistaken for an extension. A name without
        any dot yields an empty string.
        """
        _, dot, extension = os.path.basename(name).rpartition('.')
        return extension.lower() if dot else ""

    def process_document(self, uploaded_file: Any) -> str:
        """Extract and clean the text of an uploaded document.

        Args:
            uploaded_file: Object exposing ``name`` (the file name) and
                ``getbuffer()`` (the file content as bytes-like data).
                Presumably a Streamlit ``UploadedFile`` - confirm at the
                call site.

        Returns:
            The cleaned text, or ``""`` when the format is unsupported or
            extraction fails (the error is shown via ``st.error``).
        """
        try:
            file_extension = self._get_extension(uploaded_file.name)
            if file_extension not in self.supported_formats:
                raise ValueError(f"Unsupported file format: {file_extension}")
            processor = self.supported_formats[file_extension]

            tmp_path = None
            try:
                # The extractors work on paths, so spill the upload to disk.
                # The handle is closed before the extractor runs because an
                # open NamedTemporaryFile cannot be reopened by name on
                # Windows.
                with tempfile.NamedTemporaryFile(
                    delete=False, suffix=f'.{file_extension}'
                ) as tmp_file:
                    tmp_path = tmp_file.name
                    tmp_file.write(uploaded_file.getbuffer())
                text = processor(tmp_path)
            finally:
                # Remove the temporary file on success and on failure alike.
                if tmp_path is not None:
                    try:
                        os.unlink(tmp_path)
                    except OSError:
                        pass
            return self._clean_text(text)
        except Exception as e:
            st.error(f"Error processing document: {str(e)}")
            return ""

    def _process_pdf(self, file_path: str) -> str:
        """Return the concatenated text of every page of a PDF file.

        Returns ``""`` and reports via ``st.error`` if the file cannot be
        read.
        """
        try:
            with fitz.open(file_path) as doc:
                return "".join(page.get_text() for page in doc)
        except Exception as e:
            st.error(f"Error processing PDF: {str(e)}")
            return ""

    def _process_docx(self, file_path: str) -> str:
        """Return the text of a DOCX file.

        All body paragraphs come first, followed by one entry per table row
        with the cell texts joined by ``" | "``. Entries are separated by a
        blank line. Returns ``""`` and reports via ``st.error`` on failure.
        """
        try:
            doc = docx.Document(file_path)
            parts = [para.text for para in doc.paragraphs]
            for table in doc.tables:
                parts.extend(
                    " | ".join(cell.text for cell in row.cells)
                    for row in table.rows
                )
            return "\n\n".join(parts)
        except Exception as e:
            st.error(f"Error processing DOCX: {str(e)}")
            return ""

    def _process_text(self, file_path: str) -> str:
        """Return the content of a text file, guessing its encoding.

        UTF-8 is tried first, then Windows-1252. Latin-1 comes last because
        it maps every possible byte and therefore never fails; placing it
        earlier would make any later candidate unreachable. Returns ``""``
        and reports via ``st.error`` if the file cannot be read.
        """
        for encoding in ('utf-8', 'cp1252', 'latin-1'):
            try:
                with open(file_path, 'r', encoding=encoding) as file:
                    return file.read()
            except UnicodeDecodeError:
                # Wrong guess: move on to the next candidate encoding.
                continue
            except Exception as e:
                st.error(f"Error processing text file: {str(e)}")
                return ""
        return ""

    def _process_image(self, file_path: str) -> str:
        """Return the text recognised by Tesseract OCR in an image file.

        The image is converted to RGB before recognition when it is in any
        other mode. Returns ``""`` and reports via ``st.error`` on failure.
        """
        try:
            # The context manager releases the underlying file handle.
            with Image.open(file_path) as image:
                rgb_image = image if image.mode == 'RGB' else image.convert('RGB')
                return pytesseract.image_to_string(rgb_image)
        except Exception as e:
            st.error(f"Error processing image: {str(e)}")
            return ""

    def _clean_text(self, text: str) -> str:
        """Normalise whitespace and strip unwanted characters.

        Every character that is not a word character, whitespace, or one of
        ``. , ! ? -`` is removed. Within a line, runs of whitespace collapse
        to one space; empty lines are dropped; paragraphs (text separated by
        one or more blank lines) stay separated by exactly one blank line so
        that ``chunk_document`` can still split on them.

        Returns ``""`` for empty or ``None`` input.
        """
        if not text:
            return ""
        text = text.replace('\r\n', '\n').replace('\r', '\n')
        # Remove special characters but keep basic punctuation.
        text = re.sub(r'[^\w\s.,!?-]', '', text)

        paragraphs = []
        for block in re.split(r'\n\s*\n', text):
            # Whitespace is collapsed per line so that newlines survive.
            lines = [re.sub(r'\s+', ' ', line).strip() for line in block.split('\n')]
            lines = [line for line in lines if line]
            if lines:
                paragraphs.append('\n'.join(lines))
        return '\n\n'.join(paragraphs)

    @staticmethod
    def _make_chunk(chunk_text: str) -> Dict:
        """Wrap *chunk_text* in the chunk record returned by chunk_document."""
        return {
            "text": chunk_text,
            "metadata": {
                "length": len(chunk_text),
                "type": "paragraph"
            }
        }

    def chunk_document(self, text: str, chunk_size: int = 2000) -> List[Dict]:
        """Group the paragraphs of *text* into chunks.

        Paragraphs are the non-empty pieces between blank lines. They are
        accumulated, joined by a blank line, until adding the next one would
        push the combined paragraph length over ``chunk_size``. The limit is
        a soft target: the joining separators are not counted, and a single
        paragraph longer than ``chunk_size`` is kept whole.

        Returns:
            A list of ``{"text": ..., "metadata": {"length": ...,
            "type": "paragraph"}}`` dicts; empty when *text* is empty.
        """
        if not text:
            return []

        paragraphs = [p.strip() for p in text.split('\n\n') if p.strip()]
        chunks = []
        current_chunk = ""
        for para in paragraphs:
            if not current_chunk:
                current_chunk = para
            elif len(current_chunk) + len(para) > chunk_size:
                chunks.append(self._make_chunk(current_chunk))
                current_chunk = para
            else:
                current_chunk = f"{current_chunk}\n\n{para}"
        if current_chunk:
            chunks.append(self._make_chunk(current_chunk))
        return chunks

    def get_document_metadata(self, file_path: str) -> Dict:
        """Collect filesystem and format-specific metadata for a file.

        Returns:
            A dict with ``filename``, ``file_type`` (lower-case extension
            without the dot, ``""`` if there is none), ``file_size`` and the
            ``created_time`` / ``modified_time`` values reported by
            ``os.path.getctime`` / ``os.path.getmtime``. PDFs add
            ``page_count`` and ``pdf_metadata``; DOCX files add
            ``paragraph_count`` and ``table_count``. On any failure the
            result holds only ``filename`` and ``error``.
        """
        try:
            file_extension = self._get_extension(file_path)
            metadata = {
                "filename": os.path.basename(file_path),
                "file_type": file_extension,
                "file_size": os.path.getsize(file_path),
                "created_time": os.path.getctime(file_path),
                "modified_time": os.path.getmtime(file_path)
            }
            # Add format-specific metadata.
            if file_extension == 'pdf':
                # Close the PDF handle once the values have been read.
                with fitz.open(file_path) as doc:
                    metadata.update({
                        "page_count": doc.page_count,
                        "pdf_metadata": doc.metadata
                    })
            elif file_extension == 'docx':
                doc = docx.Document(file_path)
                metadata.update({
                    "paragraph_count": len(doc.paragraphs),
                    "table_count": len(doc.tables)
                })
            return metadata
        except Exception as e:
            print(f"Error extracting metadata: {str(e)}")
            return {
                "filename": os.path.basename(file_path),
                "error": str(e)
            }