Spaces:
Running
on
CPU Upgrade
Running
on
CPU Upgrade
| import os | |
| from pathlib import Path | |
| from typing import List, Optional, Tuple, Union | |
| import librosa | |
| import numpy as np | |
| import soundfile as sf | |
| try: | |
| from .audio_info import validate_audio_path | |
| except ImportError: | |
| from audio_info import validate_audio_path | |
def cut_audio(
    audio_path: str,
    start_time: float,
    end_time: float,
    output_path: Optional[str] = None,
    output_format: str = "wav",
) -> str:
    """
    Cut a segment from an audio file between specified start and end times.

    Args:
        audio_path: Path to input audio file or URL
        start_time: Start time in seconds
        end_time: End time in seconds
        output_path: Optional output directory (default: None, uses current directory)
        output_format: Output format ('wav' or 'mp3', default: 'wav')

    Returns:
        Path to the cut audio file

    Raises:
        ValueError: If start_time >= end_time or times are out of range
        FileNotFoundError: If audio file doesn't exist
        RuntimeError: For unexpected processing failures (I/O, encoding, ffmpeg)
    """
    try:
        # Validate audio path (may raise FileNotFoundError / ValueError).
        validated_path = validate_audio_path(audio_path)

        # sr=None keeps the native sample rate; mono=False preserves channels,
        # so y is 1-D (time,) for mono or 2-D (channels, time) otherwise.
        y, sr = librosa.load(validated_path, sr=None, mono=False)

        # The last axis is always time, for both mono and multi-channel audio.
        duration = y.shape[-1] / sr

        if start_time >= end_time:
            raise ValueError(
                f"Start time ({start_time}s) must be less than end time ({end_time}s)"
            )
        if start_time < 0:
            raise ValueError(f"Start time ({start_time}s) cannot be negative")
        if end_time > duration:
            raise ValueError(
                f"End time ({end_time}s) exceeds audio duration ({duration:.2f}s)"
            )

        # Convert times to sample indices and slice along the time axis;
        # ellipsis indexing handles mono and multi-channel uniformly.
        start_sample = int(start_time * sr)
        end_sample = int(end_time * sr)
        y_cut = y[..., start_sample:end_sample]

        # Build the output file path.
        if not output_path:
            output_path = "."
        os.makedirs(output_path, exist_ok=True)
        original_filename = Path(validated_path).stem
        output_filename = f"{original_filename}_cut_{start_time:.1f}s_to_{end_time:.1f}s.{output_format.lower()}"
        output_file_path = os.path.join(output_path, output_filename)

        # soundfile expects (frames, channels); librosa gives (channels, frames).
        if y_cut.ndim == 2:
            y_cut = y_cut.T

        if output_format.lower() == "mp3":
            # soundfile cannot encode MP3 directly: write a temp WAV and
            # transcode with ffmpeg.
            import tempfile
            import subprocess

            with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as temp_wav:
                temp_name = temp_wav.name
            try:
                sf.write(temp_name, y_cut, sr)
                cmd = [
                    "ffmpeg",
                    "-y",
                    "-i",
                    temp_name,
                    "-c:a",
                    "libmp3lame",
                    "-b:a",
                    "192k",
                    output_file_path,
                ]
                subprocess.run(cmd, capture_output=True, check=True)
            finally:
                # Clean up the temp WAV even when ffmpeg fails
                # (the original leaked it on error).
                if os.path.exists(temp_name):
                    os.unlink(temp_name)
        else:
            sf.write(output_file_path, y_cut, sr)
        return output_file_path
    except (ValueError, FileNotFoundError):
        # Preserve the documented exception types instead of wrapping them.
        raise
    except Exception as e:
        raise RuntimeError(f"Error cutting audio: {str(e)}") from e
