File size: 6,992 Bytes
d1dd20b
d178ae1
 
 
d1dd20b
d178ae1
 
289cd09
d1dd20b
 
 
68613c4
d178ae1
 
 
d1dd20b
 
 
 
 
 
 
 
 
289cd09
68613c4
d178ae1
68613c4
 
d1dd20b
 
 
68613c4
 
 
 
289cd09
68613c4
 
 
 
 
 
 
 
 
 
 
d178ae1
68613c4
d1dd20b
d178ae1
d1dd20b
68613c4
68fead3
68613c4
 
 
 
 
 
 
d1dd20b
68613c4
d1dd20b
 
 
68613c4
d1dd20b
 
68613c4
d1dd20b
68613c4
 
 
d1dd20b
68613c4
d1dd20b
 
68613c4
d1dd20b
68613c4
68fead3
68613c4
d1dd20b
 
 
68613c4
d1dd20b
 
 
 
68613c4
 
d1dd20b
 
 
 
 
68613c4
d1dd20b
68613c4
68fead3
 
d178ae1
68613c4
d1dd20b
 
 
 
68613c4
d1dd20b
68613c4
d1dd20b
d178ae1
d1dd20b
68613c4
 
 
 
d1dd20b
 
 
 
68613c4
d1dd20b
68613c4
d178ae1
d1dd20b
68613c4
 
 
 
d1dd20b
 
 
d178ae1
 
 
d1dd20b
68613c4
68fead3
 
 
68613c4
 
68fead3
 
d1dd20b
d178ae1
d1dd20b
d178ae1
 
68fead3
 
 
68613c4
 
68fead3
 
d178ae1
 
 
d1dd20b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
# utils/document_processor.py
import pytesseract
from pdf2image import convert_from_path
import docx
import fitz  # PyMuPDF
from PIL import Image
import io
from typing import List, Dict, Optional, Union, Any
import re
import tempfile
import os
import streamlit as st

