Hamza4100 commited on
Commit
ff19c99
·
verified ·
1 Parent(s): d399225

Upload job_worker.py

Browse files
Files changed (1) hide show
  1. job_worker.py +157 -0
job_worker.py ADDED
@@ -0,0 +1,157 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ import json
3
+ import uuid
4
+ import time
5
+ import threading
6
+ import asyncio
7
+ from typing import Dict, Optional
8
+
9
# Note: job processing will call into async functions from main via asyncio.run

# Directory where job records are persisted as JSON files, next to this module.
JOBS_DIR = os.path.join(os.path.dirname(__file__), "jobs")
# Create it eagerly at import time so the worker can assume it exists.
os.makedirs(JOBS_DIR, exist_ok=True)
13
+
14
+
15
class JobManager:
    """Simple persistent job queue and worker.

    Jobs are stored as JSON files in `jobs/` so they survive restarts.
    Worker runs in a dedicated thread and processes jobs sequentially.
    """

    def __init__(self):
        # Serializes the claim step in _pop_next_job so two callers in this
        # process cannot grab the same queued job.  (Does not protect against
        # multiple OS processes sharing JOBS_DIR.)
        self._lock = threading.Lock()
        self._stop_event = threading.Event()
        self._thread: Optional[threading.Thread] = None

    def start(self):
        """Start the background worker thread. Idempotent and restartable."""
        if self._thread and self._thread.is_alive():
            return
        # Fix: without clearing, start() after stop() launched a thread that
        # saw the still-set event and exited immediately.
        self._stop_event.clear()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def stop(self):
        """Signal the worker to stop and wait briefly for it to exit."""
        self._stop_event.set()
        if self._thread:
            self._thread.join(timeout=2)

    def _run(self):
        # Poll for queued jobs; sleep briefly when the queue is empty.
        while not self._stop_event.is_set():
            job = self._pop_next_job()
            if job is None:
                time.sleep(0.5)
                continue
            try:
                self._process_job(job)
            except Exception as e:
                # _process_job records its own failures; this is a last-resort
                # guard so one broken job cannot kill the worker thread.
                self._finish(job, 'failed', error=str(e))

    def _job_path(self, job_id: str) -> str:
        """Absolute path of the JSON file backing *job_id*."""
        return os.path.join(JOBS_DIR, f"{job_id}.json")

    def _save_job(self, job: Dict):
        """Persist *job* (overwrites any previous state on disk)."""
        with open(self._job_path(job['id']), 'w', encoding='utf-8') as f:
            json.dump(job, f, indent=2)

    def _load_job(self, job_id: str) -> Optional[Dict]:
        """Return the stored job dict, or None if no such job exists."""
        # EAFP instead of the old exists()-then-open, which had a small
        # window where the file could disappear between check and read.
        try:
            with open(self._job_path(job_id), 'r', encoding='utf-8') as f:
                return json.load(f)
        except FileNotFoundError:
            return None

    def _finish(self, job: Dict, status: str, error: Optional[str] = None):
        """Record a terminal state ('success'/'failed') and persist the job."""
        job['status'] = status
        if error is not None:
            job['error'] = error
        job['finished_at'] = time.time()
        self._save_job(job)

    def _pop_next_job(self) -> Optional[Dict]:
        """Claim and return the oldest queued job, or None if the queue is empty.

        Fixes two defects of the original: the claim (read -> mark
        'processing' -> save) now happens under the lock, and "oldest" is
        determined by 'created_at' — filenames are random uuid4 hexes, so
        the old filename sort gave an arbitrary order.
        """
        with self._lock:
            queued = []
            for fname in os.listdir(JOBS_DIR):
                if not fname.endswith('.json'):
                    continue
                try:
                    with open(os.path.join(JOBS_DIR, fname), 'r', encoding='utf-8') as f:
                        job = json.load(f)
                except Exception:
                    # Skip unreadable/corrupt job files rather than crash.
                    continue
                if job.get('status') == 'queued':
                    queued.append(job)
            if not queued:
                return None
            job = min(queued, key=lambda j: j.get('created_at', 0))
            job['status'] = 'processing'
            job['started_at'] = time.time()
            self._save_job(job)
            return job

    def create_job(self, user_id: str, filename: str, file_path: str) -> Dict:
        """Persist a new queued job and return its record.

        Args:
            user_id: Owner of the upload.
            filename: Original name of the uploaded file.
            file_path: Path of the staged file to process.
        """
        job = {
            'id': uuid.uuid4().hex,
            'user_id': user_id,
            'filename': filename,
            'file_path': file_path,
            'status': 'queued',
            'created_at': time.time()
        }
        self._save_job(job)
        return job

    def get_job(self, job_id: str) -> Optional[Dict]:
        """Return the job record for *job_id*, or None if unknown."""
        return self._load_job(job_id)

    def _process_job(self, job: Dict):
        """Run one upload job end to end, recording success or failure."""
        # Import heavy modules lazily to avoid startup cost
        from main import rag_manager, hf_storage

        user_id = job['user_id']
        filename = job['filename']

        try:
            # Read the staged file content.
            with open(job['file_path'], 'rb') as f:
                content = f.read()

            # get_engine is async; drive it to completion on this thread.
            engine = asyncio.run(rag_manager.get_engine(user_id))

            # We already run on a dedicated worker thread, so call the
            # blocking upload directly.  The old
            # asyncio.run(asyncio.to_thread(...)) pattern created an event
            # loop only to hop to yet another thread — same result, wasted
            # machinery.
            result = engine.upload_document(filename, content, 'auto')

            if result.get('status') != 'success':
                self._finish(job, 'failed',
                             error=result.get('message', 'processing error'))
                return

            # Save the original file to the user's uploaded_pdfs folder.
            base_dir = os.path.dirname(os.path.abspath(__file__))
            user_pdfs_dir = os.path.join(base_dir, 'users', user_id, 'uploaded_pdfs')
            os.makedirs(user_pdfs_dir, exist_ok=True)
            with open(os.path.join(user_pdfs_dir, filename), 'wb') as f:
                f.write(content)

            # Mirror the user's storage to Hugging Face (blocking call is
            # fine here — see note above).
            hf_storage.push_storage_to_hf(user_id, "Upload (unknown)")

            job['result'] = {
                'document_id': result.get('doc_id', ''),
                'filename': filename,
                'pages': result.get('pages'),
                'chunks': result.get('chunks')
            }
            self._finish(job, 'success')

        except Exception as e:
            self._finish(job, 'failed', error=str(e))
155
+
156
+
157
# Shared module-level JobManager instance (worker not started here; callers
# are expected to invoke job_manager.start()).
job_manager = JobManager()