def mute_time_windows(
    audio_path: str,
    mute_windows: List[Tuple[float, float]],
    output_path: Optional[str] = None,
    output_format: str = "wav",
    fade_duration: float = 0.1,
) -> str:
    """
    Mute specific time windows in an audio file.

    Args:
        audio_path: Path to input audio file or URL
        mute_windows: List of (start_time, end_time) tuples in seconds
        output_path: Optional output directory (default: None, uses current directory)
        output_format: Output format ('wav' or 'mp3', default: 'wav')
        fade_duration: Fade in/out duration in seconds for smooth transitions (default: 0.1s)

    Returns:
        Path to the processed audio file with muted sections

    Raises:
        ValueError: If mute windows are invalid or overlapping
        FileNotFoundError: If audio file doesn't exist
        RuntimeError: For unexpected processing failures (I/O, encoding, ffmpeg)
    """
    try:
        # Validate audio path (may raise FileNotFoundError / ValueError).
        validated_path = validate_audio_path(audio_path)

        # 1-D (time,) for mono, 2-D (channels, time) for multi-channel.
        y, sr = librosa.load(validated_path, sr=None, mono=False)
        duration = y.shape[-1] / sr

        # Validate windows in chronological order so overlap checks only need
        # to look at the previous window.
        sorted_windows = sorted(mute_windows, key=lambda w: w[0])
        for i, (start, end) in enumerate(sorted_windows):
            if start >= end:
                raise ValueError(
                    f"Window {i}: start time ({start}s) must be less than end time ({end}s)"
                )
            if start < 0 or end > duration:
                raise ValueError(
                    f"Window {i}: time range ({start}s-{end}s) outside audio duration (0-{duration:.2f}s)"
                )
            if i > 0 and start < sorted_windows[i - 1][1]:
                raise ValueError(f"Window {i} overlaps with previous window")

        # Work on a copy so the loaded audio is never mutated in place.
        y_processed = y.copy()
        total_samples = y_processed.shape[-1]

        for start_time, end_time in sorted_windows:
            start_sample = int(start_time * sr)
            end_sample = int(end_time * sr)
            fade_samples = int(fade_duration * sr)

            # Fade out just before the muted region to avoid an audible click;
            # broadcasting over the last axis covers mono and multi-channel.
            fade_start = max(0, start_sample - fade_samples)
            if fade_start < start_sample:
                fade_out = np.linspace(1, 0, start_sample - fade_start)
                y_processed[..., fade_start:start_sample] *= fade_out

            # Silence the window itself.
            y_processed[..., start_sample:end_sample] = 0

            # Fade back in just after the muted region.
            fade_end = min(total_samples, end_sample + fade_samples)
            if end_sample < fade_end:
                fade_in = np.linspace(0, 1, fade_end - end_sample)
                y_processed[..., end_sample:fade_end] *= fade_in

        # Build an output filename that encodes up to three windows.
        if not output_path:
            output_path = "."
        os.makedirs(output_path, exist_ok=True)
        original_filename = Path(validated_path).stem
        windows_str = "_".join(f"{s:.1f}-{e:.1f}" for s, e in sorted_windows[:3])
        if len(sorted_windows) > 3:
            windows_str += f"_and_{len(sorted_windows) - 3}_more"
        output_filename = (
            f"{original_filename}_muted_{windows_str}.{output_format.lower()}"
        )
        output_file_path = os.path.join(output_path, output_filename)

        # soundfile expects (frames, channels); librosa gives (channels, frames).
        if y_processed.ndim == 2:
            y_processed = y_processed.T

        if output_format.lower() == "mp3":
            # soundfile cannot encode MP3 directly: write a temp WAV and
            # transcode with ffmpeg.
            import tempfile
            import subprocess

            with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as temp_wav:
                temp_name = temp_wav.name
            try:
                sf.write(temp_name, y_processed, sr)
                cmd = [
                    "ffmpeg",
                    "-y",
                    "-i",
                    temp_name,
                    "-c:a",
                    "libmp3lame",
                    "-b:a",
                    "192k",
                    output_file_path,
                ]
                subprocess.run(cmd, capture_output=True, check=True)
            finally:
                # Clean up the temp WAV even when ffmpeg fails
                # (the original leaked it on error).
                if os.path.exists(temp_name):
                    os.unlink(temp_name)
        else:
            sf.write(output_file_path, y_processed, sr)
        return output_file_path
    except (ValueError, FileNotFoundError):
        # Preserve the documented exception types instead of wrapping them.
        raise
    except Exception as e:
        raise RuntimeError(f"Error muting audio windows: {str(e)}") from e
