Spaces:
Sleeping
Sleeping
| import sys | |
| import subprocess | |
| import os | |
| import shutil | |
| import time | |
| import json | |
| import requests | |
| import re | |
| import threading | |
| import glob | |
| import traceback | |
| from datetime import datetime | |
| # ========================================== | |
| # 0. 环境自动修复 | |
| # ========================================== | |
def install_package(package, pip_name=None):
    """Make sure a module can be imported, installing it with pip if it cannot.

    Args:
        package: Import name of the module to probe (handed to ``__import__``).
        pip_name: Distribution name to pass to ``pip install`` when it differs
            from the import name. Defaults to ``package``.

    Raises:
        subprocess.CalledProcessError: If the ``pip install`` command exits
            with a non-zero status.
    """
    import importlib

    pip_name = pip_name or package
    try:
        __import__(package)
    except ImportError:
        print(f"📦 正在自动安装缺失库: {pip_name} ...")
        subprocess.check_call([sys.executable, "-m", "pip", "install", pip_name])
        # The import system caches directory listings; refresh them so that a
        # package installed a moment ago is visible to the next import.
        importlib.invalidate_caches()
# Bootstrap third-party dependencies before gradio itself is imported.
# NOTE(review): `requests` is already imported at the top of this file, so by
# the time this runs the check for it can only succeed.
for _required_module in ("gradio", "requests"):
    install_package(_required_module)
import gradio as gr
# ================= Configuration =================
# SECURITY NOTE(review): an API key is committed in source. Supply the key via
# the KIE_API_KEY environment variable instead; the literal below is kept only
# as a backward-compatible fallback and should be rotated and removed.
API_KEY = os.environ.get("KIE_API_KEY") or "5d8189fa5abf2d541fa69e4c56e94a49"  # API key
TASK_BASE_URL = "https://api.kie.ai/api/v1/jobs"
UPLOAD_BASE_URL = "https://kieai.redpandaai.co/api/file-stream-upload"
# Root directory for per-task data (mounting persistent storage is recommended)
TASKS_ROOT_DIR = "motion_tasks_data"
# Authorization header shared by every API request
HEADERS = {
    "Authorization": f"Bearer {API_KEY}"
}
MODEL_NAME = "kling-2.6/motion-control"
| # ================= 核心工具函数 (Kling API 逻辑) ================= | |
def ensure_video_resolution(video_path, logger_func):
    """Re-encode a video with ffmpeg so that its shorter side is 720 pixels.

    The scale filter pins the shorter dimension to 720 and lets ffmpeg derive
    the other one (``-2`` keeps the aspect ratio with an even pixel count).
    Video is re-encoded with libx264; the audio stream is copied.

    Args:
        video_path: Path of the source video. A falsy value short-circuits.
        logger_func: Callable that accepts a single message string.

    Returns:
        The path of the ``*_720p.mp4`` file on success, the original
        ``video_path`` when ffmpeg is unavailable or fails, or ``None`` when
        no path was given.
    """
    if not video_path:
        return None
    logger_func(f"🔍 正在检查视频分辨率: {os.path.basename(video_path)}")
    output_path = os.path.splitext(video_path)[0] + "_720p.mp4"
    # Simple ffmpeg command: scale so the short side is 720
    cmd = [
        "ffmpeg", "-y", "-i", video_path,
        "-vf", "scale='if(gt(iw,ih),-2,720)':'if(gt(iw,ih),720,-2)'",
        "-c:v", "libx264", "-preset", "fast", "-crf", "23",
        "-c:a", "copy",
        output_path
    ]
    try:
        # Rely on the ffmpeg binary found on PATH
        subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    except Exception as e:
        logger_func(f"⚠️ 分辨率调整失败 (可能缺少ffmpeg),将使用原视频: {e}")
        # Surface the last line ffmpeg printed; the exception text alone only
        # carries the exit status.
        captured = getattr(e, "stderr", None)
        if captured:
            tail = captured.decode("utf-8", errors="replace").strip().splitlines()[-1:]
            if tail:
                logger_func(f"ffmpeg: {tail[0]}")
        # Do not leave a truncated output file behind.
        try:
            if os.path.exists(output_path):
                os.remove(output_path)
        except OSError:
            pass
        return video_path
    logger_func(f"✅ 视频已标准化为 720p: {os.path.basename(output_path)}")
    return output_path
