# transcribe / app.py — Hugging Face Space "transcribe" by mimishanmi
# (header reconstructed from page residue: "Update app.py", commit fd226a9, verified)
import os
import asyncio
import whisper
import gradio as gr
import torch
import logging
from pathlib import Path
import ffmpeg
import re
from tqdm import tqdm
from cryptography.fernet import Fernet
from pyannote.audio import Pipeline
from pyannote.core import Segment
import numpy as np
import sounddevice as sd
import soundfile as sf
import time
import threading
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
TEMP_FOLDER = 'temp/'
SUPPORTED_FORMATS = ['.mp3', '.wav', '.aac', '.flac', '.ogg', '.m4a', '.mp4', '.avi', '.mov', '.mkv', '.webm']
MAX_AUDIO_LENGTH = 600
class WhisperModelCache:
    """Process-wide cache holding a single Whisper model instance.

    Loading a Whisper checkpoint is expensive, so the model is loaded once
    and reused across requests via the singleton returned by get_instance().
    NOTE(review): the singleton creation is not thread-safe; acceptable for a
    single-worker Gradio app, but confirm if multiple workers are used.
    """
    _instance = None

    @staticmethod
    def get_instance():
        """Return the shared cache object, creating it on first use."""
        if WhisperModelCache._instance is None:
            WhisperModelCache._instance = WhisperModelCache()
        return WhisperModelCache._instance

    def __init__(self):
        # Lazily populated by load_model(); None until the first load.
        self.model = None
        # Size of the currently loaded checkpoint (e.g. "medium"); None if unloaded.
        self.model_size = None
        # Prefer GPU when available; Whisper runs on CPU otherwise.
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    def load_model(self, model_size="medium"):
        """Return the cached Whisper model, (re)loading it if necessary.

        Bug fix: the original ignored model_size on every call after the
        first, so a request for a different size silently received the
        first-loaded model. The model is now reloaded whenever the
        requested size differs from the cached one.
        """
        if self.model is None or self.model_size != model_size:
            logger.info(f"Loading Whisper model: {model_size} on {self.device}")
            self.model = whisper.load_model(model_size, device=self.device)
            self.model_size = model_size
        return self.model
def create_folders():
    """Ensure the temporary working directory exists (no-op when present)."""
    temp_dir = Path(TEMP_FOLDER)
    temp_dir.mkdir(exist_ok=True)
def is_supported_format(file):
    """Return True when an uploaded file object is present and its name ends
    with one of the supported audio/video extensions (case-insensitive)."""
    if file is None:
        return False
    # str.endswith accepts a tuple of suffixes — one call covers all formats.
    return file.name.lower().endswith(tuple(SUPPORTED_FORMATS))
def convert_to_wav(original_file_path):
    """Convert a media file to 16 kHz mono 16-bit PCM WAV (Whisper's input).

    The result is written into TEMP_FOLDER using the source's base name.
    Returns the output path on success, or None if ffmpeg failed (the
    ffmpeg stderr is logged).
    """
    stem = os.path.splitext(os.path.basename(original_file_path))[0]
    output_path = os.path.join(TEMP_FOLDER, stem + '.wav')
    try:
        stream = ffmpeg.input(original_file_path)
        stream = stream.output(output_path, acodec='pcm_s16le', ac=1, ar='16k')
        stream.overwrite_output().run(capture_stdout=True, capture_stderr=True)
        return output_path
    except ffmpeg.Error as e:
        logger.error(f'Error converting {original_file_path}: {e.stderr.decode()}')
        return None
def generate_key():
    """Return a fresh Fernet key (url-safe base64 bytes) for file encryption."""
    return Fernet.generate_key()
def encrypt_file(key, filename):
    """Encrypt *filename* in place with the given Fernet key (bytes)."""
    cipher = Fernet(key)
    path = Path(filename)
    # Read the plaintext, then overwrite the same file with the ciphertext.
    path.write_bytes(cipher.encrypt(path.read_bytes()))
def decrypt_file(key, filename):
    """Decrypt *filename* in place; inverse of encrypt_file with the same key."""
    cipher = Fernet(key)
    path = Path(filename)
    # Read the ciphertext, then overwrite the same file with the plaintext.
    path.write_bytes(cipher.decrypt(path.read_bytes()))
