cryogenic22 committed on
Commit
1628132
·
verified ·
1 Parent(s): 8a2ab7f

Create utils/document_processor.py

Browse files
Files changed (1) hide show
  1. utils/document_processor.py +74 -0
utils/document_processor.py ADDED
@@ -0,0 +1,74 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # utils/document_processor.py
2
+ import fitz
3
+ import docx
4
+ from typing import List, Dict, Tuple
5
+ import re
6
+ from sentence_transformers import SentenceTransformer
7
+
8
class DocumentProcessor:
    """Extract text from uploaded documents and split it into embedded chunks.

    Supported inputs are PDF (via PyMuPDF / ``fitz``), DOCX (via
    ``python-docx``) and, as a fallback, any file whose bytes decode as text.
    Each chunk carries its text plus metadata holding the chunk length and a
    sentence-transformer embedding.
    """

    def __init__(self):
        """Load the embedding model, reusing the one cached in session state."""
        # The module's import block never binds ``st``; import it here so that
        # constructing the processor does not raise NameError.
        import streamlit as st

        # Keep a single model instance in the Streamlit session state under
        # the 'embedder' key so it is only constructed when not yet present.
        if 'embedder' not in st.session_state:
            st.session_state.embedder = SentenceTransformer('all-MiniLM-L6-v2')
        self.embedder = st.session_state.embedder

    def process_document(self, file) -> Tuple[str, List[Dict]]:
        """Extract the text of ``file`` and split it into chunks.

        Args:
            file: Uploaded-file-like object exposing ``name`` and
                ``getvalue()`` (and readable by ``docx.Document`` for DOCX).

        Returns:
            A ``(text, chunks)`` tuple: the full extracted text and the list
            of chunk dictionaries produced by ``_create_chunks``.
        """
        # The extension (text after the last dot, case-insensitive) selects
        # the extractor; anything that is not pdf/docx is decoded as text.
        file_type = file.name.split('.')[-1].lower()
        if file_type == 'pdf':
            text = self._process_pdf(file)
        elif file_type == 'docx':
            text = self._process_docx(file)
        else:
            text = file.getvalue().decode()

        chunks = self._create_chunks(text)
        return text, chunks

    def _process_pdf(self, file) -> str:
        """Return the concatenated text of every page of a PDF file."""
        pdf_bytes = file.getvalue()
        # Open as a context manager so the document is closed even if text
        # extraction fails part-way through.
        with fitz.open(stream=pdf_bytes, filetype="pdf") as doc:
            return "".join(page.get_text() for page in doc)

    def _process_docx(self, file) -> str:
        """Return the paragraphs of a DOCX file joined by newlines."""
        doc = docx.Document(file)
        return "\n".join(para.text for para in doc.paragraphs)

    def _create_chunks(self, text: str, chunk_size: int = 1000) -> List[Dict]:
        """Group the non-empty lines of ``text`` into chunks.

        Lines are stripped, blank ones are dropped, and the rest are packed
        greedily: a chunk is closed as soon as adding the next line would push
        the combined character count past ``chunk_size``. The newline used to
        join lines is not counted towards that limit, and a single line longer
        than ``chunk_size`` becomes a chunk of its own (it is never split).

        Args:
            text: Full document text.
            chunk_size: Target maximum number of characters per chunk.

        Returns:
            A list of chunk dictionaries (see ``_create_chunk_dict``); empty
            when ``text`` has no non-blank lines.
        """
        paragraphs = [p.strip() for p in text.split('\n') if p.strip()]

        chunks = []
        current_chunk = ""

        for para in paragraphs:
            if current_chunk and len(current_chunk) + len(para) > chunk_size:
                # Adding this paragraph would overflow: emit what we have and
                # start the next chunk with the paragraph.
                chunks.append(self._create_chunk_dict(current_chunk))
                current_chunk = para
            elif current_chunk:
                current_chunk += "\n" + para
            else:
                current_chunk = para

        # Flush the trailing, partially filled chunk.
        if current_chunk:
            chunks.append(self._create_chunk_dict(current_chunk))

        return chunks

    def _create_chunk_dict(self, text: str) -> Dict:
        """Wrap ``text`` in a chunk dictionary with length and embedding.

        Returns:
            ``{"text": text, "metadata": {"length": ..., "embedding": ...}}``
            where the embedding is ``self.embedder.encode(text)`` converted to
            a plain list.
        """
        return {
            "text": text,
            "metadata": {
                "length": len(text),
                "embedding": self.embedder.encode(text).tolist(),
            },
        }