def upload_file_to_kie(file_path, logger_func):
    """Upload a local file to the KIE file-stream endpoint.

    Args:
        file_path: Path of the file to upload. A falsy value short-circuits.
        logger_func: Callable that accepts a single message string.

    Returns:
        The ``downloadUrl`` reported by the server, or ``None`` when no path
        was given, the server rejected the upload, or any error occurred.
    """
    if not file_path:
        return None
    # Sanitize the name: keep only alphanumerics and a few safe punctuation marks
    raw_name = os.path.basename(file_path)
    file_name = "".join(ch for ch in raw_name if ch.isalnum() or ch in "._- ")
    if not file_name:
        # Every character was stripped; fall back to a neutral, non-empty name
        file_name = "upload"
    logger_func(f"⬆️ 正在上传: {file_name} ...")
    try:
        # The context manager guarantees the handle is closed even on failure
        with open(file_path, 'rb') as file_handle:
            files = {'file': (file_name, file_handle)}
            data = {'uploadPath': 'images/user-uploads', 'fileName': file_name}
            response = requests.post(UPLOAD_BASE_URL, headers=HEADERS, files=files, data=data, timeout=120)
        result = response.json()
        if result.get("success") and result.get("code") == 200:
            download_url = result["data"]["downloadUrl"]
            logger_func("✅ 上传成功")
            return download_url
        logger_func(f"❌ 上传失败: {result}")
        return None
    except Exception as e:
        logger_func(f"❌ 上传异常: {e}")
        return None
def create_motion_task(image_url, video_url, prompt, logger_func):
    """Submit a motion-control job and return its task id.

    Args:
        image_url: Public URL of the character image.
        video_url: Public URL of the driving video.
        prompt: Text prompt forwarded to the model.
        logger_func: Callable that accepts a single message string.

    Returns:
        The ``taskId`` from the API response, or ``None`` if the API answered
        with a code other than 200 or the request raised.
    """
    endpoint = f"{TASK_BASE_URL}/createTask"
    request_headers = {**HEADERS, "Content-Type": "application/json"}
    model_input = {
        "prompt": prompt,
        "input_urls": [image_url],
        "video_urls": [video_url],
        "character_orientation": "video",
        "mode": "720p",
    }
    body = {"model": MODEL_NAME, "input": model_input}
    try:
        reply = requests.post(endpoint, headers=request_headers, json=body, timeout=30).json()
        if reply.get("code") != 200:
            logger_func(f"❌ API 拒绝任务: {reply}")
            return None
        new_task_id = reply["data"]["taskId"]
        logger_func(f"🚀 任务创建成功,TaskID: {new_task_id}")
        return new_task_id
    except Exception as err:
        logger_func(f"❌ 请求异常: {err}")
        return None
def wait_for_result(task_id, logger_func, timeout=900, poll_interval=5):
    """Poll the job API until the task succeeds, fails, or the deadline passes.

    Args:
        task_id: Task id returned when the job was created.
        logger_func: Callable that accepts a single message string.
        timeout: Maximum number of seconds to keep polling (default 15 minutes).
        poll_interval: Seconds to sleep between polls.

    Returns:
        ``(video_url, "Success")`` when the job finished,
        ``(None, "Fail: <message>")`` when the API reported failure, or
        ``(None, "Timeout")`` when the deadline passed first.
    """
    url = f"{TASK_BASE_URL}/recordInfo?taskId={task_id}"
    # A monotonic clock keeps the deadline immune to wall-clock adjustments
    deadline = time.monotonic() + timeout
    logger_func("⏳ 开始轮询结果...")
    while True:
        if time.monotonic() > deadline:
            return None, "Timeout"
        try:
            response = requests.get(url, headers=HEADERS, timeout=30)
            data = response.json()
            if data.get("code") != 200:
                time.sleep(poll_interval)
                continue
            state = data["data"]["state"]
            if state == "success":
                # resultJson is itself a JSON-encoded string
                result_json = json.loads(data["data"]["resultJson"])
                video_url = result_json["resultUrls"][0]
                return video_url, "Success"
            elif state == "fail":
                fail_msg = data["data"].get("failMsg", "Unknown error")
                return None, f"Fail: {fail_msg}"
            # Any other state means the job is still in progress
            time.sleep(poll_interval)
        except Exception as e:
            # Best effort: log and retry until the deadline
            logger_func(f"⚠️ 轮询网络波动: {e}")
            time.sleep(poll_interval)
