| | """ |
| | Video processing API module for BackgroundFX Pro. |
| | Wraps CoreVideoProcessor with additional API features for streaming, batching, and real-time processing. |
| | """ |
| |
|
import asyncio
import json
import os
import shutil
import subprocess
import tempfile
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from pathlib import Path
from queue import Empty, Full, Queue
from typing import Any, Callable, Dict, Generator, List, Optional, Tuple, Union

import cv2
import numpy as np
import torch

from core_video import CoreVideoProcessor

from ..core.temporal import TemporalCoherence
from ..utils import TimeEstimator, MemoryMonitor
from ..utils.device import DeviceManager
from ..utils.logger import setup_logger
from .pipeline import ProcessingPipeline, PipelineConfig, PipelineResult, ProcessingMode
| |
|
# Module-level logger; VideoProcessorAPI creates its own child logger in __init__.
logger = setup_logger(__name__)
| |
|
| |
|
class VideoStreamMode(Enum):
    """Video streaming modes (input source types for stream capture)."""
    FILE = "file"        # local video file
    WEBCAM = "webcam"    # local camera, source is a device index
    RTSP = "rtsp"        # RTSP network stream URL
    HTTP = "http"        # HTTP(S) stream URL
    VIRTUAL = "virtual"  # NOTE(review): not handled by _stream_input_handler (raises ValueError) — confirm
    SCREEN = "screen"    # screen capture (currently stubbed; see _setup_screen_capture)
| |
|
| |
|
class OutputFormat(Enum):
    """Output format options."""
    MP4 = "mp4"
    AVI = "avi"
    MOV = "mov"
    WEBM = "webm"
    HLS = "hls"        # HTTP Live Streaming (segmented, via FFmpeg)
    DASH = "dash"      # MPEG-DASH (segmented)
    FRAMES = "frames"  # individual numbered image files per frame
| |
|
| |
|
@dataclass
class StreamConfig:
    """Configuration for video streaming."""

    # Input: file path, URL, or device index (webcam).
    source: Union[str, int] = 0
    stream_mode: VideoStreamMode = VideoStreamMode.FILE

    # Output destination and encoding parameters.
    output_path: Optional[str] = None
    output_format: OutputFormat = OutputFormat.MP4
    output_codec: str = "h264"
    output_bitrate: str = "5M"
    output_fps: Optional[float] = None  # None -> writer falls back to 30.0

    # Buffering / segmenting behaviour.
    buffer_size: int = 30
    chunk_duration: float = 2.0  # HLS segment duration in seconds (-hls_time)
    enable_adaptive_bitrate: bool = False

    # Live-preview options.
    enable_preview: bool = False
    preview_scale: float = 0.5
    low_latency: bool = False

    # Performance knobs.
    hardware_acceleration: bool = True
    num_threads: int = 4  # number of stream-processing worker threads
| |
|
| |
|
@dataclass
class VideoStats:
    """Enhanced video processing statistics."""

    # Timing.
    start_time: float = 0.0      # epoch seconds when processing began
    total_duration: float = 0.0  # wall-clock seconds for the whole run
    processing_fps: float = 0.0

    # Frame counters.
    frames_total: int = 0
    frames_processed: int = 0
    frames_dropped: int = 0
    frames_cached: int = 0

    # Quality-score aggregates (scores supplied by pipeline results).
    avg_quality_score: float = 0.0
    min_quality_score: float = 1.0  # starts high so min() converges downward
    max_quality_score: float = 0.0

    # Resource usage snapshots.
    cpu_usage: float = 0.0
    gpu_usage: float = 0.0
    memory_usage_mb: float = 0.0

    # Error tracking.
    error_count: int = 0
    warnings: List[str] = field(default_factory=list)
