| """ |
| ShortSmith v2 - Helper Utilities |
| |
| Common utility functions for file handling, validation, and data manipulation. |
| """ |
|
|
| import os |
| import shutil |
| import tempfile |
| import uuid |
| from pathlib import Path |
| from typing import Optional, List, Tuple, Union |
| from dataclasses import dataclass |
|
|
| from utils.logger import get_logger |
|
|
| logger = get_logger("utils.helpers") |
|
|
| |
# File extensions accepted for each media type. Entries are lower-case and
# include the leading dot, matching what ``Path.suffix.lower()`` produces.
SUPPORTED_VIDEO_FORMATS = {
    ".mp4", ".avi", ".mov", ".mkv",
    ".webm", ".flv", ".wmv", ".m4v",
}
SUPPORTED_IMAGE_FORMATS = {
    ".jpg", ".jpeg", ".png",
    ".webp", ".bmp", ".gif",
}
SUPPORTED_AUDIO_FORMATS = {
    ".mp3", ".wav", ".aac",
    ".flac", ".ogg", ".m4a",
}
|
|
|
|
@dataclass
class ValidationResult:
    """
    Outcome of validating a single file.

    Attributes:
        is_valid: True when the file passed every check.
        error_message: Human-readable reason for a failure; None by default.
        file_path: Path of the validated file; None by default.
        file_size: File size in bytes; 0 by default.
    """

    is_valid: bool
    error_message: Optional[str] = None
    file_path: Optional[Path] = None
    file_size: int = 0
|
|
|
|
class FileValidationError(Exception):
    """Error type for file validation failures."""
|
|
|
|
class VideoProcessingError(Exception):
    """Error type for failures during video processing."""
|
|
|
|
class ModelLoadError(Exception):
    """Error type for a model that could not be loaded."""
|
|
|
|
class InferenceError(Exception):
    """Error type for failures during model inference."""
|
|
|
|
def validate_video_file(
    file_path: Union[str, Path],
    max_size_mb: float = 1024.0,
    check_exists: bool = True,
) -> ValidationResult:
    """
    Validate a video file for processing.

    Checks run in this order: existence and regular-file check (both only
    when ``check_exists`` is True), extension against
    ``SUPPORTED_VIDEO_FORMATS`` (case-insensitive), then size limit (only
    when ``check_exists`` is True).

    This function does not raise: any unexpected exception is logged and
    reported as an invalid result.

    Args:
        file_path: Path to the video file
        max_size_mb: Maximum allowed file size in megabytes
        check_exists: Whether to check that the file exists on disk; when
            False only the extension is checked and file_size is 0

    Returns:
        ValidationResult with validation status and details
    """
    try:
        path = Path(file_path)

        if check_exists:
            if not path.exists():
                return ValidationResult(
                    is_valid=False,
                    error_message=f"Video file not found: {path}",
                )
            # A directory (or other non-regular entry) may carry a video-like
            # suffix; without this check it would be reported as valid.
            if not path.is_file():
                return ValidationResult(
                    is_valid=False,
                    error_message=f"Video path is not a file: {path}",
                )

        if path.suffix.lower() not in SUPPORTED_VIDEO_FORMATS:
            # Sets have no stable iteration order; sort so the message is
            # identical from run to run.
            supported = ", ".join(sorted(SUPPORTED_VIDEO_FORMATS))
            return ValidationResult(
                is_valid=False,
                error_message=(
                    f"Unsupported video format: {path.suffix}. "
                    f"Supported: {supported}"
                ),
            )

        file_size = 0
        if check_exists:
            file_size = path.stat().st_size
            size_mb = file_size / (1024 * 1024)
            if size_mb > max_size_mb:
                return ValidationResult(
                    is_valid=False,
                    error_message=f"Video file too large: {size_mb:.1f}MB (max: {max_size_mb}MB)",
                    file_size=file_size,
                )

        logger.debug(f"Video file validated: {path}")
        return ValidationResult(
            is_valid=True,
            file_path=path,
            file_size=file_size,
        )

    except Exception as e:
        # Deliberate catch-all: callers get a result object instead of an
        # exception (e.g. bad path type, permission error on stat()).
        logger.error(f"Error validating video file {file_path}: {e}")
        return ValidationResult(
            is_valid=False,
            error_message=f"Validation error: {str(e)}",
        )
