cryogenic22 committed on
Commit
82781b0
·
verified ·
1 Parent(s): 9c6318f

Update utils/document_processor.py

Browse files
Files changed (1) hide show
  1. utils/document_processor.py +51 -85
utils/document_processor.py CHANGED
@@ -1,107 +1,73 @@
1
- # utils/document_processor.py
2
- import streamlit as st
3
- import fitz
4
- import docx
5
- from typing import List, Dict, Tuple
6
- import re
7
- import io
8
  from PIL import Image
 
 
 
 
 
 
9
 
10
class DocumentProcessor:
    """Extracts text from uploaded PDF/DOCX/plain-text files and splits it
    into paragraph-aligned chunks with length metadata."""

    def __init__(self):
        # Stateless: the embedder dependency was removed, so chunks carry
        # only length metadata (no embeddings).
        pass

    def process_document(self, file) -> Tuple[str, List[Dict]]:
        """Process an uploaded file and return (full_text, chunks).

        Dispatches on the file extension; anything that is not pdf/docx is
        treated as UTF-8 text. On any error, reports via st.error and
        returns ("", []).
        """
        try:
            file_type = file.name.split('.')[-1].lower()
            if file_type == 'pdf':
                text = self._process_pdf(file)
            elif file_type == 'docx':
                text = self._process_docx(file)
            else:
                text = self._process_text(file)
            chunks = self._create_chunks(text)
            return text, chunks
        except Exception as e:
            st.error(f"Error processing document: {str(e)}")
            return "", []

    def _process_pdf(self, file) -> str:
        """Extract text from a PDF via PyMuPDF (fitz); "" on failure."""
        try:
            pdf_bytes = file.getvalue()
            doc = fitz.open(stream=pdf_bytes, filetype="pdf")
            text = ""
            for page in doc:
                text += page.get_text()
            return text
        except Exception as e:
            st.error(f"Error processing PDF: {str(e)}")
            return ""

    def _process_docx(self, file) -> str:
        """Extract paragraph text from a DOCX file; "" on failure."""
        try:
            doc = docx.Document(io.BytesIO(file.getvalue()))
            text = []
            for para in doc.paragraphs:
                text.append(para.text)
            return "\n".join(text)
        except Exception as e:
            st.error(f"Error processing DOCX: {str(e)}")
            return ""

    def _process_text(self, file) -> str:
        """Decode an uploaded file as UTF-8 text; "" on failure."""
        try:
            return file.getvalue().decode('utf-8')
        except Exception as e:
            st.error(f"Error processing text file: {str(e)}")
            return ""

    def _create_chunks(self, text: str, chunk_size: int = 1000) -> List[Dict]:
        """Group non-empty paragraphs into chunks of at most ~chunk_size chars.

        A paragraph that would push the current chunk past chunk_size starts
        a new chunk; paragraphs inside a chunk are rejoined with newlines.
        """
        if not text:
            return []
        # Split into non-empty paragraphs
        paragraphs = [p.strip() for p in text.split('\n') if p.strip()]

        chunks = []
        current_chunk = ""

        for para in paragraphs:
            if len(current_chunk) + len(para) > chunk_size and current_chunk:
                chunks.append(self._create_chunk_dict(current_chunk))
                current_chunk = para
            else:
                current_chunk += "\n" + para if current_chunk else para

        if current_chunk:
            chunks.append(self._create_chunk_dict(current_chunk))

        return chunks

    def _create_chunk_dict(self, text: str) -> Dict:
        """Create a chunk dictionary with length metadata.

        Fixed: this previously read self.embedder, which __init__ no longer
        sets, so every call raised AttributeError. Embeddings were dropped
        along with the embedder dependency; only length metadata remains.
        """
        return {
            "text": text,
            "metadata": {
                "length": len(text)
            }
        }
 
1
import io
import os
from typing import Dict, List, Tuple

import docx
import pypdf
import pytesseract
import streamlit as st
from pdf2image import convert_from_bytes
from PIL import Image
from pytesseract import Output
10
+
11
 
12
class DocumentProcessor:
    """Extracts text from uploaded PDF/DOCX/TXT/CSV files (with OCR fallback
    for scanned PDFs) and splits it into fixed-size chunks for vectorization."""

    def __init__(self):
        # Stateless: no embedder or other resources are held.
        pass

    def process_document(self, file) -> Tuple[str, List[Dict]]:
        """Process an uploaded file and return (full_text, chunks).

        Raises:
            ValueError: if the file extension is not pdf/docx/txt/csv.
        """
        file_type = file.name.split(".")[-1].lower()
        if file_type == "pdf":
            text = self._process_pdf(file)
        elif file_type == "docx":
            text = self._process_docx(file)
        elif file_type in ["txt", "csv"]:
            text = file.read().decode("utf-8")
        else:
            raise ValueError(f"Unsupported file type: {file_type}")

        chunks = self._chunk_text(text)
        return text, chunks

    def _process_pdf(self, file) -> str:
        """Extract text from a PDF, including OCR for scanned pages.

        Returns "" on any processing error (reported via st.error).
        """
        try:
            # Read the raw bytes up front: pypdf consumes the stream, so a
            # later file.read() for OCR would otherwise return b"".
            pdf_bytes = file.read()
            reader = pypdf.PdfReader(io.BytesIO(pdf_bytes))
            text = ""
            for page_number, page in enumerate(reader.pages, start=1):
                # extract_text() may return None for image-only pages.
                page_text = page.extract_text() or ""
                if not page_text.strip():  # Fallback to OCR if text is empty
                    st.warning("Detected a scanned PDF. Performing OCR...")
                    # OCR only this page; previously the whole document was
                    # OCRed once per empty page, duplicating its text.
                    text += self._perform_ocr(pdf_bytes, page_number)
                else:
                    text += page_text
            return text
        except Exception as e:
            st.error(f"Error processing PDF: {e}")
            return ""

    def _perform_ocr(self, pdf_bytes: bytes, page_number=None) -> str:
        """Perform OCR on scanned PDF pages.

        When page_number (1-based) is given, only that page is rendered and
        OCRed; otherwise the whole document is processed. Returns "" on error.
        """
        try:
            if page_number is None:
                images = convert_from_bytes(pdf_bytes)
            else:
                images = convert_from_bytes(
                    pdf_bytes, first_page=page_number, last_page=page_number
                )
            # --psm 6 treats each page as a single uniform block of text.
            return "".join(
                pytesseract.image_to_string(image, config="--psm 6")
                for image in images
            )
        except Exception as e:
            st.error(f"Error performing OCR: {e}")
            return ""

    def _process_docx(self, file) -> str:
        """Extract text from DOCX files; "" on error."""
        try:
            doc = docx.Document(file)
            return "\n".join(para.text for para in doc.paragraphs)
        except Exception as e:
            st.error(f"Error processing DOCX: {e}")
            return ""

    def _chunk_text(self, text: str, chunk_size: int = 500) -> List[Dict]:
        """Split text into consecutive chunk_size-character chunks for
        vectorization; each chunk carries a sequential chunk_id."""
        return [{"chunk_id": idx, "text": text[i:i + chunk_size]}
                for idx, i in enumerate(range(0, len(text), chunk_size))]