Spaces:
Sleeping
Sleeping
File size: 2,516 Bytes
f468a7c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 |
from __future__ import annotations
import os
from pathlib import Path
from django.utils import timezone
from hue_portal.core.models import IngestionJob
# Optional celery import - may not be available in all environments.
# Any failure while importing celery (not only ImportError) falls back to a
# no-op decorator, so this module can always be imported.
try:
    from celery import shared_task

    CELERY_AVAILABLE = True
except Exception:  # deliberately broad: a broken celery install must not break imports
    CELERY_AVAILABLE = False

    def shared_task(*args, **kwargs):
        """Stand-in for ``celery.shared_task`` used when celery is unavailable.

        Accepts both decorator forms celery supports -- bare ``@shared_task``
        and ``@shared_task(...)`` with options -- and returns the decorated
        function unchanged. All task options (``bind``, ``autoretry_for``,
        ``retry_backoff``, ...) are ignored.

        NOTE(review): because the function is returned as-is, a task declared
        with ``bind=True`` still expects ``self`` as its first positional
        argument; callers in this mode must pass it explicitly (a real celery
        task supplies it automatically).
        """
        if len(args) == 1 and callable(args[0]) and not kwargs:
            # Bare ``@shared_task`` usage: the sole argument is the function.
            return args[0]

        def decorator(func):
            return func

        return decorator
@shared_task(bind=True, autoretry_for=(Exception,), retry_backoff=30, retry_kwargs={"max_retries": 3})
def process_ingestion_job(self, job_id: str) -> None:
    """Ingest the stored upload of an ``IngestionJob`` and record the outcome.

    The job is marked RUNNING (progress 10), its stored file is passed to
    ``ingest_uploaded_document``, and on success the resulting document and
    section/image counts are saved with status COMPLETED (progress 100).
    Any exception marks the job FAILED (progress 100, ``error_message`` set)
    and is re-raised so the decorator's ``autoretry_for`` can retry the task
    when running under celery.

    If the ``DELETE_JOB_FILES_ON_SUCCESS`` environment variable is ``"true"``
    (case-insensitive), the stored file is deleted after a successful run.

    Args:
        self: Bound task instance (``bind=True``); not used by the body.
        job_id: Primary key of the ``IngestionJob`` to process. If no such
            job exists the call returns without doing anything.

    Raises:
        FileNotFoundError: If the job's stored file does not exist.
        Exception: Anything raised by ingestion or by saving the completed
            job, re-raised after the job has been marked FAILED.
    """
    job = IngestionJob.objects.filter(id=job_id).first()
    if not job:
        # Nothing to process if the job row does not exist.
        return

    job.status = IngestionJob.STATUS_RUNNING
    job.started_at = timezone.now()
    job.progress = 10
    job.save(update_fields=["status", "started_at", "progress", "updated_at"])

    try:
        storage_path = Path(job.storage_path)
        if not storage_path.exists():
            raise FileNotFoundError(f"Job file missing: {storage_path}")

        # Imported lazily inside the task, as in the original; presumably to
        # avoid a heavy import or import cycle at module load -- TODO confirm.
        from hue_portal.core.services.legal_ingestion import ingest_uploaded_document

        with storage_path.open("rb") as handle:
            result = ingest_uploaded_document(
                file_obj=handle,
                filename=job.filename,
                metadata=job.metadata or {},
            )

        job.status = IngestionJob.STATUS_COMPLETED
        job.document = result.document
        job.finished_at = timezone.now()
        job.progress = 100
        job.stats = {"sections": result.sections_count, "images": result.images_count}
        # A previous (auto-retried) attempt may have written an error message;
        # clear it so a completed job does not report a stale failure.
        job.error_message = ""
        job.save(
            update_fields=[
                "status",
                "document",
                "finished_at",
                "progress",
                "stats",
                "error_message",
                "updated_at",
            ]
        )
    except Exception as exc:  # pragma: no cover - failure path
        job.status = IngestionJob.STATUS_FAILED
        job.error_message = str(exc)
        job.finished_at = timezone.now()
        job.progress = 100
        job.save(
            update_fields=[
                "status",
                "error_message",
                "finished_at",
                "progress",
                "updated_at",
            ]
        )
        # Re-raise so autoretry_for on the task decorator can retry.
        raise

    if os.getenv("DELETE_JOB_FILES_ON_SUCCESS", "false").lower() == "true":
        # Best-effort cleanup, deliberately outside the try above: the job is
        # already saved as COMPLETED, so a failed delete (e.g. permissions)
        # must not flip it to FAILED and, via autoretry, ingest the same file
        # a second time.
        try:
            storage_path.unlink(missing_ok=True)
        except OSError:
            import logging

            logging.getLogger(__name__).warning(
                "Could not delete ingestion job file %s", storage_path, exc_info=True
            )
|