Spaces:
Build error
Build error
| import os | |
| import asyncio | |
| import whisper | |
| import gradio as gr | |
| import torch | |
| import logging | |
| from pathlib import Path | |
| import ffmpeg | |
| import re | |
| from tqdm import tqdm | |
| from cryptography.fernet import Fernet | |
| from pyannote.audio import Pipeline | |
| from pyannote.core import Segment | |
| import numpy as np | |
| import sounddevice as sd | |
| import soundfile as sf | |
| import time | |
| import threading | |
# Module-wide logging: timestamped INFO-level messages.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Working directory for intermediate WAV files produced by ffmpeg.
TEMP_FOLDER = 'temp/'
# Audio/video extensions accepted by the uploader (compared case-insensitively).
SUPPORTED_FORMATS = ['.mp3', '.wav', '.aac', '.flac', '.ogg', '.m4a', '.mp4', '.avi', '.mov', '.mkv', '.webm']
# Maximum clip length in seconds. NOTE(review): declared but never enforced
# anywhere in this file — confirm whether a length check was intended.
MAX_AUDIO_LENGTH = 600
class WhisperModelCache:
    """Process-wide singleton that loads a Whisper model once and reuses it.

    Loading a Whisper checkpoint is expensive, so all transcription paths
    share one cached model via ``WhisperModelCache.get_instance()``.
    """

    _instance = None

    @staticmethod
    def get_instance():
        """Return the shared cache, creating it lazily on first use.

        Fixed: the original lacked ``@staticmethod``, so calling
        ``instance.get_instance()`` on an instance would fail.
        """
        if WhisperModelCache._instance is None:
            WhisperModelCache._instance = WhisperModelCache()
        return WhisperModelCache._instance

    def __init__(self):
        # Model is loaded lazily by load_model(); prefer GPU when available.
        self.model = None
        self.model_size = None
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    def load_model(self, model_size="medium"):
        """Load (or return the cached) Whisper model of the given size.

        Fixed: the original ignored ``model_size`` once any model was
        cached; now a different size triggers a reload.
        """
        if self.model is None or self.model_size != model_size:
            logger.info(f"Loading Whisper model: {model_size} on {self.device}")
            self.model = whisper.load_model(model_size, device=self.device)
            self.model_size = model_size
        return self.model
def create_folders():
    """Ensure the temp working directory exists (no-op when already present)."""
    temp_dir = Path(TEMP_FOLDER)
    temp_dir.mkdir(exist_ok=True)
def is_supported_format(file):
    """Return True when *file* is present and its name carries a supported extension."""
    if file is None:
        return False
    lowered_name = file.name.lower()
    return any(lowered_name.endswith(ext) for ext in SUPPORTED_FORMATS)
def convert_to_wav(original_file_path):
    """Transcode a media file to 16 kHz mono 16-bit PCM WAV via ffmpeg.

    The WAV is written into TEMP_FOLDER under the source's base name.
    Returns the output path, or None when ffmpeg fails (error is logged).
    """
    base_name = os.path.splitext(os.path.basename(original_file_path))[0]
    output_path = os.path.join(TEMP_FOLDER, base_name + '.wav')
    try:
        stream = ffmpeg.input(original_file_path)
        stream = stream.output(output_path, acodec='pcm_s16le', ac=1, ar='16k')
        stream.overwrite_output().run(capture_stdout=True, capture_stderr=True)
    except ffmpeg.Error as e:
        logger.error(f'Error converting {original_file_path}: {e.stderr.decode()}')
        return None
    return output_path
def generate_key():
    """Create a fresh Fernet symmetric encryption key (url-safe base64 bytes)."""
    return Fernet.generate_key()
def encrypt_file(key, filename):
    """Encrypt *filename* in place using the Fernet *key* (bytes)."""
    cipher = Fernet(key)
    with open(filename, "rb") as source:
        plaintext = source.read()
    ciphertext = cipher.encrypt(plaintext)
    with open(filename, "wb") as target:
        target.write(ciphertext)
def decrypt_file(key, filename):
    """Decrypt *filename* in place using the Fernet *key* (bytes)."""
    cipher = Fernet(key)
    with open(filename, "rb") as source:
        ciphertext = source.read()
    plaintext = cipher.decrypt(ciphertext)
    with open(filename, "wb") as target:
        target.write(plaintext)