|
|
|
|
def validate_image_file(
    file_path: Union[str, Path],
    max_size_mb: float = 10.0,
    check_exists: bool = True,
) -> ValidationResult:
    """
    Validate an image file (e.g., reference image for person detection).

    Checks run in this order: existence and regular-file check (both only
    when ``check_exists`` is True), extension against
    ``SUPPORTED_IMAGE_FORMATS`` (case-insensitive), then size limit (only
    when ``check_exists`` is True).

    This function does not raise: any unexpected exception is logged and
    reported as an invalid result.

    Args:
        file_path: Path to the image file
        max_size_mb: Maximum allowed file size in megabytes
        check_exists: Whether to check that the file exists on disk; when
            False only the extension is checked and file_size is 0

    Returns:
        ValidationResult with validation status and details
    """
    try:
        path = Path(file_path)

        if check_exists:
            if not path.exists():
                return ValidationResult(
                    is_valid=False,
                    error_message=f"Image file not found: {path}",
                )
            # A directory (or other non-regular entry) may carry an
            # image-like suffix; without this check it would pass.
            if not path.is_file():
                return ValidationResult(
                    is_valid=False,
                    error_message=f"Image path is not a file: {path}",
                )

        if path.suffix.lower() not in SUPPORTED_IMAGE_FORMATS:
            # Sets have no stable iteration order; sort so the message is
            # identical from run to run.
            supported = ", ".join(sorted(SUPPORTED_IMAGE_FORMATS))
            return ValidationResult(
                is_valid=False,
                error_message=(
                    f"Unsupported image format: {path.suffix}. "
                    f"Supported: {supported}"
                ),
            )

        file_size = 0
        if check_exists:
            file_size = path.stat().st_size
            size_mb = file_size / (1024 * 1024)
            if size_mb > max_size_mb:
                return ValidationResult(
                    is_valid=False,
                    error_message=f"Image file too large: {size_mb:.1f}MB (max: {max_size_mb}MB)",
                    file_size=file_size,
                )

        logger.debug(f"Image file validated: {path}")
        return ValidationResult(
            is_valid=True,
            file_path=path,
            file_size=file_size,
        )

    except Exception as e:
        # Deliberate catch-all: callers get a result object instead of an
        # exception (e.g. bad path type, permission error on stat()).
        logger.error(f"Error validating image file {file_path}: {e}")
        return ValidationResult(
            is_valid=False,
            error_message=f"Validation error: {str(e)}",
        )
|
|
|
|
def get_temp_dir(prefix: str = "shortsmith_") -> Path:
    """
    Create a temporary directory for processing.

    The directory is created with ``tempfile.mkdtemp``, which picks a name
    that does not already exist and creates it atomically, so two calls can
    never end up sharing one directory. The caller is responsible for
    removing the directory when finished.

    Args:
        prefix: Prefix for the temp directory name

    Returns:
        Path to the created temporary directory

    Raises:
        OSError: If directory creation fails
    """
    try:
        temp_dir = Path(tempfile.mkdtemp(prefix=prefix))
    except Exception as e:
        # Any failure is surfaced as OSError, the documented contract.
        logger.error(f"Failed to create temp directory: {e}")
        raise OSError(f"Could not create temporary directory: {e}") from e

    logger.debug(f"Created temp directory: {temp_dir}")
    return temp_dir
|
|
|
|
def cleanup_temp_files(
    temp_dir: Union[str, Path],
    ignore_errors: bool = True
) -> bool:
    """
    Clean up temporary files and directories.

    Removes ``temp_dir`` recursively and reports whether it is actually
    gone afterwards. This function does not raise; failures are logged as
    warnings and reported through the return value.

    Args:
        temp_dir: Path to the temporary directory to clean
        ignore_errors: Passed to ``shutil.rmtree``. When True, removal
            continues past individual failures (best effort); when False,
            removal stops at the first failure.

    Returns:
        True if the path does not exist after the call (including when it
        never existed), False otherwise
    """
    try:
        path = Path(temp_dir)
        if not path.exists():
            # Nothing to remove counts as success.
            return True

        shutil.rmtree(path, ignore_errors=ignore_errors)

        # With ignore_errors=True rmtree swallows failures, so confirm the
        # directory is really gone before reporting success.
        if path.exists():
            logger.warning(f"Temp directory still present after cleanup: {path}")
            return False

        logger.debug(f"Cleaned up temp directory: {path}")
        return True

    except Exception as e:
        logger.warning(f"Failed to cleanup temp directory {temp_dir}: {e}")
        return False
