import os
import cv2
import json
import shutil
import mimetypes
import tempfile
import subprocess
import numpy as np
from fastapi import (
FastAPI,
UploadFile,
File,
Form,
HTTPException
)
from fastapi.responses import (
HTMLResponse,
FileResponse
)
from starlette.background import BackgroundTask
# =========================================================
# APP
# =========================================================
# Application object; the route decorators in this file register on it.
app = FastAPI(title="Fast Watermark Remover")
# =========================================================
# CONFIG
# =========================================================
# Upload size cap in bytes (500 MiB).
MAX_FILE_SIZE = 500 * 1024 * 1024

# Preview dimensions in pixels.
PREVIEW_WIDTH = 720
PREVIEW_HEIGHT = 1280

# Width in pixels that frames are resized to before processing.
PROCESS_WIDTH = 720

# Lower-case file extensions (with the leading dot) accepted for upload.
ALLOWED_EXTENSIONS = [
    ".mp4", ".mov", ".avi", ".mkv",
    ".webm", ".flv", ".wmv", ".mpeg",
    ".mpg", ".m4v", ".3gp",
]
# =========================================================
# HELPERS
# =========================================================
def cleanup(path):
    """Best-effort removal of the directory tree at *path*.

    Never raises for a missing path or a failed removal: this runs as a
    background task after the response is sent, where there is nobody left
    to report an error to.

    Args:
        path: Directory to delete. ``None`` or an empty string is a no-op.
    """
    if not path:
        return
    # ignore_errors=True keeps the best-effort contract without a bare
    # ``except``, which would also swallow KeyboardInterrupt/SystemExit.
    # Dropping the os.path.exists() pre-check removes the check-then-delete
    # race; a path that is already gone is simply ignored.
    shutil.rmtree(path, ignore_errors=True)
def allowed_file(filename):
    """Return True when *filename* has an extension in ALLOWED_EXTENSIONS.

    The comparison is case-insensitive (the extension is lower-cased first).

    Args:
        filename: Client-supplied file name. ``None`` or an empty string is
            rejected rather than raising, since an upload may arrive
            without a file name.

    Returns:
        bool: whether the extension is accepted.
    """
    if not filename:
        return False
    extension = os.path.splitext(filename)[1].lower()
    return extension in ALLOWED_EXTENSIONS
# =========================================================
# MASK
# =========================================================
def create_mask(width, height, brush_points):
    """Build the inpainting mask for a frame of the given size.

    Args:
        width: Mask width in pixels.
        height: Mask height in pixels.
        brush_points: Iterable of mappings with "x", "y" and "size" keys;
            each value is converted with ``int()``.

    Returns:
        A ``(height, width)`` uint8 array: zeros with a filled circle of
        value 255 drawn for every brush point, then blurred with a 15x15
        Gaussian kernel.
    """
    canvas = np.zeros((height, width), dtype=np.uint8)

    for stroke in brush_points:
        center = (int(stroke["x"]), int(stroke["y"]))
        radius = int(stroke["size"])
        # Thickness -1 asks OpenCV for a filled circle.
        cv2.circle(canvas, center, radius, 255, -1)

    return cv2.GaussianBlur(canvas, (15, 15), 0)
def remove_watermark(frame, brush_points):
    """Inpaint the brushed regions of a single frame.

    Args:
        frame: Image array; its first two dimensions are read as
            (height, width).
        brush_points: Brush points in frame pixel coordinates, passed
            through to ``create_mask``.

    Returns:
        The result of ``cv2.inpaint`` (Telea method, radius 5) on *frame*.
    """
    frame_height, frame_width = frame.shape[:2]
    return cv2.inpaint(
        frame,
        create_mask(frame_width, frame_height, brush_points),
        5,
        cv2.INPAINT_TELEA,
    )
