import os
import gc
import whisper
import speech_recognition as sr
import logging
import backoff
import subprocess
import time
import torch
import json
from pathlib import Path
from pydub import AudioSegment
from moviepy.editor import VideoFileClip
from typing import Optional, List, Dict, Any

# Fix import paths
try:
    from app.utils.device_utils import device, run_on_device
    from app.utils.logging_utils import time_it, setup_logger
except ImportError:
    # Try relative imports for running from project root
    from behavior_backend.app.utils.device_utils import device, run_on_device
    from behavior_backend.app.utils.logging_utils import time_it, setup_logger

# Configure logging
logger = setup_logger(__name__)


class TranscriptionService:
    """Service for cloud-based speech-to-text operations.

    Wraps the ``speech_recognition`` library's cloud recognizers
    (OpenAI Whisper API, Google Cloud Speech-to-Text, Groq) behind a
    single ``transcribe`` entry point with per-service error capture.
    """

    def __init__(self):
        """Initialize the transcription service."""
        self.recognizer = sr.Recognizer()

        # Load credentials
        self.credentials = self._load_credentials()

        # Map of service name -> bound transcription method, populated
        # only for recognizers the installed speech_recognition version
        # actually exposes (method names vary across releases).
        self.available_recognizers = {}

        if hasattr(self.recognizer, 'recognize_openai_whisper') or hasattr(self.recognizer, 'recognize_whisper_api'):
            self.available_recognizers['openai_whisper'] = self._transcribe_openai_whisper
        if hasattr(self.recognizer, 'recognize_google_cloud'):
            self.available_recognizers['google_cloud'] = self._transcribe_google_cloud
        if hasattr(self.recognizer, 'recognize_groq'):
            self.available_recognizers['groq'] = self._transcribe_groq

        logger.info(f"Available cloud transcription services: {', '.join(self.available_recognizers.keys())}")

    def _load_credentials(self) -> Dict[str, Optional[str]]:
        """Load all service credentials.

        Returns:
            Dict mapping 'google_cloud' (credentials file path),
            'groq' and 'openai' (API keys) to their values; any entry
            may be None when not configured.
        """
        creds = {}
        try:
            # Google Cloud - prefer a credentials file in the project
            # directory, then fall back to the standard env variable.
            google_creds_path = os.path.join(os.path.dirname(__file__), "google_credentials.json")
            if os.path.exists(google_creds_path):
                creds['google_cloud'] = google_creds_path
            else:
                creds['google_cloud'] = os.getenv('GOOGLE_APPLICATION_CREDENTIALS')

            creds['groq'] = os.getenv('GROQ_API_KEY')
            creds['openai'] = os.getenv('OPENAI_API_KEY')

            # SECURITY FIX: the previous implementation printed the raw
            # API keys to stdout. Log only whether each key is present.
            logger.info("Groq API key configured: %s", bool(creds['groq']))
            logger.info("OpenAI API key configured: %s", bool(creds['openai']))
        except Exception as e:
            # Best-effort: individual services report missing credentials
            # later when they are actually used.
            logger.error(f"Error loading credentials: {e}")
        return creds

    def convert_to_wav(self, input_path) -> str:
        """Convert an audio/video file to WAV format if needed.

        Args:
            input_path: Path to the source media file.

        Returns:
            Path to a WAV file (the original path when already .wav).

        Raises:
            Exception: Propagates any pydub/ffmpeg conversion error.
        """
        input_path = Path(input_path)
        if input_path.suffix.lower() == '.wav':
            return str(input_path)

        output_path = input_path.with_suffix('.wav')
        logger.info(f"Converting {input_path} to WAV format")

        try:
            audio = AudioSegment.from_file(str(input_path))
            audio.export(str(output_path), format="wav")
            logger.info(f"Conversion completed: {output_path}")
            return str(output_path)
        except Exception as e:
            logger.error(f"Error converting file: {e}")
            raise

    @backoff.on_exception(
        backoff.expo,
        Exception,
        max_tries=3
    )
    def transcribe(self, audio_file_path, services=None, cleanup=True, language='en') -> Dict[str, str]:
        """Transcribe audio using multiple services.

        Args:
            audio_file_path: Path to the audio file
            services: List of services to use for transcription
                (defaults to every available recognizer)
            cleanup: Whether to clean up temporary files
            language: Language code

        Returns:
            Dictionary of transcription results by service; a failed
            service maps to an "Error: ..." string instead of raising.

        Raises:
            Exception: If conversion or audio loading fails (retried up
                to 3 times by the backoff decorator).
        """
        if services is None:
            services = list(self.available_recognizers.keys())

        results = {}
        original_path = Path(audio_file_path)
        # Initialized before the try so the finally-cleanup guard is
        # safe even when convert_to_wav raises.
        wav_path = None

        try:
            wav_path = self.convert_to_wav(audio_file_path)

            with sr.AudioFile(wav_path) as source:
                audio = self.recognizer.record(source)

            # Try each requested service; per-service failures are
            # recorded in the results dict rather than aborting the run.
            for service in services:
                if service in self.available_recognizers:
                    try:
                        logger.info(f"Starting transcription with {service}")
                        text = self.available_recognizers[service](audio, language)
                        if text:
                            results[service] = text
                            logger.info(f"{service} transcription completed")
                    except Exception as e:
                        logger.error(f"{service} transcription failed: {e}")
                        results[service] = f"Error: {str(e)}"

            return results
        except Exception as e:
            logger.error(f"Transcription process failed: {e}")
            raise
        finally:
            # BUGFIX: cleanup previously ran only on the success path,
            # leaking the converted WAV whenever transcription raised.
            if (cleanup and wav_path
                    and original_path.suffix.lower() != '.wav'
                    and wav_path != str(original_path)
                    and os.path.exists(wav_path)):
                os.remove(wav_path)
                logger.info("Cleaned up converted file")

    # Individual transcription methods

    def _transcribe_openai_whisper(self, audio, language) -> str:
        """Transcribe using the OpenAI Whisper API.

        Raises:
            ValueError: If no OpenAI API key is configured.
            NotImplementedError: If the installed speech_recognition
                version exposes neither Whisper API method name.
        """
        if not self.credentials.get('openai'):
            raise ValueError("OpenAI API key not found")

        # Convert language code if needed (e.g., 'en-US' to 'en')
        whisper_lang = language.split('-')[0] if '-' in language else language

        # Try both method names that might be available across
        # speech_recognition versions.
        if hasattr(self.recognizer, 'recognize_whisper_api'):
            return self.recognizer.recognize_whisper_api(
                audio,
                api_key=self.credentials['openai'],
                language=whisper_lang
            )
        elif hasattr(self.recognizer, 'recognize_openai_whisper'):
            return self.recognizer.recognize_openai_whisper(
                audio,
                api_key=self.credentials['openai'],
                language=whisper_lang
            )
        else:
            raise NotImplementedError("No OpenAI Whisper API recognition method available")

    def _transcribe_google_cloud(self, audio, language) -> str:
        """Transcribe using Google Cloud Speech-to-Text.

        Raises:
            ValueError: If no Google Cloud credentials are configured.
        """
        if not self.credentials.get('google_cloud'):
            raise ValueError("Google Cloud credentials not found")

        return self.recognizer.recognize_google_cloud(
            audio,
            credentials_json=self.credentials['google_cloud'],
            language=language
        )

    def _transcribe_groq(self, audio, language) -> str:
        """Transcribe using the Groq API.

        NOTE(review): ``language`` is accepted for interface symmetry
        but not forwarded — recognize_groq is called without it; confirm
        whether the Groq recognizer supports a language parameter.

        Raises:
            ValueError: If no Groq API key is configured.
        """
        if not self.credentials.get('groq'):
            raise ValueError("Groq API key not found")

        return self.recognizer.recognize_groq(audio)


