# Uploaded via huggingface_hub by Baktabek (commit 409c17a, verified).
"""
Application Layer - Document Indexing Use Case
Handles document upload and indexing into the knowledge base.
"""
import hashlib
from pathlib import Path
from typing import List
from uuid import uuid4
from app.application.dto import DocumentDTO, DocumentUploadDTO
from app.domain.entities import Document, DocumentChunk, DocumentStatus, DocumentType
from app.domain.interfaces import IChunkRepository, IDocumentRepository, IEmbedder
class DocumentIndexingUseCase:
    """Use case for indexing documents into the knowledge base.

    Pipeline: detect file type -> persist document metadata -> extract text
    -> chunk -> embed -> store chunks -> mark document indexed (or failed).
    """

    def __init__(
        self,
        document_repository: IDocumentRepository,
        chunk_repository: IChunkRepository,
        embedder: IEmbedder,
        chunking_service: "ChunkingService",
    ):
        self.document_repository = document_repository
        self.chunk_repository = chunk_repository
        self.embedder = embedder
        self.chunking_service = chunking_service

    async def execute(self, upload_dto: DocumentUploadDTO) -> DocumentDTO:
        """Run the full indexing pipeline for one uploaded document.

        Args:
            upload_dto: Upload payload (filename, raw content bytes,
                department, metadata).

        Returns:
            DocumentDTO describing the indexed document.

        Raises:
            Exception: any pipeline error is re-raised after the document
                has been marked as failed in the repository.
        """
        # 1. Detect file type from the filename extension
        file_type = self._detect_file_type(upload_dto.filename)

        # 2. Build the document entity
        document = Document(
            title=self._extract_title(upload_dto.filename),
            filename=upload_dto.filename,
            file_type=file_type,
            file_size=len(upload_dto.content),
            storage_path=self._generate_storage_path(upload_dto.filename),
            department=upload_dto.department,
            metadata=upload_dto.metadata,
        )

        # 3. Persist, then flag as processing so readers see the state change
        saved_document = await self.document_repository.create(document)
        saved_document.mark_as_processing()
        await self.document_repository.update(saved_document)

        try:
            # 4. Extract text content from the raw bytes
            text_content = await self._extract_text(upload_dto.content, file_type)

            # 5. Chunk the extracted text
            chunks_data = await self.chunking_service.chunk_text(
                text=text_content,
                document_id=saved_document.id,
                metadata=upload_dto.metadata,
            )

            # 6. Generate embeddings for each chunk
            texts = [chunk.content for chunk in chunks_data]
            embeddings = await self.embedder.embed_texts(texts)
            # NOTE(review): `embeddings` is computed but never attached to
            # `chunks_data` before storage; vector storage is said to live in
            # the infrastructure layer — confirm create_bulk consumes them.

            # 7. Store chunks (vector storage handled in infrastructure layer)
            await self.chunk_repository.create_bulk(chunks_data)

            # 8. Mark document as indexed and return its DTO
            saved_document.mark_as_indexed()
            await self.document_repository.update(saved_document)
            return self._to_dto(saved_document)
        except Exception:
            # Record the failure state before propagating the original error
            saved_document.mark_as_failed()
            await self.document_repository.update(saved_document)
            raise

    def _detect_file_type(self, filename: str) -> DocumentType:
        """Map the filename extension to a DocumentType (defaults to TXT)."""
        type_map = {
            ".pdf": DocumentType.PDF,
            ".docx": DocumentType.DOCX,
            ".txt": DocumentType.TXT,
            ".md": DocumentType.MD,
            ".html": DocumentType.HTML,
        }
        return type_map.get(Path(filename).suffix.lower(), DocumentType.TXT)

    def _extract_title(self, filename: str) -> str:
        """Derive a human-readable title from the filename stem."""
        return Path(filename).stem.replace("_", " ").replace("-", " ").title()

    def _generate_storage_path(self, filename: str) -> str:
        """Generate a unique, shard-prefixed storage path for the file.

        The path is ``documents/<hash[:2]>/<hash>/<filename>`` where the hash
        is derived from a fresh UUID plus the filename, so repeated uploads of
        the same file never collide.
        """
        # FIX: the original interpolated the literal text "(unknown)" here
        # instead of the filename parameter (extraction artifact).
        file_hash = hashlib.md5(f"{uuid4()}{filename}".encode()).hexdigest()
        return f"documents/{file_hash[:2]}/{file_hash}/{filename}"

    async def _extract_text(self, content: bytes, file_type: DocumentType) -> str:
        """Extract plain text from raw document bytes.

        Simplified placeholder — production should use format-specific
        parsers (PyPDF2, python-docx, BeautifulSoup, etc.).
        """
        if file_type in (DocumentType.TXT, DocumentType.MD):
            # Plain-text formats: strict UTF-8 decode (raises on bad bytes)
            return content.decode("utf-8")
        # Binary formats: lossy best-effort decode until real parsers exist
        return content.decode("utf-8", errors="ignore")

    def _to_dto(self, document: Document) -> DocumentDTO:
        """Convert a Document entity to its transport DTO."""
        return DocumentDTO(
            id=str(document.id),
            title=document.title,
            filename=document.filename,
            file_type=document.file_type.value,
            file_size=document.file_size,
            department=document.department,
            status=document.status.value,
            uploaded_at=document.uploaded_at.isoformat(),
            indexed_at=document.indexed_at.isoformat() if document.indexed_at else None,
            metadata=document.metadata,
        )