Zhen Ye committed on
Commit
6375955
·
1 Parent(s): ecbbe4e

Simplify first-frame processing, replace depth pre-scan with incremental stats, add GPT enrichment thread

Browse files
Files changed (2) hide show
  1. inference.py +213 -413
  2. jobs/background.py +1 -0
inference.py CHANGED
@@ -336,6 +336,50 @@ class SpeedEstimator:
336
  det['angle_deg'] = angle # 0 is right, 90 is down (screen space)
337
 
338
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
339
  _MODEL_LOCKS: Dict[str, RLock] = {}
340
  _MODEL_LOCKS_GUARD = RLock()
341
  _DEPTH_SCALE = float(os.getenv("DEPTH_SCALE", "25.0"))
@@ -713,145 +757,31 @@ def process_first_frame(
713
  mode: str,
714
  detector_name: Optional[str] = None,
715
  segmenter_name: Optional[str] = None,
716
- depth_estimator_name: Optional[str] = None,
717
- depth_scale: Optional[float] = None,
718
- enable_depth_estimator: bool = False,
719
- enable_gpt: bool = True, # ENABLED BY DEFAULT
720
- mission_spec=None, # Optional[MissionSpecification]
721
- ) -> Tuple[np.ndarray, List[Dict[str, Any]], Optional[np.ndarray], Optional[Dict[str, Any]]]:
 
 
 
722
  frame, _, _, _ = extract_first_frame(video_path)
723
  if mode == "segmentation":
724
  processed, _ = infer_segmentation_frame(
725
  frame, text_queries=queries, segmenter_name=segmenter_name
726
  )
727
- return processed, [], None, None
728
 
729
  processed, detections = infer_frame(
730
  frame, queries, detector_name=detector_name
731
  )
732
 
733
- # --- RELEVANCE GATE (between detection and GPT) ---
734
- if mission_spec:
735
- if mission_spec.parse_mode == "FAST_PATH":
736
- # Deterministic gate (unchanged)
737
- relevant_dets = []
738
- for det in detections:
739
- decision = evaluate_relevance(det, mission_spec.relevance_criteria)
740
- det["mission_relevant"] = decision.relevant
741
- det["relevance_reason"] = decision.reason
742
- if decision.relevant:
743
- relevant_dets.append(det)
744
- else:
745
- logging.info(
746
- json_module.dumps({
747
- "event": "relevance_decision",
748
- "label": det.get("label"),
749
- "relevant": False,
750
- "reason": decision.reason,
751
- "required_classes": mission_spec.relevance_criteria.required_classes,
752
- "frame": 0,
753
- })
754
- )
755
- gpt_input_dets = relevant_dets
756
- else:
757
- # LLM_EXTRACTED: post-filter with GPT on frame 0
758
- unique_labels = list({
759
- d.get("label", "").lower()
760
- for d in detections if d.get("label")
761
- })
762
- relevant_labels = evaluate_relevance_llm(
763
- unique_labels, mission_spec.operator_text
764
- )
765
-
766
- # Cache GPT-approved labels into relevance_criteria for subsequent frames
767
- mission_spec.relevance_criteria.required_classes = list(relevant_labels)
768
-
769
- for det in detections:
770
- label = (det.get("label") or "").lower()
771
- is_relevant = label in relevant_labels
772
- det["mission_relevant"] = is_relevant
773
- det["relevance_reason"] = "ok" if is_relevant else "llm_excluded"
774
- if not is_relevant:
775
- logging.info(
776
- json_module.dumps({
777
- "event": "relevance_decision",
778
- "label": det.get("label"),
779
- "relevant": False,
780
- "reason": "llm_excluded",
781
- "relevant_labels": list(relevant_labels),
782
- "frame": 0,
783
- })
784
- )
785
- gpt_input_dets = [d for d in detections if d.get("mission_relevant")]
786
- else:
787
- # LEGACY mode: all detections pass, tagged as unresolved
788
- for det in detections:
789
- det["mission_relevant"] = None
790
- gpt_input_dets = detections
791
-
792
- # 1. Synchronous Depth Estimation (HF Backend)
793
- depth_map = None
794
- # If a specific depth estimator is requested OR if generic "enable" flag is on
795
- should_run_depth = (depth_estimator_name is not None) or enable_depth_estimator
796
-
797
- if should_run_depth and detections:
798
- try:
799
- # Resolve name: if none given, default to "depth"
800
- d_name = depth_estimator_name if depth_estimator_name else "depth"
801
- scale = depth_scale if depth_scale is not None else 1.0
802
-
803
- logging.info(f"Running synchronous depth estimation with {d_name} (scale={scale})...")
804
- estimator = load_depth_estimator(d_name)
805
-
806
- # Run prediction
807
- with _get_model_lock("depth", estimator.name):
808
- result = estimator.predict(frame)
809
-
810
- depth_map = result.depth_map
811
-
812
- # Compute per-detection depth metrics
813
- detections = compute_depth_per_detection(depth_map, detections, scale)
814
-
815
- except Exception as e:
816
- logging.exception(f"First frame depth failed: {e}")
817
- # Mark all detections as depth_valid=False just in case
818
- for det in detections:
819
- det["depth_est_m"] = None
820
- det["depth_rel"] = None
821
- det["depth_valid"] = False
822
-
823
- # 2. GPT-based Distance/Direction Estimation (Explicitly enabled)
824
- # Only assess mission-relevant detections
825
- gpt_results = None
826
- if enable_gpt and gpt_input_dets:
827
- try:
828
- frame_b64 = encode_frame_to_b64(frame)
829
- gpt_results = estimate_threat_gpt(
830
- detections=gpt_input_dets, mission_spec=mission_spec,
831
- image_b64=frame_b64,
832
- )
833
- logging.info(f"GPT Output for First Frame:\n{gpt_results}")
834
-
835
- # Merge GPT results into detections (polyfilled keys from gpt_reasoning)
836
- for i, det in enumerate(gpt_input_dets):
837
- obj_id = f"T{str(i+1).zfill(2)}"
838
- if obj_id in gpt_results:
839
- info = gpt_results[obj_id]
840
- det.update(info)
841
- det["gpt_raw"] = info
842
- # Provenance: tag assessment frame
843
- det["assessment_frame_index"] = 0
844
- det["assessment_status"] = "ASSESSED"
845
-
846
- except Exception as e:
847
- logging.error(f"GPT Threat estimation failed: {e}")
848
-
849
- # Tag unassessed detections (INV-6: distinct from score 0)
850
  for det in detections:
851
- if "assessment_status" not in det:
852
- det["assessment_status"] = "UNASSESSED"
853
 
854
- return processed, detections, depth_map, gpt_results
855
 
856
 
857
  def run_inference(
@@ -867,6 +797,7 @@ def run_inference(
867
  stream_queue: Optional[Queue] = None,
868
  mission_spec=None, # Optional[MissionSpecification]
869
  first_frame_gpt_results: Optional[Dict[str, Any]] = None,
 
870
  ) -> Tuple[str, List[List[Dict[str, Any]]]]:
871
 
872
  # 1. Setup Video Reader
@@ -936,60 +867,8 @@ def run_inference(
936
  else:
937
  depth_estimators.append(None)
938
 
939
- # 4. Phase 1: Pre-Scan (Depth Normalization Stats) - ONLY IF DEPTH ENABLED
940
- global_min, global_max = 0.0, 1.0
941
- if depth_estimator_name and depth_estimators[0]:
942
- logging.info("Starting Phase 1: Pre-scan for depth stats...")
943
-
944
- # We need a quick scan logic here.
945
- # Since we have loaded models, we can use one of them to scan a few frames.
946
- # Let's pick 0-th GPU model.
947
- scan_est = depth_estimators[0]
948
- scan_values = []
949
-
950
- # Sample frames: First 10, Middle 10, Last 10
951
- target_indices = set(list(range(0, 10)) +
952
- list(range(total_frames//2, total_frames//2 + 10)) +
953
- list(range(max(0, total_frames-10), total_frames)))
954
- target_indices = sorted([i for i in target_indices if i < total_frames])
955
-
956
- try:
957
- # Quick reader scan
958
- reader_scan = AsyncVideoReader(input_video_path)
959
- scan_frames = []
960
- for i, frame in enumerate(reader_scan):
961
- if i in target_indices:
962
- scan_frames.append(frame)
963
- if i > max(target_indices):
964
- break
965
- reader_scan.close()
966
-
967
- # Predict
968
- with scan_est.lock:
969
- # Batch if supported, else loop
970
- if scan_est.supports_batch and scan_frames:
971
- scan_res = scan_est.predict_batch(scan_frames)
972
- else:
973
- scan_res = [scan_est.predict(f) for f in scan_frames]
974
-
975
- for r in scan_res:
976
- if r.depth_map is not None:
977
- scan_values.append(r.depth_map)
978
-
979
- # Stats
980
- if scan_values:
981
- all_vals = np.concatenate([v.ravel() for v in scan_values])
982
- valid = all_vals[np.isfinite(all_vals)]
983
- if valid.size > 0:
984
- global_min = float(np.percentile(valid, 1))
985
- global_max = float(np.percentile(valid, 99))
986
- # Prevent zero range
987
- if abs(global_max - global_min) < 1e-6: global_max = global_min + 1.0
988
-
989
- logging.info("Global Depth Range: %.2f - %.2f", global_min, global_max)
990
-
991
- except Exception as e:
992
- logging.warning("Pre-scan failed, using default range: %s", e)
993
 
994
  # queue_in: (frame_idx, frame_data)
995
  # queue_out: (frame_idx, processed_frame, detections)
@@ -1021,19 +900,35 @@ def run_inference(
1021
  frames = [item[1] for item in batch_accum]
1022
 
1023
  # --- UNIFIED INFERENCE ---
1024
- # Run detection batch
1025
- try:
1026
- if detector_instance.supports_batch:
1027
- with detector_instance.lock:
1028
- det_results = detector_instance.predict_batch(frames, queries)
1029
- else:
1030
- with detector_instance.lock:
1031
- det_results = [detector_instance.predict(f, queries) for f in frames]
1032
- except BaseException as e:
1033
- logging.exception("Batch detection crashed with critical error")
1034
- det_results = [None] * len(frames)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1035
 
1036
- # Run depth batch (if enabled)
1037
  depth_results = [None] * len(frames)
1038
  if depth_instance and depth_estimator_name:
1039
  try:
@@ -1045,21 +940,32 @@ def run_inference(
1045
  except BaseException as e:
1046
  logging.exception("Batch depth crashed with critical error")
1047
 
 
 
 
 
 
 
1048
  # --- POST PROCESSING ---
1049
- for i, (idx, frame, d_res, dep_res) in enumerate(zip(indices, frames, det_results, depth_results)):
1050
- # 1. Detections
1051
  detections = []
1052
- if d_res:
1053
- detections = _build_detection_records(
1054
- d_res.boxes, d_res.scores, d_res.labels, queries, d_res.label_names
1055
- )
1056
-
 
 
 
 
1057
  # 2. Frame Rendering
1058
  processed = frame.copy()
1059
-
1060
  # A. Render Depth Heatmap (if enabled)
1061
  if dep_res and dep_res.depth_map is not None:
1062
- processed = colorize_depth_map(dep_res.depth_map, global_min, global_max)
 
1063
  try:
1064
  _attach_depth_from_result(detections, dep_res, depth_scale)
1065
  except: pass
@@ -1137,15 +1043,76 @@ def run_inference(
1137
  # writer_finished = False
1138
 
1139
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1140
  def writer_loop():
1141
  nonlocal writer_finished
1142
  next_idx = 0
1143
  buffer = {}
1144
-
1145
  # Initialize Tracker & Speed Estimator
1146
  tracker = ByteTracker(frame_rate=fps)
1147
  speed_est = SpeedEstimator(fps=fps)
1148
- llm_filtered = False # LLM post-filter runs once on frame 0
 
 
 
 
1149
 
1150
  try:
1151
  with StreamingVideoWriter(output_video_path, fps, width, height) as writer:
@@ -1171,34 +1138,7 @@ def run_inference(
1171
  dets = tracker.update(dets)
1172
  speed_est.estimate(dets)
1173
 
1174
- # --- LLM POST-FILTER (frame 0 only, LLM_EXTRACTED mode) ---
1175
- if (mission_spec
1176
- and mission_spec.parse_mode == "LLM_EXTRACTED"
1177
- and not llm_filtered
1178
- and next_idx == 0):
1179
- # Skip if process_first_frame already populated required_classes
1180
- if mission_spec.relevance_criteria.required_classes:
1181
- logging.info(
1182
- "LLM post-filter already applied by process_first_frame: classes=%s",
1183
- mission_spec.relevance_criteria.required_classes,
1184
- )
1185
- else:
1186
- unique_labels = list({
1187
- d.get("label", "").lower()
1188
- for d in dets if d.get("label")
1189
- })
1190
- relevant_labels = evaluate_relevance_llm(
1191
- unique_labels, mission_spec.operator_text
1192
- )
1193
- # Cache into relevance_criteria for all subsequent frames
1194
- mission_spec.relevance_criteria.required_classes = list(relevant_labels)
1195
- logging.info(
1196
- "LLM post-filter applied on frame 0: relevant=%s",
1197
- relevant_labels,
1198
- )
1199
- llm_filtered = True
1200
-
1201
- # --- RELEVANCE GATE (deterministic, uses updated criteria) ---
1202
  if mission_spec:
1203
  for d in dets:
1204
  decision = evaluate_relevance(d, mission_spec.relevance_criteria)
@@ -1222,43 +1162,27 @@ def run_inference(
1222
  d["mission_relevant"] = None
1223
  gpt_dets = dets
1224
 
1225
- # --- GPT ESTIMATION (Frame 0 Only) ---
1226
- if next_idx == 0 and enable_gpt and gpt_dets:
 
 
 
1227
  try:
1228
- if first_frame_gpt_results:
1229
- # Re-use GPT results from process_first_frame (avoid duplicate call)
1230
- logging.info("Re-using GPT results from first-frame processing (skipping duplicate call)")
1231
- gpt_res = first_frame_gpt_results
1232
- else:
1233
- logging.info("Running GPT estimation for video start (Frame 0)...")
1234
- frame_b64 = encode_frame_to_b64(p_frame)
1235
- gpt_res = estimate_threat_gpt(
1236
- detections=gpt_dets, mission_spec=mission_spec,
1237
- image_b64=frame_b64,
1238
- )
1239
-
1240
- # Merge using real track_id assigned by ByteTracker
1241
- for d in gpt_dets:
1242
- oid = d.get('track_id')
1243
- if oid and oid in gpt_res:
1244
- d.update(gpt_res[oid])
1245
- d["gpt_raw"] = gpt_res[oid]
1246
- d["assessment_frame_index"] = 0
1247
- d["assessment_status"] = "ASSESSED"
1248
-
1249
- # Push GPT data back into tracker's internal STrack objects
1250
- tracker.inject_metadata(gpt_dets)
1251
-
1252
- except Exception as e:
1253
- logging.error("GPT failed for Frame 0: %s", e)
1254
 
1255
  # Tag unassessed detections (INV-6)
1256
  for d in dets:
1257
  if "assessment_status" not in d:
1258
  d["assessment_status"] = "UNASSESSED"
1259
-
1260
  # --- RENDER BOXES & OVERLAYS ---
1261
- # We need to convert list of dicts back to boxes array for draw_boxes
1262
  if dets:
1263
  display_boxes = np.array([d['bbox'] for d in dets])
1264
  display_labels = []
@@ -1267,81 +1191,42 @@ def run_inference(
1267
  # Append Track ID
1268
  if 'track_id' in d:
1269
  lbl = f"{d['track_id']} {lbl}"
1270
- # Speed display removed per user request
1271
- # if 'speed_kph' in d and d['speed_kph'] > 1.0:
1272
- # lbl += f" {int(d['speed_kph'])}km/h"
1273
- # Distance display removed per user request
1274
- # if d.get('gpt_distance_m'):
1275
- # lbl += f" {int(d['gpt_distance_m'])}m"
1276
-
1277
  display_labels.append(lbl)
1278
-
1279
  p_frame = draw_boxes(p_frame, display_boxes, label_names=display_labels)
1280
-
1281
  writer.write(p_frame)
1282
-
1283
  if stream_queue:
1284
  try:
1285
- # Send TRACKED detections to frontend for overlay
1286
- # We need to attach them to the frame or send separately?
1287
- # The stream_queue expects 'p_frame' which is an image.
1288
- # The frontend polls for 'async job' status which returns video, but
1289
- # we also want live updates during streaming?
1290
- # Currently streaming is just Mjpeg of p_frame.
1291
- stream_queue.put(p_frame, timeout=0.01)
1292
  except:
1293
  pass
1294
 
1295
  all_detections_map[next_idx] = dets
1296
-
1297
  # Store tracks for frontend access
1298
  if job_id:
1299
  set_track_data(job_id, next_idx, dets)
1300
 
1301
  next_idx += 1
1302
-
1303
  if next_idx % 30 == 0:
1304
  logging.debug("Wrote frame %d/%d", next_idx, total_frames)
1305
-
1306
  except Exception as e:
1307
  logging.error(f"Writer loop processing error at index {next_idx}: {e}")
1308
- # Important: If we failed AFTER popping from buffer, we must increment next_idx to avoid infinite loop
1309
- # How do we know? We can check if next_idx is in buffer.
1310
- # If we popped it, it's not in buffer.
1311
- # But wait, next_idx is used for loop condition.
1312
- # If we successfully popped it but failed later, we lost the frame.
1313
- # We should increment next_idx to skip it.
1314
-
1315
- # Heuristic: If we are here, something failed.
1316
- # If we haven't successfully written/processed, we should probably skip this frame processing
1317
- # to let the loop continue to next frame.
1318
- # But we need to make sure we don't skip if the error was just "queue empty" (timeout).
1319
-
1320
- # Wait, queue_out.get raises Empty. 'Empty' is NOT Exception?
1321
- # In Python 'queue.Empty' inherits form Exception?
1322
- # Actually 'queue.Empty' exception is just 'Exception'.
1323
- # Let's check imports. from queue import Empty.
1324
- # Yes.
1325
-
1326
- # We should catch Empty explicitly?
1327
- # No, get(timeout=1.0) raises Empty.
1328
-
1329
- # If the error is NOT Empty, then it's a real crash.
1330
  if "Empty" not in str(type(e)):
1331
  logging.error(f"CRITICAL WRITER ERROR: {e}")
1332
- # Force skip frame if we suspect we are stuck
1333
- # Only if we hold the lock/state?
1334
- # Simpler: Just try to proceed.
1335
- # If we popped the frame, next_idx should be incremented?
1336
- # Actually we can't easily know if we popped.
1337
- # But we can check if we are stuck on the same index for too long?
1338
- pass
1339
 
1340
  # Check cancellation or timeout
1341
- if job_id and _check_cancellation(job_id): # This raises
1342
  pass
1343
  if not any(w.is_alive() for w in workers) and queue_out.empty():
1344
- # Workers dead, queue empty, but not finished? prevent infinite loop
1345
  logging.error("Workers stopped unexpectedly.")
1346
  break
1347
  continue
@@ -1349,6 +1234,12 @@ def run_inference(
1349
  logging.exception("Writer loop failed")
1350
  finally:
1351
  logging.info("Writer loop finished. Wrote %d frames (target %d)", next_idx, total_frames)
 
 
 
 
 
 
1352
  writer_finished = True
1353
 
1354
  writer_thread = Thread(target=writer_loop, daemon=True)
@@ -1698,105 +1589,8 @@ def run_depth_inference(
1698
  est.lock = RLock()
1699
  estimators.append(est)
1700
 
1701
- # 3. Phase 1: Pre-scan for Stats
1702
- # We sample ~5% of frames or at least 20 frames distributed evenly
1703
- stride = max(1, total_frames // 20)
1704
- logging.info("Starting Phase 1: Pre-scan (stride=%d)...", stride)
1705
-
1706
- scan_values = []
1707
-
1708
- def scan_task(gpu_idx: int, frame_data: np.ndarray):
1709
- est = estimators[gpu_idx]
1710
- with est.lock:
1711
- result = est.predict(frame_data)
1712
- return result.depth_map
1713
-
1714
- # Run scan
1715
- # We can just run this sequentially or with pool? Pool is better.
1716
- # We need to construct a list of frames to scan.
1717
- scan_indices = list(range(0, total_frames, stride))
1718
-
1719
- # We need to read specific frames. VideoReader is sequential.
1720
- # So we iterate and skip.
1721
- scan_frames = []
1722
-
1723
- # Optimization: If total frames is huge, reading simply to skip might be slow?
1724
- # VideoReader uses cv2.read() which decodes.
1725
- # If we need random access, we should use set(cv2.CAP_PROP_POS_FRAMES).
1726
- # But for now, simple skip logic:
1727
-
1728
- current_idx = 0
1729
- # To avoid re-opening multiple times or complex seeking, let's just use the Reader
1730
- # and skip if not in indices.
1731
- # BUT, if video is 1 hour, skipping 99% frames is wastage of decode.
1732
- # Re-opening with set POS is better for sparse sampling.
1733
-
1734
- # Actually, for robustness, let's just stick to VideoReader sequential read but only process selective frames.
1735
- # If the video is truly huge, we might want to optimize this later.
1736
- # Given the constraints, let's just scan the first N frames + some middle ones?
1737
- # User agreed to "Small startup delay".
1738
-
1739
- # Let's try to just grab the frames we want.
1740
- scan_frames_data = []
1741
-
1742
- # Just grab first 50 frames? No, distribution is better.
1743
- # Let's use a temporary reader for scanning
1744
-
1745
- try:
1746
- from concurrent.futures import as_completed
1747
-
1748
- # Simple Approach: Process first 30 frames to get a baseline.
1749
- # This is usually enough for a "rough" estimation unless scenes change drastically.
1750
- # But for stability, spread is better.
1751
-
1752
- # Let's read first 10, middle 10, last 10.
1753
- target_indices = set(list(range(0, 10)) +
1754
- list(range(total_frames//2, total_frames//2 + 10)) +
1755
- list(range(max(0, total_frames-10), total_frames)))
1756
-
1757
- # Filter valid
1758
- target_indices = sorted([i for i in target_indices if i < total_frames])
1759
-
1760
- # Manual read with seek is tricky with cv2 (unreliable keyframes).
1761
- # We will iterate and pick.
1762
-
1763
- cnt = 0
1764
- reader_scan = AsyncVideoReader(input_video_path)
1765
- for i, frame in enumerate(reader_scan):
1766
- if i in target_indices:
1767
- scan_frames_data.append(frame)
1768
- if i > max(target_indices):
1769
- break
1770
- reader_scan.close()
1771
-
1772
- # Run inference on these frames
1773
- with ThreadPoolExecutor(max_workers=min(len(estimators)*2, 8)) as pool:
1774
- futures = []
1775
- for i, frm in enumerate(scan_frames_data):
1776
- gpu = i % len(estimators)
1777
- futures.append(pool.submit(scan_task, gpu, frm))
1778
-
1779
- for f in as_completed(futures):
1780
- dm = f.result()
1781
- scan_values.append(dm)
1782
-
1783
- except Exception as e:
1784
- logging.warning("Pre-scan failed, falling back to default range: %s", e)
1785
-
1786
- # Compute stats
1787
- global_min, global_max = 0.0, 1.0
1788
- if scan_values:
1789
- all_vals = np.concatenate([v.ravel() for v in scan_values])
1790
- valid = all_vals[np.isfinite(all_vals)]
1791
- if valid.size > 0:
1792
- global_min = float(np.percentile(valid, 1))
1793
- global_max = float(np.percentile(valid, 99))
1794
-
1795
- # Safety
1796
- if abs(global_max - global_min) < 1e-6:
1797
- global_max = global_min + 1.0
1798
-
1799
- logging.info("Global Depth Range: %.2f - %.2f", global_min, global_max)
1800
 
1801
  # 4. Phase 2: Streaming Inference
1802
  logging.info("Starting Phase 2: Streaming...")
@@ -1826,10 +1620,16 @@ def run_depth_inference(
1826
  with est.lock:
1827
  results = [est.predict(f) for f in frames]
1828
 
 
 
 
 
 
1829
  # 2. Post-process loop
1830
  for idx, frm, res in zip(indices, frames, results):
1831
  depth_map = res.depth_map
1832
- colored = colorize_depth_map(depth_map, global_min, global_max)
 
1833
 
1834
  # Overlay Detections
1835
  if detections and idx < len(detections):
 
336
  det['angle_deg'] = angle # 0 is right, 90 is down (screen space)
337
 
338
 
339
+ class IncrementalDepthStats:
340
+ """Thread-safe incremental depth range estimator.
341
+
342
+ Collects depth statistics frame-by-frame so the expensive pre-scan
343
+ (opening a second video reader) can be eliminated. Before
344
+ ``warmup_frames`` updates the range defaults to (0.0, 1.0).
345
+ """
346
+
347
+ def __init__(self, warmup_frames: int = 30):
348
+ self._lock = RLock()
349
+ self._warmup = warmup_frames
350
+ self._count = 0
351
+ self._global_min = float("inf")
352
+ self._global_max = float("-inf")
353
+
354
+ def update(self, depth_map: np.ndarray) -> None:
355
+ if depth_map is None or depth_map.size == 0:
356
+ return
357
+ finite = depth_map[np.isfinite(depth_map)]
358
+ if finite.size == 0:
359
+ return
360
+ lo = float(np.percentile(finite, 1))
361
+ hi = float(np.percentile(finite, 99))
362
+ with self._lock:
363
+ self._global_min = min(self._global_min, lo)
364
+ self._global_max = max(self._global_max, hi)
365
+ self._count += 1
366
+
367
+ @property
368
+ def range(self) -> Tuple[float, float]:
369
+ with self._lock:
370
+ if self._count < self._warmup:
371
+ # Not enough data yet — use default range
372
+ if self._count == 0:
373
+ return (0.0, 1.0)
374
+ # Use what we have but may be less stable
375
+ lo, hi = self._global_min, self._global_max
376
+ else:
377
+ lo, hi = self._global_min, self._global_max
378
+ if abs(hi - lo) < 1e-6:
379
+ hi = lo + 1.0
380
+ return (lo, hi)
381
+
382
+
383
  _MODEL_LOCKS: Dict[str, RLock] = {}
384
  _MODEL_LOCKS_GUARD = RLock()
385
  _DEPTH_SCALE = float(os.getenv("DEPTH_SCALE", "25.0"))
 
757
  mode: str,
758
  detector_name: Optional[str] = None,
759
  segmenter_name: Optional[str] = None,
760
+ ) -> Tuple[np.ndarray, List[Dict[str, Any]]]:
761
+ """Lightweight first-frame processing: detection + rendering only.
762
+
763
+ GPT, depth, and LLM relevance are handled later in the async pipeline
764
+ (writer enrichment thread), avoiding 2-8s synchronous startup delay.
765
+
766
+ Returns:
767
+ (processed_frame, detections) — all detections tagged UNASSESSED.
768
+ """
769
  frame, _, _, _ = extract_first_frame(video_path)
770
  if mode == "segmentation":
771
  processed, _ = infer_segmentation_frame(
772
  frame, text_queries=queries, segmenter_name=segmenter_name
773
  )
774
+ return processed, []
775
 
776
  processed, detections = infer_frame(
777
  frame, queries, detector_name=detector_name
778
  )
779
 
780
+ # Tag all detections as unassessed — GPT runs later in enrichment thread
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
781
  for det in detections:
782
+ det["assessment_status"] = "UNASSESSED"
 
783
 
784
+ return processed, detections
785
 
786
 
787
  def run_inference(
 
797
  stream_queue: Optional[Queue] = None,
798
  mission_spec=None, # Optional[MissionSpecification]
799
  first_frame_gpt_results: Optional[Dict[str, Any]] = None,
800
+ first_frame_detections: Optional[List[Dict[str, Any]]] = None,
801
  ) -> Tuple[str, List[List[Dict[str, Any]]]]:
802
 
803
  # 1. Setup Video Reader
 
867
  else:
868
  depth_estimators.append(None)
869
 
870
+ # 4. Incremental Depth Stats (replaces expensive pre-scan)
871
+ depth_stats = IncrementalDepthStats(warmup_frames=30) if depth_estimator_name else None
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
872
 
873
  # queue_in: (frame_idx, frame_data)
874
  # queue_out: (frame_idx, processed_frame, detections)
 
900
  frames = [item[1] for item in batch_accum]
901
 
902
  # --- UNIFIED INFERENCE ---
903
+ # Separate frame 0 if we have cached detections (avoid re-detecting)
904
+ cached_frame0 = None
905
+ detect_indices = indices
906
+ detect_frames = frames
907
+ if first_frame_detections is not None and 0 in indices:
908
+ f0_pos = indices.index(0)
909
+ cached_frame0 = (indices[f0_pos], frames[f0_pos])
910
+ detect_indices = indices[:f0_pos] + indices[f0_pos+1:]
911
+ detect_frames = frames[:f0_pos] + frames[f0_pos+1:]
912
+ logging.info("Worker %d: reusing cached detections for frame 0", gpu_idx)
913
+
914
+ # Run detection batch (excluding frame 0 if cached)
915
+ det_results_map = {}
916
+ if detect_frames:
917
+ try:
918
+ if detector_instance.supports_batch:
919
+ with detector_instance.lock:
920
+ raw_results = detector_instance.predict_batch(detect_frames, queries)
921
+ else:
922
+ with detector_instance.lock:
923
+ raw_results = [detector_instance.predict(f, queries) for f in detect_frames]
924
+ for di, dr in zip(detect_indices, raw_results):
925
+ det_results_map[di] = dr
926
+ except BaseException as e:
927
+ logging.exception("Batch detection crashed with critical error")
928
+ for di in detect_indices:
929
+ det_results_map[di] = None
930
 
931
+ # Run depth batch (if enabled) — always for all frames
932
  depth_results = [None] * len(frames)
933
  if depth_instance and depth_estimator_name:
934
  try:
 
940
  except BaseException as e:
941
  logging.exception("Batch depth crashed with critical error")
942
 
943
+ # Update incremental depth stats
944
+ if depth_stats is not None:
945
+ for dep_res in depth_results:
946
+ if dep_res and dep_res.depth_map is not None:
947
+ depth_stats.update(dep_res.depth_map)
948
+
949
  # --- POST PROCESSING ---
950
+ for i, (idx, frame, dep_res) in enumerate(zip(indices, frames, depth_results)):
951
+ # 1. Detections — use cached for frame 0 if available
952
  detections = []
953
+ if cached_frame0 is not None and idx == 0:
954
+ detections = [d.copy() for d in first_frame_detections]
955
+ else:
956
+ d_res = det_results_map.get(idx)
957
+ if d_res:
958
+ detections = _build_detection_records(
959
+ d_res.boxes, d_res.scores, d_res.labels, queries, d_res.label_names
960
+ )
961
+
962
  # 2. Frame Rendering
963
  processed = frame.copy()
964
+
965
  # A. Render Depth Heatmap (if enabled)
966
  if dep_res and dep_res.depth_map is not None:
967
+ ds_min, ds_max = depth_stats.range if depth_stats else (0.0, 1.0)
968
+ processed = colorize_depth_map(dep_res.depth_map, ds_min, ds_max)
969
  try:
970
  _attach_depth_from_result(detections, dep_res, depth_scale)
971
  except: pass
 
1043
  # writer_finished = False
1044
 
1045
 
1046
+ # --- GPT Enrichment Thread (non-blocking) ---
1047
+ # Runs LLM relevance + GPT threat assessment off the writer's critical path.
1048
+ gpt_enrichment_queue = Queue(maxsize=4)
1049
+
1050
+ def enrichment_thread_fn(tracker_ref):
1051
+ """Dedicated thread for GPT/LLM calls. Receives work from writer, injects results via tracker."""
1052
+ while True:
1053
+ item = gpt_enrichment_queue.get()
1054
+ if item is None:
1055
+ break # Sentinel — shutdown
1056
+ frame_idx, frame_data, gpt_dets, ms = item
1057
+ try:
1058
+ # LLM post-filter (LLM_EXTRACTED mode, frame 0 only)
1059
+ if (ms and ms.parse_mode == "LLM_EXTRACTED"
1060
+ and not ms.relevance_criteria.required_classes):
1061
+ unique_labels = list({
1062
+ d.get("label", "").lower()
1063
+ for d in gpt_dets if d.get("label")
1064
+ })
1065
+ relevant_labels = evaluate_relevance_llm(
1066
+ unique_labels, ms.operator_text
1067
+ )
1068
+ ms.relevance_criteria.required_classes = list(relevant_labels)
1069
+ logging.info(
1070
+ "Enrichment: LLM post-filter applied on frame %d: relevant=%s",
1071
+ frame_idx, relevant_labels,
1072
+ )
1073
+
1074
+ # GPT threat assessment
1075
+ if gpt_dets:
1076
+ if first_frame_gpt_results:
1077
+ logging.info("Enrichment: re-using cached GPT results for frame %d", frame_idx)
1078
+ gpt_res = first_frame_gpt_results
1079
+ else:
1080
+ logging.info("Enrichment: running GPT estimation for frame %d...", frame_idx)
1081
+ frame_b64 = encode_frame_to_b64(frame_data)
1082
+ gpt_res = estimate_threat_gpt(
1083
+ detections=gpt_dets, mission_spec=ms,
1084
+ image_b64=frame_b64,
1085
+ )
1086
+
1087
+ # Merge using real track_id assigned by ByteTracker
1088
+ for d in gpt_dets:
1089
+ oid = d.get('track_id')
1090
+ if oid and oid in gpt_res:
1091
+ d.update(gpt_res[oid])
1092
+ d["gpt_raw"] = gpt_res[oid]
1093
+ d["assessment_frame_index"] = frame_idx
1094
+ d["assessment_status"] = "ASSESSED"
1095
+
1096
+ # Push GPT data back into tracker's internal STrack objects
1097
+ tracker_ref.inject_metadata(gpt_dets)
1098
+ logging.info("Enrichment: GPT results injected into tracker for frame %d", frame_idx)
1099
+
1100
+ except Exception as e:
1101
+ logging.error("Enrichment thread failed for frame %d: %s", frame_idx, e)
1102
+
1103
  def writer_loop():
1104
  nonlocal writer_finished
1105
  next_idx = 0
1106
  buffer = {}
1107
+
1108
  # Initialize Tracker & Speed Estimator
1109
  tracker = ByteTracker(frame_rate=fps)
1110
  speed_est = SpeedEstimator(fps=fps)
1111
+ gpt_submitted = False # GPT enrichment submitted once for frame 0
1112
+
1113
+ # Start enrichment thread
1114
+ enrich_thread = Thread(target=enrichment_thread_fn, args=(tracker,), daemon=True)
1115
+ enrich_thread.start()
1116
 
1117
  try:
1118
  with StreamingVideoWriter(output_video_path, fps, width, height) as writer:
 
1138
  dets = tracker.update(dets)
1139
  speed_est.estimate(dets)
1140
 
1141
+ # --- RELEVANCE GATE (deterministic, fast stays in writer) ---
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1142
  if mission_spec:
1143
  for d in dets:
1144
  decision = evaluate_relevance(d, mission_spec.relevance_criteria)
 
1162
  d["mission_relevant"] = None
1163
  gpt_dets = dets
1164
 
1165
+ # --- GPT ENRICHMENT (non-blocking, offloaded to enrichment thread) ---
1166
+ if next_idx == 0 and enable_gpt and gpt_dets and not gpt_submitted:
1167
+ # Tag as pending — enrichment thread will update to ASSESSED later
1168
+ for d in gpt_dets:
1169
+ d["assessment_status"] = "PENDING_GPT"
1170
  try:
1171
+ gpt_enrichment_queue.put(
1172
+ (next_idx, p_frame.copy(), gpt_dets, mission_spec),
1173
+ timeout=1.0,
1174
+ )
1175
+ gpt_submitted = True
1176
+ logging.info("Writer: offloaded GPT enrichment for frame 0")
1177
+ except Full:
1178
+ logging.warning("GPT enrichment queue full, skipping frame 0 GPT")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1179
 
1180
  # Tag unassessed detections (INV-6)
1181
  for d in dets:
1182
  if "assessment_status" not in d:
1183
  d["assessment_status"] = "UNASSESSED"
1184
+
1185
  # --- RENDER BOXES & OVERLAYS ---
 
1186
  if dets:
1187
  display_boxes = np.array([d['bbox'] for d in dets])
1188
  display_labels = []
 
1191
  # Append Track ID
1192
  if 'track_id' in d:
1193
  lbl = f"{d['track_id']} {lbl}"
 
 
 
 
 
 
 
1194
  display_labels.append(lbl)
1195
+
1196
  p_frame = draw_boxes(p_frame, display_boxes, label_names=display_labels)
1197
+
1198
  writer.write(p_frame)
1199
+
1200
  if stream_queue:
1201
  try:
1202
+ from jobs.streaming import publish_frame as _publish
1203
+ if job_id:
1204
+ _publish(job_id, p_frame)
1205
+ else:
1206
+ stream_queue.put(p_frame, timeout=0.01)
 
 
1207
  except:
1208
  pass
1209
 
1210
  all_detections_map[next_idx] = dets
1211
+
1212
  # Store tracks for frontend access
1213
  if job_id:
1214
  set_track_data(job_id, next_idx, dets)
1215
 
1216
  next_idx += 1
1217
+
1218
  if next_idx % 30 == 0:
1219
  logging.debug("Wrote frame %d/%d", next_idx, total_frames)
1220
+
1221
  except Exception as e:
1222
  logging.error(f"Writer loop processing error at index {next_idx}: {e}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1223
  if "Empty" not in str(type(e)):
1224
  logging.error(f"CRITICAL WRITER ERROR: {e}")
 
 
 
 
 
 
 
1225
 
1226
  # Check cancellation or timeout
1227
+ if job_id and _check_cancellation(job_id):
1228
  pass
1229
  if not any(w.is_alive() for w in workers) and queue_out.empty():
 
1230
  logging.error("Workers stopped unexpectedly.")
1231
  break
1232
  continue
 
1234
  logging.exception("Writer loop failed")
1235
  finally:
1236
  logging.info("Writer loop finished. Wrote %d frames (target %d)", next_idx, total_frames)
1237
+ # Shut down enrichment thread
1238
+ try:
1239
+ gpt_enrichment_queue.put(None, timeout=5.0)
1240
+ enrich_thread.join(timeout=30)
1241
+ except Exception:
1242
+ logging.warning("Enrichment thread shutdown timed out")
1243
  writer_finished = True
1244
 
1245
  writer_thread = Thread(target=writer_loop, daemon=True)
 
1589
  est.lock = RLock()
1590
  estimators.append(est)
1591
 
1592
+ # 3. Incremental Depth Stats (replaces expensive pre-scan)
1593
+ depth_stats = IncrementalDepthStats(warmup_frames=30)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1594
 
1595
  # 4. Phase 2: Streaming Inference
1596
  logging.info("Starting Phase 2: Streaming...")
 
1620
  with est.lock:
1621
  results = [est.predict(f) for f in frames]
1622
 
1623
+ # Update incremental depth stats
1624
+ for res in results:
1625
+ if res and res.depth_map is not None:
1626
+ depth_stats.update(res.depth_map)
1627
+
1628
  # 2. Post-process loop
1629
  for idx, frm, res in zip(indices, frames, results):
1630
  depth_map = res.depth_map
1631
+ ds_min, ds_max = depth_stats.range
1632
+ colored = colorize_depth_map(depth_map, ds_min, ds_max)
1633
 
1634
  # Overlay Detections
1635
  if detections and idx < len(detections):
jobs/background.py CHANGED
@@ -54,6 +54,7 @@ async def process_video_async(job_id: str) -> None:
54
  stream_queue,
55
  job.mission_spec, # Forward mission spec for relevance gating
56
  job.first_frame_gpt_results, # Avoid duplicate GPT call on frame 0
 
57
  )
58
  detection_path, detections_list = result_pkg
59
 
 
54
  stream_queue,
55
  job.mission_spec, # Forward mission spec for relevance gating
56
  job.first_frame_gpt_results, # Avoid duplicate GPT call on frame 0
57
+ job.first_frame_detections, # Reuse frame 0 detections (avoid re-detecting)
58
  )
59
  detection_path, detections_list = result_pkg
60