Spaces:
Sleeping
Sleeping
| import whisper | |
| import torch | |
| import numpy as np | |
| import re | |
| import warnings | |
| import librosa | |
| warnings.filterwarnings("ignore") | |
# Main class for speech recognition and analysis
class SpeechRecognizer:
    """Wraps an OpenAI Whisper model and adds speech-pattern analysis."""

    def __init__(self, model_size="base", device=None):
        """Load a Whisper checkpoint.

        Args:
            model_size: Whisper checkpoint name (e.g. "base").
            device: "cuda" or "cpu"; auto-detected from CUDA availability
                when None.
        """
        # Prefer the GPU when the caller did not pin a device explicitly.
        self.device = device if device is not None else (
            "cuda" if torch.cuda.is_available() else "cpu"
        )
        print(f"Loading Whisper {model_size} model on {self.device}...")
        self.model = whisper.load_model(model_size, device=self.device)
        print("Whisper model loaded successfully.")
        self.model_size = model_size
| # check if audio file is valid before processing | |
| def _validate_audio(self, audio_path): | |
| try: | |
| # load and check audio | |
| audio, sr = librosa.load(audio_path, sr=16000) | |
| duration = len(audio) / sr | |
| # audio must be at least 0.1 seconds | |
| if duration < 0.1: | |
| return False, "Audio too short", duration | |
| # check for silent audio | |
| if np.max(np.abs(audio)) < 0.001: | |
| return False, "Audio is silent", duration | |
| return True, "Valid", duration | |
| except Exception as e: | |
| return False, f"Could not load audio file", 0.0 | |
    # Main transcription entry point: validate the file, run Whisper (with a
    # retry that drops word timestamps), then derive speech-pattern metrics.
    def transcribe(self, audio_path, language=None, task="transcribe"):
        """Transcribe audio_path and analyze the resulting speech.

        Args:
            audio_path: path to an audio file.
            language: language hint for Whisper; None lets Whisper detect it.
            task: Whisper task, "transcribe" or "translate".

        Returns:
            dict with the transcription text, detected language, segments,
            word/timing/filler/pause metrics, Kopparapu features, score and
            classification, and a human-readable interpretation. On any
            failure, the zeroed dict from _get_empty_response().
        """
        # validate audio first
        is_valid, message, audio_duration = self._validate_audio(audio_path)
        if not is_valid:
            print(f"Audio check failed: {message}")
            return self._get_empty_response(message, audio_duration)
        # try to transcribe with word timestamps
        try:
            result = self.model.transcribe(
                audio_path,
                language=language,
                task=task,
                verbose=False,
                word_timestamps=True,
                fp16=False  # avoid fp16 issues
            )
        except (KeyError, RuntimeError) as e:
            error_msg = str(e)
            # These messages indicate Whisper received an empty/degenerate
            # input; retrying would not help, so bail out immediately.
            if "reshape tensor of 0 elements" in error_msg or "ambiguous" in error_msg:
                print(f"Audio might be too short or corrupted")
                return self._get_empty_response("Audio too short or corrupted", audio_duration)
            # Word-timestamp alignment itself can fail; retry without it.
            print(f"First try failed, trying again...")
            try:
                result = self.model.transcribe(
                    audio_path,
                    language=language,
                    task=task,
                    verbose=False,
                    word_timestamps=False,
                    fp16=False
                )
            except Exception as e2:
                print(f"Could not transcribe audio: {e2}")
                return self._get_empty_response("Transcription failed", audio_duration)
        # extract transcription results
        transcription = result['text'].strip()
        detected_language = result.get('language', 'unknown')
        segments = result.get('segments', [])
        # handle empty transcription
        if not transcription or len(transcription.strip()) == 0:
            print("Warning: Transcription is empty")
            return self._get_empty_response("No speech detected in audio", audio_duration)
        # analyze transcription for speech patterns (rate, pauses, fillers)
        analysis = self._analyze_transcription(transcription, segments)
        # extract Kopparapu features for read/spontaneous detection;
        # clamp duration to 1.0 s to avoid divide-by-zero downstream
        duration = analysis['duration'] if analysis['duration'] > 0 else 1.0
        kopparapu_features = self._extract_kopparapu_features(
            transcription, duration, segments, analysis['pause_patterns']
        )
        kopparapu_score = self._calculate_kopparapu_score(kopparapu_features)
        return {
            'transcription': transcription,
            'language': detected_language,
            'segments': segments,
            'word_count': analysis['word_count'],
            'duration': analysis['duration'],
            'speech_rate': analysis['speech_rate'],
            'pause_patterns': analysis['pause_patterns'],
            'filler_words': analysis['filler_words'],
            'kopparapu_features': kopparapu_features,
            'kopparapu_score': kopparapu_score,
            # score >= 0.5 is classified as read speech, otherwise spontaneous
            'kopparapu_classification': 'read' if kopparapu_score >= 0.5 else 'spontaneous',
            'interpretation': self._interpret_speech_patterns(analysis, kopparapu_features, kopparapu_score)
        }
