Spaces:
Running
Running
| import cv2 | |
| import numpy as np | |
| import os | |
| import shutil | |
| import subprocess | |
| import urllib.request | |
| import onnxruntime as ort | |
| from moviepy.editor import VideoFileClip | |
# Initialize the LaMa inpainting engine once, at import time, so that every
# call into this module can reuse the same ONNX Runtime session.
MODEL_PATH = os.path.join(os.path.dirname(__file__), "models", "big-lama.onnx")

if not os.path.exists(MODEL_PATH):
    print("📥 AI Model file not found. Auto-downloading from production mirror...")
    os.makedirs(os.path.dirname(MODEL_PATH), exist_ok=True)
    url = "https://huggingface.co/Carve/LaMa-ONNX/resolve/main/lama_fp32.onnx"
    # Stream into a sibling ".part" file and only move it to MODEL_PATH once
    # the transfer is known to be complete. Writing directly to MODEL_PATH
    # would leave a truncated file behind on failure; the existence check
    # above would then skip the download on every later start.
    partial_path = MODEL_PATH + ".part"
    try:
        req = urllib.request.Request(
            url,
            headers={'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)'}
        )
        # The timeout applies to each blocking socket operation, so a stalled
        # mirror cannot hang the import forever.
        with urllib.request.urlopen(req, timeout=60) as response, open(partial_path, 'wb') as out_file:
            expected_size = response.headers.get("Content-Length")
            shutil.copyfileobj(response, out_file)
        # A connection that closes early can end the copy without raising, so
        # compare against the advertised size when the server provided one.
        if expected_size is not None:
            actual_size = os.path.getsize(partial_path)
            if actual_size != int(expected_size):
                raise OSError(
                    f"incomplete download: expected {expected_size} bytes, got {actual_size}"
                )
        os.replace(partial_path, MODEL_PATH)
        print("✅ AI Model downloaded successfully!")
    except Exception as download_err:
        # Best effort: without the model the module still works, using the
        # non-AI code paths only.
        print(f"🚨 Mirror download failed ({download_err}).")
    finally:
        # After a successful os.replace the partial file no longer exists;
        # after a failure it must not be left behind.
        if os.path.exists(partial_path):
            try:
                os.remove(partial_path)
            except OSError as cleanup_err:
                print(f"⚠️ Could not remove partial download ({cleanup_err}).")

# ort_session is the module-wide inference session; it is None whenever the
# model file is missing or the session could not be created.
try:
    if os.path.exists(MODEL_PATH):
        opts = ort.SessionOptions()
        # Single-threaded, sequential execution.
        opts.intra_op_num_threads = 1
        opts.inter_op_num_threads = 1
        opts.execution_mode = ort.ExecutionMode.ORT_SEQUENTIAL
        # Providers are listed in priority order: CUDA first, then CPU.
        providers = ['CUDAExecutionProvider', 'CPUExecutionProvider']
        ort_session = ort.InferenceSession(MODEL_PATH, sess_options=opts, providers=providers)
        print("🚀 LaMa AI Inpainting Engine Initialized Safely!")
    else:
        ort_session = None
except Exception as e:
    print(f"⚠️ ONNX Session initialization failed ({e}). Falling back to classical.")
    ort_session = None
def _remove_if_exists(path):
    """Delete *path* when it is present on disk."""
    if os.path.exists(path):
        os.remove(path)


def _lama_inpaint_region(session, img_key, mask_key, frame, mask, box):
    """Inpaint the ``box`` region of *frame* in place using the ONNX session.

    ``box`` is ``(x1, y1, x2, y2)`` in frame coordinates. The frame is only
    written to on the very last step, so if anything raises the caller still
    holds an unmodified frame to hand to a fallback.
    """
    x1, y1, x2, y2 = box
    crop_frame = frame[y1:y2, x1:x2]
    crop_mask = mask[y1:y2, x1:x2]
    orig_h, orig_w = crop_frame.shape[:2]

    # The crop and its mask are resized to a fixed 512x512 model input.
    crop_frame_resized = cv2.resize(crop_frame, (512, 512))
    crop_mask_resized = cv2.resize(crop_mask, (512, 512))

    # BGR uint8 HxWx3 -> RGB float32 1x3xHxW scaled to [0, 1].
    crop_rgb = cv2.cvtColor(crop_frame_resized, cv2.COLOR_BGR2RGB)
    img_tensor = crop_rgb.astype(np.float32) / 255.0
    img_tensor = np.transpose(img_tensor, (2, 0, 1))[np.newaxis, ...]

    # Any non-zero (including interpolated) mask pixel counts as "to fill".
    mask_tensor = np.where(crop_mask_resized.astype(np.float32) > 0, 1.0, 0.0).astype(np.float32)
    mask_tensor = mask_tensor[np.newaxis, np.newaxis, ...]

    ai_outputs = session.run(None, {img_key: img_tensor, mask_key: mask_tensor})
    ai_output = np.transpose(ai_outputs[0][0], (1, 2, 0))
    # NOTE(review): this assumes the model emits values in [0, 1]. Some LaMa
    # ONNX exports may already emit 0-255 - confirm against the model card.
    ai_output = np.clip(ai_output * 255.0, 0, 255).astype(np.uint8)
    clean_crop_bgr = cv2.cvtColor(ai_output, cv2.COLOR_RGB2BGR)
    frame[y1:y2, x1:x2] = cv2.resize(clean_crop_bgr, (orig_w, orig_h))


