Spaces:
Sleeping
Sleeping
File size: 12,217 Bytes
e8051be |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 |
"""
Modular Document Preprocessor
Main orchestrator class that uses all preprocessing modules to process documents.
"""
import os
import asyncio
from typing import List, Dict, Any, Union
from pathlib import Path
from config.config import OUTPUT_DIR
from .pdf_downloader import PDFDownloader
from .file_downloader import FileDownloader
from .text_extractor import TextExtractor
from .text_chunker import TextChunker
from .embedding_manager import EmbeddingManager
from .vector_storage import VectorStorage
from .metadata_manager import MetadataManager
# Import new extractors
from .docx_extractor import extract_docx
from .pptx_extractor import extract_pptx
from .xlsx_extractor import extract_xlsx
from .image_extractor import extract_image_content
class ModularDocumentPreprocessor:
"""
Modular document preprocessor that orchestrates the entire preprocessing pipeline.
This class combines all preprocessing modules to provide a clean interface
for document processing while maintaining separation of concerns.
"""
def __init__(self):
"""Initialize the modular document preprocessor."""
# Set up base database path
self.base_db_path = Path(OUTPUT_DIR).resolve()
self._ensure_base_directory()
# Initialize all modules
self.pdf_downloader = PDFDownloader() # Keep for backward compatibility
self.file_downloader = FileDownloader() # New enhanced downloader
self.text_extractor = TextExtractor()
self.text_chunker = TextChunker()
self.embedding_manager = EmbeddingManager()
self.vector_storage = VectorStorage(self.base_db_path)
self.metadata_manager = MetadataManager(self.base_db_path)
print("β
Modular Document Preprocessor initialized successfully")
def _ensure_base_directory(self):
"""Ensure the base directory exists."""
if not self.base_db_path.exists():
try:
self.base_db_path.mkdir(parents=True, exist_ok=True)
print(f"β
Created directory: {self.base_db_path}")
except PermissionError:
print(f"β οΈ Directory {self.base_db_path} should exist in production environment")
if not self.base_db_path.exists():
raise RuntimeError(f"Required directory {self.base_db_path} does not exist and cannot be created")
# Delegate metadata operations to metadata manager
def generate_doc_id(self, document_url: str) -> str:
"""Generate a unique document ID from the URL."""
return self.metadata_manager.generate_doc_id(document_url)
def is_document_processed(self, document_url: str) -> bool:
"""Check if a document has already been processed."""
return self.metadata_manager.is_document_processed(document_url)
def get_document_info(self, document_url: str) -> Dict[str, Any]:
"""Get information about a processed document."""
return self.metadata_manager.get_document_info(document_url)
def list_processed_documents(self) -> Dict[str, Dict]:
"""List all processed documents."""
return self.metadata_manager.list_processed_documents()
def get_collection_stats(self) -> Dict[str, Any]:
"""Get statistics about all collections."""
return self.metadata_manager.get_collection_stats()
async def process_document(self, document_url: str, force_reprocess: bool = False, timeout: int = 300) -> Union[str, List]:
"""
Process a single document: download, extract, chunk, embed, and store.
Args:
document_url: URL of the document (PDF, DOCX, PPTX, XLSX, images, etc.)
force_reprocess: If True, reprocess even if already processed
timeout: Download timeout in seconds (default: 300s/5min)
Returns:
str: Document ID for normal processing
List: [content, type] for special handling (oneshot, tabular, image)
"""
doc_id = self.generate_doc_id(document_url)
# Check if already processed
if not force_reprocess and self.is_document_processed(document_url):
print(f"β
Document {doc_id} already processed, skipping...")
return doc_id
print(f"π Processing document: {doc_id}")
print(f"π URL: {document_url}")
temp_file_path = None
try:
# Step 1: Download file (enhanced to handle multiple types)
temp_file_path, ext = await self.file_downloader.download_file(document_url, timeout=timeout)
if temp_file_path == 'not supported':
return ['unsupported', ext]
# Step 2: Extract text based on file type
full_text = ""
match ext:
case 'pdf':
full_text = await self.text_extractor.extract_text_from_pdf(temp_file_path)
case 'docx':
full_text = extract_docx(temp_file_path)
case 'pptx':
full_text = extract_pptx(temp_file_path)
return [full_text, 'oneshot']
case 'url':
new_context = "URL for Context: " + temp_file_path
return [new_context, 'oneshot']
case 'txt':
with open(temp_file_path, 'r', encoding='utf-8') as f:
full_text = f.read()
case 'xlsx':
full_text = extract_xlsx(temp_file_path)
# Print a short preview (10-15 chars) to verify extraction
try:
preview = ''.join(full_text.split())[:15]
if preview:
print(f"π XLSX extracted preview: {preview}")
except Exception:
pass
return [full_text, 'tabular']
case 'csv':
with open(temp_file_path, 'r', encoding='utf-8') as f:
full_text = f.read()
return [full_text, 'tabular']
case 'png' | 'jpeg' | 'jpg':
# Don't clean up image files - they'll be cleaned up by the caller
return [temp_file_path, 'image', True] # Third element indicates no cleanup needed
case _:
raise Exception(f"Unsupported file type: {ext}")
# Validate extracted text
if not self.text_extractor.validate_extracted_text(full_text):
raise Exception("No meaningful text extracted from document")
# Step 3: Create chunks
chunks = self.text_chunker.chunk_text(full_text)
# Check if document is too short for chunking
if len(chunks) < 5:
print(f"Only {len(chunks)} chunks formed, going for oneshot.")
return [full_text, 'oneshot']
if not chunks:
raise Exception("No chunks created from text")
# Log chunk statistics
chunk_stats = self.text_chunker.get_chunk_stats(chunks)
print(f"π Chunk Statistics: {chunk_stats['total_chunks']} chunks, "
f"avg size: {chunk_stats['avg_chunk_size']:.0f} chars")
# Step 4: Create embeddings
embeddings = await self.embedding_manager.create_embeddings(chunks)
# Validate embeddings
if not self.embedding_manager.validate_embeddings(embeddings, len(chunks)):
raise Exception("Invalid embeddings generated")
# Step 5: Store in Qdrant
await self.vector_storage.store_in_qdrant(chunks, embeddings, doc_id)
# Step 6: Save metadata
self.metadata_manager.save_document_metadata(chunks, doc_id, document_url)
print(f"β
Document {doc_id} processed successfully: {len(chunks)} chunks")
return doc_id
except Exception as e:
print(f"β Error processing document {doc_id}: {str(e)}")
raise
finally:
# Clean up temporary file - but NOT for images since they need the file path
if temp_file_path and ext not in ['png', 'jpeg', 'jpg']:
self.file_downloader.cleanup_temp_file(temp_file_path)
async def process_multiple_documents(self, document_urls: List[str], force_reprocess: bool = False) -> Dict[str, str]:
"""
Process multiple documents concurrently.
Args:
document_urls: List of PDF URLs
force_reprocess: If True, reprocess even if already processed
Returns:
Dict[str, str]: Mapping of URLs to document IDs
"""
print(f"π Processing {len(document_urls)} documents...")
results = {}
# Process documents concurrently (with limited concurrency)
semaphore = asyncio.Semaphore(3) # Limit to 3 concurrent downloads
async def process_single(url):
async with semaphore:
try:
doc_id = await self.process_document(url, force_reprocess)
return url, doc_id
except Exception as e:
print(f"β Failed to process {url}: {str(e)}")
return url, None
tasks = [process_single(url) for url in document_urls]
completed_tasks = await asyncio.gather(*tasks, return_exceptions=True)
for result in completed_tasks:
if isinstance(result, tuple):
url, doc_id = result
if doc_id:
results[url] = doc_id
print(f"β
Successfully processed {len(results)}/{len(document_urls)} documents")
return results
def get_system_info(self) -> Dict[str, Any]:
"""
Get information about the preprocessing system.
Returns:
Dict[str, Any]: System information
"""
return {
"base_db_path": str(self.base_db_path),
"embedding_model": self.embedding_manager.get_model_info(),
"text_chunker_config": {
"chunk_size": self.text_chunker.chunk_size,
"chunk_overlap": self.text_chunker.chunk_overlap
},
"processed_documents_registry": self.metadata_manager.get_registry_path(),
"collection_stats": self.get_collection_stats()
}
def cleanup_document(self, document_url: str) -> bool:
"""
Remove all data for a specific document.
Args:
document_url: URL of the document to clean up
Returns:
bool: True if successfully cleaned up
"""
doc_id = self.generate_doc_id(document_url)
try:
# Remove vector storage
vector_removed = self.vector_storage.delete_collection(doc_id)
# Remove metadata
metadata_removed = self.metadata_manager.remove_document_metadata(doc_id)
success = vector_removed and metadata_removed
if success:
print(f"β
Successfully cleaned up document {doc_id}")
else:
print(f"β οΈ Partial cleanup for document {doc_id}")
return success
except Exception as e:
print(f"β Error cleaning up document {doc_id}: {e}")
return False
|