Spaces:
Runtime error
Runtime error
| import gradio as gr | |
| import warnings | |
| import torch | |
| import os | |
| import whisper | |
| import ssl | |
| import zipfile | |
| from pydub import AudioSegment | |
| from pydub.silence import detect_nonsilent | |
| import subprocess | |
| import tempfile | |
| import time | |
| ssl._create_default_https_context = ssl._create_unverified_context | |
| def process_audio( | |
| audio_paths, | |
| remove_silence=False, | |
| min_silence_len=500, | |
| silence_thresh=-50, | |
| enable_chunking=False, | |
| chunk_duration=600, | |
| ffmpeg_path="ffmpeg", | |
| model_size="large-v3-turbo", | |
| language="de" | |
| ): | |
| try: | |
| if not audio_paths: | |
| return "No files selected.", "", None | |
| # Clean up any existing temp directory at the start | |
| temp_dir = "temp_processing" | |
| if os.path.exists(temp_dir): | |
| for file in os.listdir(temp_dir): | |
| file_path = os.path.join(temp_dir, file) | |
| try: | |
| if os.path.isfile(file_path): | |
| os.remove(file_path) | |
| except Exception as e: | |
| print(f"Error cleaning up {file_path}: {e}") | |
| try: | |
| os.rmdir(temp_dir) | |
| except Exception as e: | |
| print(f"Error removing temp directory: {e}") | |
| # Create fresh temp directory with unique timestamp | |
| temp_dir = f"temp_processing_{int(time.time())}" | |
| os.makedirs(temp_dir, exist_ok=True) | |
| processed_files = [] | |
| all_results = [] | |
| all_segments = [] | |
| all_txt_paths = [] | |
| try: | |
| # Step 1: Process each audio file | |
| for audio_path in audio_paths: | |
| if not audio_path: | |
| continue | |
| current_file = audio_path | |
| temp_files = [] | |
| # Step 1a: Split audio if chunking is enabled | |
| if enable_chunking: | |
| base_name = os.path.splitext(os.path.basename(current_file))[0] | |
| output_pattern = os.path.join(temp_dir, f"{base_name}_part_%d.mp3") | |
| cmd = [ | |
| ffmpeg_path, "-i", current_file, | |
| "-f", "segment", | |
| "-segment_time", str(chunk_duration), | |
| "-c:a", "copy", | |
| "-segment_start_number", "1", | |
| output_pattern | |
| ] | |
| subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) | |
| chunk_files = sorted([os.path.join(temp_dir, f) for f in os.listdir(temp_dir) | |
| if f.startswith(f"{base_name}_part_")]) | |
| temp_files.extend(chunk_files) | |
| else: | |
| temp_files.append(current_file) | |
| # Step 1b: Remove silence if requested | |
| if remove_silence: | |
| silence_removed_files = [] | |
| for file in temp_files: | |
| audio = AudioSegment.from_file(file) | |
| nonsilent = detect_nonsilent( | |
| audio, | |
| min_silence_len=min_silence_len, | |
| silence_thresh=silence_thresh | |
| ) | |
| output = AudioSegment.empty() | |
| for start, end in nonsilent: | |
| output += audio[start:end] | |
| # Save the silence-removed file | |
| silence_removed_path = os.path.join(temp_dir, f"silence_removed_{os.path.basename(file)}") | |
| output.export(silence_removed_path, format="mp3") | |
| silence_removed_files.append(silence_removed_path) | |
| processed_files.extend(silence_removed_files) | |
| else: | |
| processed_files.extend(temp_files) | |
| # Step 2: Transcribe all processed files | |
| print(f"Loading Whisper model '{model_size}'...") | |
| model = whisper.load_model(model_size, device="cpu") | |
| for file in processed_files: | |
| print(f"Transcribing: {file}") | |
| warnings.filterwarnings("ignore", message="FP16 is not supported on CPU; using FP32 instead") | |
| result = model.transcribe(file, fp16=False, language=language, temperature=0.0) | |
| full_text = result["text"] | |
| segments = "" | |
| for segment in result["segments"]: | |
| segments += f"[{segment['start']:.2f} - {segment['end']:.2f}]: {segment['text']}\n" | |
| # Store transcript files in temp directory | |
| txt_path = os.path.join(temp_dir, f"transcript_{os.path.splitext(os.path.basename(file))[0]}.txt") | |
| with open(txt_path, "w", encoding="utf-8") as f: | |
| f.write("=== Full Transcription ===\n\n") | |
| f.write(full_text) | |
| f.write("\n\n=== Segment-wise Transcription ===\n") | |
| f.write(segments) | |
| all_results.append(full_text) | |
| all_segments.append(segments) | |
| all_txt_paths.append(txt_path) | |
| # Create combined transcript file in temp directory | |
| combined_txt_path = os.path.join(temp_dir, "combined_transcripts.txt") | |
| with open(combined_txt_path, "w", encoding="utf-8") as f: | |
| f.write("=== Combined Transcriptions ===\n\n") | |
| for i, (result, segment, path) in enumerate(zip(all_results, all_segments, all_txt_paths)): | |
| filename = os.path.basename(processed_files[i]) | |
| f.write(f"File: {filename}\n") | |
| f.write("=== Full Transcription ===\n") | |
| f.write(result) | |
| f.write("\n\n=== Segment-wise Transcription ===\n") | |
| f.write(segment) | |
| f.write("\n" + "-"*50 + "\n\n") | |
| # Format display output | |
| combined_results = "=== File Transcriptions ===\n\n" | |
| combined_segments = "=== File Segments ===\n\n" | |
| for i, (result, segment) in enumerate(zip(all_results, all_segments)): | |
| filename = os.path.basename(processed_files[i]) | |
| combined_results += f"File: {filename}\n{result}\n\n" | |
| combined_segments += f"File: {filename}\n{segment}\n\n" | |
| # Create ZIP with all processed files and transcripts | |
| zip_path = f"processed_files_and_transcripts_{int(time.time())}.zip" | |
| cleanup_files = processed_files.copy() | |
| with zipfile.ZipFile(zip_path, 'w') as zipf: | |
| for file in processed_files: | |
| if os.path.exists(file): | |
| zipf.write(file, os.path.basename(file)) | |
| for txt_file in all_txt_paths: | |
| if os.path.exists(txt_file): | |
| zipf.write(txt_file) | |
| if os.path.exists(combined_txt_path): | |
| zipf.write(combined_txt_path) | |
| # Cleanup files after ZIP creation | |
| for file in cleanup_files: | |
| if os.path.exists(file): | |
| os.remove(file) | |
| for txt_file in all_txt_paths: | |
| if os.path.exists(txt_file): | |
| os.remove(txt_file) | |
| if os.path.exists(combined_txt_path): | |
| os.remove(combined_txt_path) | |
| # Clean up temp directory | |
| if os.path.exists(temp_dir): | |
| for file in os.listdir(temp_dir): | |
| file_path = os.path.join(temp_dir, file) | |
| if os.path.isfile(file_path): | |
| os.remove(file_path) | |
| os.rmdir(temp_dir) | |
| return combined_results, combined_segments, zip_path | |
| except Exception as inner_e: | |
| print(f"Error during processing: {inner_e}") | |
| raise inner_e | |
| except Exception as e: | |
| print(f"Error in process_audio: {e}") | |
| if 'temp_dir' in locals() and os.path.exists(temp_dir): | |
| try: | |
| for file in os.listdir(temp_dir): | |
| file_path = os.path.join(temp_dir, file) | |
| if os.path.isfile(file_path): | |
| os.remove(file_path) | |
| os.rmdir(temp_dir) | |
| except: | |
| pass | |
| return f"Error: {str(e)}", "", None | |
| def create_interface(): | |
| with gr.Blocks(title="Interview Audio Processing App") as app: | |
| gr.Markdown(""" | |
| # Audio Processing App | |
| Upload audio files (MP3 or M4A) for processing and transcription.\\ | |
| Intended use case: transcription of interviews. | |
| """) | |
| with gr.Row(): | |
| with gr.Column(): | |
| audio_input = gr.File( | |
| label="Upload Audio Files", | |
| file_count="multiple", | |
| type="filepath" | |
| ) | |
| with gr.Group(): | |
| gr.Markdown("### Silence Removal Settings") | |
| gr.Markdown(" Default settings are working very well. Silence removal helps to reduce hallucination.") | |
| remove_silence = gr.Checkbox( | |
| label="Remove Silence", | |
| value=False | |
| ) | |
| min_silence_len = gr.Slider( | |
| minimum=100, | |
| maximum=2000, | |
| value=500, | |
| step=100, | |
| label="Minimum Silence Length (ms)", | |
| visible=False | |
| ) | |
| silence_thresh = gr.Slider( | |
| minimum=-70, | |
| maximum=-30, | |
| value=-50, | |
| step=5, | |
| label="Silence Threshold (dB)", | |
| visible=False | |
| ) | |
| with gr.Group(): | |
| gr.Markdown("### Chunking Settings") | |
| gr.Markdown(" Chunking reduces the load on the model. 10min chunks work really good.") | |
| enable_chunking = gr.Checkbox( | |
| label="Enable Chunking", | |
| value=False | |
| ) | |
| chunk_duration = gr.Slider( | |
| minimum=60, | |
| maximum=3600, | |
| value=600, | |
| step=60, | |
| label="Chunk Duration (seconds)", | |
| visible=False | |
| ) | |
| ffmpeg_path = gr.Textbox( | |
| label="FFmpeg Path", | |
| value="ffmpeg", | |
| placeholder="Path to ffmpeg executable", | |
| visible=False | |
| ) | |
| with gr.Group(): | |
| gr.Markdown("### Transcription Settings") | |
| gr.Markdown(" tiny is the fastest, but the worst quality. Large-v3-turbo is the best, but slower.") | |
| model_size = gr.Dropdown( | |
| choices=["tiny", "base", "small", "medium", "large", "large-v2", "large-v3", "turbo", "large-v3-turbo"], | |
| value="large-v3-turbo", | |
| label="Whisper Model Size" | |
| ) | |
| language = gr.Dropdown( | |
| choices=["de", "en", "fr", "es", "it"], | |
| value="de", | |
| label="Language" | |
| ) | |
| process_btn = gr.Button("Process", variant="primary") | |
| delete_btn = gr.Button("Delete Everything", variant="stop") | |
| with gr.Column(): | |
| full_transcription = gr.Textbox(label="Full Transcription", lines=15) | |
| segmented_transcription = gr.Textbox(label="Segmented Transcription", lines=15) | |
| download_output = gr.File(label="Download Processed Files and Transcripts (ZIP)") | |
| def update_silence_controls(remove_silence): | |
| return { | |
| min_silence_len: gr.update(visible=remove_silence), | |
| silence_thresh: gr.update(visible=remove_silence), | |
| full_transcription: gr.update(value=""), | |
| segmented_transcription: gr.update(value=""), | |
| download_output: gr.update(value=None) | |
| } | |
| def update_chunking_controls(enable_chunking): | |
| return { | |
| chunk_duration: gr.update(visible=enable_chunking), | |
| ffmpeg_path: gr.update(visible=enable_chunking), | |
| full_transcription: gr.update(value=""), | |
| segmented_transcription: gr.update(value=""), | |
| download_output: gr.update(value=None) | |
| } | |
| remove_silence.change( | |
| fn=update_silence_controls, | |
| inputs=[remove_silence], | |
| outputs=[ | |
| min_silence_len, | |
| silence_thresh, | |
| full_transcription, | |
| segmented_transcription, | |
| download_output | |
| ] | |
| ) | |
| enable_chunking.change( | |
| fn=update_chunking_controls, | |
| inputs=[enable_chunking], | |
| outputs=[ | |
| chunk_duration, | |
| ffmpeg_path, | |
| full_transcription, | |
| segmented_transcription, | |
| download_output | |
| ] | |
| ) | |
| process_btn.click( | |
| fn=process_audio, | |
| inputs=[ | |
| audio_input, | |
| remove_silence, | |
| min_silence_len, | |
| silence_thresh, | |
| enable_chunking, | |
| chunk_duration, | |
| ffmpeg_path, | |
| model_size, | |
| language, | |
| ], | |
| outputs=[ | |
| full_transcription, | |
| segmented_transcription, | |
| download_output, | |
| ] | |
| ) | |
| # Add cleanup function | |
| def cleanup_files(): | |
| try: | |
| # Clean up temp directories | |
| temp_dirs = [d for d in os.listdir('.') if d.startswith('temp_processing')] | |
| for temp_dir in temp_dirs: | |
| if os.path.exists(temp_dir): | |
| for file in os.listdir(temp_dir): | |
| file_path = os.path.join(temp_dir, file) | |
| if os.path.isfile(file_path): | |
| os.remove(file_path) | |
| os.rmdir(temp_dir) | |
| # Clean up ZIP files | |
| zip_files = [f for f in os.listdir('.') if f.startswith('processed_files_and_transcripts_')] | |
| for zip_file in zip_files: | |
| if os.path.exists(zip_file): | |
| os.remove(zip_file) | |
| # Clean up transcript files | |
| transcript_files = [f for f in os.listdir('.') if f.startswith('transcript_')] | |
| for transcript_file in transcript_files: | |
| if os.path.exists(transcript_file): | |
| os.remove(transcript_file) | |
| # Return updates for all output fields | |
| return { | |
| full_transcription: gr.update(value="All temporary files have been deleted."), | |
| segmented_transcription: gr.update(value=""), | |
| download_output: gr.update(value=None) | |
| } | |
| except Exception as e: | |
| return { | |
| full_transcription: gr.update(value=f"Error during cleanup: {str(e)}"), | |
| segmented_transcription: gr.update(value=""), | |
| download_output: gr.update(value=None) | |
| } | |
| # Update the delete button click handler | |
| delete_btn.click( | |
| fn=cleanup_files, | |
| inputs=[], | |
| outputs=[ | |
| full_transcription, | |
| segmented_transcription, | |
| download_output | |
| ] | |
| ) | |
| return app | |
| if __name__ == "__main__": | |
| app = create_interface() | |
| app.launch(share=False) |