liumaolin · f5226c0
Standardize service lifecycle management by replacing `stop` with `exit` and introducing `is_exited` check
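The `is_exited` flag polled in `run()` below comes from `voice_dialogue.core.base.BaseThread`, which is outside this diff. As orientation only, here is a minimal sketch of the lifecycle contract the commit message implies (an `exit()` method replacing `stop()`, plus an `is_exited` flag for cooperative shutdown); the event-based details are assumptions, not the actual base class:

import threading

class BaseThread(threading.Thread):
    """Assumed base class: cooperative shutdown via exit()/is_exited."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._exit_event = threading.Event()
        self.is_ready = False

    @property
    def is_exited(self) -> bool:
        # Run loops poll this flag instead of being stopped forcibly.
        return self._exit_event.is_set()

    def exit(self) -> None:
        # Replaces the old stop(): ask the thread to leave its run loop.
        self._exit_event.set()
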
import copy
import time
from queue import Queue, Empty

from langchain.memory import ConversationBufferWindowMemory
from langchain_core.chat_history import InMemoryChatMessageHistory

from voice_dialogue.config import paths
from voice_dialogue.core.base import BaseThread
from voice_dialogue.core.constants import chat_history_cache
from voice_dialogue.models.voice_task import VoiceTask, QuestionDisplayMessage
from voice_dialogue.services.text.processor import preprocess_sentence_text, \
    create_langchain_chat_llamacpp_instance, create_langchain_pipeline, warmup_langchain_pipeline
CHINESE_SYSTEM_PROMPT = ("你是善于模拟真实的思考过程的AI助手。"
                         "回答时,必须首先生成一个不超过5个字的简短句子,"
                         "比如:\"让我想一下\"、\"在我看来\"、\"稍等我理一理\"、\"不错的问题\"、\"稍等片刻\"等,然后再进行正式回答,"
                         "保持中文口语化表达,禁用emoji和系统相关描述,确保衔接词与内容存在合理逻辑关联。")

ENGLISH_SYSTEM_PROMPT = ("You are an AI assistant skilled at simulating authentic thinking processes. "
                         "When responding, you must first generate a brief phrase of no more than 5 words, "
                         "such as: 'Let me think', 'I see', 'Let me process this', 'Good question', 'One moment', etc., then proceed with your formal response. "
                         "Maintain natural conversational English expression, avoid emojis and system-related descriptions, and ensure logical coherence between transitional phrases and content.")
class LLMResponseGenerator(BaseThread):
    """LLM response generator - uses the language model to produce answer text."""

    def __init__(
            self, group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None,
            user_question_queue: Queue,
            generated_answer_queue: Queue,
            websocket_message_queue: Queue = None
    ):
        super().__init__(group, target, name, args, kwargs, daemon=daemon)
        self.user_question_queue = user_question_queue
        self.generated_answer_queue = generated_answer_queue
        self.websocket_message_queue = websocket_message_queue

        self.english_sentence_end_marks = {'!', '?', '.', ',', ':', ';'}
        self.chinese_sentence_end_marks = {',', '。', '!', '?', ':', ';', '、'}
        self.sentence_end_marks = self.english_sentence_end_marks | self.chinese_sentence_end_marks

    def _get_prompt_by_language(self, language: str) -> str:
        """Return the system prompt for the given language."""
        if language == "zh":
            return CHINESE_SYSTEM_PROMPT
        else:
            return ENGLISH_SYSTEM_PROMPT

    def get_session_history(self, session_id: str) -> InMemoryChatMessageHistory:
        message_history = InMemoryChatMessageHistory()
        if session_id not in chat_history_cache:
            return message_history

        for k, message in chat_history_cache.get(session_id).items():
            identity = k.rsplit(':')[-1]
            if identity == 'human':
                message_history.add_user_message(message)
            elif identity == 'ai':
                message_history.add_ai_message(' '.join(message))

        memory = ConversationBufferWindowMemory(
            chat_memory=message_history,
            k=3,
            return_messages=True,
        )
        assert len(memory.memory_variables) == 1
        key = memory.memory_variables[0]
        messages = memory.load_memory_variables({})[key]
        return InMemoryChatMessageHistory(messages=messages)

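    # Sentence-boundary heuristics: the opening sentence is deliberately cut as short as
    # possible (the filler phrase requested by the system prompt), while later sentences
    # must accumulate more than four words/characters before being flushed downstream.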
    def _should_end_sentence(self, sentence: str, sentence_end_mark: str, is_first_sentence: bool) -> bool:
        """Decide whether the current sentence should end here."""
        if not sentence or sentence_end_mark not in self.sentence_end_marks:
            return False

        is_chinese_sentence = False
        if sentence_end_mark in self.chinese_sentence_end_marks:
            is_chinese_sentence = True

        # Special handling for the first sentence
        if is_first_sentence:
            if is_chinese_sentence:
                return (len(sentence) > 2 and sentence_end_mark in self.chinese_sentence_end_marks)
            else:
                return (len(sentence.split()) > 1 and sentence_end_mark in self.english_sentence_end_marks)

        # Regular sentence logic
        if is_chinese_sentence:
            sentence_words = len(sentence)
        else:
            sentence_words = len(sentence.split())
        return sentence_words > 4

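    # Each flushed sentence carries its own llm_start_time/llm_end_time window, and the task
    # is deep-copied so downstream consumers see a stable snapshot while streaming continues.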
    def _send_sentence_to_queue(self, voice_task: VoiceTask, sentence: str,
                                answer_index: int) -> None:
        """Send a completed sentence to the answer queue."""
        voice_task.answer_index = answer_index
        voice_task.answer_sentence = sentence.strip()
        voice_task.llm_end_time = time.time()
        self.generated_answer_queue.put(copy.deepcopy(voice_task))
        voice_task.llm_start_time = time.time()

    def _reset_chunks(self, remain_content: str) -> list:
        """Reset the chunks list, keeping any leftover content."""
        return [remain_content] if remain_content else []

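    # The first character of each streamed chunk is treated as a potential sentence-end
    # mark; the remainder either starts the next sentence (if the current one is flushed)
    # or is appended back onto the current buffer.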
    def _process_chunk_content(self, chunk_content: str) -> tuple:
        """Process chunk content, separating the sentence-end mark from the remaining content."""
        if len(chunk_content) > 1:
            return chunk_content[0], chunk_content[1:]
        else:
            return chunk_content, ''

    def _process_voice_task(self, voice_task: VoiceTask) -> None:
        """Process a single voice task."""
        chunks = []
        answer_index = 0
        is_first_sentence = True

        user_question = voice_task.transcribed_text
        print(f'User question: {user_question}')

        if self.websocket_message_queue:
            self.websocket_message_queue.put_nowait(
                QuestionDisplayMessage(
                    session_id=voice_task.session_id,
                    question=user_question,
                    task_id=voice_task.id,
                )
            )

        voice_task.llm_start_time = time.time()
        system_prompt = self._get_prompt_by_language(voice_task.language)
        pipeline = create_langchain_pipeline(self.model_instance, system_prompt, self.get_session_history)
        config = {"configurable": {"session_id": voice_task.session_id}}

        try:
            for chunk in pipeline.stream(input={'input': user_question}, config=config):
                if not chunk.content.strip():
                    continue

                chunk_content = f'{chunk.content}'
                sentence_end_mark, remain_content = self._process_chunk_content(chunk_content)
                chunks.append(sentence_end_mark)

                sentence = preprocess_sentence_text(chunks)
                if not sentence:
                    continue

                # Check whether the current sentence should be flushed
                if self._should_end_sentence(sentence, sentence_end_mark, is_first_sentence):
                    self._send_sentence_to_queue(voice_task, sentence, answer_index)
                    chunks = self._reset_chunks(remain_content)
                    answer_index += 1
                    is_first_sentence = False
                else:
                    if remain_content:
                        chunks.append(remain_content)

            # Handle any chunks remaining after streaming ends
            self._handle_remaining_chunks(voice_task, chunks, answer_index)
        except Exception as e:
            print(f'Error while processing voice task: {e}')

    def _handle_remaining_chunks(self, voice_task: VoiceTask, chunks: list, answer_index: int) -> None:
        """Handle the remaining chunks."""
        if not chunks:
            return

        sentence = preprocess_sentence_text(chunks)
        if not sentence or sentence.strip() in self.sentence_end_marks:
            return

        self._send_sentence_to_queue(voice_task, sentence, answer_index)

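    # Startup sequence: load the local GGUF model, warm up a default pipeline, mark the
    # worker as ready, then keep draining the question queue until is_exited is set.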
    def run(self):
        model_path = paths.LLM_MODELS_PATH / 'qwen' / 'Qwen2.5-14B-Instruct.Q4_0.gguf'
        model_params = {
            'streaming': True,
            'n_gpu_layers': -1,
            'n_batch': 512,
            'n_ctx': 2048,
            'f16_kv': True,
            'temperature': 0.8,
            # 'n_predict': -1,
            'top_k': 50,
            'top_p': 1.0,
        }
        self.model_instance = create_langchain_chat_llamacpp_instance(
            local_model_path=model_path, model_params=model_params
        )
        pipeline = create_langchain_pipeline(self.model_instance, CHINESE_SYSTEM_PROMPT, self.get_session_history)
        warmup_langchain_pipeline(pipeline)
        self.is_ready = True

        # Main run loop
        while not self.is_exited:
            try:
                voice_task: VoiceTask = self.user_question_queue.get(timeout=0.1)
                self._process_voice_task(voice_task)
            except Empty:
                continue
            except Exception as e:
                print(f'LLMResponseGenerator runtime error: {e}')
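
A minimal wiring sketch (not part of this file) showing how the worker could be started and shut down through the exit/is_exited lifecycle; the `exit()` call is assumed from the commit message and `BaseThread`, while the queue wiring mirrors the constructor above:

from queue import Queue

user_question_queue = Queue()
generated_answer_queue = Queue()

generator = LLMResponseGenerator(
    user_question_queue=user_question_queue,
    generated_answer_queue=generated_answer_queue,
)
generator.start()

# ... upstream ASR puts VoiceTask objects on user_question_queue,
# downstream TTS consumes sentences from generated_answer_queue ...

generator.exit()  # assumed replacement for the old stop(); sets is_exited
generator.join()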