Deepfake Authenticator committed on
Commit
8d12a34
·
1 Parent(s): feec9df

Phase 5: Async chunk-streaming pipeline with early exit - reduces RAM and enables fast termination

Browse files
Files changed (1) hide show
  1. backend/detector.py +163 -26
backend/detector.py CHANGED
@@ -556,6 +556,69 @@ class DecisionAgent:
556
  "face_coverage": round(face_coverage, 3),
557
  }
558
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
559
 
560
  # ─────────────────────────────────────────────
561
  # Agent 4: Report Generator Agent
@@ -758,11 +821,16 @@ class DeepfakeAuthenticator:
758
  # ── Step 1: Metadata (instant) ────────────────────────────────────
759
  metadata_result = self.metadata_agent.analyze(video_path)
760
 
761
- # ── Step 2: Extract frames ────────────────────────────────────────
762
- metadata = self.frame_agent.get_video_metadata(video_path)
763
- frames = self.frame_agent.extract_frames(video_path, fast_mode=fast_mode)
764
 
765
- if not frames:
 
 
 
 
 
 
766
  return {
767
  "result": "ERROR", "confidence": 0,
768
  "details": ["Could not extract frames from video"],
@@ -770,31 +838,100 @@ class DeepfakeAuthenticator:
770
  "audio": {"available": False, "result": "NO_AUDIO", "confidence": 0, "details": []},
771
  }
772
 
773
- # ── Step 3: Face detection + audio in parallel ────────────────────
774
  audio_result = {"available": False, "result": "NO_AUDIO", "confidence": 0, "details": []}
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
775
 
776
- with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
777
- face_future = executor.submit(self.face_agent.detect_all_frames, frames)
778
- audio_agent = self._get_audio()
779
- audio_future = None
780
- if audio_agent:
781
- audio_future = executor.submit(audio_agent.analyze, video_path, 0.5)
782
-
783
- face_crops_per_frame = face_future.result()
784
-
785
- if audio_future:
786
- try:
787
- # 20s hard timeout — never block the pipeline for audio
788
- audio_result = audio_future.result(timeout=20)
789
- except concurrent.futures.TimeoutError:
790
- logger.warning("Audio analysis timed out after 20s β€” skipping")
791
- except Exception as e:
792
- logger.warning(f"Audio analysis failed: {e}")
793
-
794
- # ── Step 4: Visual decision ───────────────────────────────────────
795
- analysis = self.decision_agent.analyze_frames(frames, face_crops_per_frame)
796
 
797
- # ── Step 5: Report ────────────────────────────────────────────────
798
  report = self.report_agent.generate(
799
  analysis, metadata, audio_result,
800
  metadata_result=metadata_result,
 
556
  "face_coverage": round(face_coverage, 3),
557
  }
558
 