class SpeechService:
    """Service for speech-to-text operations.

    Orchestrates audio extraction from video, chunking of long audio,
    and transcription via a local Whisper model or the cloud-based
    ``TranscriptionService``.
    """

    def __init__(self):
        """Initialize the speech service."""
        self.whisper_model = None    # lazily-loaded local Whisper model
        self.ffmpeg_success = False  # True when FFmpeg handled the last extraction
        self.cloud_transcription_service = TranscriptionService()

    @time_it
    def extract_audio(self, video_path: str) -> str:
        """Extract audio from a video file using FFmpeg (primary) or MoviePy (fallback).

        Args:
            video_path: Path to the video file

        Returns:
            Path to the extracted audio file (16 kHz mono WAV)

        Raises:
            RuntimeError: If both extractors fail, or the output file
                is empty or missing.
        """
        logger.info(f"Extracting audio from {video_path}")

        # Create output path
        video_filename = Path(video_path).stem
        audio_path = f"temp_{video_filename}.wav"

        # Try FFmpeg approach first
        self.ffmpeg_success = False
        ffmpeg_start_time = time.time()

        try:
            logger.info("Attempting audio extraction with FFmpeg...")
            # List argv (shell=False) avoids shell injection via the path.
            subprocess.run([
                'ffmpeg', '-i', str(video_path),
                '-acodec', 'pcm_s16le',
                '-ar', '16000',  # 16kHz sample rate
                '-ac', '1',      # Mono channel
                '-y',            # Overwrite output file if it exists
                str(audio_path)
            ], check=True, capture_output=True, text=True)
            self.ffmpeg_success = True
            ffmpeg_duration = time.time() - ffmpeg_start_time
            logger.info(f"FFmpeg audio extraction successful in {ffmpeg_duration:.4f} seconds")
        except (subprocess.CalledProcessError, FileNotFoundError) as e:
            ffmpeg_duration = time.time() - ffmpeg_start_time
            logger.warning(f"FFmpeg audio extraction failed after {ffmpeg_duration:.4f} seconds: {str(e)}")
            logger.warning("Falling back to MoviePy for audio extraction...")

            # Fallback to MoviePy approach
            moviepy_start_time = time.time()
            try:
                video = VideoFileClip(video_path)
                video.audio.write_audiofile(audio_path, codec='pcm_s16le', logger=None)
                video.close()  # Explicitly close to free resources
                moviepy_duration = time.time() - moviepy_start_time
                logger.info(f"MoviePy audio extraction successful in {moviepy_duration:.4f} seconds")
            except Exception as e:
                moviepy_duration = time.time() - moviepy_start_time
                logger.error(f"MoviePy audio extraction also failed after {moviepy_duration:.4f} seconds: {str(e)}")
                raise RuntimeError(f"Failed to extract audio from video using both FFmpeg and MoviePy: {str(e)}")

        # Verify the audio file exists and has content
        audio_file = Path(audio_path)
        if not audio_file.exists() or audio_file.stat().st_size == 0:
            logger.error(f"Audio extraction produced empty or missing file: {audio_path}")
            raise RuntimeError(f"Audio extraction failed: output file {audio_path} is empty or missing")

        logger.info(f"Audio extracted to {audio_path}")

        # Log performance comparison if both methods were used
        if not self.ffmpeg_success:
            logger.info(f"Audio extraction performance comparison - FFmpeg: {ffmpeg_duration:.4f}s, MoviePy: {moviepy_duration:.4f}s")

        return audio_path

    @time_it
    def split_audio(self, audio_path: str, chunk_length_ms: int = 30000) -> List[str]:
        """Split audio file into chunks for processing.

        Args:
            audio_path: Path to the audio file
            chunk_length_ms: Length of each chunk in milliseconds

        Returns:
            List of paths to audio chunks (written under temp_chunks/)
        """
        logger.info(f"Splitting audio {audio_path} into {chunk_length_ms}ms chunks")

        # Load audio
        audio = AudioSegment.from_file(audio_path)

        # Create directory for chunks
        chunks_dir = Path("temp_chunks")
        chunks_dir.mkdir(exist_ok=True)

        # Split audio into chunks
        chunk_paths = []
        for i, chunk_start in enumerate(range(0, len(audio), chunk_length_ms)):
            chunk_end = min(chunk_start + chunk_length_ms, len(audio))
            chunk = audio[chunk_start:chunk_end]
            chunk_path = chunks_dir / f"chunk_{i}.wav"
            chunk.export(chunk_path, format="wav")
            chunk_paths.append(str(chunk_path))

        logger.info(f"Split audio into {len(chunk_paths)} chunks")
        return chunk_paths

    @run_on_device
    @time_it
    def transcribe_with_whisper(self, audio_path: str, language: str = 'en', device: str = 'cpu') -> str:
        """Transcribe audio using a local Whisper model.

        Args:
            audio_path: Path to the audio file
            language: Language code (e.g. 'en' or 'en-US')
            device: Device to use for processing.
                NOTE(review): this parameter shadows the module-level
                ``device`` imported from device_utils; presumably
                @run_on_device injects the correct value — confirm.

        Returns:
            Transcribed text
        """
        logger.info(f"Transcribing {audio_path} with Whisper on {device}")

        try:
            # (Re)load the model when missing or previously loaded on a
            # different device.
            if self.whisper_model is None or getattr(self, '_current_device', None) != device:
                # Clear the existing model first to free memory.
                if self.whisper_model is not None:
                    del self.whisper_model
                    gc.collect()
                    if device == 'cuda':
                        torch.cuda.empty_cache()

                logger.info(f"Loading Whisper model on {device}")
                # "tiny" model keeps memory usage low (vs. "base").
                self.whisper_model = whisper.load_model("tiny", device=device)
                self._current_device = device

            # Whisper expects bare language codes ('en-US' -> 'en').
            if '-' in language:
                language = language.split('-')[0]

            # Transcribe with reduced compute settings to limit memory.
            result = self.whisper_model.transcribe(
                audio_path,
                language=language,
                fp16=(device == 'cuda'),  # fp16 only supported on CUDA
                beam_size=3,              # reduced from default 5
                best_of=1                 # reduced from default 5
            )

            return result["text"]
        finally:
            # Always reclaim memory after a transcription pass.
            gc.collect()
            if device == 'cuda':
                torch.cuda.empty_cache()

    @backoff.on_exception(
        backoff.expo,
        Exception,
        max_tries=3
    )
    @time_it
    def transcribe_audio(self, audio_path: str, language: str = 'en', service: str = 'whisper') -> str:
        """Transcribe an audio file to text.

        Args:
            audio_path: Path to the audio file
            language: Language code
            service: Transcription service to use ('whisper', 'groq',
                'google_cloud', 'openai_whisper'); unavailable cloud
                services fall back to local whisper.

        Returns:
            Transcribed text
        """
        logger.info(f"Starting transcription of {audio_path} using {service}")

        cloud_services = ('groq', 'google_cloud', 'openai_whisper')

        # Fall back to local whisper when the requested cloud service
        # is not available in this environment.
        if service in cloud_services and service not in self.cloud_transcription_service.available_recognizers:
            logger.warning(f"Requested service {service} is not available, falling back to whisper")
            service = 'whisper'

        # Files above 10 MB are split into chunks and transcribed
        # piecewise in both branches below.
        large_file = os.path.getsize(audio_path) > 10 * 1024 * 1024  # 10 MB

        if service in cloud_services:
            logger.info(f"Using cloud-based transcription with {service}")

            if large_file:
                logger.info(f"Audio file is large, splitting into chunks")
                chunk_paths = self.split_audio(audio_path)

                transcripts = []
                for chunk_path in chunk_paths:
                    results = self.cloud_transcription_service.transcribe(
                        chunk_path,
                        services=[service],
                        language=language
                    )
                    # Per-chunk fallback to local whisper on failure.
                    if service in results and results[service] and not results[service].startswith('Error:'):
                        transcripts.append(results[service])
                    else:
                        logger.warning(f"Failed to transcribe chunk with {service}, falling back to whisper")
                        transcripts.append(self.transcribe_with_whisper(chunk_path, language))

                full_transcript = " ".join(transcripts)

                # Clean up chunks
                for chunk_path in chunk_paths:
                    os.remove(chunk_path)

                return full_transcript
            else:
                results = self.cloud_transcription_service.transcribe(
                    audio_path,
                    services=[service],
                    language=language
                )
                if service in results and results[service] and not results[service].startswith('Error:'):
                    return results[service]
                else:
                    logger.warning(f"Failed to transcribe with {service}, falling back to whisper")
                    return self.transcribe_with_whisper(audio_path, language)
        else:
            # Local whisper transcription (default)
            if large_file:
                logger.info(f"Audio file is large, splitting into chunks")
                chunk_paths = self.split_audio(audio_path)

                transcripts = [self.transcribe_with_whisper(chunk_path, language)
                               for chunk_path in chunk_paths]
                full_transcript = " ".join(transcripts)

                # Clean up chunks
                for chunk_path in chunk_paths:
                    os.remove(chunk_path)

                return full_transcript
            else:
                return self.transcribe_with_whisper(audio_path, language)

    @time_it
    def process_video_speech(self, video_path: str, language: str = 'en', service: str = 'whisper') -> str:
        """Process speech in a video file.

        Args:
            video_path: Path to the video file
            language: Language code
            service: Transcription service to use ('whisper', 'groq',
                'google_cloud', 'openai_whisper'). 'whisper' uses the
                local model; the cloud services fall back to 'whisper'
                when unavailable.

        Returns:
            Transcribed text

        Raises:
            Exception: Propagates extraction/transcription failures
                after logging; the temporary audio file is always
                cleaned up.
        """
        audio_path = None
        extraction_method = None

        # Check if the requested service is available
        if service != 'whisper' and service not in self.cloud_transcription_service.available_recognizers:
            logger.warning(f"Requested service {service} is not available, falling back to whisper")
            service = 'whisper'

        try:
            # Extract audio
            start_time = time.time()
            audio_path = self.extract_audio(video_path)
            extraction_time = time.time() - start_time

            # Determine which method was used (for logging)
            extraction_method = "FFmpeg" if self.ffmpeg_success else "MoviePy"
            logger.info(f"Audio extracted using {extraction_method} in {extraction_time:.4f} seconds")

            # Transcribe audio
            start_time = time.time()
            transcript = self.transcribe_audio(audio_path, language, service)
            transcription_time = time.time() - start_time
            logger.info(f"Audio transcribed in {transcription_time:.4f} seconds")
            logger.info(f"Total speech processing time: {extraction_time + transcription_time:.4f} seconds")

            return transcript
        except Exception as e:
            logger.error(f"Error in process_video_speech: {str(e)}")
            raise
        finally:
            # Clean up the temporary audio file (best-effort).
            if audio_path and os.path.exists(audio_path):
                try:
                    os.remove(audio_path)
                    logger.info(f"Temporary audio file {audio_path} removed")
                except Exception as e:
                    logger.warning(f"Failed to remove temporary audio file {audio_path}: {str(e)}")

            # Force garbage collection
            gc.collect()
            if torch.cuda.is_available():
                torch.cuda.empty_cache()