import os import cv2 import json import shutil import mimetypes import tempfile import subprocess import numpy as np from fastapi import ( FastAPI, UploadFile, File, Form, HTTPException ) from fastapi.responses import ( HTMLResponse, FileResponse ) from starlette.background import BackgroundTask # ========================================================= # APP # ========================================================= app = FastAPI( title="Fast Watermark Remover" ) # ========================================================= # CONFIG # ========================================================= MAX_FILE_SIZE = 500 * 1024 * 1024 PREVIEW_WIDTH = 720 PREVIEW_HEIGHT = 1280 PROCESS_WIDTH = 720 ALLOWED_EXTENSIONS = [ ".mp4", ".mov", ".avi", ".mkv", ".webm", ".flv", ".wmv", ".mpeg", ".mpg", ".m4v", ".3gp" ] # ========================================================= # HELPERS # ========================================================= def cleanup(path): try: if os.path.exists(path): shutil.rmtree(path) except: pass def allowed_file(filename): ext = os.path.splitext( filename )[1].lower() return ext in ALLOWED_EXTENSIONS # ========================================================= # MASK # ========================================================= def create_mask( width, height, brush_points ): mask = np.zeros( (height, width), dtype=np.uint8 ) for point in brush_points: x = int(point["x"]) y = int(point["y"]) size = int(point["size"]) cv2.circle( mask, (x, y), size, 255, -1 ) mask = cv2.GaussianBlur( mask, (15, 15), 0 ) return mask def remove_watermark( frame, brush_points ): h, w = frame.shape[:2] mask = create_mask( w, h, brush_points ) result = cv2.inpaint( frame, mask, 5, cv2.INPAINT_TELEA ) return result # ========================================================= # UI # ========================================================= @app.get("/", response_class=HTMLResponse) async def home(): return """ Fast Watermark Remover
Fast Watermark Remover
Brush paint watermark area and remove it instantly
Original
Processed
Upload video and paint watermark

Processing video...

""" # ========================================================= # REMOVE API # ========================================================= @app.post("/remove/") async def remove_video( file: UploadFile = File(...), brush_points: str = Form(...) ): if not allowed_file( file.filename ): raise HTTPException( status_code=400, detail="Unsupported file" ) brush_points = json.loads( brush_points ) session_dir = tempfile.mkdtemp() original_ext = os.path.splitext( file.filename )[1] original_name = os.path.splitext( file.filename )[0] output_filename = ( original_name + "_cleaned" + original_ext ) input_path = os.path.join( session_dir, file.filename ) output_path = os.path.join( session_dir, output_filename ) size = 0 with open(input_path, "wb") as f: while True: chunk = await file.read( 1024 * 1024 ) if not chunk: break size += len(chunk) if size > MAX_FILE_SIZE: cleanup(session_dir) raise HTTPException( status_code=400, detail="File too large" ) f.write(chunk) cap = cv2.VideoCapture( input_path ) width = int( cap.get( cv2.CAP_PROP_FRAME_WIDTH ) ) height = int( cap.get( cv2.CAP_PROP_FRAME_HEIGHT ) ) fps = cap.get( cv2.CAP_PROP_FPS ) if fps <= 0: fps = 30 process_width = PROCESS_WIDTH scale_ratio = process_width / width process_height = int( height * scale_ratio ) scaled_points = [] for point in brush_points: scale_x = ( process_width / point["canvasWidth"] ) scale_y = ( process_height / point["canvasHeight"] ) scaled_points.append({ "x": int( point["x"] * scale_x ), "y": int( point["y"] * scale_y ), "size": int( point["size"] * scale_x ) }) fourcc = cv2.VideoWriter_fourcc( *"mp4v" ) temp_video = os.path.join( session_dir, "temp.mp4" ) writer = cv2.VideoWriter( temp_video, fourcc, fps, ( process_width, process_height ) ) while True: ret, frame = cap.read() if not ret: break frame = cv2.resize( frame, ( process_width, process_height ) ) cleaned = remove_watermark( frame, scaled_points ) writer.write( cleaned ) cap.release() writer.release() subprocess.run([ "ffmpeg", "-y", "-i", temp_video, "-i", input_path, "-map", "0:v", "-map", "1:a?", "-c:v", "copy", "-c:a", "aac", output_path ]) mime_type = mimetypes.guess_type( output_path )[0] if mime_type is None: mime_type = "video/mp4" return FileResponse( output_path, media_type=mime_type, filename=output_filename, background=BackgroundTask( cleanup, session_dir ) ) # ========================================================= # MAIN # ========================================================= if __name__ == "__main__": import uvicorn uvicorn.run( "app:app", host="0.0.0.0", port=7860, reload=False )