async def transcribe_audio(audio_path, language, task='transcribe', initial_prompt=None, temperature=0.5, num_speakers=1):
    """Transcribe (or translate) the audio file at *audio_path* with Whisper.

    The blocking Whisper call runs in a worker thread so the event loop is
    not stalled. When num_speakers > 1, pyannote diarization is applied and
    the speaker-labelled text is returned instead of the raw transcript.
    On failure an error string is returned rather than an exception raised.
    """
    try:
        whisper_model = WhisperModelCache.get_instance().load_model()
        # model.transcribe is CPU/GPU-bound and synchronous — off-load it.
        result = await asyncio.to_thread(
            whisper_model.transcribe,
            audio_path,
            language=language,
            task=task,
            initial_prompt=initial_prompt,
            temperature=temperature,
        )
        text = result['text']
        if num_speakers > 1:
            # Label each Whisper segment with its most likely speaker.
            diarization = await perform_diarization(audio_path, num_speakers)
            text = apply_diarization(result, diarization)
        return text
    except Exception as e:
        logger.error(f"Error transcribing {audio_path}: {str(e)}")
        return f"Error during transcription: {str(e)}"
async def perform_diarization(audio_path, num_speakers):
    """Run pyannote speaker diarization on *audio_path* for num_speakers speakers.

    Bug fix: the original hard-coded the placeholder string
    "YOUR_HF_AUTH_TOKEN", which can never authenticate against the Hugging
    Face Hub. The token is now read from the HF_AUTH_TOKEN environment
    variable (None if unset, which works for locally cached models).

    NOTE(review): Pipeline.from_pretrained loads the model on every call;
    consider caching the pipeline object if diarization is used often.
    """
    auth_token = os.environ.get("HF_AUTH_TOKEN")
    pipeline = Pipeline.from_pretrained(
        "pyannote/speaker-diarization@2.1",
        use_auth_token=auth_token,
    )
    # The pipeline call is synchronous and heavy — keep it off the event loop.
    return await asyncio.to_thread(pipeline, audio_path, num_speakers=num_speakers)
def apply_diarization(whisper_result, diarization):
    """Merge Whisper segments with diarization turns into "[speaker]: text" lines.

    Each Whisper segment is attributed to the first diarization turn whose
    time span intersects it; segments matching no turn are labelled "Unknown".
    """
    # Flatten the diarization annotation into (start, end, speaker) triples.
    turns = [
        (turn.start, turn.end, speaker)
        for turn, _, speaker in diarization.itertracks(yield_label=True)
    ]
    lines = []
    for seg in whisper_result['segments']:
        label = "Unknown"
        window = Segment(seg['start'], seg['end'])
        for t_start, t_end, t_label in turns:
            if window.intersects(Segment(t_start, t_end)):
                label = t_label
                break
        lines.append(f"[{label}]: {seg['text']}\n")
    # join of newline-terminated lines == the original += concatenation.
    return "".join(lines)
def anonymize_text(text):
    """Replace likely personal data in *text* with placeholder tags.

    A single alternation matches (in priority order) "Firstname Lastname"
    pairs, e-mail addresses, and US-style phone numbers; the replacer then
    decides which tag to substitute for each match.
    """
    def _tag(match):
        token = match.group()
        # Two capitalized words -> a probable person name.
        if re.match(r'\b[A-Z][a-z]+ [A-Z][a-z]+\b', token):
            return '[NAME]'
        if '@' in token:
            return '[EMAIL]'
        return '[PHONE]'

    return re.sub(
        r'\b[A-Z][a-z]+ [A-Z][a-z]+\b|\S+@\S+|\d{3}[-.]?\d{3}[-.]?\d{4}',
        _tag,
        text,
    )
class RealTimeTranscriber:
    """Stream microphone audio through Whisper, accumulating a transcript.

    A daemon thread captures audio with sounddevice and hands chunks to an
    asyncio queue; start_recording() consumes the queue and transcribes each
    chunk until stop_recording() flips the flag.
    """

    def __init__(self, language, task, initial_prompt, temperature):
        self.language = language
        self.task = task
        self.initial_prompt = initial_prompt
        self.temperature = temperature
        self.model = WhisperModelCache.get_instance().load_model()
        self.audio_queue = asyncio.Queue()
        self.is_recording = False
        self.transcription = ""
        # Event loop running start_recording(); the sounddevice callback
        # thread needs it to hand chunks back to asyncio safely.
        self.loop = None

    async def start_recording(self):
        """Record and transcribe until stop_recording(); return the transcript."""
        self.is_recording = True
        # Bug fix: capture the running loop here. The original called
        # asyncio.get_event_loop() from the audio callback thread, which has
        # no event loop of its own and fails on modern Python.
        self.loop = asyncio.get_running_loop()
        threading.Thread(target=self._record_audio, daemon=True).start()
        while self.is_recording:
            # Bug fix: the original awaited queue.get() unconditionally, so
            # stop_recording() could leave this coroutine blocked forever on
            # an empty queue. A timeout lets the loop re-check is_recording.
            try:
                audio_chunk = await asyncio.wait_for(self.audio_queue.get(), timeout=0.5)
            except asyncio.TimeoutError:
                continue
            if audio_chunk is not None:
                # NOTE(review): each raw callback chunk is transcribed on its
                # own; very short chunks transcribe poorly — consider
                # buffering a few seconds of audio per Whisper call.
                result = await asyncio.to_thread(
                    self.model.transcribe,
                    audio_chunk,
                    language=self.language,
                    task=self.task,
                    initial_prompt=self.initial_prompt,
                    temperature=self.temperature
                )
                self.transcription += result['text'] + " "
        return self.transcription

    def stop_recording(self):
        """Signal both the capture thread and the transcription loop to finish."""
        self.is_recording = False

    def _record_audio(self):
        # Hold the input stream open until stop is requested; frames arrive
        # via _audio_callback. 16 kHz mono matches Whisper's expected input.
        with sd.InputStream(samplerate=16000, channels=1, callback=self._audio_callback):
            while self.is_recording:
                sd.sleep(100)

    def _audio_callback(self, indata, frames, time, status):
        if status:
            logger.warning(f"Audio callback status: {status}")
        # Bug fix: copy and flatten the buffer. sounddevice reuses `indata`
        # between callbacks, so the original zero-copy np.frombuffer view
        # could be overwritten before Whisper read it.
        audio_chunk = np.array(indata, dtype=np.float32).flatten()
        if self.loop is not None:
            # Queue.put must run on the event-loop thread, not this one.
            asyncio.run_coroutine_threadsafe(self.audio_queue.put(audio_chunk), self.loop)
