MacBook pro committed on
Commit
9457f1e
·
1 Parent(s): 5d8fdc1

feat(webrtc): hybrid inline-if-idle processing, latency & queue metrics, pipeline stats endpoint

Browse files
models/_logs/download_audit.jsonl ADDED
@@ -0,0 +1,2 @@
 
 
 
1
+ {"ts": "2025-09-25T00:33:19Z", "event": "start", "tag": "downloader"}
2
+ {"ts": "2025-09-25T00:35:16Z", "event": "download_ok", "tag": "downloader", "model": "inswapper", "path": "/Users/macbookpro/Desktop/mirage/models/inswapper/inswapper_128_fp16.onnx"}
requirements_local.txt ADDED
@@ -0,0 +1,18 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ fastapi==0.104.1
2
+ uvicorn[standard]==0.24.0
3
+ aiortc==1.6.0
4
+ websockets==11.0.3
5
+ numpy==1.24.4
6
+ opencv-python==4.8.1.78
7
+ Pillow==10.0.1
8
+ insightface==0.7.3
9
+ basicsr==1.4.2
10
+ timm==0.9.12
11
+ python-multipart==0.0.9
12
+ av==11.0.0
13
+ psutil==5.9.8
14
+ huggingface-hub==0.24.5
15
+ onnx==1.16.1
16
+ # NOTE: the GPU build of ONNX Runtime (onnxruntime-gpu) is required for CUDAExecutionProvider on A10G,
+ # but is intentionally not pinned in this file — install it separately for the target CUDA version.
17
+ torch==2.1.2
18
+ facexlib==0.3.0
swap_pipeline.py CHANGED
@@ -82,6 +82,20 @@ class FaceSwapPipeline:
82
  self.low_brightness_threshold = float(os.getenv('MIRAGE_LOW_BRIGHTNESS_THRESH', '40'))
83
  # Similarity threshold for logging (cosine similarity typical range [-1,1])
84
  self.similarity_warn_threshold = float(os.getenv('MIRAGE_SIMILARITY_WARN', '0.15'))
 
 
 
 
 
 
 
 
 
 
 
 
 
 
85
 
86
  def initialize(self):
87
  if self.initialized:
@@ -309,6 +323,7 @@ class FaceSwapPipeline:
309
  return pcm_bytes
310
 
311
  def process_frame(self, frame: np.ndarray) -> np.ndarray:
 
312
  if not self.initialized or self.swapper is None or self.app is None:
313
  self._stats['early_uninitialized'] += 1
314
  if self.swap_debug:
@@ -336,9 +351,27 @@ class FaceSwapPipeline:
336
  logger.debug(f'Applied brightness compensation gain={gain:.2f} (brightness={brightness:.1f})')
337
  except Exception:
338
  pass
339
- faces = self.app.get(frame)
 
 
 
 
 
 
 
 
 
 
 
340
  self._last_faces_cache = faces
341
  if not faces:
 
 
 
 
 
 
 
342
  if self.swap_debug:
343
  logger.debug('process_frame: no faces detected in incoming frame')
344
  self._record_latency(time.time() - t0)
@@ -378,11 +411,36 @@ class FaceSwapPipeline:
378
  logger.debug(f'Low similarity primary face sim={sim:.3f}')
379
  except Exception:
380
  pass
381
- out = self.swapper.get(out, f, self.source_face, paste_back=True)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
382
  count += 1
383
  except Exception as e:
384
  logger.debug(f"Swap failed for face: {e}")
385
  self._stats['total_faces_swapped'] += count
 
 
 
 
386
  # Optional debug overlay for visual confirmation
387
  if count > 0 and os.getenv('MIRAGE_DEBUG_OVERLAY', '0').lower() in ('1','true','yes','on'):
388
  try:
@@ -433,6 +491,11 @@ class FaceSwapPipeline:
433
  self._stats['swap_faces_last'] = count