| |
|
| |
|
| | class VideoProcessorAPI: |
| | """ |
| | API wrapper for video processing with streaming and real-time capabilities. |
| | Extends CoreVideoProcessor with additional features. |
| | """ |
| | |
    def __init__(self, core_processor: Optional[CoreVideoProcessor] = None):
        """
        Initialize Video Processor API.

        Args:
            core_processor: Optional existing CoreVideoProcessor instance.
                When provided, file processing is delegated to it;
                otherwise the internal Pipeline is used.
        """
        self.logger = setup_logger(f"{__name__}.VideoProcessorAPI")

        # Processing backends: optional core processor plus a pipeline
        # configured for video-mode processing.
        self.core_processor = core_processor
        self.pipeline = ProcessingPipeline(PipelineConfig(mode=ProcessingMode.VIDEO))

        # Lifecycle flags polled by the worker threads.
        self.is_processing = False
        self.is_streaming = False
        self.should_stop = False

        # Aggregate statistics for the current run.
        self.stats = VideoStats()

        # Bounded queues decouple capture, processing, and output stages.
        self.input_queue = Queue(maxsize=100)
        self.output_queue = Queue(maxsize=100)
        self.preview_queue = Queue(maxsize=10)

        # Thread pool for batch jobs, plus handles to streaming threads.
        self.executor = ThreadPoolExecutor(max_workers=8)
        self.stream_thread = None
        self.process_threads = []

        # FFmpeg subprocess handle used for HLS/DASH output.
        self.ffmpeg_process = None

        # NOTE(review): placeholder registry; no WebRTC code is visible in
        # this module — confirm intended use.
        self.webrtc_peers = {}

        self.logger.info("VideoProcessorAPI initialized")
