Zhen Ye committed on
Commit
dbfb5c9
·
1 Parent(s): 6375955

Event-driven MJPEG streaming, GPT concurrency semaphore, pre-resize frames in publisher

Browse files
Files changed (2) hide show
  1. app.py +44 -62
  2. jobs/streaming.py +71 -17
app.py CHANGED
@@ -45,7 +45,7 @@ from inference import process_first_frame, run_inference, run_segmentation
45
  from models.depth_estimators.model_loader import list_depth_estimators
46
  from jobs.background import process_video_async
47
  from jobs.models import JobInfo, JobStatus
48
- from jobs.streaming import get_stream
49
  from jobs.storage import (
50
  get_depth_output_path,
51
  get_first_frame_depth_path,
@@ -66,6 +66,9 @@ logging.getLogger("httpx").setLevel(logging.WARNING)
66
  logging.getLogger("huggingface_hub").setLevel(logging.WARNING)
67
  logging.getLogger("transformers").setLevel(logging.WARNING)
68
 
 
 
 
69
 
70
 
71
  async def _periodic_cleanup() -> None:
@@ -405,33 +408,17 @@ async def detect_async_endpoint(
405
  active_depth = depth_estimator if enable_depth else None
406
 
407
  try:
408
- processed_frame, detections, depth_map, first_frame_gpt_results = process_first_frame(
409
  str(input_path),
410
  query_list,
411
  mode=mode,
412
  detector_name=detector_name,
413
  segmenter_name=segmenter,
414
- depth_estimator_name=active_depth,
415
- depth_scale=depth_scale,
416
- enable_depth_estimator=enable_depth,
417
- enable_gpt=enable_gpt,
418
- mission_spec=mission_spec,
419
  )
420
  cv2.imwrite(str(first_frame_path), processed_frame)
421
-
422
- if depth_map is not None:
423
- # Simple visualization: Normalize and apply colormap
424
- try:
425
- d_min, d_max = np.min(depth_map), np.max(depth_map)
426
- if d_max - d_min > 1e-6:
427
- d_norm = (depth_map - d_min) / (d_max - d_min)
428
- else:
429
- d_norm = np.zeros_like(depth_map)
430
- d_uint8 = (d_norm * 255).astype(np.uint8)
431
- d_color = cv2.applyColorMap(d_uint8, cv2.COLORMAP_INFERNO)
432
- cv2.imwrite(str(first_frame_depth_path), d_color)
433
- except Exception as e:
434
- logging.warning(f"Failed to save depth map: {e}")
435
  except Exception:
436
  logging.exception("First-frame processing failed.")
437
  shutil.rmtree(job_dir, ignore_errors=True)
@@ -632,70 +619,64 @@ async def detect_first_frame_depth(job_id: str):
632
 
633
  @app.get("/detect/stream/{job_id}")
634
  async def stream_video(job_id: str):
635
- """MJPEG stream of the processing video (optimized)."""
636
- import queue
637
-
638
  async def stream_generator():
639
  loop = asyncio.get_running_loop()
640
  buffered = False
641
-
 
 
 
642
  while True:
643
  q = get_stream(job_id)
644
  if not q:
645
  break
646
-
647
  try:
648
  # Initial Buffer: Wait until we have enough frames or job is done
649
  if not buffered:
650
  if q.qsize() < 30:
651
- # If queue is empty, wait a bit
652
  await asyncio.sleep(0.1)
653
- # Check if job is still running? For now just wait for buffer or stream close
654
  continue
655
  buffered = True
656
 
657
- # Get ONE frame (no skipping)
658
- # Use wait to allow generator to yield cleanly
659
- try:
660
- # Blocking get in executor to avoid hanging async loop?
661
- # Actually standard queue.get() is blocking. get_nowait is not.
662
- # We can sleep-poll for async compatibility
663
- while q.empty():
664
- await asyncio.sleep(0.01)
665
- if not get_stream(job_id): # Stream closed
666
  return
 
 
 
 
667
 
 
 
668
  frame = q.get_nowait()
669
- except queue.Empty:
670
  continue
671
-
672
- # Resize if too big (e.g. > 640 width)
673
- # Optimization: Only resize if needed
674
- h, w = frame.shape[:2]
675
- if w > 640:
676
- scale = 640 / w
677
- new_h = int(h * scale)
678
- frame = cv2.resize(frame, (640, new_h), interpolation=cv2.INTER_NEAREST)
679
-
680
- # Encode in thread
681
- # JPEG Quality = 60 (Better quality for smooth video)
682
  encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), 60]
