# Hugging Face Space — audio-to-text transcription demo (Gradio app).
import concurrent.futures
import os
import tempfile

import gradio as gr
import speech_recognition as sr
from pydub import AudioSegment
def split_audio(audio_path, chunk_length_ms=60000, overlap_ms=2000):
    """Split an audio file into overlapping chunks.

    Args:
        audio_path: Path to any audio file readable by pydub/ffmpeg.
        chunk_length_ms: Length of each chunk in milliseconds.
        overlap_ms: Overlap between consecutive chunks in milliseconds;
            must be strictly smaller than chunk_length_ms.

    Returns:
        List of pydub AudioSegment chunks covering the whole file in order.

    Raises:
        ValueError: If overlap_ms >= chunk_length_ms. (The original code
            silently returned an empty list for a negative step, losing
            the entire audio.)
    """
    step = chunk_length_ms - overlap_ms
    if step <= 0:
        raise ValueError("overlap_ms must be smaller than chunk_length_ms")
    audio = AudioSegment.from_file(audio_path)
    # Slicing past the end is safe: pydub clamps the slice to the segment length.
    return [audio[i:i + chunk_length_ms] for i in range(0, len(audio), step)]
def convert_audio_to_wav(input_path, output_path):
    """Re-encode any supported audio file as a 16 kHz mono WAV at output_path."""
    segment = AudioSegment.from_file(input_path)
    mono_16k = segment.set_frame_rate(16000).set_channels(1)
    mono_16k.export(output_path, format="wav")
def transcribe_chunk_indexed(indexed_chunk_language):
    """Transcribe one audio chunk with the Google Web Speech API.

    Args:
        indexed_chunk_language: Tuple (index, chunk, language) where chunk is
            a pydub AudioSegment and language a code such as "en-US".

    Returns:
        (index, text) on success, or (index, "[Error: ...]") on failure, so
        the caller can reassemble results in order without handling exceptions.
    """
    index, chunk, language = indexed_chunk_language
    recognizer = sr.Recognizer()
    # Use a unique temp file instead of a fixed "chunk_<i>.wav" in the CWD:
    # fixed names collide when two app sessions transcribe concurrently.
    fd, wav_path = tempfile.mkstemp(suffix=f"_chunk_{index}.wav")
    os.close(fd)  # pydub's export reopens the path itself; no need to hold the fd
    try:
        chunk.export(wav_path, format="wav")
        with sr.AudioFile(wav_path) as source:
            audio_data = recognizer.record(source)
        text = recognizer.recognize_google(audio_data, language=language)
        return index, text
    except sr.RequestError:
        return index, "[Error: API unavailable or unresponsive]"
    except sr.UnknownValueError:
        return index, "[Error: Unable to recognize speech]"
    except Exception as e:
        return index, f"[Error: {str(e)}]"
    finally:
        # Always delete the temp WAV — the original removed it only on the
        # success path and leaked one file per failed chunk.
        try:
            os.remove(wav_path)
        except OSError:
            pass
def transcribe_audio_with_google_parallel(audio_path, chunk_length_ms=60000, overlap_ms=2000, language="en-US"):
    """Transcribe a whole audio file by fanning chunk jobs out to a thread pool.

    Chunks are transcribed concurrently (the Google API calls are
    network-bound) and written back into their original positions, then
    joined with spaces into a single string.
    """
    pieces = split_audio(audio_path, chunk_length_ms, overlap_ms)
    jobs = [(position, piece, language) for position, piece in enumerate(pieces)]
    results = [""] * len(jobs)
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as pool:
        pending = [pool.submit(transcribe_chunk_indexed, job) for job in jobs]
        for finished in concurrent.futures.as_completed(pending):
            position, text = finished.result()
            results[position] = text
    return " ".join(results)
def transcribe(audio_file_path, language):
    """Transcribe an uploaded audio file and return the text or an error message.

    Args:
        audio_file_path: Filesystem path from the Gradio Audio component,
            or None when nothing was uploaded.
        language: Language code such as "en-US".

    Returns:
        The transcription string, or a human-readable error message.
    """
    if audio_file_path is None:
        return "Please upload an audio file."
    converted_path = audio_file_path + "_converted.wav"
    try:
        convert_audio_to_wav(audio_file_path, converted_path)
    except Exception as e:
        return f"Error processing audio: {e}"
    try:
        # try/finally guarantees the converted temp WAV is removed even when
        # transcription raises — the original leaked it on that path.
        return transcribe_audio_with_google_parallel(
            converted_path,
            chunk_length_ms=60000,
            overlap_ms=2000,
            language=language,
        )
    finally:
        try:
            os.remove(converted_path)
        except OSError:
            pass
# Display name -> language code passed to recognize_google's `language` arg.
# Insertion order determines the order of choices in the UI dropdown.
language_options = {
    "English (US)": "en-US",
    "Dutch": "nl-NL",
    "English (UK)": "en-GB",
    "Spanish": "es-ES",
    "French": "fr-FR",
    "German": "de-DE",
    "Hindi": "hi-IN",
    "Chinese (Mandarin)": "zh-CN",
    "Arabic": "ar-SA",
    "Turkish": "tr-TR",
}
# Gradio UI: audio upload + language picker -> transcription textbox.
with gr.Blocks() as demo:
    gr.Markdown("# Audio to Text Transcription")
    gr.Markdown("Upload an audio file, and we'll transcribe it into text using chunk processing.")
    with gr.Row():
        # type="filepath" makes the component hand `transcribe` a path on disk.
        audio_input = gr.Audio(type="filepath", label="Upload audio file (mp3, wav, m4a, ogg)")
        language_dropdown = gr.Dropdown(list(language_options.keys()), label="Select language", value="English (US)")
    transcribe_btn = gr.Button("Transcribe")
    output_text = gr.Textbox(label="Transcription Output", lines=15)

    def on_transcribe(audio_path, lang_name):
        # Map the human-readable dropdown choice back to its language code.
        lang_code = language_options[lang_name]
        return transcribe(audio_path, lang_code)

    transcribe_btn.click(on_transcribe, inputs=[audio_input, language_dropdown], outputs=output_text)

demo.launch()