# app.py — Auto-Parade V11.5: automated regional-culture parade-float video generator.
# (Hugging Face Space page residue — uploader name, "Update app.py", commit hash — removed.)
import os
import sys
import subprocess
import re
import time
import datetime
import shutil
import json
from concurrent.futures import ThreadPoolExecutor, as_completed
# ==========================================
# 0. Auto-install dependencies
# ==========================================
def ensure_dependencies():
    """Verify that gradio and requests are importable; pip-install both if not.

    NOTE(review): runs at import time (called immediately below) and shells
    out to pip via the current interpreter, so first start may be slow.
    """
    try:
        import gradio
        import requests
    except ImportError:
        print("正在自动安装所需依赖: gradio, requests...")
        subprocess.check_call([sys.executable, "-m", "pip", "install", "gradio", "requests"])
ensure_dependencies()
import gradio as gr
import requests
# ==========================================
# 1. Core configuration
# ==========================================
# 🔥 API keys.
# SECURITY: the original hard-coded the secrets; they can now be supplied via
# environment variables, with the original literals kept as a backward-
# compatible fallback. Rotate these keys — they have been committed publicly.
LLM_API_KEY = os.environ.get("LLM_API_KEY", "sk-Vhxjwm4XXu5fKrAtRNbRZGdPbocDZjG7B9UsSUjAdOQLyMUA")
VIDEO_API_KEY = os.environ.get("VIDEO_API_KEY", "sk-oRGvoxEa4K7MpzQUnDOzmNcImtti9OEiwCKicmfHf46vQEyY")
# Base URL shared by the chat-completions and video endpoints.
MERCHANT_BASE_URL = "https://yunwu.ai"
# Model configuration.
VEO_MODEL = "veo3.1-fast-components"          # video-generation model
TEXT_MODEL = "gemini-3-pro-preview-thinking"  # LLM used for planning/scripting/auditing
VIDEO_SIZE = "16x9"          # aspect ratio passed to the video API
VIDEO_DURATION_STR = "8"     # clip length in seconds (API expects a string)
# Files used to persist intermediate artifacts between pipeline steps.
LOG_FILE = "runtime_log.txt"
PLAN_FILE = "culture_plan.txt"
SCRIPT_RAW_FILE = "script_raw.txt"
SCRIPT_FINAL_FILE = "script_final.txt"
# Global pause flag: toggled from the UI, polled by the worker loops.
IS_PAUSED = False
# ==========================================
# 🔥 Core constraint: "trinity" forced injection (V11.5 enhanced) 🔥
# ==========================================
# NOTE: that injection logic is enforced inside construct_final_prompt()
# within step4_render, not here.
# ==========================================
# 2. Core system prompts
# ==========================================
# 1. Planner — asks the LLM for N broad cultural categories for a region.
#    Prompt text is runtime data sent to the API; kept verbatim (Chinese).
PROMPT_PLANNER = """
你现在是 **[全球地域文化巡游总策划]**。
用户将输入:【地域名称】 和 【需要的方向数量】。
**核心任务**:
你需要策划出指定数量的 **“宏大文化类目” (Broad Cultural Categories)**。
**选题严格标准**:
1. **必须是“集合名词”**:你提出的方向必须是一个“文件夹”,里面至少能包含 **10种完全不同的具体实体**。
* ✅ **正确**:[中华神话传说神兽] (包含龙、凤、麒麟等)。
* ✅ **正确**:[地标性古建筑群] (包含故宫、长城等)。
2. **强地域代表性**:必须是该地域一眼即识的文化符号。
3. **视觉冲击力**:主题必须适合制作成“巨型、震撼”的花车装置。
**输出格式**:
请直接输出一个列表,不要废话。
格式:
1. [类目名称]: [简述该类目包含的范围]
...
"""
# 2. Visual director — expands 2 categories into 10 distinct float designs
#    each, returned as a JSON list consumed by step2_generator.
PROMPT_DIRECTOR = """
你现在是 **[新春大巡游视觉总监]**。
你的任务是设计 **大街上的花车游行画面**。
**输入信息**:
1. 地域名称 (Target Region)
2. 2个文化类目 (Cultural Categories)
**输出任务**:
对于 **每一个** 类目,请列举出该类目下 **10个最具代表性、互不相同的具体实体 (Distinct Entities)**,并将它们分别设计成花车。
**核心约束 (CRITICAL)**:
1. **严禁重复**: 同一个类目下的 10 个视频,必须展示 **10 个完全不同的东西**。
2. **实体描述**: 重点描述花车主体的造型。
3. **运镜**: 包含 "Slow smooth camera movement"。
**输出格式 (JSON List)**:
请直接输出一个包含 20 个对象的 JSON 列表 (每个类目 10 个),格式如下:
[
{"id": "01", "theme": "类目A", "entity_name": "实体1", "desc": "中文简述", "prompt": "English prompt focusing on the float object..."},
...
]
"""
# 3. Auditor — de-duplicates entities within a batch, re-emitting the same
#    JSON shape; used by step3_auditor.
PROMPT_AUDITOR = """
你现在是 **[提示词审查官]**。
检查以下 Batch 是否包含重复物体。
如有重复,请修改。按原 JSON 格式输出。
"""
# ==========================================
# 3. Helper utilities
# ==========================================
def update_dashboard(step_name, detail, is_paused=False):
    """Render the status-dashboard markdown shown at the top of the UI."""
    # The IDLE state wins over the pause flag.
    if step_name == "IDLE":
        status_icon = "⚪ 待机"
    else:
        status_icon = "⏸️ 暂停中" if is_paused else "🟢 运行中"
    return f"""
### 📊 系统状态监控
* **当前状态**: {status_icon}
* **执行步骤**: **{step_name}**
* **当前详情**: {detail}
"""
def log_msg(logs, msg, level="INFO"):
    """Append a timestamped entry to *logs* (in place) and to LOG_FILE.

    Returns the whole log joined with newlines for display in the UI.
    """
    stamp = datetime.datetime.now().strftime("%H:%M:%S")
    line = f"[{stamp}] [{level}] {msg}"
    logs.append(line)
    # Best effort: a concurrent file-write failure must not crash the pipeline.
    try:
        with open(LOG_FILE, "a", encoding="utf-8") as fh:
            fh.write(line + "\n")
    except Exception as err:
        print(f"日志写入失败: {err}")
    return "\n".join(logs)