async def process_audio(file, language, task, anonymize, initial_prompt, temperature, encryption_key, num_speakers):
    """Validate, convert, transcribe, and post-process an uploaded media file.

    Pipeline: validate upload -> convert to WAV -> (optionally) encrypt the
    original at rest -> transcribe -> (optionally) anonymize -> decrypt the
    original -> return the transcript. All failures are returned as
    "Error: ..." strings for display in the Gradio output box.

    Bug fix: the original encrypted the upload BEFORE converting it, so
    ffmpeg was handed ciphertext and conversion always failed whenever an
    encryption key was supplied. Conversion now happens first.
    """
    try:
        if not file:
            return "Error: Please upload an audio or video file."
        if not is_supported_format(file):
            return f"Error: Unsupported file format: {file.name}"
        temp_audio_path = convert_to_wav(file.name)
        if not temp_audio_path:
            return f"Error: Failed to convert {file.name} to WAV format."
        # Encrypt the original upload at rest while we work on the WAV copy.
        if encryption_key:
            try:
                encrypt_file(encryption_key.encode(), file.name)
                logger.info("File encrypted successfully.")
            except Exception as e:
                logger.error(f"Encryption failed: {str(e)}")
                return f"Error: Encryption failed: {str(e)}"
        try:
            transcription = await transcribe_audio(
                temp_audio_path,
                language,
                task=task,
                initial_prompt=initial_prompt,
                temperature=temperature,
                num_speakers=num_speakers
            )
        finally:
            # Bug fix: always remove the temporary WAV, even if
            # transcription raised (the original leaked it on error).
            os.remove(temp_audio_path)
        if anonymize:
            transcription = anonymize_text(transcription)
        # Restore the original upload to plaintext before returning.
        if encryption_key:
            try:
                decrypt_file(encryption_key.encode(), file.name)
                logger.info("File decrypted successfully.")
            except Exception as e:
                logger.error(f"Decryption failed: {str(e)}")
                return f"Error: Decryption failed: {str(e)}"
        return transcription
    except Exception as e:
        logger.error(f"Error processing audio: {e}")
        return f"Error: {str(e)}"
