# NOTE: the following three lines are residue from the hosting web page, kept as comments.
# NeuralFalcon's picture
# Update app.py
# 9f2e634 verified
# Whether to use the GPU. Passed to EasyOCR just below and, later in this file,
# forwarded to marge_video() by video_watermark_remover().
gpu=False
import easyocr
# Module-level OCR reader for the 'ch_sim' and 'en' language codes; created once at
# import time and reused by remove_watermark() for every frame.
reader = easyocr.Reader(['ch_sim','en'],gpu=gpu) # this needs to run only once to load the model into memory
import cv2
import os
# NOTE(review): duplicate of the cv2 import two lines up; harmless.
import cv2
import numpy as np
import shutil
from concurrent.futures import ThreadPoolExecutor
import re
import subprocess
def extract_frames(video_path, output_folder):
    """Write every frame of a video into ``output_folder`` as numbered PNG files.

    Any existing ``output_folder`` is deleted first, so afterwards it holds only
    frames of this video. Frames are named ``000000.png``, ``000001.png``, ...
    in the order they are read.

    Args:
        video_path: Path of the video to read.
        output_folder: Directory that is wiped, recreated and filled with frames.

    Returns:
        The number of frames read from the video.
    """
    if os.path.exists(output_folder):
        shutil.rmtree(output_folder)
    os.makedirs(output_folder, exist_ok=True)

    cap = cv2.VideoCapture(video_path)
    frame_count = 0
    try:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break  # Stop when video ends
            frame_path = os.path.join(output_folder, f"{frame_count:06d}.png")
            # imwrite signals failure through its return value; report it
            # instead of silently leaving a gap in the frame sequence.
            if not cv2.imwrite(frame_path, frame):
                print(f"Failed to save: {frame_path}")
            frame_count += 1
    finally:
        # Release the capture even if reading or writing raised.
        cap.release()

    print(f"Extracted {frame_count} frames to {output_folder}")
    return frame_count
# Text detection below uses the module-level EasyOCR `reader` created at the top of the file.
def remove_watermark(image, blur_type="strong_gaussian"):
    """Blur every text region that the module-level EasyOCR ``reader`` finds.

    The image is modified in place and the same array is returned.

    Args:
        image: BGR image array (as produced by ``cv2.imread``).
        blur_type: One of ``"strong_gaussian"``, ``"pixelation"``, ``"median"``,
            ``"motion"``, ``"bilateral"``, ``"box"``, ``"stacked"`` or
            ``"adaptive"``. Any other value falls back to the strong Gaussian
            blur.

    Returns:
        ``image``, with each detected text region blurred.
    """
    height, width = image.shape[:2]

    for bbox, _text, _prob in reader.readtext(image):  # Detect text regions
        # Use the bounding rectangle of all four corners so that rotated boxes
        # are fully covered, and clamp it to the image. Negative or
        # out-of-range corners would otherwise wrap around or give an empty
        # slice, which the blur functions reject.
        xs = [int(point[0]) for point in bbox]
        ys = [int(point[1]) for point in bbox]
        x1, x2 = max(min(xs), 0), min(max(xs), width)
        y1, y2 = max(min(ys), 0), min(max(ys), height)
        if x2 <= x1 or y2 <= y1:
            continue  # Nothing of this box lies inside the image

        roi = image[y1:y2, x1:x2]
        image[y1:y2, x1:x2] = _blur_roi(roi, blur_type)

    return image


def _blur_roi(roi, blur_type):
    """Return a blurred version of ``roi`` using the named blur style."""
    if blur_type == "pixelation":
        h, w = roi.shape[:2]
        # Shrink to an 8x8 thumbnail, then scale back up without smoothing.
        temp = cv2.resize(roi, (8, 8), interpolation=cv2.INTER_LINEAR)
        return cv2.resize(temp, (w, h), interpolation=cv2.INTER_NEAREST)
    if blur_type == "median":
        return cv2.medianBlur(roi, 21)
    if blur_type == "motion":
        size = 25
        kernel = np.zeros((size, size))
        kernel[:, size // 2] = 1  # Single centre column: averages vertically
        kernel = kernel / kernel.sum()
        return cv2.filter2D(roi, -1, kernel)
    if blur_type == "bilateral":
        return cv2.bilateralFilter(roi, d=15, sigmaColor=75, sigmaSpace=75)
    if blur_type == "box":
        return cv2.blur(roi, (25, 25))
    if blur_type == "stacked":
        temp = cv2.GaussianBlur(roi, (15, 15), 25)
        return cv2.medianBlur(temp, 15)
    if blur_type == "adaptive":
        # Blur only the bright pixels (gray level above 220) of the region.
        gray = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY)
        _, mask = cv2.threshold(gray, 220, 255, cv2.THRESH_BINARY)
        blurred = cv2.GaussianBlur(roi, (25, 25), 25)
        return np.where(mask[..., None] > 0, blurred, roi)
    # "strong_gaussian" and any unrecognised blur type
    return cv2.GaussianBlur(roi, (25, 25), 50)
