"""Analysis service — async document parsing orchestration. Uses an injected DocumentConverter (port) so the service is decoupled from the conversion implementation (local Docling lib vs remote Docling Serve). """ from __future__ import annotations import asyncio import functools import json import logging from dataclasses import asdict from typing import TYPE_CHECKING from domain.models import AnalysisJob, AnalysisStatus from domain.value_objects import ChunkingOptions, ChunkResult, ConversionOptions, ConversionResult if TYPE_CHECKING: from domain.ports import DocumentChunker, DocumentConverter from persistence import analysis_repo, document_repo logger = logging.getLogger(__name__) def _chunk_to_dict(c: ChunkResult) -> dict: """Serialize ChunkResult to a camelCase dict matching the frontend API contract.""" return { "text": c.text, "headings": c.headings, "sourcePage": c.source_page, "tokenCount": c.token_count, "bboxes": [{"page": b.page, "bbox": b.bbox} for b in c.bboxes], } # Maximum number of concurrent analysis jobs to prevent resource exhaustion. _DEFAULT_MAX_CONCURRENT = 3 class AnalysisService: """Orchestrates document analysis using an injected converter.""" def __init__( self, converter: DocumentConverter, chunker: DocumentChunker | None = None, conversion_timeout: int = 600, max_concurrent: int = _DEFAULT_MAX_CONCURRENT, ): self._converter = converter self._chunker = chunker self._conversion_timeout = conversion_timeout self._semaphore = asyncio.Semaphore(max_concurrent) async def create( self, document_id: str, *, pipeline_options: dict | None = None, chunking_options: dict | None = None, ) -> AnalysisJob: """Create a new analysis job and launch background processing.""" doc = await document_repo.find_by_id(document_id) if not doc: raise ValueError(f"Document not found: {document_id}") job = AnalysisJob(document_id=document_id) job.document_filename = doc.filename await analysis_repo.insert(job) task = asyncio.create_task( self._run_analysis( job.id, doc.storage_path, doc.filename, pipeline_options, chunking_options, ) ) task.add_done_callback(functools.partial(_on_task_done, job_id=job.id)) return job async def find_all(self) -> list[AnalysisJob]: """Return all analysis jobs, newest first.""" return await analysis_repo.find_all() async def find_by_id(self, job_id: str) -> AnalysisJob | None: """Find an analysis job by ID, or return None.""" return await analysis_repo.find_by_id(job_id) async def delete(self, job_id: str) -> bool: """Delete an analysis job. 


# Maximum number of concurrent analysis jobs to prevent resource exhaustion.
_DEFAULT_MAX_CONCURRENT = 3


class AnalysisService:
    """Orchestrates document analysis using an injected converter."""

    def __init__(
        self,
        converter: DocumentConverter,
        chunker: DocumentChunker | None = None,
        conversion_timeout: int = 600,
        max_concurrent: int = _DEFAULT_MAX_CONCURRENT,
    ):
        self._converter = converter
        self._chunker = chunker
        self._conversion_timeout = conversion_timeout
        self._semaphore = asyncio.Semaphore(max_concurrent)

    async def create(
        self,
        document_id: str,
        *,
        pipeline_options: dict | None = None,
        chunking_options: dict | None = None,
    ) -> AnalysisJob:
        """Create a new analysis job and launch background processing."""
        doc = await document_repo.find_by_id(document_id)
        if not doc:
            raise ValueError(f"Document not found: {document_id}")

        job = AnalysisJob(document_id=document_id)
        job.document_filename = doc.filename
        await analysis_repo.insert(job)

        task = asyncio.create_task(
            self._run_analysis(
                job.id,
                doc.storage_path,
                doc.filename,
                pipeline_options,
                chunking_options,
            )
        )
        # Keep a strong reference so the fire-and-forget task is not garbage-collected mid-run.
        _background_tasks.add(task)
        task.add_done_callback(_background_tasks.discard)
        task.add_done_callback(functools.partial(_on_task_done, job_id=job.id))
        return job

    async def find_all(self) -> list[AnalysisJob]:
        """Return all analysis jobs, newest first."""
        return await analysis_repo.find_all()

    async def find_by_id(self, job_id: str) -> AnalysisJob | None:
        """Find an analysis job by ID, or return None."""
        return await analysis_repo.find_by_id(job_id)

    async def delete(self, job_id: str) -> bool:
        """Delete an analysis job.

        Returns True if it existed.
        """
        return await analysis_repo.delete(job_id)

    async def rechunk(self, job_id: str, chunking_options: dict) -> list[ChunkResult]:
        """Re-chunk an existing completed analysis with new options."""
        job = await analysis_repo.find_by_id(job_id)
        if not job:
            raise ValueError(f"Analysis not found: {job_id}")
        if job.status != AnalysisStatus.COMPLETED:
            raise ValueError(f"Analysis is not completed: {job_id}")
        if not job.document_json:
            raise ValueError(f"No document data available for re-chunking: {job_id}")
        if not self._chunker:
            raise ValueError("Chunking is not available")

        options = ChunkingOptions(**chunking_options)
        chunks = await self._chunker.chunk(job.document_json, options)
        chunks_json = json.dumps([_chunk_to_dict(c) for c in chunks])
        await analysis_repo.update_chunks(job_id, chunks_json)
        return chunks

    async def _run_analysis(
        self,
        job_id: str,
        file_path: str,
        filename: str,
        pipeline_options: dict | None = None,
        chunking_options: dict | None = None,
    ) -> None:
        """Background task: run conversion and optionally chunk.

        Acquires the concurrency semaphore to limit parallel conversions and
        prevent CPU/memory exhaustion on modest hardware.
        """
        async with self._semaphore:
            await self._run_analysis_inner(
                job_id, file_path, filename, pipeline_options, chunking_options
            )

    async def _run_analysis_inner(
        self,
        job_id: str,
        file_path: str,
        filename: str,
        pipeline_options: dict | None = None,
        chunking_options: dict | None = None,
    ) -> None:
        """Inner analysis logic — called under the concurrency semaphore."""
        try:
            job = await analysis_repo.find_by_id(job_id)
            if not job:
                logger.error("Analysis job %s not found", job_id)
                return

            job.mark_running()
            await analysis_repo.update_status(job)
            logger.info("Analysis started: %s (file: %s)", job_id, filename)

            options = ConversionOptions(**(pipeline_options or {}))
            result: ConversionResult = await asyncio.wait_for(
                self._converter.convert(file_path, options),
                timeout=self._conversion_timeout,
            )

            pages_json = json.dumps([asdict(p) for p in result.pages])

            chunks_json = None
            if chunking_options and self._chunker and result.document_json:
                chunk_opts = ChunkingOptions(**chunking_options)
                chunks = await self._chunker.chunk(result.document_json, chunk_opts)
                chunks_json = json.dumps([_chunk_to_dict(c) for c in chunks])
                logger.info("Chunking produced %d chunks for job %s", len(chunks), job_id)

            job.mark_completed(
                markdown=result.content_markdown,
                html=result.content_html,
                pages_json=pages_json,
                document_json=result.document_json,
                chunks_json=chunks_json,
            )
            await analysis_repo.update_status(job)

            if result.page_count:
                await document_repo.update_page_count(job.document_id, result.page_count)
            logger.info("Analysis completed: %s (%d pages)", job_id, result.page_count or 0)
        except TimeoutError:
            logger.error("Analysis timed out after %ds: %s", self._conversion_timeout, job_id)
            await _mark_failed(job_id, f"Conversion timed out after {self._conversion_timeout}s")
        except Exception as e:
            logger.exception("Analysis failed: %s", job_id)
            await _mark_failed(job_id, str(e))


# Strong references to in-flight background tasks so they are not garbage-collected.
_background_tasks: set[asyncio.Task] = set()


def _on_task_done(task: asyncio.Task, *, job_id: str) -> None:
    """Log unhandled exceptions from background analysis tasks and mark job as FAILED."""
    if task.cancelled():
        logger.warning("Analysis task was cancelled: %s", job_id)
        _schedule_mark_failed(job_id, "Task was cancelled")
        return
    exc = task.exception()
    if exc:
        logger.error("Unhandled exception in analysis task %s: %s", job_id, exc, exc_info=True)
        _schedule_mark_failed(job_id, str(exc))


def _schedule_mark_failed(job_id: str, error: str) -> None:
    """Schedule _mark_failed as a tracked background task."""
    t = asyncio.ensure_future(_mark_failed(job_id, error))
    _background_tasks.add(t)
    t.add_done_callback(_background_tasks.discard)


async def _mark_failed(job_id: str, error: str) -> None:
    """Safely mark a job as failed, handling DB errors gracefully."""
    try:
        job = await analysis_repo.find_by_id(job_id)
        if job:
            job.mark_failed(error)
            await analysis_repo.update_status(job)
    except OSError:
        logger.exception("Database I/O error marking job %s as failed", job_id)
    except Exception:
        logger.exception("Unexpected error marking job %s as failed", job_id)
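

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, not part of this module's API): one way a
# composition root might wire the service. The adapter names below are
# hypothetical stand-ins for whatever implements the DocumentConverter and
# DocumentChunker ports in this project.
#
#   from adapters.docling_local import DoclingLocalConverter   # hypothetical
#   from adapters.chunking import HybridDocumentChunker        # hypothetical
#
#   service = AnalysisService(
#       converter=DoclingLocalConverter(),
#       chunker=HybridDocumentChunker(),
#       conversion_timeout=900,  # seconds; raise for very large documents
#       max_concurrent=2,        # tune to available CPU/RAM
#   )
#
#   # From an async request handler; the options dicts, if given, must match
#   # the ConversionOptions / ChunkingOptions fields.
#   job = await service.create(document_id)
#   job = await service.find_by_id(job.id)  # poll for status and results
# ---------------------------------------------------------------------------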