# NOTE(review): the three lines that were here ("Spaces:" / "Paused" / "Paused")
# were page-scrape residue from a hosting/listing page, not Python source.
import asyncio
import gc
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path

import cv2
import numpy as np
import torch
import torch.nn.functional as F
from PIL import Image
class VideoProcessor:
    """Video background remover driven by MiDaS monocular depth estimation.

    Each frame gets a depth map from the small MiDaS model; pixels whose
    normalized depth is at or below a threshold (far from the camera) are
    painted with a solid background color.
    """

    def __init__(self):
        # Fall back to CPU when no GPU is available.
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        print(f"Using device: {self.device}")
        # Load MiDaS (small model for speed).
        self.model = torch.hub.load("intel-isl/MiDaS", "MiDaS_small")
        self.model.to(self.device)
        self.model.eval()
        # Matching preprocessing transforms for the small model.
        midas_transforms = torch.hub.load("intel-isl/MiDaS", "transforms")
        self.transform = midas_transforms.small_transform
        # Single worker: one model instance, serialized GPU access.
        self.executor = ThreadPoolExecutor(max_workers=1)

    def hex_to_rgb(self, hex_color: str) -> tuple:
        """Convert a '#rrggbb' (or 'rrggbb') hex string to an (R, G, B) tuple."""
        hex_color = hex_color.lstrip('#')
        return tuple(int(hex_color[i:i + 2], 16) for i in (0, 2, 4))

    async def process_video(self, input_path: str, threshold: float,
                            bg_color: str, session_id: str) -> str:
        """Process a video without blocking the event loop.

        Args:
            input_path: Path of the source video.
            threshold: Normalized-depth cutoff in [0, 1] for foreground.
            bg_color: Hex color string for the replaced background.
            session_id: Used to name the output file under /tmp.

        Returns:
            Path of the written output video.
        """
        # get_running_loop() is the coroutine-safe replacement for
        # get_event_loop(), which is deprecated in coroutines since 3.10.
        loop = asyncio.get_running_loop()
        output_path = str(Path("/tmp") / f"{session_id}_output.mp4")
        # Run the blocking OpenCV/torch work in the dedicated thread pool.
        await loop.run_in_executor(
            self.executor,
            self._process_video_sync,
            input_path, output_path, threshold, bg_color
        )
        return output_path

    def _process_video_sync(self, input_path: str, output_path: str,
                            threshold: float, bg_color: str):
        """Synchronous frame-by-frame processing (runs in a worker thread).

        Raises:
            ValueError: If the input video cannot be opened.
        """
        cap = cv2.VideoCapture(input_path)
        if not cap.isOpened():
            raise ValueError(f"Could not open video: {input_path}")
        try:
            # Some containers report 0 fps; fall back to a sane default so
            # VideoWriter does not produce a broken file.
            fps = int(cap.get(cv2.CAP_PROP_FPS)) or 30
            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
            try:
                # OpenCV frames are BGR; reverse the (R, G, B) tuple once so
                # the background is written in the correct channel order.
                # (Previously the RGB tuple was assigned directly into BGR
                # frames, swapping the red and blue channels.)
                bg_bgr = self.hex_to_rgb(bg_color)[::-1]
                frame_count = 0
                while True:
                    ret, frame = cap.read()
                    if not ret:
                        break
                    out.write(self.process_frame(frame, threshold, bg_bgr))
                    frame_count += 1
                    if frame_count % 30 == 0:
                        print(f"Progress: {frame_count}/{total_frames}")
                    # Periodically reclaim memory during long videos.
                    if frame_count % 100 == 0:
                        gc.collect()
                        if torch.cuda.is_available():
                            torch.cuda.empty_cache()
            finally:
                # Always flush/close the writer, even if a frame fails.
                out.release()
        finally:
            # Always release the capture handle.
            cap.release()

    def process_frame(self, frame: np.ndarray, threshold: float,
                      bg_color: tuple) -> np.ndarray:
        """Replace the background of one BGR frame with ``bg_color``.

        Args:
            frame: BGR uint8 frame as read by OpenCV.
            threshold: Normalized-depth cutoff in [0, 1]; pixels at or below
                it are treated as background.
            bg_color: Per-channel fill value in the frame's own (BGR) order.

        Returns:
            A new BGR frame with background pixels filled.
        """
        h, w = frame.shape[:2]
        # Downscale before depth inference for speed; preserve aspect ratio.
        new_h, new_w = 256, int(256 * w / h)
        frame_small = cv2.resize(frame, (new_w, new_h))
        frame_rgb = cv2.cvtColor(frame_small, cv2.COLOR_BGR2RGB)

        # MiDaS expects RGB input; the hub transform handles normalization.
        img = Image.fromarray(frame_rgb)
        input_batch = self.transform(img).to(self.device)
        with torch.no_grad():
            depth = self.model(input_batch)
            depth = F.interpolate(
                depth.unsqueeze(1),
                size=(new_h, new_w),
                mode="bicubic",
                align_corners=False,
            ).squeeze().cpu().numpy()

        # Normalize to [0, 1]; epsilon guards a constant depth map.
        depth_norm = (depth - depth.min()) / (depth.max() - depth.min() + 1e-8)

        # MiDaS outputs inverse depth (larger = closer), so values above the
        # threshold are kept as foreground.
        mask = (depth_norm > threshold).astype(np.uint8) * 255
        mask = cv2.resize(mask, (w, h), interpolation=cv2.INTER_LINEAR)
        mask = mask.astype(bool)

        result = frame.copy()
        result[~mask] = bg_color
        return result