async def transcribe_audio(audio_path, language, task='transcribe', initial_prompt=None, temperature=0.5, num_speakers=1):
    """Run Whisper on *audio_path*, optionally attaching speaker labels.

    Returns the transcription text, or an "Error during transcription: ..."
    string on failure (callers surface that string directly in the UI).
    """
    try:
        whisper_model = WhisperModelCache.get_instance().load_model()
        # transcribe() is blocking; keep it off the event loop.
        result = await asyncio.to_thread(
            whisper_model.transcribe,
            audio_path,
            language=language,
            task=task,
            initial_prompt=initial_prompt,
            temperature=temperature,
        )
        if num_speakers > 1:
            # Multi-speaker audio: label each segment with its speaker.
            diarization = await perform_diarization(audio_path, num_speakers)
            result['text'] = apply_diarization(result, diarization)
        return result['text']
    except Exception as e:
        logger.error(f"Error transcribing {audio_path}: {str(e)}")
        return f"Error during transcription: {str(e)}"
async def perform_diarization(audio_path, num_speakers):
    """Run pyannote speaker diarization on *audio_path*.

    Fixes over the original:
    - the HF auth token is read from the HF_AUTH_TOKEN environment variable
      instead of being hard-coded in source (the placeholder remains the
      fallback so behavior is unchanged when the variable is unset);
    - the pipeline is loaded once and cached on the function object instead
      of being re-downloaded/reloaded on every call;
    - the blocking pipeline call runs in a worker thread so it no longer
      stalls the event loop inside this ``async def``.
    """
    pipeline = getattr(perform_diarization, "_pipeline", None)
    if pipeline is None:
        pipeline = Pipeline.from_pretrained(
            "pyannote/speaker-diarization@2.1",
            use_auth_token=os.environ.get("HF_AUTH_TOKEN", "YOUR_HF_AUTH_TOKEN"),
        )
        perform_diarization._pipeline = pipeline
    return await asyncio.to_thread(pipeline, audio_path, num_speakers=num_speakers)
def apply_diarization(whisper_result, diarization):
    """Prefix each Whisper segment's text with the speaker active during it.

    For every segment in ``whisper_result['segments']``, the first
    diarization turn whose time span intersects the segment wins; segments
    with no overlapping turn are labelled "Unknown".

    Fixed: builds the output with a list + ``join`` instead of the
    original's quadratic ``+=`` string concatenation.
    """
    speaker_segments = [
        (turn.start, turn.end, speaker)
        for turn, _, speaker in diarization.itertracks(yield_label=True)
    ]
    lines = []
    for segment in whisper_result['segments']:
        whisper_span = Segment(segment['start'], segment['end'])
        speaker = "Unknown"
        for s_start, s_end, s_label in speaker_segments:
            # First overlapping diarization turn determines the label.
            if whisper_span.intersects(Segment(s_start, s_end)):
                speaker = s_label
                break
        lines.append(f"[{speaker}]: {segment['text']}\n")
    return "".join(lines)
def anonymize_text(text):
    """Mask likely PII in *text*.

    Capitalized first+last name pairs become [NAME], e-mail-shaped tokens
    become [EMAIL], and US-style 10-digit phone numbers become [PHONE].
    """
    def _placeholder(match):
        token = match.group()
        if re.match(r'\b[A-Z][a-z]+ [A-Z][a-z]+\b', token):
            return '[NAME]'
        if '@' in token:
            return '[EMAIL]'
        return '[PHONE]'

    return re.sub(
        r'\b[A-Z][a-z]+ [A-Z][a-z]+\b|\S+@\S+|\d{3}[-.]?\d{3}[-.]?\d{4}',
        _placeholder,
        text)
