Spaces:
Running
Running
| import gradio as gr | |
| import requests | |
| from datetime import datetime | |
| import json | |
| from dotenv import load_dotenv | |
| import os | |
| import tempfile | |
# Load variables from a .env file (if one exists) into the environment
# before the configuration below is read.
load_dotenv()
# Base URL of the transcription backend; set API_BASE_URL to override the
# local-development default.
API_BASE_URL = os.getenv("API_BASE_URL","http://localhost:8000")
class AudioState:
    """Mutable record of the audio item the UI is currently working with.

    Attributes are filled in by ``upload_audio`` and ``get_audio_by_id`` and
    ``current_audio_id`` is read back by ``update_transcription``.
    """

    def __init__(self):
        # Everything starts out unset: no audio has been uploaded or loaded.
        # (Assigned left to right, so attribute order matches the listing.)
        self.current_audio_id = (
            self.current_transcription
        ) = self.audio_data = self.audio_url = None
# Single module-level instance that the handler functions below read and write.
# NOTE(review): being process-global, this is presumably shared by every
# browser session served by the app — confirm whether per-session state
# (e.g. gr.State) is needed.
state = AudioState()
def download_audio(url):
    """Download the audio at *url* into a temporary file for playback.

    Args:
        url: Address of the audio file. A falsy value (``None`` or ``""``)
            short-circuits without any network access.

    Returns:
        Path of the newly written temporary ``.mp3`` file, or ``None`` when
        no URL was given or the download / write failed.
    """
    if not url:
        return None

    try:
        # Without a timeout a stalled server would block the handler forever.
        response = requests.get(url, timeout=30)
        response.raise_for_status()

        # mkstemp creates the file atomically with a unique name in the
        # default temp directory; a timestamp-based name is not guaranteed
        # unique when two downloads overlap.
        fd, temp_path = tempfile.mkstemp(prefix="temp_audio_", suffix=".mp3")
        with os.fdopen(fd, 'wb') as f:
            f.write(response.content)

        return temp_path
    except Exception as e:
        # Deliberately best-effort: playback is optional, so any failure is
        # reported on stdout and the caller simply gets no audio file.
        print(f"Error downloading audio: {str(e)}")
        return None
def upload_audio(audio_path, language_code):
    """Upload an audio file for transcription and refresh the shared state.

    Args:
        audio_path: Filesystem path of the recorded/uploaded audio, or
            ``None`` when the user supplied nothing.
        language_code: Language code sent to the API as ``language_code``.

    Returns:
        A 5-tuple matching the outputs wired to the "Process Audio" button:
        ``(original transcription, status text, editable transcription,
        edit-status text, playback file path)``. On failure the text slots
        carry an error message and the remaining slots are ``None``.
    """
    if audio_path is None:
        return None, "Please record or upload an audio file.", None, None, None

    # Open the file separately so an unreadable path is reported as such.
    # RequestException itself derives from OSError, so one combined handler
    # could not tell a local file problem from a network problem.
    try:
        audio_file = open(audio_path, 'rb')
    except OSError as e:
        return None, f"Error reading audio file: {str(e)}", None, "Upload failed", None

    try:
        with audio_file:
            files = {
                'file': ('audio.wav', audio_file, 'audio/wav')
            }
            data = {
                'language_code': language_code
            }
            # (connect, read) timeouts. The read timeout is generous because
            # the response already carries the transcription text.
            # NOTE(review): presumably the server transcribes synchronously —
            # confirm and tune the read timeout accordingly.
            response = requests.post(
                f"{API_BASE_URL}/upload/",
                files=files,
                data=data,
                timeout=(10, 300),
            )
        response.raise_for_status()
        audio_data = response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers a non-JSON body on requests versions whose
        # JSON decode error is not a RequestException.
        error_msg = f"Error uploading audio: {str(e)}"
        return None, error_msg, None, "Upload failed", None

    # Store the audio data in state
    state.current_audio_id = audio_data.get('id')
    state.current_transcription = audio_data.get('original_transcribed_text', '')
    state.audio_data = audio_data
    state.audio_url = audio_data.get('file_path')

    # Download the audio file for playback (None if that fails)
    audio_filepath = download_audio(state.audio_url)

    # Report what actually came back instead of always claiming success.
    transcribed = "Yes" if state.current_transcription else "No"
    status = (
        "Upload successful!\n"
        f"Audio ID: {state.current_audio_id}\n"
        f"Transcription completed: {transcribed}"
    )

    return (
        state.current_transcription,
        status,
        state.current_transcription,
        f"Ready to edit transcription for Audio ID: {state.current_audio_id}",
        audio_filepath
    )
