"""Analysis service — async document parsing orchestration.

Uses an injected DocumentConverter (port) so the service is decoupled
from the conversion implementation (local Docling lib vs remote Docling Serve).
"""

from __future__ import annotations

import asyncio
import functools
import json
import logging
from dataclasses import asdict
from typing import TYPE_CHECKING

from domain.models import AnalysisJob, AnalysisStatus
from domain.value_objects import ChunkingOptions, ChunkResult, ConversionOptions, ConversionResult

if TYPE_CHECKING:
    from domain.ports import DocumentChunker, DocumentConverter

from persistence import analysis_repo, document_repo

logger = logging.getLogger(__name__)


def _chunk_to_dict(c: ChunkResult) -> dict:
    """Serialize ChunkResult to a camelCase dict matching the frontend API contract."""
    return {
        "text": c.text,
        "headings": c.headings,
        "sourcePage": c.source_page,
        "tokenCount": c.token_count,
        "bboxes": [{"page": b.page, "bbox": b.bbox} for b in c.bboxes],
    }
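
# Example of the serialized camelCase shape (values are illustrative, not taken
# from a real document):
#   {"text": "Lorem ipsum...", "headings": ["1 Introduction"], "sourcePage": 2,
#    "tokenCount": 87, "bboxes": [{"page": 2, "bbox": [72.0, 96.1, 540.0, 180.4]}]}
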
# Maximum number of concurrent analysis jobs to prevent resource exhaustion.
_DEFAULT_MAX_CONCURRENT = 3


class AnalysisService:
    """Orchestrates document analysis using an injected converter."""

    def __init__(
        self,
        converter: DocumentConverter,
        chunker: DocumentChunker | None = None,
        conversion_timeout: int = 600,
        max_concurrent: int = _DEFAULT_MAX_CONCURRENT,
    ):
        self._converter = converter
        self._chunker = chunker
        self._conversion_timeout = conversion_timeout
        self._semaphore = asyncio.Semaphore(max_concurrent)
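
    # Construction sketch. The adapter names below are assumptions for
    # illustration only, not classes defined in this repo:
    #   service = AnalysisService(
    #       converter=RemoteDoclingConverter(base_url="http://docling-serve:5001"),
    #       chunker=HybridChunker(),
    #       conversion_timeout=300,
    #       max_concurrent=2,
    #   )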

    async def create(
        self,
        document_id: str,
        *,
        pipeline_options: dict | None = None,
        chunking_options: dict | None = None,
    ) -> AnalysisJob:
        """Create a new analysis job and launch background processing."""
        doc = await document_repo.find_by_id(document_id)
        if not doc:
            raise ValueError(f"Document not found: {document_id}")
        job = AnalysisJob(document_id=document_id)
        job.document_filename = doc.filename
        await analysis_repo.insert(job)
        task = asyncio.create_task(
            self._run_analysis(
                job.id,
                doc.storage_path,
                doc.filename,
                pipeline_options,
                chunking_options,
            )
        )
        # Keep a strong reference: the event loop holds only weak references to
        # tasks, so an untracked fire-and-forget task can be garbage-collected
        # mid-run (see the asyncio.create_task docs).
        _background_tasks.add(task)
        task.add_done_callback(_background_tasks.discard)
        task.add_done_callback(functools.partial(_on_task_done, job_id=job.id))
        return job
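
    # Typical call site in a route handler (a sketch; the HTTP layer lives
    # outside this module, and "do_ocr" is an assumed ConversionOptions field):
    #   job = await service.create(document_id, pipeline_options={"do_ocr": True})
    #   ...then poll service.find_by_id(job.id) until COMPLETED or FAILED.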

    async def find_all(self) -> list[AnalysisJob]:
        """Return all analysis jobs, newest first."""
        return await analysis_repo.find_all()

    async def find_by_id(self, job_id: str) -> AnalysisJob | None:
        """Find an analysis job by ID, or return None."""
        return await analysis_repo.find_by_id(job_id)

    async def delete(self, job_id: str) -> bool:
        """Delete an analysis job. Returns True if it existed."""
        return await analysis_repo.delete(job_id)

    async def rechunk(self, job_id: str, chunking_options: dict) -> list[ChunkResult]:
        """Re-chunk an existing completed analysis with new options."""
        job = await analysis_repo.find_by_id(job_id)
        if not job:
            raise ValueError(f"Analysis not found: {job_id}")
        if job.status != AnalysisStatus.COMPLETED:
            raise ValueError(f"Analysis is not completed: {job_id}")
        if not job.document_json:
            raise ValueError(f"No document data available for re-chunking: {job_id}")
        if not self._chunker:
            raise ValueError("Chunking is not available")
        options = ChunkingOptions(**chunking_options)
        chunks = await self._chunker.chunk(job.document_json, options)
        chunks_json = json.dumps([_chunk_to_dict(c) for c in chunks])
        await analysis_repo.update_chunks(job_id, chunks_json)
        return chunks
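
    # Re-chunking sketch; the option key is an assumed ChunkingOptions field,
    # shown only to illustrate the dict-to-dataclass handoff:
    #   chunks = await service.rechunk(job.id, {"max_tokens": 512})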

    async def _run_analysis(
        self,
        job_id: str,
        file_path: str,
        filename: str,
        pipeline_options: dict | None = None,
        chunking_options: dict | None = None,
    ) -> None:
        """Background task: run conversion and optionally chunk.

        Acquires the concurrency semaphore to limit parallel conversions
        and prevent CPU/memory exhaustion on modest hardware.
        """
        async with self._semaphore:
            await self._run_analysis_inner(
                job_id, file_path, filename, pipeline_options, chunking_options
            )

    async def _run_analysis_inner(
        self,
        job_id: str,
        file_path: str,
        filename: str,
        pipeline_options: dict | None = None,
        chunking_options: dict | None = None,
    ) -> None:
        """Inner analysis logic — called under the concurrency semaphore."""
        try:
            job = await analysis_repo.find_by_id(job_id)
            if not job:
                logger.error("Analysis job %s not found", job_id)
                return
            job.mark_running()
            await analysis_repo.update_status(job)
            logger.info("Analysis started: %s (file: %s)", job_id, filename)
            options = ConversionOptions(**(pipeline_options or {}))
            result: ConversionResult = await asyncio.wait_for(
                self._converter.convert(file_path, options),
                timeout=self._conversion_timeout,
            )
            pages_json = json.dumps([asdict(p) for p in result.pages])
            chunks_json = None
            if chunking_options and self._chunker and result.document_json:
                chunk_opts = ChunkingOptions(**chunking_options)
                chunks = await self._chunker.chunk(result.document_json, chunk_opts)
                chunks_json = json.dumps([_chunk_to_dict(c) for c in chunks])
                logger.info("Chunking produced %d chunks for job %s", len(chunks), job_id)
            job.mark_completed(
                markdown=result.content_markdown,
                html=result.content_html,
                pages_json=pages_json,
                document_json=result.document_json,
                chunks_json=chunks_json,
            )
            await analysis_repo.update_status(job)
            if result.page_count:
                await document_repo.update_page_count(job.document_id, result.page_count)
            logger.info("Analysis completed: %s (%d pages)", job_id, result.page_count or 0)
        # asyncio.wait_for raises asyncio.TimeoutError, an alias of the built-in
        # TimeoutError on Python 3.11+; catch both to cover older runtimes.
        except (TimeoutError, asyncio.TimeoutError):
            logger.error("Analysis timed out after %ds: %s", self._conversion_timeout, job_id)
            await _mark_failed(job_id, f"Conversion timed out after {self._conversion_timeout}s")
        except Exception as e:
            logger.exception("Analysis failed: %s", job_id)
            await _mark_failed(job_id, str(e))


# Strong references to fire-and-forget tasks; asyncio itself keeps only weak ones.
_background_tasks: set[asyncio.Task] = set()


def _on_task_done(task: asyncio.Task, *, job_id: str) -> None:
    """Log unhandled exceptions from background analysis tasks and mark job as FAILED."""
    if task.cancelled():
        logger.warning("Analysis task was cancelled: %s", job_id)
        _schedule_mark_failed(job_id, "Task was cancelled")
        return
    exc = task.exception()
    if exc:
        logger.error("Unhandled exception in analysis task %s: %s", job_id, exc, exc_info=True)
        _schedule_mark_failed(job_id, str(exc))


def _schedule_mark_failed(job_id: str, error: str) -> None:
    """Schedule _mark_failed as a tracked background task."""
    t = asyncio.ensure_future(_mark_failed(job_id, error))
    _background_tasks.add(t)
    t.add_done_callback(_background_tasks.discard)


async def _mark_failed(job_id: str, error: str) -> None:
    """Safely mark a job as failed, handling DB errors gracefully."""
    try:
        job = await analysis_repo.find_by_id(job_id)
        if job:
            job.mark_failed(error)
            await analysis_repo.update_status(job)
    except OSError:
        logger.exception("Database I/O error marking job %s as failed", job_id)
    except Exception:
        logger.exception("Unexpected error marking job %s as failed", job_id)