| # return empty response when transcription fails | |
| def _get_empty_response(self, reason, duration=0.0): | |
| return { | |
| 'transcription': f"[Error: {reason}]", | |
| 'language': 'unknown', | |
| 'segments': [], | |
| 'word_count': 0, | |
| 'duration': duration, | |
| 'speech_rate': 0.0, | |
| 'pause_patterns': { | |
| 'avg_pause': 0.0, | |
| 'max_pause': 0.0, | |
| 'num_pauses': 0, | |
| 'pause_variability': 0.0 | |
| }, | |
| 'filler_words': { | |
| 'count': 0, | |
| 'ratio': 0.0, | |
| 'details': {} | |
| }, | |
| 'kopparapu_features': { | |
| 'chars_per_word': 0.0, | |
| 'words_per_sec': 0.0, | |
| 'nonalpha_per_sec': 0.0, | |
| 'filler_rate': 0.0, | |
| 'repetition_count': 0, | |
| 'alpha_ratio': 0.0 | |
| }, | |
| 'kopparapu_score': 0.5, | |
| 'kopparapu_classification': 'unknown', | |
| 'interpretation': f"Could not process audio: {reason}\n\nTips:\n- Make sure audio is at least 1 second\n- Check that there is actual speech\n- Try a different audio file" | |
| } | |
| # analyze transcription for various speech metrics | |
| def _analyze_transcription(self, text, segments): | |
| words = text.split() | |
| word_count = len(words) | |
| # calculate duration from segments | |
| duration = 0 | |
| if segments: | |
| duration = segments[-1]['end'] - segments[0]['start'] | |
| # calculate speaking rate (words per minute) | |
| speech_rate = (word_count / duration * 60) if duration > 0 else 0 | |
| # list of filler words to detect | |
| filler_words_list = [ | |
| ('um', r'\bum\b'), ('uh', r'\buh\b'), ('er', r'\ber\b'), | |
| ('ah', r'\bah\b'), ('like', r'\blike\b'), ('you know', r'\byou know\b'), | |
| ('i mean', r'\bi mean\b'), ('actually', r'\bactually\b'), | |
| ('basically', r'\bbasically\b'), ('literally', r'\bliterally\b'), | |
| ('so', r'\bso\b'), ('well', r'\bwell\b'), ('okay', r'\bokay\b'), | |
| ('hmm', r'\bhmm+\b'), ('mm', r'\bmm+\b') | |
| ] | |
| # count filler words | |
| text_lower = text.lower() | |
| filler_count = {} | |
| total_fillers = 0 | |
| for filler_name, filler_pattern in filler_words_list: | |
| matches = re.findall(filler_pattern, text_lower, re.IGNORECASE) | |
| count = len(matches) | |
| if count > 0: | |
| filler_count[filler_name] = count | |
| total_fillers += count | |
| # calculate filler ratio | |
| filler_ratio = total_fillers / word_count if word_count > 0 else 0 | |
| # analyze pause patterns | |
| pause_patterns = self._analyze_pauses(segments) | |
| return { | |
| 'word_count': word_count, | |
| 'duration': duration, | |
| 'speech_rate': speech_rate, | |
| 'filler_words': { | |
| 'count': total_fillers, | |
| 'ratio': filler_ratio, | |
| 'details': filler_count | |
| }, | |
| 'pause_patterns': pause_patterns | |
| } | |
| # extract pause timing information from segments | |
| def _analyze_pauses(self, segments): | |
| pauses = [] | |
| # find pauses between segments | |
| if len(segments) >= 2: | |
| for i in range(len(segments) - 1): | |
| pause = segments[i + 1]['start'] - segments[i]['end'] | |
| if pause > 0.05: # pauses > 50ms | |
| pauses.append(pause) | |
| # find pauses between words within segments | |
| for segment in segments: | |
| if 'words' in segment and len(segment['words']) > 1: | |
| words = segment['words'] | |
| for i in range(len(words) - 1): | |
| if 'start' in words[i] and 'end' in words[i] and 'start' in words[i+1]: | |
| pause = words[i + 1]['start'] - words[i]['end'] | |
| if pause > 0.15: # word-level pauses > 150ms | |
| pauses.append(pause) | |
| # return empty stats if no pauses found | |
| if not pauses: | |
| return { | |
| 'avg_pause': 0.0, | |
| 'max_pause': 0.0, | |
| 'num_pauses': 0, | |
| 'pause_variability': 0.0 | |
| } | |
| return { | |
| 'avg_pause': float(np.mean(pauses)), | |
| 'max_pause': float(np.max(pauses)), | |
| 'num_pauses': len(pauses), | |
| 'pause_variability': float(np.std(pauses)) if len(pauses) > 1 else 0.0 | |
| } | |
| # extract features based on kopparapu's method for read vs spontaneous detection | |
| def _extract_kopparapu_features(self, text, duration_sec, segments=None, pause_patterns=None): | |
| text = text.strip() | |
| # handle empty text | |
| if len(text) == 0: | |
| return { | |
| 'alpha_ratio': 0.0, | |
| 'chars_per_word': 0.0, | |
| 'words_per_sec': 0.0, | |
| 'nonalpha_per_sec': 0.0, | |
| 'repetition_count': 0, | |
| 'filler_rate': 0.0, | |
| 'pause_regularity': 0.5, | |
| 'speech_rate_variability': 0.0, | |
| 'sentence_length_variance': 0.0, | |
| 'self_correction_count': 0 | |
| } | |
| # count character types | |
| total_chars = len(text) | |
| alpha_chars = sum(c.isalpha() for c in text) | |
| nonalpha_chars = total_chars - alpha_chars | |
| # ratio of alphabetic characters | |
| alpha_ratio = alpha_chars / total_chars if total_chars > 0 else 0 | |
| # average word length | |
| words = text.split() | |
| num_words = max(len(words), 1) | |
| chars_per_word = alpha_chars / num_words | |
| # speaking rate features | |
| duration_sec = max(duration_sec, 1e-3) | |
| words_per_sec = num_words / duration_sec | |
| nonalpha_per_sec = nonalpha_chars / duration_sec | |
| # detect character repetitions like "sooo" or "ummmm" | |
| char_reps = len(re.findall(r'(.)\1{2,}', text)) | |
| # detect word repetitions like "I I think" | |
| words_list = text.lower().split() | |
| word_reps = 0 | |
| for i in range(len(words_list) - 1): | |
| if words_list[i] == words_list[i + 1] and len(words_list[i]) > 2: | |
| word_reps += 1 | |
| repetition_count = char_reps + word_reps | |
| # count filler words | |
| lower = text.lower() | |
| filler_patterns = [ | |
| r'\bum\b', r'\buh\b', r'\buhm\b', r'\ber\b', r'\bah\b', | |
| r'\blike\b', r'\byou know\b', r'\bi mean\b', | |
| r'\bactually\b', r'\bbasically\b', r'\bliterally\b', | |
| r'\bso\b', r'\bwell\b', r'\bokay\b', | |
| r'\bhmm+\b', r'\bmm+\b', r'\boh\b' | |
| ] | |
| filler_count = 0 | |
| for pattern in filler_patterns: | |
| filler_count += len(re.findall(pattern, lower)) | |
| filler_rate = filler_count / num_words | |
| # pause regularity - read speech has regular pauses at punctuation | |
| pause_regularity = 0.5 | |
| if pause_patterns and pause_patterns.get('num_pauses', 0) > 2: | |
| pause_var = pause_patterns.get('pause_variability', 0.5) | |
| # low variability = regular pauses = likely read | |
| pause_regularity = max(0.0, min(1.0, 1.0 - (pause_var / 0.6))) | |
| # speech rate variability across segments | |
| speech_rate_variability = self._compute_rate_variability(segments) if segments else 0.0 | |
| # sentence length variance - uniform = likely read | |
| sentence_length_variance = self._compute_sentence_variance(text) | |
| # count self-corrections and false starts | |
| self_correction_patterns = [ | |
| r'\bwait\b', r'\bsorry\b', r'\bno\s*,?\s*I\b', | |
| r'\bactually\s*,?\s*no\b', r'\blet me\b', r'\bwhat I meant\b', | |
| r'\bI meant\b', r'\bhold on\b', r'\bwhat was I\b', r'\bor rather\b' | |
| ] | |
| self_correction_count = 0 | |
| for pattern in self_correction_patterns: | |
| self_correction_count += len(re.findall(pattern, lower)) | |
| return { | |
| 'alpha_ratio': float(alpha_ratio), | |
| 'chars_per_word': float(chars_per_word), | |
| 'words_per_sec': float(words_per_sec), | |
| 'nonalpha_per_sec': float(nonalpha_per_sec), | |
| 'repetition_count': int(repetition_count), | |
| 'filler_rate': float(filler_rate), | |
| 'pause_regularity': float(pause_regularity), | |
| 'speech_rate_variability': float(speech_rate_variability), | |
| 'sentence_length_variance': float(sentence_length_variance), | |
| 'self_correction_count': int(self_correction_count) | |
| } | |
| # compute variability in speaking rate across segments | |
| def _compute_rate_variability(self, segments): | |
| if not segments or len(segments) < 3: | |
| return 0.0 | |
| segment_rates = [] | |
| for seg in segments: | |
| duration = seg.get('end', 0) - seg.get('start', 0) | |
| if duration > 0.3: # only segments > 300ms | |
| words_in_seg = len(seg.get('text', '').split()) | |
| rate = words_in_seg / duration | |
| if rate > 0: | |
| segment_rates.append(rate) | |
| if len(segment_rates) < 3: | |
| return 0.0 | |
| # calculate coefficient of variation | |
| mean_rate = np.mean(segment_rates) | |
| std_rate = np.std(segment_rates) | |
| cv = std_rate / mean_rate if mean_rate > 0 else 0 | |
| return float(min(1.0, cv / 0.5)) | |
| # compute variance in sentence lengths | |
| def _compute_sentence_variance(self, text): | |
| # split into sentences | |
| sentences = re.split(r'[.!?]+', text) | |
| sentences = [s.strip() for s in sentences if s.strip()] | |
| if len(sentences) < 2: | |
| return 0.0 | |
| # get word counts per sentence | |
| lengths = [len(s.split()) for s in sentences] | |
| mean_len = np.mean(lengths) | |
| std_len = np.std(lengths) | |
| # coefficient of variation normalized | |
| cv = std_len / mean_len if mean_len > 0 else 0 | |
| return float(min(1.0, cv / 0.6)) | |
| # logistic function for smooth score transitions | |
| def _logistic(self, x, a, b): | |
| return 1.0 / (1.0 + np.exp(-(x - a) / b)) | |
| # calculate overall kopparapu score for read vs spontaneous | |
| def _calculate_kopparapu_score(self, features): | |
| # L1: vocabulary complexity - higher = more formal = read | |
| f1 = features['chars_per_word'] | |
| L1 = self._logistic(f1, a=4.8, b=1.2) | |
| # L2: speaking rate - faster, steadier = read | |
| f2 = features['words_per_sec'] | |
| L2 = self._logistic(f2, a=2.2, b=0.6) | |
| # L3: disfluency - less disfluency = more read | |
| disfluency = ( | |
| features['nonalpha_per_sec'] + | |
| 8.0 * features['filler_rate'] + | |
| 0.5 * features['repetition_count'] | |
| ) | |
| L3 = self._logistic(-disfluency, a=0.0, b=0.8) | |
| # L4: pause regularity - regular = read | |
| L4 = features.get('pause_regularity', 0.5) | |
| # L5: rate variability - low = read | |
| rate_var = features.get('speech_rate_variability', 0.0) | |
| L5 = 1.0 - rate_var | |
| # L6: sentence variance - uniform = read | |
| sent_var = features.get('sentence_length_variance', 0.0) | |
| L6 = 1.0 - sent_var | |
| # L7: self-corrections - fewer = read | |
| corrections = features.get('self_correction_count', 0) | |
| L7 = self._logistic(-corrections, a=0.0, b=1.5) | |
| # weighted combination | |
| score = ( | |
| 0.15 * L1 + # vocabulary complexity | |
| 0.15 * L2 + # speaking rate | |
| 0.15 * L3 + # disfluency | |
| 0.20 * L4 + # pause regularity | |
| 0.15 * L5 + # rate variability | |
| 0.10 * L6 + # sentence uniformity | |
| 0.10 * L7 # self-corrections | |
| ) | |
| return float(score) | |
| # generate human-readable interpretation of speech patterns | |
| def _interpret_speech_patterns(self, analysis, kopparapu_features=None, kopparapu_score=None): | |
| filler_ratio = analysis['filler_words']['ratio'] | |
| pause_patterns = analysis['pause_patterns'] | |
| speech_rate = analysis['speech_rate'] | |
| interpretation = "**Overall Assessment:**\n\n" | |
| # calculate spontaneity score | |
| spontaneity_score = 0 | |
| indicators = [] | |
| if filler_ratio > 0.03: | |
| spontaneity_score += 1 | |
| indicators.append(f"Filler words present ({filler_ratio*100:.1f}%)") | |
| if pause_patterns['pause_variability'] > 0.5: | |
| spontaneity_score += 1 | |
| indicators.append(f"Irregular pause patterns (variability: {pause_patterns['pause_variability']:.2f})") | |
| if 120 <= speech_rate <= 180: | |
| spontaneity_score += 1 | |
| indicators.append(f"Natural speech rate ({speech_rate:.1f} words/min)") | |
| # generate interpretation based on score | |
| if spontaneity_score >= 2: | |
| interpretation += "✓ **Speech patterns suggest spontaneous, natural speaking.**\n\n" | |
| if indicators: | |
| interpretation += "Key indicators:\n" | |
| for indicator in indicators: | |
| interpretation += f"- {indicator}\n" | |
| else: | |
| interpretation += "⚠ **Speech patterns suggest potentially scripted or read speech.**\n\n" | |
| if filler_ratio < 0.02: | |
| interpretation += "- Very low filler word usage\n" | |
| if pause_patterns['pause_variability'] < 0.3: | |
| interpretation += "- Regular, consistent pause patterns\n" | |
| if speech_rate > 180: | |
| interpretation += "- Fast, steady speaking rate\n" | |
| return interpretation | |
| # get detailed segment information | |
| def get_detailed_segments(self, audio_path): | |
| result = self.model.transcribe(audio_path, word_timestamps=True, verbose=False) | |
| return result.get('segments', []) | |
# Smoke test: instantiate the recognizer when this file is run directly.
if __name__ == "__main__":
    asr = SpeechRecognizer(model_size="base")
    print(f"Speech recognizer initialized with {asr.model_size} model")
    print(f"Device: {asr.device}")