def create_ui():
    """Build the Gradio Blocks interface with two tabs — batch file upload
    and real-time microphone transcription — and wire the buttons to the
    processing coroutines. Returns the (un-launched) Blocks object.
    """
    # Language code -> display name pairs; Gradio shows the name and passes
    # the code through to Whisper.
    languages = {
        "en": "English", "es": "Spanish", "fr": "French", "de": "German", "it": "Italian",
        "pt": "Portuguese", "nl": "Dutch", "ru": "Russian", "zh": "Chinese", "ja": "Japanese",
        "ko": "Korean", "ar": "Arabic", "hi": "Hindi", "bn": "Bengali", "ur": "Urdu",
        "te": "Telugu", "ta": "Tamil", "mr": "Marathi", "gu": "Gujarati", "kn": "Kannada"
    }
    with gr.Blocks(title="Advanced Whisper Transcription App", theme=gr.themes.Soft()) as interface:
        gr.Markdown(
            """
# 🎙️ Advanced Whisper Transcription App
Transcribe or translate your audio and video files with ease, now with real-time processing!
## Features:
- Support for multiple audio and video formats
- Speaker diarization for multi-speaker audio
- Real-time transcription
- Anonymization of personal information
- File encryption for enhanced security
"""
        )
        with gr.Tabs():
            # --- Tab 1: batch processing of an uploaded audio/video file ---
            with gr.TabItem("File Upload"):
                with gr.Row():
                    with gr.Column(scale=2):
                        file_input = gr.File(label="Upload Audio/Video")
                        language_dropdown = gr.Dropdown(
                            choices=list(languages.items()),
                            label="Language",
                            value="en",
                            info="Select the language of the audio."
                        )
                        task_dropdown = gr.Dropdown(
                            choices=["transcribe", "translate"],
                            label="Task",
                            value="transcribe"
                        )
                        num_speakers = gr.Slider(
                            minimum=1,
                            maximum=10,
                            value=1,
                            step=1,
                            label="Number of Speakers",
                            info="Set to 1 for single-speaker audio, or higher for multi-speaker recognition."
                        )
                        anonymize_checkbox = gr.Checkbox(label="Anonymize Transcription")
                        prompt_input = gr.Textbox(
                            label="Initial Prompt",
                            lines=2,
                            placeholder="Optional prompt to guide transcription"
                        )
                        temperature_slider = gr.Slider(
                            minimum=0.0,
                            maximum=1.0,
                            value=0.5,
                            label="Temperature"
                        )
                        encryption_key = gr.Textbox(label="Encryption Key (Optional)", type="password")
                        process_button = gr.Button("Process Audio", variant="primary")
                    with gr.Column(scale=3):
                        output_text = gr.Textbox(label="Transcription Output", lines=20)
                # Run the full file pipeline (process_audio) on click.
                process_button.click(
                    fn=process_audio,
                    inputs=[file_input, language_dropdown, task_dropdown, anonymize_checkbox, prompt_input, temperature_slider, encryption_key, num_speakers],
                    outputs=output_text
                )
            # --- Tab 2: live microphone transcription ---
            with gr.TabItem("Real-time Transcription"):
                with gr.Row():
                    with gr.Column(scale=2):
                        rt_language_dropdown = gr.Dropdown(
                            choices=list(languages.items()),
                            label="Language",
                            value="en",
                            info="Select the language for real-time transcription."
                        )
                        rt_task_dropdown = gr.Dropdown(
                            choices=["transcribe", "translate"],
                            label="Task",
                            value="transcribe"
                        )
                        rt_prompt_input = gr.Textbox(
                            label="Initial Prompt",
                            lines=2,
                            placeholder="Optional prompt to guide transcription"
                        )
                        rt_temperature_slider = gr.Slider(
                            minimum=0.0,
                            maximum=1.0,
                            value=0.5,
                            label="Temperature"
                        )
                        rt_start_button = gr.Button("Start Real-time Transcription", variant="primary")
                        rt_stop_button = gr.Button("Stop Transcription", variant="secondary")
                    with gr.Column(scale=3):
                        rt_output_text = gr.Textbox(label="Real-time Transcription Output", lines=20)

                # Creates a fresh transcriber per click and awaits its
                # recording loop until it stops.
                async def start_real_time_transcription(language, task, prompt, temperature):
                    transcriber = RealTimeTranscriber(language, task, prompt, temperature)
                    transcription = await transcriber.start_recording()
                    return transcription

                # NOTE(review): this handler has no reference to the
                # transcriber created above and never calls
                # stop_recording(), so the Stop button only replaces the
                # output text — the recording loop keeps running. The
                # transcriber instance should be shared (e.g. via gr.State)
                # so it can actually be stopped.
                def stop_real_time_transcription():
                    return "Transcription stopped."

                rt_start_button.click(
                    fn=start_real_time_transcription,
                    inputs=[rt_language_dropdown, rt_task_dropdown, rt_prompt_input, rt_temperature_slider],
                    outputs=rt_output_text
                )
                rt_stop_button.click(
                    fn=stop_real_time_transcription,
                    inputs=[],
                    outputs=rt_output_text
                )
        gr.Markdown(
            """
## How to use
1. Choose between File Upload or Real-time Transcription.
2. For File Upload:
- Upload an audio or video file.
- Select the language and task (transcribe or translate).
- Set the number of speakers for multi-speaker audio.
- Optionally, enable anonymization and set an encryption key.
- Click "Process Audio" and wait for the results.
3. For Real-time Transcription:
- Select the language and task.
- Optionally, provide an initial prompt and adjust the temperature.
- Click "Start Real-time Transcription" and speak into your microphone.
- Click "Stop Transcription" when you're done.
"""
        )
    return interface
if __name__ == "__main__":
    # Running as a script: prepare the temp dir, then build and serve the UI.
    create_folders()
    create_ui().launch()