|
|
import os |
|
|
import cv2 |
|
|
import numpy as np |
|
|
from tqdm import tqdm |
|
|
|
|
|
watermark_path = '/app/src/assets/watermark.png' |
|
|
|
|
|
def precompute_watermark(frame_width, frame_height): |
|
|
""" |
|
|
Precomputes the resized watermark and its alpha mask while preserving its aspect ratio. |
|
|
""" |
|
|
watermark = cv2.imread(watermark_path, cv2.IMREAD_UNCHANGED) |
|
|
if watermark is None: |
|
|
raise FileNotFoundError(f"Watermark image not found at: {watermark_path}") |
|
|
|
|
|
wm_rgb = watermark[:, :, :3] |
|
|
wm_alpha = watermark[:, :, 3] / 255.0 |
|
|
|
|
|
|
|
|
wm_original_height, wm_original_width = wm_rgb.shape[:2] |
|
|
scale = min(frame_width * 0.2 / wm_original_width, frame_height * 0.2 / wm_original_height) |
|
|
wm_width = int(wm_original_width * scale) |
|
|
wm_height = int(wm_original_height * scale) |
|
|
|
|
|
|
|
|
wm_resized = cv2.resize(wm_rgb, (wm_width, wm_height)) |
|
|
wm_alpha_resized = cv2.resize(wm_alpha, (wm_width, wm_height)) |
|
|
|
|
|
|
|
|
x_offset = frame_width - wm_width - 10 |
|
|
y_offset = frame_height - wm_height - 10 |
|
|
|
|
|
return wm_resized, wm_alpha_resized, wm_width, wm_height, x_offset, y_offset |
|
|
|
|
|
def add_watermark_to_frame(frame, watermark_data): |
|
|
""" |
|
|
Adds a watermark to a single frame while preserving its aspect ratio. |
|
|
""" |
|
|
wm_resized, wm_alpha_resized, wm_width, wm_height, x_offset, y_offset = watermark_data |
|
|
|
|
|
|
|
|
overlay = frame.copy() |
|
|
overlay[y_offset:y_offset + wm_height, x_offset:x_offset + wm_width] = ( |
|
|
wm_alpha_resized[:, :, None] * wm_resized + |
|
|
(1 - wm_alpha_resized[:, :, None]) * overlay[y_offset:y_offset + wm_height, x_offset:x_offset + wm_width] |
|
|
).astype(np.uint8) |
|
|
|
|
|
return overlay |
|
|
|
|
|
def add_watermark_to_frames(frames_folder, output_folder): |
|
|
""" |
|
|
Applies a watermark to all frames in `frames_folder` and saves them to `output_folder`. |
|
|
""" |
|
|
os.makedirs(output_folder, exist_ok=True) |
|
|
|
|
|
frame_files = sorted([ |
|
|
f for f in os.listdir(frames_folder) |
|
|
if f.lower().endswith(('.jpg', '.png')) |
|
|
], key=lambda x: int(os.path.splitext(x)[0])) |
|
|
|
|
|
if not frame_files: |
|
|
raise FileNotFoundError("No frames found in the input folder.") |
|
|
|
|
|
|
|
|
sample_frame_path = os.path.join(frames_folder, frame_files[0]) |
|
|
sample_frame = cv2.imread(sample_frame_path) |
|
|
if sample_frame is None: |
|
|
raise ValueError(f"Could not read sample frame: {sample_frame_path}") |
|
|
|
|
|
frame_height, frame_width = sample_frame.shape[:2] |
|
|
watermark_data = precompute_watermark(frame_width, frame_height) |
|
|
|
|
|
print("💧 Adding watermark to frames...") |
|
|
for filename in tqdm(frame_files, desc="Watermarking"): |
|
|
input_path = os.path.join(frames_folder, filename) |
|
|
output_path = os.path.join(output_folder, filename) |
|
|
|
|
|
frame = cv2.imread(input_path) |
|
|
if frame is None: |
|
|
print(f"⚠️ Could not read frame: {input_path}") |
|
|
continue |
|
|
|
|
|
watermarked = add_watermark_to_frame(frame, watermark_data) |
|
|
cv2.imwrite(output_path, watermarked) |
|
|
|
|
|
print(f"✅ Watermarked frames saved to: {output_folder}") |
|
|
|
|
|
|