Zhen Ye committed on
Commit
4977275
·
1 Parent(s): 55e372a

Add StreamingVideoWriter (ffmpeg pipe), reuse first-frame GPT results in inference pipeline

Browse files
Files changed (2) hide show
  1. inference.py +69 -61
  2. utils/video.py +78 -3
inference.py CHANGED
@@ -21,8 +21,8 @@ from models.model_loader import load_detector, load_detector_on_device
21
  from models.segmenters.model_loader import load_segmenter, load_segmenter_on_device
22
  from models.depth_estimators.model_loader import load_depth_estimator, load_depth_estimator_on_device
23
  from models.depth_estimators.base import DepthEstimator
24
- from utils.video import extract_frames, write_video, VideoReader, VideoWriter, AsyncVideoReader
25
- from utils.gpt_reasoning import estimate_threat_gpt
26
  from utils.relevance import evaluate_relevance, evaluate_relevance_llm
27
  from jobs.storage import set_track_data
28
  import tempfile
@@ -718,13 +718,13 @@ def process_first_frame(
718
  enable_depth_estimator: bool = False,
719
  enable_gpt: bool = True, # ENABLED BY DEFAULT
720
  mission_spec=None, # Optional[MissionSpecification]
721
- ) -> Tuple[np.ndarray, List[Dict[str, Any]], Optional[np.ndarray]]:
722
  frame, _, _, _ = extract_first_frame(video_path)
723
  if mode == "segmentation":
724
  processed, _ = infer_segmentation_frame(
725
  frame, text_queries=queries, segmenter_name=segmenter_name
726
  )
727
- return processed, [], None
728
 
