Spaces:
Runtime error
Runtime error
| import json | |
| import logging | |
| import asyncio | |
| from functools import wraps, partial | |
| from os import listdir | |
| from os.path import exists, normpath | |
| from re import sub | |
| from typing import Dict | |
| from deep_translator import GoogleTranslator as Translator | |
| try: | |
| import extensions.telegram_bot.source.text_process as tp | |
| import extensions.telegram_bot.source.const as const | |
| from extensions.telegram_bot.source.conf import cfg | |
| from extensions.telegram_bot.source.user import User as User | |
| except ImportError: | |
| import source.text_process as tp | |
| import source.const as const | |
| from source.conf import cfg | |
| from source.user import User as User | |
| def async_wrap(func): | |
| async def run(*args, loop=None, executor=None, **kwargs): | |
| if loop is None: | |
| loop = asyncio.get_event_loop() | |
| target_func = partial(func, *args, **kwargs) | |
| return await loop.run_in_executor(executor, target_func) | |
| return run | |
| def translate_text(text: str, source="en", target="en"): | |
| return Translator(source=source, target=target).translate(text) | |
| async def prepare_text(original_text: str, user: User, direction="to_user"): | |
| text = original_text | |
| # translate | |
| if cfg.llm_lang != user.language: | |
| try: | |
| if direction == "to_model": | |
| text = await translate_text(text=text, source=user.language, target=cfg.llm_lang) | |
| elif direction == "to_user": | |
| text = await translate_text(text=text, source=cfg.llm_lang, target=user.language) | |
| except Exception as exception: | |
| text = "can't translate text:" + str(text) | |
| logging.error("translator_error:\n" + str(exception) + "\n" + str(exception.args)) | |
| # Add HTML tags and other... | |
| if direction not in ["to_model", "no_html"]: | |
| text = text.replace("#", "#").replace("<", "<").replace(">", ">") | |
| original_text = original_text.replace("#", "#").replace("<", "<").replace(">", ">") | |
| if len(original_text) > 2000: | |
| original_text = original_text[:2000] | |
| if len(text) > 2000: | |
| text = text[:2000] | |
| if cfg.llm_lang != user.language and direction == "to_user" and cfg.translation_as_hidden_text == "on": | |
| text = ( | |
| cfg.html_tag[0] | |
| + original_text | |
| + cfg.html_tag[1] | |
| + "\n" | |
| + cfg.translate_html_tag[0] | |
| + text | |
| + cfg.translate_html_tag[1] | |
| ) | |
| else: | |
| if len(text) > 4000: | |
| text = text[:4000] | |
| text = cfg.html_tag[0] + text + cfg.html_tag[1] | |
| return text | |
| def parse_characters_dir() -> list: | |
| char_list = [] | |
| for f in listdir(cfg.characters_dir_path): | |
| if f.endswith((".json", ".yaml", ".yml")): | |
| char_list.append(f) | |
| return char_list | |
| def parse_presets_dir() -> list: | |
| preset_list = [] | |
| for f in listdir(cfg.presets_dir_path): | |
| if f.endswith(".txt") or f.endswith(".yaml"): | |
| preset_list.append(f) | |
| return preset_list | |
| # User checking rules | |
| def check_user_permission(chat_id): | |
| # Read admins list | |
| if exists(cfg.users_file_path): | |
| with open(normpath(cfg.users_file_path), "r") as users_file: | |
| users_list = users_file.read().split() | |
| else: | |
| users_list = [] | |
| # check | |
| if str(chat_id) in users_list or len(users_list) == 0: | |
| return True | |
| else: | |
| return False | |
| def check_user_rule(chat_id, option): | |
| if exists(cfg.user_rules_file_path): | |
| with open(normpath(cfg.user_rules_file_path), "r") as user_rules_file: | |
| user_rules = json.loads(user_rules_file.read()) | |
| # if checked button with numeral postfix - delete numerals | |
| option = sub(r"[0123456789-]", "", option) | |
| if option.endswith(const.BTN_OPTION): | |
| option = const.BTN_OPTION | |
| # Read admins list | |
| if exists(cfg.admins_file_path): | |
| with open(normpath(cfg.admins_file_path), "r") as admins_file: | |
| admins_list = admins_file.read().split() | |
| else: | |
| admins_list = [] | |
| # check admin rules | |
| if str(chat_id) in admins_list or cfg.bot_mode == const.MODE_ADMIN: | |
| return bool(user_rules[option][const.MODE_ADMIN]) | |
| else: | |
| return bool(user_rules[option][cfg.bot_mode]) | |
| def init_check_user(users: Dict[int, User], chat_id): | |
| if chat_id not in users: | |
| # Load default | |
| users.update({chat_id: User()}) | |
| users[chat_id].user_id = chat_id | |
| users[chat_id].load_character_file( | |
| characters_dir_path=cfg.characters_dir_path, | |
| char_file=cfg.character_file, | |
| ) | |
| users[chat_id].load_user_history(f"{cfg.history_dir_path}/{str(chat_id)}.json") | |
| users[chat_id].find_and_load_user_char_history(chat_id, cfg.history_dir_path) | |
| def get_conversation_info(user: User): | |
| history_tokens = -1 | |
| context_tokens = -1 | |
| greeting_tokens = -1 | |
| conversation_tokens = -1 | |
| try: | |
| history_tokens = tp.generator.get_tokens_count(user.history_as_str()) | |
| context_tokens = tp.generator.get_tokens_count(user.context) | |
| greeting_tokens = tp.generator.get_tokens_count(user.greeting) | |
| conversation_tokens = history_tokens + context_tokens + greeting_tokens | |
| except Exception as e: | |
| logging.error("options_button tokens_count" + str(e)) | |
| max_token_param = "truncation_length" | |
| max_tokens = cfg.generation_params[max_token_param] if max_token_param in cfg.generation_params else "???" | |
| return ( | |
| f"{user.name2}\n" | |
| f"Conversation length {str(conversation_tokens)}/{max_tokens} tokens.\n" | |
| f"(context {(str(context_tokens))}, " | |
| f"greeting {(str(greeting_tokens))}, " | |
| f"messages {(str(history_tokens))})\n" | |
| f"Voice: {user.silero_speaker}\n" | |
| f"Language: {user.language}" | |
| ) | |