File size: 2,516 Bytes
f468a7c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
from __future__ import annotations

import os
from pathlib import Path

from django.utils import timezone

from hue_portal.core.models import IngestionJob

# Optional celery import - may not be available in all environments.
# The except clause is deliberately broad: any failure raised while importing
# celery is treated as "celery is unavailable" and the no-op fallback is used.
try:
    from celery import shared_task
    CELERY_AVAILABLE = True
except Exception:
    CELERY_AVAILABLE = False

    def shared_task(*args, **kwargs):
        """No-op stand-in for ``celery.shared_task`` when celery is missing.

        Supports both decorator spellings that celery accepts:

        * ``@shared_task`` (bare): the decorated function is returned as is.
        * ``@shared_task(...)`` (with options): the options are ignored and
          a pass-through decorator is returned.

        NOTE: the function is returned unchanged, so a task declared with
        ``bind=True`` still expects its first positional argument (the task
        instance) to be supplied by the caller, and celery-only attributes
        such as ``.delay`` are not available.
        """
        # Bare usage: the only positional argument is the function itself.
        if len(args) == 1 and callable(args[0]):
            return args[0]

        def decorator(func):
            return func

        return decorator


@shared_task(bind=True, autoretry_for=(Exception,), retry_backoff=30, retry_kwargs={"max_retries": 3})
def process_ingestion_job(self, job_id: str) -> None:
    """Run the ingestion pipeline for a stored ``IngestionJob``.

    The job is looked up by id; if no such job exists the task returns
    without doing anything. Otherwise the job is marked as running, the file
    at ``job.storage_path`` is opened in binary mode and handed to
    ``ingest_uploaded_document``, and the outcome is persisted on the job:

    * success: status ``STATUS_COMPLETED``, the resulting document, section
      and image counts in ``stats``, progress 100;
    * failure: status ``STATUS_FAILED``, ``error_message`` set to the
      exception text, progress 100, and the exception is re-raised.

    When the environment variable ``DELETE_JOB_FILES_ON_SUCCESS`` equals
    ``"true"`` (case-insensitive), the stored file is removed after a
    successful ingestion. A failure to remove the file is logged and does
    not affect the already-recorded success.

    Args:
        self: Task instance supplied because the task is declared with
            ``bind=True``; not used by the body.
        job_id: Primary key of the ``IngestionJob`` to process.

    Raises:
        FileNotFoundError: If the job's stored file does not exist.
        Exception: Any error raised during ingestion is recorded on the job
            and then re-raised (the decorator is configured with
            ``autoretry_for=(Exception,)``).
    """
    import logging

    job = IngestionJob.objects.filter(id=job_id).first()
    if not job:
        return

    job.status = IngestionJob.STATUS_RUNNING
    job.started_at = timezone.now()
    job.progress = 10
    job.save(update_fields=["status", "started_at", "progress", "updated_at"])

    try:
        storage_path = Path(job.storage_path)
        if not storage_path.exists():
            raise FileNotFoundError(f"Job file missing: {storage_path}")
        # Imported inside the try block so that an import failure is recorded
        # on the job like any other ingestion error.
        from hue_portal.core.services.legal_ingestion import ingest_uploaded_document

        with storage_path.open("rb") as handle:
            result = ingest_uploaded_document(
                file_obj=handle,
                filename=job.filename,
                metadata=job.metadata or {},
            )
        job.status = IngestionJob.STATUS_COMPLETED
        job.document = result.document
        job.finished_at = timezone.now()
        job.progress = 100
        job.stats = {"sections": result.sections_count, "images": result.images_count}
        job.save(
            update_fields=[
                "status",
                "document",
                "finished_at",
                "progress",
                "stats",
                "updated_at",
            ]
        )
        if os.getenv("DELETE_JOB_FILES_ON_SUCCESS", "false").lower() == "true":
            # Cleanup is best-effort: the job is already saved as completed,
            # so a filesystem error here must not fall through to the failure
            # handler below (which would mark the job failed and re-raise,
            # causing the document to be ingested again on retry).
            try:
                storage_path.unlink(missing_ok=True)
            except OSError:
                logging.getLogger(__name__).warning(
                    "Could not delete job file %s after successful ingestion",
                    storage_path,
                    exc_info=True,
                )
    except Exception as exc:  # pragma: no cover - logging path
        job.status = IngestionJob.STATUS_FAILED
        job.error_message = str(exc)
        job.finished_at = timezone.now()
        job.progress = 100
        job.save(
            update_fields=[
                "status",
                "error_message",
                "finished_at",
                "progress",
                "updated_at",
            ]
        )
        raise