davidtran999 commited on
Commit
f468a7c
·
verified ·
1 Parent(s): f81cf0b

Upload backend/core/tasks.py with huggingface_hub

Browse files
Files changed (1) hide show
  1. backend/core/tasks.py +78 -0
backend/core/tasks.py ADDED
@@ -0,0 +1,78 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from __future__ import annotations
2
+
3
+ import os
4
+ from pathlib import Path
5
+
6
+ from django.utils import timezone
7
+
8
+ from hue_portal.core.models import IngestionJob
9
+
10
+ # Optional celery import - may not be available in all environments
11
+ try:
12
+ from celery import shared_task
13
+ CELERY_AVAILABLE = True
14
+ except (ImportError, AttributeError, Exception):
15
+ CELERY_AVAILABLE = False
16
+ # Create a dummy decorator if celery is not available
17
+ def shared_task(*args, **kwargs):
18
+ def decorator(func):
19
+ return func
20
+ return decorator
21
+
22
+
23
+ @shared_task(bind=True, autoretry_for=(Exception,), retry_backoff=30, retry_kwargs={"max_retries": 3})
24
+ def process_ingestion_job(self, job_id: str) -> None:
25
+ job = IngestionJob.objects.filter(id=job_id).first()
26
+ if not job:
27
+ return
28
+
29
+ job.status = IngestionJob.STATUS_RUNNING
30
+ job.started_at = timezone.now()
31
+ job.progress = 10
32
+ job.save(update_fields=["status", "started_at", "progress", "updated_at"])
33
+
34
+ try:
35
+ storage_path = Path(job.storage_path)
36
+ if not storage_path.exists():
37
+ raise FileNotFoundError(f"Job file missing: {storage_path}")
38
+ from hue_portal.core.services.legal_ingestion import ingest_uploaded_document
39
+
40
+ with storage_path.open("rb") as handle:
41
+ result = ingest_uploaded_document(
42
+ file_obj=handle,
43
+ filename=job.filename,
44
+ metadata=job.metadata or {},
45
+ )
46
+ job.status = IngestionJob.STATUS_COMPLETED
47
+ job.document = result.document
48
+ job.finished_at = timezone.now()
49
+ job.progress = 100
50
+ job.stats = {"sections": result.sections_count, "images": result.images_count}
51
+ job.save(
52
+ update_fields=[
53
+ "status",
54
+ "document",
55
+ "finished_at",
56
+ "progress",
57
+ "stats",
58
+ "updated_at",
59
+ ]
60
+ )
61
+ if os.getenv("DELETE_JOB_FILES_ON_SUCCESS", "false").lower() == "true":
62
+ storage_path.unlink(missing_ok=True)
63
+ except Exception as exc: # pragma: no cover - logging path
64
+ job.status = IngestionJob.STATUS_FAILED
65
+ job.error_message = str(exc)
66
+ job.finished_at = timezone.now()
67
+ job.progress = 100
68
+ job.save(
69
+ update_fields=[
70
+ "status",
71
+ "error_message",
72
+ "finished_at",
73
+ "progress",
74
+ "updated_at",
75
+ ]
76
+ )
77
+ raise
78
+