434
  self._stats['frames'] += 1
435
  self._frame_index += 1
 
 
 
 
 
436
  return out
437
 
438
  def _record_latency(self, dt: float):
@@ -455,6 +518,8 @@ class FaceSwapPipeline:
455
  codeformer_avg_latency_ms=cf_avg,
456
  max_faces=self.max_faces,
457
  debug_overlay=os.getenv('MIRAGE_DEBUG_OVERLAY', '0'),
 
 
458
  )
459
  # Provider diagnostics (best-effort)
460
  try: # pragma: no cover
 
82
  self.low_brightness_threshold = float(os.getenv('MIRAGE_LOW_BRIGHTNESS_THRESH', '40'))
83
  # Similarity threshold for logging (cosine similarity typical range [-1,1])
84
  self.similarity_warn_threshold = float(os.getenv('MIRAGE_SIMILARITY_WARN', '0.15'))
85
+ # Temporal reuse configuration
86
+ self.face_cache_ttl = int(os.getenv('MIRAGE_FACE_CACHE_TTL', '5') or '5') # frames
87
+ self._cached_face = None
88
+ self._cached_face_age = 0
89
+ # Aggressive blend toggle for visibility
90
+ self.aggressive_blend = os.getenv('MIRAGE_AGGRESSIVE_BLEND', '0').lower() in ('1','true','yes','on')
91
+ # Optional face ROI upscaling for tiny faces
92
+ self.face_min_size = int(os.getenv('MIRAGE_FACE_MIN_SIZE', '80') or '80')
93
+ self.face_upscale_factor = float(os.getenv('MIRAGE_FACE_UPSCALE', '1.6'))
94
+ # Detector preprocessing (CLAHE) low light
95
+ self.det_clahe = os.getenv('MIRAGE_DET_CLAHE', '1').lower() in ('1','true','yes','on')
96
+ # End-to-end latency markers
97
+ self._last_e2e_ms = None
98
+ self._e2e_hist: List[float] = []
99
 
100
  def initialize(self):
101
  if self.initialized:
 
323
  return pcm_bytes
324
 
325
  def process_frame(self, frame: np.ndarray) -> np.ndarray:
326
+ frame_in_ts = time.time()
327
  if not self.initialized or self.swapper is None or self.app is None:
328
  self._stats['early_uninitialized'] += 1
329
  if self.swap_debug:
 
351
  logger.debug(f'Applied brightness compensation gain={gain:.2f} (brightness={brightness:.1f})')
352
  except Exception:
353
  pass
354
+ # Detector preprocessing path for improved low-light detect
355
+ det_input = frame
356
+ if self.det_clahe:
357
+ try:
358
+ gray_det = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
359
+ if float(np.mean(gray_det)) < (self.low_brightness_threshold + 15):
360
+ clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8,8))
361
+ eq = clahe.apply(gray_det)
362
+ det_input = cv2.cvtColor(eq, cv2.COLOR_GRAY2BGR)
363
+ except Exception:
364
+ pass
365
+ faces = self.app.get(det_input)
366
  self._last_faces_cache = faces
367
  if not faces:
368
+ # Attempt temporal reuse of last successful face if within ttl
369
+ if self._cached_face is not None and self._cached_face_age < self.face_cache_ttl:
370
+ faces = [self._cached_face]
371
+ self._cached_face_age += 1
372
+ else:
373
+ self._cached_face = None
374
+ self._cached_face_age = 0
375
  if self.swap_debug:
376
  logger.debug('process_frame: no faces detected in incoming frame')
377
  self._record_latency(time.time() - t0)
 
411
  logger.debug(f'Low similarity primary face sim={sim:.3f}')
412
  except Exception:
413
  pass
