Spaces:
Sleeping
Sleeping
| """ | |
| Audio Processor | |
| Extracts audio from video and transcribes using Whisper | |
| Optimized for Arabic Quran recitation | |
| """ | |
| import os | |
| import subprocess | |
| import tempfile | |
| from pathlib import Path | |
| from typing import Optional, List, Dict | |
| import json | |
# Whisper is an optional dependency: record whether it could be imported so
# the rest of the module can fall back gracefully when it is missing.
try:
    import whisper
except ImportError:
    WHISPER_AVAILABLE = False
    print("Warning: Whisper not installed. Install with: pip install openai-whisper")
else:
    WHISPER_AVAILABLE = True
class AudioProcessor:
    """Extract audio from video files with FFmpeg and transcribe it with Whisper.

    The Whisper model is loaded lazily on the first call to ``load_model``
    (directly or via ``transcribe``) and then cached on the instance.
    """

    def __init__(self, model_size: str = "medium"):
        """
        Initialize the audio processor

        Args:
            model_size: Whisper model size
                - "tiny": Fastest, least accurate
                - "base": Fast, basic accuracy
                - "small": Good balance
                - "medium": Recommended for Arabic (best balance)
                - "large": Most accurate, slowest (requires more VRAM)
        """
        self.model_size = model_size
        self.model = None
        # Scratch directory for extracted audio, under the system temp dir.
        self.temp_dir = Path(tempfile.gettempdir()) / "quran_srt"
        self.temp_dir.mkdir(parents=True, exist_ok=True)

    def load_model(self):
        """
        Load the Whisper model (lazy loading) and return it

        Returns:
            The loaded Whisper model, cached on ``self.model``

        Raises:
            RuntimeError: If the ``whisper`` package could not be imported
        """
        if not WHISPER_AVAILABLE:
            raise RuntimeError("Whisper is not installed. Run: pip install openai-whisper")
        if self.model is None:
            print(f"Loading Whisper {self.model_size} model...")
            self.model = whisper.load_model(self.model_size)
            print("Model loaded successfully!")
        return self.model

    def extract_audio(self, video_path: str, output_path: Optional[str] = None) -> str:
        """
        Extract audio from video file using FFmpeg

        Args:
            video_path: Path to the video file
            output_path: Optional output path for audio file; defaults to
                "<video stem>_audio.wav" inside the processor's temp directory

        Returns:
            Path to the extracted audio file

        Raises:
            FileNotFoundError: If ``video_path`` does not exist
            RuntimeError: If FFmpeg is not installed or exits with an error
        """
        video_path = Path(video_path)
        if not video_path.exists():
            raise FileNotFoundError(f"Video file not found: {video_path}")

        if output_path is None:
            output_path = self.temp_dir / f"{video_path.stem}_audio.wav"
        else:
            output_path = Path(output_path)

        # FFmpeg command to extract audio as WAV (16kHz for Whisper)
        cmd = [
            "ffmpeg",
            "-i", str(video_path),
            "-vn",                   # No video
            "-acodec", "pcm_s16le",  # PCM format
            "-ar", "16000",          # 16kHz sample rate (Whisper optimal)
            "-ac", "1",              # Mono
            "-y",                    # Overwrite output
            str(output_path),
        ]

        try:
            subprocess.run(cmd, capture_output=True, text=True, check=True)
        except subprocess.CalledProcessError as e:
            # Keep the original exception as the cause for easier debugging.
            raise RuntimeError(f"FFmpeg error: {e.stderr}") from e
        except FileNotFoundError as e:
            # Raised by subprocess when the "ffmpeg" executable is missing.
            raise RuntimeError("FFmpeg not found. Please install FFmpeg.") from e

        print(f"Audio extracted to: {output_path}")
        return str(output_path)

    def transcribe(
        self,
        audio_path: str,
        language: str = "ar",
        task: str = "transcribe"
    ) -> Dict:
        """
        Transcribe audio using Whisper

        Args:
            audio_path: Path to audio file
            language: Language code ("ar" for Arabic)
            task: "transcribe" for same language, "translate" for English

        Returns:
            Transcription result with segments and timestamps

        Raises:
            RuntimeError: If Whisper is not installed
        """
        model = self.load_model()
        print(f"Transcribing audio: {audio_path}")
        print("This may take a few minutes depending on the video length...")
        return model.transcribe(
            audio_path,
            language=language,
            task=task,
            word_timestamps=True,  # Get word-level timestamps
            verbose=False,
            # The Basmala as a prompt helps with Quran context.
            initial_prompt="بسم الله الرحمن الرحيم",
        )

    def transcribe_video(
        self,
        video_path: str,
        language: str = "ar"
    ) -> Dict:
        """
        Full pipeline: extract audio and transcribe

        The temporary audio file is removed afterwards, even when
        transcription fails.

        Args:
            video_path: Path to video file
            language: Language code

        Returns:
            Transcription result with segments
        """
        audio_path = self.extract_audio(video_path)
        try:
            return self.transcribe(audio_path, language=language)
        finally:
            # Best-effort cleanup: a failure to delete the temp file must not
            # mask the transcription result or the original exception.
            try:
                os.remove(audio_path)
            except OSError:
                pass

    def get_segments_with_timing(self, transcription: Dict) -> List[Dict]:
        """
        Extract segments with precise timing from transcription

        Args:
            transcription: Whisper transcription result

        Returns:
            List of segments, each with "id", "start", "end", "text",
            "words" and "confidence" (taken from the segment's "avg_logprob").
            A missing or empty "segments" entry yields an empty list.
        """
        return [
            {
                # Fall back to the position in the list when no id is given.
                "id": segment.get("id", index),
                "start": segment.get("start", 0),
                "end": segment.get("end", 0),
                "text": (segment.get("text") or "").strip(),
                "words": segment.get("words", []),
                "confidence": segment.get("avg_logprob", 0),
            }
            for index, segment in enumerate(transcription.get("segments") or [])
        ]