def process_frame(frame_path, save_path, blur_type):
    """Load one frame, blur its detected text and save it into ``save_path``.

    The output file keeps the input file's name. Load and save failures are
    reported with a printed message; nothing is returned in any case.
    """
    frame = cv2.imread(frame_path)
    if frame is None:
        print(f"Failed to load: {frame_path}")  # Debugging step
        return

    target = os.path.join(save_path, os.path.basename(frame_path))
    cleaned = remove_watermark(frame, blur_type=blur_type)
    if not cv2.imwrite(target, cleaned):
        print(f"Failed to save: {target}")  # Debugging step
def batch_process(blur_type="median", batch_size=100,
                  input_folder="./frames", output_folder="./clean"):
    """Run ``process_frame`` over every frame image using a thread pool.

    ``output_folder`` is deleted and recreated first. Every ``.jpg``/``.png``
    file in ``input_folder`` is processed; a frame whose processing raises is
    reported and counted instead of being dropped silently.

    Args:
        blur_type: Blur style forwarded to ``process_frame``.
        batch_size: Unused; kept so existing callers keep working.
        input_folder: Directory holding the extracted frames.
        output_folder: Directory that receives the processed frames.
    """
    if os.path.exists(output_folder):
        shutil.rmtree(output_folder)
    os.makedirs(output_folder, exist_ok=True)

    frame_paths = [
        os.path.join(input_folder, name)
        for name in os.listdir(input_folder)
        if name.endswith((".jpg", ".png"))
    ]

    failed = 0
    with ThreadPoolExecutor() as executor:
        futures = [
            (path, executor.submit(process_frame, path, output_folder, blur_type))
            for path in frame_paths
        ]
        for path, future in futures:
            # result() re-raises whatever the worker raised; without asking
            # for it, worker exceptions would be lost.
            try:
                future.result()
            except Exception as exc:
                failed += 1
                print(f"Failed to process: {path} ({exc})")

    if failed:
        print(f"Warning: {failed} of {len(frame_paths)} frames could not be processed")
    print(f"Processing complete! {len(frame_paths) - failed} frames saved to {output_folder}")
def get_video_fps(video_path):
    """Extract FPS from the original video.

    Returns:
        The frame rate as a positive number, or ``None`` when the video
        cannot be opened or does not report a usable frame rate.
    """
    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            return None
        fps = cap.get(cv2.CAP_PROP_FPS)
    finally:
        cap.release()

    # A zero, negative or NaN rate is not usable as an ffmpeg frame rate, so
    # report it the same way as an unreadable video.
    if not fps > 0:
        return None
    return fps
def sorted_files(directory, extension=".png"):
    """Returns a list of sorted .png files based on numeric order.

    Files are ordered by the first run of digits in their name; names without
    digits come last. Ties are broken by file name so the order does not
    depend on the order in which the OS lists the directory.

    Args:
        directory: Directory to list.
        extension: File-name suffix to keep (``".png"`` by default).

    Returns:
        Paths (``directory`` joined with each file name) in sorted order.
    """
    def sort_key(name):
        match = re.search(r'\d+', name)
        number = int(match.group()) if match else float('inf')
        return (number, name)

    names = sorted(
        (f for f in os.listdir(directory) if f.endswith(extension)),
        key=sort_key,
    )
    return [os.path.join(directory, f) for f in names]
def create_video_chunks(frame_dir, output_dir, fps, batch_size=100):
    """Creates chunked videos from frames in batches.

    The frames returned by ``sorted_files(frame_dir)`` are split into
    consecutive groups of ``batch_size``. Each group is copied into a temporary
    folder under ``output_dir`` and encoded with ffmpeg (libx264, yuv420p)
    into ``output_dir/<n>.mp4``, with ``n`` starting at 1.

    Args:
        frame_dir: Directory holding the frame images.
        output_dir: Directory for the chunk videos; deleted and recreated.
        fps: Frame rate passed to ffmpeg.
        batch_size: Number of frames per chunk; must be positive.

    Raises:
        ValueError: If ``batch_size`` is not positive.
        subprocess.CalledProcessError: If ffmpeg exits with an error.
    """
    if batch_size <= 0:
        raise ValueError("batch_size must be a positive integer.")

    # Remove old "chunks" folder if exists
    if os.path.exists(output_dir):
        shutil.rmtree(output_dir)
    os.makedirs(output_dir, exist_ok=True)

    sorted_images = sorted_files(frame_dir)
    total_chunks = 0

    for start in range(0, len(sorted_images), batch_size):
        chunk_frames = sorted_images[start:start + batch_size]
        total_chunks += 1

        chunk_folder = os.path.join(output_dir, f"chunk_{total_chunks}")
        os.makedirs(chunk_folder, exist_ok=True)
        try:
            # Copy frames to a temp folder, renumbered from 0 so they match
            # the %05d input pattern given to ffmpeg.
            for j, frame in enumerate(chunk_frames):
                shutil.copy(frame, os.path.join(chunk_folder, f"{j:05d}.png"))

            # Generate video from frames. An argument list (no shell) keeps
            # unusual characters in paths from being interpreted by a shell.
            chunk_output = os.path.join(output_dir, f"{total_chunks}.mp4")
            subprocess.run(
                [
                    "ffmpeg", "-y",
                    "-framerate", str(fps),
                    "-i", f"{chunk_folder}/%05d.png",
                    "-c:v", "libx264",
                    "-pix_fmt", "yuv420p",
                    chunk_output,
                ],
                check=True,
            )
        finally:
            # Cleanup temp chunk folder, also when ffmpeg failed
            shutil.rmtree(chunk_folder, ignore_errors=True)

    print(f"✅ All {total_chunks} video chunks created in {output_dir}")
