| import os |
| import whisper |
| import speech_recognition as sr |
| import logging |
| import backoff |
| import subprocess |
| import time |
| import torch |
| import json |
| from pathlib import Path |
| from pydub import AudioSegment |
| from moviepy.editor import VideoFileClip |
| from typing import Optional, List, Dict, Any |
|
|
| |
| try: |
| from app.utils.device_utils import device, run_on_device |
| from app.utils.logging_utils import time_it, setup_logger |
| except ImportError: |
| |
| from behavior_backend.app.utils.device_utils import device, run_on_device |
| from behavior_backend.app.utils.logging_utils import time_it, setup_logger |
|
|
| |
| logger = setup_logger(__name__) |
|
|
class TranscriptionService:
    """Service for cloud-based speech-to-text operations.

    Wraps the cloud recognizers exposed by the ``speech_recognition``
    library (OpenAI Whisper API, Google Cloud Speech-to-Text, Groq)
    behind a single ``transcribe`` entry point. Credentials are read
    from the environment (or a bundled JSON file for Google Cloud).
    """

    def __init__(self):
        """Initialize the service and detect which cloud recognizers the
        installed ``speech_recognition`` version actually provides."""
        self.recognizer = sr.Recognizer()

        # Credential paths / API keys, keyed by service name.
        self.credentials = self._load_credentials()

        # Map of service name -> bound transcription callable. Only
        # recognizers present on this library version are registered.
        self.available_recognizers = {}

        if hasattr(self.recognizer, 'recognize_openai_whisper') or hasattr(self.recognizer, 'recognize_whisper_api'):
            self.available_recognizers['openai_whisper'] = self._transcribe_openai_whisper

        if hasattr(self.recognizer, 'recognize_google_cloud'):
            self.available_recognizers['google_cloud'] = self._transcribe_google_cloud

        if hasattr(self.recognizer, 'recognize_groq'):
            self.available_recognizers['groq'] = self._transcribe_groq

        logger.info(f"Available cloud transcription services: {', '.join(self.available_recognizers.keys())}")

    def _load_credentials(self):
        """Load all service credentials.

        Returns:
            Dict mapping service name ('google_cloud', 'groq', 'openai')
            to a credentials path / API key, or None when unconfigured.
        """
        creds = {}
        try:
            # Prefer a credentials file shipped next to this module,
            # falling back to the standard Google environment variable.
            google_creds_path = os.path.join(os.path.dirname(__file__), "google_credentials.json")
            if os.path.exists(google_creds_path):
                creds['google_cloud'] = google_creds_path
            else:
                creds['google_cloud'] = os.getenv('GOOGLE_APPLICATION_CREDENTIALS')

            creds['groq'] = os.getenv('GROQ_API_KEY')
            creds['openai'] = os.getenv('OPENAI_API_KEY')

            # SECURITY: never emit the raw key values (the previous code
            # printed them to stdout). Log only whether each is configured.
            logger.debug("Groq API key configured: %s", bool(creds['groq']))
            logger.debug("OpenAI API key configured: %s", bool(creds['openai']))
        except Exception as e:
            logger.error(f"Error loading credentials: {e}")

        return creds

    def convert_to_wav(self, input_path):
        """Convert audio/video file to WAV format if needed.

        Args:
            input_path: Path to the source audio/video file.

        Returns:
            Path (str) to a WAV file: the input itself when it already
            has a .wav suffix, otherwise a sibling file with .wav suffix.

        Raises:
            Exception: propagated from pydub if the conversion fails.
        """
        input_path = Path(input_path)

        if input_path.suffix.lower() == '.wav':
            return str(input_path)

        output_path = input_path.with_suffix('.wav')
        logger.info(f"Converting {input_path} to WAV format")

        try:
            audio = AudioSegment.from_file(str(input_path))
            audio.export(str(output_path), format="wav")
            logger.info(f"Conversion completed: {output_path}")
            return str(output_path)
        except Exception as e:
            logger.error(f"Error converting file: {e}")
            raise

    @backoff.on_exception(
        backoff.expo,
        Exception,
        max_tries=3
    )
    def transcribe(self, audio_file_path, services=None, cleanup=True, language='en'):
        """
        Transcribe audio using multiple services.

        Args:
            audio_file_path: Path to the audio file
            services: List of services to use for transcription
                (defaults to every available recognizer)
            cleanup: Whether to clean up temporary files
            language: Language code

        Returns:
            Dictionary of transcription results by service. A failed
            service maps to an "Error: ..." string instead of raising.
        """
        if services is None:
            services = list(self.available_recognizers.keys())

        results = {}
        original_path = Path(audio_file_path)
        wav_path = None

        try:
            wav_path = self.convert_to_wav(audio_file_path)

            with sr.AudioFile(wav_path) as source:
                audio = self.recognizer.record(source)

            # Per-service failures are captured in the result dict so one
            # bad service doesn't abort the others (or trigger backoff).
            for service in services:
                if service in self.available_recognizers:
                    try:
                        logger.info(f"Starting transcription with {service}")
                        text = self.available_recognizers[service](audio, language)
                        if text:
                            results[service] = text
                        logger.info(f"{service} transcription completed")
                    except Exception as e:
                        logger.error(f"{service} transcription failed: {e}")
                        results[service] = f"Error: {str(e)}"

            return results

        except Exception as e:
            logger.error(f"Transcription process failed: {e}")
            raise
        finally:
            # Remove the converted copy even on failure, so backoff
            # retries do not accumulate temporary WAV files.
            if (cleanup and wav_path and wav_path != str(original_path)
                    and original_path.suffix.lower() != '.wav'
                    and os.path.exists(wav_path)):
                os.remove(wav_path)
                logger.info("Cleaned up converted file")

    def _transcribe_openai_whisper(self, audio, language):
        """Transcribe using the OpenAI Whisper API.

        Raises:
            ValueError: if no OpenAI API key is configured.
            NotImplementedError: if the installed speech_recognition
                exposes neither Whisper API method.
        """
        if not self.credentials.get('openai'):
            raise ValueError("OpenAI API key not found")

        # Whisper expects a bare ISO-639-1 code ('en'), not a locale ('en-US').
        whisper_lang = language.split('-')[0] if '-' in language else language

        # Method name differs between speech_recognition versions.
        if hasattr(self.recognizer, 'recognize_whisper_api'):
            return self.recognizer.recognize_whisper_api(
                audio,
                api_key=self.credentials['openai'],
                language=whisper_lang
            )
        elif hasattr(self.recognizer, 'recognize_openai_whisper'):
            return self.recognizer.recognize_openai_whisper(
                audio,
                api_key=self.credentials['openai'],
                language=whisper_lang
            )
        else:
            raise NotImplementedError("No OpenAI Whisper API recognition method available")

    def _transcribe_google_cloud(self, audio, language):
        """Transcribe using Google Cloud Speech-to-Text.

        Raises:
            ValueError: if no Google Cloud credentials are configured.
        """
        if not self.credentials.get('google_cloud'):
            raise ValueError("Google Cloud credentials not found")

        return self.recognizer.recognize_google_cloud(
            audio,
            credentials_json=self.credentials['google_cloud'],
            language=language
        )

    def _transcribe_groq(self, audio, language):
        """Transcribe using the Groq API.

        NOTE(review): ``language`` is accepted for interface symmetry but
        not forwarded — confirm whether recognize_groq supports it.

        Raises:
            ValueError: if no Groq API key is configured.
        """
        if not self.credentials.get('groq'):
            raise ValueError("Groq API key not found")
        return self.recognizer.recognize_groq(audio)