| | |
| | async def process_video_async(self, |
| | input_path: str, |
| | output_path: str, |
| | background: Optional[Union[str, np.ndarray]] = None, |
| | progress_callback: Optional[Callable] = None) -> VideoStats: |
| | """ |
| | Asynchronously process a video file. |
| | |
| | Args: |
| | input_path: Path to input video |
| | output_path: Path to output video |
| | background: Background image or path |
| | progress_callback: Progress callback function |
| | |
| | Returns: |
| | Processing statistics |
| | """ |
| | return await asyncio.get_event_loop().run_in_executor( |
| | None, |
| | self.process_video, |
| | input_path, |
| | output_path, |
| | background, |
| | progress_callback |
| | ) |
| | |
| | def process_video(self, |
| | input_path: str, |
| | output_path: str, |
| | background: Optional[Union[str, np.ndarray]] = None, |
| | progress_callback: Optional[Callable] = None) -> VideoStats: |
| | """ |
| | Process a video file using either CoreVideoProcessor or Pipeline. |
| | |
| | Args: |
| | input_path: Path to input video |
| | output_path: Path to output video |
| | background: Background image or path |
| | progress_callback: Progress callback function |
| | |
| | Returns: |
| | Processing statistics |
| | """ |
| | self.stats = VideoStats(start_time=time.time()) |
| | self.is_processing = True |
| | |
| | try: |
| | |
| | if self.core_processor: |
| | return self._process_with_core( |
| | input_path, output_path, background, progress_callback |
| | ) |
| | else: |
| | |
| | return self._process_with_pipeline( |
| | input_path, output_path, background, progress_callback |
| | ) |
| | |
| | finally: |
| | self.is_processing = False |
| | self.stats.total_duration = time.time() - self.stats.start_time |
| | |
| | def _process_with_pipeline(self, |
| | input_path: str, |
| | output_path: str, |
| | background: Optional[Union[str, np.ndarray]], |
| | progress_callback: Optional[Callable]) -> VideoStats: |
| | """Process video using the Pipeline system.""" |
| | |
| | cap = cv2.VideoCapture(input_path) |
| | if not cap.isOpened(): |
| | raise ValueError(f"Cannot open video: {input_path}") |
| | |
| | |
| | fps = cap.get(cv2.CAP_PROP_FPS) |
| | width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) |
| | height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) |
| | total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) |
| | |
| | self.stats.frames_total = total_frames |
| | |
| | |
| | fourcc = cv2.VideoWriter_fourcc(*'mp4v') |
| | out = cv2.VideoWriter(output_path, fourcc, fps, (width, height)) |
| | |
| | frame_idx = 0 |
| | |
| | try: |
| | while True: |
| | ret, frame = cap.read() |
| | if not ret: |
| | break |
| | |
| | |
| | result = self.pipeline.process_image(frame, background) |
| | |
| | if result.success and result.output_image is not None: |
| | out.write(result.output_image) |
| | self.stats.frames_processed += 1 |
| | |
| | |
| | self._update_quality_stats(result.quality_score) |
| | else: |
| | |
| | out.write(frame) |
| | self.stats.frames_dropped += 1 |
| | |
| | frame_idx += 1 |
| | |
| | |
| | if progress_callback: |
| | progress = frame_idx / total_frames |
| | progress_callback(progress, { |
| | 'current_frame': frame_idx, |
| | 'total_frames': total_frames, |
| | 'fps': self.stats.frames_processed / (time.time() - self.stats.start_time) |
| | }) |
| | |
| | |
| | if self.should_stop: |
| | break |
| | |
| | finally: |
| | cap.release() |
| | out.release() |
| | |
| | self.stats.processing_fps = self.stats.frames_processed / (time.time() - self.stats.start_time) |
| | return self.stats |
| | |
| | def _process_with_core(self, |
| | input_path: str, |
| | output_path: str, |
| | background: Optional[Union[str, np.ndarray]], |
| | progress_callback: Optional[Callable]) -> VideoStats: |
| | """Process video using CoreVideoProcessor.""" |
| | |
| | |
| | if isinstance(background, str): |
| | if os.path.exists(background): |
| | bg_choice = "custom" |
| | custom_bg = background |
| | else: |
| | bg_choice = background |
| | custom_bg = None |
| | elif isinstance(background, np.ndarray): |
| | |
| | temp_bg = tempfile.NamedTemporaryFile(suffix='.png', delete=False) |
| | cv2.imwrite(temp_bg.name, background) |
| | bg_choice = "custom" |
| | custom_bg = temp_bg.name |
| | else: |
| | bg_choice = "blur" |
| | custom_bg = None |
| | |
| | |
| | output, message = self.core_processor.process_video( |
| | input_path, |
| | bg_choice, |
| | custom_bg, |
| | progress_callback |
| | ) |
| | |
| | if output: |
| | |
| | shutil.move(output, output_path) |
| | |
| | |
| | core_stats = self.core_processor.stats |
| | self.stats.frames_processed = core_stats.get('successful_frames', 0) |
| | self.stats.frames_dropped = core_stats.get('failed_frames', 0) |
| | self.stats.processing_fps = core_stats.get('average_fps', 0) |
| | |
| | return self.stats |
| | |
    def start_stream_processing(self,
                                config: StreamConfig,
                                background: Optional[Union[str, np.ndarray]] = None) -> bool:
        """
        Start real-time stream processing.

        Spawns one capture thread, ``config.num_threads`` processing
        threads, and an output handler (file/frames writer or HLS/DASH).

        Args:
            config: Stream configuration
            background: Background for replacement

        Returns:
            True if stream started successfully; False when a stream is
            already active.
        """
        if self.is_streaming:
            self.logger.warning("Stream already active")
            return False

        self.is_streaming = True
        self.should_stop = False

        # Capture thread: reads frames from the source into input_queue.
        self.stream_thread = threading.Thread(
            target=self._stream_input_handler,
            args=(config,)
        )
        self.stream_thread.start()

        # Worker threads: run the pipeline on frames from input_queue.
        for i in range(config.num_threads):
            thread = threading.Thread(
                target=self._stream_processor,
                args=(background,)
            )
            thread.start()
            self.process_threads.append(thread)

        # Output stage: segmented adaptive streaming for HLS/DASH,
        # otherwise a plain file/frames writer thread.
        if config.output_format in [OutputFormat.HLS, OutputFormat.DASH]:
            self._start_adaptive_streaming(config)
        else:
            self._start_output_handler(config)

        self.logger.info(f"Stream processing started: {config.stream_mode.value}")
        return True
