Spaces:
Running
Running
"""
Application Layer - Document Indexing Use Case

Handles document upload and indexing into the knowledge base.
"""
| import hashlib | |
| from pathlib import Path | |
| from typing import List | |
| from uuid import uuid4 | |
| from app.application.dto import DocumentDTO, DocumentUploadDTO | |
| from app.domain.entities import Document, DocumentChunk, DocumentStatus, DocumentType | |
| from app.domain.interfaces import IChunkRepository, IDocumentRepository, IEmbedder | |
class DocumentIndexingUseCase:
    """Use case for indexing documents into the knowledge base.

    Orchestrates the upload pipeline: create the document record, extract
    its text, chunk it, request embeddings, persist the chunks, and record
    the final status on the document.
    """

    def __init__(
        self,
        document_repository: IDocumentRepository,
        chunk_repository: IChunkRepository,
        embedder: IEmbedder,
        chunking_service: "ChunkingService",
    ):
        """Store the collaborators used by the indexing pipeline.

        Args:
            document_repository: Used to create and update the document record.
            chunk_repository: Used to bulk-create the document's chunks.
            embedder: Called with the chunk texts to produce embeddings.
            chunking_service: Splits the extracted text into chunks.
        """
        self.document_repository = document_repository
        self.chunk_repository = chunk_repository
        self.embedder = embedder
        self.chunking_service = chunking_service

    async def execute(self, upload_dto: DocumentUploadDTO) -> DocumentDTO:
        """Execute the document indexing pipeline.

        Args:
            upload_dto: The uploaded file; ``filename``, ``content``,
                ``department`` and ``metadata`` are read from it.

        Returns:
            A DTO describing the document after it was marked as indexed.

        Raises:
            Exception: Any error raised after the document was marked as
                processing is re-raised once the document has been marked
                as failed and that status has been saved.
        """
        # 1. Detect file type
        file_type = self._detect_file_type(upload_dto.filename)

        # 2. Create document entity
        document = Document(
            title=self._extract_title(upload_dto.filename),
            filename=upload_dto.filename,
            file_type=file_type,
            file_size=len(upload_dto.content),
            storage_path=self._generate_storage_path(upload_dto.filename),
            department=upload_dto.department,
            metadata=upload_dto.metadata,
        )

        # 3. Save document to repository
        saved_document = await self.document_repository.create(document)

        # 4. Mark as processing
        saved_document.mark_as_processing()
        await self.document_repository.update(saved_document)

        try:
            # 5. Extract text content
            text_content = await self._extract_text(upload_dto.content, file_type)

            # 6. Chunk the document
            chunks_data = await self.chunking_service.chunk_text(
                text=text_content,
                document_id=saved_document.id,
                metadata=upload_dto.metadata,
            )

            # 7. Generate embeddings
            # NOTE(review): the embeddings returned here are not attached to
            # the chunks or passed to any repository in this method, so they
            # are currently discarded. Confirm where vector storage is meant
            # to happen before relying on indexed documents being searchable.
            texts = [chunk.content for chunk in chunks_data]
            await self.embedder.embed_texts(texts)

            # 8. Store chunks
            # (Vector storage will be handled in infrastructure layer)
            await self.chunk_repository.create_bulk(chunks_data)

            # 9. Mark document as indexed
            saved_document.mark_as_indexed()
            await self.document_repository.update(saved_document)

            # 10. Return DTO
            return self._to_dto(saved_document)
        except Exception:
            # Record the failure on the document, then let the original
            # error propagate to the caller unchanged.
            saved_document.mark_as_failed()
            await self.document_repository.update(saved_document)
            raise

    def _detect_file_type(self, filename: str) -> DocumentType:
        """Detect the document type from the filename's extension.

        The extension is compared case-insensitively. Extensions that are
        not in the map (including a missing extension) fall back to
        ``DocumentType.TXT``.
        """
        type_map = {
            ".pdf": DocumentType.PDF,
            ".docx": DocumentType.DOCX,
            ".txt": DocumentType.TXT,
            ".md": DocumentType.MD,
            ".html": DocumentType.HTML,
        }
        suffix = Path(filename).suffix.lower()
        return type_map.get(suffix, DocumentType.TXT)

    def _extract_title(self, filename: str) -> str:
        """Build a title from the filename.

        Takes the filename's stem, replaces underscores and hyphens with
        spaces, and title-cases the result.
        """
        stem = Path(filename).stem
        return stem.replace("_", " ").replace("-", " ").title()

    def _generate_storage_path(self, filename: str) -> str:
        """Generate a unique storage path for an uploaded file.

        The path has the form ``documents/<h[:2]>/<h>/<name>`` where ``h`` is
        the MD5 hex digest of a fresh UUID concatenated with ``filename``.
        The digest is only a unique directory name, not a security measure.

        ``filename`` comes from the uploader, so only its final path
        component is used as ``<name>``. Directory parts, Windows-style
        backslash separators and ``..`` segments are dropped so the result
        cannot point outside the generated directory.
        """
        file_hash = hashlib.md5(f"{uuid4()}{filename}".encode()).hexdigest()

        # Normalise backslashes first: on POSIX they are not separators, so
        # "..\\..\\x" would otherwise survive as a single component.
        safe_name = Path(filename.replace("\\", "/")).name
        if safe_name in ("", ".", ".."):
            # Nothing usable is left of the name; use a fixed placeholder.
            safe_name = "unnamed"

        return f"documents/{file_hash[:2]}/{file_hash}/{safe_name}"

    async def _extract_text(self, content: bytes, file_type: DocumentType) -> str:
        """Extract text from raw document bytes.

        Simplified: every type is decoded as UTF-8. In production, use
        proper libraries (PyPDF2, python-docx, BeautifulSoup, etc.).

        Raises:
            UnicodeDecodeError: For TXT/MD content that is not valid UTF-8.
        """
        # "utf-8-sig" decodes exactly like "utf-8" but drops a leading BOM,
        # which would otherwise end up as U+FEFF at the start of the text.
        if file_type in (DocumentType.TXT, DocumentType.MD):
            return content.decode("utf-8-sig")

        # Placeholder - implement proper extraction. Undecodable bytes are
        # dropped instead of raising.
        return content.decode("utf-8-sig", errors="ignore")

    def _to_dto(self, document: Document) -> DocumentDTO:
        """Convert a Document entity to its DTO.

        The id is stringified, enum fields are reduced to their ``value``,
        and timestamps are ISO-formatted. ``indexed_at`` is ``None`` when
        the document has no indexed timestamp.
        """
        indexed_at = document.indexed_at.isoformat() if document.indexed_at else None
        return DocumentDTO(
            id=str(document.id),
            title=document.title,
            filename=document.filename,
            file_type=document.file_type.value,
            file_size=document.file_size,
            department=document.department,
            status=document.status.value,
            uploaded_at=document.uploaded_at.isoformat(),
            indexed_at=indexed_at,
            metadata=document.metadata,
        )