def download_video_to_local(url, save_path, logger_func):
    """Download a generated video to local storage.

    The body is streamed into ``<save_path>.part`` and renamed to
    ``save_path`` only once complete, so a partially written file never
    appears under the final name.

    Args:
        url: HTTP(S) URL of the video.
        save_path: Destination file path.
        logger_func: Callable that accepts a single message string.

    Returns:
        ``True`` when the file was fully written, ``False`` on a non-200
        response or any error.
    """
    partial_path = f"{save_path}.part"
    try:
        logger_func("⬇️ 正在下载结果到本地...")
        # The context manager releases the streamed connection when done
        with requests.get(url, stream=True, timeout=60) as resp:
            if resp.status_code != 200:
                logger_func(f"❌ 下载失败,HTTP 状态码: {resp.status_code}")
                return False
            with open(partial_path, 'wb') as f:
                for chunk in resp.iter_content(chunk_size=8192):
                    f.write(chunk)
        os.replace(partial_path, save_path)
        return True
    except Exception as e:
        logger_func(f"❌ 下载保存失败: {e}")
        # Remove whatever was written before the failure
        try:
            os.remove(partial_path)
        except OSError:
            pass
        return False
| # ================= V9.1 异步任务管理器 (Task Isolation) ================= | |
class TaskManager:
    """Directory-backed bookkeeping for background tasks.

    Each task owns ``<root_dir>/<task_id>/`` which holds ``log.txt`` (an
    append-only log), ``status.json`` (the last recorded status) and a
    ``results/`` folder that is scanned for ``*.mp4`` files.
    """

    def __init__(self, root_dir=TASKS_ROOT_DIR):
        """Remember ``root_dir`` and create it if it does not exist yet."""
        self.root_dir = root_dir
        os.makedirs(self.root_dir, exist_ok=True)

    def create_task(self, task_name_prefix="motion"):
        """Create a new task directory and mark the task as running.

        The id is ``<prefix>_<YYYYmmdd_HHMMSS>``. If that directory already
        exists (two submissions within the same second), a numeric suffix is
        appended so tasks never share a directory.

        Returns:
            A ``(task_id, task_dir)`` tuple.
        """
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        base_id = f"{task_name_prefix}_{timestamp}"
        task_id = base_id
        attempt = 1
        while True:
            task_dir = os.path.join(self.root_dir, task_id)
            try:
                # No exist_ok: an existing directory means the id is taken
                os.makedirs(task_dir)
                break
            except FileExistsError:
                attempt += 1
                task_id = f"{base_id}_{attempt}"
        # Sub-directory for generated results
        os.makedirs(os.path.join(task_dir, "results"), exist_ok=True)
        with open(os.path.join(task_dir, "log.txt"), "w", encoding="utf-8") as f:
            f.write(f"[{datetime.now().strftime('%H:%M:%S')}] 任务创建成功: {task_id}\n")
        self.update_status(task_id, "running")
        return task_id, task_dir

    def log(self, task_id, message):
        """Append a timestamped line to the task log and echo it to stdout.

        Writing the log file is best effort: an I/O error is reported on
        stdout and does not propagate.
        """
        timestamp = datetime.now().strftime('%H:%M:%S')
        log_line = f"[{timestamp}] {message}\n"
        log_path = os.path.join(self.root_dir, task_id, "log.txt")
        try:
            with open(log_path, "a", encoding="utf-8") as f:
                f.write(log_line)
        except OSError as exc:
            print(f"[{task_id}] ⚠️ 日志写入失败: {exc}")
        print(f"[{task_id}] {message}")

    def update_status(self, task_id, status):
        """Overwrite ``status.json`` with ``status`` and the current time."""
        status_path = os.path.join(self.root_dir, task_id, "status.json")
        data = {"status": status, "last_update": time.time()}
        with open(status_path, "w", encoding="utf-8") as f:
            json.dump(data, f)

    def get_task_info(self, task_id):
        """Return the log text and result files of a task.

        Returns:
            ``{"id", "log", "results"}`` or ``None`` when the task directory
            does not exist or ``task_id`` is not a plain directory name.
        """
        # Reject ids that could escape root_dir ("..", embedded separators)
        if not task_id or task_id in (".", "..") or os.path.basename(task_id) != task_id:
            return None
        task_dir = os.path.join(self.root_dir, task_id)
        if not os.path.exists(task_dir):
            return None
        try:
            with open(os.path.join(task_dir, "log.txt"), "r", encoding="utf-8") as f:
                log_content = f.read()
        except (OSError, UnicodeDecodeError):
            log_content = "日志读取中..."
        # Collect every video in the results folder. The directory part is
        # escaped because task ids derive from file names and may contain
        # glob metacharacters such as "[" or "*".
        results_dir = os.path.join(task_dir, "results")
        result_files = sorted(glob.glob(os.path.join(glob.escape(results_dir), "*.mp4")))
        return {
            "id": task_id,
            "log": log_content,
            "results": result_files
        }

    def list_tasks(self):
        """Return task ids ordered by directory modification time, newest first."""
        try:
            names = os.listdir(self.root_dir)
        except OSError:
            return []
        dated = []
        for name in names:
            path = os.path.join(self.root_dir, name)
            try:
                if os.path.isdir(path):
                    dated.append((os.path.getmtime(path), name))
            except OSError:
                # The directory vanished between listing and stat; skip it
                continue
        dated.sort(key=lambda entry: entry[0], reverse=True)
        return [name for _, name in dated]

    def clear_all_tasks(self):
        """Delete every task directory and recreate an empty root.

        Returns:
            A human-readable status message.
        """
        try:
            if os.path.isdir(self.root_dir):
                shutil.rmtree(self.root_dir)
            os.makedirs(self.root_dir, exist_ok=True)
            return "✅ 已清空所有历史任务"
        except Exception as e:
            return f"❌ 清空失败: {e}"