| | |
| | def _stream_input_handler(self, config: StreamConfig): |
| | """Handle input stream capture.""" |
| | try: |
| | |
| | if config.stream_mode == VideoStreamMode.FILE: |
| | cap = cv2.VideoCapture(config.source) |
| | elif config.stream_mode == VideoStreamMode.WEBCAM: |
| | cap = cv2.VideoCapture(int(config.source)) |
| | elif config.stream_mode in [VideoStreamMode.RTSP, VideoStreamMode.HTTP]: |
| | cap = cv2.VideoCapture(config.source) |
| | elif config.stream_mode == VideoStreamMode.SCREEN: |
| | |
| | cap = self._setup_screen_capture() |
| | else: |
| | raise ValueError(f"Unsupported stream mode: {config.stream_mode}") |
| | |
| | if not cap.isOpened(): |
| | raise ValueError("Failed to open stream") |
| | |
| | frame_count = 0 |
| | |
| | while self.is_streaming and not self.should_stop: |
| | ret, frame = cap.read() |
| | if not ret: |
| | if config.stream_mode == VideoStreamMode.FILE: |
| | |
| | break |
| | else: |
| | |
| | time.sleep(0.1) |
| | continue |
| | |
| | |
| | try: |
| | self.input_queue.put(frame, timeout=0.1) |
| | frame_count += 1 |
| | except: |
| | |
| | self.stats.frames_dropped += 1 |
| | |
| | |
| | if config.stream_mode != VideoStreamMode.FILE: |
| | time.sleep(1.0 / 30) |
| | |
| | cap.release() |
| | |
| | except Exception as e: |
| | self.logger.error(f"Stream input handler error: {e}") |
| | finally: |
| | self.is_streaming = False |
| | |
| | def _stream_processor(self, background: Optional[Union[str, np.ndarray]]): |
| | """Process frames from input queue.""" |
| | while self.is_streaming or not self.input_queue.empty(): |
| | try: |
| | frame = self.input_queue.get(timeout=0.5) |
| | |
| | |
| | result = self.pipeline.process_image(frame, background) |
| | |
| | if result.success and result.output_image is not None: |
| | |
| | self.output_queue.put(result.output_image) |
| | |
| | |
| | self.stats.frames_processed += 1 |
| | self._update_quality_stats(result.quality_score) |
| | |
| | |
| | if not self.preview_queue.full(): |
| | preview = cv2.resize(result.output_image, None, fx=0.5, fy=0.5) |
| | try: |
| | self.preview_queue.put_nowait(preview) |
| | except: |
| | pass |
| | |
| | except Empty: |
| | continue |
| | except Exception as e: |
| | self.logger.error(f"Stream processor error: {e}") |
| | self.stats.error_count += 1 |
| | |
| | def _start_output_handler(self, config: StreamConfig): |
| | """Start output stream handler.""" |
| | output_thread = threading.Thread( |
| | target=self._output_handler, |
| | args=(config,) |
| | ) |
| | output_thread.start() |
| | self.process_threads.append(output_thread) |
| | |
| | def _output_handler(self, config: StreamConfig): |
| | """Handle output stream writing.""" |
| | try: |
| | if config.output_format == OutputFormat.FRAMES: |
| | |
| | self._save_frames_output(config) |
| | else: |
| | |
| | self._save_video_output(config) |
| | |
| | except Exception as e: |
| | self.logger.error(f"Output handler error: {e}") |
| | |
    def _save_video_output(self, config: StreamConfig):
        """Save processed frames to video file.

        The writer is created lazily from the first dequeued frame because
        the output dimensions are only known once a frame arrives. Drains
        output_queue until streaming stops and the queue is empty.
        """
        out = None
        frame_count = 0

        try:
            while self.is_streaming or not self.output_queue.empty():
                try:
                    frame = self.output_queue.get(timeout=0.5)

                    # Lazily initialize the writer from the first frame.
                    if out is None:
                        h, w = frame.shape[:2]
                        fps = config.output_fps or 30.0

                        # Pick a fourcc per container; default to mp4v.
                        if config.output_format == OutputFormat.MP4:
                            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
                        elif config.output_format == OutputFormat.AVI:
                            fourcc = cv2.VideoWriter_fourcc(*'XVID')
                        else:
                            fourcc = cv2.VideoWriter_fourcc(*'mp4v')

                        out = cv2.VideoWriter(
                            config.output_path,
                            fourcc,
                            fps,
                            (w, h)
                        )

                    out.write(frame)
                    frame_count += 1

                except Empty:
                    # Timed out waiting for a frame; re-check loop condition.
                    continue

        finally:
            if out:
                out.release()
                self.logger.info(f"Saved {frame_count} frames to {config.output_path}")