559
def analyze_chunk_streaming(self, chunk_frames: list[np.ndarray],
                            face_crops_per_frame: list[list[np.ndarray]],
                            chunk_idx: int) -> dict:
    """
    Phase 5: Score a single chunk of frames for the early-exit decision.

    Args:
        chunk_frames: Frames belonging to this chunk.
        face_crops_per_frame: For each frame (same order as ``chunk_frames``),
            the face crops detected in it; may be an empty list per frame.
        chunk_idx: Position of this chunk in the stream; echoed in the result
            and used in log messages.

    Returns:
        A dict with:
          - ``chunk_idx``: the value passed in.
          - ``frame_scores``: list of
            ``{"frame_index", "fake_probability"}`` entries, one per frame that
            produced at least one scored crop. ``frame_index`` is the position
            of the frame *within this chunk*, not within the whole video.
          - ``chunk_mean``: mean of the per-frame probabilities (0.40 when
            nothing could be scored).
          - ``frames_analyzed``: ``len(chunk_frames)``.
          - ``frames_with_faces``: number of frames that contributed a score.
            NOTE(review): in the full-frame fallback this counts scored full
            frames, not frames in which a face was detected.
    """
    total_faces = sum(len(crops) for crops in face_crops_per_frame)

    # (frame index within the chunk, crop) pairs that passed the quality gate.
    indexed_crops = []
    if total_faces < 2:
        # Fewer than two face crops in the whole chunk: fall back to scoring
        # each full frame, resized to 224x224.
        for i, frame in enumerate(chunk_frames):
            crop = cv2.resize(frame, (224, 224))
            if self._is_quality_crop(crop):
                indexed_crops.append((i, crop))
    else:
        for i, crops in enumerate(face_crops_per_frame):
            for crop in crops:
                if self._is_quality_crop(crop):
                    indexed_crops.append((i, crop))

    if not indexed_crops:
        return {
            "chunk_idx": chunk_idx,
            "frame_scores": [],
            "chunk_mean": 0.40,
            "frames_analyzed": len(chunk_frames),
            "frames_with_faces": 0,
        }

    # Run inference on this chunk's crops.
    crops_only = [crop for _, crop in indexed_crops]
    all_scores = None
    if self.use_hf_model:
        try:
            all_scores = list(self._batch_predict(crops_only))
        except Exception as e:
            # Best-effort: a model failure must not abort the pipeline.
            logger.warning(f"Chunk {chunk_idx} inference failed: {e}")
            all_scores = None
        else:
            # zip() below truncates silently, so a score list of the wrong
            # length would drop or mis-assign crops. Treat it as a failed
            # inference and use the heuristic instead.
            if len(all_scores) != len(crops_only):
                logger.warning(
                    f"Chunk {chunk_idx} inference returned {len(all_scores)} scores "
                    f"for {len(crops_only)} crops; using heuristic scores"
                )
                all_scores = None
    if all_scores is None:
        all_scores = [self._heuristic_predict(crop) for crop in crops_only]

    # Aggregate scores per frame (a frame may contribute several crops).
    frame_score_map: dict[int, list[float]] = {}
    for (frame_idx, _), score in zip(indexed_crops, all_scores):
        frame_score_map.setdefault(frame_idx, []).append(score)

    frame_scores = [
        {"frame_index": fi, "fake_probability": round(float(np.mean(sc)), 4)}
        for fi, sc in sorted(frame_score_map.items())
    ]

    probs = [entry["fake_probability"] for entry in frame_scores]
    chunk_mean = float(np.mean(probs)) if probs else 0.40

    return {
        "chunk_idx": chunk_idx,
        "frame_scores": frame_scores,
        "chunk_mean": round(chunk_mean, 4),
        "frames_analyzed": len(chunk_frames),
        "frames_with_faces": len(frame_score_map),
    }
621
+
622
 
623
  # ─────────────────────────────────────────────
624
  # Agent 4: Report Generator Agent
 
821
  # ── Step 1: Metadata (instant) ────────────────────────────────────
822
  metadata_result = self.metadata_agent.analyze(video_path)
823
 
824
+ # ── Step 2: Get video metadata ────────────────────────────────────
825
+ metadata = self.frame_agent.get_video_metadata(video_path)
 
826
 
827
+ # ── Step 3: Chunk-streaming pipeline with early exit ──────────────
828
+ logger.info("Phase 5: Starting chunk-streaming pipeline")
829
+
830
+ # Extract frames grouped by chunks
831
+ chunks = self.frame_agent.extract_frames_chunked(video_path, fast_mode=fast_mode)
832
+
833
+ if not chunks or all(len(c) == 0 for c in chunks):
834
  return {
835
  "result": "ERROR", "confidence": 0,
836
  "details": ["Could not extract frames from video"],
 
838
  "audio": {"available": False, "result": "NO_AUDIO", "confidence": 0, "details": []},
839
  }
840
 
841
+ # Start audio analysis in parallel (non-blocking)
842
  audio_result = {"available": False, "result": "NO_AUDIO", "confidence": 0, "details": []}