|
|
class SpeechService:
    """Service for speech-to-text operations.

    Extracts audio from video files (FFmpeg with a MoviePy fallback)
    and transcribes it either with a local Whisper model or via the
    cloud services exposed by ``TranscriptionService``.
    """

    # Audio files larger than this are split into chunks before transcription.
    LARGE_FILE_BYTES = 10 * 1024 * 1024

    def __init__(self):
        """Initialize the speech service."""
        self.whisper_model = None       # lazily-loaded local Whisper model
        self._current_device = None     # device the Whisper model was loaded on
        self.ffmpeg_success = False     # True when the last extraction used FFmpeg
        self.cloud_transcription_service = TranscriptionService()

    @time_it
    def extract_audio(self, video_path: str) -> str:
        """
        Extract audio from a video file using FFmpeg (primary) or MoviePy (fallback).

        Args:
            video_path: Path to the video file

        Returns:
            Path to the extracted audio file (16 kHz mono PCM WAV)

        Raises:
            RuntimeError: if both FFmpeg and MoviePy fail, or the output
                file is missing/empty.
        """
        logger.info(f"Extracting audio from {video_path}")

        video_filename = Path(video_path).stem
        audio_path = f"temp_{video_filename}.wav"

        # Try FFmpeg first: it is much faster than MoviePy when present.
        self.ffmpeg_success = False
        ffmpeg_start_time = time.time()

        try:
            logger.info("Attempting audio extraction with FFmpeg...")
            # shell=False list form; 16 kHz mono PCM is what Whisper expects.
            result = subprocess.run([
                'ffmpeg',
                '-i', str(video_path),
                '-acodec', 'pcm_s16le',
                '-ar', '16000',
                '-ac', '1',
                '-y',
                str(audio_path)
            ], check=True, capture_output=True, text=True)

            self.ffmpeg_success = True
            ffmpeg_duration = time.time() - ffmpeg_start_time
            logger.info(f"FFmpeg audio extraction successful in {ffmpeg_duration:.4f} seconds")

        except (subprocess.CalledProcessError, FileNotFoundError) as e:
            # FileNotFoundError covers a missing ffmpeg binary.
            ffmpeg_duration = time.time() - ffmpeg_start_time
            logger.warning(f"FFmpeg audio extraction failed after {ffmpeg_duration:.4f} seconds: {str(e)}")
            logger.warning("Falling back to MoviePy for audio extraction...")

            moviepy_start_time = time.time()
            try:
                video = VideoFileClip(video_path)
                video.audio.write_audiofile(audio_path, codec='pcm_s16le', logger=None)
                video.close()

                moviepy_duration = time.time() - moviepy_start_time
                logger.info(f"MoviePy audio extraction successful in {moviepy_duration:.4f} seconds")

            except Exception as e:
                moviepy_duration = time.time() - moviepy_start_time
                logger.error(f"MoviePy audio extraction also failed after {moviepy_duration:.4f} seconds: {str(e)}")
                raise RuntimeError(f"Failed to extract audio from video using both FFmpeg and MoviePy: {str(e)}")

        # Guard against tools that "succeed" but write nothing usable.
        audio_file = Path(audio_path)
        if not audio_file.exists() or audio_file.stat().st_size == 0:
            logger.error(f"Audio extraction produced empty or missing file: {audio_path}")
            raise RuntimeError(f"Audio extraction failed: output file {audio_path} is empty or missing")

        logger.info(f"Audio extracted to {audio_path}")

        if not self.ffmpeg_success:
            logger.info(f"Audio extraction performance comparison - FFmpeg: {ffmpeg_duration:.4f}s, MoviePy: {moviepy_duration:.4f}s")

        return audio_path

    @time_it
    def split_audio(self, audio_path: str, chunk_length_ms: int = 30000) -> List[str]:
        """
        Split audio file into chunks for processing.

        Args:
            audio_path: Path to the audio file
            chunk_length_ms: Length of each chunk in milliseconds

        Returns:
            List of paths to audio chunks (written under ./temp_chunks)
        """
        logger.info(f"Splitting audio {audio_path} into {chunk_length_ms}ms chunks")

        audio = AudioSegment.from_file(audio_path)

        chunks_dir = Path("temp_chunks")
        chunks_dir.mkdir(exist_ok=True)

        chunk_paths = []
        for i, chunk_start in enumerate(range(0, len(audio), chunk_length_ms)):
            chunk_end = min(chunk_start + chunk_length_ms, len(audio))
            chunk = audio[chunk_start:chunk_end]

            chunk_path = chunks_dir / f"chunk_{i}.wav"
            chunk.export(chunk_path, format="wav")
            chunk_paths.append(str(chunk_path))

        logger.info(f"Split audio into {len(chunk_paths)} chunks")
        return chunk_paths

    @staticmethod
    def _free_memory(device: str) -> None:
        """Run garbage collection and, on CUDA, release cached GPU memory."""
        import gc
        gc.collect()
        if device == 'cuda':
            torch.cuda.empty_cache()

    @run_on_device
    @time_it
    def transcribe_with_whisper(self, audio_path: str, language: str = 'en', device: str = 'cpu') -> str:
        """
        Transcribe audio using Whisper.

        Args:
            audio_path: Path to the audio file
            language: Language code
            device: Device to use for processing

        Returns:
            Transcribed text
        """
        logger.info(f"Transcribing {audio_path} with Whisper on {device}")

        try:
            # (Re)load the model on first use or when the target device changed.
            if self.whisper_model is None or getattr(self, '_current_device', None) != device:
                if self.whisper_model is not None:
                    # Drop the old model before loading the new one to keep
                    # peak memory down.
                    del self.whisper_model
                    self._free_memory(device)

                logger.info(f"Loading Whisper model on {device}")
                self.whisper_model = whisper.load_model("tiny", device=device)
                self._current_device = device

            # Whisper expects a bare ISO-639-1 code ('en'), not a locale ('en-US').
            if '-' in language:
                language = language.split('-')[0]

            result = self.whisper_model.transcribe(
                audio_path,
                language=language,
                fp16=(device == 'cuda'),  # half precision only on GPU
                beam_size=3,
                best_of=1
            )

            return result["text"]
        finally:
            self._free_memory(device)

    def _transcribe_chunks(self, audio_path: str, language: str, service: str) -> str:
        """Split *audio_path* into chunks, transcribe each, and join the texts.

        Cloud chunks that fail fall back to local Whisper. Chunk files
        are always removed, even when a chunk raises.
        """
        chunk_paths = self.split_audio(audio_path)
        try:
            transcripts = []
            for chunk_path in chunk_paths:
                if service == 'whisper':
                    transcripts.append(self.transcribe_with_whisper(chunk_path, language))
                    continue

                results = self.cloud_transcription_service.transcribe(
                    chunk_path,
                    services=[service],
                    language=language
                )
                text = results.get(service)
                if text and not text.startswith('Error:'):
                    transcripts.append(text)
                else:
                    logger.warning(f"Failed to transcribe chunk with {service}, falling back to whisper")
                    transcripts.append(self.transcribe_with_whisper(chunk_path, language))

            return " ".join(transcripts)
        finally:
            # Previously chunks leaked when a transcription raised mid-loop.
            for chunk_path in chunk_paths:
                if os.path.exists(chunk_path):
                    os.remove(chunk_path)

    @backoff.on_exception(
        backoff.expo,
        Exception,
        max_tries=3
    )
    @time_it
    def transcribe_audio(self, audio_path: str, language: str = 'en', service: str = 'whisper') -> str:
        """
        Transcribe audio file to text.

        Args:
            audio_path: Path to the audio file
            language: Language code
            service: Transcription service to use ('whisper', 'groq', 'google_cloud', 'openai_whisper')

        Returns:
            Transcribed text
        """
        logger.info(f"Starting transcription of {audio_path} using {service}")

        cloud_services = ('groq', 'google_cloud', 'openai_whisper')

        # Single availability check (the original duplicated it); fall
        # back to local Whisper if the cloud service isn't configured.
        if service in cloud_services and service not in self.cloud_transcription_service.available_recognizers:
            logger.warning(f"Requested service {service} is not available, falling back to whisper")
            service = 'whisper'

        is_large = os.path.getsize(audio_path) > self.LARGE_FILE_BYTES

        if service in cloud_services:
            logger.info(f"Using cloud-based transcription with {service}")

            if is_large:
                logger.info(f"Audio file is large, splitting into chunks")
                return self._transcribe_chunks(audio_path, language, service)

            results = self.cloud_transcription_service.transcribe(
                audio_path,
                services=[service],
                language=language
            )
            text = results.get(service)
            if text and not text.startswith('Error:'):
                return text

            logger.warning(f"Failed to transcribe with {service}, falling back to whisper")
            return self.transcribe_with_whisper(audio_path, language)

        # Local Whisper path.
        if is_large:
            logger.info(f"Audio file is large, splitting into chunks")
            return self._transcribe_chunks(audio_path, language, 'whisper')
        return self.transcribe_with_whisper(audio_path, language)

    @time_it
    def process_video_speech(self, video_path: str, language: str = 'en', service: str = 'whisper') -> str:
        """
        Process speech in a video file.

        Args:
            video_path: Path to the video file
            language: Language code
            service: Transcription service to use ('whisper', 'groq', 'google_cloud', 'openai_whisper')
                If 'whisper' is selected, local Whisper model will be used.
                If 'groq', 'google_cloud', or 'openai_whisper' are selected, cloud-based transcription will be used.
                If the requested service is not available, it will fall back to 'whisper'.

        Returns:
            Transcribed text
        """
        audio_path = None
        extraction_method = None

        # Resolve the fallback up front so logging reflects reality.
        if service != 'whisper' and service not in self.cloud_transcription_service.available_recognizers:
            logger.warning(f"Requested service {service} is not available, falling back to whisper")
            service = 'whisper'

        try:
            start_time = time.time()
            audio_path = self.extract_audio(video_path)
            extraction_time = time.time() - start_time

            extraction_method = "FFmpeg" if self.ffmpeg_success else "MoviePy"
            logger.info(f"Audio extracted using {extraction_method} in {extraction_time:.4f} seconds")

            start_time = time.time()
            transcript = self.transcribe_audio(audio_path, language, service)
            transcription_time = time.time() - start_time

            logger.info(f"Audio transcribed in {transcription_time:.4f} seconds")
            logger.info(f"Total speech processing time: {extraction_time + transcription_time:.4f} seconds")

            return transcript

        except Exception as e:
            logger.error(f"Error in process_video_speech: {str(e)}")
            raise

        finally:
            # Best-effort removal of the temporary audio file.
            if audio_path and os.path.exists(audio_path):
                try:
                    os.remove(audio_path)
                    logger.info(f"Temporary audio file {audio_path} removed")
                except Exception as e:
                    logger.warning(f"Failed to remove temporary audio file {audio_path}: {str(e)}")

            # Release whatever the transcription run held on to.
            import gc
            gc.collect()
            if torch.cuda.is_available():
                torch.cuda.empty_cache()