File size: 4,806 Bytes
64d7fdf
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
from ingestion.loaders import document_loader
from ingestion.preprocessor import text_preprocessor
from ingestion.chunker import chunker
from ingestion.embedder import embedder
from app.db.vector_store import vector_store
from app.db.mongodb import MongoDB
from app.utils.logger import logger
from app.utils.errors import DocumentProcessingError
from app.config import config
from pathlib import Path
from datetime import datetime
from typing import Dict, List
import hashlib


class DocumentProcessor:
    def __init__(self):
        self.loader = document_loader
        self.preprocessor = text_preprocessor
        self.chunker = chunker
        self.embedder = embedder
        self.vector_store = vector_store
        self.mongodb = MongoDB()
        self.collection_name = config["database"]["qdrant"]["collection_name"]
        
        logger.info("DocumentProcessor initialized")
    
    def _generate_doc_id(self, file_path: str) -> str:
        content = f"{file_path}{datetime.utcnow().isoformat()}"
        return hashlib.md5(content.encode()).hexdigest()
    
    async def process_document(self, file_path: str, metadata: Dict = None) -> Dict:
        try:
            # Ensure MongoDB is connected
            if self.mongodb.db is None:
                await self.mongodb.connect()
            
            documents = self.loader.load(file_path)
            
            if not documents:
                raise DocumentProcessingError("No content extracted from document")
            
            documents = self.preprocessor.preprocess_documents(documents)
            chunks = self.chunker.split_documents(documents)
            
            if not chunks:
                raise DocumentProcessingError("No chunks created from document")
            
            logger.info(f"Created {len(chunks)} chunks")
            
            doc_id = self._generate_doc_id(file_path)
            file_name = Path(file_path).name
            
            for i, chunk in enumerate(chunks):
                if not hasattr(chunk, 'metadata'):
                    chunk.metadata = {}
                chunk.metadata.update({
                    "doc_id": doc_id,
                    "file_name": file_name,
                    "chunk_index": i,
                    "total_chunks": len(chunks),
                    "processed_at": datetime.utcnow().isoformat(),
                    **(metadata or {})
                })
            
            # Add documents to Qdrant via vector store
            from app.core.retriever import hybrid_retriever
            hybrid_retriever._initialize_vector_store()
            hybrid_retriever.vector_store.add_documents(chunks)
            
            doc_metadata = {
                "doc_id": doc_id,
                "file_name": file_name,
                "file_path": file_path,
                "num_chunks": len(chunks),
                "processed_at": datetime.utcnow(),
                "metadata": metadata or {}
            }
            
            collection = self.mongodb.get_collection("documents")
            await collection.insert_one(doc_metadata)
            
            logger.info(f"Successfully processed document: {file_name}")
            
            return {
                "doc_id": doc_id,
                "file_name": file_name,
                "num_chunks": len(chunks),
                "status": "success"
            }
            
        except Exception as e:
            logger.error(f"Error processing document {file_path}: {str(e)}")
            raise DocumentProcessingError(f"Document processing failed: {str(e)}")
    
    async def process_documents(self, file_paths: List[str]) -> List[Dict]:
        results = []
        
        for file_path in file_paths:
            try:
                result = await self.process_document(file_path)
                results.append(result)
            except Exception as e:
                results.append({
                    "file_path": file_path,
                    "status": "failed",
                    "error": str(e)
                })
        
        return results
    
    async def delete_document(self, doc_id: str) -> bool:
        try:
            await self.vector_store.delete_by_metadata(
                collection_name=self.collection_name,
                metadata_key="doc_id",
                metadata_value=doc_id
            )
            
            collection = self.mongodb.get_collection("documents")
            await collection.delete_one({"doc_id": doc_id})
            
            logger.info(f"Deleted document: {doc_id}")
            return True
            
        except Exception as e:
            logger.error(f"Error deleting document {doc_id}: {str(e)}")
            return False


document_processor = DocumentProcessor()