File size: 12,217 Bytes
e8051be
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
"""

Modular Document Preprocessor



Main orchestrator class that uses all preprocessing modules to process documents.

"""

import os
import asyncio
from typing import List, Dict, Any, Union
from pathlib import Path

from config.config import OUTPUT_DIR
from .pdf_downloader import PDFDownloader
from .file_downloader import FileDownloader
from .text_extractor import TextExtractor
from .text_chunker import TextChunker
from .embedding_manager import EmbeddingManager
from .vector_storage import VectorStorage
from .metadata_manager import MetadataManager

# Import new extractors
from .docx_extractor import extract_docx
from .pptx_extractor import extract_pptx
from .xlsx_extractor import extract_xlsx
from .image_extractor import extract_image_content


class ModularDocumentPreprocessor:
    """

    Modular document preprocessor that orchestrates the entire preprocessing pipeline.

    

    This class combines all preprocessing modules to provide a clean interface

    for document processing while maintaining separation of concerns.

    """
    
    def __init__(self):
        """Build the preprocessor and wire up every pipeline module.

        The storage root is resolved from ``OUTPUT_DIR``; the vector storage
        and the metadata manager are both constructed with that path.
        """
        # Resolve the storage root first; the storage-backed modules need it.
        self.base_db_path = Path(OUTPUT_DIR).resolve()
        self._ensure_base_directory()
        storage_root = self.base_db_path

        # Download helpers (the PDF-only one is kept for backward compatibility).
        self.pdf_downloader = PDFDownloader()
        self.file_downloader = FileDownloader()

        # Text pipeline modules: extraction, chunking, embedding.
        self.text_extractor = TextExtractor()
        self.text_chunker = TextChunker()
        self.embedding_manager = EmbeddingManager()

        # Persistence modules, both rooted at the storage root.
        self.vector_storage = VectorStorage(storage_root)
        self.metadata_manager = MetadataManager(storage_root)

        print("βœ… Modular Document Preprocessor initialized successfully")
    
    def _ensure_base_directory(self):
        """Ensure the base directory exists."""
        if not self.base_db_path.exists():
            try:
                self.base_db_path.mkdir(parents=True, exist_ok=True)
                print(f"βœ… Created directory: {self.base_db_path}")
            except PermissionError:
                print(f"⚠️  Directory {self.base_db_path} should exist in production environment")
                if not self.base_db_path.exists():
                    raise RuntimeError(f"Required directory {self.base_db_path} does not exist and cannot be created")
    
    # Delegate metadata operations to metadata manager
    def generate_doc_id(self, document_url: str) -> str:
        """Generate a unique document ID from the URL."""
        return self.metadata_manager.generate_doc_id(document_url)
    
    def is_document_processed(self, document_url: str) -> bool:
        """Check if a document has already been processed."""
        return self.metadata_manager.is_document_processed(document_url)
    
    def get_document_info(self, document_url: str) -> Dict[str, Any]:
        """Get information about a processed document."""
        return self.metadata_manager.get_document_info(document_url)
    
    def list_processed_documents(self) -> Dict[str, Dict]:
        """List all processed documents."""
        return self.metadata_manager.list_processed_documents()
    
    def get_collection_stats(self) -> Dict[str, Any]:
        """Get statistics about all collections."""
        return self.metadata_manager.get_collection_stats()
    
    async def process_document(self, document_url: str, force_reprocess: bool = False, timeout: int = 300) -> Union[str, List]:
        """

        Process a single document: download, extract, chunk, embed, and store.

        

        Args:

            document_url: URL of the document (PDF, DOCX, PPTX, XLSX, images, etc.)

            force_reprocess: If True, reprocess even if already processed

            timeout: Download timeout in seconds (default: 300s/5min)

            

        Returns:

            str: Document ID for normal processing

            List: [content, type] for special handling (oneshot, tabular, image)

        """
        doc_id = self.generate_doc_id(document_url)
        
        # Check if already processed
        if not force_reprocess and self.is_document_processed(document_url):
            print(f"βœ… Document {doc_id} already processed, skipping...")
            return doc_id
        
        print(f"πŸš€ Processing document: {doc_id}")
        print(f"πŸ“„ URL: {document_url}")
        
        temp_file_path = None
        try:
            # Step 1: Download file (enhanced to handle multiple types)
            temp_file_path, ext = await self.file_downloader.download_file(document_url, timeout=timeout)
            
            if temp_file_path == 'not supported':
                return ['unsupported', ext]

            # Step 2: Extract text based on file type
            full_text = ""
            match ext:
                case 'pdf':
                    full_text = await self.text_extractor.extract_text_from_pdf(temp_file_path)
                
                case 'docx':
                    full_text = extract_docx(temp_file_path)
                
                case 'pptx':
                    full_text = extract_pptx(temp_file_path)
                    return [full_text, 'oneshot']
                
                case 'url':
                    new_context = "URL for Context: " + temp_file_path
                    return [new_context, 'oneshot']
                
                case 'txt':
                    with open(temp_file_path, 'r', encoding='utf-8') as f:
                        full_text = f.read()
                
                case 'xlsx':
                    full_text = extract_xlsx(temp_file_path)
                    # Print a short preview (10-15 chars) to verify extraction
                    try:
                        preview = ''.join(full_text.split())[:15]
                        if preview:
                            print(f"πŸ”Ž XLSX extracted preview: {preview}")
                    except Exception:
                        pass
                    return [full_text, 'tabular']
                
                case 'csv':
                    with open(temp_file_path, 'r', encoding='utf-8') as f:
                        full_text = f.read()
                    return [full_text, 'tabular']

                case 'png' | 'jpeg' | 'jpg':
                    # Don't clean up image files - they'll be cleaned up by the caller
                    return [temp_file_path, 'image', True]  # Third element indicates no cleanup needed
                
                case _:
                    raise Exception(f"Unsupported file type: {ext}")

            # Validate extracted text
            if not self.text_extractor.validate_extracted_text(full_text):
                raise Exception("No meaningful text extracted from document")
            
            # Step 3: Create chunks
            chunks = self.text_chunker.chunk_text(full_text)
            
            # Check if document is too short for chunking
            if len(chunks) < 5:
                print(f"Only {len(chunks)} chunks formed, going for oneshot.")
                return [full_text, 'oneshot']
            
            if not chunks:
                raise Exception("No chunks created from text")
            
            # Log chunk statistics
            chunk_stats = self.text_chunker.get_chunk_stats(chunks)
            print(f"πŸ“Š Chunk Statistics: {chunk_stats['total_chunks']} chunks, "
                  f"avg size: {chunk_stats['avg_chunk_size']:.0f} chars")
            
            # Step 4: Create embeddings
            embeddings = await self.embedding_manager.create_embeddings(chunks)
            
            # Validate embeddings
            if not self.embedding_manager.validate_embeddings(embeddings, len(chunks)):
                raise Exception("Invalid embeddings generated")
            
            # Step 5: Store in Qdrant
            await self.vector_storage.store_in_qdrant(chunks, embeddings, doc_id)
            
            # Step 6: Save metadata
            self.metadata_manager.save_document_metadata(chunks, doc_id, document_url)
            
            print(f"βœ… Document {doc_id} processed successfully: {len(chunks)} chunks")
            return doc_id
            
        except Exception as e:
            print(f"❌ Error processing document {doc_id}: {str(e)}")
            raise
        finally:
            # Clean up temporary file - but NOT for images since they need the file path
            if temp_file_path and ext not in ['png', 'jpeg', 'jpg']:
                self.file_downloader.cleanup_temp_file(temp_file_path)
    
    async def process_multiple_documents(self, document_urls: List[str], force_reprocess: bool = False) -> Dict[str, str]:
        """

        Process multiple documents concurrently.

        

        Args:

            document_urls: List of PDF URLs

            force_reprocess: If True, reprocess even if already processed

            

        Returns:

            Dict[str, str]: Mapping of URLs to document IDs

        """
        print(f"πŸš€ Processing {len(document_urls)} documents...")
        
        results = {}
        
        # Process documents concurrently (with limited concurrency)
        semaphore = asyncio.Semaphore(3)  # Limit to 3 concurrent downloads
        
        async def process_single(url):
            async with semaphore:
                try:
                    doc_id = await self.process_document(url, force_reprocess)
                    return url, doc_id
                except Exception as e:
                    print(f"❌ Failed to process {url}: {str(e)}")
                    return url, None
        
        tasks = [process_single(url) for url in document_urls]
        completed_tasks = await asyncio.gather(*tasks, return_exceptions=True)
        
        for result in completed_tasks:
            if isinstance(result, tuple):
                url, doc_id = result
                if doc_id:
                    results[url] = doc_id
        
        print(f"βœ… Successfully processed {len(results)}/{len(document_urls)} documents")
        return results
    
    def get_system_info(self) -> Dict[str, Any]:
        """

        Get information about the preprocessing system.

        

        Returns:

            Dict[str, Any]: System information

        """
        return {
            "base_db_path": str(self.base_db_path),
            "embedding_model": self.embedding_manager.get_model_info(),
            "text_chunker_config": {
                "chunk_size": self.text_chunker.chunk_size,
                "chunk_overlap": self.text_chunker.chunk_overlap
            },
            "processed_documents_registry": self.metadata_manager.get_registry_path(),
            "collection_stats": self.get_collection_stats()
        }
    
    def cleanup_document(self, document_url: str) -> bool:
        """

        Remove all data for a specific document.

        

        Args:

            document_url: URL of the document to clean up

            

        Returns:

            bool: True if successfully cleaned up

        """
        doc_id = self.generate_doc_id(document_url)
        
        try:
            # Remove vector storage
            vector_removed = self.vector_storage.delete_collection(doc_id)
            
            # Remove metadata
            metadata_removed = self.metadata_manager.remove_document_metadata(doc_id)
            
            success = vector_removed and metadata_removed
            if success:
                print(f"βœ… Successfully cleaned up document {doc_id}")
            else:
                print(f"⚠️ Partial cleanup for document {doc_id}")
            
            return success
            
        except Exception as e:
            print(f"❌ Error cleaning up document {doc_id}: {e}")
            return False