import sys import subprocess import os import shutil import time import json import requests import re import threading import glob import traceback from datetime import datetime # ========================================== # 0. 环境自动修复 # ========================================== def install_package(package, pip_name=None): pip_name = pip_name or package try: __import__(package) except ImportError: print(f"📦 正在自动安装缺失库: {pip_name} ...") subprocess.check_call([sys.executable, "-m", "pip", "install", pip_name]) install_package("gradio") install_package("requests") import gradio as gr # ================= 配置区域 ================= API_KEY = "5d8189fa5abf2d541fa69e4c56e94a49" # API 密钥 TASK_BASE_URL = "https://api.kie.ai/api/v1/jobs" UPLOAD_BASE_URL = "https://kieai.redpandaai.co/api/file-stream-upload" # 任务存储根目录 (建议挂载持久化存储) TASKS_ROOT_DIR = "motion_tasks_data" HEADERS = { "Authorization": f"Bearer {API_KEY}" } MODEL_NAME = "kling-2.6/motion-control" # ================= 核心工具函数 (Kling API 逻辑) ================= def ensure_video_resolution(video_path, logger_func): """ 使用 ffmpeg 检查并调整视频分辨率 (至少 720p)。 """ if not video_path: return None logger_func(f"🔍 正在检查视频分辨率: {os.path.basename(video_path)}") output_path = os.path.splitext(video_path)[0] + "_720p.mp4" # 简单的 ffmpeg 命令:短边至少 720 cmd = [ "ffmpeg", "-y", "-i", video_path, "-vf", "scale='if(gt(iw,ih),-2,720)':'if(gt(iw,ih),720,-2)'", "-c:v", "libx264", "-preset", "fast", "-crf", "23", "-c:a", "copy", output_path ] try: # 尝试调用系统 ffmpeg subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) logger_func(f"✅ 视频已标准化为 720p: {os.path.basename(output_path)}") return output_path except Exception as e: logger_func(f"⚠️ 分辨率调整失败 (可能缺少ffmpeg),将使用原视频: {e}") return video_path def upload_file_to_kie(file_path, logger_func): """上传文件到 KIE 服务器""" if not file_path: return None file_name = os.path.basename(file_path) # 文件名清洗 file_name = "".join(x for x in file_name if x.isalnum() or x in "._- ") logger_func(f"⬆️ 正在上传: {file_name} ...") try: files = {'file': (file_name, open(file_path, 'rb'))} data = {'uploadPath': 'images/user-uploads', 'fileName': file_name} response = requests.post(UPLOAD_BASE_URL, headers=HEADERS, files=files, data=data, timeout=120) result = response.json() if result.get("success") and result.get("code") == 200: download_url = result["data"]["downloadUrl"] logger_func("✅ 上传成功") return download_url else: logger_func(f"❌ 上传失败: {result}") return None except Exception as e: logger_func(f"❌ 上传异常: {e}") return None def create_motion_task(image_url, video_url, prompt, logger_func): """创建动作迁移任务""" url = f"{TASK_BASE_URL}/createTask" task_headers = HEADERS.copy() task_headers["Content-Type"] = "application/json" payload = { "model": MODEL_NAME, "input": { "prompt": prompt, "input_urls": [image_url], "video_urls": [video_url], "character_orientation": "video", "mode": "720p" } } try: response = requests.post(url, headers=task_headers, json=payload, timeout=30) data = response.json() if data.get("code") == 200: tid = data["data"]["taskId"] logger_func(f"🚀 任务创建成功,TaskID: {tid}") return tid else: logger_func(f"❌ API 拒绝任务: {data}") return None except Exception as e: logger_func(f"❌ 请求异常: {e}") return None def wait_for_result(task_id, logger_func): """轮询任务结果""" url = f"{TASK_BASE_URL}/recordInfo?taskId={task_id}" start_time = time.time() timeout = 900 # 15分钟超时 logger_func("⏳ 开始轮询结果...") while True: if time.time() - start_time > timeout: return None, "Timeout" try: response = requests.get(url, headers=HEADERS, timeout=30) data = response.json() if data.get("code") != 200: time.sleep(5) continue state = data["data"]["state"] if state == "success": result_json = json.loads(data["data"]["resultJson"]) video_url = result_json["resultUrls"][0] return video_url, "Success" elif state == "fail": fail_msg = data["data"].get("failMsg", "Unknown error") return None, f"Fail: {fail_msg}" # logger_func(f"Generated State: {state}") # 可选:减少日志量 time.sleep(5) except Exception as e: logger_func(f"⚠️ 轮询网络波动: {e}") time.sleep(5) def download_video_to_local(url, save_path, logger_func): """将生成的视频下载到本地文件夹,实现持久化""" try: logger_func(f"⬇️ 正在下载结果到本地...") resp = requests.get(url, stream=True, timeout=60) if resp.status_code == 200: with open(save_path, 'wb') as f: for chunk in resp.iter_content(chunk_size=8192): f.write(chunk) return True return False except Exception as e: logger_func(f"❌ 下载保存失败: {e}") return False # ================= V9.1 异步任务管理器 (Task Isolation) ================= class TaskManager: def __init__(self, root_dir=TASKS_ROOT_DIR): self.root_dir = root_dir os.makedirs(self.root_dir, exist_ok=True) def create_task(self, task_name_prefix="motion"): timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') task_id = f"{task_name_prefix}_{timestamp}" task_dir = os.path.join(self.root_dir, task_id) os.makedirs(task_dir, exist_ok=True) # 结果子目录 os.makedirs(os.path.join(task_dir, "results"), exist_ok=True) with open(os.path.join(task_dir, "log.txt"), "w", encoding="utf-8") as f: f.write(f"[{datetime.now().strftime('%H:%M:%S')}] 任务创建成功: {task_id}\n") self.update_status(task_id, "running") return task_id, task_dir def log(self, task_id, message): timestamp = datetime.now().strftime('%H:%M:%S') log_line = f"[{timestamp}] {message}\n" log_path = os.path.join(self.root_dir, task_id, "log.txt") try: with open(log_path, "a", encoding="utf-8") as f: f.write(log_line) except: pass print(f"[{task_id}] {message}") def update_status(self, task_id, status): status_path = os.path.join(self.root_dir, task_id, "status.json") data = { "status": status, "last_update": time.time() } with open(status_path, "w", encoding="utf-8") as f: json.dump(data, f) def get_task_info(self, task_id): task_dir = os.path.join(self.root_dir, task_id) if not os.path.exists(task_dir): return None log_content = "" try: with open(os.path.join(task_dir, "log.txt"), "r", encoding="utf-8") as f: log_content = f.read() except: log_content = "日志读取中..." # 扫描结果文件夹中的所有视频 results_dir = os.path.join(task_dir, "results") result_files = sorted(glob.glob(os.path.join(results_dir, "*.mp4"))) return { "id": task_id, "log": log_content, "results": result_files } def list_tasks(self): if not os.path.exists(self.root_dir): return [] tasks = [d for d in os.listdir(self.root_dir) if os.path.isdir(os.path.join(self.root_dir, d))] # 按修改时间倒序 tasks.sort(key=lambda x: os.path.getmtime(os.path.join(self.root_dir, x)), reverse=True) return tasks def clear_all_tasks(self): try: shutil.rmtree(self.root_dir) os.makedirs(self.root_dir, exist_ok=True) return "✅ 已清空所有历史任务" except Exception as e: return f"❌ 清空失败: {e}" task_manager = TaskManager() # ================= 后台工作线程 (Worker) ================= def background_worker(task_id, images, video, prompt): task_dir = os.path.join(task_manager.root_dir, task_id) results_dir = os.path.join(task_dir, "results") def log_wrapper(msg): task_manager.log(task_id, msg) try: log_wrapper("🚀 后台进程启动。即使刷新页面,任务也会继续运行。") if not images or not video: log_wrapper("❌ 缺少图片或视频,任务终止。") task_manager.update_status(task_id, "failed") return # 1. 本地文件准备 (复制到任务目录,防止gradio临时文件消失) local_video_name = os.path.basename(video) local_video_path = os.path.join(task_dir, local_video_name) shutil.copy(video, local_video_path) local_images = [] for img in images: # 处理 Gradio 可能是文件对象或路径的情况 src_path = img.name if hasattr(img, 'name') else img dst_path = os.path.join(task_dir, os.path.basename(src_path)) shutil.copy(src_path, dst_path) local_images.append(dst_path) # 2. 视频预处理 log_wrapper("--- 步骤 1: 检查并修复视频分辨率 ---") processed_video = ensure_video_resolution(local_video_path, log_wrapper) # 3. 上传视频 log_wrapper("--- 步骤 2: 上传驱动视频 ---") video_public_url = upload_file_to_kie(processed_video, log_wrapper) if not video_public_url: log_wrapper("❌ 视频上传失败,流程终止。") task_manager.update_status(task_id, "failed") return # 4. 循环处理图片 total = len(local_images) success_count = 0 for i, img_path in enumerate(local_images): log_wrapper(f"\n🎥 [处理进度 {i+1}/{total}] 图片: {os.path.basename(img_path)}") # 上传图片 img_public_url = upload_file_to_kie(img_path, log_wrapper) if not img_public_url: continue # 创建任务 api_task_id = create_motion_task(img_public_url, video_public_url, prompt, log_wrapper) if api_task_id: # 等待结果 final_video_url, msg = wait_for_result(api_task_id, log_wrapper) if final_video_url: log_wrapper("✅ 生成成功,正在下载...") # 保存文件名: output_01_origName.mp4 save_name = f"output_{i+1:02d}_{os.path.basename(img_path).split('.')[0]}.mp4" save_full_path = os.path.join(results_dir, save_name) if download_video_to_local(final_video_url, save_full_path, log_wrapper): log_wrapper(f"💾 已保存到: {save_name}") success_count += 1 else: log_wrapper("⚠️ 下载失败,仅提供链接") else: log_wrapper(f"❌ 生成失败: {msg}") else: log_wrapper("❌ 任务创建失败") log_wrapper(f"\n🎉 任务结束。成功: {success_count}/{total}") task_manager.update_status(task_id, "completed") except Exception as e: log_wrapper(f"💥 致命错误: {traceback.format_exc()}") task_manager.update_status(task_id, "error") # ================= 交互逻辑 ================= def submit_new_task(images, video, prompt): if not images: return "❌ 请上传至少一张图片", None if not video: return "❌ 请上传参考视频", None # 任务名以第一张图片命名 first_img_name = os.path.basename(images[0].name if hasattr(images[0], 'name') else images[0]) task_name = f"motion_{first_img_name[:10]}" task_id, task_dir = task_manager.create_task(task_name) # 启动线程 t = threading.Thread(target=background_worker, args=(task_id, images, video, prompt)) t.start() return f"✅ 任务已后台启动!ID: {task_id}\n请切换到【任务监控】标签页查看进度。", task_id def refresh_task_list(): tasks = task_manager.list_tasks() return gr.Dropdown(choices=tasks, value=tasks[0] if tasks else None) def get_task_details(task_id): if not task_id: return "请选择任务", [] info = task_manager.get_task_info(task_id) if not info: return "任务不存在", [] # 返回: 日志内容, 结果文件列表(用于Gallery) return info['log'], info['results'] def handle_clear_storage(): msg = task_manager.clear_all_tasks() return msg, gr.Dropdown(choices=[], value=None) # ================= UI 构建 ================= with gr.Blocks(title="Kling Motion V9.1 Async") as demo: gr.Markdown("## 💃 Kling 动作迁移批量工厂 (V9.1 异步防断连版)") gr.Markdown("**特性**:支持多任务并发、刷新页面不中断、结果自动保存到硬盘、视频分辨率自动修复。") with gr.Tabs(): # --- Tab 1: 提交任务 --- with gr.Tab("🚀 提交新任务"): with gr.Row(): with gr.Column(): img_input = gr.File(label="1. 上传图片 (支持多选)", file_count="multiple", file_types=["image"]) video_input = gr.Video(label="2. 上传参考视频", format="mp4") prompt_input = gr.Textbox(label="3. 提示词", value="The character is performing the action from the video.") submit_btn = gr.Button("🔥 立即启动后台任务", variant="primary") with gr.Column(): submit_result = gr.Textbox(label="提交结果", lines=2) new_task_id_storage = gr.State() # --- Tab 2: 监控任务 --- with gr.Tab("📺 任务监控"): with gr.Row(): with gr.Column(scale=1): refresh_list_btn = gr.Button("🔄 刷新任务列表") task_dropdown = gr.Dropdown(label="选择历史任务", choices=[], interactive=True) gr.Markdown("### 📥 结果展示") # Gallery 直接展示本地文件路径,Gradio 会自动处理 serving result_gallery = gr.Gallery(label="生成结果 (本地保存)", columns=2, height="auto", allow_preview=True) gr.Markdown("---") clear_btn = gr.Button("🗑️ 清空所有任务", variant="stop") clear_msg = gr.Label("") with gr.Column(scale=2): log_monitor = gr.Textbox(label="运行日志 (自动刷新)", lines=25, max_lines=25) auto_timer = gr.Timer(2) # 2秒刷新一次 # --- 事件绑定 --- # 提交 submit_btn.click( submit_new_task, [img_input, video_input, prompt_input], [submit_result, new_task_id_storage] ) # 提交成功后,自动更新下拉框并选中新任务 def auto_select_new_task(new_id): all_tasks = task_manager.list_tasks() return gr.Dropdown(choices=all_tasks, value=new_id) submit_btn.click(auto_select_new_task, new_task_id_storage, task_dropdown) # 刷新列表 refresh_list_btn.click(refresh_task_list, outputs=task_dropdown) # 定时刷新日志和画廊 # 注意:outputs 对应 [日志框, 画廊组件] auto_timer.tick(get_task_details, inputs=[task_dropdown], outputs=[log_monitor, result_gallery]) task_dropdown.change(get_task_details, inputs=[task_dropdown], outputs=[log_monitor, result_gallery]) # 清空 clear_btn.click(handle_clear_storage, outputs=[clear_msg, task_dropdown]) # 初始化 demo.load(refresh_task_list, outputs=task_dropdown) if __name__ == "__main__": demo.queue().launch(server_name="0.0.0.0", inbrowser=True)