Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import requests | |
| import tempfile | |
| import os | |
| import json | |
| import traceback | |
| # AudioJob integration | |
| from audiojob import AudioJobRunner | |
| from pydub import AudioSegment | |
| from typing import Optional, Tuple | |
| import logging | |
| import ffmpeg | |
| # Set up logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
# Ordered (substrings, extension) pairs used to map a Content-Type header to a
# file suffix. Order matters: the first entry with a matching substring wins.
_CONTENT_TYPE_EXTENSIONS = (
    (("audio/mpeg", "mp3"), ".mp3"),
    (("audio/wav", "wav"), ".wav"),
    (("audio/ogg", "ogg"), ".ogg"),
    (("audio/mp4", "m4a"), ".m4a"),
    (("flac",), ".flac"),
)


def _guess_audio_extension(content_type: str, url: str) -> str:
    """Pick a temp-file suffix from the Content-Type header, else the URL path."""
    # Header values are case-insensitive, so normalise before matching.
    normalized = (content_type or "").lower()
    for markers, extension in _CONTENT_TYPE_EXTENSIONS:
        if any(marker in normalized for marker in markers):
            return extension

    # Fall back to the URL path; drop the query string and fragment first so
    # "file.mp3?token=x#t=10" still yields ".mp3".
    url_path = url.split("?", 1)[0].split("#", 1)[0]
    extension = os.path.splitext(url_path)[1].lower()
    return extension or ".mp3"  # Default fallback


def download_audio_from_url(url: str) -> str:
    """Download audio from a URL into a temporary file.

    Args:
        url: HTTP(S) location of the audio file.

    Returns:
        Path of the temporary file holding the downloaded bytes. The caller
        owns the file and is responsible for deleting it.

    Raises:
        gr.Error: If the request fails or the file cannot be written.
    """
    temp_path = None
    try:
        # The context manager releases the streamed connection even when the
        # download is interrupted part-way through.
        with requests.get(url, stream=True, timeout=30) as response:
            response.raise_for_status()
            ext = _guess_audio_extension(
                response.headers.get("content-type", ""), url
            )
            with tempfile.NamedTemporaryFile(delete=False, suffix=ext) as temp_file:
                temp_path = temp_file.name
                for chunk in response.iter_content(chunk_size=8192):
                    temp_file.write(chunk)
        return temp_path
    except Exception as e:
        # delete=False means nobody else removes a partially written file.
        if temp_path is not None and os.path.exists(temp_path):
            try:
                os.unlink(temp_path)
            except OSError:
                logger.warning("Could not remove partial download: %s", temp_path)
        logger.error(f"Error downloading audio: {str(e)}")
        raise gr.Error(f"Failed to download audio from URL: {str(e)}") from e
