"""Gradio front end for the novel-writing demo.

Builds the ``demo`` Blocks app: an idea box, three stage buttons
(outline / chapters / draft), a prompt preview, suggestion and quoting
controls, and the accept / pause / cancel / refresh buttons, then launches
the server.  All state lives in two ``gr.State`` dicts: ``writer_state`` and
``setting_state``.
"""
import re

from rich.traceback import install
install(show_locals=False)

import gradio as gr
import yaml
import functools
import time
import sys
import os
import copy

# Make both this directory and its parent importable before the project
# imports below are resolved.
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from config import RENDER_SAVE_LOAD_BTN, RENDER_STOP_BTN
from core.backend import call_write, call_accept, match_quote_text
from core.frontend_copy import enable_copy_js, on_copy
from core.frontend_setting import new_setting, render_setting
from core.frontend_utils import (
    title, info, create_progress_md, create_text_md, messages2chatbot,
    init_writer, has_accept, is_running, try_cancel, writer_y_is_empty,
    writer_x_is_empty, cancellable, process_writer_to_backend,
    process_writer_from_backend, init_chapters_w, init_draft_w
)
from core.writer_utils import KeyPointMsg
from prompts.baseprompt import clean_txt_content, load_prompt

# Read the YAML file with the example ideas.
with open('prompts/idea-examples.yaml', 'r', encoding='utf-8') as file:
    examples_data = yaml.safe_load(file)

# Build the examples list (one single-column row per idea).
examples = [[example['idea']] for example in examples_data['examples']]


