Spaces:
Runtime error
Runtime error
| import json | |
| import os | |
| import re | |
| import builtins | |
| import shutil | |
| import uuid | |
| from functools import wraps | |
| import streamlit as st | |
| import pandas as pd | |
| from custom import * | |
| # 聊天记录处理 | |
| def clear_folder(path): | |
| if not os.path.exists(path): | |
| return | |
| for file_name in os.listdir(path): | |
| file_path = os.path.join(path, file_name) | |
| try: | |
| shutil.rmtree(file_path) | |
| except Exception: | |
| pass | |
| def set_chats_path(): | |
| save_path = 'chat_history' | |
| if 'apikey' not in st.secrets: | |
| clear_folder('tem_files') | |
| save_path = 'tem_files/tem_chat' + str(uuid.uuid4()) | |
| return save_path | |
| # 重新open函数,路径不存在时自动创建 | |
| def create_path(func): | |
| def wrapper(path, *args, **kwargs): | |
| if not os.path.exists(os.path.dirname(path)): | |
| os.makedirs(os.path.dirname(path)) | |
| return func(path, *args, **kwargs) | |
| return wrapper | |
| open = create_path(builtins.open) | |
| def get_history_chats(path): | |
| try: | |
| os.makedirs(path) | |
| except FileExistsError: | |
| pass | |
| files = [f for f in os.listdir(f'./{path}') if f.endswith('.json')] | |
| files_with_time = [(f, os.stat(f'./{path}/' + f).st_ctime) for f in files] | |
| sorted_files = sorted(files_with_time, key=lambda x: x[1], reverse=True) | |
| chat_names = [os.path.splitext(f[0])[0] for f in sorted_files] | |
| if len(chat_names) == 0: | |
| chat_names.append('New Chat_' + str(uuid.uuid4())) | |
| return chat_names | |
| def save_data(path: str, file_name: str, history: list, paras: dict, contexts: dict, **kwargs): | |
| with open(f"./{path}/{file_name}.json", 'w', encoding='utf-8') as f: | |
| json.dump({"history": history, "paras": paras, "contexts": contexts, **kwargs}, f) | |
| def remove_data(path: str, file_name: str): | |
| try: | |
| os.remove(f"./{path}/{file_name}.json") | |
| except FileNotFoundError: | |
| pass | |
| def load_data(path: str, file_name: str) -> dict: | |
| try: | |
| with open(f"./{path}/{file_name}.json", 'r', encoding='utf-8') as f: | |
| data = json.load(f) | |
| return data | |
| except FileNotFoundError: | |
| with open(f"./{path}/{file_name}.json", 'w', encoding='utf-8') as f: | |
| f.write(json.dumps(initial_content_all)) | |
| return initial_content_all | |
| def show_each_message(message, role, area=None): | |
| if area is None: | |
| area = [st.markdown] * 2 | |
| if role == 'user': | |
| icon = user_svg | |
| name = user_name | |
| background_color = user_background_color | |
| else: | |
| icon = gpt_svg | |
| name = gpt_name | |
| background_color = gpt_background_color | |
| area[0](f"\n<div class='avatar'>{icon}<h2>{name}:</h2></div>", unsafe_allow_html=True) | |
| #area[1](f"""<div class='content-div' style='background-color: {background_color};'>\n\n{message}""", | |
| # unsafe_allow_html=True) | |
| area[1](f"""<div class='content-div'>\n\n{message}</div>""", unsafe_allow_html=True) | |
| def show_messages(messages: list): | |
| for each in messages: | |
| if (each["role"] == "user") or (each["role"] == "assistant"): | |
| show_each_message(each["content"], each["role"]) | |
| if each["role"] == "assistant": | |
| st.write("---") | |
| # 根据context_level提取history | |
| def get_history_input(history, level): | |
| df_history = pd.DataFrame(history) | |
| df_system = df_history.query('role=="system"') | |
| df_input = df_history.query('role!="system"') | |
| df_input = df_input[-level * 2:] | |
| res = pd.concat([df_system, df_input], ignore_index=True).to_dict('records') | |
| return res | |
| # 去除#号右边的空格 | |
| def remove_hashtag_right__space(text): | |
| res = re.sub(r"(#+)\s*", r"\1", text) | |
| return res | |
| # 提取文本 | |
| def extract_chars(text, num): | |
| char_num = 0 | |
| chars = '' | |
| for char in text: | |
| # 汉字算两个字符 | |
| if '\u4e00' <= char <= '\u9fff': | |
| char_num += 2 | |
| else: | |
| char_num += 1 | |
| chars += char | |
| if char_num >= num: | |
| break | |
| return chars |