""" Application Layer - Document Indexing Use Case Handles document upload and indexing into the knowledge base. """ import hashlib from pathlib import Path from typing import List from uuid import uuid4 from app.application.dto import DocumentDTO, DocumentUploadDTO from app.domain.entities import Document, DocumentChunk, DocumentStatus, DocumentType from app.domain.interfaces import IChunkRepository, IDocumentRepository, IEmbedder class DocumentIndexingUseCase: """Use case for indexing documents into the knowledge base""" def __init__( self, document_repository: IDocumentRepository, chunk_repository: IChunkRepository, embedder: IEmbedder, chunking_service: "ChunkingService", ): self.document_repository = document_repository self.chunk_repository = chunk_repository self.embedder = embedder self.chunking_service = chunking_service async def execute(self, upload_dto: DocumentUploadDTO) -> DocumentDTO: """Execute document indexing pipeline""" # 1. Detect file type file_type = self._detect_file_type(upload_dto.filename) # 2. Create document entity document = Document( title=self._extract_title(upload_dto.filename), filename=upload_dto.filename, file_type=file_type, file_size=len(upload_dto.content), storage_path=self._generate_storage_path(upload_dto.filename), department=upload_dto.department, metadata=upload_dto.metadata, ) # 3. Save document to repository saved_document = await self.document_repository.create(document) # 4. Mark as processing saved_document.mark_as_processing() await self.document_repository.update(saved_document) try: # 5. Extract text content text_content = await self._extract_text(upload_dto.content, file_type) # 6. Chunk the document chunks_data = await self.chunking_service.chunk_text( text=text_content, document_id=saved_document.id, metadata=upload_dto.metadata ) # 7. Generate embeddings texts = [chunk.content for chunk in chunks_data] embeddings = await self.embedder.embed_texts(texts) # 8. Store chunks with embeddings # (Vector storage will be handled in infrastructure layer) chunks = await self.chunk_repository.create_bulk(chunks_data) # 9. Mark document as indexed saved_document.mark_as_indexed() await self.document_repository.update(saved_document) # 10. Return DTO return self._to_dto(saved_document) except Exception as e: # Mark as failed saved_document.mark_as_failed() await self.document_repository.update(saved_document) raise def _detect_file_type(self, filename: str) -> DocumentType: """Detect file type from filename""" suffix = Path(filename).suffix.lower() type_map = { ".pdf": DocumentType.PDF, ".docx": DocumentType.DOCX, ".txt": DocumentType.TXT, ".md": DocumentType.MD, ".html": DocumentType.HTML, } return type_map.get(suffix, DocumentType.TXT) def _extract_title(self, filename: str) -> str: """Extract title from filename""" return Path(filename).stem.replace("_", " ").replace("-", " ").title() def _generate_storage_path(self, filename: str) -> str: """Generate unique storage path""" file_hash = hashlib.md5(f"{uuid4()}{filename}".encode()).hexdigest() return f"documents/{file_hash[:2]}/{file_hash}/{filename}" async def _extract_text(self, content: bytes, file_type: DocumentType) -> str: """Extract text from document content""" # Simplified - in production use proper libraries # (PyPDF2, python-docx, BeautifulSoup, etc.) 
        if file_type in (DocumentType.TXT, DocumentType.MD):
            return content.decode("utf-8")
        # Placeholder: decode leniently until proper extraction is implemented.
        return content.decode("utf-8", errors="ignore")

    def _to_dto(self, document: Document) -> DocumentDTO:
        """Convert a Document entity to its DTO."""
        return DocumentDTO(
            id=str(document.id),
            title=document.title,
            filename=document.filename,
            file_type=document.file_type.value,
            file_size=document.file_size,
            department=document.department,
            status=document.status.value,
            uploaded_at=document.uploaded_at.isoformat(),
            indexed_at=document.indexed_at.isoformat() if document.indexed_at else None,
            metadata=document.metadata,
        )
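

# --- Usage sketch (illustrative only) ---------------------------------------
# Shows how a caller, e.g. an upload endpoint, might drive the pipeline. This
# assumes DocumentUploadDTO accepts its fields as keyword arguments (filename,
# content, department, metadata, as read by `execute` above); the concrete
# repository, embedder, and chunking implementations are supplied by the
# infrastructure layer and are not defined in this module.
async def index_file_example(
    use_case: DocumentIndexingUseCase, path: Path, department: str
) -> DocumentDTO:
    """Index a file from disk through the use case (example, not production code)."""
    upload = DocumentUploadDTO(
        filename=path.name,
        content=path.read_bytes(),
        department=department,
        metadata={"source": "local-upload"},
    )
    return await use_case.execute(upload)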