Joker5800 commited on
Commit
445910e
·
verified ·
1 Parent(s): e4cc031

Update server.py

Browse files
Files changed (1) hide show
  1. server.py +52 -7
server.py CHANGED
@@ -124,7 +124,7 @@ def ensure_database() -> Collection:
124
  if not MONGO_URI:
125
  raise RuntimeError("NEXT_MONGODB_URI not configured")
126
 
127
- mongo_client = MongoClient(MONGO_URI)
128
  db = mongo_client.get_database(MONGO_DB_NAME)
129
  jobs_collection = db["jobs"]
130
  # analytics collection stores aggregated counts per day and hourly breakdowns
@@ -388,8 +388,8 @@ async def process_job(job_id: str) -> None:
388
  logger.info(f"[Job {job_id}] Processing started")
389
 
390
  if not input_path.exists():
391
- logger.warning(f"[Job {job_id}] Input file not found, deleting job")
392
- await asyncio.to_thread(collection.delete_one, {"jobId": job_id})
393
  return
394
 
395
  image_bytes = input_path.read_bytes()
@@ -440,6 +440,33 @@ def can_run_job(job: dict) -> bool:
440
 
441
  def claim_next_job() -> Optional[dict]:
442
  collection = get_job_store()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
443
  candidates = list(
444
  collection.find({"status": "queued"}).sort("createdAt", 1).limit(100)
445
  )
@@ -448,8 +475,17 @@ def claim_next_job() -> Optional[dict]:
448
  for job in candidates:
449
  input_path = Path(job.get("inputPath", ""))
450
  if input_path and not input_path.exists():
451
- logger.warning(f"[Job {job['jobId']}] Queued job has missing input file, deleting")
452
- collection.delete_one({"jobId": job["jobId"]})
 
 
 
 
 
 
 
 
 
453
 
454
  # Re-fetch candidates after cleanup
455
  candidates = list(
@@ -465,8 +501,8 @@ def claim_next_job() -> Optional[dict]:
465
  {
466
  "$set": {
467
  "status": "starting",
468
- "updatedAt": utcnow(),
469
- "startedAt": utcnow(),
470
  }
471
  },
472
  return_document=ReturnDocument.AFTER,
@@ -483,22 +519,28 @@ async def dispatcher_loop() -> None:
483
  assert dispatcher_wakeup is not None
484
 
485
  while True:
 
486
  while True:
 
487
  async with active_tasks_lock:
488
  active_count = len(active_tasks)
489
 
490
  current_concurrency = get_dynamic_concurrency()
 
491
  if active_count >= current_concurrency:
492
  break
493
 
 
494
  job = await asyncio.to_thread(claim_next_job)
495
  if not job:
496
  break
497
 
 
498
  task = asyncio.create_task(process_job(job["jobId"]))
499
  async with active_tasks_lock:
500
  active_tasks.add(task)
501
 
 
502
  try:
503
  await asyncio.wait_for(dispatcher_wakeup.wait(), timeout=QUEUE_POLL_SECONDS)
504
  except asyncio.TimeoutError:
@@ -506,6 +548,9 @@ async def dispatcher_loop() -> None:
506
 
507
  dispatcher_wakeup.clear()
508
 
 
 
 
509
 
510
  async def cleanup_loop() -> None:
511
  """
 
124
  if not MONGO_URI:
125
  raise RuntimeError("NEXT_MONGODB_URI not configured")
126
 
127
+ mongo_client = MongoClient(MONGO_URI, tz_aware=True)
128
  db = mongo_client.get_database(MONGO_DB_NAME)
129
  jobs_collection = db["jobs"]
130
  # analytics collection stores aggregated counts per day and hourly breakdowns
 
388
  logger.info(f"[Job {job_id}] Processing started")
389
 
390
  if not input_path.exists():
391
+ logger.warning(f"[Job {job_id}] Input file not found, marking failed")
392
+ await update_job(job_id, {"status": "failed", "error": "Input file not found"})
393
  return
394
 
395
  image_bytes = input_path.read_bytes()
 
440
 
441
  def claim_next_job() -> Optional[dict]:
442
  collection = get_job_store()
443
+ now = utcnow()
444
+
445
+ # Clean up stale "starting" jobs that have been stuck for >2 minutes
446
+ # These could be from crashed workers or hung processes
447
+ stale_starting = list(collection.find({
448
+ "status": "starting",
449
+ "startedAt": {"$lt": now - timedelta(minutes=2)}
450
+ }))
451
+ for job in stale_starting:
452
+ logger.warning(f"[Job {job['jobId']}] Stale starting job, resetting to queued")
453
+ collection.update_one(
454
+ {"jobId": job["jobId"]},
455
+ {"$set": {"status": "queued", "updatedAt": now}},
456
+ )
457
+
458
+ # Clean up stale "running" jobs that have been stuck for >5 minutes
459
+ stale_running = list(collection.find({
460
+ "status": "running",
461
+ "startedAt": {"$lt": now - timedelta(minutes=5)}
462
+ }))
463
+ for job in stale_running:
464
+ logger.warning(f"[Job {job['jobId']}] Stale running job, marking failed")
465
+ collection.update_one(
466
+ {"jobId": job["jobId"]},
467
+ {"$set": {"status": "failed", "error": "Job timed out", "updatedAt": now}},
468
+ )
469
+
470
  candidates = list(
471
  collection.find({"status": "queued"}).sort("createdAt", 1).limit(100)
472
  )
 
475
  for job in candidates:
476
  input_path = Path(job.get("inputPath", ""))
477
  if input_path and not input_path.exists():
478
+ logger.warning(f"[Job {job['jobId']}] Queued job has missing input file, marking failed")
479
+ collection.update_one(
480
+ {"jobId": job["jobId"]},
481
+ {
482
+ "$set": {
483
+ "status": "failed",
484
+ "error": "Queued job input file is missing",
485
+ "updatedAt": now,
486
+ }
487
+ },
488
+ )
489
 
490
  # Re-fetch candidates after cleanup
491
  candidates = list(
 
501
  {
502
  "$set": {
503
  "status": "starting",
504
+ "updatedAt": now,
505
+ "startedAt": now,
506
  }
507
  },
508
  return_document=ReturnDocument.AFTER,
 
519
  assert dispatcher_wakeup is not None
520
 
521
  while True:
522
+ # Claim and process as many jobs as we can
523
  while True:
524
+ # Check current concurrency
525
  async with active_tasks_lock:
526
  active_count = len(active_tasks)
527
 
528
  current_concurrency = get_dynamic_concurrency()
529
+
530
  if active_count >= current_concurrency:
531
  break
532
 
533
+ # claim_next_job handles cleanup and returns an already-claimed job
534
  job = await asyncio.to_thread(claim_next_job)
535
  if not job:
536
  break
537
 
538
+ logger.info(f"[Dispatcher] Processing job {job['jobId']}")
539
  task = asyncio.create_task(process_job(job["jobId"]))
540
  async with active_tasks_lock:
541
  active_tasks.add(task)
542
 
543
+ # No more jobs to claim, wait for work
544
  try:
545
  await asyncio.wait_for(dispatcher_wakeup.wait(), timeout=QUEUE_POLL_SECONDS)
546
  except asyncio.TimeoutError:
 
548
 
549
  dispatcher_wakeup.clear()
550
 
551
+ dispatcher_wakeup.clear()
552
+ logger.debug(f"[Dispatcher] Woke up, checking for new jobs")
553
+
554
 
555
  async def cleanup_loop() -> None:
556
  """