Spaces:
Paused
Paused
| """ | |
| ShortSmith v2 - Helper Utilities | |
| Common utility functions for file handling, validation, and data manipulation. | |
| """ | |
| import os | |
| import shutil | |
| import tempfile | |
| import uuid | |
| from pathlib import Path | |
| from typing import Optional, List, Tuple, Union | |
| from dataclasses import dataclass | |
| from utils.logger import get_logger | |
| logger = get_logger("utils.helpers") | |
# File extensions (lower-case, with leading dot) accepted for each media type.
# The validators compare against ``Path.suffix.lower()``, so entries must stay
# lower-case and keep the leading dot.
SUPPORTED_VIDEO_FORMATS = {
    ".mp4",
    ".avi",
    ".mov",
    ".mkv",
    ".webm",
    ".flv",
    ".wmv",
    ".m4v",
}
SUPPORTED_IMAGE_FORMATS = {
    ".jpg",
    ".jpeg",
    ".png",
    ".webp",
    ".bmp",
    ".gif",
}
SUPPORTED_AUDIO_FORMATS = {
    ".mp3",
    ".wav",
    ".aac",
    ".flac",
    ".ogg",
    ".m4a",
}
@dataclass
class ValidationResult:
    """
    Result of file validation.

    Declared as a dataclass so it can be built with keyword arguments, which
    is how the validators in this module construct it.

    Attributes:
        is_valid: True when the file passed every check.
        error_message: Human-readable reason for failure, None on success.
        file_path: Validated path, set only on success.
        file_size: File size in bytes, 0 when the size was not measured.
    """

    is_valid: bool
    error_message: Optional[str] = None
    file_path: Optional[Path] = None
    file_size: int = 0
class FileValidationError(Exception):
    """Signals that a file did not pass validation."""
class VideoProcessingError(Exception):
    """Signals a failure while processing a video."""
class ModelLoadError(Exception):
    """Signals that a model could not be loaded."""
class InferenceError(Exception):
    """Signals a failure while running model inference."""
def validate_video_file(
    file_path: Union[str, Path],
    max_size_mb: float = 500.0,
    check_exists: bool = True,
) -> ValidationResult:
    """
    Validate a video file for processing.

    This function does not raise: every failure, including an unexpected
    error while inspecting the path, is reported through the returned
    ValidationResult.

    Args:
        file_path: Path to the video file
        max_size_mb: Maximum allowed file size in megabytes
        check_exists: Whether to inspect the filesystem (existence,
            regular-file and size checks). When False only the extension
            is checked and the reported file size is 0.

    Returns:
        ValidationResult with validation status and details
    """
    try:
        path = Path(file_path)

        if check_exists:
            # Check existence
            if not path.exists():
                return ValidationResult(
                    is_valid=False,
                    error_message=f"Video file not found: {path}"
                )
            # A directory named e.g. "clip.mp4" exists but is not a video.
            if not path.is_file():
                return ValidationResult(
                    is_valid=False,
                    error_message=f"Video path is not a file: {path}"
                )

        # Check extension
        if path.suffix.lower() not in SUPPORTED_VIDEO_FORMATS:
            # sorted() keeps the message stable; set iteration order is not.
            supported = ", ".join(sorted(SUPPORTED_VIDEO_FORMATS))
            return ValidationResult(
                is_valid=False,
                error_message=f"Unsupported video format: {path.suffix}. "
                              f"Supported: {supported}"
            )

        # Check file size (only measurable when the file is known to exist)
        file_size = 0
        if check_exists:
            file_size = path.stat().st_size
            size_mb = file_size / (1024 * 1024)
            if size_mb > max_size_mb:
                return ValidationResult(
                    is_valid=False,
                    error_message=f"Video file too large: {size_mb:.1f}MB (max: {max_size_mb}MB)",
                    file_size=file_size
                )

        logger.debug(f"Video file validated: {path}")
        return ValidationResult(
            is_valid=True,
            file_path=path,
            file_size=file_size
        )
    except Exception as e:
        # Deliberate catch-all: callers rely on getting a result, not an exception.
        logger.error(f"Error validating video file {file_path}: {e}")
        return ValidationResult(
            is_valid=False,
            error_message=f"Validation error: {str(e)}"
        )