414
+ # Upscale small face region before swapping to reduce warping artifacts
415
+ try:
416
+ x1,y1,x2,y2 = f.bbox.astype(int)
417
+ fh = y2 - y1; fw = x2 - x1
418
+ if min(fh, fw) < self.face_min_size:
419
+ # Extract padded ROI, upscale, run swapper, then downscale
420
+ pad = int(0.15 * max(fh, fw))
421
+ h, w = out.shape[:2]
422
+ rx1 = max(0, x1 - pad); ry1 = max(0, y1 - pad)
423
+ rx2 = min(w, x2 + pad); ry2 = min(h, y2 + pad)
424
+ roi = out[ry1:ry2, rx1:rx2]
425
+ if roi.size > 0:
426
+ big = cv2.resize(roi, None, fx=self.face_upscale_factor, fy=self.face_upscale_factor, interpolation=cv2.INTER_CUBIC)
427
+ swapped_big = self.swapper.get(big, f, self.source_face, paste_back=False)
428
+ swapped_small = cv2.resize(swapped_big, (rx2-rx1, ry2-ry1), interpolation=cv2.INTER_LINEAR)
429
+ out[ry1:ry2, rx1:rx2] = swapped_small
430
+ else:
431
+ out = self.swapper.get(out, f, self.source_face, paste_back=True)
432
+ else:
433
+ out = self.swapper.get(out, f, self.source_face, paste_back=True)
434
+ except Exception:
435
+ out = self.swapper.get(out, f, self.source_face, paste_back=True)
436
  count += 1
437
  except Exception as e:
438
  logger.debug(f"Swap failed for face: {e}")
439
  self._stats['total_faces_swapped'] += count
440
+ # Cache first face for reuse
441
+ if faces:
442
+ self._cached_face = faces[0]
443
+ self._cached_face_age = 0
444
  # Optional debug overlay for visual confirmation
445
  if count > 0 and os.getenv('MIRAGE_DEBUG_OVERLAY', '0').lower() in ('1','true','yes','on'):
446
  try:
 
491
  self._stats['swap_faces_last'] = count
492
  self._stats['frames'] += 1
493
  self._frame_index += 1
494
+ # End-to-end latency including pre-detection + swap path
495
+ self._last_e2e_ms = (time.time() - frame_in_ts) * 1000.0
496
+ self._e2e_hist.append(self._last_e2e_ms)
497
+ if len(self._e2e_hist) > 200:
498
+ self._e2e_hist.pop(0)
499
  return out
500
 
501
  def _record_latency(self, dt: float):
 
518
  codeformer_avg_latency_ms=cf_avg,
519
  max_faces=self.max_faces,
520
  debug_overlay=os.getenv('MIRAGE_DEBUG_OVERLAY', '0'),
521
+ e2e_latency_ms=self._last_e2e_ms,
522
+ e2e_latency_avg_ms=(float(np.mean(self._e2e_hist)) if self._e2e_hist else None),
523
  )
524
  # Provider diagnostics (best-effort)
525
  try: # pragma: no cover
webrtc_server.py CHANGED
@@ -375,15 +375,33 @@ class IncomingVideoTrack(MediaStreamTrack):
375
  self._last_processed: Optional[np.ndarray] = None
376
  self._processing_task: Optional[asyncio.Task] = None
377
  self._lock = asyncio.Lock()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
378
 
379
  async def recv(self): # type: ignore[override]
380
  frame = await self.track.recv()
381
  self.frame_id += 1
 
 
 
382
  # Convert to numpy BGR for pipeline
383
  img = frame.to_ndarray(format="bgr24")
384
  h, w, _ = img.shape
385
  proc_input = img
386
- # Optionally downscale for processing to cap latency (configurable)
387
  try:
388
  max_dim_cfg = int(os.getenv('MIRAGE_PROC_MAX_DIM', '512') or '512')
389
  if max_dim_cfg < 64:
@@ -398,51 +416,96 @@ class IncomingVideoTrack(MediaStreamTrack):
398
  proc_input = cv2.resize(img, (max(1, scale_w), max(1, scale_h)))