# Module-wide task registry shared by the worker thread and the UI callbacks.
task_manager = TaskManager(root_dir=TASKS_ROOT_DIR)
| # ================= 后台工作线程 (Worker) ================= | |
def _process_single_image(index, total, img_path, video_public_url, prompt, results_dir, log):
    """Upload one image, run the motion job, and download the result.

    Returns ``True`` only when the generated video was saved into
    ``results_dir``.
    """
    log(f"\n🎥 [处理进度 {index}/{total}] 图片: {os.path.basename(img_path)}")
    # Upload the image
    img_public_url = upload_file_to_kie(img_path, log)
    if not img_public_url:
        return False
    # Create the remote job
    api_task_id = create_motion_task(img_public_url, video_public_url, prompt, log)
    if not api_task_id:
        log("❌ 任务创建失败")
        return False
    # Wait for the job to finish
    final_video_url, msg = wait_for_result(api_task_id, log)
    if not final_video_url:
        log(f"❌ 生成失败: {msg}")
        return False
    log("✅ 生成成功,正在下载...")
    # Output name: output_01_<original stem>.mp4. splitext strips only the
    # final extension, so dots inside the name are preserved.
    stem = os.path.splitext(os.path.basename(img_path))[0]
    save_name = f"output_{index:02d}_{stem}.mp4"
    save_full_path = os.path.join(results_dir, save_name)
    if download_video_to_local(final_video_url, save_full_path, log):
        log(f"💾 已保存到: {save_name}")
        return True
    log("⚠️ 下载失败,仅提供链接")
    # Actually emit the link that the message above refers to
    log(f"🔗 {final_video_url}")
    return False