def validate_image_file(
    file_path: Union[str, Path],
    max_size_mb: float = 10.0,
    check_exists: bool = True,
) -> ValidationResult:
    """
    Validate an image file (e.g., reference image for person detection).

    This function does not raise: every failure, including an unexpected
    error while inspecting the path, is reported through the returned
    ValidationResult.

    Args:
        file_path: Path to the image file
        max_size_mb: Maximum allowed file size in megabytes
        check_exists: Whether to inspect the filesystem (existence,
            regular-file and size checks). When False only the extension
            is checked and the reported file size is 0.

    Returns:
        ValidationResult with validation status and details
    """
    try:
        path = Path(file_path)

        if check_exists:
            # Check existence
            if not path.exists():
                return ValidationResult(
                    is_valid=False,
                    error_message=f"Image file not found: {path}"
                )
            # A directory named e.g. "face.png" exists but is not an image.
            if not path.is_file():
                return ValidationResult(
                    is_valid=False,
                    error_message=f"Image path is not a file: {path}"
                )

        # Check extension
        if path.suffix.lower() not in SUPPORTED_IMAGE_FORMATS:
            # sorted() keeps the message stable; set iteration order is not.
            supported = ", ".join(sorted(SUPPORTED_IMAGE_FORMATS))
            return ValidationResult(
                is_valid=False,
                error_message=f"Unsupported image format: {path.suffix}. "
                              f"Supported: {supported}"
            )

        # Check file size (only measurable when the file is known to exist)
        file_size = 0
        if check_exists:
            file_size = path.stat().st_size
            size_mb = file_size / (1024 * 1024)
            if size_mb > max_size_mb:
                return ValidationResult(
                    is_valid=False,
                    error_message=f"Image file too large: {size_mb:.1f}MB (max: {max_size_mb}MB)",
                    file_size=file_size
                )

        logger.debug(f"Image file validated: {path}")
        return ValidationResult(
            is_valid=True,
            file_path=path,
            file_size=file_size
        )
    except Exception as e:
        # Deliberate catch-all: callers rely on getting a result, not an exception.
        logger.error(f"Error validating image file {file_path}: {e}")
        return ValidationResult(
            is_valid=False,
            error_message=f"Validation error: {str(e)}"
        )
def get_temp_dir(prefix: str = "shortsmith_") -> Path:
    """
    Create a temporary directory for processing.

    The directory is created with ``tempfile.mkdtemp`` under the system temp
    location, so each call yields a freshly created directory that no other
    call can be handed. The caller is responsible for removing it.

    Args:
        prefix: Prefix for the temp directory name

    Returns:
        Path to the created temporary directory

    Raises:
        OSError: If directory creation fails
    """
    try:
        # mkdtemp picks an unused name and creates it atomically, unlike a
        # truncated UUID combined with mkdir(exist_ok=True).
        temp_dir = Path(tempfile.mkdtemp(prefix=prefix))
    except Exception as e:
        logger.error(f"Failed to create temp directory: {e}")
        raise OSError(f"Could not create temporary directory: {e}") from e

    logger.debug(f"Created temp directory: {temp_dir}")
    return temp_dir
