Zhen Ye committed on
Commit
6a62982
·
1 Parent(s): bbef397

Add aggressive worker debug logging and BaseException catch

Browse files
Files changed (1) hide show
  1. inference.py +22 -13
inference.py CHANGED
@@ -1047,6 +1047,7 @@ def run_inference(
1047
  writer_finished = False
1048
 
1049
  def worker_task(gpu_idx: int):
 
1050
  detector_instance = detectors[gpu_idx]
1051
  depth_instance = depth_estimators[gpu_idx] if gpu_idx < len(depth_estimators) else None # Handle mismatched lists safely
1052
 
@@ -1055,6 +1056,7 @@ def run_inference(
1055
 
1056
  def flush_batch():
1057
  if not batch_accum: return
 
1058
 
1059
  indices = [item[0] for item in batch_accum]
1060
  frames = [item[1] for item in batch_accum]
@@ -1068,8 +1070,8 @@ def run_inference(
1068
  else:
1069
  with detector_instance.lock:
1070
  det_results = [detector_instance.predict(f, queries) for f in frames]
1071
- except Exception:
1072
- logging.exception("Batch detection failed")
1073
  det_results = [None] * len(frames)
1074
 
1075
  # Run depth batch (if enabled)
@@ -1081,8 +1083,8 @@ def run_inference(
1081
  depth_results = depth_instance.predict_batch(frames)
1082
  else:
1083
  depth_results = [depth_instance.predict(f) for f in frames]
1084
- except Exception:
1085
- logging.exception("Batch depth failed")
1086
 
1087
  # --- POST PROCESSING ---
1088
  for i, (idx, frame, d_res, dep_res) in enumerate(zip(indices, frames, det_results, depth_results)):
@@ -1103,11 +1105,6 @@ def run_inference(
1103
  _attach_depth_from_result(detections, dep_res, depth_scale)
1104
  except: pass
1105
 
1106
- # B. Render Boxes - DEFERRED TO WRITER THREAD FOR SEQUENTIAL TRACKING
1107
- # display_labels = [_build_display_label(d) for d in detections]
1108
- # if d_res:
1109
- # processed = draw_boxes(processed, d_res.boxes, label_names=display_labels)
1110
-
1111
  # 3. Output
1112
  while True:
1113
  try:
@@ -1120,34 +1117,46 @@ def run_inference(
1120
  if job_id: _check_cancellation(job_id)
1121
 
1122
  batch_accum.clear()
 
1123
 
1124
  while True:
1125
- item = queue_in.get()
 
 
 
 
 
 
1126
  try:
1127
  if item is None:
 
1128
  flush_batch()
1129
  break
1130
 
1131
  frame_idx, frame_data = item
 
1132
 
1133
  if frame_idx % 30 == 0:
1134
- logging.debug("Processing frame %d on device %s", frame_idx, "cpu" if num_gpus==0 else f"cuda:{gpu_idx}")
1135
 
1136
  batch_accum.append((frame_idx, frame_data))
1137
  if len(batch_accum) >= batch_size:
1138
  flush_batch()
1139
- except Exception as e:
1140
- logging.exception("Worker failed processing frame")
1141
  # Emit empty/failed frames for the batch to keep sequence alive
1142
  for idx, frm in batch_accum:
1143
  try:
1144
  # Fallback: Return original frame with empty detections
1145
  queue_out.put((idx, frm, []), timeout=5.0)
 
1146
  except:
1147
  pass
1148
  batch_accum.clear()
1149
  finally:
1150
  queue_in.task_done()
 
 
1151
 
1152
  # 6. Start Workers
1153
  workers = []
 
1047
  writer_finished = False
1048
 
1049
  def worker_task(gpu_idx: int):
1050
+ logging.info(f"Worker {gpu_idx} started. PID: {os.getpid()}")
1051
  detector_instance = detectors[gpu_idx]
1052
  depth_instance = depth_estimators[gpu_idx] if gpu_idx < len(depth_estimators) else None # Handle mismatched lists safely
1053
 
 
1056
 
1057
  def flush_batch():
1058
  if not batch_accum: return
1059
+ logging.info(f"Worker {gpu_idx} flushing batch of {len(batch_accum)} frames")
1060
 
1061
  indices = [item[0] for item in batch_accum]
1062
  frames = [item[1] for item in batch_accum]
 
1070
  else:
1071
  with detector_instance.lock:
1072
  det_results = [detector_instance.predict(f, queries) for f in frames]
1073
+ except BaseException as e:
1074
+ logging.exception("Batch detection crashed with critical error")
1075
  det_results = [None] * len(frames)
1076
 
1077
  # Run depth batch (if enabled)
 
1083
  depth_results = depth_instance.predict_batch(frames)
1084
  else:
1085
  depth_results = [depth_instance.predict(f) for f in frames]
1086
+ except BaseException as e:
1087
+ logging.exception("Batch depth crashed with critical error")
1088
 
1089
  # --- POST PROCESSING ---
1090
  for i, (idx, frame, d_res, dep_res) in enumerate(zip(indices, frames, det_results, depth_results)):
 
1105
  _attach_depth_from_result(detections, dep_res, depth_scale)
1106
  except: pass
1107
 
 
 
 
 
 
1108
  # 3. Output
1109
  while True:
1110
  try:
 
1117
  if job_id: _check_cancellation(job_id)
1118
 
1119
  batch_accum.clear()
1120
+ logging.info(f"Worker {gpu_idx} finished flushing batch")
1121
 
1122
  while True:
1123
+ try:
1124
+ item = queue_in.get(timeout=2.0)
1125
+ except Empty:
1126
+ # Periodic check for cancellation if main thread is slow
1127
+ if job_id: _check_cancellation(job_id)
1128
+ continue
1129
+
1130
  try:
1131
  if item is None:
1132
+ logging.info(f"Worker {gpu_idx} received sentinel. Flushing and exiting.")
1133
  flush_batch()
1134
  break
1135
 
1136
  frame_idx, frame_data = item
1137
+ # logging.info(f"Worker {gpu_idx} got frame {frame_idx}") # Verbose
1138
 
1139
  if frame_idx % 30 == 0:
1140
+ logging.info("Processing frame %d on device %s", frame_idx, "cpu" if num_gpus==0 else f"cuda:{gpu_idx}")
1141
 
1142
  batch_accum.append((frame_idx, frame_data))
1143
  if len(batch_accum) >= batch_size:
1144
  flush_batch()
1145
+ except BaseException as e:
1146
+ logging.exception(f"Worker {gpu_idx} CRASHED processing frame. Recovering...")
1147
  # Emit empty/failed frames for the batch to keep sequence alive
1148
  for idx, frm in batch_accum:
1149
  try:
1150
  # Fallback: Return original frame with empty detections
1151
  queue_out.put((idx, frm, []), timeout=5.0)
1152
+ logging.info(f"Emitted fallback frame {idx}")
1153
  except:
1154
  pass
1155
  batch_accum.clear()
1156
  finally:
1157
  queue_in.task_done()
1158
+
1159
+ logging.info(f"Worker {gpu_idx} thread exiting normally.")
1160
 
1161
  # 6. Start Workers
1162
  workers = []