def background_worker(task_id, images, video, prompt):
    """Run a whole batch in a background thread.

    Copies the inputs into the task directory, normalizes and uploads the
    driving video once, then processes every image in turn. Progress is
    written to the task log, and the task status ends as ``completed``,
    ``failed`` (missing input or video upload failure) or ``error``
    (unexpected exception).

    Args:
        task_id: Id of a task already created through ``task_manager``.
        images: Iterable of image paths or objects exposing ``.name``.
        video: Path of the driving video.
        prompt: Text prompt forwarded to the model.
    """
    task_dir = os.path.join(task_manager.root_dir, task_id)
    results_dir = os.path.join(task_dir, "results")

    def log_wrapper(msg):
        task_manager.log(task_id, msg)

    try:
        log_wrapper("🚀 后台进程启动。即使刷新页面,任务也会继续运行。")
        if not images or not video:
            log_wrapper("❌ 缺少图片或视频,任务终止。")
            task_manager.update_status(task_id, "failed")
            return
        # 1. Stage inputs inside the task directory so they outlive gradio's
        #    temporary files
        local_video_path = os.path.join(task_dir, os.path.basename(video))
        shutil.copy(video, local_video_path)
        local_images = []
        for position, img in enumerate(images, start=1):
            # Gradio may hand over either a file object or a plain path
            src_path = img.name if hasattr(img, 'name') else img
            base_name = os.path.basename(src_path)
            dst_path = os.path.join(task_dir, base_name)
            if os.path.exists(dst_path):
                # Same file name as an earlier input: keep both copies
                dst_path = os.path.join(task_dir, f"{position:02d}_{base_name}")
            shutil.copy(src_path, dst_path)
            local_images.append(dst_path)
        # 2. Video pre-processing
        log_wrapper("--- 步骤 1: 检查并修复视频分辨率 ---")
        processed_video = ensure_video_resolution(local_video_path, log_wrapper)
        # 3. Upload the driving video (shared by all images)
        log_wrapper("--- 步骤 2: 上传驱动视频 ---")
        video_public_url = upload_file_to_kie(processed_video, log_wrapper)
        if not video_public_url:
            log_wrapper("❌ 视频上传失败,流程终止。")
            task_manager.update_status(task_id, "failed")
            return
        # 4. Process each image in turn
        total = len(local_images)
        success_count = 0
        for i, img_path in enumerate(local_images):
            if _process_single_image(i + 1, total, img_path, video_public_url,
                                     prompt, results_dir, log_wrapper):
                success_count += 1
        log_wrapper(f"\n🎉 任务结束。成功: {success_count}/{total}")
        task_manager.update_status(task_id, "completed")
    except Exception:
        log_wrapper(f"💥 致命错误: {traceback.format_exc()}")
        task_manager.update_status(task_id, "error")
| # ================= 交互逻辑 ================= | |
def submit_new_task(images, video, prompt):
    """Validate the form, register a task, and start its worker thread.

    Returns:
        A ``(message, task_id)`` tuple; ``task_id`` is ``None`` when the
        images or the video are missing.
    """
    if not images:
        return "❌ 请上传至少一张图片", None
    if not video:
        return "❌ 请上传参考视频", None
    # The task name is derived from the first image's file name
    lead = images[0]
    lead_path = lead.name if hasattr(lead, 'name') else lead
    prefix = f"motion_{os.path.basename(lead_path)[:10]}"
    task_id, _ = task_manager.create_task(prefix)
    # Hand the work to a background thread
    worker = threading.Thread(
        target=background_worker,
        args=(task_id, images, video, prompt),
    )
    worker.start()
    return f"✅ 任务已后台启动!ID: {task_id}\n请切换到【任务监控】标签页查看进度。", task_id
def refresh_task_list():
    """Rebuild the task dropdown, preselecting the first listed task if any."""
    available = task_manager.list_tasks()
    preselected = available[0] if available else None
    return gr.Dropdown(choices=available, value=preselected)