| | |
| | def _save_frames_output(self, config: StreamConfig): |
| | """Save processed frames as individual images.""" |
| | output_dir = Path(config.output_path) |
| | output_dir.mkdir(parents=True, exist_ok=True) |
| | |
| | frame_count = 0 |
| | |
| | while self.is_streaming or not self.output_queue.empty(): |
| | try: |
| | frame = self.output_queue.get(timeout=0.5) |
| | |
| | |
| | frame_path = output_dir / f"frame_{frame_count:06d}.png" |
| | cv2.imwrite(str(frame_path), frame) |
| | frame_count += 1 |
| | |
| | except Empty: |
| | continue |
| | |
| | def _start_adaptive_streaming(self, config: StreamConfig): |
| | """Start HLS or DASH adaptive streaming.""" |
| | try: |
| | |
| | if config.output_format == OutputFormat.HLS: |
| | self._start_hls_streaming(config) |
| | elif config.output_format == OutputFormat.DASH: |
| | self._start_dash_streaming(config) |
| | |
| | except Exception as e: |
| | self.logger.error(f"Adaptive streaming setup failed: {e}") |
| | |
    def _start_hls_streaming(self, config: StreamConfig):
        """Start HLS streaming with FFmpeg.

        Launches an FFmpeg subprocess that reads raw BGR frames on stdin
        and emits an HLS playlist plus a rolling window of segments, then
        starts a feeder thread that pipes processed frames into it.
        """
        output_dir = Path(config.output_path)
        output_dir.mkdir(parents=True, exist_ok=True)

        # NOTE(review): input geometry is hard-coded to 1920x1080 @ 30fps;
        # frames of any other size/rate will corrupt the stream — confirm
        # upstream processing guarantees this geometry.
        cmd = [
            'ffmpeg',
            '-f', 'rawvideo',
            '-pix_fmt', 'bgr24',  # matches OpenCV's BGR byte layout
            '-s', '1920x1080',
            '-r', '30',
            '-i', '-',  # read raw frames from stdin
            '-c:v', 'libx264',
            '-preset', 'ultrafast',
            '-tune', 'zerolatency',
            '-f', 'hls',
            '-hls_time', str(config.chunk_duration),
            '-hls_list_size', '10',
            '-hls_flags', 'delete_segments',  # keep only a rolling window
            str(output_dir / 'stream.m3u8')
        ]

        self.ffmpeg_process = subprocess.Popen(
            cmd,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE
        )

        # Feeder thread: moves frames from output_queue into FFmpeg stdin.
        ffmpeg_thread = threading.Thread(
            target=self._pipe_to_ffmpeg
        )
        ffmpeg_thread.start()
        self.process_threads.append(ffmpeg_thread)

        self.logger.info(f"HLS streaming started: {output_dir / 'stream.m3u8'}")
| | |
| | def _pipe_to_ffmpeg(self): |
| | """Pipe processed frames to FFmpeg.""" |
| | while self.is_streaming or not self.output_queue.empty(): |
| | try: |
| | frame = self.output_queue.get(timeout=0.5) |
| | |
| | if self.ffmpeg_process and self.ffmpeg_process.stdin: |
| | self.ffmpeg_process.stdin.write(frame.tobytes()) |
| | |
| | except Empty: |
| | continue |
| | except Exception as e: |
| | self.logger.error(f"FFmpeg pipe error: {e}") |
| | break |
| | |
    def _setup_screen_capture(self) -> cv2.VideoCapture:
        """Setup screen capture (platform-specific).

        NOTE(review): this is a stub — it currently opens the default
        camera (device 0) instead of capturing the screen. A real
        implementation needs a platform-specific grabber; confirm the
        intended behavior before relying on SCREEN mode.
        """
        return cv2.VideoCapture(0)