def save_text(filename, text):
    """Overwrite *filename* with *text* (UTF-8). Failures are printed, not raised."""
    try:
        with open(filename, "w", encoding="utf-8") as f:
            f.write(text)
    except Exception as e:
        # Bug fix: the original message printed the literal "(unknown)" instead
        # of the path that failed, making the error useless for debugging.
        print(f"文件保存失败 ({filename}): {e}")
def read_text(filename):
    """Return the UTF-8 contents of *filename*, or "" if missing or unreadable."""
    if os.path.exists(filename):
        try:
            with open(filename, "r", encoding="utf-8") as f:
                return f.read()
        # Narrowed from a bare `except:` so genuine programming errors are no
        # longer silently swallowed; I/O and decoding problems still yield "".
        except (OSError, UnicodeDecodeError):
            return ""
    return ""
def toggle_pause():
    """Flip the global pause flag and return a refreshed dashboard string.

    NOTE(review): a plain boolean toggle with no locking — the worker loops
    only poll this flag, so the race window is harmless here.
    """
    global IS_PAUSED
    IS_PAUSED = not IS_PAUSED
    return update_dashboard("暂停切换", "用户操作", IS_PAUSED)
# ==========================================
# 4. Core pipeline steps
# ==========================================
# --- Step 1: planning ---
def step1_planner(region_name, total_videos):
    """Ask the LLM for broad cultural categories for *region_name*.

    One category is requested per 10 videos (minimum 1). Returns a tuple of
    (log_text, plan_text); on any failure plan_text is "Error: <reason>".
    """
    logs = []
    num_directions = max(1, int(total_videos // 10))
    log_msg(logs, f"🧮 任务计算: {total_videos} 视频 = {num_directions} 个类目", "CALC")
    log_msg(logs, f"📝 正在规划【{region_name}】的 {num_directions} 个类目...", "PLAN")
    try:
        url = f"{MERCHANT_BASE_URL}/v1/chat/completions"
        headers = {"Authorization": f"Bearer {LLM_API_KEY}", "Content-Type": "application/json"}
        data = {
            "model": TEXT_MODEL,
            "messages": [
                {"role": "system", "content": PROMPT_PLANNER},
                {"role": "user", "content": f"地域: {region_name}, 需要宽泛类目数量: {num_directions}"}
            ],
            "temperature": 0.8
        }
        res = requests.post(url, headers=headers, json=data, timeout=60)
        # Fix: surface HTTP errors explicitly instead of failing later with an
        # opaque KeyError when the error body has no "choices" field. The
        # raised HTTPError is handled by the same except branch as before.
        res.raise_for_status()
        content = res.json()['choices'][0]['message']['content']
        save_text(PLAN_FILE, content)
        log_msg(logs, "✅ 规划完成", "SUCCESS")
        return "\n".join(logs), content
    except Exception as e:
        log_msg(logs, f"❌ 规划失败: {e}", "ERROR")
        return "\n".join(logs), f"Error: {e}"
# --- Step 2: script generation (concurrent) ---
def step2_generator(plan_text, region_name):
    """Generator: expand each planned category into 10 concrete float prompts.

    Streams (log_text, script_json_text, dashboard_md) tuples to the Gradio
    UI. Categories are processed two per LLM request, fanned out on a thread
    pool. The final script is saved to SCRIPT_RAW_FILE.
    """
    logs_raw = read_text(LOG_FILE)
    logs = logs_raw.split("\n") if logs_raw else []
    directions = [line for line in plan_text.split('\n')
                  if line.strip() and (line[0].isdigit() or '.' in line)]
    total_dirs = len(directions)
    if total_dirs == 0:
        # Bug fix: this is a generator function, so the original
        # `return value_tuple` silently discarded the error outputs — Gradio
        # never displayed them. Yield the tuple, then stop.
        yield "\n".join(logs), "无有效规划", update_dashboard("导演", "错误")
        return
    generated_json_objects = []
    # Two categories per request keeps each response within the token budget.
    batch_size = 2
    batches = [directions[i : i + batch_size] for i in range(0, total_dirs, batch_size)]
    total_batches = len(batches)
    log_msg(logs, f"🎬 启动并发生成: {total_dirs} 类目 -> {total_batches} 个并发任务...", "START")
    yield "\n".join(logs), "", update_dashboard("导演", f"并发启动: {total_batches} 线程")

    def process_batch(batch_dirs, batch_index):
        # Worker: ask the director model for 10 distinct entities per category.
        batch_str = "\n".join(batch_dirs)
        user_content = f"""
**目标地域**: {region_name}
**类目**: {batch_str}
Task: Pick 10 DISTINCT entities for EACH category. NO DUPLICATES. Visuals match {region_name}.
"""
        try:
            url = f"{MERCHANT_BASE_URL}/v1/chat/completions"
            headers = {"Authorization": f"Bearer {LLM_API_KEY}", "Content-Type": "application/json"}
            data = {
                "model": TEXT_MODEL,
                "messages": [{"role": "system", "content": PROMPT_DIRECTOR}, {"role": "user", "content": user_content}],
                "response_format": {"type": "json_object"},
                "max_tokens": 8192
            }
            res = requests.post(url, headers=headers, json=data, timeout=180)
            if res.status_code != 200:
                return {"ok": False, "error": f"HTTP {res.status_code}", "items": []}
            content = res.json()['choices'][0]['message']['content']
            data_json = json.loads(content)
            # Compatibility: the model may return a bare list or a dict wrapper.
            items = data_json if isinstance(data_json, list) else list(data_json.values())[0]
            return {"ok": True, "items": items}
        except Exception as e:
            return {"ok": False, "error": str(e), "items": []}

    completed_batches = 0
    with ThreadPoolExecutor(max_workers=50) as executor:
        future_to_batch = {executor.submit(process_batch, batch, idx): idx for idx, batch in enumerate(batches)}
        for future in as_completed(future_to_batch):
            # Global pause: keep streaming a "paused" dashboard while waiting.
            while IS_PAUSED:
                time.sleep(1)
                yield "\n".join(logs), json.dumps(generated_json_objects, indent=2, ensure_ascii=False), update_dashboard("导演", "⏸️ 暂停中", True)
            idx = future_to_batch[future]
            result = future.result()
            if result["ok"]:
                generated_json_objects.extend(result["items"])
                log_msg(logs, f"✅ 批次 {idx+1}/{total_batches} 完成 (+{len(result['items'])})", "BATCH_DONE")
            else:
                log_msg(logs, f"❌ 批次 {idx+1}/{total_batches} 失败: {result['error']}", "BATCH_FAIL")
            completed_batches += 1
            current_text = json.dumps(generated_json_objects, indent=2, ensure_ascii=False)
            yield "\n".join(logs), current_text, update_dashboard("导演", f"生成进度 {completed_batches}/{total_batches}")
    full_text = json.dumps(generated_json_objects, indent=2, ensure_ascii=False)
    save_text(SCRIPT_RAW_FILE, full_text)
    log_msg(logs, f"🎉 脚本生成完成: {len(generated_json_objects)} 条", "SUCCESS")
    yield "\n".join(logs), full_text, update_dashboard("导演", "完成")
# --- Step 3: auditing (concurrent) ---
def step3_auditor(raw_script_json):
    """Generator: have the LLM de-duplicate entities in the raw script.

    Streams (log_text, status_or_final_json, dashboard_md) tuples. Any API
    failure falls back to the unaudited batch so the pipeline keeps moving.
    The audited script is saved to SCRIPT_FINAL_FILE.
    """
    logs_raw = read_text(LOG_FILE)
    logs = logs_raw.split("\n") if logs_raw else []
    if not raw_script_json:
        # Bug fix: `return value_tuple` in a generator is invisible to Gradio
        # — yield the error tuple first, then stop.
        yield "\n".join(logs), "无脚本", update_dashboard("审查", "失败")
        return
    log_msg(logs, "⚖️ 开始并发审查...", "AUDIT")
    yield "\n".join(logs), "", update_dashboard("审查", "启动...")
    try:
        prompts_list = json.loads(raw_script_json)
    except (json.JSONDecodeError, TypeError):
        # Same generator-return bug fixed here; also narrowed the bare except.
        yield "\n".join(logs), raw_script_json, update_dashboard("审查", "JSON错误")
        return
    # 1. Split into chunks of 20 prompts per audit request.
    chunk_size = 20
    batches = [prompts_list[i : i + chunk_size] for i in range(0, len(prompts_list), chunk_size)]
    total_batches = len(batches)

    # 2. Worker: audit one chunk; on any failure return it unchanged.
    def audit_batch(batch):
        batch_str = json.dumps(batch, ensure_ascii=False)
        try:
            url = f"{MERCHANT_BASE_URL}/v1/chat/completions"
            headers = {"Authorization": f"Bearer {LLM_API_KEY}", "Content-Type": "application/json"}
            data = {
                "model": TEXT_MODEL,
                "messages": [{"role": "system", "content": PROMPT_AUDITOR}, {"role": "user", "content": f"Review:\n{batch_str}"}],
                "response_format": {"type": "json_object"},
                "max_tokens": 8192
            }
            res = requests.post(url, headers=headers, json=data, timeout=180)
            if res.status_code != 200:
                return batch  # API failure: pass the batch through unchanged
            content = res.json()['choices'][0]['message']['content']
            res_json = json.loads(content)
            # Compatibility: accept a bare list or a single-key dict wrapper.
            return res_json if isinstance(res_json, list) else list(res_json.values())[0]
        except Exception:
            return batch  # any error: pass through unchanged (best effort)

    # 3. Fan out on a thread pool, streaming progress as futures complete.
    results_buffer = []
    completed = 0
    with ThreadPoolExecutor(max_workers=20) as executor:
        futures = {executor.submit(audit_batch, batch): idx for idx, batch in enumerate(batches)}
        for future in as_completed(futures):
            while IS_PAUSED:
                time.sleep(1)
            res = future.result()
            results_buffer.extend(res)
            completed += 1
            yield "\n".join(logs), f"Auditing batch {completed}/{total_batches}...", update_dashboard("审查", f"进度 {completed}/{total_batches}")
    audited_list = results_buffer
    final_text = json.dumps(audited_list, indent=2, ensure_ascii=False)
    save_text(SCRIPT_FINAL_FILE, final_text)
    log_msg(logs, "✅ 并发审查完成", "SUCCESS")
    yield "\n".join(logs), final_text, update_dashboard("审查", "完成")
# --- Step 4: rendering (V11.5 forced-injection edition) ---
def step4_render(script_json_text, region_name):
    """Generator: submit every prompt to the video API and zip the results.

    Streams (log_text, zip_path_or_None, dashboard_md) tuples. Each raw
    prompt is wrapped by construct_final_prompt() with scale / location /
    atmosphere guard text before submission. Completed clips are downloaded
    into a per-run directory and packed into a .zip at the end.
    """
    logs_raw = read_text(LOG_FILE)
    logs = logs_raw.split("\n") if logs_raw else []
    try:
        tasks_data = json.loads(script_json_text)
    except (json.JSONDecodeError, TypeError):
        # Bug fix: the original `return value_tuple` inside this generator
        # discarded the error outputs; yield so Gradio shows the error state.
        yield "\n".join(logs), None, update_dashboard("渲染", "JSON错误")
        return
    tasks = []
    for idx, item in enumerate(tasks_data):
        # Accept either "prompt" or "Prompt" keys; skip entries without one.
        p_text = item.get("prompt") or item.get("Prompt") or ""
        shot_id = f"{idx+1:03d}"
        if p_text:
            tasks.append({"id": shot_id, "prompt": p_text})
    total_tasks = len(tasks)
    log_msg(logs, f"🎥 准备渲染 {total_tasks} 个视频 (地域: {region_name})...", "RENDER_INIT")
    yield "\n".join(logs), None, update_dashboard("渲染", f"队列 {total_tasks}")
    # Per-run output directory; strip filesystem-hostile characters.
    safe_name = re.sub(r'[\\/*?:"<>|]', "", region_name)
    save_dir = os.path.join("Parade_Output", f"{safe_name}_{int(time.time())}")
    os.makedirs(save_dir, exist_ok=True)

    # Builds the final prompt: 5-story scale + modern city + documentary look.
    def construct_final_prompt(raw_prompt, region):
        # 1. Normalize whitespace and drop a boilerplate opener if present.
        clean_prompt = re.sub(r'\s+', ' ', raw_prompt).strip()
        clean_prompt = clean_prompt.replace("A colossal, hyper-realistic parade float of", "")
        # 2. Layer 1: prefix (5-story scale + low angle + raw footage).
        prefix = "Raw news footage. Low angle shot looking up at a towering, 5-story high colossal parade float moving down the street. "
        # 3. Layer 2: location guard (modern downtown + local crowd).
        region_guard = f" The specific location is a modern downtown city street in {region}. The background features modern {region} skyscrapers and city landmarks blended with festive decorations. The cheering crowd are strictly authentic {region} local people with traditional {region} facial features, skin tone, and clothing. "
        # 4. Layer 3: atmosphere guard (dancers and performers).
        action_guard = f" Lively {region} dancers and performers in traditional costumes are dancing and celebrating happily on and in front of the float. "
        # 5. Layer 4: suffix (documentary realism + anti-CG negatives).
        suffix = ", situated on a mechanical parade platform. Cinematic 8k lighting, highly detailed material texture, photorealistic, natural daylight. The footage looks exactly like a real life documentary recording. --no cgi, no 3d render, no animation, no cartoon, no sci-fi."
        return f"{prefix}{region_guard}{action_guard}The float features {clean_prompt}{suffix}"

    # Worker: submit one job, poll until done (10 min max), download the mp4.
    def call_video_api(task):
        url_submit = f"{MERCHANT_BASE_URL}/v1/videos"
        headers = {"Authorization": f"Bearer {VIDEO_API_KEY}"}
        final_forced_prompt = construct_final_prompt(task["prompt"], region_name)
        files_payload = {
            'model': (None, VEO_MODEL),
            'prompt': (None, final_forced_prompt),
            'seconds': (None, str(VIDEO_DURATION_STR)),
            'size': (None, VIDEO_SIZE)
        }
        filename = f"{region_name}_Float_{task['id']}.mp4"
        filepath = os.path.join(save_dir, filename)
        # Up to 3 submit attempts per task.
        for attempt in range(3):
            try:
                req = requests.post(url_submit, headers=headers, files=files_payload, timeout=60)
                if req.status_code != 200:
                    time.sleep(2)
                    continue
                task_id = req.json().get("id")
                if not task_id:
                    continue
                start_time = time.time()
                while True:
                    # 10-minute polling timeout per task.
                    if time.time() - start_time > 600:
                        break
                    status_resp = requests.get(f"{MERCHANT_BASE_URL}/v1/videos/{task_id}", headers=headers, timeout=30)
                    if status_resp.status_code == 200:
                        status = status_resp.json().get("status")
                        if status == "completed":
                            video_url = status_resp.json().get("video_url")
                            if video_url:
                                v_res = requests.get(video_url, stream=True, timeout=120)
                                with open(filepath, "wb") as f:
                                    for chunk in v_res.iter_content(chunk_size=8192):
                                        f.write(chunk)
                                return {"ok": True, "path": filepath}
                        elif status == "failed":
                            break
                        time.sleep(3)
                    else:
                        time.sleep(3)
            except Exception:
                # Narrowed from a bare except; still best-effort retry.
                time.sleep(3)
        return {"ok": False, "id": task["id"]}

    completed = 0
    files = []
    # High-concurrency fan-out: workers mostly block on network I/O.
    with ThreadPoolExecutor(max_workers=200) as executor:
        futures = {executor.submit(call_video_api, t): t for t in tasks}
        for f in as_completed(futures):
            while IS_PAUSED:
                time.sleep(1)
            res = f.result()
            completed += 1
            if res["ok"]:
                files.append(res["path"])
                log_msg(logs, f"✅ 视频 {os.path.basename(res['path'])} 完成", "DONE")
            else:
                log_msg(logs, f"❌ 任务 {res.get('id')} 失败", "FAIL")
            yield "\n".join(logs), None, update_dashboard("渲染", f"进度 {completed}/{total_tasks}")
    if files:
        shutil.make_archive(save_dir, 'zip', save_dir)
        final_zip = f"{save_dir}.zip"
        log_msg(logs, f"🎉 打包完成: {final_zip}", "SUCCESS")
        yield "\n".join(logs), final_zip, update_dashboard("完成", "渲染结束")
    else:
        yield "\n".join(logs), None, update_dashboard("完成", "无文件")
# ==========================================
# 5. Gradio UI
# ==========================================
# NOTE(review): SOURCE indentation was lost; the nesting below is the
# reconstruction implied by the statement order (pause button grouped in the
# right-hand column, outputs inside the first tab).
with gr.Blocks(title="全球地域文化巡游生成器 (Auto-Parade V11.5 Ultimate)") as app:
    gr.Markdown("## 🎪 全球地域文化巡游 - 自动化视频生成器 V11.5 (Ultimate Edition)")
    gr.Markdown(f"🛡️ **Features**: `Full Concurrency` | `5-Story Scale` | `Modern City` | `Real Footage Style`")
    status_box = gr.Markdown(update_dashboard("就绪", "等待开始..."))
    with gr.Row():
        with gr.Column(scale=2):
            region_input = gr.Textbox(label="🏙️ 地域/城市名称", placeholder="例如:北京、孟买、开罗、巴黎", value="2026北京新春")
        with gr.Column(scale=1):
            count_slider = gr.Slider(20, 500, value=40, step=10, label="生成视频总数")
            btn_pause = gr.Button("⏸️ 暂停 / ▶️ 继续")
    btn_pause.click(toggle_pause, outputs=status_box)
    with gr.Tabs():
        with gr.Tab("🚀 全自动流水线"):
            btn_run_all = gr.Button("开始全自动生成 (规划 -> 脚本 -> 渲染)", variant="primary")
            run_log = gr.Textbox(label="系统运行日志", lines=15, autoscroll=True)
            zip_output = gr.File(label="最终视频包下载")
        with gr.Tab("🛠️ 分步调试"):
            with gr.Row():
                btn_plan = gr.Button("1. 生成类目规划")
                btn_gen = gr.Button("2. 生成实体脚本")
                btn_audit = gr.Button("3. 审查脚本")
                btn_render = gr.Button("4. 仅渲染")
            with gr.Row():
                plan_output = gr.Textbox(label="Cultural Categories Plan", lines=5)
                script_raw_output = gr.Textbox(label="Raw Script (JSON)", lines=5)
                script_final_output = gr.Textbox(label="Final Script (JSON)", lines=5)
    # Full pipeline: plan -> generate -> audit -> render, chained via .then().
    btn_run_all.click(step1_planner, [region_input, count_slider], [run_log, plan_output]).then(
        step2_generator, [plan_output, region_input], [run_log, script_raw_output, status_box]).then(
        step3_auditor, [script_raw_output], [run_log, script_final_output, status_box]).then(
        step4_render, [script_final_output, region_input], [run_log, zip_output, status_box])
    # Step-by-step debug wiring for each stage in isolation.
    btn_plan.click(step1_planner, [region_input, count_slider], [run_log, plan_output])
    btn_gen.click(step2_generator, [plan_output, region_input], [run_log, script_raw_output, status_box])
    btn_audit.click(step3_auditor, [script_raw_output], [run_log, script_final_output, status_box])
    btn_render.click(step4_render, [script_final_output, region_input], [run_log, zip_output, status_box])
if __name__ == "__main__":
    app.queue().launch()