Spaces:
Build error
Build error
Update utils/document_processor.py
Browse files- utils/document_processor.py +30 -39
utils/document_processor.py
CHANGED
|
@@ -1,18 +1,11 @@
|
|
| 1 |
-
import
|
| 2 |
-
import pytesseract
|
| 3 |
-
from pytesseract import Output
|
| 4 |
-
from PIL import Image
|
| 5 |
import pypdf
|
| 6 |
-
from pdf2image import convert_from_bytes
|
| 7 |
import docx
|
| 8 |
-
from
|
| 9 |
-
import
|
| 10 |
-
|
| 11 |
|
| 12 |
class DocumentProcessor:
|
| 13 |
-
def __init__(self):
|
| 14 |
-
pass
|
| 15 |
-
|
| 16 |
def process_document(self, file) -> Tuple[str, List[Dict]]:
|
| 17 |
"""Process a document and return its text and chunks."""
|
| 18 |
file_type = file.name.split(".")[-1].lower()
|
|
@@ -21,7 +14,7 @@ class DocumentProcessor:
|
|
| 21 |
elif file_type == "docx":
|
| 22 |
text = self._process_docx(file)
|
| 23 |
elif file_type in ["txt", "csv"]:
|
| 24 |
-
text =
|
| 25 |
else:
|
| 26 |
raise ValueError(f"Unsupported file type: {file_type}")
|
| 27 |
|
|
@@ -30,41 +23,39 @@ class DocumentProcessor:
|
|
| 30 |
|
| 31 |
def _process_pdf(self, file) -> str:
|
| 32 |
"""Extract text from a PDF, including OCR for scanned PDFs."""
|
| 33 |
-
|
| 34 |
-
|
| 35 |
-
|
| 36 |
-
|
| 37 |
-
|
| 38 |
-
|
| 39 |
-
|
| 40 |
-
|
| 41 |
-
|
| 42 |
-
|
| 43 |
-
|
| 44 |
-
return text
|
| 45 |
-
except Exception as e:
|
| 46 |
-
st.error(f"Error processing PDF: {e}")
|
| 47 |
-
return ""
|
| 48 |
|
| 49 |
def _perform_ocr(self, pdf_bytes: bytes) -> str:
|
| 50 |
"""Perform OCR on scanned PDF pages."""
|
| 51 |
-
|
| 52 |
-
|
| 53 |
-
|
| 54 |
-
|
| 55 |
-
|
| 56 |
-
return text
|
| 57 |
-
except Exception as e:
|
| 58 |
-
st.error(f"Error performing OCR: {e}")
|
| 59 |
-
return ""
|
| 60 |
|
| 61 |
def _process_docx(self, file) -> str:
|
| 62 |
"""Extract text from DOCX files."""
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 63 |
try:
|
| 64 |
-
|
| 65 |
-
|
|
|
|
| 66 |
except Exception as e:
|
| 67 |
-
st.error(f"Error processing
|
| 68 |
return ""
|
| 69 |
|
| 70 |
def _chunk_text(self, text: str, chunk_size: int = 500) -> List[Dict]:
|
|
|
|
| 1 |
+
import chardet
|
|
|
|
|
|
|
|
|
|
| 2 |
import pypdf
|
|
|
|
| 3 |
import docx
|
| 4 |
+
from pdf2image import convert_from_bytes
|
| 5 |
+
import pytesseract
|
| 6 |
+
from PIL import Image
|
| 7 |
|
| 8 |
class DocumentProcessor:
|
|
|
|
|
|
|
|
|
|
| 9 |
def process_document(self, file) -> Tuple[str, List[Dict]]:
|
| 10 |
"""Process a document and return its text and chunks."""
|
| 11 |
file_type = file.name.split(".")[-1].lower()
|
|
|
|
| 14 |
elif file_type == "docx":
|
| 15 |
text = self._process_docx(file)
|
| 16 |
elif file_type in ["txt", "csv"]:
|
| 17 |
+
text = self._process_text(file)
|
| 18 |
else:
|
| 19 |
raise ValueError(f"Unsupported file type: {file_type}")
|
| 20 |
|
|
|
|
| 23 |
|
| 24 |
def _process_pdf(self, file) -> str:
|
| 25 |
"""Extract text from a PDF, including OCR for scanned PDFs."""
|
| 26 |
+
reader = pypdf.PdfReader(file)
|
| 27 |
+
text = ""
|
| 28 |
+
for page in reader.pages:
|
| 29 |
+
page_text = page.extract_text()
|
| 30 |
+
if page_text:
|
| 31 |
+
text += page_text
|
| 32 |
+
else:
|
| 33 |
+
st.warning("Detected a scanned PDF. Performing OCR...")
|
| 34 |
+
pdf_bytes = file.read()
|
| 35 |
+
text += self._perform_ocr(pdf_bytes)
|
| 36 |
+
return text
|
|
|
|
|
|
|
|
|
|
|
|
|
| 37 |
|
| 38 |
def _perform_ocr(self, pdf_bytes: bytes) -> str:
|
| 39 |
"""Perform OCR on scanned PDF pages."""
|
| 40 |
+
images = convert_from_bytes(pdf_bytes)
|
| 41 |
+
text = ""
|
| 42 |
+
for image in images:
|
| 43 |
+
text += pytesseract.image_to_string(image, config="--psm 6")
|
| 44 |
+
return text
|
|
|
|
|
|
|
|
|
|
|
|
|
| 45 |
|
| 46 |
def _process_docx(self, file) -> str:
|
| 47 |
"""Extract text from DOCX files."""
|
| 48 |
+
doc = docx.Document(file)
|
| 49 |
+
return "\n".join(para.text for para in doc.paragraphs)
|
| 50 |
+
|
| 51 |
+
def _process_text(self, file) -> str:
|
| 52 |
+
"""Process plain text files with unknown encoding."""
|
| 53 |
try:
|
| 54 |
+
raw_data = file.read()
|
| 55 |
+
detected_encoding = chardet.detect(raw_data)["encoding"]
|
| 56 |
+
return raw_data.decode(detected_encoding)
|
| 57 |
except Exception as e:
|
| 58 |
+
st.error(f"Error processing text file: {e}")
|
| 59 |
return ""
|
| 60 |
|
| 61 |
def _chunk_text(self, text: str, chunk_size: int = 500) -> List[Dict]:
|