def _fast_patch(frame, mask, x, y, w, h):
    """Cover the selection with the median colour of its surrounding border.

    Returns the patched frame. Falls back to Telea inpainting when the
    selection lies outside the frame or has no neighbouring pixels to sample.
    """
    height, width = frame.shape[:2]
    ymin, ymax = max(0, int(y)), min(height, int(y + h))
    xmin, xmax = max(0, int(x)), min(width, int(x + w))
    if ymax <= ymin or xmax <= xmin:
        return cv2.inpaint(frame, mask, 1, cv2.INPAINT_TELEA)

    # Strips up to 4 pixels thick on each side of the selection box; a strip
    # is empty when the box touches that frame edge.
    border_strips = (
        frame[max(0, ymin - 4):ymin, xmin:xmax],
        frame[ymax:min(height, ymax + 4), xmin:xmax],
        frame[ymin:ymax, max(0, xmin - 4):xmin],
        frame[ymin:ymax, xmax:min(width, xmax + 4)],
    )
    samples = [strip.reshape(-1, 3) for strip in border_strips if strip.size > 0]
    if not samples:
        return cv2.inpaint(frame, mask, 1, cv2.INPAINT_TELEA)

    # The median ignores outlier pixels (e.g. stray text strokes) in the border.
    bg_color = np.median(np.vstack(samples), axis=0).astype(int)
    cv2.rectangle(
        frame, (xmin, ymin), (xmax, ymax),
        (int(bg_color[0]), int(bg_color[1]), int(bg_color[2])), -1
    )
    # Soften the patch edges, but only when a 2-pixel margin fits in the frame.
    if xmin > 2 and xmax < width - 2 and ymin > 2 and ymax < height - 2:
        roi = frame[ymin - 2:ymax + 2, xmin - 2:xmax + 2]
        frame[ymin - 2:ymax + 2, xmin - 2:xmax + 2] = cv2.GaussianBlur(roi, (3, 3), 0)
    return frame