def extract_segments(
    audio_path: str,
    segments: List[Tuple[float, float]],
    output_path: Optional[str] = None,
    output_format: str = "wav",
    join_segments: bool = False,
) -> Union[str, List[str]]:
    """
    Extract multiple segments from an audio file.

    Args:
        audio_path: Path to input audio file or URL
        segments: List of (start_time, end_time) tuples in seconds
        output_path: Optional output directory (default: None, uses current directory)
        output_format: Output format ('wav' or 'mp3', default: 'wav')
        join_segments: If True, join all segments into one file; if False, save separately

    Returns:
        If join_segments=True: Path to joined audio file
        If join_segments=False: List of paths to individual segment files

    Raises:
        ValueError: If segments are invalid (or empty when join_segments=True)
        FileNotFoundError: If audio file doesn't exist
        RuntimeError: For unexpected processing failures (I/O, encoding, ffmpeg)
    """
    try:
        # Validate audio path (may raise FileNotFoundError / ValueError).
        validated_path = validate_audio_path(audio_path)

        # 1-D (time,) for mono, 2-D (channels, time) for multi-channel.
        y, sr = librosa.load(validated_path, sr=None, mono=False)
        duration = y.shape[-1] / sr

        # Joining requires at least one segment; np.concatenate([]) would
        # otherwise fail with an opaque error.
        if join_segments and not segments:
            raise ValueError("At least one segment is required when join_segments=True")

        for i, (start, end) in enumerate(segments):
            if start >= end:
                raise ValueError(
                    f"Segment {i}: start time ({start}s) must be less than end time ({end}s)"
                )
            if start < 0 or end > duration:
                raise ValueError(
                    f"Segment {i}: time range ({start}s-{end}s) outside audio duration"
                )

        if not output_path:
            output_path = "."
        os.makedirs(output_path, exist_ok=True)
        original_filename = Path(validated_path).stem

        def _save(audio: np.ndarray, file_path: str) -> None:
            # Write one finished clip to disk in the requested format.
            if audio.ndim == 2:
                # soundfile expects (frames, channels).
                audio = audio.T
            if output_format.lower() == "mp3":
                # soundfile cannot encode MP3 directly: temp WAV + ffmpeg.
                import tempfile
                import subprocess

                with tempfile.NamedTemporaryFile(
                    suffix=".wav", delete=False
                ) as temp_wav:
                    temp_name = temp_wav.name
                try:
                    sf.write(temp_name, audio, sr)
                    cmd = [
                        "ffmpeg",
                        "-y",
                        "-i",
                        temp_name,
                        "-c:a",
                        "libmp3lame",
                        "-b:a",
                        "192k",
                        file_path,
                    ]
                    subprocess.run(cmd, capture_output=True, check=True)
                finally:
                    # Clean up the temp WAV even when ffmpeg fails
                    # (the original leaked it on error).
                    if os.path.exists(temp_name):
                        os.unlink(temp_name)
            else:
                sf.write(file_path, audio, sr)

        if join_segments:
            # Slice every segment along the time axis, then concatenate them
            # back-to-back into one clip.
            segments_audio = [
                y[..., int(s * sr):int(e * sr)] for s, e in segments
            ]
            y_joined = np.concatenate(segments_audio, axis=-1)

            output_filename = (
                f"{original_filename}_segments_joined.{output_format.lower()}"
            )
            output_file_path = os.path.join(output_path, output_filename)
            _save(y_joined, output_file_path)
            return output_file_path
        else:
            # Save each segment as its own file and return all paths.
            segment_files = []
            for i, (start_time, end_time) in enumerate(segments):
                segment = y[..., int(start_time * sr):int(end_time * sr)]
                output_filename = f"{original_filename}_segment_{i + 1}_{start_time:.1f}s_to_{end_time:.1f}s.{output_format.lower()}"
                output_file_path = os.path.join(output_path, output_filename)
                _save(segment, output_file_path)
                segment_files.append(output_file_path)
            return segment_files
    except (ValueError, FileNotFoundError):
        # Preserve the documented exception types instead of wrapping them.
        raise
    except Exception as e:
        raise RuntimeError(f"Error extracting segments: {str(e)}") from e
