File size: 21,980 Bytes
8ae78b0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
# Standard library
import gc
import json
import logging
import os
import subprocess
import time
from pathlib import Path
from typing import Optional, List, Dict, Any

# Third-party
import backoff
import speech_recognition as sr
import torch
import whisper
from moviepy.editor import VideoFileClip
from pydub import AudioSegment

# Fix import paths: prefer the package-local layout ("app.utils...") and fall
# back to repo-root absolute imports when the module is run from the project
# root instead of inside the package.
try:
    from app.utils.device_utils import device, run_on_device
    from app.utils.logging_utils import time_it, setup_logger
except ImportError:
    # Try relative imports for running from project root
    from behavior_backend.app.utils.device_utils import device, run_on_device
    from behavior_backend.app.utils.logging_utils import time_it, setup_logger

# Configure logging (module-level logger shared by both service classes below)
logger = setup_logger(__name__)

class TranscriptionService:
    """Service for cloud-based speech-to-text operations.

    Wraps the cloud recognizers exposed by the ``speech_recognition`` library
    (OpenAI Whisper API, Google Cloud Speech-to-Text, Groq) behind a single
    ``transcribe`` entry point. Only recognizers actually present on the
    installed ``speech_recognition`` version are registered.
    """

    def __init__(self):
        """Initialize the transcription service."""
        self.recognizer = sr.Recognizer()

        # API keys / credential paths, loaded once up front.
        self.credentials = self._load_credentials()

        # Map service name -> bound transcription method. Populated only for
        # recognizers that this speech_recognition installation provides.
        self.available_recognizers = {}

        if hasattr(self.recognizer, 'recognize_openai_whisper') or hasattr(self.recognizer, 'recognize_whisper_api'):
            self.available_recognizers['openai_whisper'] = self._transcribe_openai_whisper

        if hasattr(self.recognizer, 'recognize_google_cloud'):
            self.available_recognizers['google_cloud'] = self._transcribe_google_cloud

        if hasattr(self.recognizer, 'recognize_groq'):
            self.available_recognizers['groq'] = self._transcribe_groq

        logger.info(f"Available cloud transcription services: {', '.join(self.available_recognizers.keys())}")

    def _load_credentials(self):
        """Load all service credentials.

        Returns:
            Dict with keys ``google_cloud`` (credentials file path or None),
            ``groq`` and ``openai`` (API keys or None). Missing credentials
            are stored as None; each transcription method checks before use.
        """
        creds = {}
        try:
            # Google Cloud - prefer a credentials file shipped next to this
            # module, falling back to the standard environment variable.
            google_creds_path = os.path.join(os.path.dirname(__file__), "google_credentials.json")
            if os.path.exists(google_creds_path):
                creds['google_cloud'] = google_creds_path
            else:
                creds['google_cloud'] = os.getenv('GOOGLE_APPLICATION_CREDENTIALS')

            creds['groq'] = os.getenv('GROQ_API_KEY')
            creds['openai'] = os.getenv('OPENAI_API_KEY')

            # Security fix: the previous debug prints dumped the raw Groq and
            # OpenAI API keys to stdout. Log only whether a key is configured.
            logger.debug("Groq API key configured: %s", bool(creds['groq']))
            logger.debug("OpenAI API key configured: %s", bool(creds['openai']))
        except Exception as e:
            logger.error(f"Error loading credentials: {e}")

        return creds

    def convert_to_wav(self, input_path):
        """Convert an audio/video file to WAV format if needed.

        Args:
            input_path: Path to the source file.

        Returns:
            Path (str) to a WAV file: the input itself when it already has a
            ``.wav`` suffix, otherwise a sibling file with a ``.wav`` suffix.

        Raises:
            Exception: re-raised from pydub/ffmpeg when conversion fails.
        """
        input_path = Path(input_path)

        if input_path.suffix.lower() == '.wav':
            return str(input_path)

        output_path = input_path.with_suffix('.wav')
        logger.info(f"Converting {input_path} to WAV format")

        try:
            audio = AudioSegment.from_file(str(input_path))
            audio.export(str(output_path), format="wav")
            logger.info(f"Conversion completed: {output_path}")
            return str(output_path)
        except Exception as e:
            logger.error(f"Error converting file: {e}")
            raise

    @backoff.on_exception(
        backoff.expo,
        Exception,
        max_tries=3
    )
    def transcribe(self, audio_file_path, services=None, cleanup=True, language='en'):
        """
        Transcribe audio using multiple services.

        Args:
            audio_file_path: Path to the audio file
            services: List of services to use for transcription (defaults to
                every available recognizer)
            cleanup: Whether to clean up the temporary converted WAV file
            language: Language code

        Returns:
            Dictionary of transcription results by service. Per-service
            failures are recorded as ``"Error: ..."`` strings rather than
            raised, so one failing service does not abort the others.
        """
        if services is None:
            services = list(self.available_recognizers.keys())

        results = {}
        original_path = Path(audio_file_path)
        wav_path = None

        try:
            wav_path = self.convert_to_wav(audio_file_path)

            with sr.AudioFile(wav_path) as source:
                audio = self.recognizer.record(source)

            # Try each requested service independently.
            for service in services:
                if service in self.available_recognizers:
                    try:
                        logger.info(f"Starting transcription with {service}")
                        text = self.available_recognizers[service](audio, language)
                        if text:
                            results[service] = text
                            logger.info(f"{service} transcription completed")
                    except Exception as e:
                        logger.error(f"{service} transcription failed: {e}")
                        results[service] = f"Error: {str(e)}"

            return results

        except Exception as e:
            logger.error(f"Transcription process failed: {e}")
            raise
        finally:
            # Robustness fix: remove the converted file even when recording or
            # transcription raised (previously it leaked on error).
            if (cleanup and wav_path
                    and original_path.suffix.lower() != '.wav'
                    and wav_path != str(original_path)
                    and os.path.exists(wav_path)):
                os.remove(wav_path)
                logger.info("Cleaned up converted file")

    # Individual transcription methods
    def _transcribe_openai_whisper(self, audio, language):
        """Transcribe using the OpenAI Whisper API.

        Raises:
            ValueError: if no OpenAI API key is configured.
            NotImplementedError: if the installed speech_recognition version
                exposes neither Whisper API method name.
        """
        if not self.credentials.get('openai'):
            raise ValueError("OpenAI API key not found")

        # Whisper expects bare ISO-639-1 codes (e.g. 'en-US' -> 'en');
        # split is a no-op when there is no region suffix.
        whisper_lang = language.split('-')[0]

        # speech_recognition renamed this method across versions; support both.
        if hasattr(self.recognizer, 'recognize_whisper_api'):
            return self.recognizer.recognize_whisper_api(
                audio,
                api_key=self.credentials['openai'],
                language=whisper_lang
            )
        elif hasattr(self.recognizer, 'recognize_openai_whisper'):
            return self.recognizer.recognize_openai_whisper(
                audio,
                api_key=self.credentials['openai'],
                language=whisper_lang
            )
        else:
            raise NotImplementedError("No OpenAI Whisper API recognition method available")

    def _transcribe_google_cloud(self, audio, language):
        """Transcribe using Google Cloud Speech-to-Text.

        Raises:
            ValueError: if no Google Cloud credentials are configured.
        """
        if not self.credentials.get('google_cloud'):
            raise ValueError("Google Cloud credentials not found")

        return self.recognizer.recognize_google_cloud(
            audio,
            credentials_json=self.credentials['google_cloud'],
            language=language
        )

    def _transcribe_groq(self, audio, language):
        """Transcribe using the Groq API.

        NOTE(review): ``language`` is currently ignored because
        ``recognize_groq`` is called without it -- confirm whether the
        installed speech_recognition version accepts a language argument.

        Raises:
            ValueError: if no Groq API key is configured.
        """
        if not self.credentials.get('groq'):
            raise ValueError("Groq API key not found")
        return self.recognizer.recognize_groq(audio)