class MockAudioProcessor:
    """
    Mock processor for testing without Whisper installed

    Returns a fixed two-segment Arabic transcription instead of running
    FFmpeg or Whisper.
    """

    def __init__(self, model_size: str = "medium"):
        """
        Args:
            model_size: Stored on the instance; no model is loaded
        """
        self.model_size = model_size

    def transcribe_video(self, video_path: str, language: str = "ar") -> Dict:
        """
        Return mock transcription for testing

        Args:
            video_path: Ignored; no file is read
            language: Ignored; the mock result is always marked "ar"

        Returns:
            A fixed transcription dict with "text", "segments" and "language"
        """
        return {
            "text": "بسم الله الرحمن الرحيم الحمد لله رب العالمين",
            "segments": [
                {
                    "id": 0,
                    "start": 0.0,
                    "end": 3.5,
                    "text": "بسم الله الرحمن الرحيم",
                    "words": [],
                },
                {
                    "id": 1,
                    "start": 3.5,
                    "end": 6.0,
                    "text": "الحمد لله رب العالمين",
                    "words": [],
                },
            ],
            "language": "ar",
        }

    def get_segments_with_timing(self, transcription: Dict) -> List[Dict]:
        """
        Extract segments with timing from a transcription

        Produces the same per-segment keys as
        ``AudioProcessor.get_segments_with_timing`` so callers can treat
        both processors alike.

        Args:
            transcription: Transcription result containing "segments"

        Returns:
            List of segments, each with "id", "start", "end", "text",
            "words" and "confidence"
        """
        return [
            {
                "id": segment.get("id", index),
                "start": segment.get("start", 0),
                "end": segment.get("end", 0),
                "text": (segment.get("text") or "").strip(),
                "words": segment.get("words", []),
                # Mock segments carry no "avg_logprob", so this defaults to 0.
                "confidence": segment.get("avg_logprob", 0),
            }
            for index, segment in enumerate(transcription.get("segments") or [])
        ]
def get_processor(model_size: str = "medium") -> "AudioProcessor | MockAudioProcessor":
    """
    Get appropriate processor based on Whisper availability

    Args:
        model_size: Whisper model size passed to the processor

    Returns:
        An AudioProcessor when Whisper could be imported, otherwise a
        MockAudioProcessor (a notice is printed in that case)
    """
    if WHISPER_AVAILABLE:
        return AudioProcessor(model_size)
    print("Using mock processor (Whisper not installed)")
    return MockAudioProcessor(model_size)
# Manual smoke test when the module is run as a script
if __name__ == "__main__":
    proc = get_processor()
    print(f"Whisper available: {WHISPER_AVAILABLE}")
    print(f"Processor type: {type(proc).__name__}")

    # Only the mock path is exercised here; it needs no video file.
    if not WHISPER_AVAILABLE:
        mock_result = proc.transcribe_video("test.mp4")
        print("\nMock transcription result:")
        for entry in proc.get_segments_with_timing(mock_result):
            begin, finish, text = entry["start"], entry["end"], entry["text"]
            print(f"[{begin:.2f} - {finish:.2f}] {text}")