class RealTimeTranscriber:
    """Streams microphone audio into Whisper and accumulates a transcript.

    Audio is captured on a background daemon thread via sounddevice; chunks
    are handed to the asyncio loop through a queue and transcribed off-loop.

    Fixes over the original:
    - the event loop is captured in ``start_recording`` and reused, instead
      of calling ``asyncio.get_event_loop()`` from the audio callback
      thread (which has no running loop and raises on Python 3.10+);
    - ``stop_recording`` pushes a ``None`` sentinel so the consumer loop is
      unblocked instead of hanging forever on ``queue.get()``;
    - the callback copies ``indata`` because sounddevice reuses the buffer
      between callbacks.
    """

    def __init__(self, language, task, initial_prompt, temperature):
        self.language = language
        self.task = task
        self.initial_prompt = initial_prompt
        self.temperature = temperature
        self.model = WhisperModelCache.get_instance().load_model()
        self.audio_queue = asyncio.Queue()
        self.is_recording = False
        self.transcription = ""
        # Event loop driving start_recording(); set there so the audio
        # thread can hand chunks back safely.
        self._loop = None

    async def start_recording(self):
        """Record and transcribe until stop_recording() is called; return the transcript."""
        self.is_recording = True
        self._loop = asyncio.get_running_loop()
        threading.Thread(target=self._record_audio, daemon=True).start()
        while self.is_recording:
            audio_chunk = await self.audio_queue.get()
            # ``None`` is the sentinel from stop_recording().
            if audio_chunk is None:
                break
            result = await asyncio.to_thread(
                self.model.transcribe,
                audio_chunk,
                language=self.language,
                task=self.task,
                initial_prompt=self.initial_prompt,
                temperature=self.temperature
            )
            self.transcription += result['text'] + " "
            await asyncio.sleep(0.1)
        return self.transcription

    def stop_recording(self):
        """Stop capture and unblock the consumer loop in start_recording()."""
        self.is_recording = False
        if self._loop is not None:
            self._loop.call_soon_threadsafe(self.audio_queue.put_nowait, None)

    def _record_audio(self):
        # Runs on the daemon thread; InputStream drives _audio_callback.
        with sd.InputStream(samplerate=16000, channels=1, callback=self._audio_callback):
            while self.is_recording:
                sd.sleep(100)

    def _audio_callback(self, indata, frames, time, status):
        if status:
            logger.warning(f"Audio callback status: {status}")
        # Copy the data: sounddevice reuses the indata buffer.
        audio_chunk = np.array(indata, dtype=np.float32).flatten()
        if self._loop is not None:
            asyncio.run_coroutine_threadsafe(self.audio_queue.put(audio_chunk), self._loop)
async def process_audio(file, language, task, anonymize, initial_prompt, temperature, encryption_key, num_speakers):
    """End-to-end pipeline behind the "Process Audio" button.

    Validates the upload, converts it to WAV, transcribes (with optional
    diarization for num_speakers > 1), then optionally anonymizes the text.
    When an encryption key is supplied, the uploaded file is encrypted at
    rest while the extracted audio is processed and decrypted at the end.
    Returns the transcription, or an "Error: ..." string on any failure.
    """
    try:
        if not file:
            return "Error: Please upload an audio or video file."
        if not is_supported_format(file):
            return f"Error: Unsupported file format: {file.name}"
        # BUG FIX: convert BEFORE encrypting. The original encrypted the
        # upload first, so ffmpeg was handed ciphertext and conversion
        # always failed whenever a key was provided.
        temp_audio_path = convert_to_wav(file.name)
        if not temp_audio_path:
            return f"Error: Failed to convert {file.name} to WAV format."
        if encryption_key:
            try:
                encrypt_file(encryption_key.encode(), file.name)
                logger.info("File encrypted successfully.")
            except Exception as e:
                logger.error(f"Encryption failed: {str(e)}")
                return f"Error: Encryption failed: {str(e)}"
        try:
            transcription = await transcribe_audio(
                temp_audio_path,
                language,
                task=task,
                initial_prompt=initial_prompt,
                temperature=temperature,
                num_speakers=num_speakers
            )
        finally:
            # Always clean up the temp WAV, even if transcription raised.
            os.remove(temp_audio_path)
        if anonymize:
            transcription = anonymize_text(transcription)
        if encryption_key:
            try:
                decrypt_file(encryption_key.encode(), file.name)
                logger.info("File decrypted successfully.")
            except Exception as e:
                logger.error(f"Decryption failed: {str(e)}")
                return f"Error: Decryption failed: {str(e)}"
        return transcription
    except Exception as e:
        logger.error(f"Error processing audio: {e}")
        return f"Error: {str(e)}"
