# Residue from the hosting page's file view: "Spaces: Sleeping".
# File size: 4,806 Bytes
import hashlib
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional

from ingestion.loaders import document_loader
from ingestion.preprocessor import text_preprocessor
from ingestion.chunker import chunker
from ingestion.embedder import embedder
from app.db.vector_store import vector_store
from app.db.mongodb import MongoDB
from app.utils.logger import logger
from app.utils.errors import DocumentProcessingError
from app.config import config
class DocumentProcessor:
    """Ingest files into the vector store and record them in MongoDB.

    Pipeline for a single file: load -> preprocess -> chunk -> add the chunks
    to the hybrid retriever's vector store -> insert a summary record into
    the MongoDB ``documents`` collection.
    """

    def __init__(self):
        # Collaborators are the shared module-level instances imported at the
        # top of this file.
        self.loader = document_loader
        self.preprocessor = text_preprocessor
        self.chunker = chunker
        self.embedder = embedder
        self.vector_store = vector_store
        self.mongodb = MongoDB()
        # Qdrant collection name; used when deleting chunks by doc_id.
        self.collection_name = config["database"]["qdrant"]["collection_name"]
        logger.info("DocumentProcessor initialized")

    async def _ensure_mongodb(self) -> None:
        """Connect the MongoDB wrapper if it has no database handle yet."""
        if self.mongodb.db is None:
            await self.mongodb.connect()

    def _generate_doc_id(self, file_path: str) -> str:
        """Return a fresh 32-character hex document id for ``file_path``.

        The hash input combines the path, the current UTC time and a random
        UUID. Repeated calls for the same path therefore yield distinct ids
        even when the clock has not advanced between them.
        """
        content = (
            f"{file_path}{datetime.now(timezone.utc).isoformat()}"
            f"{uuid.uuid4().hex}"
        )
        return hashlib.md5(content.encode()).hexdigest()

    async def process_document(
        self, file_path: str, metadata: Optional[Dict] = None
    ) -> Dict:
        """Load, chunk and index one file, then record it in MongoDB.

        Args:
            file_path: Path of the file to ingest; passed to the loader as-is.
            metadata: Optional extra metadata. It is copied onto every chunk
                and stored under ``"metadata"`` in the MongoDB record. It
                cannot override the system keys ``doc_id``, ``file_name``,
                ``chunk_index``, ``total_chunks`` or ``processed_at``.

        Returns:
            A dict with ``doc_id``, ``file_name``, ``num_chunks`` and
            ``status`` (always ``"success"`` when returned).

        Raises:
            DocumentProcessingError: if nothing was loaded or chunked, or any
                pipeline step fails. The underlying exception is chained as
                ``__cause__``.
        """
        try:
            await self._ensure_mongodb()
            extra_metadata = metadata or {}

            # NOTE(review): loader / preprocessor / chunker / vector-store
            # calls below are synchronous (not awaited), so they run on the
            # event-loop thread.
            documents = self.loader.load(file_path)
            if not documents:
                raise DocumentProcessingError("No content extracted from document")

            documents = self.preprocessor.preprocess_documents(documents)
            chunks = self.chunker.split_documents(documents)
            if not chunks:
                raise DocumentProcessingError("No chunks created from document")
            logger.info(f"Created {len(chunks)} chunks")

            doc_id = self._generate_doc_id(file_path)
            file_name = Path(file_path).name
            total_chunks = len(chunks)
            # One timestamp for the whole document, so every chunk and the
            # MongoDB record agree on when it was processed.
            processed_at = datetime.now(timezone.utc)
            processed_at_iso = processed_at.isoformat()

            for index, chunk in enumerate(chunks):
                if getattr(chunk, "metadata", None) is None:
                    chunk.metadata = {}
                chunk.metadata.update({
                    # Caller metadata goes first so it cannot clobber the
                    # system keys; delete_document() matches on "doc_id".
                    **extra_metadata,
                    "doc_id": doc_id,
                    "file_name": file_name,
                    "chunk_index": index,
                    "total_chunks": total_chunks,
                    "processed_at": processed_at_iso,
                })

            # Imported locally rather than at module level, presumably to
            # avoid a circular import with app.core - TODO confirm.
            from app.core.retriever import hybrid_retriever
            hybrid_retriever._initialize_vector_store()
            hybrid_retriever.vector_store.add_documents(chunks)

            doc_metadata = {
                "doc_id": doc_id,
                "file_name": file_name,
                "file_path": file_path,
                "num_chunks": total_chunks,
                "processed_at": processed_at,
                "metadata": extra_metadata,
            }
            # NOTE(review): the chunks are already in the vector store here;
            # if this insert fails they are left without a MongoDB record.
            collection = self.mongodb.get_collection("documents")
            await collection.insert_one(doc_metadata)

            logger.info(f"Successfully processed document: {file_name}")
            return {
                "doc_id": doc_id,
                "file_name": file_name,
                "num_chunks": total_chunks,
                "status": "success",
            }
        except Exception as e:
            logger.error(f"Error processing document {file_path}: {str(e)}")
            raise DocumentProcessingError(
                f"Document processing failed: {str(e)}"
            ) from e

    async def process_documents(self, file_paths: List[str]) -> List[Dict]:
        """Process each path in order, never aborting the batch on failure.

        Successful files yield the dict returned by ``process_document``.
        A failed file yields ``{"file_path", "status": "failed", "error"}``.
        """
        results = []
        for file_path in file_paths:
            try:
                result = await self.process_document(file_path)
                results.append(result)
            except Exception as e:
                results.append({
                    "file_path": file_path,
                    "status": "failed",
                    "error": str(e),
                })
        return results

    async def delete_document(self, doc_id: str) -> bool:
        """Remove a document's chunks from the vector store and its Mongo record.

        Returns:
            True when both deletions completed without raising, even if
            nothing matched. False if any step raised; the error is logged,
            not propagated.

        The two deletions are not atomic. MongoDB is connected first, so a
        connection failure aborts before any vectors are removed.
        """
        try:
            await self._ensure_mongodb()
            await self.vector_store.delete_by_metadata(
                collection_name=self.collection_name,
                metadata_key="doc_id",
                metadata_value=doc_id,
            )
            collection = self.mongodb.get_collection("documents")
            await collection.delete_one({"doc_id": doc_id})
            logger.info(f"Deleted document: {doc_id}")
            return True
        except Exception as e:
            logger.error(f"Error deleting document {doc_id}: {str(e)}")
            return False
# Shared module-level instance. Constructing it creates a MongoDB() wrapper
# and reads the Qdrant collection name from config when this module is imported.
document_processor = DocumentProcessor()