import sys import os from pathlib import Path import subprocess import gradio as gr # -------------------------- # 关键修复:添加项目根目录到 Python 路径 # -------------------------- # 获取当前文件(app.py)的父目录(Hugging Face Space 工作目录为 /app) project_root = Path(__file__).parent.parent # 若 app.py 在 /app 目录下,需向上两级 sys.path.insert(0, str(project_root)) # 确保优先搜索项目根目录 # -------------------------- # 导入 kling 模块(修复后) # -------------------------- try: from kling.core import VideoWatermarkRemover except ImportError as e: raise RuntimeError( f"导入 kling 模块失败:{str(e)}\n" "请检查:\n" "1. 子模块是否已通过 `git submodule update --init --recursive` 初始化?\n" "2. 项目根目录是否正确添加到 sys.path?(当前路径:{project_root})" ) # -------------------------- # 视频处理函数 # -------------------------- def process_video(input_video, progress=gr.Progress()): try: # 保存输入视频到临时文件 with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as f: f.write(input_video.read()) input_path = f.name progress(0.1, desc="输入视频已保存") # 检查 FFmpeg subprocess.run(["ffmpeg", "-version"], check=True, capture_output=True) progress(0.2, desc="FFmpeg 可用") # 初始化模型 model_path = str(project_root / "kling" / "models" / "watermark_remover.pth") remover = VideoWatermarkRemover(device="cpu", model_path=model_path) progress(0.4, desc="模型加载完成") # 执行去水印 output_filename = f"processed_{Path(input_path).stem}.mp4" output_path = os.path.join(tempfile.gettempdir(), output_filename) remover.remove_watermark(input_path=input_path, output_path=output_path) progress(0.9, desc="去水印完成") # 生成下载链接 download_link = f"/files/{output_filename}" return output_path, download_link except Exception as e: return f"❌ 处理失败:{str(e)}", None # -------------------------- # Gradio 界面配置 # -------------------------- with gr.Blocks(title="KLing 视频去水印工具") as demo: gr.Markdown("# 🚀 KLing 视频去水印工具") gr.Markdown("上传视频后自动处理并支持下载") with gr.Row(): input_video = gr.Video( label="输入视频", type="filepath", max_file_size=200 * 1024 * 1024 # 200MB 限制 ) with gr.Row(): output_video = gr.Video(label="输出视频", interactive=False) download_btn = gr.Button("⬇️ 下载结果", variant="secondary", interactive=False) submit_btn = gr.Button("开始处理") spinner = gr.Spinner(visible=False) # 绑定处理逻辑 def handle_submit(input_video): spinner.visible = True output_path, download_path = process_video(input_video) spinner.visible = False if download_path: output_video.value = output_path download_btn.interactive = True return output_video, download_btn else: output_video.value = None download_btn.interactive = False return output_video, download_btn submit_btn.click(handle_submit, inputs=input_video, outputs=[output_video, download_btn]) if __name__ == "__main__": # 本地调试 # demo.launch(debug=True, server_port=7860) pass