File size: 5,037 Bytes
409c17a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
"""

Application Layer - Document Indexing Use Case



Handles document upload and indexing into the knowledge base.

"""
import hashlib
import logging
from pathlib import Path
from typing import List
from uuid import uuid4

from app.application.dto import DocumentDTO, DocumentUploadDTO
from app.domain.entities import Document, DocumentChunk, DocumentStatus, DocumentType
from app.domain.interfaces import IChunkRepository, IDocumentRepository, IEmbedder


class DocumentIndexingUseCase:
    """Use case for indexing documents into the knowledge base.

    The pipeline creates a document record, extracts and chunks its text,
    requests embeddings for the chunk texts, stores the chunks and finally
    marks the document as indexed. If any step after the record was created
    raises, the document is marked as failed and the error is re-raised.
    """

    def __init__(
        self,
        document_repository: IDocumentRepository,
        chunk_repository: IChunkRepository,
        embedder: IEmbedder,
        chunking_service: "ChunkingService",
    ):
        """Store the collaborators used by the indexing pipeline.

        Args:
            document_repository: Used to create and update document records.
            chunk_repository: Used to bulk-store the produced chunks.
            embedder: Used to embed the chunk texts.
            chunking_service: Used to split extracted text into chunks.
        """
        # NOTE(review): "ChunkingService" is not imported in the visible part of
        # this module, so the annotation is deliberately kept as a string.
        self.document_repository = document_repository
        self.chunk_repository = chunk_repository
        self.embedder = embedder
        self.chunking_service = chunking_service

    async def execute(self, upload_dto: DocumentUploadDTO) -> DocumentDTO:
        """Run the indexing pipeline for one uploaded file.

        Args:
            upload_dto: Upload request; its ``filename``, ``content``,
                ``department`` and ``metadata`` attributes are read.

        Returns:
            A DTO built from the stored document after it was marked as indexed.

        Raises:
            Exception: Any error raised while extracting, chunking, embedding
                or storing chunks is re-raised unchanged after the document
                has been marked as failed. Errors raised while creating the
                record or marking it as processing propagate directly.
        """
        # 1. Detect file type
        file_type = self._detect_file_type(upload_dto.filename)

        # 2. Create document entity
        document = Document(
            title=self._extract_title(upload_dto.filename),
            filename=upload_dto.filename,
            file_type=file_type,
            file_size=len(upload_dto.content),
            storage_path=self._generate_storage_path(upload_dto.filename),
            department=upload_dto.department,
            metadata=upload_dto.metadata,
        )

        # 3. Save document to repository
        saved_document = await self.document_repository.create(document)

        # 4. Mark as processing
        saved_document.mark_as_processing()
        await self.document_repository.update(saved_document)

        try:
            # 5. Extract text content
            text_content = await self._extract_text(upload_dto.content, file_type)

            # 6. Chunk the document
            chunks_data = await self.chunking_service.chunk_text(
                text=text_content,
                document_id=saved_document.id,
                metadata=upload_dto.metadata,
            )

            # 7. Generate embeddings
            # NOTE(review): the returned vectors are not attached to the chunks
            # or passed to any repository here, so they are currently
            # discarded. Wire them through once the chunk/vector storage
            # contract is defined.
            texts = [chunk.content for chunk in chunks_data]
            await self.embedder.embed_texts(texts)

            # 8. Store chunks
            # (Vector storage will be handled in infrastructure layer)
            await self.chunk_repository.create_bulk(chunks_data)

            # 9. Mark document as indexed
            saved_document.mark_as_indexed()
            await self.document_repository.update(saved_document)

            # 10. Return DTO
            return self._to_dto(saved_document)

        except Exception:
            # Record the failure, but never let a secondary error raised while
            # doing so replace the exception that actually broke the pipeline.
            try:
                saved_document.mark_as_failed()
                await self.document_repository.update(saved_document)
            except Exception:
                logging.getLogger(__name__).exception(
                    "Could not persist failed status for document %s",
                    saved_document.id,
                )
            raise

    def _detect_file_type(self, filename: str) -> DocumentType:
        """Map the filename's extension (case-insensitive) to a DocumentType.

        Unknown or missing extensions fall back to ``DocumentType.TXT``.
        """
        suffix = Path(filename).suffix.lower()
        type_map = {
            ".pdf": DocumentType.PDF,
            ".docx": DocumentType.DOCX,
            ".txt": DocumentType.TXT,
            ".md": DocumentType.MD,
            ".html": DocumentType.HTML,
        }
        return type_map.get(suffix, DocumentType.TXT)

    def _extract_title(self, filename: str) -> str:
        """Build a title from the filename stem.

        Underscores and hyphens become spaces and the result is title-cased,
        e.g. ``"my_file-name.pdf"`` -> ``"My File Name"``.
        """
        return Path(filename).stem.replace("_", " ").replace("-", " ").title()

    def _generate_storage_path(self, filename: str) -> str:
        """Generate a unique storage path for an uploaded file.

        The result has the form ``documents/<2 hex>/<32 hex>/<name>``, where
        the hex token is random per call and ``<name>`` is the final path
        component of ``filename``.
        """
        # The filename comes from the uploader: keep only its last component so
        # values such as "../../x" or "dir\\x" cannot add path segments.
        safe_name = Path(filename.replace("\\", "/")).name
        if safe_name in ("", ".", ".."):
            safe_name = "unnamed"

        # A random UUID is already unique; its hex form has the same
        # 32-character shape as the digest previously used for the token.
        token = uuid4().hex
        return f"documents/{token[:2]}/{token}/{safe_name}"

    async def _extract_text(self, content: bytes, file_type: DocumentType) -> str:
        """Decode the raw document bytes into text.

        TXT and MD content is decoded strictly as UTF-8; every other type is
        decoded as UTF-8 with undecodable bytes dropped (placeholder until
        real extractors are added). A leading UTF-8 BOM is removed.

        Raises:
            UnicodeDecodeError: If TXT/MD content is not valid UTF-8.
        """
        # Simplified - in production use proper libraries
        # (PyPDF2, python-docx, BeautifulSoup, etc.)
        # "utf-8-sig" behaves like "utf-8" but strips a leading BOM so it does
        # not end up inside the first chunk.
        if file_type in (DocumentType.TXT, DocumentType.MD):
            return content.decode("utf-8-sig")
        # Placeholder - implement proper extraction
        return content.decode("utf-8-sig", errors="ignore")

    def _to_dto(self, document: Document) -> DocumentDTO:
        """Convert a Document entity to its DTO.

        The id is stringified, enum fields are reduced to their ``value`` and
        timestamps to ISO-8601 strings; ``indexed_at`` is None when unset.
        """
        indexed_at = document.indexed_at.isoformat() if document.indexed_at else None
        return DocumentDTO(
            id=str(document.id),
            title=document.title,
            filename=document.filename,
            file_type=document.file_type.value,
            file_size=document.file_size,
            department=document.department,
            status=document.status.value,
            uploaded_at=document.uploaded_at.isoformat(),
            indexed_at=indexed_at,
            metadata=document.metadata,
        )