Spaces:
Sleeping
Sleeping
| import asyncio | |
| import gradio as gr | |
| import os | |
| from agent import AudioAgent | |
# Global agent instance. Starts unset; update_agent() rebinds it, and
# bot_response() refuses to run while it is still falsy.
agent = None
# Global demo instance. Only assigned when the file runs as a script (see the
# __main__ guard); get_share_url() reads it to build file URLs.
demo = None
def get_share_url(path):
    """Return a URL under which ``path`` can be fetched, or ``path`` itself.

    The base URL is taken from the ``AGENT_URL`` environment variable when it
    is set and non-empty; otherwise from ``demo.share_url`` when the global
    ``demo`` exists and actually has a share URL.

    Args:
        path: File path to expose.

    Returns:
        ``"<base>/gradio_api/file=<path>"`` when a base URL is available,
        otherwise ``path`` unchanged.
    """
    base_url = os.environ.get("AGENT_URL")
    if not base_url and demo:
        # share_url may be unset on the demo object; without this check the
        # literal text "None" would end up at the front of the URL.
        base_url = getattr(demo, "share_url", None)
    if not base_url:
        return path
    # Tolerate a configured base URL that ends with "/" so the result never
    # contains "//gradio_api".
    return f"{base_url.rstrip('/')}/gradio_api/file={path}"
def update_agent(model_name, temperature, api_key):
    """Rebuild the global ``agent`` from the given settings.

    Args:
        model_name: Model identifier forwarded to ``AudioAgent``.
        temperature: Sampling temperature; converted with ``float()``.
        api_key: API key forwarded to ``AudioAgent``.

    Returns:
        ``(True, None)`` when the agent was created, or ``(False, message)``
        where ``message`` is the text of whatever exception was raised. On
        failure the previous global ``agent`` is left as it was.
    """
    global agent

    outcome = (True, None)
    try:
        agent = AudioAgent(
            model_name=model_name,
            temperature=float(temperature),
            api_key=api_key,
        )
    except Exception as exc:
        # Report the problem to the caller instead of propagating it.
        outcome = (False, str(exc))
    return outcome
def user_input(user_message, audio_files, history, custom_history, model_name, temperature, api_key):
    """Record a user turn (text plus optional audio uploads) in both histories.

    The agent is (re)configured first via ``update_agent``. A submission with
    neither non-blank text nor files is then ignored.

    Args:
        user_message: Text typed by the user.
        audio_files: Uploaded files (objects with a ``name`` attribute, or
            values convertible with ``str()``); may be empty or ``None``.
        history: Chat message list shown in the UI; appended to in place.
        custom_history: Parallel message list that also carries file URLs;
            appended to in place.
        model_name, temperature, api_key: Settings passed to ``update_agent``.

    Returns:
        ``("", audio_files, history, custom_history)``.

    Raises:
        gr.Error: If ``update_agent`` reports a failure.
    """
    configured, reason = update_agent(model_name, temperature, api_key)
    if not configured:
        raise gr.Error(reason)

    has_text = bool(user_message.strip())
    if not (has_text or audio_files):
        return "", audio_files, history, custom_history

    # Turn every upload into a URL; uploads may be file-like or plain paths.
    uploaded_urls = [
        get_share_url(item.name if hasattr(item, 'name') else str(item))
        for item in (audio_files or [])
    ]

    # The visible chat only shows the text; the custom history additionally
    # remembers which files accompanied the turn.
    visible_turn = {"role": "user", "content": user_message}
    history.append(visible_turn)
    custom_history.append({**visible_turn, "input_files": uploaded_urls})

    return "", audio_files, history, custom_history
async def bot_response(history, audio_file_urls, custom_history):
    """Ask the global agent to answer the latest user turn.

    Args:
        history: Chat message list shown in the UI; the assistant reply is
            appended in place.
        audio_file_urls: Extra audio URLs supplied by the caller. Only used to
            decide whether a blank message should get the default prompt.
        custom_history: Parallel message list whose last entry holds the
            user's text and its ``input_files``; the assistant reply (with
            ``output_files``) is appended in place.

    Returns:
        ``(history, output_audio_files)``. When the last entry of ``history``
        is not a user turn, ``(history, [])`` is returned without calling the
        agent.

    Raises:
        gr.Error: If no agent is configured, or if the agent call fails. On
            an agent failure the pending user turn is removed from both
            histories before raising.
    """
    if not agent:
        raise gr.Error("Please configure the agent first")
    if not history or history[-1]["role"] != "user":
        return history, []

    # Get the user message and input files
    pending_turn = custom_history[-1]
    user_message = pending_turn["content"]
    input_files = pending_turn.get("input_files", [])

    # If message is empty but we have audio files, provide default message.
    # The files of the current turn live in `input_files`; checking only
    # `audio_file_urls` misses them whenever the caller passes an empty list.
    if not user_message.strip() and (input_files or audio_file_urls):
        user_message = "Please process these audio files"

    try:
        # The agent gets every earlier turn, but not the one being answered.
        result = await agent.run_agent(user_message, input_files, custom_history[:-1])
        final_response = result["final_response"]
        output_audio_files = result["output_audio_files"]
    except Exception as e:
        # Roll back the unanswered user turn so the histories stay in step.
        history.pop()
        custom_history.pop()
        raise gr.Error(str(e)) from e

    history.append({
        "role": "assistant",
        "content": final_response,
    })
    custom_history.append({
        "role": "assistant",
        "content": final_response,
        "output_files": output_audio_files,
    })
    return history, output_audio_files
def bot_response_sync(history, audio_file_urls, custom_history):
    """Run the async ``bot_response`` to completion from synchronous code.

    Args:
        history: Forwarded to ``bot_response``.
        audio_file_urls: Forwarded to ``bot_response``.
        custom_history: Forwarded to ``bot_response``.

    Returns:
        Whatever ``bot_response`` returns.
    """
    # asyncio.run creates a fresh loop, closes it afterwards and does not
    # leave the closed loop behind as the calling thread's current loop.
    return asyncio.run(bot_response(history, audio_file_urls, custom_history))
def create_interface():
    """Build the Gradio UI for the audio agent and return it.

    Layout: a chat column (chatbot plus message box) next to a settings
    column (model, temperature, API key, upload and download panels),
    followed by two static help sections.

    Submitting the message box first records the user turn via ``user_input``
    and then, only if that step succeeded, asks the agent for a reply via
    ``bot_response_sync``.

    Returns:
        The ``gr.Blocks`` instance; the caller is responsible for launching it.
    """
    with gr.Blocks(
        title="Audio Agent - Professional Audio Processing",
        theme=gr.themes.Default(),
    ) as interface:
        gr.Markdown("""
        # Audio Agent - Your AI Audio Assistant
        Upload your audio files and tell me what you need. I'll handle the rest!
        """)

        # Hidden state to store audio file URLs and custom history
        audio_urls_state = gr.State([])
        custom_history_state = gr.State([])

        with gr.Row():
            with gr.Column(scale=4):
                chatbot = gr.Chatbot(
                    type="messages",
                    height=500,
                    show_copy_button=True,
                    show_share_button=False
                )
                msg = gr.Textbox(
                    label="Describe what you want to do?",
                    placeholder="e.g., 'Remove filler words and improve audio quality'",
                    lines=3,
                    submit_btn=True
                )

            with gr.Column(scale=1):
                # Model Configuration
                with gr.Group():
                    model_name = gr.Dropdown(
                        choices=["gpt-4.1", "gpt-4.1-mini", "gpt-4o", "o3"],
                        value="gpt-4.1",
                        label="Model",
                        info="Select the model to use"
                    )
                    temperature = gr.Slider(
                        minimum=0.0,
                        maximum=1.0,
                        value=0.3,
                        step=0.1,
                        label="Temperature",
                        info="Higher values make output more random"
                    )
                    api_key = gr.Textbox(
                        label="OpenAI API Key",
                        placeholder="sk-...",
                        type="password",
                        info="Your OpenAI API key"
                    )

                    # Set temperature to 1.0 when o3 model is selected
                    def update_temperature(model):
                        if model == "o3":
                            # Pin the slider at 1.0 and lock it for o3.
                            return gr.update(value=1.0, interactive=False)
                        return gr.update(interactive=True)

                    model_name.change(
                        update_temperature,
                        inputs=[model_name],
                        outputs=[temperature]
                    )

                with gr.Group():
                    audio_files = gr.File(
                        file_count="multiple",
                        file_types=["audio"],
                        label="Upload Audio Files to Process",
                        height=150
                    )
                    output_audio_files = gr.File(
                        file_count="multiple",
                        file_types=["audio"],
                        label="Download Generated Audio",
                        height=150,
                        interactive=False,
                        visible=False  # Start hidden
                    )

        # Handle user input and bot response
        def handle_submit(message, files, history, custom_history, model, temp, key):
            new_msg, new_files, updated_history, updated_custom_history = user_input(
                message, files, history, custom_history, model, temp, key
            )
            return new_msg, new_files, updated_history, updated_custom_history

        def handle_bot_response(history, audio_urls, custom_history):
            updated_history, output_files = bot_response_sync(history, audio_urls, custom_history)
            output_visible = bool(output_files)  # True if there are files, else False
            return updated_history, gr.update(value=output_files, visible=output_visible), custom_history

        # Use .success() rather than .then(): the bot step must not run when
        # the submit step raised (e.g. the agent could not be configured),
        # otherwise the user gets a second error or a cleared output panel.
        msg.submit(
            handle_submit,
            [msg, audio_files, chatbot, custom_history_state, model_name, temperature, api_key],
            [msg, audio_files, chatbot, custom_history_state],
            queue=False
        ).success(
            handle_bot_response,
            [chatbot, audio_urls_state, custom_history_state],
            [chatbot, output_audio_files, custom_history_state]
        )

        gr.Markdown("""
        ---
        """)

        with gr.Row():
            gr.Markdown("""
            ## 🎚️ What I Can Do For You
            **Audio Manipulation:**
            - Merge multiple audio files into one continuous track
            - Cut or trim specific sections from any file
            - Adjust volume levels (increase or decrease)
            - Normalize audio levels for consistency
            - Apply fade-in or fade-out effects for smooth transitions (Mono channel only)
            - Change playback speed (faster or slower, with pitch change)
            - Reverse audio for creative effects
            - Remove silence from beginning or end of files
            **Analysis & Transcription:** (English only)
            - Transcribe speech in audio to text
            - Analyze audio properties (duration, sample rate, etc.)
            """)
            gr.Markdown("""
            ## 💡 Example Requests
            - *"Merge these two audio files and add a fade-in effect"*
            - *"Remove the silence at the beginning of this recording"*
            - *"Transcribe the speech in this audio file"*
            - *"Increase the volume of the first track and normalize both files"*
            - *"Cut out the middle section from 1:30 to 2:45"*
            - *"Make this audio play 1.5x faster"*
            - *"Apply a fade-out effect to the end of this track"*
            """)

    return interface
if __name__ == "__main__":
    # Bind the module-level `demo` (not a local) so get_share_url() can read
    # it once the app is running.
    demo = create_interface()
    demo.launch()