399
  except Exception as e:
400
  logger.debug(f"Video downscale skip: {e}")
401
- # Schedule background processing to avoid blocking recv()
402
- async def _process_async(inp: np.ndarray, expected_size: tuple[int, int], fid: int):
 
 
 
 
 
403
  try:
404
- logger.info(f"Processing video frame {fid}, input shape: {inp.shape}")
405
- out_small = self.pipeline.process_video_frame(inp, fid)
406
- logger.info(f"Pipeline returned frame shape: {out_small.shape if out_small is not None else 'None'}")
407
- if out_small is None:
408
- logger.warning(f"Pipeline returned None for frame {fid}")
409
- return
410
- if (out_small.shape[1], out_small.shape[0]) != expected_size:
411
- out = cv2.resize(out_small, expected_size)
412
- logger.info(f"Resized frame from {out_small.shape[:2]} to {expected_size}")
413
  else:
414
- out = out_small
415
- async with self._lock:
416
- self._last_processed = out
417
- logger.info(f"Stored processed frame {fid}, shape: {out.shape}")
 
 
418
  except Exception as ex:
419
- logger.error(f"Video processing error(bg): {ex}")
420
- finally:
421
- self._processing_task = None
422
-
423
- expected = (w, h)
424
- if self._processing_task is None:
425
- # Only run one processing task at a time; drop older frames
426
- self._processing_task = asyncio.create_task(_process_async(proc_input, expected, self.frame_id))
427
-
428
- # Use last processed if available, else pass-through
429
- async with self._lock:
430
- processed = self._last_processed if self._last_processed is not None else img
431
- mode = 'processed' if self._last_processed is not None else 'passthrough'
432
- logger.info(f"Frame {self.frame_id}: using {mode} frame, shape: {processed.shape}")
433
-
434
- # Rebase timestamps to a clean monotonic sequence to avoid decoder stall if processing lagged
435
- import av as _av
436
- vframe = _av.VideoFrame.from_ndarray(processed, format="bgr24")
437
- # Provide new pts/time_base using VideoStreamTrack helper (borrow from OutboundVideoTrack semantics)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
438
  try:
439
- pts, time_base = await OutboundVideoTrack().next_timestamp() # ephemeral instance just for sequencing
440
- vframe.pts = pts
441
- vframe.time_base = time_base
 
 
 
 
442
  except Exception:
443
- # Fallback to original timing
444
- vframe.pts = frame.pts
445
- vframe.time_base = frame.time_base
 
 
 
 
 
 
 
 
 
446
  return vframe
447
 
448
 
@@ -872,6 +935,35 @@ async def frame_counter():
872
  except Exception as e:
873
  return {"active": False, "error": str(e)}
874
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
875
  # Optional: connection monitoring endpoint for diagnostics
876
  if add_connection_monitoring is not None:
877
  try:
 
375
  self._last_processed: Optional[np.ndarray] = None
376
  self._processing_task: Optional[asyncio.Task] = None
377
  self._lock = asyncio.Lock()
378
+ # Latency / timing metrics
379
+ self._capture_ts: Optional[float] = None
380
+ self._last_latency_ms: Optional[float] = None
381
+ self._avg_latency_ms: Optional[float] = None
382
+ self._lat_hist: list[float] = []
383
+ self._queue_wait_last_ms: Optional[float] = None
384
+ self._queue_wait_hist: list[float] = []
385
+ self._frames_passthrough = 0
386
+ self._frames_processed = 0
387
+ self._frames_dropped = 0
388
+ self._placeholder_active = True
389
+ self._sync_if_idle = os.getenv('MIRAGE_SYNC_IF_IDLE','1').lower() in ('1','true','yes','on')
390
+ self._pts_origin: Optional[float] = None # monotonic origin
391
+ self._last_sent_pts: Optional[int] = None
392
+ self._time_base = (1, 90000) # 90kHz typical video clock
393
 
