|
|
import os
import shutil
import subprocess
import sys
import tempfile
import uuid
from pathlib import Path

import gradio as gr
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Directory two levels above this file. It is resolved to an absolute path so
# the value stays correct when ``__file__`` is relative (e.g. the script is
# started from inside its own directory) or the working directory changes.
project_root = Path(__file__).resolve().parent.parent

# Put the project root first on the import path so the ``kling`` package
# imported below is looked up there before anywhere else.
sys.path.insert(0, str(project_root))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Import the watermark-removal class; if that fails, abort start-up with a
# RuntimeError that carries setup hints instead of a bare ImportError.
try:
    from kling.core import VideoWatermarkRemover
except ImportError as e:
    # The last message line has to be an f-string as well, otherwise the
    # literal text "{project_root}" is shown instead of the actual path.
    # ``from e`` keeps the original ImportError attached as the cause.
    raise RuntimeError(
        f"导入 kling 模块失败:{str(e)}\n"
        "请检查:\n"
        "1. 子模块是否已通过 `git submodule update --init --recursive` 初始化?\n"
        f"2. 项目根目录是否正确添加到 sys.path?(当前路径:{project_root})"
    ) from e
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def process_video(input_video, progress=gr.Progress()):
    """Remove the watermark from an uploaded video.

    Args:
        input_video: The uploaded video. Either a filesystem path
            (``str`` / ``os.PathLike``) or a binary file-like object that
            exposes ``read()``.
        progress: Callable invoked as ``progress(fraction, desc=...)`` after
            each stage completes.

    Returns:
        A 2-tuple. On success ``(output_path, download_link)``; on any
        failure ``("❌ 处理失败:<reason>", None)``. Exceptions are not
        propagated, so callers detect failure by the second element being
        ``None``.

    NOTE(review): ``download_link`` has the form ``/files/<name>``; nothing
    visible here serves that route — confirm it is still needed.
    """
    # Path of a temporary copy created by this call (and only then deleted
    # by this call); stays None when the caller already supplied a path.
    staged_input = None
    try:
        if input_video is None:
            raise ValueError("未提供输入视频")

        if isinstance(input_video, (str, os.PathLike)):
            # Already on disk: use it in place, and never delete it, because
            # this function does not own the file.
            input_path = os.fspath(input_video)
        else:
            # File-like object: stream it to disk in chunks instead of
            # loading the whole upload into memory with read().
            with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as f:
                staged_input = f.name
                shutil.copyfileobj(input_video, f)
            input_path = staged_input
        progress(0.1, desc="输入视频已保存")

        # Fail early if the ffmpeg executable cannot be run.
        subprocess.run(["ffmpeg", "-version"], check=True, capture_output=True)
        progress(0.2, desc="FFmpeg 可用")

        model_path = str(project_root / "kling" / "models" / "watermark_remover.pth")
        remover = VideoWatermarkRemover(device="cpu", model_path=model_path)
        progress(0.4, desc="模型加载完成")

        # A random token keeps two uploads that share a file name from
        # writing to the same output file in the shared temp directory.
        output_filename = (
            f"processed_{Path(input_path).stem}_{uuid.uuid4().hex[:8]}.mp4"
        )
        output_path = os.path.join(tempfile.gettempdir(), output_filename)
        remover.remove_watermark(input_path=input_path, output_path=output_path)
        progress(0.9, desc="去水印完成")

        download_link = f"/files/{output_filename}"
        return output_path, download_link

    except Exception as e:
        # UI boundary: report the failure as data rather than raising.
        return f"❌ 处理失败:{str(e)}", None

    finally:
        # Remove the staging copy (if one was made) on success and failure.
        if staged_input is not None:
            try:
                os.remove(staged_input)
            except OSError:
                # Best-effort cleanup; a leftover temp file is not fatal.
                pass
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Build the web UI: one upload widget, one result player, a download button
# and a button that starts processing.
with gr.Blocks(title="KLing 视频去水印工具") as demo:
    gr.Markdown("# 🚀 KLing 视频去水印工具")
    gr.Markdown("上传视频后自动处理并支持下载")

    with gr.Row():
        # gr.Video accepts neither `type` nor `max_file_size`; it passes the
        # uploaded file to the handler as a path. Upload size limits are an
        # app-level setting (see `launch(max_file_size=...)`).
        input_video = gr.Video(label="输入视频")

    with gr.Row():
        output_video = gr.Video(label="输出视频", interactive=False)
        # A DownloadButton serves the file stored in its value when clicked;
        # a plain Button would need a handler and cannot trigger a download.
        download_btn = gr.DownloadButton(
            "⬇️ 下载结果", variant="secondary", interactive=False
        )

    submit_btn = gr.Button("开始处理")

    def handle_submit(input_video, progress=gr.Progress()):
        """Run the watermark removal and update the result widgets.

        Args:
            input_video: Value of the upload widget.
            progress: Progress tracker supplied for this event; forwarded to
                ``process_video`` so its stage updates reach the browser.

        Returns:
            Two updates, in the order of the ``outputs`` list of the click
            event: one for the result player and one for the download button.
        """
        output_path, download_path = process_video(input_video, progress)

        if not download_path:
            # On failure the first element carries the error text; show it
            # to the user and reset both result widgets.
            gr.Warning(output_path)
            return gr.update(value=None), gr.update(value=None, interactive=False)

        # Component changes must be returned as updates; assigning to the
        # attributes of the component objects does not reach the browser.
        return (
            gr.update(value=output_path),
            gr.update(value=output_path, interactive=True),
        )

    submit_btn.click(handle_submit, inputs=input_video, outputs=[output_video, download_btn])
|
|
|
|
|
if __name__ == "__main__":
    # Start the web server when the file is executed as a script; without
    # this call the interface is built and the process exits immediately.
    # Uploads are capped at 200 MiB here because the size limit is a launch
    # option rather than a component option.
    demo.launch(max_file_size=200 * 1024 * 1024)