|
|
|
|
def format_duration(seconds: float) -> str:
    """
    Render a duration given in seconds as "H:MM:SS" or "M:SS".

    Fractional seconds are truncated, and the hours field is omitted when
    it would be zero.

    Args:
        seconds: Duration in seconds

    Returns:
        Formatted string (e.g., "1:23:45" or "5:30"); "0:00" for negative
        input
    """
    if seconds < 0:
        return "0:00"

    whole_hours, remainder = divmod(seconds, 3600)
    whole_minutes, leftover = divmod(remainder, 60)
    h, m, s = int(whole_hours), int(whole_minutes), int(leftover)

    if h > 0:
        return f"{h}:{m:02d}:{s:02d}"
    return f"{m}:{s:02d}"
|
|
|
|
def format_timestamp(seconds: float, include_ms: bool = False) -> str:
    """
    Format timestamp for display.

    Produces "H:MM:SS" / "M:SS", or "H:MM:SS.mmm" / "M:SS.mmm" when
    ``include_ms`` is True. The hours field is omitted when it is zero.
    Without milliseconds the fractional part is truncated; with
    milliseconds the value is rounded to the nearest millisecond.

    Args:
        seconds: Timestamp in seconds; negative values are treated as 0
        include_ms: Whether to include milliseconds

    Returns:
        Formatted timestamp string
    """
    # Floor division and modulo on a negative value yield nonsense fields
    # (e.g. -5 -> "59:55"), so clamp to zero like format_duration does.
    if seconds < 0:
        seconds = 0

    if include_ms:
        # Round before splitting into fields so a value such as 59.9996
        # carries into the minutes ("1:00.000") instead of printing an
        # impossible "0:60.000".
        seconds = round(seconds, 3)

    hours = int(seconds // 3600)
    minutes = int((seconds % 3600) // 60)
    secs = seconds % 60

    prefix = f"{hours}:{minutes:02d}:" if hours > 0 else f"{minutes}:"

    if include_ms:
        return f"{prefix}{secs:06.3f}"
    return f"{prefix}{int(secs):02d}"
|
|
|
|
def safe_divide(
    numerator: float,
    denominator: float,
    default: float = 0.0
) -> float:
    """
    Divide ``numerator`` by ``denominator``, falling back to ``default``
    when the denominator equals zero.

    Args:
        numerator: The numerator
        denominator: The denominator
        default: Value to return if denominator is zero

    Returns:
        Result of division or default value
    """
    return default if denominator == 0 else numerator / denominator
|
|
|
|
def clamp(
    value: float,
    min_value: float,
    max_value: float
) -> float:
    """
    Restrict ``value`` to the range bounded by ``min_value`` and
    ``max_value``.

    The upper bound is applied first and the lower bound second, so if the
    bounds are inverted (min_value > max_value) the result is min_value.

    Args:
        value: The value to clamp
        min_value: Minimum allowed value
        max_value: Maximum allowed value

    Returns:
        Clamped value
    """
    # Cap at the upper bound first ...
    capped = max_value if max_value < value else value
    # ... then raise to the lower bound if needed.
    return capped if capped > min_value else min_value
|
|
|
|
def normalize_scores(scores: List[float]) -> List[float]:
    """
    Rescale scores linearly so the smallest maps to 0 and the largest to 1.

    Args:
        scores: List of raw scores

    Returns:
        Normalized scores; an empty list for empty input, and 0.5 for every
        entry when all scores are equal
    """
    if not scores:
        return []

    lowest, highest = min(scores), max(scores)
    spread = highest - lowest

    # All values identical: no meaningful scale, so place them mid-range.
    if spread == 0:
        return [0.5] * len(scores)

    rescaled = []
    for raw in scores:
        rescaled.append((raw - lowest) / spread)
    return rescaled
|
|
|
|
def batch_list(items: List, batch_size: int) -> List[List]:
    """
    Split a list into batches of specified size.

    Batches preserve the original order; the last batch may be shorter
    than ``batch_size``.

    Args:
        items: List to split
        batch_size: Size of each batch; must be positive

    Returns:
        List of batches (empty when ``items`` is empty)

    Raises:
        ValueError: If ``batch_size`` is zero or negative
    """
    # A negative step would make range() empty and silently drop every
    # item; a zero step raises an opaque range() error. Reject both.
    if batch_size <= 0:
        raise ValueError(f"batch_size must be a positive integer, got {batch_size}")

    return [
        items[start:start + batch_size]
        for start in range(0, len(items), batch_size)
    ]
|
|
|
|
def merge_overlapping_segments(
    segments: List[Tuple[float, float]],
    min_gap: float = 0.0
) -> List[Tuple[float, float]]:
    """
    Combine time segments that overlap or lie within ``min_gap`` of each
    other.

    Segments are sorted by start time, then swept left to right: a segment
    whose start is at or before the previous merged end plus ``min_gap`` is
    folded into the previous one.

    Args:
        segments: List of (start, end) tuples
        min_gap: Minimum gap to keep segments separate

    Returns:
        List of merged segments, ordered by start time
    """
    if not segments:
        return []

    ordered = sorted(segments, key=lambda seg: seg[0])
    result = ordered[:1]

    for seg_start, seg_end in ordered[1:]:
        prev_start, prev_end = result[-1]

        if seg_start <= prev_end + min_gap:
            # Close enough to the previous segment: extend it in place.
            result[-1] = (prev_start, max(prev_end, seg_end))
            continue

        result.append((seg_start, seg_end))

    return result
|
|
|
|
def ensure_dir(path: Union[str, Path]) -> Path:
    """
    Create the directory (and any missing parents) if it is not already
    there.

    Args:
        path: Path to the directory

    Returns:
        Path object for the directory
    """
    directory = Path(path)
    # exist_ok makes the call a no-op when the directory already exists.
    directory.mkdir(parents=True, exist_ok=True)
    return directory
|
|
|
|
def get_unique_filename(
    directory: Union[str, Path],
    base_name: str,
    extension: str
) -> Path:
    """
    Generate a unique filename in the given directory.

    Tries ``<base_name><extension>`` first, then ``<base_name>_1``,
    ``<base_name>_2``, ... until a path that does not exist is found. The
    file itself is not created, so another process could still claim the
    returned path before the caller writes to it.

    Args:
        directory: Directory for the file
        base_name: Base name for the file
        extension: File extension (with or without dot); an empty string
            means "no extension"

    Returns:
        Path to a unique file
    """
    directory = Path(directory)

    # Only add the dot to a non-empty extension; otherwise "" would turn
    # into a bare trailing dot ("name.").
    if extension and not extension.startswith("."):
        extension = f".{extension}"

    candidate = directory / f"{base_name}{extension}"
    counter = 0
    while candidate.exists():
        counter += 1
        candidate = directory / f"{base_name}_{counter}{extension}"
    return candidate
|
|
|
|
| |
# Public API of this module, grouped by purpose.
__all__ = [
    # Supported file extensions
    "SUPPORTED_VIDEO_FORMATS",
    "SUPPORTED_IMAGE_FORMATS",
    "SUPPORTED_AUDIO_FORMATS",
    # Result type and exceptions
    "ValidationResult",
    "FileValidationError",
    "VideoProcessingError",
    "ModelLoadError",
    "InferenceError",
    # File validation
    "validate_video_file",
    "validate_image_file",
    # Temporary directories
    "get_temp_dir",
    "cleanup_temp_files",
    # Time formatting
    "format_duration",
    "format_timestamp",
    # Numeric and list helpers
    "safe_divide",
    "clamp",
    "normalize_scores",
    "batch_list",
    "merge_overlapping_segments",
    # Filesystem helpers
    "ensure_dir",
    "get_unique_filename",
]
|
|