import random
import threading
import time
import requests
import json
import base64
import os
import gradio as gr
import re
from AIGN import AIGN
from openAI import openAIChatLLM
chatLLM = openAIChatLLM()
STREAM_INTERVAL = 0.
CURRENT_MODEL = "gpt-4o-mini" # 默认模型
# 在文件开头添加以下函数
def clear_console():
# 根据操作系统类型选择清屏命令
if os.name == 'nt': # Windows
os.system('cls')
else: # Mac and Linux
os.system('clear')
def get_model_options():
try:
response = requests.get("https://api.168369.xyz/v1/models")
data = response.json()
return [model["id"] for model in data["data"]]
except:
return ["gpt-4o-mini", "claude-3-haiku-20240307"] # 默认选项
def set_api_model(model):
global CURRENT_MODEL
CURRENT_MODEL = model
#return f"{model}"
return f"选择模型成功"
# 修改获取章节名的函数
def get_chapter_name(content):
# 查找最后一个章节标题,支持数字和汉字表示的章节
matches = re.findall(r'(?:##?|)?\s*第([一二三四五六七八九十百千万亿\d]+)章[::]?\s*(.+)', content)
if matches:
chapter_num, chapter_title = matches[-1]
# 如果章节号是数字,直接使用;如果是汉字,转换为数字
if chapter_num.isdigit():
return f"第{chapter_num}章:{chapter_title.strip()}"
else:
# 这里可以添加一个函数来将汉字数字转换为阿拉伯数字
# 为了简单起见,这里仍然使用汉字表示
return f"第{chapter_num}章:{chapter_title.strip()}"
return "未知章节"
# 保存进度函数
def save_progress(aign):
chapter_name = get_chapter_name(aign.novel_content)
# 移除文件名中的非法字符
filename = re.sub(r'[\\/*?:"<>|]', '', chapter_name)
filename = f"{filename}.json"
data = {
"novel_outline": aign.novel_outline,
"paragraph_list": aign.paragraph_list,
"novel_content": aign.novel_content,
"writing_plan": aign.writing_plan,
"temp_setting": aign.temp_setting,
"writing_memory": aign.writing_memory,
"user_idea": aign.user_idea,
"user_requriments": aign.user_requriments,
}
json_str = json.dumps(data, ensure_ascii=False, indent=2)
b64 = base64.b64encode(json_str.encode()).decode()
href = f"data:application/json;base64,{b64}"
download_link = f'点击下载进度文件'
return download_link
# 加载函数
def load_progress(aign, file):
try:
if file is None:
return aign, "请选择要加载的文件", None, None, None, None, None, None, None, None
# 检查 file 是否为字符串(文件路径)
if isinstance(file, str):
with open(file, 'r', encoding='utf-8') as f:
content = f.read()
# 检查 file 是否有 name 属性(Gradio File 对象)
elif hasattr(file, 'name'):
with open(file.name, 'r', encoding='utf-8') as f:
content = f.read()
else:
return aign, f"无法读取文件", None, None, None, None, None, None, None, None
data = json.loads(content)
aign.novel_outline = data["novel_outline"]
aign.paragraph_list = data["paragraph_list"]
aign.novel_content = data["novel_content"]
aign.writing_plan = data["writing_plan"]
aign.temp_setting = data["temp_setting"]
aign.writing_memory = data["writing_memory"]
aign.user_idea = data["user_idea"]
aign.user_requriments = data["user_requriments"]
aign.update_chapter_list()
chapter_choices = [title for title, _ in aign.chapter_list]
return aign, f"进度已加载", data["novel_outline"], data["novel_content"], data["writing_plan"], data["temp_setting"], data["writing_memory"], data["user_idea"], data["user_requriments"], gr.Dropdown(choices=chapter_choices, value=chapter_choices[-1] if chapter_choices else None)
except Exception as e:
return aign, f"加载失败: {str(e)}", None, None, None, None, None, None, None, None
# 保存正文函数
def save_content(aign):
if not aign.novel_content:
return "正文为空,无需保存"
# 准备要保存的内容
content_to_save = ""
content_to_save += "正文:\n" + aign.novel_content
content_to_save += "大纲:\n" + aign.novel_outline + "\n\n"
# Base64 编码
b64 = base64.b64encode(content_to_save.encode()).decode()
href = f"data:text/plain;base64,{b64}"
download_link = f'点击下载正文'
return download_link
def display_chapter(chapters, page_num):
if 1 <= page_num <= len(chapters):
chapter_title, chapter_content = chapters[page_num - 1]
return f"{chapter_title}\n\n{chapter_content}"
else:
return "没有更多章节了。"
def prev_page(aign, current_page):
chapters = aign.chapter_list
if current_page > 1:
current_page -= 1
return current_page, display_chapter(chapters, current_page)
def next_page(aign, current_page):
chapters = aign.chapter_list
if current_page < len(chapters):
current_page += 1
return current_page, display_chapter(chapters, current_page)
def select_chapter(aign, chapter_title):
chapters = aign.chapter_list
for i, (title, _) in enumerate(chapters, start=1):
if title == chapter_title:
return i, display_chapter(chapters, i)
return 1, "章节未找到。"
def make_middle_chat():
carrier = threading.Event()
carrier.history = []
def middle_chat(messages, temperature=None, top_p=None):
nonlocal carrier
carrier.history.append([None, ""])
if len(carrier.history) > 20:
carrier.history = carrier.history[-16:]
try:
for resp in chatLLM(
messages, temperature=temperature, top_p=top_p, stream=True, model=CURRENT_MODEL
):
output_text = resp["content"]
total_tokens = resp["total_tokens"]
carrier.history[-1][1] = f"total_tokens: {total_tokens}\n{output_text}"
return {
"content": output_text,
"total_tokens": total_tokens,
}
except Exception as e:
carrier.history[-1][1] = f"Error: {e}"
raise e
return carrier, middle_chat
def gen_ouline_button_clicked(aign, user_idea, history):
clear_console() # 清空命令行
aign.user_idea = user_idea
carrier, middle_chat = make_middle_chat()
carrier.history = [] # 清空历史记录
aign.novel_outline_writer.chatLLM = middle_chat
gen_ouline_thread = threading.Thread(target=aign.genNovelOutline)
gen_ouline_thread.start()
while gen_ouline_thread.is_alive():
yield [
aign,
carrier.history,
aign.novel_outline,
gr.Button(visible=False),
]
time.sleep(STREAM_INTERVAL)
yield [
aign,
carrier.history,
aign.novel_outline,
gr.Button(visible=True),
]
def gen_next_paragraph_button_clicked(
aign,
history,
user_idea,
novel_outline,
writing_memory,
temp_setting,
writing_plan,
user_requriments,
):
# 生成保存进度链接
save_link = save_progress(aign) # 在函数开头定义保存进度链接
# 清空命令行
clear_console()
# 更新 AIGN 对象的各个字段
aign.user_idea = user_idea
aign.novel_outline = novel_outline
aign.writing_memory = writing_memory
aign.temp_setting = temp_setting
aign.writing_plan = writing_plan
aign.user_requriments = user_requriments
carrier, middle_chat = make_middle_chat()
carrier.history = [] # 清空历史记录
aign.novel_writer.chatLLM = middle_chat
aign.memory_maker.chatLLM = middle_chat
gen_next_paragraph_thread = threading.Thread(target=aign.genNextParagraph)
gen_next_paragraph_thread.start()
while gen_next_paragraph_thread.is_alive():
# 生成保存进度链接
save_link = save_progress(aign)
aign.update_chapter_list()
# 获取当前章节内容
current_chapter_content = display_chapter(aign.chapter_list, len(aign.chapter_list))
chapter_choices = [title for title, _ in aign.chapter_list]
yield [
aign,
carrier.history,
aign.writing_plan,
aign.temp_setting,
aign.writing_memory,
#aign.novel_content,#全部章节内容
current_chapter_content, # 只更新当前章节内容
gr.Button(visible=False),
save_link, # 返回生成的保存进度链接
gr.Dropdown(choices=chapter_choices, value=chapter_choices[-1] if chapter_choices else None),
]
time.sleep(STREAM_INTERVAL)
# 生成保存进度链接
save_link = save_progress(aign)
aign.update_chapter_list()
# 获取最终的当前章节内容
current_chapter_content = display_chapter(aign.chapter_list, len(aign.chapter_list))
chapter_choices = [title for title, _ in aign.chapter_list]
yield [
aign,
carrier.history,
aign.writing_plan,
aign.temp_setting,
aign.writing_memory,
#aign.novel_content, #全部章节内容
current_chapter_content, # 只更新当前章节内容
gr.Button(visible=True),
save_link, # 返回生成的保存进度链接
gr.Dropdown(choices=chapter_choices, value=chapter_choices[-1] if chapter_choices else None),
]
css = """
/* 默认布局 (桌面端) */
/* 将 row1 和 row2 放在同一行 */
#row1, #row2 {
display: inline-block;
vertical-align: top; /* 顶部对齐 */
width: 49%; /* 调整宽度以适应你的需求 */
box-sizing: border-box; /* 包括内边距和边框 */
}
#row3 {
overflow: auto;
width: 100%; /* row3 独占一行 */
}
/* 移动端布局 (屏幕宽度小于 768px 时生效) */
@media (max-width: 768px) {
#row1, #row2, #row3 {
width: 100%; /* 让每一列占据整个屏幕宽度 */
}
}
"""
with gr.Blocks(css=css) as demo:
aign = gr.State(AIGN(chatLLM))
gr.Markdown("## AI 写小说大纲")
with gr.Row():
with gr.Column(scale=1, elem_id="row1"):
with gr.Tab("⚙"):
model_dropdown = gr.Dropdown(
choices=get_model_options(),
label="选择模型",
interactive=True,
value=CURRENT_MODEL
)
model_output = gr.Textbox(label="模型设置结果", interactive=False)
load_status = gr.Textbox(label="加载状态", interactive=False)
load_file = gr.File(label="选择加载文件", file_count="single", file_types=[".json"])
load_button = gr.Button("加载进度")
with gr.Tab("开始"):
gr.Markdown("生成大纲->大纲标签->生成章节->状态标签->生成章节")
user_idea_text = gr.Textbox(
"架空历史:\n\n1. 选择关键历史节点并改变\n2. 描述由此引发的历史走向变化\n3. 塑造新兴历史人物(可改编或原创)\n4. 构建独特社会结构、文化和思潮\n5. 想象可能出现的科技创新\n6. 概述世界格局重塑(国界、政体、国际关系)\n7. 用生动叙事呈现,让读者身临其境\n\n目标:创造合理又富想象力的平行宇宙,既有趣又引人深思。",
label="想法",
lines=13,
interactive=True,
)
user_requriments_text = gr.Textbox(
"1. 语言要求:\n - 不直白\n - 句式多变\n - 避免陈词滥调\n - 使用不寻常的词句,合理创作现代诗、古诗词\n - 运用隐喻和象征\n2. 创作风格:\n - 抽象\n - 富有意境和想象力\n - 具创意个性\n - 有力度\n - 画面感强\n - 音乐感佳\n - 浪漫气息浓厚\n - 语言深邃\n3. 表达目标:\n - 传达独特的神秘和魔幻感\n - 探索和反思自我与世界\n - 表达对自己和社会的孤独与关注\n4. 读者体验:有趣、惊奇、新鲜",
label="写作要求",
lines=6,
interactive=True,
)
gen_ouline_button = gr.Button("生成大纲")
with gr.Tab("大纲"):
novel_outline_text = gr.Textbox(
label="大纲", lines=28, interactive=True
)
with gr.Tab("状态"):
writing_memory_text = gr.Textbox(
label="记忆",
lines=8,
interactive=True,
max_lines=8,
)
writing_plan_text = gr.Textbox(label="计划", lines=6, interactive=True, max_lines=6)
temp_setting_text = gr.Textbox(
label="临时设定", lines=5, interactive=True, max_lines=5
)
gen_next_paragraph_button = gr.Button("生成章节")
with gr.Tab("导航"):
save_content_button = gr.Button("保存所有章节")
save_button = gr.Button("保存进度")
download_link = gr.HTML()
current_page = gr.Number(value=1, label="当前页码", interactive=False)
prev_button = gr.Button("上一页")
next_button = gr.Button("下一页")
# 章节导航下拉列表
chapter_dropdown = gr.Dropdown(label="章节导航", choices=[], interactive=True)
with gr.Column(scale=1, elem_id="row2"):
novel_content_text = gr.Textbox(
label="小说正文", lines=32, interactive=True, show_copy_button=True
)
with gr.Column(scale=3, elem_id="row3"):
chatBox = gr.Chatbot(height=f"80vh", label="输出")
prev_button.click(
prev_page,
inputs=[aign, current_page],
outputs=[current_page, novel_content_text],
queue=False
)
next_button.click(
next_page,
inputs=[aign, current_page],
outputs=[current_page, novel_content_text],
queue=False
)
# 章节下拉列表的事件处理
chapter_dropdown.change(
select_chapter,
inputs=[aign, chapter_dropdown],
outputs=[current_page, novel_content_text],
queue=False
)
# 模型选择的事件处理
model_dropdown.change(
set_api_model,
inputs=[model_dropdown],
outputs=[model_output]
)
save_button.click(
save_progress,
inputs=[aign],
outputs=[download_link]
)
# 加载按钮事件,包含章节下拉列表的更新
load_button.click(
load_progress,
inputs=[aign, load_file],
outputs=[aign, load_status, novel_outline_text, novel_content_text, writing_plan_text, temp_setting_text, writing_memory_text, user_idea_text, user_requriments_text, chapter_dropdown]
)
# 保存正文按钮事件
save_content_button.click(
save_content,
inputs=[aign],
outputs=[download_link]
)
gen_ouline_button.click(
gen_ouline_button_clicked,
[aign, user_idea_text, chatBox],
[aign, chatBox, novel_outline_text, gen_ouline_button],
)
# 生成下一段按钮事件,包含章节下拉列表的更新
gen_next_paragraph_button.click(
gen_next_paragraph_button_clicked,
[
aign,
chatBox,
user_idea_text,
novel_outline_text,
writing_memory_text,
temp_setting_text,
writing_plan_text,
user_requriments_text,
],
[
aign,
chatBox,
writing_plan_text,
temp_setting_text,
writing_memory_text,
novel_content_text,
gen_next_paragraph_button,
download_link, # 这里添加输出到下载链接
chapter_dropdown
],
)
if __name__ == "__main__":
demo.queue()
demo.launch()