def get_audio_by_id(audio_id):
    """Fetch one audio record by ID and make it the current item in state.

    Args:
        audio_id: Identifier typed into the "Audio ID" field; ``None`` when
            the field is empty.

    Returns:
        A 4-tuple matching the outputs wired to the "Load Audio" button:
        ``(original transcription, updated transcription, status text,
        playback file path)``. On failure only the status text is set.
    """
    if audio_id is None:
        # An empty field would otherwise request ".../audio/None/".
        return None, None, "Please enter an Audio ID.", None

    # Avoid URLs such as ".../audio/3.0/" if the ID arrives as a float.
    if isinstance(audio_id, float) and audio_id.is_integer():
        audio_id = int(audio_id)

    try:
        response = requests.get(f"{API_BASE_URL}/audio/{audio_id}/", timeout=30)
        response.raise_for_status()
        audio_data = response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers a non-JSON body on requests versions whose
        # JSON decode error is not a RequestException.
        return None, None, f"Error loading audio: {str(e)}", None

    # Update state with fetched data; fall back to the requested ID if the
    # payload omits 'id' rather than failing with KeyError.
    state.current_audio_id = audio_data.get('id', audio_id)
    state.current_transcription = audio_data.get('original_transcribed_text', '')
    state.audio_data = audio_data
    state.audio_url = audio_data.get('file_path')

    # Download the audio file for playback (None if that fails)
    audio_filepath = download_audio(state.audio_url)

    return (
        audio_data.get('original_transcribed_text', ''),
        audio_data.get('updated_transcribed_text', ''),
        f"Loaded Audio ID: {audio_id}",
        audio_filepath
    )
def update_transcription(transcription, audio_id=None):
    """Send an edited transcription to the API.

    Args:
        transcription: New text, sent as ``updated_transcribed_text``.
        audio_id: Record to update. When falsy, the ID stored in the shared
            state by the last upload/load is used instead.

    Returns:
        A human-readable status message describing success or the failure.
    """
    # Use either provided ID or stored ID
    update_id = audio_id if audio_id else state.current_audio_id
    if not update_id:
        return "No audio ID provided or selected."

    # Avoid URLs such as ".../audio/3.0/" if the ID arrives as a float.
    if isinstance(update_id, float) and update_id.is_integer():
        update_id = int(update_id)

    try:
        response = requests.put(
            f"{API_BASE_URL}/audio/{update_id}/",
            json={"updated_transcribed_text": transcription},
            timeout=30,
        )
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        return f"Error updating transcription: {str(e)}"

    # The response body is not needed; decoding it could only turn a
    # successful update into a spurious error when the body is not JSON.
    return f"Transcription updated successfully for Audio ID: {update_id}!"
def list_audios():
    """Return a plain-text listing of every audio record known to the API.

    Returns:
        One "ID / Original Transcription / (Updated Transcription)" group per
        record, each followed by a ``---`` separator line;
        ``"No audio files found."`` when the list is empty; or an error
        message when the request fails.
    """
    try:
        response = requests.get(f"{API_BASE_URL}/audio/", timeout=30)
        response.raise_for_status()
        audios = response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers a non-JSON body on requests versions whose
        # JSON decode error is not a RequestException.
        return f"Error listing audios: {str(e)}"

    if not audios:
        return "No audio files found."

    result = []
    for audio in audios:
        # .get keeps one incomplete record from aborting the whole listing.
        result.append(f"ID: {audio.get('id')}")
        result.append(f"Original Transcription: {audio.get('original_transcribed_text')}")
        updated_text = audio.get('updated_transcribed_text')
        if updated_text:
            result.append(f"Updated Transcription: {updated_text}")
        result.append("---")
    return "\n".join(result)
