docling-studio/document-parser/services/analysis_service.py
"""Analysis service — async document parsing orchestration.
Uses an injected DocumentConverter (port) so the service is decoupled
from the conversion implementation (local Docling lib vs remote Docling Serve).
"""
from __future__ import annotations
import asyncio
import functools
import json
import logging
from dataclasses import asdict
from typing import TYPE_CHECKING
from domain.models import AnalysisJob, AnalysisStatus
from domain.value_objects import ChunkingOptions, ChunkResult, ConversionOptions, ConversionResult
if TYPE_CHECKING:
from domain.ports import DocumentChunker, DocumentConverter
from persistence import analysis_repo, document_repo
logger = logging.getLogger(__name__)
def _chunk_to_dict(c: ChunkResult) -> dict:
"""Serialize ChunkResult to a camelCase dict matching the frontend API contract."""
return {
"text": c.text,
"headings": c.headings,
"sourcePage": c.source_page,
"tokenCount": c.token_count,
"bboxes": [{"page": b.page, "bbox": b.bbox} for b in c.bboxes],
}
# Maximum number of concurrent analysis jobs to prevent resource exhaustion.
_DEFAULT_MAX_CONCURRENT = 3
class AnalysisService:
"""Orchestrates document analysis using an injected converter."""
def __init__(
self,
converter: DocumentConverter,
chunker: DocumentChunker | None = None,
conversion_timeout: int = 600,
max_concurrent: int = _DEFAULT_MAX_CONCURRENT,
):
self._converter = converter
self._chunker = chunker
self._conversion_timeout = conversion_timeout
self._semaphore = asyncio.Semaphore(max_concurrent)
async def create(
self,
document_id: str,
*,
pipeline_options: dict | None = None,
chunking_options: dict | None = None,
) -> AnalysisJob:
"""Create a new analysis job and launch background processing."""
doc = await document_repo.find_by_id(document_id)
if not doc:
raise ValueError(f"Document not found: {document_id}")
job = AnalysisJob(document_id=document_id)
job.document_filename = doc.filename
await analysis_repo.insert(job)
        task = asyncio.create_task(
            self._run_analysis(
                job.id,
                doc.storage_path,
                doc.filename,
                pipeline_options,
                chunking_options,
            )
        )
        # Keep a strong reference so the task cannot be garbage-collected while it
        # runs (the event loop only holds weak references to tasks).
        _background_tasks.add(task)
        task.add_done_callback(_background_tasks.discard)
        task.add_done_callback(functools.partial(_on_task_done, job_id=job.id))
return job
async def find_all(self) -> list[AnalysisJob]:
"""Return all analysis jobs, newest first."""
return await analysis_repo.find_all()
async def find_by_id(self, job_id: str) -> AnalysisJob | None:
"""Find an analysis job by ID, or return None."""
return await analysis_repo.find_by_id(job_id)
async def delete(self, job_id: str) -> bool:
"""Delete an analysis job. Returns True if it existed."""
return await analysis_repo.delete(job_id)
async def rechunk(self, job_id: str, chunking_options: dict) -> list[ChunkResult]:
"""Re-chunk an existing completed analysis with new options."""
job = await analysis_repo.find_by_id(job_id)
if not job:
raise ValueError(f"Analysis not found: {job_id}")
if job.status != AnalysisStatus.COMPLETED:
raise ValueError(f"Analysis is not completed: {job_id}")
if not job.document_json:
raise ValueError(f"No document data available for re-chunking: {job_id}")
if not self._chunker:
raise ValueError("Chunking is not available")
options = ChunkingOptions(**chunking_options)
chunks = await self._chunker.chunk(job.document_json, options)
chunks_json = json.dumps([_chunk_to_dict(c) for c in chunks])
await analysis_repo.update_chunks(job_id, chunks_json)
return chunks
async def _run_analysis(
self,
job_id: str,
file_path: str,
filename: str,
pipeline_options: dict | None = None,
chunking_options: dict | None = None,
) -> None:
"""Background task: run conversion and optionally chunk.
Acquires the concurrency semaphore to limit parallel conversions
and prevent CPU/memory exhaustion on modest hardware.
"""
async with self._semaphore:
await self._run_analysis_inner(
job_id, file_path, filename, pipeline_options, chunking_options
)
async def _run_analysis_inner(
self,
job_id: str,
file_path: str,
filename: str,
pipeline_options: dict | None = None,
chunking_options: dict | None = None,
) -> None:
"""Inner analysis logic — called under the concurrency semaphore."""
try:
job = await analysis_repo.find_by_id(job_id)
if not job:
logger.error("Analysis job %s not found", job_id)
return
job.mark_running()
await analysis_repo.update_status(job)
logger.info("Analysis started: %s (file: %s)", job_id, filename)
options = ConversionOptions(**(pipeline_options or {}))
result: ConversionResult = await asyncio.wait_for(
self._converter.convert(file_path, options),
timeout=self._conversion_timeout,
)
pages_json = json.dumps([asdict(p) for p in result.pages])
chunks_json = None
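            # Chunk only when the caller asked for it, a chunker was injected, and
            # the converter produced the document JSON that chunking operates on.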
if chunking_options and self._chunker and result.document_json:
chunk_opts = ChunkingOptions(**chunking_options)
chunks = await self._chunker.chunk(result.document_json, chunk_opts)
chunks_json = json.dumps([_chunk_to_dict(c) for c in chunks])
logger.info("Chunking produced %d chunks for job %s", len(chunks), job_id)
job.mark_completed(
markdown=result.content_markdown,
html=result.content_html,
pages_json=pages_json,
document_json=result.document_json,
chunks_json=chunks_json,
)
await analysis_repo.update_status(job)
if result.page_count:
await document_repo.update_page_count(job.document_id, result.page_count)
logger.info("Analysis completed: %s (%d pages)", job_id, result.page_count)
        except (asyncio.TimeoutError, TimeoutError):
            # asyncio.TimeoutError only became an alias of the builtin TimeoutError
            # in Python 3.11; catching both covers older runtimes as well.
            logger.error("Analysis timed out after %ds: %s", self._conversion_timeout, job_id)
            await _mark_failed(job_id, f"Conversion timed out after {self._conversion_timeout}s")
except Exception as e:
logger.exception("Analysis failed: %s", job_id)
await _mark_failed(job_id, str(e))
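# Strong references to fire-and-forget tasks: the event loop only keeps weak
# references to running Tasks, so without this set a pending _mark_failed() task
# could be garbage-collected before it completes.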
_background_tasks: set[asyncio.Task] = set()
def _on_task_done(task: asyncio.Task, *, job_id: str) -> None:
"""Log unhandled exceptions from background analysis tasks and mark job as FAILED."""
if task.cancelled():
logger.warning("Analysis task was cancelled: %s", job_id)
_schedule_mark_failed(job_id, "Task was cancelled")
return
exc = task.exception()
if exc:
logger.error("Unhandled exception in analysis task %s: %s", job_id, exc, exc_info=True)
_schedule_mark_failed(job_id, str(exc))
def _schedule_mark_failed(job_id: str, error: str) -> None:
"""Schedule _mark_failed as a tracked background task."""
t = asyncio.ensure_future(_mark_failed(job_id, error))
_background_tasks.add(t)
t.add_done_callback(_background_tasks.discard)
async def _mark_failed(job_id: str, error: str) -> None:
"""Safely mark a job as failed, handling DB errors gracefully."""
try:
job = await analysis_repo.find_by_id(job_id)
if job:
job.mark_failed(error)
await analysis_repo.update_status(job)
except OSError:
logger.exception("Database I/O error marking job %s as failed", job_id)
except Exception:
logger.exception("Unexpected error marking job %s as failed", job_id)