Spaces:
Sleeping
Sleeping
| import os | |
| import glob | |
| import json | |
| import time | |
| import traceback | |
| import streamlit as st | |
| from loguru import logger | |
| from app.config import config | |
| from app.models.schema import VideoClipParams | |
| from app.utils import utils, check_script | |
| from webui.tools.generate_script_docu import generate_script_docu | |
| from webui.tools.generate_script_short import generate_script_short | |
| from webui.tools.generate_short_summary import generate_script_short_sunmmary | |
| def render_script_panel(tr): | |
| """渲染脚本配置面板""" | |
| with st.container(border=True): | |
| st.write(tr("Video Script Configuration")) | |
| params = VideoClipParams() | |
| # 渲染脚本文件选择 | |
| render_script_file(tr, params) | |
| # 渲染视频文件选择 | |
| render_video_file(tr, params) | |
| # 获取当前选择的脚本类型 | |
| script_path = st.session_state.get('video_clip_json_path', '') | |
| # 根据脚本类型显示不同的布局 | |
| if script_path == "auto": | |
| # 画面解说 | |
| render_video_details(tr) | |
| elif script_path == "short": | |
| # 短剧混剪 | |
| render_short_generate_options(tr) | |
| elif script_path == "summary": | |
| # 短剧解说 | |
| short_drama_summary(tr) | |
| else: | |
| # 默认为空 | |
| pass | |
| # 渲染脚本操作按钮 | |
| render_script_buttons(tr, params) | |
| def render_script_file(tr, params): | |
| """渲染脚本文件选择""" | |
| # 定义功能模式 | |
| MODE_FILE = "file_selection" | |
| MODE_AUTO = "auto" | |
| MODE_SHORT = "short" | |
| MODE_SUMMARY = "summary" | |
| # 模式选项映射 | |
| mode_options = { | |
| tr("Select/Upload Script"): MODE_FILE, | |
| tr("Auto Generate"): MODE_AUTO, | |
| tr("Short Generate"): MODE_SHORT, | |
| tr("Short Drama Summary"): MODE_SUMMARY, | |
| } | |
| # 获取当前状态 | |
| current_path = st.session_state.get('video_clip_json_path', '') | |
| # 确定当前选中的模式索引 | |
| default_index = 0 | |
| mode_keys = list(mode_options.keys()) | |
| if current_path == "auto": | |
| default_index = mode_keys.index(tr("Auto Generate")) | |
| elif current_path == "short": | |
| default_index = mode_keys.index(tr("Short Generate")) | |
| elif current_path == "summary": | |
| default_index = mode_keys.index(tr("Short Drama Summary")) | |
| else: | |
| default_index = mode_keys.index(tr("Select/Upload Script")) | |
| # 1. 渲染功能选择下拉框 | |
| # 使用 segmented_control 替代 selectbox,提供更好的视觉体验 | |
| default_mode_label = mode_keys[default_index] | |
| # 定义回调函数来处理状态更新 | |
| def update_script_mode(): | |
| # 获取当前选中的标签 | |
| selected_label = st.session_state.script_mode_selection | |
| if selected_label: | |
| # 更新实际的 path 状态 | |
| new_mode = mode_options[selected_label] | |
| st.session_state.video_clip_json_path = new_mode | |
| params.video_clip_json_path = new_mode | |
| else: | |
| # 如果用户取消选择(segmented_control 允许取消),恢复到默认或上一个状态 | |
| # 这里我们强制保持当前状态,或者重置为默认 | |
| st.session_state.script_mode_selection = default_mode_label | |
| # 渲染组件 | |
| selected_mode_label = st.segmented_control( | |
| tr("Video Type"), | |
| options=mode_keys, | |
| default=default_mode_label, | |
| key="script_mode_selection", | |
| on_change=update_script_mode | |
| ) | |
| # 处理未选择的情况(虽然有default,但在某些交互下可能为空) | |
| if not selected_mode_label: | |
| selected_mode_label = default_mode_label | |
| selected_mode = mode_options[selected_mode_label] | |
| # 2. 根据选择的模式处理逻辑 | |
| if selected_mode == MODE_FILE: | |
| # --- 文件选择模式 --- | |
| script_list = [ | |
| (tr("None"), ""), | |
| (tr("Upload Script"), "upload_script") | |
| ] | |
| # 获取已有脚本文件 | |
| suffix = "*.json" | |
| script_dir = utils.script_dir() | |
| files = glob.glob(os.path.join(script_dir, suffix)) | |
| file_list = [] | |
| for file in files: | |
| file_list.append({ | |
| "name": os.path.basename(file), | |
| "file": file, | |
| "ctime": os.path.getctime(file) | |
| }) | |
| file_list.sort(key=lambda x: x["ctime"], reverse=True) | |
| for file in file_list: | |
| display_name = file['file'].replace(config.root_dir, "") | |
| script_list.append((display_name, file['file'])) | |
| # 找到保存的脚本文件在列表中的索引 | |
| # 如果当前path是特殊值(auto/short/summary),则重置为空 | |
| saved_script_path = current_path if current_path not in [MODE_AUTO, MODE_SHORT, MODE_SUMMARY] else "" | |
| selected_index = 0 | |
| for i, (_, path) in enumerate(script_list): | |
| if path == saved_script_path: | |
| selected_index = i | |
| break | |
| selected_script_index = st.selectbox( | |
| tr("Script Files"), | |
| index=selected_index, | |
| options=range(len(script_list)), | |
| format_func=lambda x: script_list[x][0], | |
| key="script_file_selection" | |
| ) | |
| script_path = script_list[selected_script_index][1] | |
| st.session_state['video_clip_json_path'] = script_path | |
| params.video_clip_json_path = script_path | |
| # 处理脚本上传 | |
| if script_path == "upload_script": | |
| uploaded_file = st.file_uploader( | |
| tr("Upload Script File"), | |
| type=["json"], | |
| accept_multiple_files=False, | |
| ) | |
| if uploaded_file is not None: | |
| try: | |
| # 读取上传的JSON内容并验证格式 | |
| script_content = uploaded_file.read().decode('utf-8') | |
| json_data = json.loads(script_content) | |
| # 保存到脚本目录 | |
| script_file_path = os.path.join(script_dir, uploaded_file.name) | |
| file_name, file_extension = os.path.splitext(uploaded_file.name) | |
| # 如果文件已存在,添加时间戳 | |
| if os.path.exists(script_file_path): | |
| timestamp = time.strftime("%Y%m%d%H%M%S") | |
| file_name_with_timestamp = f"{file_name}_{timestamp}" | |
| script_file_path = os.path.join(script_dir, file_name_with_timestamp + file_extension) | |
| # 写入文件 | |
| with open(script_file_path, "w", encoding='utf-8') as f: | |
| json.dump(json_data, f, ensure_ascii=False, indent=2) | |
| # 更新状态 | |
| st.success(tr("Script Uploaded Successfully")) | |
| st.session_state['video_clip_json_path'] = script_file_path | |
| params.video_clip_json_path = script_file_path | |
| time.sleep(1) | |
| st.rerun() | |
| except json.JSONDecodeError: | |
| st.error(tr("Invalid JSON format")) | |
| except Exception as e: | |
| st.error(f"{tr('Upload failed')}: {str(e)}") | |
| else: | |
| # --- 功能生成模式 --- | |
| st.session_state['video_clip_json_path'] = selected_mode | |
| params.video_clip_json_path = selected_mode | |
| def render_video_file(tr, params): | |
| """渲染视频文件选择""" | |
| video_list = [(tr("None"), ""), (tr("Upload Local Files"), "upload_local")] | |
| # 获取已有视频文件 | |
| for suffix in ["*.mp4", "*.mov", "*.avi", "*.mkv"]: | |
| video_files = glob.glob(os.path.join(utils.video_dir(), suffix)) | |
| for file in video_files: | |
| display_name = file.replace(config.root_dir, "") | |
| video_list.append((display_name, file)) | |
| selected_video_index = st.selectbox( | |
| tr("Video File"), | |
| index=0, | |
| options=range(len(video_list)), | |
| format_func=lambda x: video_list[x][0] | |
| ) | |
| video_path = video_list[selected_video_index][1] | |
| st.session_state['video_origin_path'] = video_path | |
| params.video_origin_path = video_path | |
| if video_path == "upload_local": | |
| uploaded_file = st.file_uploader( | |
| tr("Upload Local Files"), | |
| type=["mp4", "mov", "avi", "flv", "mkv"], | |
| accept_multiple_files=False, | |
| ) | |
| if uploaded_file is not None: | |
| video_file_path = os.path.join(utils.video_dir(), uploaded_file.name) | |
| file_name, file_extension = os.path.splitext(uploaded_file.name) | |
| if os.path.exists(video_file_path): | |
| timestamp = time.strftime("%Y%m%d%H%M%S") | |
| file_name_with_timestamp = f"{file_name}_{timestamp}" | |
| video_file_path = os.path.join(utils.video_dir(), file_name_with_timestamp + file_extension) | |
| with open(video_file_path, "wb") as f: | |
| f.write(uploaded_file.read()) | |
| st.success(tr("File Uploaded Successfully")) | |
| st.session_state['video_origin_path'] = video_file_path | |
| params.video_origin_path = video_file_path | |
| time.sleep(1) | |
| st.rerun() | |
| def render_short_generate_options(tr): | |
| """ | |
| 渲染Short Generate模式下的特殊选项 | |
| 在Short Generate模式下,替换原有的输入框为自定义片段选项 | |
| """ | |
| short_drama_summary(tr) | |
| # 显示自定义片段数量选择器 | |
| custom_clips = st.number_input( | |
| tr("自定义片段"), | |
| min_value=1, | |
| max_value=20, | |
| value=st.session_state.get('custom_clips', 5), | |
| help=tr("设置需要生成的短视频片段数量"), | |
| key="custom_clips_input" | |
| ) | |
| st.session_state['custom_clips'] = custom_clips | |
| def render_video_details(tr): | |
| """画面解说 渲染视频主题和提示词""" | |
| video_theme = st.text_input(tr("Video Theme")) | |
| custom_prompt = st.text_area( | |
| tr("Generation Prompt"), | |
| value=st.session_state.get('video_plot', ''), | |
| help=tr("Custom prompt for LLM, leave empty to use default prompt"), | |
| height=180 | |
| ) | |
| # 非短视频模式下显示原有的三个输入框 | |
| input_cols = st.columns(2) | |
| with input_cols[0]: | |
| st.number_input( | |
| tr("Frame Interval (seconds)"), | |
| min_value=0, | |
| value=st.session_state.get('frame_interval_input', config.frames.get('frame_interval_input', 3)), | |
| help=tr("Frame Interval (seconds) (More keyframes consume more tokens)"), | |
| key="frame_interval_input" | |
| ) | |
| with input_cols[1]: | |
| st.number_input( | |
| tr("Batch Size"), | |
| min_value=0, | |
| value=st.session_state.get('vision_batch_size', config.frames.get('vision_batch_size', 10)), | |
| help=tr("Batch Size (More keyframes consume more tokens)"), | |
| key="vision_batch_size" | |
| ) | |
| st.session_state['video_theme'] = video_theme | |
| st.session_state['custom_prompt'] = custom_prompt | |
| return video_theme, custom_prompt | |
| def short_drama_summary(tr): | |
| """短剧解说 渲染视频主题和提示词""" | |
| # 检查是否已经处理过字幕文件 | |
| if 'subtitle_file_processed' not in st.session_state: | |
| st.session_state['subtitle_file_processed'] = False | |
| subtitle_file = st.file_uploader( | |
| tr("上传字幕文件"), | |
| type=["srt"], | |
| accept_multiple_files=False, | |
| key="subtitle_file_uploader" # 添加唯一key | |
| ) | |
| # 显示当前已上传的字幕文件路径 | |
| if 'subtitle_path' in st.session_state and st.session_state['subtitle_path']: | |
| st.info(f"已上传字幕: {os.path.basename(st.session_state['subtitle_path'])}") | |
| if st.button(tr("清除已上传字幕")): | |
| st.session_state['subtitle_path'] = None | |
| st.session_state['subtitle_file_processed'] = False | |
| st.rerun() | |
| # 只有当有文件上传且尚未处理时才执行处理逻辑 | |
| if subtitle_file is not None and not st.session_state['subtitle_file_processed']: | |
| try: | |
| # 读取上传的SRT内容 | |
| script_content = subtitle_file.read().decode('utf-8') | |
| # 保存到字幕目录 | |
| script_file_path = os.path.join(utils.subtitle_dir(), subtitle_file.name) | |
| file_name, file_extension = os.path.splitext(subtitle_file.name) | |
| # 如果文件已存在,添加时间戳 | |
| if os.path.exists(script_file_path): | |
| timestamp = time.strftime("%Y%m%d%H%M%S") | |
| file_name_with_timestamp = f"{file_name}_{timestamp}" | |
| script_file_path = os.path.join(utils.subtitle_dir(), file_name_with_timestamp + file_extension) | |
| # 直接写入SRT内容,不进行JSON转换 | |
| with open(script_file_path, "w", encoding='utf-8') as f: | |
| f.write(script_content) | |
| # 更新状态 | |
| st.success(tr("字幕上传成功")) | |
| st.session_state['subtitle_path'] = script_file_path | |
| st.session_state['subtitle_file_processed'] = True # 标记已处理 | |
| # 避免使用rerun,使用更新状态的方式 | |
| # st.rerun() | |
| except Exception as e: | |
| st.error(f"{tr('Upload failed')}: {str(e)}") | |
| # 名称输入框 | |
| video_theme = st.text_input(tr("短剧名称")) | |
| st.session_state['video_theme'] = video_theme | |
| # 数字输入框 | |
| temperature = st.slider("temperature", 0.0, 2.0, 0.7) | |
| st.session_state['temperature'] = temperature | |
| return video_theme | |
| def render_script_buttons(tr, params): | |
| """渲染脚本操作按钮""" | |
| # 获取当前选择的脚本类型 | |
| script_path = st.session_state.get('video_clip_json_path', '') | |
| # 生成/加载按钮 | |
| if script_path == "auto": | |
| button_name = tr("Generate Video Script") | |
| elif script_path == "short": | |
| button_name = tr("Generate Short Video Script") | |
| elif script_path == "summary": | |
| button_name = tr("生成短剧解说脚本") | |
| elif script_path.endswith("json"): | |
| button_name = tr("Load Video Script") | |
| else: | |
| button_name = tr("Please Select Script File") | |
| if st.button(button_name, key="script_action", disabled=not script_path): | |
| if script_path == "auto": | |
| # 执行纪录片视频脚本生成(视频无字幕无配音) | |
| generate_script_docu(params) | |
| elif script_path == "short": | |
| # 执行 短剧混剪 脚本生成 | |
| custom_clips = st.session_state.get('custom_clips') | |
| generate_script_short(tr, params, custom_clips) | |
| elif script_path == "summary": | |
| # 执行 短剧解说 脚本生成 | |
| subtitle_path = st.session_state.get('subtitle_path') | |
| video_theme = st.session_state.get('video_theme') | |
| temperature = st.session_state.get('temperature') | |
| generate_script_short_sunmmary(params, subtitle_path, video_theme, temperature) | |
| else: | |
| load_script(tr, script_path) | |
| # 视频脚本编辑区 | |
| video_clip_json_details = st.text_area( | |
| tr("Video Script"), | |
| value=json.dumps(st.session_state.get('video_clip_json', []), indent=2, ensure_ascii=False), | |
| height=500 | |
| ) | |
| # 操作按钮行 - 合并格式检查和保存功能 | |
| if st.button(tr("Save Script"), key="save_script", use_container_width=True): | |
| save_script_with_validation(tr, video_clip_json_details) | |
| def load_script(tr, script_path): | |
| """加载脚本文件""" | |
| try: | |
| with open(script_path, 'r', encoding='utf-8') as f: | |
| script = f.read() | |
| script = utils.clean_model_output(script) | |
| st.session_state['video_clip_json'] = json.loads(script) | |
| st.success(tr("Script loaded successfully")) | |
| st.rerun() | |
| except Exception as e: | |
| logger.error(f"加载脚本文件时发生错误\n{traceback.format_exc()}") | |
| st.error(f"{tr('Failed to load script')}: {str(e)}") | |
| def save_script_with_validation(tr, video_clip_json_details): | |
| """保存视频脚本(包含格式验证)""" | |
| if not video_clip_json_details: | |
| st.error(tr("请输入视频脚本")) | |
| st.stop() | |
| # 第一步:格式验证 | |
| with st.spinner("正在验证脚本格式..."): | |
| try: | |
| result = check_script.check_format(video_clip_json_details) | |
| if not result.get('success'): | |
| # 格式验证失败,显示详细错误信息 | |
| error_message = result.get('message', '未知错误') | |
| error_details = result.get('details', '') | |
| st.error(f"**脚本格式验证失败**") | |
| st.error(f"**错误信息:** {error_message}") | |
| if error_details: | |
| st.error(f"**详细说明:** {error_details}") | |
| # 显示正确格式示例 | |
| st.info("**正确的脚本格式示例:**") | |
| example_script = [ | |
| { | |
| "_id": 1, | |
| "timestamp": "00:00:00,600-00:00:07,559", | |
| "picture": "工地上,蔡晓艳奋力救人,场面混乱", | |
| "narration": "灾后重建,工地上险象环生!泼辣女工蔡晓艳挺身而出,救人第一!", | |
| "OST": 0 | |
| }, | |
| { | |
| "_id": 2, | |
| "timestamp": "00:00:08,240-00:00:12,359", | |
| "picture": "领导视察,蔡晓艳不屑一顾", | |
| "narration": "播放原片4", | |
| "OST": 1 | |
| } | |
| ] | |
| st.code(json.dumps(example_script, ensure_ascii=False, indent=2), language='json') | |
| st.stop() | |
| except Exception as e: | |
| st.error(f"格式验证过程中发生错误: {str(e)}") | |
| st.stop() | |
| # 第二步:保存脚本 | |
| with st.spinner(tr("Save Script")): | |
| script_dir = utils.script_dir() | |
| timestamp = time.strftime("%Y-%m%d-%H%M%S") | |
| save_path = os.path.join(script_dir, f"{timestamp}.json") | |
| try: | |
| data = json.loads(video_clip_json_details) | |
| with open(save_path, 'w', encoding='utf-8') as file: | |
| json.dump(data, file, ensure_ascii=False, indent=4) | |
| st.session_state['video_clip_json'] = data | |
| st.session_state['video_clip_json_path'] = save_path | |
| # 更新配置 | |
| config.app["video_clip_json_path"] = save_path | |
| # 显示成功消息 | |
| st.success("✅ 脚本格式验证通过,保存成功!") | |
| # 强制重新加载页面更新选择框 | |
| time.sleep(0.5) # 给一点时间让用户看到成功消息 | |
| st.rerun() | |
| except Exception as err: | |
| st.error(f"{tr('Failed to save script')}: {str(err)}") | |
| st.stop() | |
| # crop_video函数已移除 - 现在使用统一裁剪策略,不再需要预裁剪步骤 | |
| def get_script_params(): | |
| """获取脚本参数""" | |
| return { | |
| 'video_language': st.session_state.get('video_language', ''), | |
| 'video_clip_json_path': st.session_state.get('video_clip_json_path', ''), | |
| 'video_origin_path': st.session_state.get('video_origin_path', ''), | |
| 'video_name': st.session_state.get('video_name', ''), | |
| 'video_plot': st.session_state.get('video_plot', '') | |
| } | |