import torch
import torch.nn.functional as F
import cv2
import numpy as np
from PIL import Image
from pathlib import Path
import asyncio
from concurrent.futures import ThreadPoolExecutor
import gc


class VideoProcessor:
    """Replace the background of a video with a solid color using MiDaS depth.

    Pixels whose normalized depth falls at or below a threshold are treated as
    background and painted with a user-supplied hex color; the rest of the
    frame is kept unchanged.
    """

    def __init__(self):
        # Fall back to CPU when no GPU is available.
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        print(f"Using device: {self.device}")

        # Small MiDaS variant: trades depth accuracy for per-frame speed.
        self.model = torch.hub.load("intel-isl/MiDaS", "MiDaS_small")
        self.model.to(self.device)
        self.model.eval()

        # Matching preprocessing pipeline for the small model.
        midas_transforms = torch.hub.load("intel-isl/MiDaS", "transforms")
        self.transform = midas_transforms.small_transform

        # Single worker: frames are sequential and the model instance is shared.
        self.executor = ThreadPoolExecutor(max_workers=1)

    def hex_to_rgb(self, hex_color: str) -> tuple:
        """Convert a '#RRGGBB' (or 'RRGGBB') hex string to an (R, G, B) tuple.

        Raises ValueError if the string is not valid hex.
        """
        hex_color = hex_color.lstrip('#')
        return tuple(int(hex_color[i:i + 2], 16) for i in (0, 2, 4))

    async def process_video(self, input_path: str, threshold: float,
                            bg_color: str, session_id: str) -> str:
        """Process a video asynchronously.

        Offloads the blocking OpenCV/model work to the single-worker thread
        pool so the event loop stays responsive.

        Returns the path of the written output file.
        """
        # get_running_loop() is the correct (non-deprecated) call inside a coroutine.
        loop = asyncio.get_running_loop()
        output_path = str(Path("/tmp") / f"{session_id}_output.mp4")

        await loop.run_in_executor(
            self.executor,
            self._process_video_sync,
            input_path, output_path, threshold, bg_color
        )

        return output_path

    def _process_video_sync(self, input_path: str, output_path: str,
                            threshold: float, bg_color: str):
        """Synchronous video processing: read, per-frame depth mask, write."""
        cap = cv2.VideoCapture(input_path)
        # Guard against 0 fps from broken/missing container metadata,
        # which would produce an unusable VideoWriter.
        fps = int(cap.get(cv2.CAP_PROP_FPS)) or 30
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

        # Frames are BGR (OpenCV order); reverse the RGB tuple so the fill
        # color lands in the right channels.
        bg_bgr = self.hex_to_rgb(bg_color)[::-1]

        frame_count = 0
        try:
            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break

                processed = self.process_frame(frame, threshold, bg_bgr)
                out.write(processed)

                frame_count += 1
                if frame_count % 30 == 0:
                    print(f"Progress: {frame_count}/{total_frames}")

                # Periodically reclaim memory on long videos.
                if frame_count % 100 == 0:
                    gc.collect()
                    if torch.cuda.is_available():
                        torch.cuda.empty_cache()
        finally:
            # Release handles even if a frame raises, so the partial output
            # file is finalized and the capture is not leaked.
            cap.release()
            out.release()

    def process_frame(self, frame: np.ndarray, threshold: float,
                      bg_color: tuple) -> np.ndarray:
        """Process a single BGR frame.

        Args:
            frame: H x W x 3 uint8 image in OpenCV BGR order.
            threshold: normalized depth cutoff in [0, 1]; pixels at or below
                it are treated as background.
            bg_color: per-channel fill value in the frame's channel order (BGR).

        Returns:
            A new H x W x 3 array with background pixels replaced by bg_color.
        """
        # Downscale for inference speed; the mask is upscaled back afterwards.
        h, w = frame.shape[:2]
        new_h = 256
        # Clamp to >= 1 so extreme aspect ratios can't produce a zero width.
        new_w = max(1, int(256 * w / h))
        frame_small = cv2.resize(frame, (new_w, new_h))
        frame_rgb = cv2.cvtColor(frame_small, cv2.COLOR_BGR2RGB)

        # MiDaS transforms operate on a NumPy RGB array (official hub usage);
        # do not wrap in a PIL Image.
        input_batch = self.transform(frame_rgb).to(self.device)

        with torch.no_grad():
            depth = self.model(input_batch)
            depth = F.interpolate(
                depth.unsqueeze(1),
                size=(new_h, new_w),
                mode="bicubic",
                align_corners=False,
            ).squeeze().cpu().numpy()

        # Normalize to [0, 1]; epsilon avoids 0/0 on constant-depth frames.
        depth_norm = (depth - depth.min()) / (depth.max() - depth.min() + 1e-8)

        # Foreground mask at small resolution, then upscale to the original.
        # Linear interpolation + nonzero test slightly feathers the edge.
        mask = (depth_norm > threshold).astype(np.uint8) * 255
        mask = cv2.resize(mask, (w, h), interpolation=cv2.INTER_LINEAR)
        mask = mask.astype(bool)

        # Paint background pixels with the fill color.
        result = frame.copy()
        result[~mask] = bg_color
        return result