Hamza4100 committed on
Commit
496eca5
·
verified ·
1 Parent(s): 50fc8a8

Update job_worker.py

Browse files
Files changed (1) hide show
  1. job_worker.py +158 -157
job_worker.py CHANGED
@@ -1,157 +1,158 @@
1
- import os
2
- import json
3
- import uuid
4
- import time
5
- import threading
6
- import asyncio
7
- from typing import Dict, Optional
8
-
9
- # Note: job processing will call into async functions from main via asyncio.run
10
-
11
- JOBS_DIR = os.path.join(os.path.dirname(__file__), "jobs")
12
- os.makedirs(JOBS_DIR, exist_ok=True)
13
-
14
-
15
- class JobManager:
16
- """Simple persistent job queue and worker.
17
-
18
- Jobs are stored as JSON files in `jobs/` so they survive restarts.
19
- Worker runs in a dedicated thread and processes jobs sequentially.
20
- """
21
-
22
- def __init__(self):
23
- self._lock = threading.Lock()
24
- self._stop_event = threading.Event()
25
- self._thread: Optional[threading.Thread] = None
26
-
27
- def start(self):
28
- if self._thread and self._thread.is_alive():
29
- return
30
- self._thread = threading.Thread(target=self._run, daemon=True)
31
- self._thread.start()
32
-
33
- def stop(self):
34
- self._stop_event.set()
35
- if self._thread:
36
- self._thread.join(timeout=2)
37
-
38
- def _run(self):
39
- # Continuously look for queued jobs and process them
40
- while not self._stop_event.is_set():
41
- job = self._pop_next_job()
42
- if job:
43
- try:
44
- self._process_job(job)
45
- except Exception as e:
46
- # Mark job failed
47
- job['status'] = 'failed'
48
- job['error'] = str(e)
49
- job['finished_at'] = time.time()
50
- self._save_job(job)
51
- else:
52
- time.sleep(0.5)
53
-
54
- def _job_path(self, job_id: str) -> str:
55
- return os.path.join(JOBS_DIR, f"{job_id}.json")
56
-
57
- def _save_job(self, job: Dict):
58
- with open(self._job_path(job['id']), 'w', encoding='utf-8') as f:
59
- json.dump(job, f, indent=2)
60
-
61
- def _load_job(self, job_id: str) -> Optional[Dict]:
62
- p = self._job_path(job_id)
63
- if not os.path.exists(p):
64
- return None
65
- with open(p, 'r', encoding='utf-8') as f:
66
- return json.load(f)
67
-
68
- def _pop_next_job(self) -> Optional[Dict]:
69
- # Find oldest job with status 'queued'
70
- files = [f for f in os.listdir(JOBS_DIR) if f.endswith('.json')]
71
- files.sort()
72
- for fname in files:
73
- path = os.path.join(JOBS_DIR, fname)
74
- try:
75
- with open(path, 'r', encoding='utf-8') as f:
76
- job = json.load(f)
77
- if job.get('status') == 'queued':
78
- job['status'] = 'processing'
79
- job['started_at'] = time.time()
80
- self._save_job(job)
81
- return job
82
- except Exception:
83
- continue
84
- return None
85
-
86
- def create_job(self, user_id: str, filename: str, file_path: str) -> Dict:
87
- job_id = uuid.uuid4().hex
88
- job = {
89
- 'id': job_id,
90
- 'user_id': user_id,
91
- 'filename': filename,
92
- 'file_path': file_path,
93
- 'status': 'queued',
94
- 'created_at': time.time()
95
- }
96
- self._save_job(job)
97
- return job
98
-
99
- def get_job(self, job_id: str) -> Optional[Dict]:
100
- return self._load_job(job_id)
101
-
102
- def _process_job(self, job: Dict):
103
- # Import heavy modules lazily to avoid startup cost
104
- from main import rag_manager, hf_storage
105
-
106
- job_id = job['id']
107
- user_id = job['user_id']
108
- filename = job['filename']
109
- file_path = job['file_path']
110
-
111
- try:
112
- # Read content
113
- with open(file_path, 'rb') as f:
114
- content = f.read()
115
-
116
- # Ensure RAG engine is ready (async call)
117
- engine = asyncio.run(rag_manager.get_engine(user_id))
118
-
119
- # Run upload_document in thread to avoid blocking event loop
120
- result = asyncio.run(asyncio.to_thread(engine.upload_document, filename, content, 'auto'))
121
-
122
- if result.get('status') != 'success':
123
- job['status'] = 'failed'
124
- job['error'] = result.get('message', 'processing error')
125
- job['finished_at'] = time.time()
126
- self._save_job(job)
127
- return
128
-
129
- # Save pdf to user's uploaded_pdfs folder
130
- base_dir = os.path.dirname(os.path.abspath(__file__))
131
- user_pdfs_dir = os.path.join(base_dir, 'users', user_id, 'uploaded_pdfs')
132
- os.makedirs(user_pdfs_dir, exist_ok=True)
133
- pdf_path = os.path.join(user_pdfs_dir, filename)
134
- with open(pdf_path, 'wb') as f:
135
- f.write(content)
136
-
137
- # Push to HF (perform in thread)
138
- asyncio.run(asyncio.to_thread(hf_storage.push_storage_to_hf, user_id, f"Upload {filename}"))
139
-
140
- job['status'] = 'success'
141
- job['result'] = {
142
- 'document_id': result.get('doc_id', ''),
143
- 'filename': filename,
144
- 'pages': result.get('pages'),
145
- 'chunks': result.get('chunks')
146
- }
147
- job['finished_at'] = time.time()
148
- self._save_job(job)
149
-
150
- except Exception as e:
151
- job['status'] = 'failed'
152
- job['error'] = str(e)
153
- job['finished_at'] = time.time()
154
- self._save_job(job)
155
-
156
-
157
- job_manager = JobManager()
 
 
1
+ import os
2
+ import json
3
+ import uuid
4
+ import time
5
+ import threading
6
+ import asyncio
7
+ from typing import Dict, Optional
8
+
9
+ # Note: job processing will call into async functions from main via asyncio.run
10
+
11
# Directory holding one JSON file per job. It is resolved from the absolute
# location of this module so it does not depend on the current working
# directory, and it is created at import time.
JOBS_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "jobs")
os.makedirs(JOBS_DIR, exist_ok=True)
13
+
14
+
15
class JobManager:
    """Simple persistent job queue and worker.

    Jobs are stored as one JSON file per job in a jobs directory so they
    survive restarts. The worker runs in a dedicated daemon thread and
    processes queued jobs sequentially, oldest ``created_at`` first.

    NOTE(review): a job that is interrupted while its status is
    ``'processing'`` (for example by a process exit) is never picked up
    again; nothing in this class requeues it.
    """

    def __init__(self, jobs_dir: Optional[str] = None):
        """Create a manager; the worker thread is not started until ``start()``.

        Args:
            jobs_dir: Directory for the job files. Defaults to the
                module-level ``JOBS_DIR``.
        """
        # JOBS_DIR is only looked up when no directory is supplied.
        self._jobs_dir = JOBS_DIR if jobs_dir is None else jobs_dir
        os.makedirs(self._jobs_dir, exist_ok=True)
        # Serialises job-file writes (request threads and the worker both save).
        self._lock = threading.Lock()
        self._stop_event = threading.Event()
        self._thread: Optional[threading.Thread] = None

    def start(self):
        """Start the worker thread if it is not already running."""
        # Clear a previous stop request first; otherwise a thread started
        # after stop() would see the event set and exit immediately.
        self._stop_event.clear()
        if self._thread and self._thread.is_alive():
            return
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def stop(self):
        """Ask the worker to stop and wait up to two seconds for it."""
        self._stop_event.set()
        if self._thread:
            self._thread.join(timeout=2)

    def _run(self):
        """Worker loop: process queued jobs until a stop is requested."""
        while not self._stop_event.is_set():
            job = self._pop_next_job()
            if job is None:
                # Event.wait returns as soon as stop() is called, unlike sleep.
                self._stop_event.wait(0.5)
                continue
            try:
                self._process_job(job)
            except Exception as e:
                try:
                    self._mark_failed(job, str(e))
                except OSError:
                    # The job file could not be written; keep the worker
                    # alive so later jobs are still processed.
                    continue

    def _job_path(self, job_id: str) -> str:
        """Return the path of the JSON file backing ``job_id``."""
        return os.path.join(self._jobs_dir, f"{job_id}.json")

    def _save_job(self, job: Dict):
        """Persist ``job`` atomically to its JSON file.

        The data is written to a temporary sibling file and then moved into
        place with ``os.replace`` so a concurrent reader never sees a
        partially written file. The temporary name does not end in ``.json``,
        so the queue scan ignores it.
        """
        path = self._job_path(job['id'])
        tmp_path = path + '.tmp'
        with self._lock:
            try:
                with open(tmp_path, 'w', encoding='utf-8') as f:
                    json.dump(job, f, indent=2)
                os.replace(tmp_path, path)
            except BaseException:
                # Do not leave a stray temporary file behind on failure.
                try:
                    os.remove(tmp_path)
                except OSError:
                    pass
                raise

    def _mark_failed(self, job: Dict, error: str):
        """Record ``error`` on ``job``, mark it failed and persist it."""
        job['status'] = 'failed'
        job['error'] = error
        job['finished_at'] = time.time()
        self._save_job(job)

    def _load_job(self, job_id: str) -> Optional[Dict]:
        """Load a job by id, or return ``None`` when it does not exist."""
        # Reject ids that would escape the jobs directory.
        if not job_id or os.path.basename(job_id) != job_id:
            return None
        try:
            with open(self._job_path(job_id), 'r', encoding='utf-8') as f:
                return json.load(f)
        except FileNotFoundError:
            return None

    def _pop_next_job(self) -> Optional[Dict]:
        """Claim and return the oldest queued job, or ``None`` if there is none.

        The claimed job is saved with status ``'processing'`` and a
        ``started_at`` timestamp before it is returned. Unreadable or
        malformed job files are skipped.
        """
        try:
            names = os.listdir(self._jobs_dir)
        except OSError:
            return None

        queued = []
        for fname in names:
            if not fname.endswith('.json'):
                continue
            try:
                with open(os.path.join(self._jobs_dir, fname), 'r', encoding='utf-8') as f:
                    job = json.load(f)
            except (OSError, ValueError):
                # Missing, unreadable or corrupt file: ignore it.
                continue
            if not isinstance(job, dict) or 'id' not in job:
                continue
            if job.get('status') != 'queued':
                continue
            try:
                created = float(job.get('created_at') or 0.0)
            except (TypeError, ValueError):
                created = 0.0
            # File names are random uuid hex strings, so ordering must use
            # the timestamp; the name only breaks ties deterministically.
            queued.append((created, fname, job))

        if not queued:
            return None

        job = min(queued, key=lambda entry: (entry[0], entry[1]))[2]
        job['status'] = 'processing'
        job['started_at'] = time.time()
        try:
            self._save_job(job)
        except OSError:
            # Could not claim the job; the caller retries on its next poll.
            return None
        return job

    def create_job(self, user_id: str, filename: str, file_path: str, username: Optional[str] = None) -> Dict:
        """Create, persist and return a new job with status ``'queued'``."""
        job = {
            'id': uuid.uuid4().hex,
            'user_id': user_id,
            'username': username,
            'filename': filename,
            'file_path': file_path,
            'status': 'queued',
            'created_at': time.time(),
        }
        self._save_job(job)
        return job

    def get_job(self, job_id: str) -> Optional[Dict]:
        """Return the stored job for ``job_id``, or ``None`` if unknown."""
        return self._load_job(job_id)

    def _process_job(self, job: Dict):
        """Index the job's file, store a copy for the user and push to HF.

        The outcome is written back to the job file: ``'success'`` with a
        ``result`` dict, or ``'failed'`` with an ``error`` message.
        """
        # Import heavy modules lazily to avoid startup cost
        from main import rag_manager, hf_storage

        user_id = job['user_id']
        filename = job['filename']
        file_path = job['file_path']

        try:
            # The stored copy must stay inside the user's folder, so only the
            # final path component of the supplied name is used on disk.
            safe_name = os.path.basename(str(filename).replace('\\', '/'))
            if safe_name in ('', '.', '..'):
                self._mark_failed(job, f"invalid filename: {filename!r}")
                return

            with open(file_path, 'rb') as f:
                content = f.read()

            # Ensure RAG engine is ready (async call)
            engine = asyncio.run(rag_manager.get_engine(user_id))

            # This method already runs on the worker thread, so the blocking
            # calls are made directly instead of via an extra event loop.
            result = engine.upload_document(filename, content, 'auto')

            if result.get('status') != 'success':
                self._mark_failed(job, result.get('message', 'processing error'))
                return

            # Save pdf to user's uploaded_pdfs folder
            base_dir = os.path.dirname(os.path.abspath(__file__))
            user_pdfs_dir = os.path.join(base_dir, 'users', user_id, 'uploaded_pdfs')
            os.makedirs(user_pdfs_dir, exist_ok=True)
            with open(os.path.join(user_pdfs_dir, safe_name), 'wb') as f:
                f.write(content)

            # Push to HF
            hf_storage.push_storage_to_hf(user_id, f"Upload {filename}")

            job['status'] = 'success'
            job['result'] = {
                'document_id': result.get('doc_id', ''),
                'filename': filename,
                'pages': result.get('pages'),
                'chunks': result.get('chunks'),
            }
            job['finished_at'] = time.time()
            self._save_job(job)

        except Exception as e:
            self._mark_failed(job, str(e))
156
+
157
+
158
# Module-level JobManager instance. Constructing it does not start the worker
# thread; call job_manager.start() to begin processing.
job_manager = JobManager()