def _write_final_video(processing_source, temp_output, output_path):
    """Encode *temp_output* to *output_path*, re-attaching the source audio.

    Tries MoviePy first (keeps audio), then a plain ffmpeg re-encode without
    audio, and finally a raw file copy.
    """
    try:
        original_clip = None
        processed_clip = None
        try:
            original_clip = VideoFileClip(processing_source)
            processed_clip = VideoFileClip(temp_output)
            if original_clip.audio is not None:
                final_video = processed_clip.set_audio(original_clip.audio)
            else:
                final_video = processed_clip
            final_video.write_videofile(
                output_path, codec="libx264", audio_codec="aac",
                preset="ultrafast", threads=1,
                ffmpeg_params=["-movflags", "+faststart"], logger=None
            )
        finally:
            # Close the clips even when encoding fails, so no handle on
            # temp_output is still open during the fallback or the cleanup.
            for clip in (original_clip, processed_clip):
                if clip is not None:
                    clip.close()
    except Exception:
        try:
            conversion_command = [
                "ffmpeg", "-y", "-i", temp_output,
                "-vcodec", "libx264", "-pix_fmt", "yuv420p",
                "-movflags", "+faststart", "-an", output_path
            ]
            subprocess.run(conversion_command, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        except Exception:
            # Last resort: hand back the raw OpenCV output unchanged.
            shutil.copy(temp_output, output_path)


def remove_watermark_pro(input_path, output_path, x, y, w, h, mode="fast"):
    """
    Remove a rectangular watermark from a video and write the result to disk.

    Args:
        input_path: Path of the source video.
        output_path: Path the cleaned video is written to.
        x, y: Top-left corner of the watermark box, in pixels.
        w, h: Width and height of the watermark box, in pixels.
        mode: "ai" uses the module-level ONNX session (when one is loaded) on
            a padded crop around the box; any other value uses the fast
            median-colour patch.

    Returns:
        True when at least one frame was processed and an output was written,
        False when no frame could be read from the input.

    Intermediate files are created in the current working directory and are
    removed before returning, including when an error propagates.
    """
    # Use a local handle: a failure while inspecting the model inputs should
    # only disable AI mode for this call, not for the whole process.
    session = ort_session

    base_name = os.path.basename(input_path)
    normalized_input = f"normalized_{base_name}.mp4"
    # Always give the intermediate file an .mp4 name so the 'mp4v' writer gets
    # a matching container regardless of the input's extension.
    temp_output = f"temp_pro_{os.path.splitext(base_name)[0]}.mp4"

    try:
        # Re-encode to H.264/yuv420p first; deliberately best-effort, so any
        # failure just means the original file is read directly.
        try:
            conversion_cmd = [
                "ffmpeg", "-y", "-i", input_path,
                "-vcodec", "libx264", "-acodec", "aac",
                "-pix_fmt", "yuv420p", "-preset", "ultrafast",
                normalized_input
            ]
            subprocess.run(conversion_cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
            processing_source = normalized_input
        except Exception:
            processing_source = input_path

        cap = cv2.VideoCapture(processing_source)
        out = None
        frames_processed = 0
        try:
            fps = cap.get(cv2.CAP_PROP_FPS)
            if not np.isfinite(fps) or fps <= 0:
                # No usable frame rate reported; give the writer a sane default.
                fps = 30.0
            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            # Round dimensions down to even values (frames are resized to fit).
            width = width if width % 2 == 0 else width - 1
            height = height if height % 2 == 0 else height - 1

            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            out = cv2.VideoWriter(temp_output, fourcc, fps, (width, height))

            # Padded crop around the watermark for AI mode, clamped to the frame.
            padding = 16
            x1 = max(0, int(x) - padding)
            y1 = max(0, int(y) - padding)
            x2 = min(width, int(x + w) + padding)
            y2 = min(height, int(y + h) + padding)
            crop_w = x2 - x1
            crop_h = y2 - y1
            # Grow the crop towards a multiple of 8 where the frame allows it.
            if crop_w % 8 != 0:
                x2 = min(width, x2 + (8 - (crop_w % 8)))
            if crop_h % 8 != 0:
                y2 = min(height, y2 + (8 - (crop_h % 8)))

            # Work out which model input takes the mask and which the image.
            img_key, mask_key = None, None
            if session is not None and mode == "ai":
                try:
                    for node in session.get_inputs():
                        if "mask" in node.name.lower():
                            mask_key = node.name
                        else:
                            img_key = node.name
                except Exception:
                    session = None

            # None of these change between frames, so decide once.
            use_ai = mode == "ai" and session is not None and y2 > y1 and x2 > x1
            crop_box = (x1, y1, x2, y2)
            mask = None

            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                if frame.shape[1] != width or frame.shape[0] != height:
                    frame = cv2.resize(frame, (width, height))

                if mask is None:
                    # Every frame has the same size here, so the watermark
                    # mask is built once, from the first frame, and reused.
                    mask = np.zeros(frame.shape[:2], dtype=np.uint8)
                    cv2.rectangle(mask, (int(x), int(y)), (int(x + w), int(y + h)), 255, -1)

                if use_ai:
                    try:
                        _lama_inpaint_region(session, img_key, mask_key, frame, mask, crop_box)
                    except Exception:
                        # Model failure on this frame: use classical inpainting.
                        frame = cv2.inpaint(frame, mask, 1, cv2.INPAINT_TELEA)
                else:
                    frame = _fast_patch(frame, mask, x, y, w, h)

                out.write(frame)
                frames_processed += 1
        finally:
            # Release capture and writer even if a frame operation raised.
            cap.release()
            if out is not None:
                out.release()

        if frames_processed == 0:
            return False

        _write_final_video(processing_source, temp_output, output_path)
        return True
    finally:
        _remove_if_exists(temp_output)
        _remove_if_exists(normalized_input)