"""Analysis service — async document parsing orchestration.

Uses an injected DocumentConverter (port) so the service is decoupled
from the conversion implementation (local Docling lib vs remote Docling Serve).
"""

from __future__ import annotations

import asyncio
import functools
import json
import logging
from dataclasses import asdict
from typing import TYPE_CHECKING

from domain.models import AnalysisJob, AnalysisStatus
from domain.value_objects import ChunkingOptions, ChunkResult, ConversionOptions, ConversionResult
from persistence import analysis_repo, document_repo

if TYPE_CHECKING:
    from domain.ports import DocumentChunker, DocumentConverter

logger = logging.getLogger(__name__)


def _chunk_to_dict(c: ChunkResult) -> dict:
    """Serialize ChunkResult to a camelCase dict matching the frontend API contract."""
    return {
        "text": c.text,
        "headings": c.headings,
        "sourcePage": c.source_page,
        "tokenCount": c.token_count,
        "bboxes": [{"page": b.page, "bbox": b.bbox} for b in c.bboxes],
    }


# Maximum number of concurrent analysis jobs to prevent resource exhaustion.
_DEFAULT_MAX_CONCURRENT = 3


class AnalysisService:
    """Orchestrates document analysis using an injected converter."""

    def __init__(
        self,
        converter: DocumentConverter,
        chunker: DocumentChunker | None = None,
        conversion_timeout: int = 600,
        max_concurrent: int = _DEFAULT_MAX_CONCURRENT,
    ):
        self._converter = converter
        self._chunker = chunker
        self._conversion_timeout = conversion_timeout
        self._semaphore = asyncio.Semaphore(max_concurrent)

    async def create(
        self,
        document_id: str,
        *,
        pipeline_options: dict | None = None,
        chunking_options: dict | None = None,
    ) -> AnalysisJob:
        """Create a new analysis job and launch background processing."""
        doc = await document_repo.find_by_id(document_id)
        if not doc:
            raise ValueError(f"Document not found: {document_id}")

        job = AnalysisJob(document_id=document_id)
        job.document_filename = doc.filename
        await analysis_repo.insert(job)

        task = asyncio.create_task(
            self._run_analysis(
                job.id,
                doc.storage_path,
                doc.filename,
                pipeline_options,
                chunking_options,
            )
        )
        # Hold a strong reference so the task cannot be garbage-collected mid-run
        # (per the asyncio.create_task docs); drop it once the task finishes.
        _background_tasks.add(task)
        task.add_done_callback(_background_tasks.discard)
        task.add_done_callback(functools.partial(_on_task_done, job_id=job.id))

        return job

    async def find_all(self) -> list[AnalysisJob]:
        """Return all analysis jobs, newest first."""
        return await analysis_repo.find_all()

    async def find_by_id(self, job_id: str) -> AnalysisJob | None:
        """Find an analysis job by ID, or return None."""
        return await analysis_repo.find_by_id(job_id)

    async def delete(self, job_id: str) -> bool:
        """Delete an analysis job. Returns True if it existed."""
        return await analysis_repo.delete(job_id)

    async def rechunk(self, job_id: str, chunking_options: dict) -> list[ChunkResult]:
        """Re-chunk an existing completed analysis with new options."""
        job = await analysis_repo.find_by_id(job_id)
        if not job:
            raise ValueError(f"Analysis not found: {job_id}")
        if job.status != AnalysisStatus.COMPLETED:
            raise ValueError(f"Analysis is not completed: {job_id}")
        if not job.document_json:
            raise ValueError(f"No document data available for re-chunking: {job_id}")
        if not self._chunker:
            raise ValueError("Chunking is not available")

        options = ChunkingOptions(**chunking_options)
        chunks = await self._chunker.chunk(job.document_json, options)

        chunks_json = json.dumps([_chunk_to_dict(c) for c in chunks])
        await analysis_repo.update_chunks(job_id, chunks_json)

        return chunks

    async def _run_analysis(
        self,
        job_id: str,
        file_path: str,
        filename: str,
        pipeline_options: dict | None = None,
        chunking_options: dict | None = None,
    ) -> None:
        """Background task: run conversion and optionally chunk.

        Acquires the concurrency semaphore to limit parallel conversions
        and prevent CPU/memory exhaustion on modest hardware.
        """
        async with self._semaphore:
            await self._run_analysis_inner(
                job_id, file_path, filename, pipeline_options, chunking_options
            )

    async def _run_analysis_inner(
        self,
        job_id: str,
        file_path: str,
        filename: str,
        pipeline_options: dict | None = None,
        chunking_options: dict | None = None,
    ) -> None:
        """Inner analysis logic — called under the concurrency semaphore."""
        try:
            job = await analysis_repo.find_by_id(job_id)
            if not job:
                logger.error("Analysis job %s not found", job_id)
                return

            job.mark_running()
            await analysis_repo.update_status(job)
            logger.info("Analysis started: %s (file: %s)", job_id, filename)

            options = ConversionOptions(**(pipeline_options or {}))

            result: ConversionResult = await asyncio.wait_for(
                self._converter.convert(file_path, options),
                timeout=self._conversion_timeout,
            )

            pages_json = json.dumps([asdict(p) for p in result.pages])

            chunks_json = None
            if chunking_options and self._chunker and result.document_json:
                chunk_opts = ChunkingOptions(**chunking_options)
                chunks = await self._chunker.chunk(result.document_json, chunk_opts)
                chunks_json = json.dumps([_chunk_to_dict(c) for c in chunks])
                logger.info("Chunking produced %d chunks for job %s", len(chunks), job_id)

            job.mark_completed(
                markdown=result.content_markdown,
                html=result.content_html,
                pages_json=pages_json,
                document_json=result.document_json,
                chunks_json=chunks_json,
            )
            await analysis_repo.update_status(job)

            if result.page_count:
                await document_repo.update_page_count(job.document_id, result.page_count)

            logger.info("Analysis completed: %s (%d pages)", job_id, result.page_count)

        except (TimeoutError, asyncio.TimeoutError):
            logger.error("Analysis timed out after %ds: %s", self._conversion_timeout, job_id)
            await _mark_failed(job_id, f"Conversion timed out after {self._conversion_timeout}s")

        except Exception as e:
            logger.exception("Analysis failed: %s", job_id)
            await _mark_failed(job_id, str(e))


_background_tasks: set[asyncio.Task] = set()


def _on_task_done(task: asyncio.Task, *, job_id: str) -> None:
    """Log unhandled exceptions from background analysis tasks and mark job as FAILED."""
    if task.cancelled():
        logger.warning("Analysis task was cancelled: %s", job_id)
        _schedule_mark_failed(job_id, "Task was cancelled")
        return
    exc = task.exception()
    if exc:
        logger.error("Unhandled exception in analysis task %s: %s", job_id, exc, exc_info=True)
        _schedule_mark_failed(job_id, str(exc))


def _schedule_mark_failed(job_id: str, error: str) -> None:
    """Schedule _mark_failed as a tracked background task."""
    t = asyncio.ensure_future(_mark_failed(job_id, error))
    _background_tasks.add(t)
    t.add_done_callback(_background_tasks.discard)


async def _mark_failed(job_id: str, error: str) -> None:
    """Safely mark a job as failed, handling DB errors gracefully."""
    try:
        job = await analysis_repo.find_by_id(job_id)
        if job:
            job.mark_failed(error)
            await analysis_repo.update_status(job)
    except OSError:
        logger.exception("Database I/O error marking job %s as failed", job_id)
    except Exception:
        logger.exception("Unexpected error marking job %s as failed", job_id)