AliZak commited on
Commit
6ebfd3d
·
verified ·
1 Parent(s): bafb5d4

trying smoothing + buffer

Browse files
Files changed (1) hide show
  1. perception_roi_server.py +69 -41
perception_roi_server.py CHANGED
@@ -21,6 +21,7 @@ import uuid
21
  import time
22
  import math
23
  import threading
 
24
  import shutil
25
  import subprocess
26
  from dataclasses import dataclass, field
@@ -38,6 +39,9 @@ DEFAULT_CONF = float(os.environ.get("YOLO_CONF", "0.25"))
38
  DEFAULT_DEVICE = os.environ.get("YOLO_DEVICE", "auto")
39
  FAST_DETECT_SCALE = float(os.environ.get("FAST_DETECT_SCALE", "0.65"))
40
  FAST_DETECT_IMGSZ = int(os.environ.get("FAST_DETECT_IMGSZ", "512"))
 
 
 
41
  DATA_DIR = os.environ.get("DATA_DIR", "/tmp/roi_demo")
42
  UPLOAD_DIR = os.path.join(DATA_DIR, "uploads")
43
  OUTPUT_DIR = os.path.join(DATA_DIR, "outputs")
@@ -144,7 +148,7 @@ def _yolo_detect_frame(
144
  def _draw_boxes(frame_bgr: np.ndarray, dets: List[Dict[str, Any]]) -> np.ndarray:
145
  out = frame_bgr.copy()
146
  for d in dets:
147
- b = d.get("bbox_xyxy")
148
  if not (isinstance(b, (list, tuple)) and len(b) == 4):
149
  continue
150
  x1, y1, x2, y2 = [int(max(0, v)) for v in b]
@@ -159,6 +163,32 @@ def _draw_boxes(frame_bgr: np.ndarray, dets: List[Dict[str, Any]]) -> np.ndarray
159
  return out
160
 
161
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
162
  def _iou_xyxy(a: List[float], b: List[float]) -> float:
163
  ax1, ay1, ax2, ay2 = a
164
  bx1, by1, bx2, by2 = b
@@ -305,7 +335,7 @@ def _apply_roi_overlay(frame_bgr: np.ndarray, dets: List[Dict[str, Any]], target
305
  out = bg.copy()
306
  pad = max(2, int(min(w, h) * 0.005))
307
  for d in dets:
308
- b = d.get("bbox_xyxy")
309
  if not (isinstance(b, (list, tuple)) and len(b) == 4):
310
  continue
311
  x1, y1, x2, y2 = [int(v) for v in b]
@@ -346,6 +376,9 @@ class Job:
346
  latest_jpeg: Optional[bytes] = None
347
  latest_compressed_jpeg: Optional[bytes] = None
348
  latest_roi_jpeg: Optional[bytes] = None
 
 
 
349
  lock: threading.Lock = field(default_factory=threading.Lock)
350
  tracker_state: Dict[str, Any] = field(default_factory=lambda: {"next_id": 1, "tracks": []})
351
 
@@ -407,6 +440,7 @@ def _process_job(job: Job):
407
  ]
408
  max_id = max((int(d.get("track_id", 0)) for d in dets), default=0)
409
  tracker["next_id"] = max(tracker.get("next_id", 1), max_id + 1)
 
410
  with job.lock:
411
  job.det_by_frame[int(frame_idx)] = dets
412
  last_dets = dets
@@ -418,6 +452,7 @@ def _process_job(job: Job):
418
  if ok2:
419
  with job.lock:
420
  job.latest_jpeg = jpg.tobytes()
 
421
 
422
  if overlay_writer is not None:
423
  overlay_writer.write(overlay)
@@ -515,11 +550,13 @@ def _compress_job(job: Job, bandwidth_kbps: int, target_fps: int, target_w: int,
515
  if okc:
516
  with job.lock:
517
  job.latest_compressed_jpeg = jpgc.tobytes()
 
518
  if roi_frame is not None:
519
  okr, jpgr = cv2.imencode(".jpg", roi_frame, [int(cv2.IMWRITE_JPEG_QUALITY), 80])
520
  if okr:
521
  with job.lock:
522
  job.latest_roi_jpeg = jpgr.tobytes()
 
523
  except Exception:
524
  pass
525
 
@@ -598,6 +635,7 @@ async def track_async(
598
  model = get_model(job.weights)
599
  det0 = _yolo_detect_frame(model, frame0, conf=job.conf, queries=job.queries, device=job.device, fast_mode=job.fast_mode)
600
  det0 = _assign_tracks(det0, job.tracker_state)
 
601
  with job.lock:
602
  job.det_by_frame[0] = det0
603
  vis0 = _draw_boxes(frame0, det0)
@@ -605,6 +643,7 @@ async def track_async(
605
  if ok2:
606
  with job.lock:
607
  job.latest_jpeg = jpg.tobytes()
 
608
  except Exception:
609
  pass
610
 
@@ -665,13 +704,27 @@ def process_status(job_id: str):
665
  }
666
 
667
 
668
- def _mjpeg_generator(job: Job):
669
  boundary = b"--frame"
 
 
 
 
 
670
  while True:
671
  with job.lock:
672
- jpg = job.latest_jpeg
673
  status = job.status
674
  err = job.error
 
 
 
 
 
 
 
 
 
675
  if err:
676
  break
677
  if jpg:
@@ -679,50 +732,25 @@ def _mjpeg_generator(job: Job):
679
  yield b"Content-Type: image/jpeg\r\n"
680
  yield f"Content-Length: {len(jpg)}\r\n\r\n".encode("ascii")
681
  yield jpg + b"\r\n"
682
- time.sleep(0.15)
683
- if status in ("completed", "error"):
684
- time.sleep(0.5)
 
 
 
685
  break
686
 
687
 
 
 
 
 
688
  def _mjpeg_generator_compressed(job: Job):
689
- boundary = b"--frame"
690
- while True:
691
- with job.lock:
692
- jpg = job.latest_compressed_jpeg
693
- status = job.status
694
- err = job.error
695
- if err:
696
- break
697
- if jpg:
698
- yield boundary + b"\r\n"
699
- yield b"Content-Type: image/jpeg\r\n"
700
- yield f"Content-Length: {len(jpg)}\r\n\r\n".encode("ascii")
701
- yield jpg + b"\r\n"
702
- time.sleep(0.15)
703
- if status in ("completed", "error"):
704
- time.sleep(0.5)
705
- break
706
 
707
 
708
  def _mjpeg_generator_roi(job: Job):
709
- boundary = b"--frame"
710
- while True:
711
- with job.lock:
712
- jpg = job.latest_roi_jpeg
713
- status = job.status
714
- err = job.error
715
- if err:
716
- break
717
- if jpg:
718
- yield boundary + b"\r\n"
719
- yield b"Content-Type: image/jpeg\r\n"
720
- yield f"Content-Length: {len(jpg)}\r\n\r\n".encode("ascii")
721
- yield jpg + b"\r\n"
722
- time.sleep(0.15)
723
- if status in ("completed", "error"):
724
- time.sleep(0.5)
725
- break
726
 
727
 
728
  @app.get("/detect/stream/{job_id}")
 
21
  import time
22
  import math
23
  import threading
24
+ from collections import deque
25
  import shutil
26
  import subprocess
27
  from dataclasses import dataclass, field
 
39
  DEFAULT_DEVICE = os.environ.get("YOLO_DEVICE", "auto")
40
  FAST_DETECT_SCALE = float(os.environ.get("FAST_DETECT_SCALE", "0.65"))
41
  FAST_DETECT_IMGSZ = int(os.environ.get("FAST_DETECT_IMGSZ", "512"))
42
+ SMOOTH_ALPHA = float(os.environ.get("SMOOTH_ALPHA", "0.7"))
43
+ MJPEG_STREAM_FPS = float(os.environ.get("MJPEG_STREAM_FPS", "8"))
44
+ MJPEG_STARTUP_BUFFER_SEC = float(os.environ.get("MJPEG_STARTUP_BUFFER_SEC", "0.6"))
45
  DATA_DIR = os.environ.get("DATA_DIR", "/tmp/roi_demo")
46
  UPLOAD_DIR = os.path.join(DATA_DIR, "uploads")
47
  OUTPUT_DIR = os.path.join(DATA_DIR, "outputs")
 
148
  def _draw_boxes(frame_bgr: np.ndarray, dets: List[Dict[str, Any]]) -> np.ndarray:
149
  out = frame_bgr.copy()
150
  for d in dets:
151
+ b = d.get("bbox_xyxy_smooth") or d.get("bbox_xyxy")
152
  if not (isinstance(b, (list, tuple)) and len(b) == 4):
153
  continue
154
  x1, y1, x2, y2 = [int(max(0, v)) for v in b]
 
163
  return out
164
 
165
 
166
+ def _smooth_tracks(dets: List[Dict[str, Any]], tracker: Dict[str, Any], alpha: float = SMOOTH_ALPHA) -> List[Dict[str, Any]]:
167
+ if not dets:
168
+ return dets
169
+ smooth = tracker.setdefault("smooth", {})
170
+ out: List[Dict[str, Any]] = []
171
+ for d in dets:
172
+ b = d.get("bbox_xyxy")
173
+ tid = d.get("track_id")
174
+ if not (isinstance(b, (list, tuple)) and len(b) == 4) or tid is None:
175
+ out.append(d)
176
+ continue
177
+ prev = smooth.get(int(tid))
178
+ if isinstance(prev, (list, tuple)) and len(prev) == 4:
179
+ sb = [
180
+ float(alpha) * float(prev[i]) + (1.0 - float(alpha)) * float(b[i])
181
+ for i in range(4)
182
+ ]
183
+ else:
184
+ sb = [float(v) for v in b]
185
+ smooth[int(tid)] = sb
186
+ d2 = dict(d)
187
+ d2["bbox_xyxy_smooth"] = sb
188
+ out.append(d2)
189
+ return out
190
+
191
+
192
  def _iou_xyxy(a: List[float], b: List[float]) -> float:
193
  ax1, ay1, ax2, ay2 = a
194
  bx1, by1, bx2, by2 = b
 
335
  out = bg.copy()
336
  pad = max(2, int(min(w, h) * 0.005))
337
  for d in dets:
338
+ b = d.get("bbox_xyxy_smooth") or d.get("bbox_xyxy")
339
  if not (isinstance(b, (list, tuple)) and len(b) == 4):
340
  continue
341
  x1, y1, x2, y2 = [int(v) for v in b]
 
376
  latest_jpeg: Optional[bytes] = None
377
  latest_compressed_jpeg: Optional[bytes] = None
378
  latest_roi_jpeg: Optional[bytes] = None
379
+ overlay_buffer: deque = field(default_factory=lambda: deque(maxlen=180))
380
+ compressed_buffer: deque = field(default_factory=lambda: deque(maxlen=180))
381
+ roi_buffer: deque = field(default_factory=lambda: deque(maxlen=180))
382
  lock: threading.Lock = field(default_factory=threading.Lock)
383
  tracker_state: Dict[str, Any] = field(default_factory=lambda: {"next_id": 1, "tracks": []})
384
 
 
440
  ]
441
  max_id = max((int(d.get("track_id", 0)) for d in dets), default=0)
442
  tracker["next_id"] = max(tracker.get("next_id", 1), max_id + 1)
443
+ dets = _smooth_tracks(dets, tracker)
444
  with job.lock:
445
  job.det_by_frame[int(frame_idx)] = dets
446
  last_dets = dets
 
452
  if ok2:
453
  with job.lock:
454
  job.latest_jpeg = jpg.tobytes()
455
+ job.overlay_buffer.append(job.latest_jpeg)
456
 
457
  if overlay_writer is not None:
458
  overlay_writer.write(overlay)
 
550
  if okc:
551
  with job.lock:
552
  job.latest_compressed_jpeg = jpgc.tobytes()
553
+ job.compressed_buffer.append(job.latest_compressed_jpeg)
554
  if roi_frame is not None:
555
  okr, jpgr = cv2.imencode(".jpg", roi_frame, [int(cv2.IMWRITE_JPEG_QUALITY), 80])
556
  if okr:
557
  with job.lock:
558
  job.latest_roi_jpeg = jpgr.tobytes()
559
+ job.roi_buffer.append(job.latest_roi_jpeg)
560
  except Exception:
561
  pass
562
 
 
635
  model = get_model(job.weights)
636
  det0 = _yolo_detect_frame(model, frame0, conf=job.conf, queries=job.queries, device=job.device, fast_mode=job.fast_mode)
637
  det0 = _assign_tracks(det0, job.tracker_state)
638
+ det0 = _smooth_tracks(det0, job.tracker_state)
639
  with job.lock:
640
  job.det_by_frame[0] = det0
641
  vis0 = _draw_boxes(frame0, det0)
 
643
  if ok2:
644
  with job.lock:
645
  job.latest_jpeg = jpg.tobytes()
646
+ job.overlay_buffer.append(job.latest_jpeg)
647
  except Exception:
648
  pass
649
 
 
704
  }
705
 
706
 
707
+ def _mjpeg_generator_from_buffer(job: Job, buffer_attr: str, latest_attr: str):
708
  boundary = b"--frame"
709
+ stream_fps = max(2.0, min(30.0, float(MJPEG_STREAM_FPS or 8.0)))
710
+ frame_interval = 1.0 / stream_fps
711
+ min_frames = int(math.ceil(stream_fps * max(0.0, float(MJPEG_STARTUP_BUFFER_SEC or 0.0))))
712
+ started = False
713
+ next_time = time.time()
714
  while True:
715
  with job.lock:
716
+ buf = getattr(job, buffer_attr, None)
717
  status = job.status
718
  err = job.error
719
+ if buf is not None and len(buf) > 0:
720
+ if not started and len(buf) < min_frames:
721
+ jpg = None
722
+ else:
723
+ started = True
724
+ jpg = buf.popleft()
725
+ else:
726
+ jpg = getattr(job, latest_attr, None)
727
+ remaining = len(buf) if buf is not None else 0
728
  if err:
729
  break
730
  if jpg:
 
732
  yield b"Content-Type: image/jpeg\r\n"
733
  yield f"Content-Length: {len(jpg)}\r\n\r\n".encode("ascii")
734
  yield jpg + b"\r\n"
735
+ now = time.time()
736
+ if now < next_time:
737
+ time.sleep(next_time - now)
738
+ next_time = max(next_time + frame_interval, now)
739
+ if status in ("completed", "error") and remaining == 0:
740
+ time.sleep(0.2)
741
  break
742
 
743
 
744
+ def _mjpeg_generator(job: Job):
745
+ return _mjpeg_generator_from_buffer(job, "overlay_buffer", "latest_jpeg")
746
+
747
+
748
  def _mjpeg_generator_compressed(job: Job):
749
+ return _mjpeg_generator_from_buffer(job, "compressed_buffer", "latest_compressed_jpeg")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
750
 
751
 
752
  def _mjpeg_generator_roi(job: Job):
753
+ return _mjpeg_generator_from_buffer(job, "roi_buffer", "latest_roi_jpeg")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
754
 
755
 
756
  @app.get("/detect/stream/{job_id}")