def vido_chunks(video_path):
    """Encode the frames in ``./clean`` into video chunks under ``./chunks``.

    The frame rate is read from ``video_path`` so the chunks play at the
    original speed.

    Raises:
        ValueError: If the frame rate cannot be read from ``video_path``.
    """
    source_fps = get_video_fps(video_path)
    if source_fps is None:
        raise ValueError("Failed to retrieve FPS from video.")

    # Cleaned frames in, 100-frame video chunks out
    create_video_chunks("./clean", "./chunks", source_fps, batch_size=100)
import os
import re
import uuid
def sanitize_file(file_path):
    """Build a unique file name that is safe to use in shell commands.

    The stem keeps only ASCII letters and underscores (spaces become
    underscores), is lower-cased and limited to 20 characters; an empty result
    becomes ``"empty"``. Eight random upper-case hex characters are appended
    for uniqueness. The extension keeps only ASCII letters, digits and dots.

    Args:
        file_path: Original path; only its base name is used.

    Returns:
        The new file name (without any directory part).
    """
    stem, ext = os.path.splitext(os.path.basename(file_path))

    # Keep alphabets, spaces, and underscores only
    stem = re.sub(r'[^a-zA-Z_ ]', '', stem)
    stem = stem.lower().strip().replace(" ", "_")

    # Truncate or handle empty text
    stem = stem[:20] or "empty"

    # The extension ends up in paths that are later handed to ffmpeg, so strip
    # anything a shell could misinterpret (spaces, quotes, ...).
    ext = re.sub(r'[^a-zA-Z0-9.]', '', ext)

    # Generate a random string for uniqueness
    random_string = uuid.uuid4().hex[:8].upper()

    return f"{stem}_{random_string}{ext}"
def upload_file(video_path):
    """Copy ``video_path`` into a freshly emptied ``./upload`` folder.

    The copy gets the name produced by ``sanitize_file``; its path is returned.
    """
    upload_dir = "./upload"
    if os.path.exists(upload_dir):
        shutil.rmtree(upload_dir)
    os.makedirs(upload_dir, exist_ok=True)

    destination = f"./upload/{sanitize_file(video_path)}"
    shutil.copy(video_path, destination)
    return destination
import os
import re
import subprocess
def sorted_video_files(directory):
"""Returns a list of full paths of .mp4 files sorted by the numeric part of the filename."""
files = [f for f in os.listdir(directory) if f.endswith(".mp4")]
# Extract the numeric part using regex and sort
files.sort(key=lambda f: int(re.search(r'\d+', f).group()) if re.search(r'\d+', f) else float('inf'))
# Convert filenames to full paths
full_paths = [os.path.join(directory, f) for f in files]
return full_paths
def marge_video(gpu=True):
    """Concatenate the chunk videos in ``./chunks`` into one file.

    The chunk paths are written to ``./join.txt`` and joined with ffmpeg's
    concat demuxer using stream copy (no re-encoding).

    Args:
        gpu: When true, ffmpeg is first run with ``-hwaccel cuda``; if that
            run fails, it is retried without the option.

    Returns:
        Path of the merged video, ``./result/no_water_mark.mp4``.

    Raises:
        RuntimeError: If there are no chunks to merge or ffmpeg fails.
    """
    os.makedirs("./result/", exist_ok=True)
    output_path = "./result/no_water_mark.mp4"

    video_list = sorted_video_files("./chunks")
    if not video_list:
        raise RuntimeError("No video chunks found in ./chunks to merge.")

    # Remove the result of an earlier run so a failed merge can never be
    # mistaken for a fresh output.
    if os.path.exists(output_path):
        os.remove(output_path)

    with open("./join.txt", "w") as f:
        f.writelines(f"file '{video}'\n" for video in video_list)

    concat_args = ["-f", "concat", "-safe", "0", "-i", "./join.txt",
                   "-c", "copy", output_path, "-y"]
    attempts = []
    if gpu:
        attempts.append(["ffmpeg", "-hwaccel", "cuda"] + concat_args)
    attempts.append(["ffmpeg"] + concat_args)

    last_error = ""
    for command in attempts:
        # Argument list, no shell; output stays off the console as before,
        # but stderr is kept for the error message.
        completed = subprocess.run(
            command, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE
        )
        if completed.returncode == 0:
            return output_path
        last_error = completed.stderr.decode(errors="replace").strip()

    raise RuntimeError(f"ffmpeg failed to merge video chunks: {last_error[-500:]}")