class DocumentProcessor:
    """Extract, clean and chunk text from uploaded documents.

    Supported inputs are PDF, DOCX, plain-text and JPG/JPEG/PNG image files.
    Extraction errors are reported through ``st.error`` and turned into an
    empty string rather than propagated to the caller.
    """

    # Separator that marks a paragraph boundary in cleaned text and in chunks.
    _PARAGRAPH_SEPARATOR = "\n\n"

    def __init__(self):
        # Maps a lower-case file extension to the method that extracts its text.
        self.supported_formats = {
            'pdf': self._process_pdf,
            'docx': self._process_docx,
            'txt': self._process_text,
            'jpg': self._process_image,
            'jpeg': self._process_image,
            'png': self._process_image
        }

    def process_document(self, uploaded_file: Any) -> str:
        """Process uploaded document and extract text.

        Args:
            uploaded_file: Object exposing ``name`` (file name including the
                extension) and ``getbuffer()`` (the raw bytes).

        Returns:
            The cleaned text, or ``""`` when the format is unsupported or
            processing fails (the error is shown with ``st.error``).
        """
        tmp_path = None
        try:
            # Get file extension
            file_extension = uploaded_file.name.split('.')[-1].lower()

            if file_extension not in self.supported_formats:
                raise ValueError(f"Unsupported file format: {file_extension}")

            processor = self.supported_formats[file_extension]

            # Copy the upload to a named temporary file because the format
            # processors work on file paths.
            with tempfile.NamedTemporaryFile(delete=False, suffix=f'.{file_extension}') as tmp_file:
                # Remember the path first so the file is removed even if the
                # write below fails.
                tmp_path = tmp_file.name
                tmp_file.write(uploaded_file.getbuffer())

            # The temporary file is closed before it is reopened by the
            # processor: on Windows an open NamedTemporaryFile can be neither
            # reopened by name nor deleted.
            text = processor(tmp_path)
            return self._clean_text(text)

        except Exception as e:
            st.error(f"Error processing document: {str(e)}")
            return ""
        finally:
            # Always remove the temporary copy, on success and on failure.
            if tmp_path is not None:
                try:
                    os.unlink(tmp_path)
                except OSError:
                    # Best-effort cleanup: a failed delete must not mask the
                    # result or the original error.
                    pass

    def _process_pdf(self, file_path: str) -> str:
        """Process PDF files.

        Returns the concatenated text of all pages, or ``""`` on error.
        """
        try:
            # Open PDF file; the context manager closes it afterwards.
            with fitz.open(file_path) as doc:
                return "".join(page.get_text() for page in doc)
        except Exception as e:
            st.error(f"Error processing PDF: {str(e)}")
            return ""

    def _process_docx(self, file_path: str) -> str:
        """Process DOCX files.

        Returns all paragraph texts followed by all table rows (cells joined
        with ``" | "``), separated by blank lines, or ``""`` on error.
        """
        try:
            doc = docx.Document(file_path)

            # Get paragraphs
            text = [para.text for para in doc.paragraphs]

            # Get tables (appended after the paragraphs, one entry per row)
            for table in doc.tables:
                text.extend(
                    " | ".join(cell.text for cell in row.cells)
                    for row in table.rows
                )

            return "\n\n".join(text)
        except Exception as e:
            st.error(f"Error processing DOCX: {str(e)}")
            return ""

    def _process_text(self, file_path: str) -> str:
        """Process text files.

        The file is decoded as UTF-8 (a leading BOM is dropped), then as
        cp1252, and finally as latin-1.

        Returns the file content, or ``""`` on error.
        """
        # latin-1 maps every byte to a character and therefore never fails,
        # so it has to come last; cp1252 is tried before it because it rejects
        # some byte values and would otherwise never be reached.
        encodings = ('utf-8-sig', 'cp1252', 'latin-1')
        try:
            for encoding in encodings:
                try:
                    with open(file_path, 'r', encoding=encoding) as file:
                        return file.read()
                except UnicodeDecodeError:
                    # Try the next encoding
                    continue
            return ""
        except Exception as e:
            st.error(f"Error processing text file: {str(e)}")
            return ""

    def _process_image(self, file_path: str) -> str:
        """Process image files.

        Runs OCR on the image (converted to RGB when necessary) and returns
        the recognized text, or ``""`` on error.
        """
        try:
            # The context manager releases the underlying file handle.
            with Image.open(file_path) as image:
                rgb_image = image if image.mode == 'RGB' else image.convert('RGB')
                return pytesseract.image_to_string(rgb_image)
        except Exception as e:
            st.error(f"Error processing image: {str(e)}")
            return ""

    def _clean_text(self, text: str) -> str:
        """Clean and normalize text.

        Special characters are removed (word characters, whitespace and
        ``. , ! ? -`` are kept), whitespace inside each line is collapsed to
        single spaces, and runs of blank lines are reduced to one blank line
        so that paragraph boundaries survive for ``chunk_document``.

        Returns the cleaned text, or ``""`` for empty input.
        """
        if not text:
            return ""

        # Normalize line endings so lines can be split on '\n' only
        text = text.replace('\r\n', '\n').replace('\r', '\n')
        # Remove special characters but keep basic punctuation
        text = re.sub(r'[^\w\s.,!?-]', '', text)

        paragraphs = []
        current_lines = []
        for raw_line in text.split('\n'):
            # Remove excessive whitespace within the line
            line = ' '.join(raw_line.split())
            if line:
                current_lines.append(line)
            elif current_lines:
                # A blank line closes the current paragraph
                paragraphs.append('\n'.join(current_lines))
                current_lines = []
        if current_lines:
            paragraphs.append('\n'.join(current_lines))

        return self._PARAGRAPH_SEPARATOR.join(paragraphs)

    @staticmethod
    def _make_chunk(chunk_text: str) -> Dict:
        """Build the chunk record for one piece of text."""
        return {
            "text": chunk_text,
            "metadata": {
                "length": len(chunk_text),
                "type": "paragraph"
            }
        }

    def chunk_document(self, text: str, chunk_size: int = 2000) -> List[Dict]:
        """Split document into chunks.

        Paragraphs (separated by a blank line) are packed greedily into
        chunks of at most ``chunk_size`` characters, separators included.
        A single paragraph longer than ``chunk_size`` is not split; it
        becomes a chunk of its own.

        Args:
            text: Text to split.
            chunk_size: Maximum length of a chunk built from several
                paragraphs.

        Returns:
            A list of dicts with the keys ``"text"`` and ``"metadata"``
            (``"length"`` and ``"type"``); empty for empty input.
        """
        if not text:
            return []

        separator = self._PARAGRAPH_SEPARATOR

        # Split into paragraphs
        paragraphs = [p.strip() for p in text.split(separator) if p.strip()]

        chunks = []
        current_chunk = ""

        for para in paragraphs:
            if not current_chunk:
                current_chunk = para
            elif len(current_chunk) + len(separator) + len(para) > chunk_size:
                # The separator is counted too, otherwise a merged chunk
                # could exceed chunk_size.
                chunks.append(self._make_chunk(current_chunk))
                current_chunk = para
            else:
                current_chunk += separator + para

        if current_chunk:
            chunks.append(self._make_chunk(current_chunk))

        return chunks

    def get_document_metadata(self, file_path: str) -> Dict:
        """
        Extract metadata from document.

        Args:
            file_path: Path of the document on disk.

        Returns:
            A dict with ``filename``, ``file_type`` (lower-case extension
            without the dot, ``""`` if there is none), ``file_size``,
            ``created_time`` and ``modified_time``; plus ``page_count`` and
            ``pdf_metadata`` for PDF files, or ``paragraph_count`` and
            ``table_count`` for DOCX files. On failure the dict contains only
            ``filename`` and ``error``.
        """
        try:
            # splitext looks at the last path component only, so dots in
            # directory names are not mistaken for an extension.
            file_extension = os.path.splitext(file_path)[1].lstrip('.').lower()

            metadata = {
                "filename": os.path.basename(file_path),
                "file_type": file_extension,
                "file_size": os.path.getsize(file_path),
                "created_time": os.path.getctime(file_path),
                "modified_time": os.path.getmtime(file_path)
            }

            # Add format-specific metadata
            if file_extension == 'pdf':
                # Close the PDF once its metadata has been read.
                with fitz.open(file_path) as doc:
                    metadata.update({
                        "page_count": doc.page_count,
                        "pdf_metadata": doc.metadata
                    })

            elif file_extension == 'docx':
                doc = docx.Document(file_path)
                metadata.update({
                    "paragraph_count": len(doc.paragraphs),
                    "table_count": len(doc.tables)
                })

            return metadata

        except Exception as e:
            # The message is also returned so the caller can surface it.
            print(f"Error extracting metadata: {str(e)}")
            return {
                "filename": os.path.basename(file_path),
                "error": str(e)
            }