"""
Modular Document Preprocessor

Main orchestrator class that uses all preprocessing modules to process documents.
"""

import os
import asyncio
from typing import List, Dict, Any
from pathlib import Path

from config.config import OUTPUT_DIR
from .pdf_extractor import TextExtractor
from .text_chunker import TextChunker
from .embedding_manager import EmbeddingManager
from .vector_storage import VectorStorage
from .metadata_manager import MetadataManager

from .file_downloader import FileDownloader
from .docx_extractor import extract_docx
from .pptx_extractor import extract_pptx
from .xlsx_extractor import extract_xlsx

from dotenv import load_dotenv
# Load environment variables
load_dotenv()
groq_api_key = os.getenv("GROQ_API_KEY_")

class ModularDocumentPreprocessor:
    """
    Modular document preprocessor that orchestrates the entire preprocessing pipeline.
    
    This class combines all preprocessing modules to provide a clean interface
    for document processing while maintaining separation of concerns.
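
    Example (illustrative sketch only; the URL below is a placeholder, not a
    real endpoint):
        preprocessor = ModularDocumentPreprocessor()
        doc_id = await preprocessor.process_document("https://example.com/report.pdf")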
    """
    
    def __init__(self):
        """Initialize the modular document preprocessor."""
        # Set up base database path
        self.base_db_path = Path(OUTPUT_DIR).resolve()
        self._ensure_base_directory()
        
        # Initialize all modules
        self.file_downloader = FileDownloader()
        self.text_extractor = TextExtractor()
        self.text_chunker = TextChunker()
        self.embedding_manager = EmbeddingManager()
        self.vector_storage = VectorStorage(self.base_db_path)
        self.metadata_manager = MetadataManager(self.base_db_path)
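        # In-memory cache of chunk lists keyed by document URL, so reprocessing
        # the same URL can skip the chunking step.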
        self.cached_chunks = {}
        
        print("βœ… Modular Document Preprocessor initialized successfully")
    
    def _ensure_base_directory(self):
        """Ensure the base directory exists."""
        if not self.base_db_path.exists():
            try:
                self.base_db_path.mkdir(parents=True, exist_ok=True)
                print(f"βœ… Created directory: {self.base_db_path}")
            except PermissionError:
                print(f"⚠️  Directory {self.base_db_path} should exist in production environment")
                if not self.base_db_path.exists():
                    raise RuntimeError(f"Required directory {self.base_db_path} does not exist and cannot be created")
    
    # Delegate metadata operations to metadata manager
    def generate_doc_id(self, document_url: str) -> str:
        """Generate a unique document ID from the URL."""
        return self.metadata_manager.generate_doc_id(document_url)
    
    def is_document_processed(self, document_url: str) -> bool:
        """Check if a document has already been processed."""
        return self.metadata_manager.is_document_processed(document_url)
    
    def get_document_info(self, document_url: str) -> Dict[str, Any]:
        """Get information about a processed document."""
        return self.metadata_manager.get_document_info(document_url)
    
    def list_processed_documents(self) -> Dict[str, Dict]:
        """List all processed documents."""
        return self.metadata_manager.list_processed_documents()
    
    def get_collection_stats(self) -> Dict[str, Any]:
        """Get statistics about all collections."""
        return self.metadata_manager.get_collection_stats()
    
    async def process_document(self, document_url: str, force_reprocess: bool = False, timeout: int = 300) -> str | List:
        """
        Process a single document: download, extract, chunk, embed, and store.
        
        Args:
            document_url: URL of the document
            force_reprocess: If True, reprocess even if already processed
            timeout: Download timeout in seconds (default: 300s/5min)
            
        Returns:
            str: Document ID once the document is chunked, embedded, and stored.
            List: Special-case payload when the document bypasses vector storage,
                e.g. ['unsupported', ext], [text, 'oneshot'], [text, 'tabular'],
                or [image_path, 'image', True].
        """
        doc_id = self.generate_doc_id(document_url)
        
        # Check if already processed
        if not force_reprocess and self.is_document_processed(document_url):
            print(f"βœ… Document {doc_id} already processed, skipping...")
            return doc_id
        
        print(f"πŸš€ Processing document: {doc_id}")
        print(f"πŸ“„ URL: {document_url}")
        
        temp_file_path = None
        try:
            # Step 1: Download Document
            temp_file_path, ext = await self.file_downloader.download_file(document_url, timeout=timeout)
            
            if temp_file_path == 'not supported':
                return ['unsupported', ext]

            # Step 2: Extract text
            full_text = ""
            match ext:
                case 'pdf':
                    full_text = self.text_extractor.extract_text_from_pdf(temp_file_path)
                
                case 'docx':
                    full_text = extract_docx(temp_file_path)
                
                case 'pptx':
                    full_text = extract_pptx(temp_file_path)
                    return [full_text, 'oneshot']
                
                case 'url':
                    new_context = "URL for Context: " + temp_file_path
                    return [new_context, 'oneshot']
                
                case 'txt':
                    with open(temp_file_path, 'r') as f:
                        full_text = f.read()
                
                case 'xlsx':
                    full_text = extract_xlsx(temp_file_path)
                    return [full_text, 'tabular']
                
                case 'csv':
                    with open(temp_file_path, 'r') as f:
                        full_text = f.read()
                    return [full_text, 'tabular']

                case 'png' | 'jpeg' | 'jpg':
                    # Don't clean up image files - they'll be cleaned up by the caller;
                    # the third element indicates no cleanup is needed here.
                    return [temp_file_path, 'image', True]

            # Validate extracted text
            if not self.text_extractor.validate_extracted_text(full_text):
                raise Exception("No meaningful text extracted from PDF")
            
            # Step 3: Create chunks
            chunks = self.cached_chunks.get(document_url)
            if not chunks:
                chunks = self.text_chunker.chunk_text(full_text)
                self.cached_chunks[document_url] = chunks
            else:
                print("Using cache: skipped chunking")

            if not chunks:
                raise Exception("No chunks created from text")

            if len(chunks) < 5:
                print(f"Only {len(chunks)} chunks formed, going for oneshot.")
                return [full_text, 'oneshot']
            
            # Log chunk statistics
            chunk_stats = self.text_chunker.get_chunk_stats(chunks)
            print(f"πŸ“Š Chunk Statistics: {chunk_stats['total_chunks']} chunks, "
                  f"avg size: {chunk_stats['avg_chunk_size']:.0f} chars")
            
            # Step 4: Create embeddings
            embeddings = await self.embedding_manager.create_embeddings(chunks)
            
            # Validate embeddings
            if not self.embedding_manager.validate_embeddings(embeddings, len(chunks)):
                raise Exception("Invalid embeddings generated")
            
            # Step 5: Store in Qdrant
            await self.vector_storage.store_in_qdrant(chunks, embeddings, doc_id)
            
            # Step 6: Save metadata
            self.metadata_manager.save_document_metadata(chunks, doc_id, document_url)
            
            print(f"βœ… Document {doc_id} processed successfully: {len(chunks)} chunks")
            return doc_id
            
        except Exception as e:
            print(f"❌ Error processing document {doc_id}: {str(e)}")
            raise
        # finally:
            # Clean up temporary file - but NOT for images since they need the file path
            # Images return a third element indicating no cleanup needed
            # if temp_file_path and ext not in ['png', 'jpeg', 'jpg']:
            #     self.file_downloader.cleanup_temp_file(temp_file_path)
    
    async def process_multiple_documents(self, document_urls: List[str], force_reprocess: bool = False) -> Dict[str, str]:
        """
        Process multiple documents concurrently.
        
        Args:
            document_urls: List of document URLs
            force_reprocess: If True, reprocess even if already processed
            
        Returns:
            Dict[str, str]: Mapping of URLs to document IDs
        """
        print(f"πŸš€ Processing {len(document_urls)} documents...")
        
        results = {}
        
        # Process documents concurrently (with limited concurrency)
        semaphore = asyncio.Semaphore(3)  # Limit to 3 documents being processed at a time
        
        async def process_single(url):
            async with semaphore:
                try:
                    doc_id = await self.process_document(url, force_reprocess)
                    return url, doc_id
                except Exception as e:
                    print(f"❌ Failed to process {url}: {str(e)}")
                    return url, None
        
        tasks = [process_single(url) for url in document_urls]
        completed_tasks = await asyncio.gather(*tasks, return_exceptions=True)
        
        for result in completed_tasks:
            if isinstance(result, tuple):
                url, doc_id = result
                if doc_id:
                    results[url] = doc_id
        
        print(f"βœ… Successfully processed {len(results)}/{len(document_urls)} documents")
        return results
    
    def get_system_info(self) -> Dict[str, Any]:
        """
        Get information about the preprocessing system.
        
        Returns:
            Dict[str, Any]: System information
        """
        return {
            "base_db_path": str(self.base_db_path),
            "embedding_model": self.embedding_manager.get_model_info(),
            "text_chunker_config": {
                "chunk_size": self.text_chunker.chunk_size,
                "chunk_overlap": self.text_chunker.chunk_overlap
            },
            "processed_documents_registry": self.metadata_manager.get_registry_path(),
            "collection_stats": self.get_collection_stats()
        }
    
    def cleanup_document(self, document_url: str) -> bool:
        """
        Remove all data for a specific document.
        
        Args:
            document_url: URL of the document to clean up
            
        Returns:
            bool: True if successfully cleaned up
        """
        doc_id = self.generate_doc_id(document_url)
        
        try:
            # Remove vector storage
            vector_removed = self.vector_storage.delete_collection(doc_id)
            
            # Remove metadata
            metadata_removed = self.metadata_manager.remove_document_metadata(doc_id)
            
            success = vector_removed and metadata_removed
            if success:
                print(f"βœ… Successfully cleaned up document {doc_id}")
            else:
                print(f"⚠️ Partial cleanup for document {doc_id}")
            
            return success
            
        except Exception as e:
            print(f"❌ Error cleaning up document {doc_id}: {e}")
            return False
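

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative only): the URLs below are placeholders,
# not real documents, and this block is not part of the processing pipeline.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    async def _demo():
        preprocessor = ModularDocumentPreprocessor()
        # Hypothetical document URLs used purely for illustration.
        results = await preprocessor.process_multiple_documents([
            "https://example.com/report.pdf",
            "https://example.com/data.xlsx",
        ])
        print(results)
        print(preprocessor.get_system_info())

    asyncio.run(_demo())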