# core/transcription_service.py
import whisperx
import gradio as gr
import gc
import time
import os
from datetime import datetime
import warnings

# Suppress specific pyannote/pytorch warning about degrees of freedom
# (raw string: "\(" is an invalid escape sequence in a plain string literal)
warnings.filterwarnings("ignore", message=r"std\(\): degrees of freedom is <= 0")

from src.config.settings import Config
# CORRECT WAY: Import DiarizationPipeline at point of use
from whisperx.diarize import DiarizationPipeline
class TranscriptionService:
    """Video -> speaker-labelled transcript pipeline built on WhisperX.

    The Whisper model and the pyannote diarization pipeline are loaded
    lazily on first use and cached on the instance.  The main entry point
    is ``transcribe_video()``, which returns a result dict (never raises).
    """

    def __init__(self):
        # Config is the project-wide settings class (model name, device,
        # compute type, HF token) — stored as a class reference, not an instance.
        self.config = Config
        self.models_loaded = False   # guards one-time model loading
        self.whisper_model = None
        self.diarize_model = None
        self.batch_size = 16         # Whisper inference batch size

    def load_models(self):
        """Load AI models once - use pre-loaded models from init"""
        if not self.models_loaded:
            print("π₯ Loading transcription models...")
            # Use the model from config instead of hardcoding
            self.whisper_model = whisperx.load_model(
                self.config.WHISPER_MODEL,
                self.config.DEVICE,
                compute_type=self.config.COMPUTE_TYPE,
                language="en",
            )
            self.diarize_model = DiarizationPipeline(
                use_auth_token=self.config.HUGGINGFACE_TOKEN,
                device=self.config.DEVICE,
            )
            self.models_loaded = True
            print("β Models loaded successfully")

    def transcribe_video(self, video_file_path, progress_callback=None):
        """Clean transcription pipeline without Gradio dependencies.

        Runs: audio extraction -> Whisper transcription -> word-level
        timestamp alignment -> speaker diarization -> speaker assignment ->
        markdown formatting.

        Args:
            video_file_path: path to the video file to transcribe.
            progress_callback: optional callable(fraction, desc=...) used to
                surface progress (e.g. a gradio Progress instance); failures
                in the callback are swallowed (best-effort reporting).

        Returns:
            On success: dict with keys "success" (True), "transcription",
            "timing_info", "raw_data", "processing_time", "speakers_count".
            On failure: {"success": False, "error": <message>}.
        """
        try:
            if not self.models_loaded:
                self.load_models()
            start_time = time.time()
            print(f"π¬ Processing video: {os.path.basename(video_file_path)}")

            # ======================
            # STEP 1: Load Audio from Video
            # ======================
            if progress_callback:
                print(f"DEBUG: Calling progress callback 0.1. Type: {type(progress_callback)}")
                try:
                    progress_callback(0.1, desc="π¬ Loading audio from video...")
                    print("DEBUG: Progress callback 0.1 called successfully")
                except Exception as e:
                    # Progress reporting must never fail the job.
                    print(f"DEBUG: Error calling progress callback: {e}")
                time.sleep(0.5)
            print("1οΈβ£ Loading audio directly from video...")
            audio = whisperx.load_audio(video_file_path)
            print(f"β Audio loaded: {len(audio)} samples")

            # ======================
            # STEP 2: Transcribe with Whisper
            # ======================
            print("2οΈβ£ Loading Whisper model...")
            if progress_callback:
                progress_callback(0.3, desc="π€ Loading Whisper model...")
                time.sleep(0.5)
            if progress_callback:
                progress_callback(0.4, desc="π Transcribing audio...")
                time.sleep(0.5)
            print("3οΈβ£ Transcribing audio...")
            result = self.whisper_model.transcribe(audio, batch_size=self.batch_size)
            # whisperx.align() returns a fresh dict without "language", so
            # keep the detected language before it gets lost.
            detected_language = result['language']
            print(f"β Transcription complete ({detected_language} detected)")

            # ======================
            # STEP 3: Align Timestamps
            # ======================
            if progress_callback:
                progress_callback(0.5, desc="β±οΈ Aligning timestamps...")
                time.sleep(0.5)
            print("4οΈβ£ Aligning word-level timestamps...")
            # Load the alignment model and its metadata from whisperx for
            # word-level timestamp alignment.
            model_a, metadata = whisperx.load_align_model(
                language_code=detected_language,
                device=self.config.DEVICE,
            )
            result = whisperx.align(
                result["segments"],
                model_a,
                metadata,
                audio,
                self.config.DEVICE,
                return_char_alignments=False,
            )
            # Restore language to result dict after alignment
            result["language"] = detected_language
            # The alignment model is re-loaded on every call and can be large;
            # drop it before diarization so memory can be reclaimed.
            del model_a
            gc.collect()
            print("β Timestamps aligned")

            # ======================
            # STEP 4: Speaker Diarization
            # ======================
            if progress_callback:
                progress_callback(0.7, desc="π₯ Identifying speakers...")
                time.sleep(0.5)
            print("5οΈβ£ Loading speaker diarization model...")
            diarize_segments = self.diarize_model(audio)

            # ======================
            # STEP 5: Assign speakers
            # ======================
            if progress_callback:
                progress_callback(0.9, desc="π Assigning speakers to text...")
                time.sleep(0.5)
            print("6οΈβ£ Assigning speakers to transcript...")
            # BUG FIX: assign_word_speakers was previously called twice
            # back-to-back with identical arguments; one call is sufficient.
            result = whisperx.assign_word_speakers(diarize_segments, result)
            print("β Speaker assignment complete")
            if progress_callback:
                progress_callback(1.0, desc="β Complete!")
                time.sleep(0.5)

            # ======================
            # STEP 6: Format results
            # ======================
            processing_time = time.time() - start_time
            transcription = self._format_results(result, video_file_path)
            timing_info = self._get_timing_info(result, processing_time, video_file_path)
            return {
                "success": True,
                "transcription": transcription,
                "timing_info": timing_info,
                "raw_data": result,  # Keep for potential storage
                "processing_time": processing_time,
                "speakers_count": len(
                    set(seg.get("speaker", "UNKNOWN") for seg in result["segments"])
                ),
            }
        except Exception as e:
            # Top-level boundary: report the failure to the caller as data
            # instead of raising, so the UI can show the error message.
            error_msg = f"Transcription failed: {str(e)}"
            print(f"β ERROR: {error_msg}")
            return {
                "success": False,
                "error": error_msg,
            }

    def _format_results(self, result, video_file_path):
        """Format transcription with speaker labels and comprehensive meeting metadata.

        Args:
            result: aligned whisperx result dict with "segments" (each having
                "start", "end", "text" and optionally "speaker") and "language".
            video_file_path: original video path, used for the header only.

        Returns:
            A markdown string: header with meeting metadata, the transcript
            grouped by speaker with [MM:SS] timestamps, and a statistics footer.
        """
        if not result["segments"]:
            return "No transcription segments found"
        # Extract meeting metadata
        segments = result["segments"]
        speakers = set(segment.get("speaker", "UNKNOWN") for segment in segments)
        total_duration = segments[-1]["end"] if segments else 0
        language = result.get("language", "unknown")
        # Calculate statistics
        total_words = sum(len(seg.get("text", "").split()) for seg in segments)
        avg_segment_length = total_words / len(segments) if segments else 0
        # Build header with meeting context
        output = "# π― Meeting Transcription\n\n"
        output += "## π Meeting Information\n\n"
        output += f"**π File:** `{os.path.basename(video_file_path)}`\n"
        output += f"**π Date:** {datetime.now().strftime('%Y-%m-%d %H:%M')}\n"
        output += f"**β±οΈ Duration:** {self._format_timestamp(total_duration)}\n"
        output += f"**π₯ Speakers:** {len(speakers)}\n"
        output += f"**π Language:** {language.upper()}\n"
        output += f"**π€ Model:** {self.config.WHISPER_MODEL}\n\n"
        output += "---\n\n"
        output += "## π¬ Transcript\n\n"
        # Add transcript content; only emit a speaker heading when the
        # speaker changes between consecutive segments.
        current_speaker = None
        for segment in segments:
            speaker = segment.get("speaker", "UNKNOWN")
            start_time = self._format_timestamp(segment["start"])
            if speaker != current_speaker:
                output += f"\n**π€ {speaker}:**\n"
                current_speaker = speaker
            output += f"[{start_time}] {segment['text'].strip()}\n"
        # Add comprehensive footer
        output += "\n---\n\n"
        output += "## π Transcript Statistics\n\n"
        output += f"**Total Segments:** {len(segments)}\n"
        output += f"**Total Words:** {total_words:,}\n"
        output += f"**Avg Words/Segment:** {avg_segment_length:.1f}\n"
        output += f"**Unique Speakers:** {len(speakers)}\n"
        output += f"**Speaker IDs:** {', '.join(sorted(speakers))}\n"
        return output

    def _get_timing_info(self, result, processing_time, video_file_path):
        """Generate a markdown block of processing-time statistics.

        Args:
            result: aligned whisperx result dict (needs "segments").
            processing_time: wall-clock seconds the pipeline took.
            video_file_path: original video path, used for the file name.
        """
        if not result["segments"]:
            return "No timing information available"
        total_duration = result["segments"][-1]["end"]
        # Guard against division by zero on a (theoretical) instant run.
        speed_ratio = total_duration / processing_time if processing_time > 0 else 0
        video_name = os.path.basename(video_file_path)
        # NOTE: strftime uses single quotes — same-quote nesting inside an
        # f-string is a SyntaxError before Python 3.12.
        return f"""
## β±οΈ Processing Statistics
**File:** {video_name}
\n**Duration:** {self._format_timestamp(total_duration)}
**Processing Time:** {processing_time:.1f}s
\n**Speed:** {speed_ratio:.1f}x ({'Faster' if speed_ratio > 1 else 'Slower'} than real-time)
**Completed:** {datetime.now().strftime('%H:%M:%S')}
"""

    def _format_timestamp(self, seconds):
        """Convert seconds to MM:SS format (None -> "00:00")."""
        if seconds is None:
            return "00:00"
        minutes = int(seconds // 60)
        seconds = int(seconds % 60)
        return f"{minutes:02d}:{seconds:02d}"