# Create the Gradio interface: three tabs (record/upload, edit by ID, list
# all) followed by the wiring of each button to its handler function.
# NOTE(review): the source this was recovered from had lost its indentation;
# the row/column nesting below is a best-effort reconstruction — confirm
# against the deployed layout.
with gr.Blocks(title="Audio Transcription System") as app:
    gr.Markdown("# Audio Transcription System")

    # Tab 1: record or upload audio, view its transcription, and edit it.
    with gr.Tab("Record & Transcribe"):
        with gr.Row():
            with gr.Column():
                audio_input = gr.Audio(
                    sources=["microphone", "upload"],
                    type="filepath",
                    label="Record or Upload Audio"
                )
                language_dropdown = gr.Dropdown(
                    choices=["rw", "en", "es", "fr", "de"],
                    value="rw",
                    label="Language Code"
                )
                upload_button = gr.Button("Process Audio", variant="primary")
            with gr.Column():
                status_output = gr.Textbox(
                    label="Status",
                    interactive=False,
                    lines=5
                )
                audio_playback = gr.Audio(
                    label="Audio Playback",
                    interactive=False,
                    type="filepath"
                )
        transcription_output = gr.Textbox(
            label="Original Transcription",
            interactive=False,
            lines=3
        )
        edit_transcription = gr.Textbox(
            label="Edit Transcription",
            lines=3,
            interactive=True
        )
        edit_status = gr.Textbox(
            label="Edit Status",
            interactive=False
        )
        update_button = gr.Button("Update Transcription", variant="primary")

    # Tab 2: load an existing record by its ID and submit a new transcription.
    with gr.Tab("Edit by ID"):
        with gr.Row():
            with gr.Column():
                audio_id_input = gr.Number(
                    label="Audio ID",
                    precision=0
                )
                load_button = gr.Button("Load Audio", variant="primary")
                audio_playback_by_id = gr.Audio(
                    label="Audio Playback",
                    interactive=False,
                    type="filepath"
                )
            with gr.Column():
                loaded_original = gr.Textbox(
                    label="Original Transcription",
                    interactive=False,
                    lines=3
                )
                loaded_current = gr.Textbox(
                    label="Current Transcription",
                    interactive=False,
                    lines=3
                )
        new_transcription = gr.Textbox(
            label="New Transcription",
            lines=3,
            interactive=True
        )
        update_status = gr.Textbox(
            label="Update Status",
            interactive=False
        )
        update_by_id_button = gr.Button("Update Transcription", variant="primary")

    # Tab 3: read-only text listing of every record.
    with gr.Tab("View All Recordings"):
        refresh_button = gr.Button("Refresh Audio List")
        audio_list = gr.Textbox(
            label="Audio Files",
            lines=15,
            interactive=False
        )

    # Event handlers. The order of each `outputs` list must match the order
    # of the tuple returned by the corresponding handler.
    upload_button.click(
        fn=upload_audio,
        inputs=[audio_input, language_dropdown],
        outputs=[transcription_output, status_output, edit_transcription, edit_status, audio_playback]
    )
    # No ID input here: update_transcription falls back to the ID stored in
    # the module-level state by the last upload or load.
    update_button.click(
        fn=update_transcription,
        inputs=[edit_transcription],
        outputs=edit_status
    )
    load_button.click(
        fn=get_audio_by_id,
        inputs=[audio_id_input],
        outputs=[loaded_original, loaded_current, update_status, audio_playback_by_id]
    )
    # Passes the ID field explicitly as update_transcription's audio_id.
    update_by_id_button.click(
        fn=update_transcription,
        inputs=[new_transcription, audio_id_input],
        outputs=update_status
    )
    refresh_button.click(
        fn=list_audios,
        inputs=[],
        outputs=audio_list
    )
# Start the Gradio server only when this file is run as a script, not when
# it is imported as a module.
if __name__ == "__main__":
    app.launch()