with gr.Blocks(head=enable_copy_js) as demo:
    gr.HTML(title)
    with gr.Accordion("使用指南"):
        gr.Markdown(info)

    writer_state = gr.State(init_writer('', check_empty=False))
    setting_state = gr.State(new_setting())

    if RENDER_SAVE_LOAD_BTN:
        with gr.Row():
            save_button = gr.Button("保存状态")
            load_button = gr.Button("加载状态")
            save_file_name = gr.Textbox(value='states', placeholder='输入文件名', lines=1, label=None, show_label=False, container=False)

        def save_states(save_file_name, writer, setting):
            """Dump the writer and setting dicts to ``<save_file_name>.json``."""
            import json
            # NOTE(review): the file name comes straight from a textbox, so it
            # can point outside the working directory -- confirm this debug
            # feature is never exposed to untrusted users.
            json_file_name = save_file_name + '.json'
            with open(json_file_name, 'w', encoding='utf-8') as f:
                json.dump({
                    'writer': writer,
                    'setting': setting
                }, f, ensure_ascii=False, indent=2)
            gr.Info(f"状态已保存到文件:{json_file_name}")

        def load_states(save_file_name):
            """Load the writer and setting dicts from ``<save_file_name>.json``.

            Returns:
                ``(writer, setting)`` for ``writer_state`` and ``setting_state``.

            Raises:
                gr.Error: if the file is missing, is not valid JSON, or lacks
                    the ``writer`` / ``setting`` entries.
            """
            import json
            json_file_name = save_file_name + '.json'
            try:
                with open(json_file_name, 'r', encoding='utf-8') as f:
                    states = json.load(f)
                writer, setting = states['writer'], states['setting']
                # Touch render_time so the setting panel is re-rendered:
                # choosing a model does not assign setting_state, and the
                # object held by the setting UI must be the same one as
                # setting_state.
                setting['render_time'] = time.time()
            except FileNotFoundError:
                raise gr.Error(f"未找到保存的状态文件:{json_file_name}")
            except (json.JSONDecodeError, KeyError, TypeError) as err:
                # A corrupt or foreign file used to surface as a raw traceback.
                raise gr.Error(f"状态文件格式无效:{json_file_name}") from err
            gr.Info(f"状态文件已加载:{json_file_name}")
            return writer, setting

    idea_textbox = gr.Textbox(placeholder='用一段话描述你要写的小说,或者从下方示例中选择一个创意...', lines=2, scale=1, label=None, show_label=False, container=False, max_length=1000)

    gr.Examples(
        label='示例',
        examples=examples,
        inputs=[idea_textbox],
    )

    with gr.Row():
        outline_btn = gr.Button("创作大纲", scale=1, min_width=1, interactive=True, variant='primary')
        chapters_btn = gr.Button("创作剧情", scale=1, min_width=1, interactive=False, variant='secondary')
        draft_btn = gr.Button("创作正文", scale=1, min_width=1, interactive=False, variant='secondary')
        # TODO: as of V1.10 the "auto" mode is not mature yet, so it is hidden.
        auto_checkbox = gr.Checkbox(label='一键生成', scale=1, value=False, visible=False)

    progress_md = create_progress_md(writer_state.value)
    text_md = create_text_md(writer_state.value)

    @gr.render(inputs=writer_state)
    def create_prompt_preview(writer):
        """Re-render the prompt preview accordion whenever writer_state changes."""
        prompt_outputs = writer.get('prompt_outputs', [])
        has_prompts = bool(prompt_outputs)

        with gr.Accordion("Prompt预览", open=has_prompts):
            pause_on_prompt_finished_checkbox = gr.Checkbox(label='允许在LLM响应完成后,预览Prompt', scale=1, value=writer['pause_on_prompt_finished_flag'])

            for i, prompt_output in enumerate(prompt_outputs, 1):
                with gr.Tab(f"Prompt {i}"):
                    gr.Chatbot(messages2chatbot(prompt_output['response_msgs']), type='messages')

            if not has_prompts:
                gr.Markdown('当前没有可预览的Prompt。')

            continue_btn = gr.Button('继续', visible=has_prompts, variant='primary')

        def on_pause_on_prompt_finished(value):
            """Store the checkbox value on the writer dict captured by this render."""
            if value:
                gr.Info("在LLM响应完成后,将可以预览Prompt")
            writer['pause_on_prompt_finished_flag'] = value

        pause_on_prompt_finished_checkbox.change(on_pause_on_prompt_finished, [pause_on_prompt_finished_checkbox])

        def on_continue(writer):
            """Clear the pause flag and the previewed prompts, then resume."""
            writer['pause_flag'] = False
            writer['prompt_outputs'] = []
            return writer

        continue_btn.click(on_continue, writer_state, writer_state)

    with gr.Row():
        rewrite_all_button = gr.Button("开始创作", min_width=100, scale=2, variant='secondary', interactive=False)
        suggestion_dropdown = gr.Dropdown(choices=[], min_width=100, scale=2, label=None, show_label=False, container=False, allow_custom_value=False)
        quote_checkbox = gr.Checkbox(label='允许引用', min_width=100, scale=2, value=False)
        gr.Textbox('窗口大小:', container=False, text_align='right', scale=1, min_width=100)
        chunk_length_dropdown = gr.Dropdown(choices=[], min_width=80, scale=1, label=None, show_label=False, container=False, allow_custom_value=False)

    quote_md = gr.Markdown(visible=False)

    def on_quote_checkbox_change(writer, value):
        """Reset any stored quote and hide the quote preview when the box toggles."""
        if writer['current_w'] == 'outline_w':
            gr.Info("大纲创作不支持引用\n考虑在剧情和正文创作中使用吧~")
            return gr.update(value=False, visible=False)

        if value:
            gr.Info("允许引用(右键或Ctrl+C复制你想引用的文本)")

        writer['quote_span'] = None
        writer['quoted_text'] = ''
        return gr.update(value=None, visible=False)

    quote_checkbox.change(on_quote_checkbox_change, [writer_state, quote_checkbox], [quote_md])

    def on_chunk_length_change(writer, value):
        """Record the chosen window size on the currently active stage."""
        current_w_name = writer['current_w']
        writer[current_w_name]['y_chunk_length'] = value
        return gr.update(value=value)

    chunk_length_dropdown.change(on_chunk_length_change, [writer_state, chunk_length_dropdown], [chunk_length_dropdown])

    def on_copy_handle(text, writer, setting, quote_checkbox):
        """Turn copied text into a quote preview when quoting is enabled.

        Returns an update for ``quote_md``: visible with the quoted text when
        a match is found, hidden otherwise.
        """
        text = text.strip()
        if has_accept(writer):
            gr.Info('考虑先接受或拒绝修改哦~')
            return gr.update(visible=False)

        if len(text) < 10:
            gr.Info('选中的文本太短,无法引用')
            return gr.update(visible=False)

        if quote_checkbox:
            quote_span, quoted_text = match_quote_text(writer, setting, text)
            if quote_span:
                writer['quote_span'] = quote_span
                writer['quoted_text'] = quoted_text
                lines = quoted_text.split('\n')
                # Long quotes keep the first and last five lines only.
                if len(lines) > 10:
                    lines[5:-5] = ['......']
                lines = ['```', ] + lines + ['```', ]
                quoted_text = '\n'.join(["> " + e for e in lines])
                return gr.update(value=quoted_text, visible=True)
            gr.Info('未找到匹配的引用文本')

        # Quoting disabled or nothing matched: forget any previous quote.
        writer['quote_span'] = None
        writer['quoted_text'] = ''
        return gr.update(visible=False)

    on_copy(on_copy_handle, [writer_state, setting_state, quote_checkbox], [quote_md])

    suggestion_textbox = gr.Textbox(max_length=1000, placeholder='在这里输入你的意见,或者从右上单选框选择', lines=2, scale=1, label=None, show_label=False, container=False)

    with gr.Row():
        accept_button = gr.Button("接受", scale=1, min_width=1, variant='secondary', interactive=False)
        pause_button = gr.Button("暂停", scale=1, min_width=1, variant='secondary', visible=RENDER_STOP_BTN)
        stop_button = gr.Button("取消", scale=1, min_width=1, variant='secondary')
        flash_button = gr.Button("刷新", scale=1, min_width=1, variant='secondary')

    def flash_interface(writer):
        """Recompute every widget update from the current writer dict.

        Returns:
            A 10-tuple matching ``flash_event['outputs']``: text markdown,
            progress markdown, then updates for the rewrite, accept, outline,
            chapters, draft and pause buttons and the two dropdowns.
        """
        current_w_name = writer['current_w']
        can_accept_flag = has_accept(writer) and not is_running(writer)
        can_write_flag = not writer_x_is_empty(writer, current_w_name) and not can_accept_flag

        # The original `match` had three identical arms and no default, which
        # left this name unbound for any other stage name.
        rewrite_all_button = gr.update(value='开始创作', variant='primary' if can_write_flag else 'secondary', interactive=can_write_flag)
        accept_button = gr.update(variant='primary' if can_accept_flag else 'secondary', interactive=can_accept_flag)

        # Stage buttons: highlight the active stage; a stage is only
        # clickable once the previous stage has produced output.
        outline_btn = gr.update(
            variant='primary' if current_w_name == 'outline_w' else 'secondary'
        )
        chapters_btn = gr.update(
            interactive=not writer_y_is_empty(writer, 'outline_w'),
            variant='primary' if current_w_name == 'chapters_w' else 'secondary'
        )
        draft_btn = gr.update(
            interactive=not writer_y_is_empty(writer, 'chapters_w'),
            variant='primary' if current_w_name == 'draft_w' else 'secondary'
        )

        pause_button = gr.update(
            value="继续" if writer['pause_flag'] else "暂停",
            variant='secondary',
        )

        # TODO: as of V1.10 the "auto" choice is not mature yet, so it is not
        # prepended to the list.
        suggestion_choices = writer['suggestions'][current_w_name]
        if writer_y_is_empty(writer, current_w_name):
            # Preselect the first suggestion; tolerate an empty list.
            first_suggestion = suggestion_choices[0] if suggestion_choices else None
            suggestion_dropdown = gr.update(choices=suggestion_choices, value=first_suggestion)
        else:
            suggestion_dropdown = gr.update(choices=suggestion_choices,)

        chunk_length_choices = writer['chunk_length'][current_w_name]
        cur_chunk_length = writer[current_w_name].get('y_chunk_length', None)
        if not cur_chunk_length:
            cur_chunk_length = chunk_length_choices[0] if chunk_length_choices else None
        chunk_length_dropdown = gr.update(choices=chunk_length_choices, value=cur_chunk_length)

        return (
            create_text_md(writer),
            create_progress_md(writer),
            rewrite_all_button,
            accept_button,
            outline_btn,
            chapters_btn,
            draft_btn,
            pause_button,
            suggestion_dropdown,
            chunk_length_dropdown
        )

    # Shared kwargs for every event that must refresh the whole interface.
    flash_event = dict(
        fn=flash_interface,
        inputs=[writer_state],
        outputs=[
            text_md,
            progress_md,
            rewrite_all_button,
            accept_button,
            outline_btn,
            chapters_btn,
            draft_btn,
            pause_button,
            suggestion_dropdown,
            chunk_length_dropdown
        ]
    )

    flash_button.click(**flash_event)

    if RENDER_SAVE_LOAD_BTN:
        save_button.click(save_states, inputs=[save_file_name, writer_state, setting_state], outputs=[])
        load_button.click(load_states, inputs=[save_file_name], outputs=[writer_state, setting_state]).success(**flash_event)

    # TODO: the writer_state update triggered by stop_btn does not take effect.
    stop_button.click(try_cancel, inputs=[writer_state]).success(**flash_event).success(lambda: gr.update(), None, writer_state)

    @cancellable
    def _on_write_all(writer, setting, auto_write=False, suggestion=None):
        """Run one writing pass and stream ``(text_md, writer_state)`` updates.

        Drives the generator returned by ``call_write``.  It yields either
        ``KeyPointMsg`` markers or intermediate writer snapshots; its return
        value (carried by ``StopIteration``) is the final writer.

        Raises:
            gr.Error: if the stored quote no longer matches the text.
        """
        current_w_name = writer['current_w']

        if writer_x_is_empty(writer, current_w_name):
            gr.Info('请先输入需要创作的内容!')
            return

        writer['prompt_outputs'].clear()

        if writer['quote_span']:
            # Re-validate the quote: the text may have changed since it was taken.
            quote_span, quoted_text = match_quote_text(writer, setting, writer['quoted_text'])
            if quote_span != writer['quote_span'] or quoted_text != writer['quoted_text']:
                raise gr.Error('引用文本不存在!')

        generator = call_write(process_writer_to_backend(writer), setting, auto_write, suggestion)

        new_writer = None
        while True:
            try:
                kp_msg = next(generator)
                if isinstance(kp_msg, KeyPointMsg):
                    # TODO: this logic is convoluted because of how
                    # KeyPointMsg is designed; consider simplifying later.
                    if kp_msg.is_prompt() and kp_msg.is_finished() and writer['pause_on_prompt_finished_flag']:
                        gr.Info('LLM响应完成,可以预览Prompt')
                        writer['pause_flag'] = True
                        if new_writer is None:
                            continue
                    else:
                        # Title nodes and every other key point are skipped.
                        # TODO: title nodes have no finish logic yet; saving
                        # progress at key points is complicated and buggy and
                        # may be provided in a later version.
                        continue
                else:
                    new_writer = kp_msg

                if writer['pause_flag']:
                    # Pass prompt_outputs into writer_state so the prompts can
                    # be shown while paused.  A deep copy is required,
                    # otherwise writer keeps being updated and the preview
                    # keeps re-rendering.
                    writer['prompt_outputs'] = copy.deepcopy(new_writer['prompt_outputs'])
                    yield create_text_md(new_writer), writer

                    # Block until the user resumes or cancels.
                    while writer['pause_flag'] and not writer['cancel_flag']:
                        time.sleep(0.1)
                else:
                    yield create_text_md(new_writer), gr.update()
            except StopIteration as e:
                # Final state: merge the backend result into the writer.
                process_writer_from_backend(writer, e.value)
                yield create_text_md(writer), writer

                if has_accept(writer):
                    gr.Info('创作完成!点击接受按钮接受修改。')
                else:
                    # Usually the review concluded that no change was needed.
                    gr.Info('本次创作没有任何更改。')
                return

    def on_auto_write_all(writer, setting, auto_write):
        """Start an automatic writing pass only when the auto checkbox is set."""
        if auto_write:
            yield from _on_write_all(writer, setting, True)
        # When auto mode is off nothing is written here; the user starts a
        # pass explicitly with the rewrite button.

    writer_all_events = dict(
        fn=on_auto_write_all,
        queue=True,
        inputs=[writer_state, setting_state, auto_checkbox],
        outputs=[text_md, writer_state],
        concurrency_limit=10
    )

    def on_init_outline(idea, writer):
        """Switch to the outline stage, seeded with the idea text."""
        if not idea.strip():
            gr.Info("先输入小说简介或从示例中选择一个")
            return gr.update()

        new_writer = init_writer(idea)
        # Only the outline-related keys are replaced; everything else on the
        # existing writer is kept.
        writer.update({
            k: v for k, v in new_writer.items()
            if k in ['current_w', 'outline_w', 'prompt_outputs']
        })
        return writer

    outline_btn.click(on_init_outline, inputs=[idea_textbox, writer_state], outputs=[writer_state]).success(**writer_all_events).then(**flash_event)
    chapters_btn.click(lambda writer: init_chapters_w(writer), inputs=[writer_state], outputs=[writer_state]).success(**writer_all_events).then(**flash_event)
    draft_btn.click(lambda writer: init_draft_w(writer), inputs=[writer_state], outputs=[writer_state]).success(**writer_all_events).then(**flash_event)

    def on_select_suggestion(writer, setting, choice):
        """Load the prompt text for the chosen suggestion into the textbox."""
        if not choice:
            # Nothing selected (e.g. the choice list is empty): leave as is.
            return gr.update()
        if choice == '自动':
            return gr.update(value=choice, visible=False)

        current_w_name = writer['current_w']
        dirname = writer['suggestions_dirname'][current_w_name]
        suggestion = clean_txt_content(load_prompt(dirname, choice))
        suggestion = suggestion.removeprefix("user:\n")

        return gr.update(value=suggestion, visible=True)

    suggestion_dropdown.change(on_select_suggestion, inputs=[writer_state, setting_state, suggestion_dropdown], outputs=[suggestion_textbox])

    def on_write_all(writer, setting, suggestion):
        """Run a manual writing pass using the text in the suggestion box."""
        if not suggestion.strip():
            gr.Info('需要输入创作意见!')
            return

        yield from _on_write_all(writer, setting, False, suggestion)

    rewrite_all_button.click(
        on_write_all,
        queue=True,
        inputs=[writer_state, setting_state, suggestion_textbox],
        outputs=[text_md, writer_state],
        concurrency_limit=10
    ).then(**flash_event)

    @cancellable
    def on_accept_write(writer, setting):
        """Accept the pending changes of the current stage.

        Raises:
            gr.Error: if the current stage has no ``apply_chunks``.
        """
        current_w_name = writer['current_w']
        current_w = writer[current_w_name]
        if not current_w['apply_chunks']:
            raise gr.Error('请先进行创作!')

        new_writer = call_accept(process_writer_to_backend(writer), setting)
        process_writer_from_backend(writer, new_writer)
        yield create_text_md(writer), writer

    accept_button.click(fn=on_accept_write, inputs=[writer_state, setting_state], outputs=[text_md, writer_state]).then(**flash_event)

    def toggle_pause(writer):
        """Flip the pause flag and relabel the pause button accordingly."""
        if not is_running(writer):
            gr.Info('当前没有正在进行的操作')
            return gr.update()

        writer['pause_flag'] = not writer['pause_flag']
        return gr.update(value="暂停" if not writer['pause_flag'] else "继续")

    pause_button.click(
        toggle_pause,
        inputs=[writer_state],
        outputs=[pause_button]
    )

    @gr.render(inputs=setting_state)
    def _render_setting(setting):
        """Re-render the setting panel whenever setting_state changes."""
        return render_setting(setting, setting_state)


demo.queue()
demo.launch(server_name="0.0.0.0", server_port=7860)