jebin2 commited on
Commit
0164d71
·
1 Parent(s): c94217d

stuck or duplicate process

Browse files
Files changed (1) hide show
  1. services/priority_worker_pool.py +56 -8
services/priority_worker_pool.py CHANGED
@@ -69,9 +69,9 @@ class WorkerConfig:
69
  fast_workers: int = 5
70
  medium_workers: int = 5
71
  slow_workers: int = 5
72
- fast_interval: int = 5 # seconds
73
- medium_interval: int = 30 # seconds
74
- slow_interval: int = 60 # seconds
75
  max_retries: int = 60 # Max retry attempts before failing
76
 
77
  @classmethod
@@ -221,16 +221,64 @@ class PriorityWorker(Generic[JobType]):
221
  """Process a single job."""
222
  logger.info(f"Worker {self.worker_id}: Processing job {job.job_id} (status: {job.status})")
223
 
 
 
224
  if job.status == "queued":
225
- # New job - start processing
226
- job.status = "processing"
227
- job.started_at = datetime.utcnow()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
228
  await session.commit()
229
 
230
- # Process the job
 
 
 
 
 
231
  job = await self.job_processor.process(job, session)
 
232
  else:
233
- # Already processing - check status
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
234
  job = await self.job_processor.check_status(job, session)
235
 
236
  # Handle retry limit
 
69
  fast_workers: int = 5
70
  medium_workers: int = 5
71
  slow_workers: int = 5
72
+ fast_interval: int = 2 # seconds
73
+ medium_interval: int = 10 # seconds
74
+ slow_interval: int = 15 # seconds
75
  max_retries: int = 60 # Max retry attempts before failing
76
 
77
  @classmethod
 
221
  """Process a single job."""
222
  logger.info(f"Worker {self.worker_id}: Processing job {job.job_id} (status: {job.status})")
223
 
224
+ from sqlalchemy import update
225
+
226
  if job.status == "queued":
227
+ # New job - try to claim it atomically
228
+ # Set next_process_at to future to prevent others from picking it up while we process
229
+ next_check = datetime.utcnow() + timedelta(seconds=self.poll_interval * 2)
230
+
231
+ stmt = (
232
+ update(self.job_model)
233
+ .where(
234
+ self.job_model.job_id == job.job_id,
235
+ self.job_model.status == "queued"
236
+ )
237
+ .values(
238
+ status="processing",
239
+ started_at=datetime.utcnow(),
240
+ next_process_at=next_check
241
+ )
242
+ )
243
+ result = await session.execute(stmt)
244
  await session.commit()
245
 
246
+ if result.rowcount == 0:
247
+ logger.info(f"Worker {self.worker_id}: Failed to claim job {job.job_id} (already taken)")
248
+ return
249
+
250
+ # We claimed it. Refresh and process.
251
+ await session.refresh(job)
252
  job = await self.job_processor.process(job, session)
253
+
254
  else:
255
+ # Already processing - try to claim for status check
256
+ # Ensure we only pick it up if next_process_at matches (or is null/past)
257
+ # But the SELECT already filtered for that.
258
+ # We just need to ensure no one else grabbed it between SELECT and UPDATE.
259
+
260
+ # Update next_process_at to future to lock it for this check
261
+ next_check = datetime.utcnow() + timedelta(seconds=self.poll_interval * 2)
262
+
263
+ stmt = (
264
+ update(self.job_model)
265
+ .where(
266
+ self.job_model.job_id == job.job_id,
267
+ or_(
268
+ self.job_model.next_process_at.is_(None),
269
+ self.job_model.next_process_at <= datetime.utcnow()
270
+ )
271
+ )
272
+ .values(next_process_at=next_check)
273
+ )
274
+ result = await session.execute(stmt)
275
+ await session.commit()
276
+
277
+ if result.rowcount == 0:
278
+ logger.info(f"Worker {self.worker_id}: Failed to claim job {job.job_id} for check (already taken)")
279
+ return
280
+
281
+ await session.refresh(job)
282
  job = await self.job_processor.check_status(job, session)
283
 
284
  # Handle retry limit