def recover_audio(upload_path):
    """Add the original video's audio track to the merged, cleaned video.

    The audio of ``upload_path`` is extracted to ``./upload/temp.wav`` and
    muxed with ``./result/no_water_mark.mp4`` into a file under ``./result/``
    named after the upload (``.mp4`` replaced by ``_no_watermark.mp4``).

    Args:
        upload_path: Path of the uploaded original video.

    Returns:
        Path of the video with audio. If the audio cannot be extracted (for
        example, the upload has no audio track) or muxing fails, the path of
        the silent merged video is returned instead. ``None`` if the merged
        video does not exist.
    """
    output_path = "./result/no_water_mark.mp4"
    audio_path = "./upload/temp.wav"
    os.makedirs("./result/", exist_ok=True)
    base_name = os.path.basename(upload_path)
    save_path = f"./result/{base_name.replace('.mp4','_no_watermark.mp4')}"

    if not os.path.exists(output_path):
        return None

    # Argument lists (no shell) so paths are never split or interpreted.
    extracted = subprocess.run(
        ["ffmpeg", "-i", upload_path, "-q:a", "0", "-map", "a", audio_path, "-y"]
    )
    if extracted.returncode == 0:
        muxed = subprocess.run(
            [
                "ffmpeg", "-i", output_path, "-i", audio_path,
                "-c:v", "copy", "-map", "0:v:0", "-map", "1:a:0",
                "-shortest", save_path, "-y",
            ]
        )
        if muxed.returncode == 0:
            return save_path

    # Better a silent result than none at all.
    print(f"Could not recover audio from {upload_path}; returning video without audio.")
    return output_path
def video_watermark_remover(video_path, blur_type="median"):
    """Run the whole pipeline on one video and return the result path.

    Steps: copy the upload, extract frames to ``./frames``, blur text in every
    frame, encode the frames into chunks, merge the chunks, then restore the
    audio. The module-level ``gpu`` flag is forwarded to ``marge_video``.

    Returns:
        Whatever ``recover_audio`` returns for the uploaded copy.
    """
    uploaded = upload_file(video_path)
    extract_frames(uploaded, "./frames")
    batch_process(blur_type=blur_type)
    vido_chunks(uploaded)
    marge_video(gpu=gpu)
    return recover_audio(uploaded)
import gradio as gr
import click
def gradio_interface(video_file, blur_type):
    """Gradio callback: process the video and return its path twice.

    The same path feeds both outputs of the interface (file download and
    video player).
    """
    result_path = video_watermark_remover(video_file, blur_type=blur_type)
    return result_path, result_path
# Blur styles offered in the UI dropdown (remove_watermark accepts more than these two).
blur_types = ["strong_gaussian", "median"]
# Gradio UI: a video upload and a blur-type dropdown as inputs; the processed video
# as outputs, once as a downloadable file and once in a player.
demo = gr.Interface(
fn=gradio_interface,
inputs=[
gr.Video(label="Upload Video"),
gr.Dropdown(choices=blur_types, label="Blur Type", value="strong_gaussian") # Default to strong_gaussian
],
outputs=[gr.File(label="Download Video"),gr.Video(label="Play Video")],
title="Remove Watermark Text from Video",
description="Upload a video, and this tool will blur texts",
# Each example row supplies only a video path.
examples=[
["./examples/chinese.mp4"],
["./examples/english.mp4"]
]
)
# demo.launch()
@click.command()
@click.option("--debug", is_flag=True, default=False, help="Enable debug mode.")
@click.option("--share", is_flag=True, default=False, help="Enable sharing of the interface.")
def main(debug, share):
    # CLI entry point: enable request queuing on the Gradio app, then start it
    # with the --debug / --share flags given on the command line.
    # (Documented with comments on purpose: click would print a docstring as
    # the command's --help text.)
    queued_demo = demo.queue()
    queued_demo.launch(debug=debug, share=share)


if __name__ == "__main__":
    main()