liumaolin
Standardize service lifecycle management by replacing `stop` with `exit` and introducing `is_exited` check
f5226c0
raw
history blame
9.15 kB
import copy
import time
from queue import Queue, Empty
from langchain.memory import ConversationBufferWindowMemory
from langchain_core.chat_history import InMemoryChatMessageHistory
from voice_dialogue.config import paths
from voice_dialogue.core.base import BaseThread
from voice_dialogue.core.constants import chat_history_cache
from voice_dialogue.models.voice_task import VoiceTask, QuestionDisplayMessage
from voice_dialogue.services.text.processor import preprocess_sentence_text, \
create_langchain_chat_llamacpp_instance, create_langchain_pipeline, warmup_langchain_pipeline
CHINESE_SYSTEM_PROMPT = ("你是善于模拟真实的思考过程的AI助手。"
"回答时,必须首先生成一个不超过5个字的简短句子,"
"比如:\"让我想一下\"、\"在我看来\"、\"稍等我理一理\"、\"不错的问题\"、\"稍等片刻\"等,然后再进行正式回答,"
"保持中文口语化表达,禁用emoji和系统相关描述,确保衔接词与内容存在合理逻辑关联。")
ENGLISH_SYSTEM_PROMPT = ("You are an AI assistant skilled at simulating authentic thinking processes. "
"When responding, you must first generate a brief phrase of no more than 5 words, "
"such as: 'Let me think', 'I see', 'Let me process this', 'Good question', 'One moment', etc., then proceed with your formal response. "
"Maintain natural conversational English expression, avoid emojis and system-related descriptions, and ensure logical coherence between transitional phrases and content.")
class LLMResponseGenerator(BaseThread):
"""LLM 回答生成器 - 负责使用语言模型生成回答文本"""
def __init__(
self, group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None,
user_question_queue: Queue,
generated_answer_queue: Queue,
websocket_message_queue: Queue = None
):
super().__init__(group, target, name, args, kwargs, daemon=daemon)
self.user_question_queue = user_question_queue
self.generated_answer_queue = generated_answer_queue
self.websocket_message_queue = websocket_message_queue
self.english_sentence_end_marks = {'!', '?', '.', ',', ':', ';'}
self.chinese_sentence_end_marks = {',', '。', '!', '?', ':', ';', '、'}
self.sentence_end_marks = self.english_sentence_end_marks | self.chinese_sentence_end_marks
def _get_prompt_by_language(self, language: str) -> str:
"""根据语言获取对应的 prompt"""
if language == "zh":
return CHINESE_SYSTEM_PROMPT
else:
return ENGLISH_SYSTEM_PROMPT
def get_session_history(self, session_id: str) -> InMemoryChatMessageHistory:
message_history = InMemoryChatMessageHistory()
if session_id not in chat_history_cache:
return message_history
for k, message in chat_history_cache.get(session_id).items():
identity = k.rsplit(':')[-1]
if identity == 'human':
message_history.add_user_message(message)
elif identity == 'ai':
message_history.add_ai_message(' '.join(message))
memory = ConversationBufferWindowMemory(
chat_memory=message_history,
k=3,
return_messages=True,
)
assert len(memory.memory_variables) == 1
key = memory.memory_variables[0]
messages = memory.load_memory_variables({})[key]
return InMemoryChatMessageHistory(messages=messages)
def _should_end_sentence(self, sentence: str, sentence_end_mark: str, is_first_sentence: bool) -> bool:
"""判断是否应该结束当前句子"""
if not sentence or sentence_end_mark not in self.sentence_end_marks:
return False
is_chinese_sentence = False
if sentence_end_mark in self.chinese_sentence_end_marks:
is_chinese_sentence = True
# 第一个句子的特殊处理逻辑
if is_first_sentence:
if is_chinese_sentence:
return (len(sentence) > 2 and sentence_end_mark in self.chinese_sentence_end_marks)
else:
return (len(sentence.split()) > 1 and sentence_end_mark in self.english_sentence_end_marks)
# 普通句子的判断逻辑
if is_chinese_sentence:
sentence_words = len(sentence)
else:
sentence_words = len(sentence.split())
return sentence_words > 4
def _send_sentence_to_queue(self, voice_task: VoiceTask, sentence: str,
answer_index: int) -> None:
"""将句子发送到队列"""
voice_task.answer_index = answer_index
voice_task.answer_sentence = sentence.strip()
voice_task.llm_end_time = time.time()
self.generated_answer_queue.put(copy.deepcopy(voice_task))
voice_task.llm_start_time = time.time()
def _reset_chunks(self, remain_content: str) -> list:
"""重置 chunks 列表"""
return [remain_content] if remain_content else []
def _process_chunk_content(self, chunk_content: str) -> tuple:
"""处理 chunk 内容,分离句子结束标记和剩余内容"""
if len(chunk_content) > 1:
return chunk_content[0], chunk_content[1:]
else:
return chunk_content, ''
def _process_voice_task(self, voice_task: VoiceTask) -> None:
"""处理单个语音任务"""
chunks = []
answer_index = 0
is_first_sentence = True
user_question = voice_task.transcribed_text
print(f'用户问题: {user_question}')
if self.websocket_message_queue:
self.websocket_message_queue.put_nowait(
QuestionDisplayMessage(
session_id=voice_task.session_id,
question=user_question,
task_id=voice_task.id,
)
)
voice_task.llm_start_time = time.time()
system_prompt = self._get_prompt_by_language(voice_task.language)
pipeline = create_langchain_pipeline(self.model_instance, system_prompt, self.get_session_history)
config = {"configurable": {"session_id": voice_task.session_id}}
try:
for chunk in pipeline.stream(input={'input': user_question}, config=config):
if not chunk.content.strip():
continue
chunk_content = f'{chunk.content}'
sentence_end_mark, remain_content = self._process_chunk_content(chunk_content)
chunks.append(sentence_end_mark)
sentence = preprocess_sentence_text(chunks)
if not sentence:
continue
# 检查是否应该结束当前句子
if self._should_end_sentence(sentence, sentence_end_mark, is_first_sentence):
self._send_sentence_to_queue(voice_task, sentence, answer_index)
chunks = self._reset_chunks(remain_content)
answer_index += 1
is_first_sentence = False
else:
if remain_content:
chunks.append(remain_content)
# 处理最后剩余的 chunks
self._handle_remaining_chunks(voice_task, chunks, answer_index)
except Exception as e:
print(f'处理语音任务时发生错误: {e}')
def _handle_remaining_chunks(self, voice_task: VoiceTask, chunks: list, answer_index: int) -> None:
"""处理剩余的 chunks"""
if not chunks:
return
sentence = preprocess_sentence_text(chunks)
if not sentence or sentence.strip() in self.sentence_end_marks:
return
self._send_sentence_to_queue(voice_task, sentence, answer_index)
def run(self):
model_path = paths.LLM_MODELS_PATH / 'qwen' / 'Qwen2.5-14B-Instruct.Q4_0.gguf'
model_params = {
'streaming': True,
'n_gpu_layers': -1,
'n_batch': 512,
'n_ctx': 2048,
'f16_kv': True,
'temperature': 0.8,
# 'n_predict': -1,
'top_k': 50,
'top_p': 1.0,
}
self.model_instance = create_langchain_chat_llamacpp_instance(
local_model_path=model_path, model_params=model_params
)
pipeline = create_langchain_pipeline(self.model_instance, CHINESE_SYSTEM_PROMPT, self.get_session_history)
warmup_langchain_pipeline(pipeline)
self.is_ready = True
"""主运行循环"""
while not self.is_exited:
try:
voice_task: VoiceTask = self.user_question_queue.get(block=False, timeout=0.1)
self._process_voice_task(voice_task)
except Empty:
continue
except Exception as e:
print(f'AnswerGeneratorWorker 运行时发生错误: {e}')