# =========================================================
# UI
# =========================================================
@app.get("/", response_class=HTMLResponse)
async def home():
    """Serve the landing page body for the root URL."""
    page = """
Fast Watermark Remover
Fast Watermark Remover
Brush paint watermark area and remove it instantly
"""
    return page
# =========================================================
# REMOVE API
# =========================================================
def _parse_brush_points(raw_points):
    """Decode the brush-point JSON from the form field into a list of dicts.

    Raises:
        HTTPException: 400 when the payload is not valid JSON or is not a
            list of objects.
    """
    try:
        points = json.loads(raw_points)
    except (TypeError, ValueError) as err:
        # json.JSONDecodeError is a ValueError subclass.
        raise HTTPException(
            status_code=400,
            detail="Invalid brush points"
        ) from err
    if not isinstance(points, list) or not all(
        isinstance(point, dict) for point in points
    ):
        raise HTTPException(
            status_code=400,
            detail="Invalid brush points"
        )
    return points


async def _save_upload(file, destination):
    """Stream *file* to *destination* in 1 MiB chunks.

    Raises:
        HTTPException: 400 as soon as more than MAX_FILE_SIZE bytes have
            been received.
    """
    received = 0
    with open(destination, "wb") as out:
        while True:
            chunk = await file.read(1024 * 1024)
            if not chunk:
                break
            received += len(chunk)
            if received > MAX_FILE_SIZE:
                raise HTTPException(
                    status_code=400,
                    detail="File too large"
                )
            out.write(chunk)


def _scale_brush_points(brush_points, process_width, process_height):
    """Map brush points from client canvas coordinates to frame pixels.

    Each point's x/y are scaled by the ratio between the processing frame
    and the canvas size recorded on that point; the brush size is scaled by
    the horizontal ratio.

    Raises:
        HTTPException: 400 when a point is missing a key, holds a
            non-numeric or non-finite value, or reports a zero canvas size.
    """
    scaled_points = []
    try:
        for point in brush_points:
            scale_x = process_width / point["canvasWidth"]
            scale_y = process_height / point["canvasHeight"]
            scaled_points.append({
                "x": int(point["x"] * scale_x),
                "y": int(point["y"] * scale_y),
                "size": int(point["size"] * scale_x),
            })
    except (KeyError, TypeError, ValueError, ArithmeticError) as err:
        # ArithmeticError covers ZeroDivisionError (zero canvas size) and
        # OverflowError (int() of an infinite value).
        raise HTTPException(
            status_code=400,
            detail="Invalid brush points"
        ) from err
    return scaled_points


def _render_cleaned_video(input_path, temp_video, brush_points):
    """Write an inpainted, resized, video-only copy of *input_path*.

    Frames are resized to PROCESS_WIDTH (height scaled by the same ratio),
    passed through ``remove_watermark`` and written to *temp_video* with
    the "mp4v" codec.

    Raises:
        HTTPException: 400 when the video cannot be opened or reports
            unusable dimensions, or when the brush points are malformed.
    """
    cap = cv2.VideoCapture(input_path)
    writer = None
    try:
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        if not cap.isOpened() or width <= 0 or height <= 0:
            # Without this guard an unreadable upload reports width 0 and
            # the ratio below raises ZeroDivisionError.
            raise HTTPException(
                status_code=400,
                detail="Could not read video"
            )

        fps = cap.get(cv2.CAP_PROP_FPS)
        if not fps > 0:
            # Written as "not > 0" so that a NaN frame rate also falls
            # back to the default.
            fps = 30

        process_width = PROCESS_WIDTH
        scale_ratio = process_width / width
        process_height = int(height * scale_ratio)
        if process_height <= 0:
            raise HTTPException(
                status_code=400,
                detail="Could not read video"
            )

        scaled_points = _scale_brush_points(
            brush_points,
            process_width,
            process_height
        )

        writer = cv2.VideoWriter(
            temp_video,
            cv2.VideoWriter_fourcc(*"mp4v"),
            fps,
            (process_width, process_height)
        )
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            frame = cv2.resize(
                frame,
                (process_width, process_height)
            )
            writer.write(
                remove_watermark(frame, scaled_points)
            )
    finally:
        # Release native handles on the error paths as well.
        cap.release()
        if writer is not None:
            writer.release()


