| """Analysis service — async document parsing orchestration. | |
| Uses an injected DocumentConverter (port) so the service is decoupled | |
| from the conversion implementation (local Docling lib vs remote Docling Serve). | |
| """ | |
| from __future__ import annotations | |
| import asyncio | |
| import functools | |
| import json | |
| import logging | |
| from dataclasses import asdict | |
| from typing import TYPE_CHECKING | |
| from domain.models import AnalysisJob, AnalysisStatus | |
| from domain.value_objects import ChunkingOptions, ChunkResult, ConversionOptions, ConversionResult | |
| if TYPE_CHECKING: | |
| from domain.ports import DocumentChunker, DocumentConverter | |
| from persistence import analysis_repo, document_repo | |
| logger = logging.getLogger(__name__) | |
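
# For reference, the converter and chunker ports (defined in domain.ports) are
# assumed to look roughly like the Protocols sketched below, inferred from how
# this service calls them; this is a sketch, not the actual definitions:
#
#     class DocumentConverter(Protocol):
#         async def convert(self, file_path: str, options: ConversionOptions) -> ConversionResult: ...
#
#     class DocumentChunker(Protocol):
#         async def chunk(self, document_json, options: ChunkingOptions) -> list[ChunkResult]: ...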


def _chunk_to_dict(c: ChunkResult) -> dict:
    """Serialize ChunkResult to a camelCase dict matching the frontend API contract."""
    return {
        "text": c.text,
        "headings": c.headings,
        "sourcePage": c.source_page,
        "tokenCount": c.token_count,
        "bboxes": [{"page": b.page, "bbox": b.bbox} for b in c.bboxes],
    }


# Maximum number of concurrent analysis jobs to prevent resource exhaustion.
_DEFAULT_MAX_CONCURRENT = 3


class AnalysisService:
    """Orchestrates document analysis using an injected converter."""

    def __init__(
        self,
        converter: DocumentConverter,
        chunker: DocumentChunker | None = None,
        conversion_timeout: int = 600,
        max_concurrent: int = _DEFAULT_MAX_CONCURRENT,
    ):
        self._converter = converter
        self._chunker = chunker
        self._conversion_timeout = conversion_timeout
        self._semaphore = asyncio.Semaphore(max_concurrent)

    async def create(
        self,
        document_id: str,
        *,
        pipeline_options: dict | None = None,
        chunking_options: dict | None = None,
    ) -> AnalysisJob:
        """Create a new analysis job and launch background processing."""
        doc = await document_repo.find_by_id(document_id)
        if not doc:
            raise ValueError(f"Document not found: {document_id}")

        job = AnalysisJob(document_id=document_id)
        job.document_filename = doc.filename
        await analysis_repo.insert(job)

        task = asyncio.create_task(
            self._run_analysis(
                job.id,
                doc.storage_path,
                doc.filename,
                pipeline_options,
                chunking_options,
            )
        )
        # Hold a strong reference so the task is not garbage-collected mid-run;
        # the event loop itself only keeps weak references to tasks.
        _background_tasks.add(task)
        task.add_done_callback(_background_tasks.discard)
        task.add_done_callback(functools.partial(_on_task_done, job_id=job.id))
        return job

    async def find_all(self) -> list[AnalysisJob]:
        """Return all analysis jobs, newest first."""
        return await analysis_repo.find_all()

    async def find_by_id(self, job_id: str) -> AnalysisJob | None:
        """Find an analysis job by ID, or return None."""
        return await analysis_repo.find_by_id(job_id)

    async def delete(self, job_id: str) -> bool:
        """Delete an analysis job. Returns True if it existed."""
        return await analysis_repo.delete(job_id)

    async def rechunk(self, job_id: str, chunking_options: dict) -> list[ChunkResult]:
        """Re-chunk an existing completed analysis with new options."""
        job = await analysis_repo.find_by_id(job_id)
        if not job:
            raise ValueError(f"Analysis not found: {job_id}")
        if job.status != AnalysisStatus.COMPLETED:
            raise ValueError(f"Analysis is not completed: {job_id}")
        if not job.document_json:
            raise ValueError(f"No document data available for re-chunking: {job_id}")
        if not self._chunker:
            raise ValueError("Chunking is not available")

        options = ChunkingOptions(**chunking_options)
        chunks = await self._chunker.chunk(job.document_json, options)
        chunks_json = json.dumps([_chunk_to_dict(c) for c in chunks])
        await analysis_repo.update_chunks(job_id, chunks_json)
        return chunks

    async def _run_analysis(
        self,
        job_id: str,
        file_path: str,
        filename: str,
        pipeline_options: dict | None = None,
        chunking_options: dict | None = None,
    ) -> None:
        """Background task: run conversion and optionally chunk.

        Acquires the concurrency semaphore to limit parallel conversions
        and prevent CPU/memory exhaustion on modest hardware.
        """
        async with self._semaphore:
            await self._run_analysis_inner(
                job_id, file_path, filename, pipeline_options, chunking_options
            )

    async def _run_analysis_inner(
        self,
        job_id: str,
        file_path: str,
        filename: str,
        pipeline_options: dict | None = None,
        chunking_options: dict | None = None,
    ) -> None:
        """Inner analysis logic — called under the concurrency semaphore."""
        try:
            job = await analysis_repo.find_by_id(job_id)
            if not job:
                logger.error("Analysis job %s not found", job_id)
                return

            job.mark_running()
            await analysis_repo.update_status(job)
            logger.info("Analysis started: %s (file: %s)", job_id, filename)

            options = ConversionOptions(**(pipeline_options or {}))
            result: ConversionResult = await asyncio.wait_for(
                self._converter.convert(file_path, options),
                timeout=self._conversion_timeout,
            )
            pages_json = json.dumps([asdict(p) for p in result.pages])

            chunks_json = None
            if chunking_options and self._chunker and result.document_json:
                chunk_opts = ChunkingOptions(**chunking_options)
                chunks = await self._chunker.chunk(result.document_json, chunk_opts)
                chunks_json = json.dumps([_chunk_to_dict(c) for c in chunks])
                logger.info("Chunking produced %d chunks for job %s", len(chunks), job_id)

            job.mark_completed(
                markdown=result.content_markdown,
                html=result.content_html,
                pages_json=pages_json,
                document_json=result.document_json,
                chunks_json=chunks_json,
            )
            await analysis_repo.update_status(job)

            if result.page_count:
                await document_repo.update_page_count(job.document_id, result.page_count)
            logger.info("Analysis completed: %s (%d pages)", job_id, result.page_count)
        except TimeoutError:
            logger.error("Analysis timed out after %ds: %s", self._conversion_timeout, job_id)
            await _mark_failed(job_id, f"Conversion timed out after {self._conversion_timeout}s")
        except Exception as e:
            logger.exception("Analysis failed: %s", job_id)
            await _mark_failed(job_id, str(e))


_background_tasks: set[asyncio.Task] = set()


def _on_task_done(task: asyncio.Task, *, job_id: str) -> None:
    """Log unhandled exceptions from background analysis tasks and mark job as FAILED."""
    if task.cancelled():
        logger.warning("Analysis task was cancelled: %s", job_id)
        _schedule_mark_failed(job_id, "Task was cancelled")
        return
    exc = task.exception()
    if exc:
        # Pass the exception instance explicitly: there is no "current" exception
        # inside a done callback, so exc_info=True would log no traceback.
        logger.error("Unhandled exception in analysis task %s: %s", job_id, exc, exc_info=exc)
        _schedule_mark_failed(job_id, str(exc))


def _schedule_mark_failed(job_id: str, error: str) -> None:
    """Schedule _mark_failed as a tracked background task."""
    t = asyncio.ensure_future(_mark_failed(job_id, error))
    _background_tasks.add(t)
    t.add_done_callback(_background_tasks.discard)


async def _mark_failed(job_id: str, error: str) -> None:
    """Safely mark a job as failed, handling DB errors gracefully."""
    try:
        job = await analysis_repo.find_by_id(job_id)
        if job:
            job.mark_failed(error)
            await analysis_repo.update_status(job)
    except OSError:
        logger.exception("Database I/O error marking job %s as failed", job_id)
    except Exception:
        logger.exception("Unexpected error marking job %s as failed", job_id)