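"""Whisper-based speech recognition with read-vs-spontaneous speech analysis.

Transcribes audio with OpenAI Whisper, measures speech rate, pauses, and
filler-word usage, and derives Kopparapu-style features to classify the
speech as read (scripted) or spontaneous.
"""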
import re
import warnings
from typing import Any, Dict, List, Optional

import librosa
import numpy as np
import torch
import whisper

warnings.filterwarnings("ignore")
class SpeechRecognizer:
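    """Wraps a Whisper model and derives features that distinguish read from spontaneous speech."""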
    def __init__(self, model_size: str = "base", device: str = None):
        """Load a Whisper model of the given size, using CUDA when available."""
        if device is None:
            self.device = "cuda" if torch.cuda.is_available() else "cpu"
        else:
            self.device = device
        print(f"Loading Whisper {model_size} model on {self.device}...")
        self.model = whisper.load_model(model_size, device=self.device)
        print("Whisper model loaded successfully.")
        self.model_size = model_size
    def _validate_audio(self, audio_path: str) -> tuple[bool, str, float]:
        """Validate audio file before transcription."""
        try:
            # Load audio to check if it's valid
            audio, sr = librosa.load(audio_path, sr=16000)
            duration = len(audio) / sr
            # Check if audio is too short
            if duration < 0.1:
                return False, "Audio is too short (< 0.1 seconds)", duration
            # Check if audio is empty or silent
            if np.max(np.abs(audio)) < 0.001:
                return False, "Audio appears to be silent or empty", duration
            return True, "Valid", duration
        except Exception as e:
            return False, f"Failed to load audio: {str(e)}", 0.0
    def transcribe(
        self,
        audio_path: str,
        language: Optional[str] = None,
        task: str = "transcribe"
    ) -> Dict[str, Any]:
        """Transcribe an audio file and return the text plus speech-pattern analysis."""
        # Validate audio first
        is_valid, message, audio_duration = self._validate_audio(audio_path)
        if not is_valid:
            print(f"Audio validation failed: {message}")
            # Return minimal valid response for invalid audio
            return self._get_empty_response(message, audio_duration)
        try:
            result = self.model.transcribe(
                audio_path,
                language=language,
                task=task,
                verbose=False,
                word_timestamps=True,
                fp16=False  # Disable fp16 to avoid KV cache KeyError
            )
        except (KeyError, RuntimeError) as e:
            error_msg = str(e)
            # Check if it's a tensor shape error (empty audio issue)
            if "reshape tensor of 0 elements" in error_msg or "ambiguous" in error_msg:
                print("Audio processing failed: audio may be too short or corrupted")
                return self._get_empty_response("Audio too short or corrupted", audio_duration)
            # Fallback: transcribe without word timestamps for other errors
            print(f"Warning: Transcription failed ({error_msg[:100]}), retrying without word timestamps...")
            try:
                result = self.model.transcribe(
                    audio_path,
                    language=language,
                    task=task,
                    verbose=False,
                    word_timestamps=False,
                    fp16=False
                )
            except Exception as e2:
                print(f"Transcription completely failed: {e2}")
                return self._get_empty_response(f"Transcription failed: {str(e2)[:100]}", audio_duration)
        transcription = result['text'].strip()
        detected_language = result.get('language', 'unknown')
        segments = result.get('segments', [])
        # Handle empty transcription
        if not transcription or len(transcription.strip()) == 0:
            print("Warning: Transcription is empty")
            return self._get_empty_response("No speech detected in audio", audio_duration)
        analysis = self._analyze_transcription(transcription, segments)
        duration = analysis['duration'] if analysis['duration'] > 0 else 1.0
        kopparapu_features = self._extract_kopparapu_features(
            transcription, duration, segments, analysis['pause_patterns']
        )
        kopparapu_score = self._calculate_kopparapu_score(kopparapu_features)
        return {
            'transcription': transcription,
            'language': detected_language,
            'segments': segments,
            'word_count': analysis['word_count'],
            'duration': analysis['duration'],
            'speech_rate': analysis['speech_rate'],
            'pause_patterns': analysis['pause_patterns'],
            'filler_words': analysis['filler_words'],
            'kopparapu_features': kopparapu_features,
            'kopparapu_score': kopparapu_score,
            'kopparapu_classification': 'read' if kopparapu_score >= 0.5 else 'spontaneous',
            'interpretation': self._interpret_speech_patterns(analysis, kopparapu_features, kopparapu_score)
        }
    def _get_empty_response(self, reason: str, duration: float = 0.0) -> Dict[str, Any]:
        """Return a valid empty response when transcription fails."""
        return {
            'transcription': f"[Error: {reason}]",
            'language': 'unknown',
            'segments': [],
            'word_count': 0,
            'duration': duration,
            'speech_rate': 0.0,
            'pause_patterns': {
                'avg_pause': 0.0,
                'max_pause': 0.0,
                'num_pauses': 0,
                'pause_variability': 0.0
            },
            'filler_words': {
                'count': 0,
                'ratio': 0.0,
                'details': {}
            },
            'kopparapu_features': {
                'chars_per_word': 0.0,
                'words_per_sec': 0.0,
                'nonalpha_per_sec': 0.0,
                'filler_rate': 0.0,
                'repetition_count': 0,
                'alpha_ratio': 0.0
            },
            'kopparapu_score': 0.5,
            'kopparapu_classification': 'unknown',
            'interpretation': f"⚠️ Audio processing failed: {reason}\n\nPlease ensure:\n- Audio is at least 1 second long\n- Audio contains actual speech\n- Audio file is not corrupted"
        }
    def _analyze_transcription(self, text: str, segments: List[Dict]) -> Dict:
        """Compute word count, duration, speech rate, filler-word usage, and pause statistics."""
        words = text.split()
        word_count = len(words)
        duration = 0
        if segments:
            duration = segments[-1]['end'] - segments[0]['start']
        speech_rate = (word_count / duration * 60) if duration > 0 else 0
        filler_words_list = [
            ('um', r'\bum\b'), ('uh', r'\buh\b'), ('er', r'\ber\b'),
            ('ah', r'\bah\b'), ('like', r'\blike\b'), ('you know', r'\byou know\b'),
            ('i mean', r'\bi mean\b'), ('actually', r'\bactually\b'),
            ('basically', r'\bbasically\b'), ('literally', r'\bliterally\b'),
            ('so', r'\bso\b'), ('well', r'\bwell\b'), ('okay', r'\bokay\b'),
            ('hmm', r'\bhmm+\b'), ('mm', r'\bmm+\b')
        ]
        text_lower = text.lower()
        filler_count = {}
        total_fillers = 0
        for filler_name, filler_pattern in filler_words_list:
            matches = re.findall(filler_pattern, text_lower, re.IGNORECASE)
            count = len(matches)
            if count > 0:
                filler_count[filler_name] = count
                total_fillers += count
        filler_ratio = total_fillers / word_count if word_count > 0 else 0
        pause_patterns = self._analyze_pauses(segments)
        return {
            'word_count': word_count,
            'duration': duration,
            'speech_rate': speech_rate,
            'filler_words': {
                'count': total_fillers,
                'ratio': filler_ratio,
                'details': filler_count
            },
            'pause_patterns': pause_patterns
        }
    def _analyze_pauses(self, segments: List[Dict]) -> Dict:
        pauses = []
        if len(segments) >= 2:
            for i in range(len(segments) - 1):
                pause = segments[i + 1]['start'] - segments[i]['end']
                if pause > 0.05:  # Consider pauses > 50ms (lowered threshold)
                    pauses.append(pause)
        for segment in segments:
            if 'words' in segment and len(segment['words']) > 1:
                words = segment['words']
                for i in range(len(words) - 1):
                    if 'start' in words[i] and 'end' in words[i] and 'start' in words[i+1]:
                        pause = words[i + 1]['start'] - words[i]['end']
                        if pause > 0.15:  # Word-level pauses (>150ms significant)
                            pauses.append(pause)
        if not pauses:
            return {
                'avg_pause': 0.0,
                'max_pause': 0.0,
                'num_pauses': 0,
                'pause_variability': 0.0
            }
        return {
            'avg_pause': float(np.mean(pauses)),
            'max_pause': float(np.max(pauses)),
            'num_pauses': len(pauses),
            'pause_variability': float(np.std(pauses)) if len(pauses) > 1 else 0.0
        }
    def _extract_kopparapu_features(
        self, text: str, duration_sec: float,
        segments: List[Dict] = None, pause_patterns: Dict = None
    ) -> Dict:
        """Extract Kopparapu-style lexical, timing, and disfluency features from a transcription."""
        text = text.strip()
        if len(text) == 0:
            return {
                'alpha_ratio': 0.0,
                'chars_per_word': 0.0,
                'words_per_sec': 0.0,
                'nonalpha_per_sec': 0.0,
                'repetition_count': 0,
                'filler_rate': 0.0,
                'pause_regularity': 0.5,
                'speech_rate_variability': 0.0,
                'sentence_length_variance': 0.0,
                'self_correction_count': 0
            }
        total_chars = len(text)
        alpha_chars = sum(c.isalpha() for c in text)
        nonalpha_chars = total_chars - alpha_chars
        alpha_ratio = alpha_chars / total_chars if total_chars > 0 else 0
        words = text.split()
        num_words = max(len(words), 1)
        chars_per_word = alpha_chars / num_words
        duration_sec = max(duration_sec, 1e-3)
        words_per_sec = num_words / duration_sec
        nonalpha_per_sec = nonalpha_chars / duration_sec
        # Character repetitions (e.g., "sooo", "ummmm")
        char_reps = len(re.findall(r'(.)\1{2,}', text))
        # Word repetitions (e.g., "I I think", "the the")
        words_list = text.lower().split()
        word_reps = 0
        for i in range(len(words_list) - 1):
            if words_list[i] == words_list[i + 1] and len(words_list[i]) > 2:
                word_reps += 1
        repetition_count = char_reps + word_reps
        # Filler words detection
        lower = text.lower()
        filler_patterns = [
            r'\bum\b', r'\buh\b', r'\buhm\b', r'\ber\b', r'\bah\b',
            r'\blike\b', r'\byou know\b', r'\bi mean\b',
            r'\bactually\b', r'\bbasically\b', r'\bliterally\b',
            r'\bso\b', r'\bwell\b', r'\bokay\b',
            r'\bhmm+\b', r'\bmm+\b', r'\boh\b'
        ]
        filler_count = 0
        for pattern in filler_patterns:
            filler_count += len(re.findall(pattern, lower))
        filler_rate = filler_count / num_words
        # NEW: Pause regularity - read speech has regular pauses at punctuation
        # Low variability = regular pauses = likely read
        pause_regularity = 0.5  # neutral default
        if pause_patterns and pause_patterns.get('num_pauses', 0) > 2:
            pause_var = pause_patterns.get('pause_variability', 0.5)
            # Normalize: low variability (< 0.2) -> high regularity (close to 1)
            # High variability (> 0.6) -> low regularity (close to 0)
            pause_regularity = max(0.0, min(1.0, 1.0 - (pause_var / 0.6)))
        # NEW: Speech rate variability across segments
        # Read speech has consistent pacing; spontaneous varies with thinking
        speech_rate_variability = self._compute_rate_variability(segments) if segments else 0.0
        # NEW: Sentence length variance - read text has more uniform structure
        sentence_length_variance = self._compute_sentence_variance(text)
        # NEW: Self-corrections and false starts (spontaneous speech markers)
        self_correction_patterns = [
            r'\bwait\b', r'\bsorry\b', r'\bno\s*,?\s*I\b',
            r'\bactually\s*,?\s*no\b', r'\blet me\b', r'\bwhat I meant\b',
            r'\bI meant\b', r'\bhold on\b', r'\bwhat was I\b', r'\bor rather\b'
        ]
        self_correction_count = 0
        for pattern in self_correction_patterns:
            # Match case-insensitively: patterns contain "I" but the text has been lowercased
            self_correction_count += len(re.findall(pattern, lower, re.IGNORECASE))
        return {
            'alpha_ratio': float(alpha_ratio),
            'chars_per_word': float(chars_per_word),
            'words_per_sec': float(words_per_sec),
            'nonalpha_per_sec': float(nonalpha_per_sec),
            'repetition_count': int(repetition_count),
            'filler_rate': float(filler_rate),
            'pause_regularity': float(pause_regularity),
            'speech_rate_variability': float(speech_rate_variability),
            'sentence_length_variance': float(sentence_length_variance),
            'self_correction_count': int(self_correction_count)
        }
    def _compute_rate_variability(self, segments: List[Dict]) -> float:
        if not segments or len(segments) < 3:
            return 0.0
        segment_rates = []
        for seg in segments:
            duration = seg.get('end', 0) - seg.get('start', 0)
            if duration > 0.3:  # Only consider segments > 300ms
                words_in_seg = len(seg.get('text', '').split())
                rate = words_in_seg / duration
                if rate > 0:
                    segment_rates.append(rate)
        if len(segment_rates) < 3:
            return 0.0
        mean_rate = np.mean(segment_rates)
        std_rate = np.std(segment_rates)
        # Coefficient of variation normalized to 0-1
        cv = std_rate / mean_rate if mean_rate > 0 else 0
        return float(min(1.0, cv / 0.5))  # CV of 0.5+ maps to 1.0
    def _compute_sentence_variance(self, text: str) -> float:
        # Split into sentences
        sentences = re.split(r'[.!?]+', text)
        sentences = [s.strip() for s in sentences if s.strip()]
        if len(sentences) < 2:
            return 0.0
        lengths = [len(s.split()) for s in sentences]
        mean_len = np.mean(lengths)
        std_len = np.std(lengths)
        # Coefficient of variation normalized
        cv = std_len / mean_len if mean_len > 0 else 0
        return float(min(1.0, cv / 0.6))  # CV of 0.6+ maps to 1.0
    def _logistic(self, x: float, a: float, b: float) -> float:
        return 1.0 / (1.0 + np.exp(-(x - a) / b))
    def _calculate_kopparapu_score(self, features: Dict) -> float:
        """Combine the extracted features into a read-vs-spontaneous score in [0, 1] (>= 0.5 means read)."""
        # L1: Vocabulary complexity - higher chars/word = more formal = read
        f1 = features['chars_per_word']
        L1 = self._logistic(f1, a=4.8, b=1.2)
        # L2: Speaking rate - faster, steadier = read
        f2 = features['words_per_sec']
        L2 = self._logistic(f2, a=2.2, b=0.6)
        # L3: Disfluency signal (inverted) - less disfluency = more read
        # Combines filler rate, nonalpha, and repetitions
        disfluency = (
            features['nonalpha_per_sec'] +
            8.0 * features['filler_rate'] +
            0.5 * features['repetition_count']
        )
        L3 = self._logistic(-disfluency, a=0.0, b=0.8)
        # L4: Pause regularity - regular pauses = read (already 0-1)
        L4 = features.get('pause_regularity', 0.5)
        # L5: Rate variability (inverted) - low variability = read
        rate_var = features.get('speech_rate_variability', 0.0)
        L5 = 1.0 - rate_var
        # L6: Sentence variance (inverted) - uniform sentences = read
        sent_var = features.get('sentence_length_variance', 0.0)
        L6 = 1.0 - sent_var
        # L7: Self-corrections (inverted) - more corrections = spontaneous
        corrections = features.get('self_correction_count', 0)
        L7 = self._logistic(-corrections, a=0.0, b=1.5)
        # Weighted combination optimized for read detection
        # Higher weights on pause regularity and rate consistency (key read markers)
        score = (
            0.15 * L1 +  # Vocabulary complexity
            0.15 * L2 +  # Speaking rate
            0.15 * L3 +  # Disfluency (filler/repetition)
            0.20 * L4 +  # Pause regularity (strong read signal)
            0.15 * L5 +  # Rate variability
            0.10 * L6 +  # Sentence uniformity
            0.10 * L7    # Self-corrections
        )
        return float(score)
    def _interpret_speech_patterns(self, analysis: Dict, kopparapu_features: Dict = None, kopparapu_score: float = None) -> str:
        filler_ratio = analysis['filler_words']['ratio']
        pause_patterns = analysis['pause_patterns']
        speech_rate = analysis['speech_rate']
        interpretation = "**Overall Assessment:**\n\n"
        spontaneity_score = 0
        indicators = []
        if filler_ratio > 0.03:
            spontaneity_score += 1
            indicators.append(f"Filler words present ({filler_ratio*100:.1f}%)")
        if pause_patterns['pause_variability'] > 0.5:
            spontaneity_score += 1
            indicators.append(f"Irregular pause patterns (variability: {pause_patterns['pause_variability']:.2f})")
        if 120 <= speech_rate <= 180:
            spontaneity_score += 1
            indicators.append(f"Natural speech rate ({speech_rate:.1f} words/min)")
        if spontaneity_score >= 2:
            interpretation += "✓ **Speech patterns suggest spontaneous, natural speaking.**\n\n"
            if indicators:
                interpretation += "Key indicators:\n"
                for indicator in indicators:
                    interpretation += f"- {indicator}\n"
        else:
            interpretation += "⚠ **Speech patterns suggest potentially scripted or read speech.**\n\n"
            if filler_ratio < 0.02:
                interpretation += "- Very low filler word usage\n"
            if pause_patterns['pause_variability'] < 0.3:
                interpretation += "- Regular, consistent pause patterns\n"
            if speech_rate > 180:
                interpretation += "- Fast, steady speaking rate\n"
        return interpretation
    def get_detailed_segments(self, audio_path: str) -> List[Dict]:
        result = self.model.transcribe(audio_path, word_timestamps=True, verbose=False)
        return result.get('segments', [])
if __name__ == "__main__":
    recognizer = SpeechRecognizer(model_size="base")
    print(f"Speech recognizer initialized with {recognizer.model_size} model")
    print(f"Device: {recognizer.device}")