def _mux_audio(video_path, audio_source, output_path):
    """Combine the video of *video_path* with the audio of *audio_source*.

    Runs ffmpeg, copying the video stream and encoding the (optional) audio
    stream as AAC into *output_path*.

    Raises:
        HTTPException: 500 when ffmpeg cannot be started, exits non-zero,
            or leaves no output file behind.
    """
    command = [
        "ffmpeg",
        "-y",
        "-i", video_path,
        "-i", audio_source,
        "-map", "0:v",
        "-map", "1:a?",
        "-c:v", "copy",
        "-c:a", "aac",
        output_path,
    ]
    try:
        completed = subprocess.run(command, check=False)
    except OSError as err:
        raise HTTPException(
            status_code=500,
            detail="Video processing failed"
        ) from err
    if completed.returncode != 0 or not os.path.exists(output_path):
        raise HTTPException(
            status_code=500,
            detail="Video processing failed"
        )


@app.post("/remove/")
async def remove_video(
    file: UploadFile = File(...),
    brush_points: str = Form(...)
):
    """Remove the brushed watermark area from an uploaded video.

    Args:
        file: The uploaded video; its extension must be accepted by
            ``allowed_file``.
        brush_points: JSON array of objects with "x", "y", "size",
            "canvasWidth" and "canvasHeight" keys.

    Returns:
        FileResponse streaming the processed video, named
        "<original stem>_cleaned<original extension>". The per-request
        temporary directory is deleted by a background task once the
        response has been sent.

    Raises:
        HTTPException: 400 for an unsupported or oversized file, malformed
            brush points or an unreadable video; 500 when the ffmpeg step
            fails.

    NOTE(review): frame processing and the ffmpeg call are synchronous and
    run inside this coroutine, so they block the event loop for the
    duration of the request.
    """
    # Use only the final path component of the client-supplied name so it
    # can never point outside the session directory. Backslashes are
    # normalised so Windows-style paths are reduced too.
    client_name = os.path.basename(
        (file.filename or "").replace("\\", "/")
    )
    if not client_name or not allowed_file(client_name):
        raise HTTPException(
            status_code=400,
            detail="Unsupported file"
        )

    # Validate the payload before any temporary files are created.
    points = _parse_brush_points(brush_points)

    original_name, original_ext = os.path.splitext(client_name)
    output_filename = original_name + "_cleaned" + original_ext

    session_dir = tempfile.mkdtemp()
    try:
        # Fixed on-disk names: the client name is used only for the
        # download name, so an upload called "temp.mp4" can no longer
        # collide with the intermediate video.
        input_path = os.path.join(session_dir, "input" + original_ext)
        temp_video = os.path.join(session_dir, "processed.mp4")
        output_path = os.path.join(session_dir, "output" + original_ext)

        await _save_upload(file, input_path)
        _render_cleaned_video(input_path, temp_video, points)
        _mux_audio(temp_video, input_path, output_path)

        mime_type = mimetypes.guess_type(output_path)[0]
        if mime_type is None:
            mime_type = "video/mp4"

        return FileResponse(
            output_path,
            media_type=mime_type,
            filename=output_filename,
            background=BackgroundTask(
                cleanup,
                session_dir
            )
        )
    except BaseException:
        # On any failure (including cancellation) the background task
        # never runs, so remove the session directory here and re-raise.
        cleanup(session_dir)
        raise
# =========================================================
# MAIN
# =========================================================
if __name__ == "__main__":
    import uvicorn

    # Listen on every interface, port 7860, with auto-reload off.
    server_options = {
        "host": "0.0.0.0",
        "port": 7860,
        "reload": False,
    }
    uvicorn.run("app:app", **server_options)