| | """ |
| | Utility classes for BackgroundFX Pro |
| | Includes FileManager, VideoUtils, and ImageUtils |
| | """ |
| |
|
| | |
| | import os |
| | if 'OMP_NUM_THREADS' not in os.environ: |
| | os.environ['OMP_NUM_THREADS'] = '4' |
| | os.environ['MKL_NUM_THREADS'] = '4' |
| |
|
| | import shutil |
| | import tempfile |
| | import logging |
| | from pathlib import Path |
| | from typing import Optional, List, Union, Tuple, Dict, Any |
| | from datetime import datetime |
| | import subprocess |
| | import cv2 |
| | import numpy as np |
| | from PIL import Image, ImageEnhance, ImageFilter |
| |
|
| | logger = logging.getLogger(__name__) |
| |
|
| |
|
| | class FileManager: |
| | """Manages file operations for BackgroundFX Pro""" |
| | |
| | def __init__(self, base_dir: Optional[str] = None): |
| | """ |
| | Initialize FileManager |
| | |
| | Args: |
| | base_dir: Base directory for file operations (defaults to temp dir) |
| | """ |
| | if base_dir: |
| | self.base_dir = Path(base_dir) |
| | else: |
| | self.base_dir = Path(tempfile.gettempdir()) / "backgroundfx_pro" |
| | |
| | |
| | self.base_dir.mkdir(parents=True, exist_ok=True) |
| | |
| | |
| | self.uploads_dir = self.base_dir / "uploads" |
| | self.outputs_dir = self.base_dir / "outputs" |
| | self.temp_dir = self.base_dir / "temp" |
| | self.cache_dir = self.base_dir / "cache" |
| | |
| | for dir_path in [self.uploads_dir, self.outputs_dir, self.temp_dir, self.cache_dir]: |
| | dir_path.mkdir(parents=True, exist_ok=True) |
| | |
| | logger.info(f"FileManager initialized with base directory: {self.base_dir}") |
| | |
| | def save_upload(self, file_path: Union[str, Path], filename: Optional[str] = None) -> Path: |
| | """ |
| | Save an uploaded file to the uploads directory |
| | |
| | Args: |
| | file_path: Path to the uploaded file |
| | filename: Optional custom filename |
| | |
| | Returns: |
| | Path to the saved file |
| | """ |
| | file_path = Path(file_path) |
| | |
| | if filename: |
| | dest_path = self.uploads_dir / filename |
| | else: |
| | |
| | timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") |
| | dest_path = self.uploads_dir / f"{timestamp}_{file_path.name}" |
| | |
| | |
| | shutil.copy2(file_path, dest_path) |
| | logger.info(f"Saved upload: {dest_path}") |
| | |
| | return dest_path |
| | |
| | def create_output_path(self, filename: str, subfolder: Optional[str] = None) -> Path: |
| | """ |
| | Create a path for an output file |
| | |
| | Args: |
| | filename: Name of the output file |
| | subfolder: Optional subfolder within outputs |
| | |
| | Returns: |
| | Path for the output file |
| | """ |
| | if subfolder: |
| | output_dir = self.outputs_dir / subfolder |
| | output_dir.mkdir(parents=True, exist_ok=True) |
| | else: |
| | output_dir = self.outputs_dir |
| | |
| | |
| | timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") |
| | name_parts = filename.rsplit('.', 1) |
| | if len(name_parts) == 2: |
| | output_path = output_dir / f"{name_parts[0]}_{timestamp}.{name_parts[1]}" |
| | else: |
| | output_path = output_dir / f"{filename}_{timestamp}" |
| | |
| | return output_path |
| | |
| | def get_temp_path(self, filename: Optional[str] = None, extension: str = ".tmp") -> Path: |
| | """ |
| | Get a temporary file path |
| | |
| | Args: |
| | filename: Optional filename (will be made unique) |
| | extension: File extension |
| | |
| | Returns: |
| | Path for temporary file |
| | """ |
| | if filename: |
| | temp_path = self.temp_dir / filename |
| | else: |
| | timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f") |
| | temp_path = self.temp_dir / f"temp_{timestamp}{extension}" |
| | |
| | return temp_path |
| | |
| | def cleanup_temp(self, max_age_hours: int = 24): |
| | """ |
| | Clean up old temporary files |
| | |
| | Args: |
| | max_age_hours: Maximum age of temp files in hours |
| | """ |
| | try: |
| | current_time = datetime.now().timestamp() |
| | max_age_seconds = max_age_hours * 3600 |
| | |
| | for temp_file in self.temp_dir.iterdir(): |
| | if temp_file.is_file(): |
| | file_age = current_time - temp_file.stat().st_mtime |
| | if file_age > max_age_seconds: |
| | temp_file.unlink() |
| | logger.debug(f"Deleted old temp file: {temp_file}") |
| | |
| | logger.info("Temp directory cleanup completed") |
| | except Exception as e: |
| | logger.warning(f"Error during temp cleanup: {e}") |
| | |
| | def get_cache_path(self, key: str, extension: str = ".cache") -> Path: |
| | """ |
| | Get a cache file path based on a key |
| | |
| | Args: |
| | key: Cache key |
| | extension: File extension |
| | |
| | Returns: |
| | Path for cache file |
| | """ |
| | |
| | safe_key = "".join(c if c.isalnum() or c in '-_' else '_' for c in key) |
| | return self.cache_dir / f"{safe_key}{extension}" |
| | |
| | def list_outputs(self, subfolder: Optional[str] = None, extension: Optional[str] = None) -> List[Path]: |
| | """ |
| | List output files |
| | |
| | Args: |
| | subfolder: Optional subfolder to list from |
| | extension: Optional file extension filter |
| | |
| | Returns: |
| | List of output file paths |
| | """ |
| | if subfolder: |
| | search_dir = self.outputs_dir / subfolder |
| | else: |
| | search_dir = self.outputs_dir |
| | |
| | if not search_dir.exists(): |
| | return [] |
| | |
| | if extension: |
| | pattern = f"*{extension}" |
| | else: |
| | pattern = "*" |
| | |
| | return sorted(search_dir.glob(pattern), key=lambda p: p.stat().st_mtime, reverse=True) |
| | |
| | def delete_file(self, file_path: Union[str, Path]) -> bool: |
| | """ |
| | Safely delete a file |
| | |
| | Args: |
| | file_path: Path to file to delete |
| | |
| | Returns: |
| | True if successful, False otherwise |
| | """ |
| | try: |
| | file_path = Path(file_path) |
| | if file_path.exists() and file_path.is_file(): |
| | file_path.unlink() |
| | logger.info(f"Deleted file: {file_path}") |
| | return True |
| | return False |
| | except Exception as e: |
| | logger.error(f"Error deleting file {file_path}: {e}") |
| | return False |
| | |
| | def get_file_info(self, file_path: Union[str, Path]) -> dict: |
| | """ |
| | Get information about a file |
| | |
| | Args: |
| | file_path: Path to file |
| | |
| | Returns: |
| | Dictionary with file information |
| | """ |
| | file_path = Path(file_path) |
| | |
| | if not file_path.exists(): |
| | return {"exists": False} |
| | |
| | stat = file_path.stat() |
| | return { |
| | "exists": True, |
| | "name": file_path.name, |
| | "size": stat.st_size, |
| | "size_mb": stat.st_size / (1024 * 1024), |
| | "created": datetime.fromtimestamp(stat.st_ctime), |
| | "modified": datetime.fromtimestamp(stat.st_mtime), |
| | "extension": file_path.suffix, |
| | "path": str(file_path.absolute()) |
| | } |
| |
|
| |
|
| | class VideoUtils: |
| | """Utilities for video processing""" |
| | |
| | @staticmethod |
| | def get_video_info(video_path: Union[str, Path]) -> Dict[str, Any]: |
| | """ |
| | Get detailed video information |
| | |
| | Args: |
| | video_path: Path to video file |
| | |
| | Returns: |
| | Dictionary with video metadata |
| | """ |
| | video_path = str(video_path) |
| | cap = cv2.VideoCapture(video_path) |
| | |
| | if not cap.isOpened(): |
| | logger.error(f"Failed to open video: {video_path}") |
| | return {"error": "Failed to open video"} |
| | |
| | try: |
| | info = { |
| | "fps": cap.get(cv2.CAP_PROP_FPS), |
| | "frame_count": int(cap.get(cv2.CAP_PROP_FRAME_COUNT)), |
| | "width": int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), |
| | "height": int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)), |
| | "codec": VideoUtils._fourcc_to_string(int(cap.get(cv2.CAP_PROP_FOURCC))), |
| | "duration": cap.get(cv2.CAP_PROP_FRAME_COUNT) / cap.get(cv2.CAP_PROP_FPS) if cap.get(cv2.CAP_PROP_FPS) > 0 else 0 |
| | } |
| | |
| | |
| | path = Path(video_path) |
| | if path.exists(): |
| | info["file_size_mb"] = path.stat().st_size / (1024 * 1024) |
| | |
| | return info |
| | |
| | finally: |
| | cap.release() |
| | |
| | @staticmethod |
| | def _fourcc_to_string(fourcc: int) -> str: |
| | """Convert fourcc code to string""" |
| | return "".join([chr((fourcc >> 8 * i) & 0xFF) for i in range(4)]) |
| | |
| | @staticmethod |
| | def extract_frames(video_path: Union[str, Path], |
| | output_dir: Union[str, Path], |
| | frame_interval: int = 1, |
| | max_frames: Optional[int] = None) -> List[Path]: |
| | """ |
| | Extract frames from video |
| | |
| | Args: |
| | video_path: Path to video file |
| | output_dir: Directory to save frames |
| | frame_interval: Extract every nth frame |
| | max_frames: Maximum number of frames to extract |
| | |
| | Returns: |
| | List of extracted frame paths |
| | """ |
| | video_path = str(video_path) |
| | output_dir = Path(output_dir) |
| | output_dir.mkdir(parents=True, exist_ok=True) |
| | |
| | cap = cv2.VideoCapture(video_path) |
| | if not cap.isOpened(): |
| | logger.error(f"Failed to open video: {video_path}") |
| | return [] |
| | |
| | frame_paths = [] |
| | frame_count = 0 |
| | extracted_count = 0 |
| | |
| | try: |
| | while True: |
| | ret, frame = cap.read() |
| | if not ret: |
| | break |
| | |
| | if frame_count % frame_interval == 0: |
| | frame_path = output_dir / f"frame_{frame_count:06d}.png" |
| | cv2.imwrite(str(frame_path), frame) |
| | frame_paths.append(frame_path) |
| | extracted_count += 1 |
| | |
| | if max_frames and extracted_count >= max_frames: |
| | break |
| | |
| | frame_count += 1 |
| | |
| | logger.info(f"Extracted {len(frame_paths)} frames from video") |
| | return frame_paths |
| | |
| | finally: |
| | cap.release() |
| | |
| | @staticmethod |
| | def create_video_from_frames(frame_paths: List[Union[str, Path]], |
| | output_path: Union[str, Path], |
| | fps: float = 30.0, |
| | codec: str = 'mp4v') -> bool: |
| | """ |
| | Create video from frame images |
| | |
| | Args: |
| | frame_paths: List of frame image paths |
| | output_path: Output video path |
| | fps: Frames per second |
| | codec: Video codec (fourcc) |
| | |
| | Returns: |
| | True if successful |
| | """ |
| | if not frame_paths: |
| | logger.error("No frames provided") |
| | return False |
| | |
| | |
| | first_frame = cv2.imread(str(frame_paths[0])) |
| | if first_frame is None: |
| | logger.error(f"Failed to read first frame: {frame_paths[0]}") |
| | return False |
| | |
| | height, width, layers = first_frame.shape |
| | |
| | |
| | fourcc = cv2.VideoWriter_fourcc(*codec) |
| | out = cv2.VideoWriter(str(output_path), fourcc, fps, (width, height)) |
| | |
| | try: |
| | for frame_path in frame_paths: |
| | frame = cv2.imread(str(frame_path)) |
| | if frame is not None: |
| | out.write(frame) |
| | else: |
| | logger.warning(f"Failed to read frame: {frame_path}") |
| | |
| | logger.info(f"Created video: {output_path}") |
| | return True |
| | |
| | except Exception as e: |
| | logger.error(f"Error creating video: {e}") |
| | return False |
| | |
| | finally: |
| | out.release() |
| | |
| | @staticmethod |
| | def resize_video(input_path: Union[str, Path], |
| | output_path: Union[str, Path], |
| | target_width: Optional[int] = None, |
| | target_height: Optional[int] = None, |
| | maintain_aspect: bool = True) -> bool: |
| | """ |
| | Resize video to target dimensions |
| | |
| | Args: |
| | input_path: Input video path |
| | output_path: Output video path |
| | target_width: Target width (None to auto-calculate) |
| | target_height: Target height (None to auto-calculate) |
| | maintain_aspect: Maintain aspect ratio |
| | |
| | Returns: |
| | True if successful |
| | """ |
| | cap = cv2.VideoCapture(str(input_path)) |
| | if not cap.isOpened(): |
| | logger.error(f"Failed to open video: {input_path}") |
| | return False |
| | |
| | |
| | orig_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) |
| | orig_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) |
| | fps = cap.get(cv2.CAP_PROP_FPS) |
| | fourcc = int(cap.get(cv2.CAP_PROP_FOURCC)) |
| | |
| | |
| | if maintain_aspect: |
| | if target_width and not target_height: |
| | aspect = orig_width / orig_height |
| | target_height = int(target_width / aspect) |
| | elif target_height and not target_width: |
| | aspect = orig_width / orig_height |
| | target_width = int(target_height * aspect) |
| | |
| | if not target_width: |
| | target_width = orig_width |
| | if not target_height: |
| | target_height = orig_height |
| | |
| | |
| | out = cv2.VideoWriter(str(output_path), fourcc, fps, (target_width, target_height)) |
| | |
| | try: |
| | while True: |
| | ret, frame = cap.read() |
| | if not ret: |
| | break |
| | |
| | resized = cv2.resize(frame, (target_width, target_height)) |
| | out.write(resized) |
| | |
| | logger.info(f"Resized video saved to: {output_path}") |
| | return True |
| | |
| | except Exception as e: |
| | logger.error(f"Error resizing video: {e}") |
| | return False |
| | |
| | finally: |
| | cap.release() |
| | out.release() |
| | |
| | @staticmethod |
| | def extract_audio(video_path: Union[str, Path], |
| | audio_path: Union[str, Path]) -> bool: |
| | """ |
| | Extract audio from video using ffmpeg |
| | |
| | Args: |
| | video_path: Input video path |
| | audio_path: Output audio path |
| | |
| | Returns: |
| | True if successful |
| | """ |
| | try: |
| | cmd = [ |
| | 'ffmpeg', '-i', str(video_path), |
| | '-vn', '-acodec', 'copy', |
| | str(audio_path), '-y' |
| | ] |
| | |
| | result = subprocess.run(cmd, capture_output=True, text=True) |
| | |
| | if result.returncode == 0: |
| | logger.info(f"Audio extracted to: {audio_path}") |
| | return True |
| | else: |
| | logger.error(f"Failed to extract audio: {result.stderr}") |
| | return False |
| | |
| | except FileNotFoundError: |
| | logger.error("ffmpeg not found. Please install ffmpeg.") |
| | return False |
| | except Exception as e: |
| | logger.error(f"Error extracting audio: {e}") |
| | return False |
| | |
| | @staticmethod |
| | def add_audio_to_video(video_path: Union[str, Path], |
| | audio_path: Union[str, Path], |
| | output_path: Union[str, Path]) -> bool: |
| | """ |
| | Add audio track to video using ffmpeg |
| | |
| | Args: |
| | video_path: Input video path |
| | audio_path: Input audio path |
| | output_path: Output video path with audio |
| | |
| | Returns: |
| | True if successful |
| | """ |
| | try: |
| | cmd = [ |
| | 'ffmpeg', '-i', str(video_path), |
| | '-i', str(audio_path), |
| | '-c:v', 'copy', '-c:a', 'aac', |
| | '-map', '0:v:0', '-map', '1:a:0', |
| | str(output_path), '-y' |
| | ] |
| | |
| | result = subprocess.run(cmd, capture_output=True, text=True) |
| | |
| | if result.returncode == 0: |
| | logger.info(f"Video with audio saved to: {output_path}") |
| | return True |
| | else: |
| | logger.error(f"Failed to add audio: {result.stderr}") |
| | return False |
| | |
| | except FileNotFoundError: |
| | logger.error("ffmpeg not found. Please install ffmpeg.") |
| | return False |
| | except Exception as e: |
| | logger.error(f"Error adding audio: {e}") |
| | return False |
| | |
| | @staticmethod |
| | def get_frame_at_time(video_path: Union[str, Path], |
| | time_seconds: float) -> Optional[np.ndarray]: |
| | """ |
| | Get frame at specific time in video |
| | |
| | Args: |
| | video_path: Path to video |
| | time_seconds: Time in seconds |
| | |
| | Returns: |
| | Frame as numpy array or None |
| | """ |
| | cap = cv2.VideoCapture(str(video_path)) |
| | if not cap.isOpened(): |
| | logger.error(f"Failed to open video: {video_path}") |
| | return None |
| | |
| | try: |
| | fps = cap.get(cv2.CAP_PROP_FPS) |
| | frame_number = int(fps * time_seconds) |
| | |
| | cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number) |
| | ret, frame = cap.read() |
| | |
| | if ret: |
| | return frame |
| | else: |
| | logger.warning(f"Could not read frame at time {time_seconds}s") |
| | return None |
| | |
| | finally: |
| | cap.release() |
| |
|
| |
|
| | |
| | _default_file_manager = None |
| |
|
| |
|
| | def get_file_manager(base_dir: Optional[str] = None) -> FileManager: |
| | """ |
| | Get or create the default FileManager instance |
| | |
| | Args: |
| | base_dir: Optional base directory |
| | |
| | Returns: |
| | FileManager instance |
| | """ |
| | global _default_file_manager |
| | if _default_file_manager is None or base_dir is not None: |
| | _default_file_manager = FileManager(base_dir) |
| | return _default_file_manager |