File size: 7,510 Bytes
43efcb9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
"""
Document processing utilities for text extraction and chunking.
"""

import os
import logging
from typing import List, Dict, Any, Optional, Tuple, Union
import uuid

# Module-level logger named after this module. No handlers or levels are
# configured here; that is left to whatever application imports this file.
logger = logging.getLogger(__name__)


class DocumentProcessor:
    """
    Extract text from document files and split it into overlapping chunks.

    The reader is chosen by file extension: ``.pdf`` (PyPDF2), ``.txt`` and
    ``.md`` (read as UTF-8) and ``.docx`` (python-docx). The third-party
    readers are imported lazily, only when a file of that type is processed.
    """

    # How far past the nominal chunk end (in characters) a break may be sought.
    _BREAK_LOOKAHEAD = 100
    # Sentence terminators, tried in this order; the first *kind* found wins.
    _SENTENCE_BREAKS = ('. ', '! ', '? ', '.\n', '!\n', '?\n')

    def __init__(
        self,
        chunk_size: int = 1000,
        chunk_overlap: int = 200
    ):
        """
        Initialize the document processor.

        Args:
            chunk_size: Nominal maximum size of text chunks in characters
            chunk_overlap: Overlap between consecutive chunks in characters

        Raises:
            ValueError: If ``chunk_size`` is not positive, or ``chunk_overlap``
                is negative or not smaller than ``chunk_size``
        """
        # Fail at construction time rather than on the first processed file.
        self._validate_chunk_params(chunk_size, chunk_overlap)
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap

    @staticmethod
    def _validate_chunk_params(chunk_size: int, overlap: int) -> None:
        """Raise ValueError unless ``chunk_size > 0`` and ``0 <= overlap < chunk_size``."""
        if chunk_size <= 0:
            raise ValueError(f"chunk_size must be positive, got {chunk_size}")
        if overlap < 0:
            raise ValueError(f"overlap must be non-negative, got {overlap}")
        if overlap >= chunk_size:
            raise ValueError(
                f"overlap ({overlap}) must be smaller than chunk_size ({chunk_size})"
            )

    def process_file(
        self,
        file_path: str,
        metadata: Optional[Dict[str, Any]] = None
    ) -> Tuple[List[str], List[Dict[str, Any]]]:
        """
        Process a document file: extract text and chunk it.

        Args:
            file_path: Path to the document file
            metadata: Optional metadata about the document; merged over the
                default ``{"source": <file name>}`` entry

        Returns:
            Tuple of (list of text chunks, list of metadata dictionaries).
            The two lists are parallel. Every metadata dictionary carries
            ``chunk_id``, ``total_chunks`` and a ``document_id`` that is
            shared by all chunks produced by this call. Both lists are
            empty when no text could be extracted.

        Raises:
            FileNotFoundError: If ``file_path`` does not exist
            ValueError: If the file extension is not supported
            ImportError: If the reader library for the format is missing
        """
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"Document not found: {file_path}")

        # Extract text from document
        logger.info("Processing document: %s", file_path)
        text = self._extract_text(file_path)

        if not text:
            logger.warning("No text could be extracted from %s", file_path)
            return [], []

        # Caller-supplied metadata overrides the default "source" entry.
        base_metadata = {"source": os.path.basename(file_path)}
        if metadata:
            base_metadata.update(metadata)

        # Chunk the document
        chunks = self._chunk_text(text, self.chunk_size, self.chunk_overlap)
        total_chunks = len(chunks)
        logger.info("Created %d chunks from document", total_chunks)

        # One ID per processed document, so that its chunks can be grouped
        # back together; a fresh ID per chunk would make that impossible.
        document_id = str(uuid.uuid4())

        chunk_metadata = [
            {
                **base_metadata,
                "chunk_id": index,
                "total_chunks": total_chunks,
                "document_id": document_id,
            }
            for index in range(total_chunks)
        ]

        return chunks, chunk_metadata

    def _extract_text(self, file_path: str) -> str:
        """
        Extract text from a document file based on its extension.

        Args:
            file_path: Path to the document file

        Returns:
            Extracted text

        Raises:
            ValueError: If the (case-insensitive) extension is not one of
                ``.pdf``, ``.txt``, ``.md`` or ``.docx``
        """
        _, ext = os.path.splitext(file_path)
        ext = ext.lower()

        # Markdown is treated as plain text.
        extractors = {
            '.pdf': self._extract_text_from_pdf,
            '.txt': self._extract_text_from_txt,
            '.md': self._extract_text_from_txt,
            '.docx': self._extract_text_from_docx,
        }
        extractor = extractors.get(ext)
        if extractor is None:
            raise ValueError(f"Unsupported file format: {ext}")
        return extractor(file_path)

    def _extract_text_from_pdf(self, file_path: str) -> str:
        """
        Extract text from a PDF file.

        Args:
            file_path: Path to the PDF file

        Returns:
            Text of all pages that yielded any, each followed by a blank
            line. If reading fails part-way, the error is logged and the
            text gathered so far is returned.

        Raises:
            ImportError: If PyPDF2 is not installed
        """
        try:
            import PyPDF2
        except ImportError as exc:
            raise ImportError(
                "PyPDF2 is not installed. "
                "Please install it with `pip install PyPDF2`."
            ) from exc

        page_texts: List[str] = []
        try:
            with open(file_path, "rb") as f:
                pdf_reader = PyPDF2.PdfReader(f)
                num_pages = len(pdf_reader.pages)
                logger.info("PDF has %s pages", num_pages)

                for page in pdf_reader.pages:
                    page_text = page.extract_text()
                    if page_text:
                        page_texts.append(page_text + "\n\n")
        except Exception as e:
            # Deliberately best-effort: keep whatever pages were read.
            logger.error("Error reading PDF file %s: %s", file_path, e)

        text = "".join(page_texts)
        logger.info("Extracted %s characters from PDF", len(text))
        return text

    def _extract_text_from_txt(self, file_path: str) -> str:
        """
        Extract text from a plain text file.

        Args:
            file_path: Path to the text file

        Returns:
            The file content decoded as UTF-8, or an empty string if the
            file could not be read or decoded (the error is logged)
        """
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                text = f.read()

            logger.info("Extracted %s characters from text file", len(text))
            return text
        except Exception as e:
            # Deliberately best-effort: callers treat "" as "nothing extracted".
            logger.error("Error reading text file %s: %s", file_path, e)
            return ""

    def _extract_text_from_docx(self, file_path: str) -> str:
        """
        Extract text from a DOCX file.

        Args:
            file_path: Path to the DOCX file

        Returns:
            The non-empty paragraphs joined by blank lines, or an empty
            string if the file could not be read (the error is logged)

        Raises:
            ImportError: If python-docx is not installed
        """
        try:
            import docx
        except ImportError as exc:
            raise ImportError(
                "python-docx is not installed. "
                "Please install it with `pip install python-docx`."
            ) from exc

        try:
            doc = docx.Document(file_path)
            text = "\n\n".join(
                paragraph.text for paragraph in doc.paragraphs if paragraph.text
            )

            logger.info("Extracted %s characters from DOCX", len(text))
            return text
        except Exception as e:
            # Deliberately best-effort: callers treat "" as "nothing extracted".
            logger.error("Error reading DOCX file %s: %s", file_path, e)
            return ""

    @staticmethod
    def _find_break(text: str, search_from: int, search_to: int, default: int) -> int:
        """Return the index just past the first natural break in the window, or ``default``."""
        # A paragraph break is preferred over any sentence break.
        para_pos = text.find('\n\n', search_from, search_to)
        if para_pos != -1:
            return para_pos + 2

        # The first terminator *kind* that occurs in the window wins, even
        # if a later kind in the tuple occurs at an earlier position.
        for punct in DocumentProcessor._SENTENCE_BREAKS:
            sent_pos = text.find(punct, search_from, search_to)
            if sent_pos != -1:
                return sent_pos + len(punct)

        return default

    @staticmethod
    def _chunk_text(
        text: str,
        chunk_size: int = 1000,
        overlap: int = 200
    ) -> List[str]:
        """
        Split text into overlapping chunks.

        Each chunk nominally spans ``chunk_size`` characters. When more text
        follows, the end is moved to the first paragraph or sentence break
        found between ``overlap`` characters before the nominal end and
        roughly 100 characters after it, so a chunk may be shorter than
        ``chunk_size`` or exceed it by about that look-ahead. The next chunk
        starts ``overlap`` characters before the previous chunk's end.
        Chunks are stripped of surrounding whitespace and empty ones are
        dropped.

        Args:
            text: The text to chunk
            chunk_size: Nominal maximum chunk size in characters
            overlap: Overlap between chunks in characters

        Returns:
            List of text chunks; empty for empty or whitespace-only text

        Raises:
            ValueError: If ``chunk_size`` is not positive, or ``overlap`` is
                negative or not smaller than ``chunk_size``
        """
        DocumentProcessor._validate_chunk_params(chunk_size, overlap)

        if not text or not text.strip():
            return []

        chunks = []
        start = 0
        text_len = len(text)

        while start < text_len:
            # Define the initial chunk end
            end = min(start + chunk_size, text_len)

            # Try to find a natural break point if not at the end of text.
            # Because overlap < chunk_size, the window start is always > start,
            # so it can never become a negative (from-the-end) index.
            if end < text_len:
                end = DocumentProcessor._find_break(
                    text,
                    end - overlap,
                    end + DocumentProcessor._BREAK_LOOKAHEAD,
                    default=end,
                )

            # Extract the chunk
            chunk = text[start:end].strip()
            if chunk:  # Only add non-empty chunks
                chunks.append(chunk)

            # The whole text is covered: stop here. Stepping back by
            # `overlap` again would only emit redundant suffixes of this
            # final chunk, one character shorter each time.
            if end >= text_len:
                break

            # Move to next chunk with overlap, always advancing by >= 1 char
            start = max(end - overlap, start + 1)

        return chunks