729
  processed, detections = infer_frame(
730
  frame, queries, detector_name=detector_name
@@ -822,15 +822,15 @@ def process_first_frame(
822
 
823
  # 2. GPT-based Distance/Direction Estimation (Explicitly enabled)
824
  # Only assess mission-relevant detections
 
825
  if enable_gpt and gpt_input_dets:
826
  try:
827
- with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as tmp_img:
828
- cv2.imwrite(tmp_img.name, frame)
829
- gpt_results = estimate_threat_gpt(
830
- tmp_img.name, gpt_input_dets, mission_spec=mission_spec
831
- )
832
- logging.info(f"GPT Output for First Frame:\n{gpt_results}")
833
- os.remove(tmp_img.name)
834
 
835
  # Merge GPT results into detections (polyfilled keys from gpt_reasoning)
836
  for i, det in enumerate(gpt_input_dets):
@@ -851,7 +851,7 @@ def process_first_frame(
851
  if "assessment_status" not in det:
852
  det["assessment_status"] = "UNASSESSED"
853
 
854
- return processed, detections, depth_map
855
 
856
 
857
  def run_inference(
@@ -866,6 +866,7 @@ def run_inference(
866
  enable_gpt: bool = True,
867
  stream_queue: Optional[Queue] = None,
868
  mission_spec=None, # Optional[MissionSpecification]
 
869
  ) -> Tuple[str, List[List[Dict[str, Any]]]]:
870
 
871
  # 1. Setup Video Reader
@@ -995,7 +996,7 @@ def run_inference(
995
  queue_in = Queue(maxsize=16)
996
  # Tuning for A10: buffer at least 32 frames per GPU (batch size)
997
  # GPT Latency Buffer: GPT takes ~3s. At 30fps, that's 90 frames. We need to absorb this burst.
998
- queue_out_max = max(512, (len(detectors) if detectors else 1) * 64)
999
  queue_out = Queue(maxsize=queue_out_max)
1000
 
1001
 
@@ -1147,20 +1148,15 @@ def run_inference(
1147
  llm_filtered = False # LLM post-filter runs once on frame 0
1148
 
1149
  try:
1150
- with VideoWriter(output_video_path, fps, width, height) as writer:
1151
  while next_idx < total_frames:
1152
  # Fetch from queue
1153
  try:
1154
  while next_idx not in buffer:
1155
- # Backpressure: If buffer gets too big due to out-of-order frames,
1156
- # we might want to warn or just hope for the best.
1157
- # But here we are just consuming.
1158
-
1159
- # However, if 'buffer' grows too large (because we are missing next_idx),
1160
- # we are effectively unbounded again if queue_out fills up with future frames.
1161
- # So we should monitor buffer size.
1162
- if len(buffer) > 200 and len(buffer) % 50 == 0:
1163
- logging.warning("Writer buffer large (%d items), waiting for frame %d (GPT Latency?)...", len(buffer), next_idx)
1164
 
1165
  item = queue_out.get(timeout=1.0) # wait
1166
 
@@ -1180,20 +1176,27 @@ def run_inference(
1180
  and mission_spec.parse_mode == "LLM_EXTRACTED"
1181
  and not llm_filtered
1182
  and next_idx == 0):
1183
- unique_labels = list({
1184
- d.get("label", "").lower()
1185
- for d in dets if d.get("label")
1186
- })
1187
- relevant_labels = evaluate_relevance_llm(
1188
- unique_labels, mission_spec.operator_text
1189
- )
1190
- # Cache into relevance_criteria for all subsequent frames
1191
- mission_spec.relevance_criteria.required_classes = list(relevant_labels)
 
 
 
 
 
 
 
 
 
 
 
1192
  llm_filtered = True
1193
- logging.info(
1194
- "LLM post-filter applied on frame 0: relevant=%s",
1195
- relevant_labels,
1196
- )
1197
 
1198
  # --- RELEVANCE GATE (deterministic, uses updated criteria) ---
1199
  if mission_spec:
@@ -1222,22 +1225,26 @@ def run_inference(
1222
  # --- GPT ESTIMATION (Frame 0 Only) ---
1223
  if next_idx == 0 and enable_gpt and gpt_dets:
1224
  try:
1225
- logging.info("Running GPT estimation for video start (Frame 0)...")
1226
- with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as tmp:
1227
- cv2.imwrite(tmp.name, p_frame)
 
 
 
 
1228
  gpt_res = estimate_threat_gpt(
1229
- tmp.name, gpt_dets, mission_spec=mission_spec
 
1230
  )
1231
- os.remove(tmp.name)
1232
 
1233
- # Merge using real track_id assigned by ByteTracker
1234
- for d in gpt_dets:
1235
- oid = d.get('track_id')
1236
- if oid and oid in gpt_res:
1237
- d.update(gpt_res[oid])
1238
- d["gpt_raw"] = gpt_res[oid]
1239
- d["assessment_frame_index"] = 0
1240
- d["assessment_status"] = "ASSESSED"
1241
 
1242
  # Push GPT data back into tracker's internal STrack objects
1243
  tracker.inject_metadata(gpt_dets)
@@ -1563,17 +1570,17 @@ def run_segmentation(
1563
  buffer = {}
1564
 
1565
  try:
1566
- with VideoWriter(output_video_path, fps, width, height) as writer:
1567
  while next_idx < total_frames:
1568
  try:
1569
  while next_idx not in buffer:
1570
- # Check buffer size
1571
- if len(buffer) > 64:
1572
- logging.warning("Writer buffer large (%d), waiting for %d", len(buffer), next_idx)
1573
 
1574
  idx, frm = queue_out.get(timeout=1.0)
1575
  buffer[idx] = frm
1576
-
1577
  frm = buffer.pop(next_idx)
1578
  writer.write(frm)
1579
 
@@ -1898,15 +1905,16 @@ def run_depth_inference(
1898
  processed_frames_subset = [] # Keep first frame for saving if needed
1899
 
1900
  try:
1901
- with VideoWriter(output_video_path, fps, width, height) as writer:
1902
  while next_idx < total_frames:
1903
  try:
1904
  while next_idx not in buffer:
1905
- if len(buffer) > 64:
1906
- logging.warning("Writer buffer large (%d), waiting for %d", len(buffer), next_idx)
 
1907
  idx, frm = queue_out.get(timeout=1.0)
1908
  buffer[idx] = frm
1909
-
1910
  frm = buffer.pop(next_idx)
1911
  writer.write(frm)
1912
 
@@ -1916,11 +1924,11 @@ def run_depth_inference(
1916
  except:
1917
  pass
1918
 
1919
-
1920
  if first_frame_depth_path and not first_frame_saved and next_idx == 0:
1921
  cv2.imwrite(first_frame_depth_path, frm)
1922
  first_frame_saved = True
1923
-
1924
  next_idx += 1
1925
  if next_idx % 30 == 0:
1926
  logging.debug("Wrote depth frame %d/%d", next_idx, total_frames)
@@ -1934,7 +1942,7 @@ def run_depth_inference(
1934
 
1935
  w_thread = Thread(target=writer_loop, daemon=True)
1936
  w_thread.start()
1937
-
1938
  # Feeder
1939
  try:
1940
  reader_iter = iter(reader)
 
21
  from models.segmenters.model_loader import load_segmenter, load_segmenter_on_device
22
  from models.depth_estimators.model_loader import load_depth_estimator, load_depth_estimator_on_device
23
  from models.depth_estimators.base import DepthEstimator
24
+ from utils.video import extract_frames, write_video, VideoReader, VideoWriter, AsyncVideoReader, StreamingVideoWriter
25
+ from utils.gpt_reasoning import estimate_threat_gpt, encode_frame_to_b64
26
  from utils.relevance import evaluate_relevance, evaluate_relevance_llm
27
  from jobs.storage import set_track_data
28
  import tempfile
 
718
  enable_depth_estimator: bool = False,
719
  enable_gpt: bool = True, # ENABLED BY DEFAULT
720
  mission_spec=None, # Optional[MissionSpecification]
721
+ ) -> Tuple[np.ndarray, List[Dict[str, Any]], Optional[np.ndarray], Optional[Dict[str, Any]]]:
722
  frame, _, _, _ = extract_first_frame(video_path)
723
  if mode == "segmentation":
724
  processed, _ = infer_segmentation_frame(
725
  frame, text_queries=queries, segmenter_name=segmenter_name
726
  )
727
+ return processed, [], None, None
728
 
729
  processed, detections = infer_frame(
730
  frame, queries, detector_name=detector_name
 
822
 
823
  # 2. GPT-based Distance/Direction Estimation (Explicitly enabled)
824
  # Only assess mission-relevant detections
825
+ gpt_results = None
826
  if enable_gpt and gpt_input_dets:
827
  try:
828
+ frame_b64 = encode_frame_to_b64(frame)
829
+ gpt_results = estimate_threat_gpt(
830
+ detections=gpt_input_dets, mission_spec=mission_spec,
831
+ image_b64=frame_b64,
832
+ )
833
+ logging.info(f"GPT Output for First Frame:\n{gpt_results}")
 
834
 
835
  # Merge GPT results into detections (polyfilled keys from gpt_reasoning)
836
  for i, det in enumerate(gpt_input_dets):
 
851
  if "assessment_status" not in det:
852
  det["assessment_status"] = "UNASSESSED"
853
 
854
+ return processed, detections, depth_map, gpt_results
855
 
856
 
857
  def run_inference(
 
866
  enable_gpt: bool = True,
867
  stream_queue: Optional[Queue] = None,
868
  mission_spec=None, # Optional[MissionSpecification]
869
+ first_frame_gpt_results: Optional[Dict[str, Any]] = None,
870
  ) -> Tuple[str, List[List[Dict[str, Any]]]]:
871
 
872
  # 1. Setup Video Reader
 
996
  queue_in = Queue(maxsize=16)
997
  # Tuning for A10: buffer at least 32 frames per GPU (batch size)
998
  # GPT Latency Buffer: GPT takes ~3s. At 30fps, that's 90 frames. We need to absorb this burst.
999
+ queue_out_max = max(128, (len(detectors) if detectors else 1) * 32)
1000
  queue_out = Queue(maxsize=queue_out_max)
1001
 
1002
 
 
1148
  llm_filtered = False # LLM post-filter runs once on frame 0
1149
 
1150
  try:
1151
+ with StreamingVideoWriter(output_video_path, fps, width, height) as writer:
1152
  while next_idx < total_frames:
1153
  # Fetch from queue
1154
  try:
1155
  while next_idx not in buffer:
1156
+ # Backpressure: bound the reorder buffer to prevent memory blowup
1157
+ if len(buffer) > 128:
1158
+ logging.warning("Writer reorder buffer too large (%d items), applying backpressure (waiting for frame %d)...", len(buffer), next_idx)
1159
+ time.sleep(0.05)
 
 
 
 
 
1160
 
1161
  item = queue_out.get(timeout=1.0) # wait
1162
 
 
1176
  and mission_spec.parse_mode == "LLM_EXTRACTED"
1177
  and not llm_filtered
1178
  and next_idx == 0):
1179
+ # Skip if process_first_frame already populated required_classes
1180
+ if mission_spec.relevance_criteria.required_classes:
1181
+ logging.info(
1182
+ "LLM post-filter already applied by process_first_frame: classes=%s",
1183
+ mission_spec.relevance_criteria.required_classes,
1184
+ )
1185
+ else:
1186
+ unique_labels = list({
1187
+ d.get("label", "").lower()
1188
+ for d in dets if d.get("label")
1189
+ })
1190
+ relevant_labels = evaluate_relevance_llm(
1191
+ unique_labels, mission_spec.operator_text
1192
+ )
1193
+ # Cache into relevance_criteria for all subsequent frames
1194
+ mission_spec.relevance_criteria.required_classes = list(relevant_labels)
1195
+ logging.info(
1196
+ "LLM post-filter applied on frame 0: relevant=%s",
1197
+ relevant_labels,
1198
+ )
1199
  llm_filtered = True
 
 
 
 
1200
 
1201
  # --- RELEVANCE GATE (deterministic, uses updated criteria) ---
1202
  if mission_spec:
 
1225
  # --- GPT ESTIMATION (Frame 0 Only) ---
1226
  if next_idx == 0 and enable_gpt and gpt_dets:
1227
  try:
1228
+ if first_frame_gpt_results:
1229
+ # Re-use GPT results from process_first_frame (avoid duplicate call)
1230
+ logging.info("Re-using GPT results from first-frame processing (skipping duplicate call)")
1231
+ gpt_res = first_frame_gpt_results
1232
+ else:
1233
+ logging.info("Running GPT estimation for video start (Frame 0)...")
1234
+ frame_b64 = encode_frame_to_b64(p_frame)
1235
  gpt_res = estimate_threat_gpt(
1236
+ detections=gpt_dets, mission_spec=mission_spec,
1237
+ image_b64=frame_b64,
1238
  )
 
1239
 
1240
+ # Merge using real track_id assigned by ByteTracker
1241
+ for d in gpt_dets:
1242
+ oid = d.get('track_id')
1243
+ if oid and oid in gpt_res:
1244
+ d.update(gpt_res[oid])
1245
+ d["gpt_raw"] = gpt_res[oid]
1246
+ d["assessment_frame_index"] = 0
1247
+ d["assessment_status"] = "ASSESSED"
1248
 
1249
  # Push GPT data back into tracker's internal STrack objects
1250
  tracker.inject_metadata(gpt_dets)
 
1570
  buffer = {}
1571
 
1572
  try:
1573
+ with StreamingVideoWriter(output_video_path, fps, width, height) as writer:
1574
  while next_idx < total_frames:
1575
  try:
1576
  while next_idx not in buffer:
1577
+ if len(buffer) > 128:
1578
+ logging.warning("Writer reorder buffer too large (%d), applying backpressure (waiting for frame %d)...", len(buffer), next_idx)
1579
+ time.sleep(0.05)
1580
 
1581
  idx, frm = queue_out.get(timeout=1.0)
1582
  buffer[idx] = frm
1583
+
1584
  frm = buffer.pop(next_idx)
1585
  writer.write(frm)
1586
 
 
1905
  processed_frames_subset = [] # Keep first frame for saving if needed
1906
 
1907
  try:
1908
+ with StreamingVideoWriter(output_video_path, fps, width, height) as writer:
1909
  while next_idx < total_frames:
1910
  try:
1911
  while next_idx not in buffer:
1912
+ if len(buffer) > 128:
1913
+ logging.warning("Writer reorder buffer too large (%d), applying backpressure (waiting for frame %d)...", len(buffer), next_idx)
1914
+ time.sleep(0.05)
1915
  idx, frm = queue_out.get(timeout=1.0)
1916
  buffer[idx] = frm
1917
+
1918
  frm = buffer.pop(next_idx)
1919
  writer.write(frm)
1920
 
 
1924
  except:
1925
  pass
1926
 
1927
+
1928
  if first_frame_depth_path and not first_frame_saved and next_idx == 0:
1929
  cv2.imwrite(first_frame_depth_path, frm)
1930
  first_frame_saved = True
1931
+
1932
  next_idx += 1
1933
  if next_idx % 30 == 0:
1934
  logging.debug("Wrote depth frame %d/%d", next_idx, total_frames)
 
1942
 
1943
  w_thread = Thread(target=writer_loop, daemon=True)
1944
  w_thread.start()
1945
+
1946
  # Feeder
1947
  try:
1948
  reader_iter = iter(reader)
utils/video.py CHANGED
@@ -212,10 +212,10 @@ class VideoWriter:
212
  self.fps = fps
213
  self.width = width
214
  self.height = height
215
-
216
  self.temp_fd, self.temp_path = tempfile.mkstemp(prefix="raw_", suffix=".mp4")
217
  os.close(self.temp_fd)
218
-
219
  # Use mp4v for speed during writing, then transcode
220
  self.writer = cv2.VideoWriter(self.temp_path, cv2.VideoWriter_fourcc(*"mp4v"), self.fps, (self.width, self.height))
221
  if not self.writer.isOpened():
@@ -228,7 +228,7 @@ class VideoWriter:
228
  def close(self):
229
  if self.writer.isOpened():
230
  self.writer.release()
231
-
232
  # Transcode phase
233
  try:
234
  _transcode_with_ffmpeg(self.temp_path, self.output_path)
@@ -246,3 +246,78 @@ class VideoWriter:
246
 
247
  def __exit__(self, exc_type, exc_val, exc_tb):
248
  self.close()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
212
  self.fps = fps
213
  self.width = width
214
  self.height = height
215
+
216
  self.temp_fd, self.temp_path = tempfile.mkstemp(prefix="raw_", suffix=".mp4")
217
  os.close(self.temp_fd)
218
+
219
  # Use mp4v for speed during writing, then transcode
220
  self.writer = cv2.VideoWriter(self.temp_path, cv2.VideoWriter_fourcc(*"mp4v"), self.fps, (self.width, self.height))
221
  if not self.writer.isOpened():
 
228
  def close(self):
229
  if self.writer.isOpened():
230
  self.writer.release()
231
+
232
  # Transcode phase
233
  try:
234
  _transcode_with_ffmpeg(self.temp_path, self.output_path)
 
246
 
247
  def __exit__(self, exc_type, exc_val, exc_tb):
248
  self.close()
249
+
250
+
251
+ def _ffmpeg_available() -> bool:
252
+ """Check if ffmpeg is available on the system PATH."""
253
+ return shutil.which("ffmpeg") is not None
254
+
255
+
256
class StreamingVideoWriter:
    """
    Pipes raw BGR frames directly to an ffmpeg subprocess for H.264 encoding.
    Eliminates the cv2.VideoWriter + post-transcode round-trip.
    Falls back to VideoWriter if ffmpeg is unavailable or fails to start.

    Fix vs. previous revision: ffmpeg's stderr was attached to a PIPE that
    nothing drained while streaming. ffmpeg emits log text on stderr; once the
    OS pipe buffer filled, ffmpeg blocked writing stderr, stopped reading
    stdin, and ``write()`` deadlocked. stderr now goes to a temp log file
    (read back only when ffmpeg exits non-zero) and stdout to DEVNULL.
    """

    def __init__(self, output_path: str, fps: float, width: int, height: int):
        """Start an ffmpeg encoder expecting raw bgr24 frames of (width, height)."""
        self.output_path = output_path
        self._fallback = None
        self.proc = None
        self._stderr_path = None

        if not _ffmpeg_available():
            logging.warning("ffmpeg not found; StreamingVideoWriter falling back to VideoWriter.")
            self._fallback = VideoWriter(output_path, fps, width, height)
            return

        cmd = [
            "ffmpeg", "-y",
            "-loglevel", "error",  # suppress progress chatter; only real errors reach the log
            "-f", "rawvideo",
            "-pix_fmt", "bgr24",
            "-s", f"{width}x{height}",
            "-r", str(fps),
            "-i", "pipe:",
            "-c:v", "libx264",
            "-preset", "veryfast",
            "-pix_fmt", "yuv420p",
            "-movflags", "+faststart",
            output_path,
        ]
        # stderr -> temp file, NOT a pipe: an undrained stderr pipe fills up and
        # deadlocks ffmpeg mid-stream. The file is read back only if ffmpeg fails.
        stderr_fd, self._stderr_path = tempfile.mkstemp(prefix="ffmpeg_err_", suffix=".log")
        try:
            self.proc = subprocess.Popen(
                cmd,
                stdin=subprocess.PIPE,
                stdout=subprocess.DEVNULL,
                stderr=stderr_fd,
            )
        except OSError as e:
            logging.warning("Failed to start ffmpeg (%s); falling back to VideoWriter.", e)
            self._fallback = VideoWriter(output_path, fps, width, height)
            self._remove_stderr_log()
        finally:
            # Popen dup'd the descriptor (or we fell back); our copy can go.
            os.close(stderr_fd)

    def _remove_stderr_log(self):
        """Best-effort removal of the temporary ffmpeg stderr log file."""
        if self._stderr_path:
            try:
                os.remove(self._stderr_path)
            except OSError:
                pass
            self._stderr_path = None

    def write(self, frame: np.ndarray):
        """Feed one BGR uint8 frame (height x width x 3) to the encoder.

        Frames are dropped (with an error log) if the ffmpeg pipe has broken.
        """
        if self._fallback is not None:
            self._fallback.write(frame)
            return
        try:
            self.proc.stdin.write(frame.tobytes())
        except (BrokenPipeError, OSError):
            # ffmpeg died; surface the condition but keep the pipeline alive.
            logging.error("ffmpeg pipe broken; frames may be lost.")

    def close(self):
        """Flush the encoder, wait for ffmpeg to exit, and report any failure."""
        if self._fallback is not None:
            self._fallback.close()
            return
        if self.proc is None:
            return
        try:
            self.proc.stdin.close()  # signals EOF so ffmpeg finalizes the mp4
        except OSError:
            pass
        self.proc.wait()
        if self.proc.returncode != 0:
            stderr_text = ""
            try:
                with open(self._stderr_path, "r", encoding="utf-8", errors="ignore") as fh:
                    stderr_text = fh.read()
            except (OSError, TypeError):
                pass
            logging.error("StreamingVideoWriter ffmpeg exited with code %d: %s",
                          self.proc.returncode, stderr_text)
        self._remove_stderr_log()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()