def trim_audio(
    audio_path: str,
    trim_start: Optional[float] = None,
    trim_end: Optional[float] = None,
    output_path: Optional[str] = None,
    output_format: str = "wav",
) -> str:
    """
    Trim audio from the beginning and/or end.

    Args:
        audio_path: Path to input audio file or URL
        trim_start: Amount to trim from start in seconds (None = no trim from start)
        trim_end: Amount to trim from end in seconds (None = no trim from end)
        output_path: Optional output directory (default: None, uses current directory)
        output_format: Output format ('wav' or 'mp3', default: 'wav')

    Returns:
        Path to the trimmed audio file

    Raises:
        ValueError: If trim amounts are invalid or exceed audio duration
        FileNotFoundError: If audio file doesn't exist
        RuntimeError: For unexpected processing failures (I/O, encoding, ffmpeg)
    """
    try:
        # Validate audio path (may raise FileNotFoundError / ValueError).
        validated_path = validate_audio_path(audio_path)

        # 1-D (time,) for mono, 2-D (channels, time) for multi-channel.
        y, sr = librosa.load(validated_path, sr=None, mono=False)
        duration = y.shape[-1] / sr

        # Normalize None to 0.0 and validate the trim amounts.
        if trim_start is not None and trim_start < 0:
            raise ValueError("Trim start amount cannot be negative")
        if trim_end is not None and trim_end < 0:
            raise ValueError("Trim end amount cannot be negative")
        trim_start = trim_start or 0.0
        trim_end = trim_end or 0.0

        total_trim = trim_start + trim_end
        if total_trim >= duration:
            raise ValueError(
                f"Total trim ({total_trim}s) exceeds or equals audio duration ({duration:.2f}s)"
            )

        # Compute sample boundaries; the last axis is always time.
        start_sample = int(trim_start * sr)
        if trim_end > 0:
            end_sample = int((duration - trim_end) * sr)
        else:
            end_sample = y.shape[-1]
        y_trimmed = y[..., start_sample:end_sample]

        # Build an output filename that records what was trimmed.
        if not output_path:
            output_path = "."
        os.makedirs(output_path, exist_ok=True)
        original_filename = Path(validated_path).stem
        trim_parts = []
        if trim_start > 0:
            trim_parts.append(f"start_{trim_start:.1f}s")
        if trim_end > 0:
            trim_parts.append(f"end_{trim_end:.1f}s")
        trim_str = "_".join(trim_parts) if trim_parts else "trimmed"
        output_filename = f"{original_filename}_{trim_str}.{output_format.lower()}"
        output_file_path = os.path.join(output_path, output_filename)

        # soundfile expects (frames, channels); librosa gives (channels, frames).
        if y_trimmed.ndim == 2:
            y_trimmed = y_trimmed.T

        if output_format.lower() == "mp3":
            # soundfile cannot encode MP3 directly: write a temp WAV and
            # transcode with ffmpeg.
            import tempfile
            import subprocess

            with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as temp_wav:
                temp_name = temp_wav.name
            try:
                sf.write(temp_name, y_trimmed, sr)
                cmd = [
                    "ffmpeg",
                    "-y",
                    "-i",
                    temp_name,
                    "-c:a",
                    "libmp3lame",
                    "-b:a",
                    "192k",
                    output_file_path,
                ]
                subprocess.run(cmd, capture_output=True, check=True)
            finally:
                # Clean up the temp WAV even when ffmpeg fails
                # (the original leaked it on error).
                if os.path.exists(temp_name):
                    os.unlink(temp_name)
        else:
            sf.write(output_file_path, y_trimmed, sr)
        return output_file_path
    except (ValueError, FileNotFoundError):
        # Preserve the documented exception types instead of wrapping them.
        raise
    except Exception as e:
        raise RuntimeError(f"Error trimming audio: {str(e)}") from e
