# Ragora-Server: app/application/document_service.py
# Last commit 5ebe979 (Peterase): "feat: nested folder hierarchy with parent_id, root_only filter, recursive delete"
"""Document service — async upload with background processing."""
from app.ports.storage import StoragePort
from app.ports.document_processor import DocumentProcessorPort
from app.ports.embedder import EmbedderPort
from app.ports.vector_db import VectorDBPort, VectorChunk
from app.services.chunking_service import ChunkingService
from app.models import Document, User
from app.database import SessionLocal
from sqlalchemy.orm import Session
from typing import BinaryIO, Optional, List
import logging
import uuid
# Module-level logger named after this module (standard `logging` convention).
logger = logging.getLogger(__name__)
class DocumentService:
    """Orchestrates the document workflow: upload, background processing,
    listing, status lookup, deletion and download links.

    Every user-facing lookup is restricted to the caller's organisation
    (``Document.org_id == user.org_id``), so a document id belonging to another
    org is reported exactly like a missing one (``ValueError``).
    """

    def __init__(
        self,
        storage: StoragePort,
        processor: DocumentProcessorPort,
        embedder: EmbedderPort,
        vector_db: VectorDBPort,
        chunking_service: ChunkingService,
        db: Session,
    ):
        """Wire the service to its ports.

        Args:
            storage: Blob store that receives the raw uploaded bytes.
            processor: Text extractor; also decides which file types are accepted.
            embedder: Produces one embedding per text chunk.
            vector_db: Vector store that receives the chunk embeddings.
            chunking_service: Splits extracted text into chunks.
            db: SQLAlchemy session used by every method except
                ``process_document_background``, which opens its own.
        """
        self.storage = storage
        self.processor = processor
        self.embedder = embedder
        self.vector_db = vector_db
        self.chunking_service = chunking_service
        self.db = db

    # ── Public: called by the API endpoint ────────────────────────────────────

    async def accept_upload(
        self,
        file_data: bytes,
        filename: str,
        file_size: int,
        user: User,
        folder_id: Optional[str] = None,
    ) -> Document:
        """Persist the upload and return its Document with ``status='processing'``.

        This method does not extract, embed or index anything, and it does not
        schedule that work itself: the caller runs ``process_document_background``
        with the returned document's id (e.g. as a FastAPI BackgroundTask).

        Args:
            file_data: Raw file bytes.
            filename: Client-supplied file name; kept verbatim as ``Document.name``.
            file_size: Size recorded on the Document.
            user: Uploader; supplies ``user.id`` and ``user.org_id``.
            folder_id: Optional folder to place the document in.

        Returns:
            The committed and refreshed Document row.

        Raises:
            ValueError: If the processor does not support ``filename``'s type.
            Exception: Anything raised by storage or the DB. If the DB commit
                fails, the already-uploaded blob is deleted (best effort) first.
        """
        if not self.processor.supports_file(filename):
            raise ValueError(f"Unsupported file type: {filename}")

        # The file name is untrusted client input: strip path separators so it
        # cannot add key segments (or "../" hops) beneath the org prefix.
        storage_key = f"{user.org_id}/{uuid.uuid4()}_{self._key_safe_name(filename)}"
        content_type = self._get_content_type(filename)

        # Upload the blob before writing the row so the row never points at a
        # blob that was never stored.
        await self.storage.upload(storage_key, file_data, content_type)

        # The bytes are also kept on the row: process_document_background reads
        # them from `file_content`, not from storage.
        document = Document(
            name=filename,
            size=file_size,
            storage_path=storage_key,
            file_content=file_data,
            chunks=0,
            status="processing",
            user_id=user.id,
            org_id=user.org_id,
            folder_id=folder_id,
        )
        self.db.add(document)
        try:
            self.db.commit()
        except Exception:
            # Keep the session usable and don't leak an orphaned blob.
            self.db.rollback()
            try:
                await self.storage.delete(storage_key)
            except Exception:
                logger.warning(
                    "Could not clean up blob %s after failed DB insert",
                    storage_key,
                    exc_info=True,
                )
            raise
        self.db.refresh(document)

        logger.info(
            "Accepted upload: %s → doc %s (processing in background)",
            filename,
            document.id,
        )
        return document

    async def process_document_background(self, document_id: str) -> None:
        """Extract text → chunk → embed → store vectors, then record the outcome.

        Meant to run after the HTTP response (FastAPI BackgroundTask), so it opens
        its own DB session; the request session is already closed by then.

        Never raises ``Exception``: on success the document gets
        ``status='done'`` and its chunk count; on failure it gets
        ``status='error'`` and ``error_message`` (best effort, logged if even
        that fails). A missing document is only logged.
        """
        db = SessionLocal()
        try:
            # Imported here, inside the try, so an import/config failure is
            # recorded on the document like any other processing error.
            from io import BytesIO
            from app.config import get_settings

            document = db.query(Document).filter(Document.id == document_id).first()
            if not document:
                logger.error("Background task: document %s not found", document_id)
                return
            logger.info("Background processing: %s (%s)", document.name, document_id)

            # 1. Extract text from the bytes stored on the row (awaited; presumably
            #    offloaded to a thread pool by the processor adapter).
            text = await self.processor.extract_text(
                BytesIO(document.file_content), document.name
            )

            # 2. Chunk.
            chunks = self.chunking_service.chunk_text(text)
            logger.info("Split into %d chunks", len(chunks))

            if chunks:
                # 3. Embed (one vector per chunk expected).
                embeddings = list(await self.embedder.embed_batch(chunks))
                # zip() below would silently drop chunks on a count mismatch.
                if len(embeddings) != len(chunks):
                    raise RuntimeError(
                        f"Embedder returned {len(embeddings)} embeddings "
                        f"for {len(chunks)} chunks"
                    )

                # 4. Store vectors.
                settings = get_settings()
                vector_chunks = [
                    VectorChunk(
                        # NOTE(review): Qdrant only accepts unsigned-int or UUID point
                        # ids; presumably the VectorDBPort adapter maps this string
                        # id — confirm.
                        id=f"{document_id}_{i}",
                        document_id=document_id,
                        chunk_index=i,
                        text=chunk,
                        embedding=embedding,
                        metadata={
                            "org_id": document.org_id,
                            "document_name": document.name,
                        },
                    )
                    for i, (chunk, embedding) in enumerate(zip(chunks, embeddings))
                ]
                await self.vector_db.store_chunks(vector_chunks, settings.QDRANT_COLLECTION)
            else:
                # Nothing to index: don't call the embedder / vector store with an
                # empty batch.
                logger.warning("No text chunks extracted from %s (%s)", document.name, document_id)

            # 5. Mark done.
            document.chunks = len(chunks)
            document.status = "done"
            db.commit()
            logger.info("Done processing %s: %d chunks stored", document.name, len(chunks))
        except Exception as e:
            logger.exception("Background processing failed for %s: %s", document_id, e)
            self._mark_failed(db, document_id, e)
        finally:
            db.close()

    @staticmethod
    def _mark_failed(db: Session, document_id: str, error: Exception) -> None:
        """Best effort: set ``status='error'`` and ``error_message`` on the document.

        Failures here are logged, never raised, so the background task ends cleanly.
        """
        try:
            # After a failed flush/commit the session refuses further work until it
            # is rolled back; without this the status update itself would fail.
            db.rollback()
            document = db.query(Document).filter(Document.id == document_id).first()
            if document:
                document.status = "error"
                document.error_message = str(error)
                db.commit()
        except Exception:
            logger.exception("Could not mark document %s as failed", document_id)

    # ── Other service methods ─────────────────────────────────────────────────

    async def list_documents(
        self,
        user: User,
        folder_id: Optional[str] = None,
        root_only: bool = False
    ) -> List[Document]:
        """List the caller's org documents, newest first.

        Args:
            user: Caller; only documents in ``user.org_id`` are returned.
            folder_id: If truthy, only documents directly in this folder.
            root_only: If True and ``folder_id`` is falsy, only documents that are
                not in any folder. Ignored when ``folder_id`` is given.

        Returns:
            Matching documents ordered by ``created_at`` descending. With neither
            option set, all of the org's documents are returned.
        """
        query = self.db.query(Document).filter(Document.org_id == user.org_id)
        if folder_id:
            query = query.filter(Document.folder_id == folder_id)
        elif root_only:
            # Root level = no folder; `.is_(None)` renders as `IS NULL`.
            query = query.filter(Document.folder_id.is_(None))
        return query.order_by(Document.created_at.desc()).all()

    async def get_document_status(self, document_id: str, user: User) -> Document:
        """Return the document (including its ``status``) if it is in the caller's org.

        Raises:
            ValueError: If no such document exists in ``user.org_id``.
        """
        return self._get_owned_document(document_id, user)

    async def delete_document(self, document_id: str, user: User) -> None:
        """Delete a document's blob, its vectors and its DB row, in that order.

        Raises:
            ValueError: If no such document exists in ``user.org_id``.
        """
        document = self._get_owned_document(document_id, user)
        # NOTE(review): these three steps are not atomic — if a later step fails,
        # earlier deletions are not undone and the row remains.
        await self.storage.delete(document.storage_path)
        from app.config import get_settings
        settings = get_settings()
        await self.vector_db.delete_document(document_id, settings.QDRANT_COLLECTION)
        self.db.delete(document)
        self.db.commit()
        logger.info("Deleted document %s", document_id)

    async def get_download_url(self, document_id: str, user: User) -> str:
        """Return a presigned download URL for the document's stored blob.

        Raises:
            ValueError: If no such document exists in ``user.org_id``.
        """
        document = self._get_owned_document(document_id, user)
        return await self.storage.get_presigned_url(document.storage_path)

    def _get_owned_document(self, document_id: str, user: User) -> Document:
        """Fetch a document by id, scoped to ``user``'s org; raise ValueError if absent."""
        document = (
            self.db.query(Document)
            .filter(Document.id == document_id, Document.org_id == user.org_id)
            .first()
        )
        if not document:
            raise ValueError("Document not found")
        return document

    @staticmethod
    def _key_safe_name(filename: str) -> str:
        """Return ``filename`` with ``/`` and ``\\`` replaced by ``_`` for use in a storage key."""
        return filename.replace("/", "_").replace("\\", "_")

    def _get_content_type(self, filename: str) -> str:
        """Map a file name to a MIME type by its (case-insensitive) extension.

        ``.pdf`` → PDF, ``.docx`` → OOXML Word, ``.doc`` → legacy Word,
        anything else → ``application/octet-stream``.
        """
        lowered = filename.lower()
        if lowered.endswith(".pdf"):
            return "application/pdf"
        if lowered.endswith(".docx"):
            return "application/vnd.openxmlformats-officedocument.wordprocessingml.document"
        if lowered.endswith(".doc"):
            # Legacy binary Word format has its own MIME type.
            return "application/msword"
        return "application/octet-stream"