VTuberAI / src /utils /prompter /prompter.py
Saidie000's picture
Upload 90 files
1905805 verified
import os
import datetime
from typing import AsyncGenerator, Dict, List, Any
from utils.helpers.time import get_current_time
from utils.helpers.singleton import Singleton
from utils.helpers.path import portable_path
from utils.config import Config
from .context import ContextMetadata
from .message import Message, ChatMessage, RequestMessage, MCPMessage, CustomMessage
class Prompter(metaclass=Singleton):
def __init__(self):
self.context_metadata: Dict[str, ContextMetadata] = dict()
self.history: List[Message] = list()
self.instruction_prompt_filename: str = 'example.txt'
self.character_prompt_filename: str = 'example.txt'
self.scene_prompt_filename: str = 'example.txt'
self.character_name: str = "J.A.I.son"
self.name_translations: Dict[str, str] = {"old name": "new:name"}
self.history_length: int = 50
self.tooling_prompt = ""
self.response_template = ""
async def configure(self, config_d: Dict[str, Any]):
if "instruction_prompt_filename" in config_d: self.instruction_prompt_filename = str(config_d["instruction_prompt_filename"])
if "character_prompt_filename" in config_d: self.character_prompt_filename = str(config_d["character_prompt_filename"])
if "scene_prompt_filename" in config_d: self.scene_prompt_filename = str(config_d["scene_prompt_filename"])
if "character_name" in config_d: self.character_name = str(config_d["character_name"])
if "name_translations" in config_d: self.name_translations = dict(config_d["name_translations"])
if "history_length" in config_d: self.history_length = int(config_d["history_length"])
assert (
self.instruction_prompt_filename is not None and
len(self.instruction_prompt_filename) > 0 and
os.path.isfile(portable_path(os.path.join(
Config().PROMPT_DIR,
Config().PROMPT_INSTRUCTION_SUBDIR,
self.instruction_prompt_filename
)))
)
assert (
self.character_prompt_filename is not None and
len(self.character_prompt_filename) > 0 and
os.path.isfile(portable_path(os.path.join(
Config().PROMPT_DIR,
Config().PROMPT_CHARACTER_SUBDIR,
self.character_prompt_filename
)))
)
assert (
self.scene_prompt_filename is not None and
len(self.scene_prompt_filename) > 0 and
os.path.isfile(portable_path(os.path.join(
Config().PROMPT_DIR,
Config().PROMPT_SCENE_SUBDIR,
self.scene_prompt_filename
)))
)
assert self.character_name is not None and len(self.character_name)
assert self.history_length > 0
def clear_history(self):
self.history = list()
def insert_history(self, message: Message):
self.history.append(message)
self.history = self.history[-(self.history_length):]
with open(Config().history_filepath, 'a', encoding="utf-8") as f:
f.write(message.to_line())
f.write("\n")
# Custom context
def register_custom_context(self, context_id: str, context_name: str, context_description: str = None):
self.context_metadata[context_id] = ContextMetadata(context_id, context_name, context_description)
def remove_custom_context(self, context_id: str):
assert context_id in self.context_metadata
del self.context_metadata[context_id]
def add_custom_context(self, context_id: str, contents: str, time: datetime.datetime = None):
assert context_id in self.context_metadata
assert contents and len(contents) > 0
if time is None: time = get_current_time(include_ms=False, as_str=False)
self.insert_history(CustomMessage(self.context_metadata[context_id], contents, time))
# Main conversation
def translate_name(self, name: str):
return self.name_translations.get(name, name)
def add_chat(self, name: str, message: str, time: datetime.datetime = None):
assert name and len(name) > 0
assert message and len(message) > 0
if time is None: time = get_current_time(include_ms=False, as_str=False)
self.insert_history(ChatMessage(self.translate_name(name), message, time))
async def add_chat_stream(self, name: str, in_stream: AsyncGenerator, time: datetime.datetime = None):
if time is None: time = get_current_time(include_ms=False, as_str=False)
message = ''
async for in_d in in_stream:
message += in_d['content']
self.insert_history(ChatMessage(self.translate_name(name), message, time))
# Requests
def add_request(self, message: str, time: datetime.datetime = None):
assert message and len(message) > 0
if time is None: time = get_current_time(include_ms=False, as_str=False)
self.insert_history(RequestMessage(message, time))
# Prompt generators
def get_instructions_prompt(self):
with open(portable_path(os.path.join(
Config().PROMPT_DIR,
Config().PROMPT_INSTRUCTION_SUBDIR,
self.instruction_prompt_filename
)), 'r') as f:
return f.read()
def get_context_descriptions(self):
result = ""
for context_id in self.context_metadata:
result += "{name}: {description}\n".format(
name=self.context_metadata[context_id].name,
description=self.context_metadata[context_id].description
)
return result
def get_character_prompt(self):
with open(portable_path(os.path.join(
Config().PROMPT_DIR,
Config().PROMPT_CHARACTER_SUBDIR,
self.character_prompt_filename
)), 'r') as f:
return f.read()
def get_scene_prompt(self):
with open(portable_path(os.path.join(
Config().PROMPT_DIR,
Config().PROMPT_SCENE_SUBDIR,
self.scene_prompt_filename
)), 'r') as f:
return f.read()
def get_sys_prompt(self):
return "{instructions}\n{mcp_usage}\n{contexts}\n### Character ###\n{character}\n### Scene ###\n{scene}".format(
instructions = self.get_instructions_prompt(),
contexts = self.get_context_descriptions(),
mcp_usage = self.response_template,
character = self.get_character_prompt(),
scene = self.get_scene_prompt(),
)
def get_history_text(self):
prompt = ""
for message in self.history:
message_line = message.to_line()
prompt += "\n{}".format(message_line)
return prompt
def get_history(self):
return self.history
def add_mcp_usage_prompt(self, tooling_prompt: str, response_template: str):
self.tooling_prompt = tooling_prompt
self.response_template = response_template
def generate_mcp_system_context(self):
return self.tooling_prompt
def generate_mcp_user_context(self):
user_prompt = self.get_history_text()
character = self.get_character_prompt()
scene = self.get_scene_prompt()
return f"<CHARACTER>{character}<SCENE>{scene}<SCRIPT>{user_prompt}\n"
def add_mcp_results(self, results):
for result in results:
tool_name = result[0]
tool_result = result[1]
time = get_current_time(include_ms=False, as_str=False)
self.insert_history(MCPMessage(tool_name, tool_result, time))