394
  async def recv(self): # type: ignore[override]
395
  frame = await self.track.recv()
396
  self.frame_id += 1
397
+ capture_t = time.time()
398
+ if self._pts_origin is None:
399
+ self._pts_origin = capture_t
400
  # Convert to numpy BGR for pipeline
401
  img = frame.to_ndarray(format="bgr24")
402
  h, w, _ = img.shape
403
  proc_input = img
404
+ # Optional downscale (same as prior)
405
  try:
406
  max_dim_cfg = int(os.getenv('MIRAGE_PROC_MAX_DIM', '512') or '512')
407
  if max_dim_cfg < 64:
 
416
  proc_input = cv2.resize(img, (max(1, scale_w), max(1, scale_h)))
417
  except Exception as e:
418
  logger.debug(f"Video downscale skip: {e}")
419
+
420
+ expected_size = (w, h)
421
+ processed: Optional[np.ndarray] = None
422
+
423
+ # Hybrid processing: inline if no background task running OR sync flag set; else schedule
424
+ if self._sync_if_idle and (self._processing_task is None):
425
+ t_q_start = time.time()
426
  try:
427
+ out_small = self.pipeline.process_video_frame(proc_input, self.frame_id)
428
+ if out_small is not None and (out_small.shape[1], out_small.shape[0]) != expected_size:
429
+ processed = cv2.resize(out_small, expected_size)
 
 
 
 
 
 
430
  else:
431
+ processed = out_small if out_small is not None else img
432
+ self._queue_wait_last_ms = (time.time() - t_q_start) * 1000.0 # inclusive (no wait, pure proc)
433
+ self._queue_wait_hist.append(self._queue_wait_last_ms)
434
+ if len(self._queue_wait_hist) > 300:
435
+ self._queue_wait_hist.pop(0)
436
+ self._frames_processed += 1
437
  except Exception as ex:
438
+ logger.debug(f"inline processing error: {ex}")
439
+ processed = img
440
+ else:
441
+ # Background path
442
+ if self._processing_task is None:
443
+ async def _process_async(inp: np.ndarray, expected_size: tuple[int,int], fid: int, enqueue_t: float):
444
+ try:
445
+ out_small = self.pipeline.process_video_frame(inp, fid)
446
+ out = out_small
447
+ if out_small is not None and (out_small.shape[1], out_small.shape[0]) != expected_size:
448
+ out = cv2.resize(out_small, expected_size)
449
+ elif out is None:
450
+ out = inp # fallback
451
+ async with self._lock:
452
+ self._last_processed = out
453
+ q_wait = (time.time() - enqueue_t) * 1000.0
454
+ self._queue_wait_last_ms = q_wait
455
+ self._queue_wait_hist.append(q_wait)
456
+ if len(self._queue_wait_hist) > 300:
457
+ self._queue_wait_hist.pop(0)
458
+ self._frames_processed += 1
459
+ except Exception as ex:
460
+ logger.debug(f"video processing error(bg): {ex}")
461
+ finally:
462
+ self._processing_task = None
463
+ self._processing_task = asyncio.create_task(_process_async(proc_input, expected_size, self.frame_id, time.time()))
464
+ # Use last processed snapshot; count passthrough if not yet available
465
+ async with self._lock:
466
+ if self._last_processed is not None:
467
+ processed = self._last_processed
468
+ else:
469
+ processed = img
470
+ self._frames_passthrough += 1
471
+ # We'll consider this frame 'dropped' re: processing freshness if a task already running
472
+ if self._processing_task is not None:
473
+ self._frames_dropped += 1
474
+
475
+ # Metrics update
476
+ proc_latency_ms = (time.time() - capture_t) * 1000.0
477
+ self._last_latency_ms = proc_latency_ms
478
+ self._lat_hist.append(proc_latency_ms)
479
+ if len(self._lat_hist) > 300:
480
+ self._lat_hist.pop(0)
481
+ self._avg_latency_ms = float(np.mean(self._lat_hist)) if self._lat_hist else None
482
+
483
+ # Placeholder becomes inactive as soon as we emit a frame post-first capture
484
+ if self._placeholder_active:
485
+ self._placeholder_active = False
486
+
487
+ # Timestamp handling: derive pts from capture time relative to origin on a 90kHz clock
488
  try:
489
+ clock_rate = 90000
490
+ rel_sec = capture_t - (self._pts_origin or capture_t)
491
+ pts = int(rel_sec * clock_rate)
492
+ # Guard against monotonic regressions
493
+ if self._last_sent_pts is not None and pts <= self._last_sent_pts:
494
+ pts = self._last_sent_pts + int(clock_rate / 30) # assume ~30fps minimal increment
495
+ self._last_sent_pts = pts
496
  except Exception:
497
+ pts = frame.pts if frame.pts is not None else 0
498
+
499
+ import av as _av
500
+ vframe = _av.VideoFrame.from_ndarray(processed, format="bgr24")
501
+ vframe.pts = pts
502
+ vframe.time_base = _av.time_base.TimeBase(num=1, den=90000) if hasattr(_av, 'time_base') else frame.time_base
503
+ if (self.frame_id % 120) == 0:
504
+ logger.debug(
505
+ f"vid frame={self.frame_id} inline={self._sync_if_idle and self._processing_task is None} "
506
+ f"proc_ms={proc_latency_ms:.1f} avg_ms={self._avg_latency_ms:.1f if self._avg_latency_ms else None} "
507
+ f"queue_wait_last={self._queue_wait_last_ms} passthrough={self._frames_passthrough} dropped={self._frames_dropped}"
508
+ )
509
  return vframe
510
 
511
 
 
935
  except Exception as e:
936
  return {"active": False, "error": str(e)}
937
 
@router.get("/pipeline_stats")
async def pipeline_stats():
    """Return merged swap pipeline stats and live video track latency metrics.

    Response shape:
      {"pipeline": {...}, "video_track": {...}}  on success,
      {"error": "..."}                            on unexpected failure.
    Never raises — diagnostics endpoints should degrade, not 500.
    """
    try:
        pipeline = get_pipeline()
        # Bug fix: the pipeline class (FaceSwapPipeline) exposes `initialized`,
        # not `loaded`; gating only on `loaded` made base_stats always empty.
        # Accept `loaded` if some implementation defines it, else fall back.
        ready = getattr(pipeline, 'loaded', None)
        if ready is None:
            ready = getattr(pipeline, 'initialized', False)
        base_stats = pipeline.get_performance_stats() if ready else {}
        # Attempt to locate the active IncomingVideoTrack via peer senders.
        track_stats = {}
        try:
            st = _peer_state
            if st is not None:
                pc = st.pc
                for sender in pc.getSenders():
                    tr = getattr(sender, 'track', None)
                    if tr and isinstance(tr, MediaStreamTrack) and getattr(tr, 'kind', None) == 'video':
                        # Heuristic: our track carries these private metric attributes;
                        # copy whichever are present, stripping the leading underscore.
                        for attr in [
                            '_last_latency_ms', '_avg_latency_ms', '_queue_wait_last_ms',
                            '_frames_passthrough', '_frames_processed', '_frames_dropped',
                            '_placeholder_active',
                        ]:
                            if hasattr(tr, attr):
                                track_stats[attr.lstrip('_')] = getattr(tr, attr)
                        break  # first video sender only
        except Exception as e:
            # Best-effort: surface the failure inside the payload rather than failing the request.
            track_stats['error'] = f"track_stats: {e}"
        return {"pipeline": base_stats, "video_track": track_stats}
    except Exception as e:
        return {"error": str(e)}
967
  # Optional: connection monitoring endpoint for diagnostics
968
  if add_connection_monitoring is not None:
969
  try: