# sop-audio-analyzer/src/fraud_detection/pause_detector.py
# daasime - "Add SOP Audio Analyzer app files" (ebba35f)
"""
Suspicious Pause Detector
Detects abnormally long silences that may indicate the speaker is looking up
answers or receiving help during a test.
"""
import numpy as np
from dataclasses import dataclass, field
from typing import List, Optional
@dataclass
class SuspiciousPause:
    """One silent stretch that was long enough to be flagged.

    All time values are expressed in seconds on the audio timeline.
    """

    # Position on the timeline where the silence begins.
    start: float
    # Position on the timeline where the silence stops.
    end: float
    # Length of the silence (end minus start).
    duration: float
    # Free-text note about what surrounds the pause; empty when unknown.
    context: str = ""
@dataclass
class PauseResult:
    """Aggregate outcome of one suspicious-pause scan."""

    # Whether any pause was flagged.
    detected: bool
    # The flagged pauses; defaults to a fresh empty list per instance.
    pauses: List[SuspiciousPause] = field(default_factory=list)
    # Combined length, in seconds, of the flagged pauses.
    total_suspicious_time: float = 0.0
    # Length, in seconds, of the longest flagged pause.
    longest_pause: float = 0.0

    @property
    def count(self) -> int:
        """Return how many pauses are stored in ``pauses``."""
        return len(self.pauses)
class SuspiciousPauseDetector:
    """
    Detects suspicious long pauses in speech.

    In natural conversation, pauses are typically:
    - Short (< 2 seconds) for thinking
    - Medium (2-4 seconds) for complex thoughts

    Suspicious pauses (5 seconds or longer with the default settings) may
    indicate:
    - Looking up answers
    - Receiving external help
    - Reading from a source

    Speech segments are plain dicts with 'start' and 'end' keys (seconds);
    a missing key is read as 0. Segments may arrive unsorted and may overlap:
    silence is always measured from the furthest point already covered by
    speech, so a short segment nested inside a longer one never produces a
    false pause.
    """

    def __init__(self,
                 min_suspicious_duration: float = 5.0,
                 warning_duration: float = 3.0,
                 max_natural_pause: float = 2.0):
        """
        Args:
            min_suspicious_duration: Minimum pause duration to flag as suspicious
            warning_duration: Duration to flag as a warning (not fully suspicious).
                Stored on the instance; no method of this class reads it.
            max_natural_pause: Maximum duration for a natural pause
        """
        self.min_suspicious_duration = min_suspicious_duration
        self.warning_duration = warning_duration
        self.max_natural_pause = max_natural_pause

    def detect(self, speech_segments: List[dict],
               total_duration: float,
               transcription_segments: Optional[List[dict]] = None) -> "PauseResult":
        """
        Detect suspicious pauses before, between and after speech segments.

        Args:
            speech_segments: List of {'start': float, 'end': float} for speech
            total_duration: Total audio duration in seconds
            transcription_segments: Optional transcription with timestamps for context

        Returns:
            PauseResult with detected suspicious pauses. When speech_segments
            is empty the result is always "not detected", whatever
            total_duration is.
        """
        if not speech_segments:
            return PauseResult(detected=False)

        ordered = sorted(speech_segments, key=lambda s: s.get('start', 0))
        threshold = self.min_suspicious_duration
        suspicious_pauses = []

        # Silence before the first speech segment.
        first_start = ordered[0].get('start', 0)
        if first_start >= threshold:
            suspicious_pauses.append(
                self._build_pause(0.0, first_start, transcription_segments, "start")
            )

        # Silence between segments. `covered_until` is the furthest end time
        # seen so far, so overlapping or nested segments cannot open a gap
        # that is actually filled by an earlier, longer segment.
        covered_until = ordered[0].get('end', 0)
        for seg in ordered[1:]:
            seg_start = seg.get('start', 0)
            if seg_start - covered_until >= threshold:
                suspicious_pauses.append(
                    self._build_pause(covered_until, seg_start,
                                      transcription_segments, "middle")
                )
            covered_until = max(covered_until, seg.get('end', 0))

        # Silence after the last covered moment of speech.
        if total_duration - covered_until >= threshold:
            suspicious_pauses.append(
                self._build_pause(covered_until, total_duration,
                                  transcription_segments, "end")
            )

        # Summary statistics.
        total_suspicious_time = sum(p.duration for p in suspicious_pauses)
        longest_pause = max((p.duration for p in suspicious_pauses), default=0.0)

        return PauseResult(
            detected=bool(suspicious_pauses),
            pauses=suspicious_pauses,
            total_suspicious_time=round(total_suspicious_time, 2),
            longest_pause=round(longest_pause, 2)
        )

    def _build_pause(self, start: float, end: float,
                     transcription_segments: Optional[List[dict]],
                     position: str) -> "SuspiciousPause":
        """Create one pause record (times rounded to 2 decimals) with its context."""
        return SuspiciousPause(
            start=round(start, 2),
            end=round(end, 2),
            duration=round(end - start, 2),
            context=self._get_context(start, end, transcription_segments, position)
        )

    def detect_from_vad(self, vad_result: dict, total_duration: float) -> "PauseResult":
        """
        Detect suspicious pauses using VAD output.

        Args:
            vad_result: VAD result with 'segments' list
            total_duration: Total audio duration

        Returns:
            PauseResult with detected suspicious pauses
        """
        # A missing or None 'segments' entry is treated as "no speech".
        segments = vad_result.get('segments') or []
        return self.detect(segments, total_duration)

    def _get_context(self, start: float, end: float,
                     transcription_segments: Optional[List[dict]],
                     position: str) -> str:
        """
        Get context about what happened before/after the pause.

        Args:
            start: Pause start time in seconds
            end: Pause end time in seconds
            transcription_segments: Dicts with 'start', 'end' and 'text' keys,
                or None/empty when no transcription is available
            position: "start", "end", or any other label (treated as mid-audio
                when there is no transcription)

        Returns:
            A short description. With a transcription, "After: '<text>...'"
            quotes the text the pause came after and "Before: '...<text>'"
            the text the pause came before, each cut to 50 characters.
        """
        if not transcription_segments:
            generic = {
                "start": "Long silence at audio start",
                "end": "Long silence at audio end",
            }
            return generic.get(position, "Long silence mid-conversation")

        text_before = ""
        text_after = ""
        before_dist = None
        after_dist = None

        for seg in transcription_segments:
            seg_start = seg.get('start', 0)
            seg_end = seg.get('end', 0)
            # 'text' may be absent or None; both count as empty text.
            seg_text = (seg.get('text') or '').strip()

            # Segment ending around the pause start (1.0 s early .. 0.5 s late).
            # Among several candidates keep the one ending closest to it.
            if start - 1.0 <= seg_end <= start + 0.5:
                dist = abs(seg_end - start)
                if before_dist is None or dist <= before_dist:
                    before_dist = dist
                    text_before = seg_text[-50:]

            # Segment starting around the pause end (0.5 s early .. 1.0 s late).
            # Among several candidates keep the one starting closest to it.
            if end - 0.5 <= seg_start <= end + 1.0:
                dist = abs(seg_start - end)
                if after_dist is None or dist < after_dist:
                    after_dist = dist
                    text_after = seg_text[:50]

        if text_before and text_after:
            return f"After: '{text_before}...' | Before: '...{text_after}'"
        if text_before:
            return f"After: '{text_before}...'"
        if text_after:
            return f"Before: '...{text_after}'"
        return f"Silence at {position} of audio"

    def analyze_pause_pattern(self, speech_segments: List[dict],
                              total_duration: float) -> dict:
        """
        Analyze the overall pause pattern in the audio.

        Only silences between speech segments that are longer than 0.1 s are
        counted as pauses; silence before the first or after the last segment
        is not.

        Args:
            speech_segments: List of {'start': float, 'end': float} for speech
            total_duration: Total audio duration in seconds

        Returns:
            Dict that always contains the same keys:
            - 'avg_pause', 'max_pause': mean / longest pause, 2 decimals
            - 'pause_count': number of pauses
            - 'speech_ratio': speech time (overlaps counted once) divided by
              total_duration, 2 decimals; 0 when total_duration <= 0
            - 'natural_pauses': pauses <= max_natural_pause
            - 'warning_pauses': pauses strictly between max_natural_pause and
              min_suspicious_duration
            - 'suspicious_pauses': pauses >= min_suspicious_duration
        """
        stats = {
            'avg_pause': 0.0,
            'max_pause': 0.0,
            'pause_count': 0,
            'speech_ratio': 0.0,
            'natural_pauses': 0,
            'warning_pauses': 0,
            'suspicious_pauses': 0,
        }
        if not speech_segments:
            return stats

        ordered = sorted(speech_segments, key=lambda s: s.get('start', 0))

        pauses = []
        speech_time = 0.0
        # Furthest point already covered by speech. Starting at the first
        # segment's start means the first iteration sees a zero gap.
        covered_until = ordered[0].get('start', 0)
        for seg in ordered:
            seg_start = seg.get('start', 0)
            seg_end = seg.get('end', 0)

            gap = seg_start - covered_until
            if gap > 0.1:  # Ignore very small gaps
                pauses.append(gap)

            # Add only the part of this segment not already covered, so
            # overlapping segments are not double-counted.
            if seg_end > covered_until:
                speech_time += seg_end - max(seg_start, covered_until)
                covered_until = seg_end

        if total_duration > 0:
            stats['speech_ratio'] = round(speech_time / total_duration, 2)

        if not pauses:
            return stats

        stats.update({
            # float() turns the NumPy scalar into a builtin float.
            'avg_pause': round(float(np.mean(pauses)), 2),
            'max_pause': round(max(pauses), 2),
            'pause_count': len(pauses),
            'natural_pauses': sum(1 for p in pauses if p <= self.max_natural_pause),
            'warning_pauses': sum(
                1 for p in pauses
                if self.max_natural_pause < p < self.min_suspicious_duration
            ),
            'suspicious_pauses': sum(
                1 for p in pauses if p >= self.min_suspicious_duration
            ),
        })
        return stats