def cut_audio(audio_url: str, start_time: float, duration: float) -> str:
    """Cut a segment out of the audio file found at ``audio_url``.

    The file is downloaded, then cut with an ffmpeg stream copy (no
    re-encoding). If ffmpeg reports an error, the segment is re-encoded with
    pydub instead. A requested range that runs past the end of the audio is
    truncated to the end.

    Args:
        audio_url: URL of the audio file.
        start_time: Start time in seconds (must be >= 0).
        duration: Duration in seconds (must be > 0).

    Returns:
        Path to the cut audio file, created in /tmp with the same extension
        as the downloaded file.

    Raises:
        gr.Error: On invalid input, download failure, or processing failure.
    """
    try:
        # Validate inputs. A cleared numeric field arrives as None, so check
        # for that before comparing.
        if not audio_url or not audio_url.strip():
            raise gr.Error("Please provide a valid audio URL")
        if start_time is None or duration is None:
            raise gr.Error("Start time and duration are required")
        if start_time < 0:
            raise gr.Error("Start time must be non-negative")
        if duration <= 0:
            raise gr.Error("Duration must be positive")

        # Download audio from URL
        source_url = audio_url.strip()
        logger.info(f"Downloading audio from: {source_url}")
        temp_input_path = download_audio_from_url(source_url)
        try:
            # Load audio file (used for the duration check and as the
            # source for the re-encode fallback).
            logger.info("Loading audio file...")
            audio = AudioSegment.from_file(temp_input_path)
            total_ms = len(audio)

            # Convert times to milliseconds
            start_ms = int(start_time * 1000)
            end_ms = start_ms + int(duration * 1000)

            # Check if start time is within audio duration
            if start_ms >= total_ms:
                raise gr.Error(
                    f"Start time ({start_time}s) is beyond audio duration ({total_ms/1000:.2f}s)"
                )

            # Adjust end time if it exceeds audio length
            if end_ms > total_ms:
                end_ms = total_ms
                actual_duration = (end_ms - start_ms) / 1000
                logger.warning(
                    f"Requested duration extends beyond audio. Cutting until end. Actual duration: {actual_duration:.2f}s"
                )

            logger.info(
                f"Cutting audio (stream copy) from {start_time}s to {end_ms/1000:.2f}s"
            )

            # Keep original file extension when saving to /tmp
            _, input_ext = os.path.splitext(temp_input_path)
            if not input_ext:
                input_ext = ".mp3"

            # Create an output path in /tmp
            fd, output_path = tempfile.mkstemp(suffix=input_ext, dir="/tmp")
            os.close(fd)

            # Duration for the cut in seconds
            cut_duration_seconds = (end_ms - start_ms) / 1000.0

            try:
                # Try fast cut using ffmpeg stream copy to avoid re-encoding
                try:
                    (
                        ffmpeg
                        .input(temp_input_path, ss=start_time, t=cut_duration_seconds)
                        .output(output_path, acodec='copy')
                        .global_args('-loglevel', 'error', '-hide_banner')
                        .overwrite_output()
                        .run(capture_stdout=True, capture_stderr=True)
                    )
                except ffmpeg.Error as ff_err:
                    # Log detailed ffmpeg stderr and fall back to re-encoding
                    raw_stderr = getattr(ff_err, 'stderr', None)
                    if raw_stderr:
                        ffmpeg_stderr = raw_stderr.decode('utf-8', errors='ignore')
                    else:
                        ffmpeg_stderr = str(ff_err)
                    logger.warning(
                        "ffmpeg stream copy failed, falling back to re-encode. Details: %s",
                        ffmpeg_stderr,
                    )

                    # Fallback: re-encode using pydub (slower but more compatible)
                    segment = audio[start_ms:end_ms]
                    export_format = input_ext.lstrip('.').lower()
                    # pydub/ffmpeg commonly expect 'mp4' as format for m4a container
                    if export_format == 'm4a':
                        export_format = 'mp4'
                    segment.export(output_path, format=export_format)
            except Exception:
                # Neither strategy produced a usable file; do not leave the
                # placeholder created by mkstemp behind.
                if os.path.exists(output_path):
                    os.unlink(output_path)
                raise

            logger.info(f"Cut audio saved to: {output_path}")
            return output_path
        finally:
            # Clean up input file
            if os.path.exists(temp_input_path):
                os.unlink(temp_input_path)
    except gr.Error:
        # Re-raise Gradio errors
        raise
    except Exception as e:
        logger.error(f"Error cutting audio: {str(e)}")
        raise gr.Error(f"Failed to process audio: {str(e)}") from e
def process_audio_cut(audio_url: str, start_time: float, duration: float) -> Tuple[Optional[str], str]:
    """Cut the audio and report the outcome as a UI-friendly status message.

    Any failure is converted into a status string instead of being raised.

    Args:
        audio_url: URL of the audio file.
        start_time: Start time in seconds.
        duration: Requested duration in seconds.

    Returns:
        Tuple of (audio_file_path, status_message). The path is None when
        cutting failed.
    """
    try:
        result_path = cut_audio(audio_url, start_time, duration)
    except Exception as e:
        # Prefer a `message` attribute when the exception carries one (as
        # gr.Error does), since str() of such an error may add quotes.
        detail = getattr(e, "message", None) or str(e)
        return None, f"❌ Error: {detail}"

    status_msg = f"✅ Successfully cut audio: {duration}s segment starting at {start_time}s"
    return result_path, status_msg