def get_task_details(task_id):
    """Return ``(log_text, result_files)`` for the selected task.

    A placeholder message and an empty list are returned when nothing is
    selected or the task cannot be found.
    """
    if not task_id:
        return "请选择任务", []
    details = task_manager.get_task_info(task_id)
    if details:
        # Log text for the textbox, result file list for the gallery
        return details['log'], details['results']
    return "任务不存在", []
def handle_clear_storage():
    """Wipe all stored tasks and reset the task dropdown to empty."""
    outcome = task_manager.clear_all_tasks()
    emptied_dropdown = gr.Dropdown(choices=[], value=None)
    return outcome, emptied_dropdown
| # ================= UI 构建 ================= | |
with gr.Blocks(title="Kling Motion V9.1 Async") as demo:
    gr.Markdown("## 💃 Kling 动作迁移批量工厂 (V9.1 异步防断连版)")
    gr.Markdown("**特性**:支持多任务并发、刷新页面不中断、结果自动保存到硬盘、视频分辨率自动修复。")
    with gr.Tabs():
        # --- Tab 1: submit a task ---
        with gr.Tab("🚀 提交新任务"):
            with gr.Row():
                with gr.Column():
                    img_input = gr.File(label="1. 上传图片 (支持多选)", file_count="multiple", file_types=["image"])
                    video_input = gr.Video(label="2. 上传参考视频", format="mp4")
                    prompt_input = gr.Textbox(label="3. 提示词", value="The character is performing the action from the video.")
                    submit_btn = gr.Button("🔥 立即启动后台任务", variant="primary")
                with gr.Column():
                    submit_result = gr.Textbox(label="提交结果", lines=2)
                    # Carries the id of the most recently submitted task
                    new_task_id_storage = gr.State()
        # --- Tab 2: monitor tasks ---
        with gr.Tab("📺 任务监控"):
            with gr.Row():
                with gr.Column(scale=1):
                    refresh_list_btn = gr.Button("🔄 刷新任务列表")
                    task_dropdown = gr.Dropdown(label="选择历史任务", choices=[], interactive=True)
                    gr.Markdown("### 📥 结果展示")
                    # The gallery is fed local file paths
                    result_gallery = gr.Gallery(label="生成结果 (本地保存)", columns=2, height="auto", allow_preview=True)
                    gr.Markdown("---")
                    clear_btn = gr.Button("🗑️ 清空所有任务", variant="stop")
                    clear_msg = gr.Label("")
                with gr.Column(scale=2):
                    log_monitor = gr.Textbox(label="运行日志 (自动刷新)", lines=25, max_lines=25)
                    auto_timer = gr.Timer(2)  # tick every 2 seconds

    # --- Event wiring ---
    def auto_select_new_task(new_id):
        """Refresh the dropdown choices and select the task just submitted."""
        all_tasks = task_manager.list_tasks()
        return gr.Dropdown(choices=all_tasks, value=new_id)

    # Submit, and only afterwards refresh the dropdown. Chaining with .then()
    # guarantees the state already holds the new task id; two independent
    # click handlers would run without any ordering and could read the
    # previous value.
    submit_btn.click(
        submit_new_task,
        [img_input, video_input, prompt_input],
        [submit_result, new_task_id_storage]
    ).then(auto_select_new_task, new_task_id_storage, task_dropdown)
    # Manual list refresh
    refresh_list_btn.click(refresh_task_list, outputs=task_dropdown)
    # Periodically refresh the log and the gallery
    # Note: outputs map to [log textbox, gallery component]
    auto_timer.tick(get_task_details, inputs=[task_dropdown], outputs=[log_monitor, result_gallery])
    task_dropdown.change(get_task_details, inputs=[task_dropdown], outputs=[log_monitor, result_gallery])
    # Clear everything
    clear_btn.click(handle_clear_storage, outputs=[clear_msg, task_dropdown])
    # Populate the dropdown on page load
    demo.load(refresh_task_list, outputs=task_dropdown)
if __name__ == "__main__":
    # Turn on the event queue, then serve on all interfaces and open a browser tab.
    queued_app = demo.queue()
    queued_app.launch(server_name="0.0.0.0", inbrowser=True)