# behavior_backend/app/services/processing/speech_service.py
import os
import whisper
import speech_recognition as sr
import logging
import backoff
import subprocess
import time
import torch
import json
from pathlib import Path
from pydub import AudioSegment
from moviepy.editor import VideoFileClip
from typing import Optional, List, Dict, Any
# Fix import paths
try:
from app.utils.device_utils import device, run_on_device
from app.utils.logging_utils import time_it, setup_logger
except ImportError:
# Try relative imports for running from project root
from behavior_backend.app.utils.device_utils import device, run_on_device
from behavior_backend.app.utils.logging_utils import time_it, setup_logger
# Configure logging
logger = setup_logger(__name__)
class TranscriptionService:
    """Service for cloud-based speech-to-text operations.

    Wraps a ``speech_recognition.Recognizer`` and exposes whichever cloud
    backends (OpenAI Whisper API, Google Cloud Speech-to-Text, Groq) the
    installed ``speech_recognition`` version actually supports.
    """

    def __init__(self):
        """Initialize the transcription service and detect available backends."""
        self.recognizer = sr.Recognizer()
        # Load credentials from disk / environment once at startup.
        self.credentials = self._load_credentials()
        # Map service name -> bound transcription method, populated only for
        # recognizers the installed library version exposes.
        self.available_recognizers = {}
        if hasattr(self.recognizer, 'recognize_openai_whisper') or hasattr(self.recognizer, 'recognize_whisper_api'):
            self.available_recognizers['openai_whisper'] = self._transcribe_openai_whisper
        if hasattr(self.recognizer, 'recognize_google_cloud'):
            self.available_recognizers['google_cloud'] = self._transcribe_google_cloud
        if hasattr(self.recognizer, 'recognize_groq'):
            self.available_recognizers['groq'] = self._transcribe_groq
        logger.info(f"Available cloud transcription services: {', '.join(self.available_recognizers.keys())}")

    def _load_credentials(self):
        """Load all service credentials.

        Returns:
            Dict with keys ``'google_cloud'``, ``'groq'`` and ``'openai'``;
            a value is ``None`` when that credential is not configured.
        """
        creds = {}
        try:
            # Google Cloud - prefer a credentials file placed next to this
            # module, falling back to the standard environment variable.
            google_creds_path = os.path.join(os.path.dirname(__file__), "google_credentials.json")
            if os.path.exists(google_creds_path):
                creds['google_cloud'] = google_creds_path
            else:
                creds['google_cloud'] = os.getenv('GOOGLE_APPLICATION_CREDENTIALS')
            creds['groq'] = os.getenv('GROQ_API_KEY')
            creds['openai'] = os.getenv('OPENAI_API_KEY')
            # SECURITY: never print or log the actual key values (the previous
            # version printed them to stdout). Only report whether each
            # credential is configured.
            logger.info("Groq API key configured: %s", bool(creds['groq']))
            logger.info("OpenAI API key configured: %s", bool(creds['openai']))
        except Exception as e:
            logger.error(f"Error loading credentials: {e}")
        return creds

    def convert_to_wav(self, input_path):
        """Convert an audio/video file to WAV format if needed.

        Args:
            input_path: Path to the source media file.

        Returns:
            Path (str) to a WAV file; the original path if already WAV.

        Raises:
            Exception: Propagates any pydub/ffmpeg conversion failure.
        """
        input_path = Path(input_path)
        if input_path.suffix.lower() == '.wav':
            return str(input_path)
        output_path = input_path.with_suffix('.wav')
        logger.info(f"Converting {input_path} to WAV format")
        try:
            audio = AudioSegment.from_file(str(input_path))
            audio.export(str(output_path), format="wav")
            logger.info(f"Conversion completed: {output_path}")
            return str(output_path)
        except Exception as e:
            logger.error(f"Error converting file: {e}")
            raise

    @backoff.on_exception(
        backoff.expo,
        Exception,
        max_tries=3
    )
    def transcribe(self, audio_file_path, services=None, cleanup=True, language='en'):
        """
        Transcribe audio using multiple services.

        Args:
            audio_file_path: Path to the audio file
            services: List of services to use for transcription
            cleanup: Whether to clean up temporary files
            language: Language code

        Returns:
            Dictionary of transcription results by service; a failed service
            maps to a string beginning with ``"Error:"``.
        """
        if services is None:
            services = list(self.available_recognizers.keys())
        results = {}
        original_path = Path(audio_file_path)
        wav_path = None
        try:
            wav_path = self.convert_to_wav(audio_file_path)
            with sr.AudioFile(wav_path) as source:
                audio = self.recognizer.record(source)
            # Try each requested service; per-service failures are captured in
            # the results dict instead of aborting the whole batch.
            for service in services:
                if service in self.available_recognizers:
                    try:
                        logger.info(f"Starting transcription with {service}")
                        text = self.available_recognizers[service](audio, language)
                        if text:
                            results[service] = text
                            logger.info(f"{service} transcription completed")
                    except Exception as e:
                        logger.error(f"{service} transcription failed: {e}")
                        results[service] = f"Error: {str(e)}"
            return results
        except Exception as e:
            logger.error(f"Transcription process failed: {e}")
            raise
        finally:
            # Remove the converted WAV even when transcription raised (the
            # previous version leaked it on the error path), but never delete
            # the caller's original file.
            if (cleanup and wav_path
                    and original_path.suffix.lower() != '.wav'
                    and wav_path != str(original_path)
                    and os.path.exists(wav_path)):
                os.remove(wav_path)
                logger.info("Cleaned up converted file")

    # Individual transcription methods
    def _transcribe_openai_whisper(self, audio, language):
        """Transcribe using the OpenAI Whisper API.

        Raises:
            ValueError: If no OpenAI API key is configured.
            NotImplementedError: If the installed speech_recognition version
                exposes neither Whisper API method.
        """
        if not self.credentials.get('openai'):
            raise ValueError("OpenAI API key not found")
        # Convert language code if needed (e.g., 'en-US' to 'en')
        whisper_lang = language.split('-')[0] if '-' in language else language
        # Method name differs between speech_recognition versions; try both.
        if hasattr(self.recognizer, 'recognize_whisper_api'):
            return self.recognizer.recognize_whisper_api(
                audio,
                api_key=self.credentials['openai'],
                language=whisper_lang
            )
        elif hasattr(self.recognizer, 'recognize_openai_whisper'):
            return self.recognizer.recognize_openai_whisper(
                audio,
                api_key=self.credentials['openai'],
                language=whisper_lang
            )
        else:
            raise NotImplementedError("No OpenAI Whisper API recognition method available")

    def _transcribe_google_cloud(self, audio, language):
        """Transcribe using Google Cloud Speech-to-Text.

        Raises:
            ValueError: If no Google Cloud credentials are configured.
        """
        if not self.credentials.get('google_cloud'):
            raise ValueError("Google Cloud credentials not found")
        return self.recognizer.recognize_google_cloud(
            audio,
            credentials_json=self.credentials['google_cloud'],
            language=language
        )

    def _transcribe_groq(self, audio, language):
        """Transcribe using the Groq API.

        NOTE(review): the `language` argument is currently ignored — it is
        unclear whether ``recognize_groq`` accepts a language parameter in the
        installed library version; confirm and pass it through if supported.

        Raises:
            ValueError: If no Groq API key is configured.
        """
        if not self.credentials.get('groq'):
            raise ValueError("Groq API key not found")
        return self.recognizer.recognize_groq(audio)