| if __name__ == "__main__": | |
| import argparse | |
| import json | |
| parser = argparse.ArgumentParser(description="Audio cutting and editing tools") | |
| subparsers = parser.add_subparsers(dest="command", help="Available commands") | |
| # Cut audio | |
| cut_parser = subparsers.add_parser("cut", help="Cut audio segment") | |
| cut_parser.add_argument("audio", help="Path to audio file") | |
| cut_parser.add_argument("start", type=float, help="Start time in seconds") | |
| cut_parser.add_argument("end", type=float, help="End time in seconds") | |
| cut_parser.add_argument( | |
| "--format", default="wav", choices=["wav", "mp3"], help="Output format" | |
| ) | |
| # Mute windows | |
| mute_parser = subparsers.add_parser("mute", help="Mute time windows") | |
| mute_parser.add_argument("audio", help="Path to audio file") | |
| mute_parser.add_argument("windows", help="JSON array of [start, end] pairs") | |
| mute_parser.add_argument( | |
| "--format", default="wav", choices=["wav", "mp3"], help="Output format" | |
| ) | |
| # Extract segments | |
| extract_parser = subparsers.add_parser("extract", help="Extract segments") | |
| extract_parser.add_argument("audio", help="Path to audio file") | |
| extract_parser.add_argument("segments", help="JSON array of [start, end] pairs") | |
| extract_parser.add_argument( | |
| "--join", action="store_true", help="Join segments into one file" | |
| ) | |
| extract_parser.add_argument( | |
| "--format", default="wav", choices=["wav", "mp3"], help="Output format" | |
| ) | |
| # Trim audio | |
| trim_parser = subparsers.add_parser("trim", help="Trim audio from start/end") | |
| trim_parser.add_argument("audio", help="Path to audio file") | |
| trim_parser.add_argument( | |
| "--start", type=float, help="Trim amount from start in seconds" | |
| ) | |
| trim_parser.add_argument( | |
| "--end", type=float, help="Trim amount from end in seconds" | |
| ) | |
| trim_parser.add_argument( | |
| "--format", default="wav", choices=["wav", "mp3"], help="Output format" | |
| ) | |
| args = parser.parse_args() | |
| try: | |
| if args.command == "cut": | |
| output = cut_audio( | |
| args.audio, args.start, args.end, output_format=args.format | |
| ) | |
| print(f"Cut audio saved to: {output}") | |
| elif args.command == "mute": | |
| windows = json.loads(args.windows) | |
| output = mute_time_windows(args.audio, windows, output_format=args.format) | |
| print(f"Muted audio saved to: {output}") | |
| elif args.command == "extract": | |
| segments = json.loads(args.segments) | |
| result = extract_segments( | |
| args.audio, segments, join_segments=args.join, output_format=args.format | |
| ) | |
| if args.join: | |
| print(f"Joined segments saved to: {result}") | |
| else: | |
| print("Extracted segments:") | |
| for i, segment_file in enumerate(result): | |
| print(f" {i + 1}. {segment_file}") | |
| elif args.command == "trim": | |
| output = trim_audio( | |
| args.audio, args.start, args.end, output_format=args.format | |
| ) | |
| print(f"Trimmed audio saved to: {output}") | |
| else: | |
| parser.print_help() | |
| except Exception as e: | |
| print(f"Error: {e}") | |
| exit(1) | |