class SpeechService:
    """Service for speech-to-text operations.

    Combines audio extraction from video (FFmpeg with a MoviePy fallback),
    chunked transcription of large files, a lazily-loaded local Whisper
    model, and the cloud services provided by ``TranscriptionService``.
    """

    # Files larger than this are split into chunks before transcription.
    LARGE_AUDIO_BYTES = 10 * 1024 * 1024  # 10 MB

    # Services routed to the cloud TranscriptionService; everything else
    # falls through to the local Whisper model.
    CLOUD_SERVICES = ('groq', 'google_cloud', 'openai_whisper')

    def __init__(self):
        """Initialize the speech service."""
        self.whisper_model = None    # lazily-loaded local Whisper model
        self.ffmpeg_success = False  # True when the last extraction used FFmpeg
        self.cloud_transcription_service = TranscriptionService()

    @time_it
    def extract_audio(self, video_path: str) -> str:
        """
        Extract audio from a video file using FFmpeg (primary) or MoviePy (fallback).

        Args:
            video_path: Path to the video file

        Returns:
            Path to the extracted 16 kHz mono WAV file

        Raises:
            RuntimeError: if both extraction methods fail, or the output file
                is empty or missing.
        """
        logger.info(f"Extracting audio from {video_path}")

        # Output goes to the working directory, named after the video.
        video_filename = Path(video_path).stem
        audio_path = f"temp_{video_filename}.wav"

        # Try FFmpeg first; MoviePy is only the fallback.
        self.ffmpeg_success = False
        ffmpeg_start_time = time.time()

        try:
            logger.info("Attempting audio extraction with FFmpeg...")
            subprocess.run([
                'ffmpeg',
                '-i', str(video_path),
                '-acodec', 'pcm_s16le',
                '-ar', '16000',  # 16kHz sample rate
                '-ac', '1',      # Mono channel
                '-y',            # Overwrite output file if it exists
                str(audio_path)
            ], check=True, capture_output=True, text=True)

            self.ffmpeg_success = True
            ffmpeg_duration = time.time() - ffmpeg_start_time
            logger.info(f"FFmpeg audio extraction successful in {ffmpeg_duration:.4f} seconds")

        except (subprocess.CalledProcessError, FileNotFoundError) as e:
            # CalledProcessError: ffmpeg ran but failed; FileNotFoundError:
            # the ffmpeg binary is not installed.
            ffmpeg_duration = time.time() - ffmpeg_start_time
            logger.warning(f"FFmpeg audio extraction failed after {ffmpeg_duration:.4f} seconds: {str(e)}")
            logger.warning("Falling back to MoviePy for audio extraction...")

            moviepy_start_time = time.time()
            try:
                video = VideoFileClip(video_path)
                try:
                    video.audio.write_audiofile(audio_path, codec='pcm_s16le', logger=None)
                finally:
                    # Robustness fix: release the clip's resources even when
                    # writing fails (previously close() was skipped on error).
                    video.close()

                moviepy_duration = time.time() - moviepy_start_time
                logger.info(f"MoviePy audio extraction successful in {moviepy_duration:.4f} seconds")

            except Exception as e:
                moviepy_duration = time.time() - moviepy_start_time
                logger.error(f"MoviePy audio extraction also failed after {moviepy_duration:.4f} seconds: {str(e)}")
                raise RuntimeError(f"Failed to extract audio from video using both FFmpeg and MoviePy: {str(e)}")

        # Verify the audio file exists and has content.
        audio_file = Path(audio_path)
        if not audio_file.exists() or audio_file.stat().st_size == 0:
            logger.error(f"Audio extraction produced empty or missing file: {audio_path}")
            raise RuntimeError(f"Audio extraction failed: output file {audio_path} is empty or missing")

        logger.info(f"Audio extracted to {audio_path}")

        # When the fallback was used, log how the two attempts compared.
        if not self.ffmpeg_success:
            logger.info(f"Audio extraction performance comparison - FFmpeg: {ffmpeg_duration:.4f}s, MoviePy: {moviepy_duration:.4f}s")

        return audio_path

    @time_it
    def split_audio(self, audio_path: str, chunk_length_ms: int = 30000) -> List[str]:
        """
        Split audio file into chunks for processing.

        Args:
            audio_path: Path to the audio file
            chunk_length_ms: Length of each chunk in milliseconds

        Returns:
            List of paths to audio chunks

        NOTE(review): chunks are written into a shared "temp_chunks"
        directory with fixed names, so concurrent calls would clobber each
        other -- confirm this service only runs from a single worker.
        """
        logger.info(f"Splitting audio {audio_path} into {chunk_length_ms}ms chunks")

        audio = AudioSegment.from_file(audio_path)

        chunks_dir = Path("temp_chunks")
        chunks_dir.mkdir(exist_ok=True)

        # pydub slicing clamps past-the-end indices, so no explicit min() needed.
        chunk_paths = []
        for i, chunk_start in enumerate(range(0, len(audio), chunk_length_ms)):
            chunk = audio[chunk_start:chunk_start + chunk_length_ms]
            chunk_path = chunks_dir / f"chunk_{i}.wav"
            chunk.export(chunk_path, format="wav")
            chunk_paths.append(str(chunk_path))

        logger.info(f"Split audio into {len(chunk_paths)} chunks")
        return chunk_paths

    @run_on_device
    @time_it
    def transcribe_with_whisper(self, audio_path: str, language: str = 'en', device: str = 'cpu') -> str:
        """
        Transcribe audio using the local Whisper model.

        Args:
            audio_path: Path to the audio file
            language: Language code
            device: Device to use for processing

        Returns:
            Transcribed text
        """
        logger.info(f"Transcribing {audio_path} with Whisper on {device}")

        try:
            # (Re)load the model on first use or when the target device changed.
            if self.whisper_model is None or getattr(self, '_current_device', None) != device:
                if self.whisper_model is not None:
                    # Robustness fix: rebind to None (instead of `del`) so the
                    # attribute still exists if load_model below raises.
                    self.whisper_model = None
                    gc.collect()
                    if device == 'cuda':
                        torch.cuda.empty_cache()

                logger.info(f"Loading Whisper model on {device}")
                # Use tiny model instead of base to reduce memory usage.
                self.whisper_model = whisper.load_model("tiny", device=device)
                self._current_device = device

            # Whisper expects bare ISO-639-1 codes (e.g. 'en-US' -> 'en').
            if '-' in language:
                language = language.split('-')[0]

            # Transcribe with reduced compute settings to limit memory use.
            result = self.whisper_model.transcribe(
                audio_path,
                language=language,
                fp16=(device == 'cuda'),  # fp16 only makes sense on CUDA
                beam_size=3,  # reduced from the default of 5
                best_of=1     # reduced from the default of 5
            )

            return result["text"]
        finally:
            # Free transient memory after every transcription.
            gc.collect()
            if device == 'cuda':
                torch.cuda.empty_cache()

    def _transcribe_in_chunks(self, audio_path: str, transcribe_chunk) -> str:
        """Split a large audio file, transcribe each chunk, and join the texts.

        Args:
            audio_path: Path to the (large) audio file.
            transcribe_chunk: Callable mapping a chunk path to its transcript.

        Returns:
            The chunk transcripts joined with single spaces.
        """
        logger.info(f"Audio file is large, splitting into chunks")
        chunk_paths = self.split_audio(audio_path)
        try:
            return " ".join(transcribe_chunk(path) for path in chunk_paths)
        finally:
            # Robustness fix: chunks are removed even when a chunk fails to
            # transcribe (previously they leaked on error).
            for path in chunk_paths:
                try:
                    os.remove(path)
                except OSError:
                    pass

    def _transcribe_chunk_cloud(self, chunk_path: str, service: str, language: str) -> str:
        """Transcribe one file with a cloud service, falling back to local Whisper.

        A missing result or an ``"Error: ..."`` string from the cloud service
        triggers the local Whisper fallback.
        """
        results = self.cloud_transcription_service.transcribe(
            chunk_path,
            services=[service],
            language=language
        )
        text = results.get(service)
        if text and not text.startswith('Error:'):
            return text
        logger.warning(f"Failed to transcribe with {service}, falling back to whisper")
        return self.transcribe_with_whisper(chunk_path, language)

    @backoff.on_exception(
        backoff.expo,
        Exception,
        max_tries=3
    )
    @time_it
    def transcribe_audio(self, audio_path: str, language: str = 'en', service: str = 'whisper') -> str:
        """
        Transcribe audio file to text.

        Args:
            audio_path: Path to the audio file
            language: Language code
            service: Transcription service to use ('whisper', 'groq', 'google_cloud', 'openai_whisper')

        Returns:
            Transcribed text
        """
        logger.info(f"Starting transcription of {audio_path} using {service}")

        # Fall back to local Whisper if the requested cloud service is unavailable.
        if service in self.CLOUD_SERVICES and service not in self.cloud_transcription_service.available_recognizers:
            logger.warning(f"Requested service {service} is not available, falling back to whisper")
            service = 'whisper'

        is_large = os.path.getsize(audio_path) > self.LARGE_AUDIO_BYTES

        if service in self.CLOUD_SERVICES:
            logger.info(f"Using cloud-based transcription with {service}")
            if is_large:
                return self._transcribe_in_chunks(
                    audio_path,
                    lambda path: self._transcribe_chunk_cloud(path, service, language)
                )
            return self._transcribe_chunk_cloud(audio_path, service, language)

        # Local Whisper transcription (default).
        if is_large:
            return self._transcribe_in_chunks(
                audio_path,
                lambda path: self.transcribe_with_whisper(path, language)
            )
        return self.transcribe_with_whisper(audio_path, language)

    @time_it
    def process_video_speech(self, video_path: str, language: str = 'en', service: str = 'whisper') -> str:
        """
        Process speech in a video file.

        Args:
            video_path: Path to the video file
            language: Language code
            service: Transcription service to use ('whisper', 'groq', 'google_cloud', 'openai_whisper')
                     If 'whisper' is selected, local Whisper model will be used.
                     If 'groq', 'google_cloud', or 'openai_whisper' are selected, cloud-based transcription will be used.
                     If the requested service is not available, it will fall back to 'whisper'.

        Returns:
            Transcribed text
        """
        audio_path = None

        # Check availability up front so the fallback is logged once.
        if service != 'whisper' and service not in self.cloud_transcription_service.available_recognizers:
            logger.warning(f"Requested service {service} is not available, falling back to whisper")
            service = 'whisper'

        try:
            # Extract audio
            start_time = time.time()
            audio_path = self.extract_audio(video_path)
            extraction_time = time.time() - start_time

            # ffmpeg_success was set by extract_audio just above.
            extraction_method = "FFmpeg" if self.ffmpeg_success else "MoviePy"
            logger.info(f"Audio extracted using {extraction_method} in {extraction_time:.4f} seconds")

            # Transcribe audio
            start_time = time.time()
            transcript = self.transcribe_audio(audio_path, language, service)
            transcription_time = time.time() - start_time

            logger.info(f"Audio transcribed in {transcription_time:.4f} seconds")
            logger.info(f"Total speech processing time: {extraction_time + transcription_time:.4f} seconds")

            return transcript

        except Exception as e:
            logger.error(f"Error in process_video_speech: {str(e)}")
            raise

        finally:
            # Remove the temporary extracted audio file regardless of outcome.
            if audio_path and os.path.exists(audio_path):
                try:
                    os.remove(audio_path)
                    logger.info(f"Temporary audio file {audio_path} removed")
                except Exception as e:
                    logger.warning(f"Failed to remove temporary audio file {audio_path}: {str(e)}")

            # Free transient memory (Whisper models can be large).
            gc.collect()
            if torch.cuda.is_available():
                torch.cuda.empty_cache()