def create_ui():
    """Assemble the Gradio Blocks interface.

    Two tabs: batch transcription of an uploaded file, and real-time
    microphone transcription. Returns the constructed (not yet launched)
    interface; the caller invokes ``.launch()``.
    """
    # Keys are Whisper language codes; values are the labels shown in the UI.
    languages = {
        "en": "English", "es": "Spanish", "fr": "French", "de": "German", "it": "Italian",
        "pt": "Portuguese", "nl": "Dutch", "ru": "Russian", "zh": "Chinese", "ja": "Japanese",
        "ko": "Korean", "ar": "Arabic", "hi": "Hindi", "bn": "Bengali", "ur": "Urdu",
        "te": "Telugu", "ta": "Tamil", "mr": "Marathi", "gu": "Gujarati", "kn": "Kannada"
    }
    with gr.Blocks(title="Advanced Whisper Transcription App", theme=gr.themes.Soft()) as interface:
        # Header / feature overview shown above both tabs.
        gr.Markdown(
            """
            # 🎙️ Advanced Whisper Transcription App
            Transcribe or translate your audio and video files with ease, now with real-time processing!
            ## Features:
            - Support for multiple audio and video formats
            - Speaker diarization for multi-speaker audio
            - Real-time transcription
            - Anonymization of personal information
            - File encryption for enhanced security
            """
        )
        with gr.Tabs():
            # --- Tab 1: batch processing of an uploaded file ---
            with gr.TabItem("File Upload"):
                with gr.Row():
                    with gr.Column(scale=2):
                        file_input = gr.File(label="Upload Audio/Video")
                        language_dropdown = gr.Dropdown(
                            choices=list(languages.items()),
                            label="Language",
                            value="en",
                            info="Select the language of the audio."
                        )
                        task_dropdown = gr.Dropdown(
                            choices=["transcribe", "translate"],
                            label="Task",
                            value="transcribe"
                        )
                        num_speakers = gr.Slider(
                            minimum=1,
                            maximum=10,
                            value=1,
                            step=1,
                            label="Number of Speakers",
                            info="Set to 1 for single-speaker audio, or higher for multi-speaker recognition."
                        )
                        anonymize_checkbox = gr.Checkbox(label="Anonymize Transcription")
                        prompt_input = gr.Textbox(
                            label="Initial Prompt",
                            lines=2,
                            placeholder="Optional prompt to guide transcription"
                        )
                        temperature_slider = gr.Slider(
                            minimum=0.0,
                            maximum=1.0,
                            value=0.5,
                            label="Temperature"
                        )
                        encryption_key = gr.Textbox(label="Encryption Key (Optional)", type="password")
                        process_button = gr.Button("Process Audio", variant="primary")
                    with gr.Column(scale=3):
                        output_text = gr.Textbox(label="Transcription Output", lines=20)
                # Wire the button to the async process_audio pipeline.
                process_button.click(
                    fn=process_audio,
                    inputs=[file_input, language_dropdown, task_dropdown, anonymize_checkbox, prompt_input, temperature_slider, encryption_key, num_speakers],
                    outputs=output_text
                )
            # --- Tab 2: live microphone transcription ---
            with gr.TabItem("Real-time Transcription"):
                with gr.Row():
                    with gr.Column(scale=2):
                        rt_language_dropdown = gr.Dropdown(
                            choices=list(languages.items()),
                            label="Language",
                            value="en",
                            info="Select the language for real-time transcription."
                        )
                        rt_task_dropdown = gr.Dropdown(
                            choices=["transcribe", "translate"],
                            label="Task",
                            value="transcribe"
                        )
                        rt_prompt_input = gr.Textbox(
                            label="Initial Prompt",
                            lines=2,
                            placeholder="Optional prompt to guide transcription"
                        )
                        rt_temperature_slider = gr.Slider(
                            minimum=0.0,
                            maximum=1.0,
                            value=0.5,
                            label="Temperature"
                        )
                        rt_start_button = gr.Button("Start Real-time Transcription", variant="primary")
                        rt_stop_button = gr.Button("Stop Transcription", variant="secondary")
                    with gr.Column(scale=3):
                        rt_output_text = gr.Textbox(label="Real-time Transcription Output", lines=20)

                # Event handlers for the real-time tab. NOTE(review): the
                # stop handler only returns a status string — it does not
                # reach the RealTimeTranscriber instance created by the
                # start handler; confirm whether stopping is expected to
                # actually end the recording.
                async def start_real_time_transcription(language, task, prompt, temperature):
                    # Blocks until the transcriber's recording loop ends.
                    transcriber = RealTimeTranscriber(language, task, prompt, temperature)
                    transcription = await transcriber.start_recording()
                    return transcription

                def stop_real_time_transcription():
                    return "Transcription stopped."

                rt_start_button.click(
                    fn=start_real_time_transcription,
                    inputs=[rt_language_dropdown, rt_task_dropdown, rt_prompt_input, rt_temperature_slider],
                    outputs=rt_output_text
                )
                rt_stop_button.click(
                    fn=stop_real_time_transcription,
                    inputs=[],
                    outputs=rt_output_text
                )
        # Usage instructions shown below the tabs.
        gr.Markdown(
            """
            ## How to use
            1. Choose between File Upload or Real-time Transcription.
            2. For File Upload:
            - Upload an audio or video file.
            - Select the language and task (transcribe or translate).
            - Set the number of speakers for multi-speaker audio.
            - Optionally, enable anonymization and set an encryption key.
            - Click "Process Audio" and wait for the results.
            3. For Real-time Transcription:
            - Select the language and task.
            - Optionally, provide an initial prompt and adjust the temperature.
            - Click "Start Real-time Transcription" and speak into your microphone.
            - Click "Stop Transcription" when you're done.
            """
        )
    return interface
if __name__ == "__main__":
    # Prepare the temp working directory, then serve the Gradio app.
    create_folders()
    app = create_ui()
    app.launch()