def cleanup_temp_files(
    temp_dir: Union[str, Path],
    ignore_errors: bool = True
) -> bool:
    """
    Clean up temporary files and directories.

    Never raises; failures are logged as warnings and reported through the
    return value.

    Args:
        temp_dir: Path to the temporary directory to clean
        ignore_errors: Whether to ignore cleanup errors (passed through to
            ``shutil.rmtree``)

    Returns:
        True if the path is absent afterwards (including when it never
        existed), False otherwise
    """
    try:
        path = Path(temp_dir)
        if not path.exists():
            # Nothing to remove counts as success.
            return True

        shutil.rmtree(path, ignore_errors=ignore_errors)

        # With ignore_errors=True rmtree swallows failures, so verify the
        # outcome instead of assuming the directory is gone.
        if path.exists():
            logger.warning(
                f"Failed to cleanup temp directory {temp_dir}: path still exists"
            )
            return False

        logger.debug(f"Cleaned up temp directory: {path}")
        return True
    except Exception as e:
        logger.warning(f"Failed to cleanup temp directory {temp_dir}: {e}")
        return False
def format_duration(seconds: float) -> str:
    """
    Render a duration given in seconds as a clock-style string.

    Fractional seconds are truncated. Negative durations are rendered
    as "0:00".

    Args:
        seconds: Duration in seconds

    Returns:
        "H:MM:SS" when the duration is at least one hour, otherwise "M:SS"
    """
    if seconds < 0:
        return "0:00"

    whole_hours, remainder = divmod(seconds, 3600)
    h = int(whole_hours)
    m = int(remainder // 60)
    s = int(seconds % 60)

    if h > 0:
        return f"{h}:{m:02d}:{s:02d}"
    return f"{m}:{s:02d}"
def format_timestamp(seconds: float, include_ms: bool = False) -> str:
    """
    Format timestamp for display.

    Negative timestamps are clamped to zero. With ``include_ms`` the value
    is rounded to the nearest millisecond and any carry propagates into the
    seconds, minutes and hours fields, so the seconds field never shows
    "60.000". Without ``include_ms`` fractional seconds are truncated.

    Args:
        seconds: Timestamp in seconds
        include_ms: Whether to include milliseconds

    Returns:
        "H:MM:SS" or "M:SS" (with ".mmm" appended to the seconds when
        include_ms is True); the hour field appears only when non-zero
    """
    seconds = max(seconds, 0.0)

    if include_ms:
        # Do the arithmetic in integer milliseconds so rounding carries over
        # field boundaries correctly.
        total_ms = int(round(seconds * 1000))
        total_secs, millis = divmod(total_ms, 1000)
    else:
        total_secs, millis = int(seconds), 0

    hours, remainder = divmod(total_secs, 3600)
    minutes, secs = divmod(remainder, 60)

    sec_text = f"{secs:02d}.{millis:03d}" if include_ms else f"{secs:02d}"
    if hours > 0:
        return f"{hours}:{minutes:02d}:{sec_text}"
    return f"{minutes}:{sec_text}"
def safe_divide(
    numerator: float,
    denominator: float,
    default: float = 0.0
) -> float:
    """
    Divide numerator by denominator, falling back to a default on zero.

    Args:
        numerator: Value to be divided
        denominator: Value to divide by
        default: Result to use when the denominator equals zero

    Returns:
        The quotient, or default when the denominator equals zero
    """
    return default if denominator == 0 else numerator / denominator
def clamp(
    value: float,
    min_value: float,
    max_value: float
) -> float:
    """
    Restrict a value to the range [min_value, max_value].

    Args:
        value: Number to restrict
        min_value: Lower bound
        max_value: Upper bound

    Returns:
        The value itself when it lies inside the range, otherwise the
        bound it exceeded
    """
    # Apply the upper bound first, then the lower bound.
    upper_bounded = max_value if max_value < value else value
    return upper_bounded if upper_bounded > min_value else min_value
def normalize_scores(scores: List[float]) -> List[float]:
    """
    Rescale scores linearly so the smallest maps to 0 and the largest to 1.

    Args:
        scores: List of raw scores

    Returns:
        Rescaled scores in input order; an empty list for empty input, and
        0.5 for every entry when all scores are equal
    """
    if not scores:
        return []

    lowest, highest = min(scores), max(scores)
    span = highest - lowest

    # All values identical: there is no spread to scale by.
    if span == 0:
        return [0.5] * len(scores)

    return [(value - lowest) / span for value in scores]
def batch_list(items: List, batch_size: int) -> List[List]:
    """
    Split a list into batches of specified size.

    The last batch holds the remaining items and may be shorter than
    batch_size.

    Args:
        items: List to split
        batch_size: Size of each batch; must be positive

    Returns:
        List of batches (empty when items is empty)

    Raises:
        ValueError: If batch_size is not positive
    """
    # A negative step would make range() empty and silently drop every item,
    # so reject non-positive sizes explicitly.
    if batch_size <= 0:
        raise ValueError(f"batch_size must be a positive integer, got {batch_size}")
    return [items[i:i + batch_size] for i in range(0, len(items), batch_size)]
def merge_overlapping_segments(
    segments: List[Tuple[float, float]],
    min_gap: float = 0.0
) -> List[Tuple[float, float]]:
    """
    Combine time segments that overlap or lie within min_gap of each other.

    Segments are processed in order of start time. A segment is folded into
    the previous merged segment when its start is no later than that
    segment's end plus min_gap.

    Args:
        segments: List of (start, end) tuples
        min_gap: Largest gap between two segments that still gets merged

    Returns:
        List of merged segments ordered by start time
    """
    if not segments:
        return []

    ordered = iter(sorted(segments, key=lambda seg: seg[0]))
    result = [next(ordered)]

    for seg_start, seg_end in ordered:
        prev_start, prev_end = result[-1]
        if seg_start <= prev_end + min_gap:
            # Close enough: extend the previous segment to cover this one.
            result[-1] = (prev_start, max(prev_end, seg_end))
        else:
            result.append((seg_start, seg_end))

    return result
def ensure_dir(path: Union[str, Path]) -> Path:
    """
    Make sure a directory exists, creating it and any missing parents.

    Args:
        path: Path to the directory

    Returns:
        Path object for the directory
    """
    directory = Path(path)
    directory.mkdir(parents=True, exist_ok=True)
    return directory
def get_unique_filename(
    directory: Union[str, Path],
    base_name: str,
    extension: str
) -> Path:
    """
    Find a file path in the directory that does not exist yet.

    The plain "<base_name><extension>" is tried first; if it is taken,
    "<base_name>_1", "<base_name>_2", ... are tried in turn. The file
    itself is not created.

    Args:
        directory: Directory for the file
        base_name: Base name for the file
        extension: File extension (with or without dot)

    Returns:
        Path to a unique file
    """
    folder = Path(directory)
    suffix = extension if extension.startswith(".") else f".{extension}"

    attempt = 0
    while True:
        # Attempt 0 is the bare name; later attempts append a counter.
        stem = base_name if attempt == 0 else f"{base_name}_{attempt}"
        candidate = folder / f"{stem}{suffix}"
        if not candidate.exists():
            return candidate
        attempt += 1
# Public API of this module (the names exported by ``from ... import *``).
__all__ = [
    # Supported-format constants
    "SUPPORTED_VIDEO_FORMATS",
    "SUPPORTED_IMAGE_FORMATS",
    "SUPPORTED_AUDIO_FORMATS",
    # Result type and exceptions
    "ValidationResult",
    "FileValidationError",
    "VideoProcessingError",
    "ModelLoadError",
    "InferenceError",
    # File validation
    "validate_video_file",
    "validate_image_file",
    # Temporary directory handling
    "get_temp_dir",
    "cleanup_temp_files",
    # Time formatting
    "format_duration",
    "format_timestamp",
    # Numeric and list helpers
    "safe_divide",
    "clamp",
    "normalize_scores",
    "batch_list",
    "merge_overlapping_segments",
    # Filesystem helpers
    "ensure_dir",
    "get_unique_filename",
]