Spaces:
Running
on
Zero
Running
on
Zero
| """ | |
| Utility functions for CLI operations. | |
| Provides helpers for file discovery, validation, and batch operations. | |
| """ | |
| import logging | |
| from pathlib import Path | |
| from typing import List | |
logger = logging.getLogger(__name__)


def discover_audio_files(paths: List[Path], pattern: str = "*.m4a") -> List[Path]:
    """
    Discover audio files from paths (files, directories, or glob patterns).

    Each entry in ``paths`` is handled according to what it refers to:

    * an existing file is accepted if its suffix is one of
      ``.m4a``, ``.wav``, ``.mp3``, ``.flac`` (case-insensitive);
      otherwise it is skipped with a warning;
    * an existing directory is searched (non-recursively unless ``pattern``
      says otherwise) with ``pattern``;
    * anything else is treated as a glob pattern: its last component is
      globbed inside its parent directory.

    Duplicates are detected on the resolved path, so the same file reached
    through different spellings (relative vs. absolute, ``dir/../file``,
    symlinked directories) is returned only once, using the first spelling
    encountered.

    Args:
        paths: List of file paths, directory paths, or patterns
        pattern: Default glob pattern for directories (default: *.m4a)

    Returns:
        Sorted list of unique audio file paths
    """
    audio_extensions = frozenset({".m4a", ".wav", ".mp3", ".flac"})
    audio_files: List[Path] = []
    seen = set()  # Resolved paths already collected

    def add_unique(candidate: Path) -> bool:
        """Collect candidate unless the same underlying file was already added."""
        try:
            key = candidate.resolve()
        except (OSError, RuntimeError):
            # Resolution can fail (e.g. symlink loops); fall back to the
            # absolute spelling so the file is still de-duplicated textually.
            key = candidate.absolute()
        if key in seen:
            return False
        seen.add(key)
        audio_files.append(candidate)
        return True

    for path in paths:
        if path.is_file():
            # Direct file path: only explicit files are filtered by extension
            if path.suffix.lower() in audio_extensions:
                if add_unique(path):
                    logger.debug("Added file: %s", path)
            else:
                logger.warning("Skipping non-audio file: %s", path)
        elif path.is_dir():
            # Directory - find all files matching pattern
            logger.info("Searching directory: %s with pattern %s", path, pattern)
            found = list(path.glob(pattern))
            for file in found:
                if file.is_file():
                    add_unique(file)
            logger.info("Found %d files in %s", len(found), path)
        else:
            # Could be a glob pattern (or a path that does not exist).
            # NOTE(review): when the parent directory does not exist the
            # search falls back to the current directory, so multi-level
            # patterns such as "dir/**/*.m4a" are not honoured here.
            parent = path.parent if path.parent.exists() else Path(".")
            pattern_str = path.name
            logger.info("Searching with glob pattern: %s in %s", pattern_str, parent)
            found = list(parent.glob(pattern_str))
            for file in found:
                if file.is_file():
                    add_unique(file)
            logger.info("Found %d files matching pattern %s", len(found), pattern_str)

    # Sort for consistent processing order
    audio_files.sort()
    logger.info("Total discovered files: %d", len(audio_files))
    return audio_files
def validate_audio_files(files: List[Path]) -> tuple[List[Path], List[str]]:
    """
    Split the given paths into usable audio files and rejection messages.

    A path is rejected, in this order of precedence, when it does not exist,
    is not a regular file, has a suffix other than .m4a/.wav/.mp3/.flac
    (compared case-insensitively), or has a size of zero bytes.

    Args:
        files: List of file paths to validate

    Returns:
        Tuple of (valid_files, error_messages); both keep the input order
    """
    supported = [".m4a", ".wav", ".mp3", ".flac"]
    accepted = []
    problems = []

    for candidate in files:
        # The checks are ordered so that each one only runs once the
        # previous, cheaper precondition is known to hold.
        if not candidate.exists():
            problem = f"File not found: {candidate}"
        elif not candidate.is_file():
            problem = f"Not a file: {candidate}"
        elif candidate.suffix.lower() not in supported:
            problem = f"Unsupported audio format: {candidate}"
        elif candidate.stat().st_size == 0:
            problem = f"Empty file: {candidate}"
        else:
            accepted.append(candidate)
            continue
        problems.append(problem)

    return accepted, problems
def generate_output_filename(
    input_file: Path,
    segment_number: int,
    segment_type: str,
    start_time: float,
    end_time: float,
    output_dir: Path,
) -> Path:
    """
    Build a segment output path inside output_dir that does not exist yet.

    The name has the form
    ``<input stem>_segment_<NNN>_<type>_<start>s-<end>s.m4a`` with the
    segment number zero-padded to three digits and both times printed with
    two decimals. If that path already exists, ``_(1)``, ``_(2)``, ... is
    appended before the extension until an unused name is found.

    Args:
        input_file: Source input file
        segment_number: Segment number
        segment_type: Type of segment (speech, nonverbal)
        start_time: Start time in seconds
        end_time: End time in seconds
        output_dir: Output directory

    Returns:
        Unique output file path
    """
    # Everything except the optional collision suffix and the extension.
    stem = (
        f"{input_file.stem}_segment_{segment_number:03d}_"
        f"{segment_type}_{start_time:.2f}s-{end_time:.2f}s"
    )

    candidate = output_dir / f"{stem}.m4a"
    attempt = 0
    # Probe numbered variants until one is free on disk.
    while candidate.exists():
        attempt += 1
        candidate = output_dir / f"{stem}_({attempt}).m4a"
    return candidate
def format_duration(seconds: float) -> str:
    """
    Format duration in seconds to human-readable string.

    The duration is rounded to the nearest whole second once, before it is
    split into units, so a value that rounds up to a full minute or hour is
    carried into the next unit (59.6 -> "1m 0s", never "60s").

    Args:
        seconds: Duration in seconds

    Returns:
        Formatted string (e.g., "1h 23m 45s" or "12m 34s" or "45s")
    """
    # Below 59.5 the rounded value cannot reach 60, so seconds alone suffice.
    # Negative inputs also end up here and are printed as-is.
    if seconds < 59.5:
        return f"{seconds:.0f}s"

    total = round(seconds)
    minutes, secs = divmod(total, 60)
    if minutes < 60:
        return f"{minutes}m {secs}s"

    hours, minutes = divmod(minutes, 60)
    return f"{hours}h {minutes}m {secs}s"
def format_file_size(bytes: int) -> str:
    """
    Format file size to human-readable string.

    Units are binary (1 KB = 1024 B). KB and MB are printed with one
    decimal, GB with two. The unit is chosen from the *rounded* value, so a
    size just below a unit boundary is promoted to the next unit
    (1048575 bytes -> "1.0 MB", never "1024.0 KB").

    Args:
        bytes: File size in bytes

    Returns:
        Formatted string (e.g., "1.5 MB")
    """
    # NOTE: the parameter name shadows the builtin but is kept for
    # backward compatibility with keyword callers.
    if bytes < 1024:
        return f"{bytes} B"

    size = bytes / 1024
    for unit in ("KB", "MB"):
        # Compare what will actually be displayed against the boundary.
        if round(size, 1) < 1024:
            return f"{size:.1f} {unit}"
        size /= 1024
    return f"{size:.2f} GB"