class SpeechService:
    """Service for speech-to-text operations on audio and video files.

    Transcribes with a local Whisper model by default and can delegate to
    cloud services (via :class:`TranscriptionService`) when requested.
    """

    # Audio files larger than this are split into chunks before transcription.
    LARGE_FILE_BYTES = 10 * 1024 * 1024

    def __init__(self):
        """Initialize the speech service."""
        # Local Whisper model, loaded lazily in transcribe_with_whisper.
        self.whisper_model = None
        # True when the most recent extract_audio() call succeeded via FFmpeg.
        self.ffmpeg_success = False
        self.cloud_transcription_service = TranscriptionService()

    @staticmethod
    def _free_device_memory(device: str) -> None:
        """Force garbage collection and, on CUDA, release cached GPU memory."""
        import gc
        gc.collect()
        if device == 'cuda':
            torch.cuda.empty_cache()

    @time_it
    def extract_audio(self, video_path: str) -> str:
        """
        Extract audio from a video file using FFmpeg (primary) or MoviePy (fallback).

        Args:
            video_path: Path to the video file

        Returns:
            Path to the extracted audio file (16 kHz mono WAV)

        Raises:
            RuntimeError: If both FFmpeg and MoviePy fail, or if the output
                file is empty/missing.
        """
        logger.info(f"Extracting audio from {video_path}")
        # Create output path next to the working directory.
        video_filename = Path(video_path).stem
        audio_path = f"temp_{video_filename}.wav"
        # Try FFmpeg approach first (much faster than MoviePy).
        self.ffmpeg_success = False
        ffmpeg_start_time = time.time()
        try:
            logger.info("Attempting audio extraction with FFmpeg...")
            subprocess.run([
                'ffmpeg',
                '-i', str(video_path),
                '-acodec', 'pcm_s16le',
                '-ar', '16000',  # 16kHz sample rate (what Whisper expects)
                '-ac', '1',      # Mono channel
                '-y',            # Overwrite output file if it exists
                str(audio_path)
            ], check=True, capture_output=True, text=True)
            self.ffmpeg_success = True
            ffmpeg_duration = time.time() - ffmpeg_start_time
            logger.info(f"FFmpeg audio extraction successful in {ffmpeg_duration:.4f} seconds")
        except (subprocess.CalledProcessError, FileNotFoundError) as e:
            ffmpeg_duration = time.time() - ffmpeg_start_time
            logger.warning(f"FFmpeg audio extraction failed after {ffmpeg_duration:.4f} seconds: {str(e)}")
            logger.warning("Falling back to MoviePy for audio extraction...")
            # Fallback to MoviePy approach
            moviepy_start_time = time.time()
            try:
                video = VideoFileClip(video_path)
                video.audio.write_audiofile(audio_path, codec='pcm_s16le', logger=None)
                video.close()  # Explicitly close to free resources
                moviepy_duration = time.time() - moviepy_start_time
                logger.info(f"MoviePy audio extraction successful in {moviepy_duration:.4f} seconds")
            except Exception as e:
                moviepy_duration = time.time() - moviepy_start_time
                logger.error(f"MoviePy audio extraction also failed after {moviepy_duration:.4f} seconds: {str(e)}")
                raise RuntimeError(f"Failed to extract audio from video using both FFmpeg and MoviePy: {str(e)}")
        # Verify the audio file exists and has content.
        audio_file = Path(audio_path)
        if not audio_file.exists() or audio_file.stat().st_size == 0:
            logger.error(f"Audio extraction produced empty or missing file: {audio_path}")
            raise RuntimeError(f"Audio extraction failed: output file {audio_path} is empty or missing")
        logger.info(f"Audio extracted to {audio_path}")
        # Log performance comparison if the fallback path was taken.
        if not self.ffmpeg_success:
            logger.info(f"Audio extraction performance comparison - FFmpeg: {ffmpeg_duration:.4f}s, MoviePy: {moviepy_duration:.4f}s")
        return audio_path

    @time_it
    def split_audio(self, audio_path: str, chunk_length_ms: int = 30000) -> List[str]:
        """
        Split audio file into chunks for processing.

        Args:
            audio_path: Path to the audio file
            chunk_length_ms: Length of each chunk in milliseconds

        Returns:
            List of paths to audio chunks
        """
        logger.info(f"Splitting audio {audio_path} into {chunk_length_ms}ms chunks")
        audio = AudioSegment.from_file(audio_path)
        chunks_dir = Path("temp_chunks")
        chunks_dir.mkdir(exist_ok=True)
        chunk_paths = []
        for i, chunk_start in enumerate(range(0, len(audio), chunk_length_ms)):
            chunk_end = min(chunk_start + chunk_length_ms, len(audio))
            chunk = audio[chunk_start:chunk_end]
            chunk_path = chunks_dir / f"chunk_{i}.wav"
            chunk.export(chunk_path, format="wav")
            chunk_paths.append(str(chunk_path))
        logger.info(f"Split audio into {len(chunk_paths)} chunks")
        return chunk_paths

    @run_on_device
    @time_it
    def transcribe_with_whisper(self, audio_path: str, language: str = 'en', device: str = 'cpu') -> str:
        """
        Transcribe audio using the local Whisper model.

        Args:
            audio_path: Path to the audio file
            language: Language code
            device: Device to use for processing
                NOTE(review): this parameter shadows the module-level
                `device` import; kept as-is because the @run_on_device
                decorator presumably injects it — confirm before renaming.

        Returns:
            Transcribed text
        """
        logger.info(f"Transcribing {audio_path} with Whisper on {device}")
        try:
            # (Re)load the model when first used or when the device changed.
            if self.whisper_model is None or getattr(self, '_current_device', None) != device:
                if self.whisper_model is not None:
                    # Drop the old model and free its memory before loading a new one.
                    del self.whisper_model
                    self._free_device_memory(device)
                logger.info(f"Loading Whisper model on {device}")
                # Use tiny model instead of base to reduce memory usage.
                self.whisper_model = whisper.load_model("tiny", device=device)
                self._current_device = device
            # Convert language code if needed (e.g., 'en-US' to 'en')
            if '-' in language:
                language = language.split('-')[0]
            # Transcribe with reduced compute settings to limit memory usage.
            result = self.whisper_model.transcribe(
                audio_path,
                language=language,
                fp16=(device == 'cuda'),  # fp16 only supported on CUDA
                beam_size=3,              # Reduce beam size (default is 5)
                best_of=1                 # Reduce number of candidates (default is 5)
            )
            return result["text"]
        finally:
            # Always release memory after transcription.
            self._free_device_memory(device)

    def _transcribe_chunks(self, audio_path: str, language: str, service: str) -> str:
        """Split a large audio file and transcribe each chunk, joining results.

        Cloud chunks that fail fall back to local Whisper. Chunk files are
        always removed, even if a chunk transcription raises.
        """
        chunk_paths = self.split_audio(audio_path)
        transcripts = []
        try:
            for chunk_path in chunk_paths:
                if service == 'whisper':
                    transcripts.append(self.transcribe_with_whisper(chunk_path, language))
                    continue
                results = self.cloud_transcription_service.transcribe(
                    chunk_path,
                    services=[service],
                    language=language
                )
                text = results.get(service)
                if text and not text.startswith('Error:'):
                    transcripts.append(text)
                else:
                    logger.warning(f"Failed to transcribe chunk with {service}, falling back to whisper")
                    transcripts.append(self.transcribe_with_whisper(chunk_path, language))
        finally:
            # Clean up chunk files even on failure (previously leaked on error).
            for chunk_path in chunk_paths:
                if os.path.exists(chunk_path):
                    os.remove(chunk_path)
        return " ".join(transcripts)

    @backoff.on_exception(
        backoff.expo,
        Exception,
        max_tries=3
    )
    @time_it
    def transcribe_audio(self, audio_path: str, language: str = 'en', service: str = 'whisper') -> str:
        """
        Transcribe audio file to text.

        Args:
            audio_path: Path to the audio file
            language: Language code
            service: Transcription service to use ('whisper', 'groq', 'google_cloud', 'openai_whisper')

        Returns:
            Transcribed text
        """
        logger.info(f"Starting transcription of {audio_path} using {service}")
        cloud_services = ['groq', 'google_cloud', 'openai_whisper']
        # Fall back to local whisper if the requested cloud service is unavailable.
        if service in cloud_services and service not in self.cloud_transcription_service.available_recognizers:
            logger.warning(f"Requested service {service} is not available, falling back to whisper")
            service = 'whisper'
        # Large files are chunked regardless of backend.
        if os.path.getsize(audio_path) > self.LARGE_FILE_BYTES:
            logger.info("Audio file is large, splitting into chunks")
            return self._transcribe_chunks(audio_path, language, service)
        if service in cloud_services:
            logger.info(f"Using cloud-based transcription with {service}")
            results = self.cloud_transcription_service.transcribe(
                audio_path,
                services=[service],
                language=language
            )
            text = results.get(service)
            if text and not text.startswith('Error:'):
                return text
            logger.warning(f"Failed to transcribe with {service}, falling back to whisper")
        # Local whisper transcription (default and fallback path).
        return self.transcribe_with_whisper(audio_path, language)

    @time_it
    def process_video_speech(self, video_path: str, language: str = 'en', service: str = 'whisper') -> str:
        """
        Process speech in a video file.

        Args:
            video_path: Path to the video file
            language: Language code
            service: Transcription service to use ('whisper', 'groq', 'google_cloud', 'openai_whisper').
                If 'whisper' is selected, the local Whisper model is used.
                If 'groq', 'google_cloud', or 'openai_whisper' is selected,
                cloud-based transcription is used. An unavailable service
                falls back to 'whisper'.

        Returns:
            Transcribed text

        Raises:
            Exception: Propagates extraction/transcription failures after logging.
        """
        audio_path = None
        # Check if the requested service is available up front.
        if service != 'whisper' and service not in self.cloud_transcription_service.available_recognizers:
            logger.warning(f"Requested service {service} is not available, falling back to whisper")
            service = 'whisper'
        try:
            # Extract audio
            start_time = time.time()
            audio_path = self.extract_audio(video_path)
            extraction_time = time.time() - start_time
            # Report which extraction backend succeeded.
            extraction_method = "FFmpeg" if self.ffmpeg_success else "MoviePy"
            logger.info(f"Audio extracted using {extraction_method} in {extraction_time:.4f} seconds")
            # Transcribe audio
            start_time = time.time()
            transcript = self.transcribe_audio(audio_path, language, service)
            transcription_time = time.time() - start_time
            logger.info(f"Audio transcribed in {transcription_time:.4f} seconds")
            logger.info(f"Total speech processing time: {extraction_time + transcription_time:.4f} seconds")
            return transcript
        except Exception as e:
            logger.error(f"Error in process_video_speech: {str(e)}")
            raise
        finally:
            # Clean up the temporary audio file; best-effort, never raises.
            if audio_path and os.path.exists(audio_path):
                try:
                    os.remove(audio_path)
                    logger.info(f"Temporary audio file {audio_path} removed")
                except Exception as e:
                    logger.warning(f"Failed to remove temporary audio file {audio_path}: {str(e)}")
            # Force garbage collection and release GPU cache if applicable.
            self._free_device_memory('cuda' if torch.cuda.is_available() else 'cpu')