| | |
| | def _update_quality_stats(self, quality_score: float): |
| | """Update quality statistics.""" |
| | n = self.stats.frames_processed |
| | if n == 0: |
| | self.stats.avg_quality_score = quality_score |
| | else: |
| | self.stats.avg_quality_score = ( |
| | (self.stats.avg_quality_score * n + quality_score) / (n + 1) |
| | ) |
| | |
| | self.stats.min_quality_score = min(self.stats.min_quality_score, quality_score) |
| | self.stats.max_quality_score = max(self.stats.max_quality_score, quality_score) |
| | |
| | def stop_stream_processing(self): |
| | """Stop stream processing.""" |
| | self.should_stop = True |
| | self.is_streaming = False |
| | |
| | |
| | if self.stream_thread: |
| | self.stream_thread.join(timeout=5) |
| | |
| | for thread in self.process_threads: |
| | thread.join(timeout=5) |
| | |
| | |
| | if self.ffmpeg_process: |
| | self.ffmpeg_process.terminate() |
| | self.ffmpeg_process.wait(timeout=5) |
| | |
| | self.logger.info("Stream processing stopped") |
| | |
| | def get_preview_frame(self) -> Optional[np.ndarray]: |
| | """Get a preview frame from the preview queue.""" |
| | try: |
| | return self.preview_queue.get_nowait() |
| | except Empty: |
| | return None |
| | |
| | def get_stats(self) -> VideoStats: |
| | """Get current processing statistics.""" |
| | if self.is_processing or self.is_streaming: |
| | self.stats.processing_fps = ( |
| | self.stats.frames_processed / |
| | (time.time() - self.stats.start_time) |
| | ) |
| | return self.stats |
| | |
| | def process_video_batch(self, |
| | input_paths: List[str], |
| | output_dir: str, |
| | background: Optional[Union[str, np.ndarray]] = None, |
| | parallel: bool = True) -> List[VideoStats]: |
| | """ |
| | Process multiple videos in batch. |
| | |
| | Args: |
| | input_paths: List of input video paths |
| | output_dir: Output directory |
| | background: Background for all videos |
| | parallel: Process in parallel |
| | |
| | Returns: |
| | List of processing statistics |
| | """ |
| | output_dir = Path(output_dir) |
| | output_dir.mkdir(parents=True, exist_ok=True) |
| | |
| | results = [] |
| | |
| | if parallel: |
| | |
| | futures = [] |
| | |
| | for input_path in input_paths: |
| | input_name = Path(input_path).stem |
| | output_path = output_dir / f"{input_name}_processed.mp4" |
| | |
| | future = self.executor.submit( |
| | self.process_video, |
| | input_path, |
| | str(output_path), |
| | background |
| | ) |
| | futures.append(future) |
| | |
| | |
| | for future in as_completed(futures): |
| | try: |
| | stats = future.result(timeout=3600) |
| | results.append(stats) |
| | except Exception as e: |
| | self.logger.error(f"Batch processing error: {e}") |
| | results.append(VideoStats(error_count=1)) |
| | else: |
| | |
| | for input_path in input_paths: |
| | input_name = Path(input_path).stem |
| | output_path = output_dir / f"{input_name}_processed.mp4" |
| | |
| | stats = self.process_video( |
| | input_path, |
| | str(output_path), |
| | background |
| | ) |
| | results.append(stats) |
| | |
| | return results |
| | |
| | def export_to_format(self, |
| | input_path: str, |
| | output_path: str, |
| | format: OutputFormat, |
| | **kwargs) -> bool: |
| | """ |
| | Export processed video to specific format. |
| | |
| | Args: |
| | input_path: Input video path |
| | output_path: Output path |
| | format: Target format |
| | **kwargs: Format-specific options |
| | |
| | Returns: |
| | True if successful |
| | """ |
| | try: |
| | if format == OutputFormat.WEBM: |
| | cmd = [ |
| | 'ffmpeg', '-i', input_path, |
| | '-c:v', 'libvpx-vp9', |
| | '-crf', '30', |
| | '-b:v', '0', |
| | output_path |
| | ] |
| | elif format == OutputFormat.HLS: |
| | cmd = [ |
| | 'ffmpeg', '-i', input_path, |
| | '-c:v', 'libx264', |
| | '-hls_time', '10', |
| | '-hls_list_size', '0', |
| | '-f', 'hls', |
| | output_path |
| | ] |
| | else: |
| | |
| | cmd = [ |
| | 'ffmpeg', '-i', input_path, |
| | '-c:v', 'libx264', |
| | '-preset', 'medium', |
| | '-crf', '23', |
| | output_path |
| | ] |
| | |
| | result = subprocess.run(cmd, capture_output=True, text=True) |
| | return result.returncode == 0 |
| | |
| | except Exception as e: |
| | self.logger.error(f"Export failed: {e}") |
| | return False |
| | |
| | def cleanup(self): |
| | """Cleanup resources.""" |
| | self.stop_stream_processing() |
| | self.executor.shutdown(wait=True) |
| | |
| | if self.core_processor: |
| | self.core_processor.cleanup() |
| | |
| | self.logger.info("VideoProcessorAPI cleanup complete") |