Spaces:
Sleeping
Sleeping
| import os | |
| import sys | |
| import subprocess | |
| import re | |
| import time | |
| import datetime | |
| import shutil | |
| import json | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| # ========================================== | |
| # 0. 自动安装依赖 | |
| # ========================================== | |
| def ensure_dependencies(): | |
| """自动检查并安装必要的Python库""" | |
| try: | |
| import gradio | |
| import requests | |
| except ImportError: | |
| print("正在自动安装所需依赖: gradio, requests...") | |
| subprocess.check_call([sys.executable, "-m", "pip", "install", "gradio", "requests"]) | |
| ensure_dependencies() | |
| import gradio as gr | |
| import requests | |
| # ========================================== | |
| # 1. 核心配置区域 | |
| # ========================================== | |
| # 🔥 您的 API KEYS | |
| LLM_API_KEY = "sk-Vhxjwm4XXu5fKrAtRNbRZGdPbocDZjG7B9UsSUjAdOQLyMUA" | |
| VIDEO_API_KEY = "sk-oRGvoxEa4K7MpzQUnDOzmNcImtti9OEiwCKicmfHf46vQEyY" | |
| # API 基础地址 | |
| MERCHANT_BASE_URL = "https://yunwu.ai" | |
| # 模型配置 | |
| VEO_MODEL = "veo3.1-fast-components" | |
| TEXT_MODEL = "gemini-3-pro-preview-thinking" | |
| VIDEO_SIZE = "16x9" | |
| VIDEO_DURATION_STR = "8" | |
| # 文件存储路径 | |
| LOG_FILE = "runtime_log.txt" | |
| PLAN_FILE = "culture_plan.txt" | |
| SCRIPT_RAW_FILE = "script_raw.txt" | |
| SCRIPT_FINAL_FILE = "script_final.txt" | |
| # 全局暂停标志 | |
| IS_PAUSED = False | |
| # ========================================== | |
| # 🔥 核心约束:三位一体强制注入 (V11.5 增强版) 🔥 | |
| # ========================================== | |
| # 注意:这部分逻辑将在 step4_render 的 construct_final_prompt 函数中被严格执行 | |
| # ========================================== | |
| # 2. 核心提示词 (System Prompts) | |
| # ========================================== | |
| # 1. 规划师 | |
| PROMPT_PLANNER = """ | |
| 你现在是 **[全球地域文化巡游总策划]**。 | |
| 用户将输入:【地域名称】 和 【需要的方向数量】。 | |
| **核心任务**: | |
| 你需要策划出指定数量的 **“宏大文化类目” (Broad Cultural Categories)**。 | |
| **选题严格标准**: | |
| 1. **必须是“集合名词”**:你提出的方向必须是一个“文件夹”,里面至少能包含 **10种完全不同的具体实体**。 | |
| * ✅ **正确**:[中华神话传说神兽] (包含龙、凤、麒麟等)。 | |
| * ✅ **正确**:[地标性古建筑群] (包含故宫、长城等)。 | |
| 2. **强地域代表性**:必须是该地域一眼即识的文化符号。 | |
| 3. **视觉冲击力**:主题必须适合制作成“巨型、震撼”的花车装置。 | |
| **输出格式**: | |
| 请直接输出一个列表,不要废话。 | |
| 格式: | |
| 1. [类目名称]: [简述该类目包含的范围] | |
| ... | |
| """ | |
| # 2. 视觉导演 | |
| PROMPT_DIRECTOR = """ | |
| 你现在是 **[新春大巡游视觉总监]**。 | |
| 你的任务是设计 **大街上的花车游行画面**。 | |
| **输入信息**: | |
| 1. 地域名称 (Target Region) | |
| 2. 2个文化类目 (Cultural Categories) | |
| **输出任务**: | |
| 对于 **每一个** 类目,请列举出该类目下 **10个最具代表性、互不相同的具体实体 (Distinct Entities)**,并将它们分别设计成花车。 | |
| **核心约束 (CRITICAL)**: | |
| 1. **严禁重复**: 同一个类目下的 10 个视频,必须展示 **10 个完全不同的东西**。 | |
| 2. **实体描述**: 重点描述花车主体的造型。 | |
| 3. **运镜**: 包含 "Slow smooth camera movement"。 | |
| **输出格式 (JSON List)**: | |
| 请直接输出一个包含 20 个对象的 JSON 列表 (每个类目 10 个),格式如下: | |
| [ | |
| {"id": "01", "theme": "类目A", "entity_name": "实体1", "desc": "中文简述", "prompt": "English prompt focusing on the float object..."}, | |
| ... | |
| ] | |
| """ | |
| # 3. 审查官 | |
| PROMPT_AUDITOR = """ | |
| 你现在是 **[提示词审查官]**。 | |
| 检查以下 Batch 是否包含重复物体。 | |
| 如有重复,请修改。按原 JSON 格式输出。 | |
| """ | |
| # ========================================== | |
| # 3. 辅助工具函数 | |
| # ========================================== | |
| def update_dashboard(step_name, detail, is_paused=False): | |
| status_icon = "⏸️ 暂停中" if is_paused else "🟢 运行中" | |
| if step_name == "IDLE": status_icon = "⚪ 待机" | |
| return f""" | |
| ### 📊 系统状态监控 | |
| * **当前状态**: {status_icon} | |
| * **执行步骤**: **{step_name}** | |
| * **当前详情**: {detail} | |
| """ | |
| def log_msg(logs, msg, level="INFO"): | |
| timestamp = datetime.datetime.now().strftime("%H:%M:%S") | |
| entry = f"[{timestamp}] [{level}] {msg}" | |
| logs.append(entry) | |
| # 增加容错:防止文件写入冲突导致崩溃 | |
| try: | |
| with open(LOG_FILE, "a", encoding="utf-8") as f: | |
| f.write(entry + "\n") | |
| except Exception as e: | |
| print(f"日志写入失败: {e}") | |
| return "\n".join(logs) | |
| def save_text(filename, text): | |
| try: | |
| with open(filename, "w", encoding="utf-8") as f: f.write(text) | |
| except Exception as e: | |
| print(f"文件保存失败 {filename}: {e}") | |
| def read_text(filename): | |
| if os.path.exists(filename): | |
| try: | |
| with open(filename, "r", encoding="utf-8") as f: return f.read() | |
| except: return "" | |
| return "" | |
| def toggle_pause(): | |
| global IS_PAUSED | |
| IS_PAUSED = not IS_PAUSED | |
| return update_dashboard("暂停切换", "用户操作", IS_PAUSED) | |
| # ========================================== | |
| # 4. 核心执行逻辑 | |
| # ========================================== | |
| # --- 步骤 1: 规划 --- | |
| def step1_planner(region_name, total_videos): | |
| logs = [] | |
| num_directions = max(1, int(total_videos // 10)) | |
| log_msg(logs, f"🧮 任务计算: {total_videos} 视频 = {num_directions} 个类目", "CALC") | |
| log_msg(logs, f"📝 正在规划【{region_name}】的 {num_directions} 个类目...", "PLAN") | |
| try: | |
| url = f"{MERCHANT_BASE_URL}/v1/chat/completions" | |
| headers = {"Authorization": f"Bearer {LLM_API_KEY}", "Content-Type": "application/json"} | |
| data = { | |
| "model": TEXT_MODEL, | |
| "messages": [ | |
| {"role": "system", "content": PROMPT_PLANNER}, | |
| {"role": "user", "content": f"地域: {region_name}, 需要宽泛类目数量: {num_directions}"} | |
| ], | |
| "temperature": 0.8 | |
| } | |
| res = requests.post(url, headers=headers, json=data, timeout=60) | |
| content = res.json()['choices'][0]['message']['content'] | |
| save_text(PLAN_FILE, content) | |
| log_msg(logs, "✅ 规划完成", "SUCCESS") | |
| return "\n".join(logs), content | |
| except Exception as e: | |
| log_msg(logs, f"❌ 规划失败: {e}", "ERROR") | |
| return "\n".join(logs), f"Error: {e}" | |
| # --- 步骤 2: 生成脚本 (并发) --- | |
| def step2_generator(plan_text, region_name): | |
| logs_raw = read_text(LOG_FILE); logs = logs_raw.split("\n") if logs_raw else [] | |
| directions = [line for line in plan_text.split('\n') if line.strip() and (line[0].isdigit() or '.' in line)] | |
| total_dirs = len(directions) | |
| if total_dirs == 0: | |
| return "\n".join(logs), "无有效规划", update_dashboard("导演", "错误") | |
| generated_json_objects = [] | |
| # 每2个类目一组 | |
| batch_size = 2 | |
| batches = [directions[i : i + batch_size] for i in range(0, total_dirs, batch_size)] | |
| total_batches = len(batches) | |
| log_msg(logs, f"🎬 启动并发生成: {total_dirs} 类目 -> {total_batches} 个并发任务...", "START") | |
| yield "\n".join(logs), "", update_dashboard("导演", f"并发启动: {total_batches} 线程") | |
| def process_batch(batch_dirs, batch_index): | |
| batch_str = "\n".join(batch_dirs) | |
| user_content = f""" | |
| **目标地域**: {region_name} | |
| **类目**: {batch_str} | |
| Task: Pick 10 DISTINCT entities for EACH category. NO DUPLICATES. Visuals match {region_name}. | |
| """ | |
| try: | |
| url = f"{MERCHANT_BASE_URL}/v1/chat/completions" | |
| headers = {"Authorization": f"Bearer {LLM_API_KEY}", "Content-Type": "application/json"} | |
| data = { | |
| "model": TEXT_MODEL, | |
| "messages": [{"role": "system", "content": PROMPT_DIRECTOR}, {"role": "user", "content": user_content}], | |
| "response_format": {"type": "json_object"}, | |
| "max_tokens": 8192 | |
| } | |
| res = requests.post(url, headers=headers, json=data, timeout=180) | |
| if res.status_code != 200: return {"ok": False, "error": f"HTTP {res.status_code}", "items": []} | |
| content = res.json()['choices'][0]['message']['content'] | |
| data_json = json.loads(content) | |
| # 兼容性处理:处理可能的列表或字典格式 | |
| items = data_json if isinstance(data_json, list) else list(data_json.values())[0] | |
| return {"ok": True, "items": items} | |
| except Exception as e: | |
| return {"ok": False, "error": str(e), "items": []} | |
| # 线程池 | |
| completed_batches = 0 | |
| with ThreadPoolExecutor(max_workers=50) as executor: | |
| future_to_batch = {executor.submit(process_batch, batch, idx): idx for idx, batch in enumerate(batches)} | |
| for future in as_completed(future_to_batch): | |
| # 全局暂停控制 | |
| if IS_PAUSED: | |
| while IS_PAUSED: time.sleep(1); yield "\n".join(logs), json.dumps(generated_json_objects, indent=2, ensure_ascii=False), update_dashboard("导演", "⏸️ 暂停中", True) | |
| idx = future_to_batch[future] | |
| result = future.result() | |
| if result["ok"]: | |
| generated_json_objects.extend(result["items"]) | |
| log_msg(logs, f"✅ 批次 {idx+1}/{total_batches} 完成 (+{len(result['items'])})", "BATCH_DONE") | |
| else: | |
| log_msg(logs, f"❌ 批次 {idx+1}/{total_batches} 失败: {result['error']}", "BATCH_FAIL") | |
| completed_batches += 1 | |
| current_text = json.dumps(generated_json_objects, indent=2, ensure_ascii=False) | |
| yield "\n".join(logs), current_text, update_dashboard("导演", f"生成进度 {completed_batches}/{total_batches}") | |
| full_text = json.dumps(generated_json_objects, indent=2, ensure_ascii=False) | |
| save_text(SCRIPT_RAW_FILE, full_text) | |
| log_msg(logs, f"🎉 脚本生成完成: {len(generated_json_objects)} 条", "SUCCESS") | |
| yield "\n".join(logs), full_text, update_dashboard("导演", "完成") | |
| # --- 步骤 3: 审查 (并发) --- | |
| def step3_auditor(raw_script_json): | |
| logs_raw = read_text(LOG_FILE); logs = logs_raw.split("\n") if logs_raw else [] | |
| if not raw_script_json: return "\n".join(logs), "无脚本", update_dashboard("审查", "失败") | |
| log_msg(logs, "⚖️ 开始并发审查...", "AUDIT") | |
| yield "\n".join(logs), "", update_dashboard("审查", "启动...") | |
| try: | |
| prompts_list = json.loads(raw_script_json) | |
| except: | |
| return "\n".join(logs), raw_script_json, update_dashboard("审查", "JSON错误") | |
| # 1. 切分 | |
| chunk_size = 20 | |
| batches = [prompts_list[i : i + chunk_size] for i in range(0, len(prompts_list), chunk_size)] | |
| total_batches = len(batches) | |
| # 2. 线程函数 | |
| def audit_batch(batch): | |
| batch_str = json.dumps(batch, ensure_ascii=False) | |
| try: | |
| url = f"{MERCHANT_BASE_URL}/v1/chat/completions" | |
| headers = {"Authorization": f"Bearer {LLM_API_KEY}", "Content-Type": "application/json"} | |
| data = { | |
| "model": TEXT_MODEL, | |
| "messages": [{"role": "system", "content": PROMPT_AUDITOR}, {"role": "user", "content": f"Review:\n{batch_str}"}], | |
| "response_format": {"type": "json_object"}, | |
| "max_tokens": 8192 | |
| } | |
| res = requests.post(url, headers=headers, json=data, timeout=180) | |
| if res.status_code != 200: return batch # 如果API失败,原样返回,不影响流程 | |
| content = res.json()['choices'][0]['message']['content'] | |
| res_json = json.loads(content) | |
| # 兼容处理 | |
| return res_json if isinstance(res_json, list) else list(res_json.values())[0] | |
| except: return batch # 异常也原样返回 | |
| # 3. 执行 | |
| results_buffer = [] | |
| completed = 0 | |
| with ThreadPoolExecutor(max_workers=20) as executor: | |
| futures = {executor.submit(audit_batch, batch): idx for idx, batch in enumerate(batches)} | |
| for future in as_completed(futures): | |
| if IS_PAUSED: | |
| while IS_PAUSED: time.sleep(1) | |
| res = future.result() | |
| results_buffer.extend(res) | |
| completed += 1 | |
| yield "\n".join(logs), f"Auditing batch {completed}/{total_batches}...", update_dashboard("审查", f"进度 {completed}/{total_batches}") | |
| audited_list = results_buffer | |
| final_text = json.dumps(audited_list, indent=2, ensure_ascii=False) | |
| save_text(SCRIPT_FINAL_FILE, final_text) | |
| log_msg(logs, "✅ 并发审查完成", "SUCCESS") | |
| yield "\n".join(logs), final_text, update_dashboard("审查", "完成") | |
| # --- 步骤 4: 渲染 (V11.5 终极强制注入版) --- | |
| def step4_render(script_json_text, region_name): | |
| logs_raw = read_text(LOG_FILE); logs = logs_raw.split("\n") if logs_raw else [] | |
| try: | |
| tasks_data = json.loads(script_json_text) | |
| except: | |
| return "\n".join(logs), None, update_dashboard("渲染", "JSON错误") | |
| tasks = [] | |
| for idx, item in enumerate(tasks_data): | |
| p_text = item.get("prompt") or item.get("Prompt") or "" | |
| shot_id = f"{idx+1:03d}" | |
| if p_text: tasks.append({"id": shot_id, "prompt": p_text}) | |
| total_tasks = len(tasks) | |
| log_msg(logs, f"🎥 准备渲染 {total_tasks} 个视频 (地域: {region_name})...", "RENDER_INIT") | |
| yield "\n".join(logs), None, update_dashboard("渲染", f"队列 {total_tasks}") | |
| safe_name = re.sub(r'[\\/*?:"<>|]', "", region_name) | |
| save_dir = os.path.join("Parade_Output", f"{safe_name}_{int(time.time())}") | |
| os.makedirs(save_dir, exist_ok=True) | |
| # 🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥 | |
| # 辅助函数:构造终极 Prompt (显式展开) | |
| # 包含:5层楼高 + 现代城市 + 实拍新闻感 | |
| # 🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥 | |
| def construct_final_prompt(raw_prompt, region): | |
| # 1. 基础清理 | |
| clean_prompt = re.sub(r'\s+', ' ', raw_prompt).strip() | |
| clean_prompt = clean_prompt.replace("A colossal, hyper-realistic parade float of", "") | |
| # 2. 第一层锁死:前缀 (5层楼 + 仰拍 + 原始素材) | |
| prefix = "Raw news footage. Low angle shot looking up at a towering, 5-story high colossal parade float moving down the street. " | |
| # 3. 第二层锁死:地域卫士 (现代市中心 + 摩天大楼 + 当地人种) | |
| # 强制使用传入的 region 变量 | |
| region_guard = f" The specific location is a modern downtown city street in {region}. The background features modern {region} skyscrapers and city landmarks blended with festive decorations. The cheering crowd are strictly authentic {region} local people with traditional {region} facial features, skin tone, and clothing. " | |
| # 4. 第三层锁死:气氛卫士 (跳舞) | |
| action_guard = f" Lively {region} dancers and performers in traditional costumes are dancing and celebrating happily on and in front of the float. " | |
| # 5. 第四层锁死:后缀 (实拍纪录片感 + 反CG) | |
| suffix = ", situated on a mechanical parade platform. Cinematic 8k lighting, highly detailed material texture, photorealistic, natural daylight. The footage looks exactly like a real life documentary recording. --no cgi, no 3d render, no animation, no cartoon, no sci-fi." | |
| return f"{prefix}{region_guard}{action_guard}The float features {clean_prompt}{suffix}" | |
| def call_video_api(task): | |
| url_submit = f"{MERCHANT_BASE_URL}/v1/videos" | |
| headers = {"Authorization": f"Bearer {VIDEO_API_KEY}"} | |
| # 调用构造函数生成最终提示词 | |
| final_forced_prompt = construct_final_prompt(task["prompt"], region_name) | |
| files_payload = { | |
| 'model': (None, VEO_MODEL), | |
| 'prompt': (None, final_forced_prompt), | |
| 'seconds': (None, str(VIDEO_DURATION_STR)), | |
| 'size': (None, VIDEO_SIZE) | |
| } | |
| filename = f"{region_name}_Float_{task['id']}.mp4" | |
| filepath = os.path.join(save_dir, filename) | |
| # 重试机制 (3次) | |
| for attempt in range(3): | |
| try: | |
| req = requests.post(url_submit, headers=headers, files=files_payload, timeout=60) | |
| if req.status_code != 200: time.sleep(2); continue | |
| task_id = req.json().get("id") | |
| if not task_id: continue | |
| start_time = time.time() | |
| while True: | |
| # 10分钟超时 | |
| if time.time() - start_time > 600: break | |
| status_resp = requests.get(f"{MERCHANT_BASE_URL}/v1/videos/{task_id}", headers=headers, timeout=30) | |
| if status_resp.status_code == 200: | |
| status = status_resp.json().get("status") | |
| if status == "completed": | |
| video_url = status_resp.json().get("video_url") | |
| if video_url: | |
| v_res = requests.get(video_url, stream=True, timeout=120) | |
| with open(filepath, "wb") as f: | |
| for chunk in v_res.iter_content(chunk_size=8192): f.write(chunk) | |
| return {"ok": True, "path": filepath} | |
| elif status == "failed": break | |
| time.sleep(3) | |
| else: time.sleep(3) | |
| except: time.sleep(3) | |
| return {"ok": False, "id": task["id"]} | |
| completed = 0 | |
| files = [] | |
| # 200 线程高并发 | |
| with ThreadPoolExecutor(max_workers=200) as executor: | |
| futures = {executor.submit(call_video_api, t): t for t in tasks} | |
| for f in as_completed(futures): | |
| if IS_PAUSED: | |
| while IS_PAUSED: time.sleep(1) | |
| res = f.result() | |
| completed += 1 | |
| if res["ok"]: | |
| files.append(res["path"]) | |
| log_msg(logs, f"✅ 视频 {os.path.basename(res['path'])} 完成", "DONE") | |
| else: | |
| log_msg(logs, f"❌ 任务 {res.get('id')} 失败", "FAIL") | |
| yield "\n".join(logs), None, update_dashboard("渲染", f"进度 {completed}/{total_tasks}") | |
| if files: | |
| shutil.make_archive(save_dir, 'zip', save_dir) | |
| final_zip = f"{save_dir}.zip" | |
| log_msg(logs, f"🎉 打包完成: {final_zip}", "SUCCESS") | |
| yield "\n".join(logs), final_zip, update_dashboard("完成", "渲染结束") | |
| else: | |
| yield "\n".join(logs), None, update_dashboard("完成", "无文件") | |
| # ========================================== | |
| # 5. UI 界面 | |
| # ========================================== | |
| with gr.Blocks(title="全球地域文化巡游生成器 (Auto-Parade V11.5 Ultimate)") as app: | |
| gr.Markdown("## 🎪 全球地域文化巡游 - 自动化视频生成器 V11.5 (Ultimate Edition)") | |
| gr.Markdown(f"🛡️ **Features**: `Full Concurrency` | `5-Story Scale` | `Modern City` | `Real Footage Style`") | |
| status_box = gr.Markdown(update_dashboard("就绪", "等待开始...")) | |
| with gr.Row(): | |
| with gr.Column(scale=2): | |
| region_input = gr.Textbox(label="🏙️ 地域/城市名称", placeholder="例如:北京、孟买、开罗、巴黎", value="2026北京新春") | |
| with gr.Column(scale=1): | |
| count_slider = gr.Slider(20, 500, value=40, step=10, label="生成视频总数") | |
| btn_pause = gr.Button("⏸️ 暂停 / ▶️ 继续") | |
| btn_pause.click(toggle_pause, outputs=status_box) | |
| with gr.Tabs(): | |
| with gr.Tab("🚀 全自动流水线"): | |
| btn_run_all = gr.Button("开始全自动生成 (规划 -> 脚本 -> 渲染)", variant="primary") | |
| run_log = gr.Textbox(label="系统运行日志", lines=15, autoscroll=True) | |
| zip_output = gr.File(label="最终视频包下载") | |
| with gr.Tab("🛠️ 分步调试"): | |
| with gr.Row(): | |
| btn_plan = gr.Button("1. 生成类目规划") | |
| btn_gen = gr.Button("2. 生成实体脚本") | |
| btn_audit = gr.Button("3. 审查脚本") | |
| btn_render = gr.Button("4. 仅渲染") | |
| with gr.Row(): | |
| plan_output = gr.Textbox(label="Cultural Categories Plan", lines=5) | |
| script_raw_output = gr.Textbox(label="Raw Script (JSON)", lines=5) | |
| script_final_output = gr.Textbox(label="Final Script (JSON)", lines=5) | |
| btn_run_all.click(step1_planner, [region_input, count_slider], [run_log, plan_output]).then( | |
| step2_generator, [plan_output, region_input], [run_log, script_raw_output, status_box]).then( | |
| step3_auditor, [script_raw_output], [run_log, script_final_output, status_box]).then( | |
| step4_render, [script_final_output, region_input], [run_log, zip_output, status_box]) | |
| btn_plan.click(step1_planner, [region_input, count_slider], [run_log, plan_output]) | |
| btn_gen.click(step2_generator, [plan_output, region_input], [run_log, script_raw_output, status_box]) | |
| btn_audit.click(step3_auditor, [script_raw_output], [run_log, script_final_output, status_box]) | |
| btn_render.click(step4_render, [script_final_output, region_input], [run_log, zip_output, status_box]) | |
| if __name__ == "__main__": | |
| app.queue().launch() |