Zhen Ye committed on
Commit
955fa9e
·
1 Parent(s): 19fdabe

Fix backend worker pipeline stall on exception (add fallback)

Browse files
Files changed (1) hide show
  1. inference.py +14 -7
inference.py CHANGED
@@ -1138,10 +1138,14 @@ def run_inference(
1138
  flush_batch()
1139
  except Exception as e:
1140
  logging.exception("Worker failed processing frame")
1141
- # Important: If we lose a batch, the pipeline might stall waiting for those indices.
1142
- # Ideally we should emit error placeholders?
1143
- # For now, just ensure we don't hold the lock.
1144
- raise
 
 
 
 
1145
  finally:
1146
  queue_in.task_done()
1147
 
@@ -1473,13 +1477,15 @@ def run_segmentation(
1473
 
1474
  except Exception as e:
1475
  logging.error("Batch seg failed: %s", e)
 
1476
  for idx, frm in batch_accum:
1477
  while True:
1478
  try:
1479
- queue_out.put((idx, frm), timeout=1.0) # Fallback
 
1480
  break
1481
  except Full:
1482
- if writer_finished: raise
1483
  if job_id: _check_cancellation(job_id)
1484
  batch_accum.clear()
1485
 
@@ -1807,13 +1813,14 @@ def run_depth_inference(
1807
 
1808
  except Exception as e:
1809
  logging.error("Batch depth failed: %s", e)
 
1810
  for idx, frm in batch_accum:
1811
  while True:
1812
  try:
1813
  queue_out.put((idx, frm), timeout=1.0)
1814
  break
1815
  except Full:
1816
- if writer_finished: raise
1817
  if job_id: _check_cancellation(job_id)
1818
  batch_accum.clear()
1819
 
 
1138
  flush_batch()
1139
  except Exception as e:
1140
  logging.exception("Worker failed processing frame")
1141
+ # Emit empty/failed frames for the batch to keep sequence alive
1142
+ for idx, frm in batch_accum:
1143
+ try:
1144
+ # Fallback: Return original frame with empty detections
1145
+ queue_out.put((idx, frm, []), timeout=5.0)
1146
+ except:
1147
+ pass
1148
+ batch_accum.clear()
1149
  finally:
1150
  queue_in.task_done()
1151
 
 
1477
 
1478
  except Exception as e:
1479
  logging.error("Batch seg failed: %s", e)
1480
+ # Fallback: Emit failed frames to prevent writer stall
1481
  for idx, frm in batch_accum:
1482
  while True:
1483
  try:
1484
+ # Return original frame without mask
1485
+ queue_out.put((idx, frm), timeout=1.0)
1486
  break
1487
  except Full:
1488
+ if writer_finished: break
1489
  if job_id: _check_cancellation(job_id)
1490
  batch_accum.clear()
1491
 
 
1813
 
1814
  except Exception as e:
1815
  logging.error("Batch depth failed: %s", e)
1816
+ # Fallback: Emit original frames (no depth map)
1817
  for idx, frm in batch_accum:
1818
  while True:
1819
  try:
1820
  queue_out.put((idx, frm), timeout=1.0)
1821
  break
1822
  except Full:
1823
+ if writer_finished: break
1824
  if job_id: _check_cancellation(job_id)
1825
  batch_accum.clear()
1826