843
+ audio_future = None
844
+ audio_executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
845
+ audio_agent = self._get_audio()
846
+ if audio_agent:
847
+ audio_future = audio_executor.submit(audio_agent.analyze, video_path, 0.5)
848
+
849
+ # Process chunks one by one with early exit
850
+ all_chunk_results = []
851
+ all_frame_scores = []
852
+ total_frames_analyzed = 0
853
+ total_frames_with_faces = 0
854
+ early_exit = False
855
+
856
+ for chunk_idx, chunk_frames in enumerate(chunks):
857
+ if not chunk_frames:
858
+ continue
859
+
860
+ logger.info(f"Processing chunk {chunk_idx + 1}/{len(chunks)} ({len(chunk_frames)} frames)")
861
+
862
+ # Face detection for this chunk
863
+ face_crops_per_frame = self.face_agent.detect_all_frames(chunk_frames)
864
+
865
+ # Inference for this chunk
866
+ chunk_result = self.decision_agent.analyze_chunk_streaming(
867
+ chunk_frames, face_crops_per_frame, chunk_idx
868
+ )
869
+
870
+ all_chunk_results.append(chunk_result)
871
+ all_frame_scores.extend(chunk_result["frame_scores"])
872
+ total_frames_analyzed += chunk_result["frames_analyzed"]
873
+ total_frames_with_faces += chunk_result["frames_with_faces"]
874
+
875
+ # Early exit logic: if we have enough data and strong signal
876
+ if chunk_idx >= 2: # Need at least 3 chunks for reliable decision
877
+ chunk_means = [r["chunk_mean"] for r in all_chunk_results]
878
+ overall_mean = float(np.mean(chunk_means))
879
+ consistency = sum(1 for m in chunk_means if m > 0.55) / len(chunk_means)
880
+
881
+ # Strong fake signal → exit early
882
+ if overall_mean > 0.75 and consistency > 0.66:
883
+ logger.info(f"Early exit: Strong FAKE signal (mean={overall_mean:.3f}, consistency={consistency:.2f})")
884
+ early_exit = True
885
+ break
886
+
887
+ # Strong real signal → exit early
888
+ if overall_mean < 0.35 and consistency > 0.66:
889
+ logger.info(f"Early exit: Strong REAL signal (mean={overall_mean:.3f}, consistency={consistency:.2f})")
890
+ early_exit = True
891
+ break
892
+
893
+ # Aggregate results from all processed chunks
894
+ if not all_frame_scores:
895
+ overall_prob = 0.40
896
+ consistency = 0.0
897
+ else:
898
+ probs = [s["fake_probability"] for s in all_frame_scores]
899
+ if len(probs) < 3:
900
+ overall_prob = float(np.mean(probs)) * 0.80
901
+ else:
902
+ overall_prob = float(np.mean(probs)) * 0.65 + float(np.median(probs)) * 0.35
903
+ overall_prob = float(np.clip(overall_prob, 0.0, 1.0))
904
+ consistency = sum(1 for p in probs if p > 0.50) / len(probs)
905
+
906
+ face_coverage = total_frames_with_faces / max(total_frames_analyzed, 1)
907
+
908
+ analysis = {
909
+ "frame_scores": all_frame_scores,
910
+ "overall_fake_probability": round(overall_prob, 4),
911
+ "frames_analyzed": total_frames_analyzed,
912
+ "frames_with_faces": total_frames_with_faces,
913
+ "consistency": round(consistency, 3),
914
+ "face_coverage": round(face_coverage, 3),
915
+ "early_exit": early_exit,
916
+ "chunks_processed": len(all_chunk_results),
917
+ "chunks_total": len(chunks),
918
+ }
919
+
920
+ logger.info(f"Chunk streaming: processed {len(all_chunk_results)}/{len(chunks)} chunks, "
921
+ f"early_exit={early_exit}")
922
 
923
+ # Wait for audio (with timeout)
924
+ if audio_future:
925
+ try:
926
+ audio_result = audio_future.result(timeout=20)
927
+ except concurrent.futures.TimeoutError:
928
+ logger.warning("Audio analysis timed out after 20s")
929
+ except Exception as e:
930
+ logger.warning(f"Audio analysis failed: {e}")
931
+ finally:
932
+ audio_executor.shutdown(wait=False)
 
 
 
 
 
 
 
 
 
 
933
 
934
+ # ── Step 4: Generate report ───────────────────────────────────────
935
  report = self.report_agent.generate(
936
  analysis, metadata, audio_result,
937
  metadata_result=metadata_result,