# Create Gradio interface
with gr.Blocks(title="Audio Editor", theme=gr.themes.Soft()) as demo:
    gr.Markdown(
        """
        # 🎵 Audio Editor
        Upload audio via URL and perform various editing operations.
        ## 🎯 Audio Cut
        Cut a specific segment from your audio file by providing start time and duration.
        """
    )

    # Tab 1: cut a segment out of a remote audio file.
    with gr.Tab("Audio Cut"):
        gr.Markdown("### Cut Audio Segment")
        with gr.Row():
            with gr.Column():
                audio_url_input = gr.Textbox(
                    label="Audio URL",
                    placeholder="https://example.com/audio.mp3",
                    info="Enter the URL of the audio file you want to edit"
                )
                with gr.Row():
                    start_time_input = gr.Number(
                        label="Start Time (seconds)",
                        value=0,
                        minimum=0,
                        info="When to start cutting (in seconds)"
                    )
                    duration_input = gr.Number(
                        label="Duration (seconds)",
                        value=10,
                        minimum=0.1,
                        info="How long the cut should be (in seconds)"
                    )
                cut_button = gr.Button("🎵 Cut Audio", variant="primary")
            with gr.Column():
                status_output = gr.Textbox(
                    label="Status",
                    interactive=False,
                    info="Processing status and messages"
                )
                audio_output = gr.Audio(
                    label="Cut Audio Result",
                    type="filepath",
                )

        # Examples
        gr.Markdown("### 📝 Examples")
        gr.Examples(
            examples=[
                [
                    "https://www.soundjay.com/misc/sounds/bell-ringing-05.wav",
                    0,
                    5
                ],
                [
                    "https://file-examples.com/storage/fe68c9d70ede98d3b5f5f90/2017/11/file_example_MP3_700KB.mp3",
                    10,
                    15
                ]
            ],
            inputs=[audio_url_input, start_time_input, duration_input],
            label="Try these examples:"
        )

        # Set up event handler
        cut_button.click(
            fn=process_audio_cut,
            inputs=[audio_url_input, start_time_input, duration_input],
            outputs=[audio_output, status_output]
        )

    # Tab 2: run the AudioJob pipeline up to the split stage and show the manifest.
    with gr.Tab("AudioJob Runner"):
        gr.Markdown("### AudioJob: preprocess -> split (inspect manifest)")
        with gr.Row():
            with gr.Column():
                aj_source_input = gr.Textbox(
                    label="Source URI",
                    placeholder="e.g. /abs/path/to/file.wav or s3://bucket/key",
                    info="Source URI for AudioJobRunner"
                )
                aj_manifest_input = gr.Textbox(
                    label="Manifest JSON (optional)",
                    placeholder="Paste existing manifest JSON to resume (optional)",
                    lines=10
                )
                aj_s3_prefix = gr.Textbox(
                    label="S3 Prefix",
                    placeholder="Optional prefix for uploaded working copies (e.g. jobs/)",
                    info="Uploaded keys will be prefixed with this value",
                )
                aj_run_button = gr.Button("Run AudioJob", variant="primary")
            with gr.Column():
                aj_output = gr.Textbox(label="AudioJob Output (manifest)", lines=30, interactive=False)

        def run_audiojob_ui(source_uri: str, manifest_json: str, s3_prefix: str) -> str:
            """Run AudioJobRunner up to the split stage and return its manifest.

            Args:
                source_uri: Source passed to the runner when no manifest is given.
                manifest_json: Optional manifest JSON text; when it parses to a
                    non-empty value the runner is built from it and
                    ``source_uri`` is not passed.
                s3_prefix: Optional value forwarded as the ``s3_prefix`` preset.

            Returns:
                The resulting manifest as pretty-printed JSON, or a string
                starting with "Error:" (followed by a traceback for
                unexpected failures).
            """
            try:
                source = (source_uri or "").strip()
                manifest_text = (manifest_json or "").strip()

                manifest = json.loads(manifest_text) if manifest_text else None

                # Without a usable manifest the runner needs a source; fail
                # early with a clear message instead of passing it "".
                if not manifest and not source:
                    return "Error: provide a Source URI or a Manifest JSON to resume from."

                work_root = tempfile.mkdtemp(prefix="audiojob_")
                # allow presets from top-level presets if desired; using defaults here
                runner = AudioJobRunner(
                    manifest=manifest,
                    source_uri=None if manifest else source,
                    work_root=work_root,
                    presets={
                        # Read bucket and endpoint from environment where possible
                        "s3_bucket": os.environ.get("S3_BUCKET"),
                        "s3_region": "auto",
                        "s3_prefix": s3_prefix or "",
                        "s3_endpoint": os.environ.get("S3_ENDPOINT", ""),
                        "chunk_target_ms": 15 * 60000,
                    }
                )
                out_manifest = runner.run_until_split()
                return json.dumps(out_manifest, ensure_ascii=False, indent=2)
            except Exception as e:
                # Keep a server-side record; the UI only shows the returned text.
                logger.exception("AudioJob run failed")
                tb = traceback.format_exc()
                return f"Error: {e}\n\n{tb}"

        aj_run_button.click(fn=run_audiojob_ui, inputs=[aj_source_input, aj_manifest_input, aj_s3_prefix], outputs=[aj_output])

# Launch the app
if __name__ == "__main__":
    demo.launch()