683
  success, buffer = await loop.run_in_executor(None, cv2.imencode, '.jpg', frame, encode_param)
684
-
685
  if success:
686
  yield (b'--frame\r\n'
687
  b'Content-Type: image/jpeg\r\n\r\n' + buffer.tobytes() + b'\r\n')
688
-
689
- # Control playback speed?
690
- # If we blast frames as fast as possible, it might play accelerated.
691
- # Ideally we want to sync to ~30fps.
692
- await asyncio.sleep(0.033) # Simple pacer (~30fps)
693
 
694
  except Exception:
695
  await asyncio.sleep(0.1)
696
 
697
  return StreamingResponse(
698
- stream_generator(),
699
  media_type="multipart/x-mixed-replace; boundary=frame"
700
  )
701
 
@@ -725,8 +706,8 @@ async def reason_track(
725
  # This is blocking, but that's expected for this endpoint structure.
726
  # For high concurrency, might want to offload to threadpool or async wrapper.
727
  try:
728
- # estimate_threat_gpt reads the file from disk
729
- results = await asyncio.to_thread(estimate_threat_gpt, input_path, track_list)
730
  logging.info(f"GPT Output for Video Track Update:\n{results}")
731
  except Exception as e:
732
  logging.exception("GPT reasoning failed")
@@ -775,11 +756,12 @@ async def chat_threat_endpoint(
775
  except json_module.JSONDecodeError:
776
  pass # Non-critical, proceed without mission context
777
 
778
- # Run chat in thread to avoid blocking
779
  try:
780
- response = await asyncio.to_thread(
781
- chat_about_threats, question, detection_list, mission_spec_dict
782
- )
 
783
  return {"response": response}
784
  except Exception as e:
785
  logging.exception("Threat chat failed")
 
45
  from models.depth_estimators.model_loader import list_depth_estimators
46
  from jobs.background import process_video_async
47
  from jobs.models import JobInfo, JobStatus
48
+ from jobs.streaming import get_stream, get_stream_event
49
  from jobs.storage import (
50
  get_depth_output_path,
51
  get_first_frame_depth_path,
 
66
  logging.getLogger("huggingface_hub").setLevel(logging.WARNING)
67
  logging.getLogger("transformers").setLevel(logging.WARNING)
68
 
69
+ # GPT concurrency limiter — prevents thread exhaustion under load
70
+ _GPT_SEMAPHORE = asyncio.Semaphore(int(os.environ.get("GPT_CONCURRENCY_LIMIT", "4")))
71
+
72
 
73
 
74
  async def _periodic_cleanup() -> None:
 
408
  active_depth = depth_estimator if enable_depth else None
409
 
410
  try:
411
+ processed_frame, detections = process_first_frame(
412
  str(input_path),
413
  query_list,
414
  mode=mode,
415
  detector_name=detector_name,
416
  segmenter_name=segmenter,
 
 
 
 
 
417
  )
418
  cv2.imwrite(str(first_frame_path), processed_frame)
419
+ # GPT and depth are now handled in the async pipeline (enrichment thread)
420
+ depth_map = None
421
+ first_frame_gpt_results = None
 
 
 
 
 
 
 
 
 
 
 
422
  except Exception:
423
  logging.exception("First-frame processing failed.")
424
  shutil.rmtree(job_dir, ignore_errors=True)
 
619
 
620
  @app.get("/detect/stream/{job_id}")
621
  async def stream_video(job_id: str):
622
+ """MJPEG stream of the processing video (event-driven)."""
623
+ import queue as queue_mod
624
+
625
  async def stream_generator():
626
  loop = asyncio.get_running_loop()
627
  buffered = False
628
+
629
+ # Get or create the asyncio.Event for this stream (must be in async context)
630
+ event = get_stream_event(job_id)
631
+
632
  while True:
633
  q = get_stream(job_id)
634
  if not q:
635
  break
636
+
637
  try:
638
  # Initial Buffer: Wait until we have enough frames or job is done
639
  if not buffered:
640
  if q.qsize() < 30:
 
641
  await asyncio.sleep(0.1)
 
642
  continue
643
  buffered = True
644
 
645
+ # Event-driven wait replaces busy-wait polling
646
+ if event is not None:
647
+ try:
648
+ await asyncio.wait_for(event.wait(), timeout=1.0)
649
+ event.clear()
650
+ except asyncio.TimeoutError:
651
+ if not get_stream(job_id):
 
 
652
  return
653
+ continue
654
+ else:
655
+ # Fallback if no event (shouldn't happen)
656
+ await asyncio.sleep(0.033)
657
 
658
+ # Drain available frame (already pre-resized by publish_frame)
659
+ try:
660
  frame = q.get_nowait()
661
+ except queue_mod.Empty:
662
  continue
663
+
664
+ # Encode in thread (frame already resized by publish_frame)
 
 
 
 
 
 
 
 
 
665
  encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), 60]
666
  success, buffer = await loop.run_in_executor(None, cv2.imencode, '.jpg', frame, encode_param)
667
+
668
  if success:
669
  yield (b'--frame\r\n'
670
  b'Content-Type: image/jpeg\r\n\r\n' + buffer.tobytes() + b'\r\n')
671
+
672
+ # Simple pacer (~30fps)
673
+ await asyncio.sleep(0.033)
 
 
674
 
675
  except Exception:
676
  await asyncio.sleep(0.1)
677
 
678
  return StreamingResponse(
679
+ stream_generator(),
680
  media_type="multipart/x-mixed-replace; boundary=frame"
681
  )
682
 
 
706
  # This is blocking, but that's expected for this endpoint structure.
707
  # For high concurrency, might want to offload to threadpool or async wrapper.
708
  try:
709
+ async with _GPT_SEMAPHORE:
710
+ results = await asyncio.to_thread(estimate_threat_gpt, input_path, track_list)
711
  logging.info(f"GPT Output for Video Track Update:\n{results}")
712
  except Exception as e:
713
  logging.exception("GPT reasoning failed")
 
756
  except json_module.JSONDecodeError:
757
  pass # Non-critical, proceed without mission context
758
 
759
+ # Run chat in thread to avoid blocking (with concurrency limit)
760
  try:
761
+ async with _GPT_SEMAPHORE:
762
+ response = await asyncio.to_thread(
763
+ chat_about_threats, question, detection_list, mission_spec_dict
764
+ )
765
  return {"response": response}
766
  except Exception as e:
767
  logging.exception("Threat chat failed")
jobs/streaming.py CHANGED
@@ -1,26 +1,63 @@
 
1
  import queue
2
- from typing import Dict, Optional, Any
3
  from threading import Lock
4
 
 
 
 
5
  # Global registry of active streams
6
- # Key: job_id -> Queue[frame_data]
7
- _STREAMS: Dict[str, queue.Queue] = {}
8
  _LOCK = Lock()
9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
10
  def create_stream(job_id: str) -> queue.Queue:
11
- """Create a new stream queue for a job."""
12
  with _LOCK:
13
- # standard Queue, thread-safe
14
- # maxsize to prevent memory explosion if consumer is slow
15
- # Buffer increased to 300 (approx 10s at 30fps) for smooth streaming
16
  q = queue.Queue(maxsize=60)
17
- _STREAMS[job_id] = q
 
 
18
  return q
19
 
 
20
  def get_stream(job_id: str) -> Optional[queue.Queue]:
21
  """Get the stream queue for a job."""
22
  with _LOCK:
23
- return _STREAMS.get(job_id)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
24
 
25
  def remove_stream(job_id: str) -> None:
26
  """Remove a stream queue."""
@@ -28,15 +65,32 @@ def remove_stream(job_id: str) -> None:
28
  if job_id in _STREAMS:
29
  del _STREAMS[job_id]
30
 
 
31
  def publish_frame(job_id: str, frame: Any) -> None:
32
- """Publish a frame to a specific job's stream. Non-blocking drop if full."""
33
- q = get_stream(job_id)
34
- if q:
35
- try:
36
- q.put_nowait(frame)
37
- except queue.Full:
38
- # Drop frame if consumer is too slow
39
- pass
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
40
 
41
  def publish_frame_to_queue(q: queue.Queue, frame: Any) -> None:
42
  """Publish to a specific queue object. Non-blocking drop."""
 
1
+ import asyncio
2
  import queue
3
+ from typing import Dict, Optional, Any, Tuple
4
  from threading import Lock
5
 
6
+ import cv2
7
+ import numpy as np
8
+
9
  # Global registry of active streams
10
+ # Key: job_id -> (Queue[frame_data], asyncio.Event)
11
+ _STREAMS: Dict[str, Tuple[queue.Queue, asyncio.Event]] = {}
12
  _LOCK = Lock()
13
 
14
+ _STREAM_MAX_WIDTH = 640
15
+
16
+
17
+ def _resize_for_stream(frame: np.ndarray) -> np.ndarray:
18
+ """Resize frame for streaming (cap at 640px width). Pre-resizes in publisher thread."""
19
+ h, w = frame.shape[:2]
20
+ if w > _STREAM_MAX_WIDTH:
21
+ scale = _STREAM_MAX_WIDTH / w
22
+ new_h = int(h * scale)
23
+ return cv2.resize(frame, (_STREAM_MAX_WIDTH, new_h), interpolation=cv2.INTER_NEAREST)
24
+ return frame
25
+
26
+
27
  def create_stream(job_id: str) -> queue.Queue:
28
+ """Create a new stream queue for a job. Returns the queue for backward compat."""
29
  with _LOCK:
 
 
 
30
  q = queue.Queue(maxsize=60)
31
+ # Create event — will be lazily bound to the correct event loop
32
+ event = None # Lazily created in get_stream_event
33
+ _STREAMS[job_id] = (q, event)
34
  return q
35
 
36
+
37
  def get_stream(job_id: str) -> Optional[queue.Queue]:
38
  """Get the stream queue for a job."""
39
  with _LOCK:
40
+ entry = _STREAMS.get(job_id)
41
+ if entry:
42
+ return entry[0]
43
+ return None
44
+
45
+
46
+ def get_stream_event(job_id: str) -> Optional[asyncio.Event]:
47
+ """Get or create the asyncio.Event for a job's stream.
48
+
49
+ Must be called from the async event loop that will await the event.
50
+ """
51
+ with _LOCK:
52
+ entry = _STREAMS.get(job_id)
53
+ if not entry:
54
+ return None
55
+ q, event = entry
56
+ if event is None:
57
+ event = asyncio.Event()
58
+ _STREAMS[job_id] = (q, event)
59
+ return event
60
+
61
 
62
  def remove_stream(job_id: str) -> None:
63
  """Remove a stream queue."""
 
65
  if job_id in _STREAMS:
66
  del _STREAMS[job_id]
67
 
68
+
69
  def publish_frame(job_id: str, frame: Any) -> None:
70
+ """Publish a pre-resized frame to a job's stream. Non-blocking drop if full.
71
+
72
+ Also sets the asyncio.Event to wake the stream consumer immediately.
73
+ """
74
+ with _LOCK:
75
+ entry = _STREAMS.get(job_id)
76
+ if not entry:
77
+ return
78
+
79
+ q, event = entry
80
+
81
+ # Pre-resize for streaming (avoids resize in async handler)
82
+ resized = _resize_for_stream(frame)
83
+
84
+ try:
85
+ q.put_nowait(resized)
86
+ except queue.Full:
87
+ # Drop frame if consumer is too slow
88
+ pass
89
+
90
+ # Wake the async consumer if waiting
91
+ if event is not None:
92
+ event.set()
93
+
94
 
95
  def publish_frame_to_queue(q: queue.Queue, frame: Any) -> None:
96
  """Publish to a specific queue object. Non-blocking drop."""