| | import copy |
| | import time |
| | import unicodedata |
| | from queue import Queue, Empty |
| |
|
| | from langchain.memory import ConversationBufferWindowMemory |
| | from langchain_core.chat_history import InMemoryChatMessageHistory |
| |
|
| | from voice_dialogue.config.llm_config import get_llm_model_params, get_apple_silicon_summary, BUILTIN_LLM_MODEL_PATH |
| | from voice_dialogue.config.user_config import get_prompt |
| | from voice_dialogue.core.base import BaseThread |
| | from voice_dialogue.core.constants import chat_history_cache |
| | from voice_dialogue.llm.processor import ( |
| | preprocess_sentence_text, create_langchain_chat_llamacpp_instance, |
| | create_langchain_pipeline, warmup_langchain_pipeline |
| | ) |
| | from voice_dialogue.models.voice_task import VoiceTask, QuestionDisplayMessage |
| | from voice_dialogue.services.mixins import TaskStatusMixin |
| | from voice_dialogue.utils.logger import logger |
| |
|
| |
|
class LLMService(BaseThread, TaskStatusMixin):
    """LLM answer generator.

    Consumes ``VoiceTask`` objects from ``user_question_queue``, streams the
    language-model answer for each one, cuts the stream into sentence-sized
    pieces at punctuation marks and pushes one ``VoiceTask`` copy per piece
    onto ``generated_answer_queue``.
    """

    # Streamed chunks that carry no speakable text and are skipped outright.
    _IGNORED_CHUNKS = frozenset({'<think>', '\n\n', '</think>'})

    # Value passed as ``k`` to ConversationBufferWindowMemory when trimming
    # the cached history handed to the model.
    _HISTORY_WINDOW = 3

    def __init__(
            self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None,
            user_question_queue: Queue,
            generated_answer_queue: Queue,
            websocket_message_queue: Queue = None
    ):
        """Create the service thread.

        Args:
            group, target, name, args, kwargs, daemon: forwarded unchanged to
                the base thread constructor. ``kwargs`` defaults to a fresh
                empty dict (a shared mutable default is avoided).
            user_question_queue: queue of incoming ``VoiceTask`` questions.
            generated_answer_queue: queue receiving one ``VoiceTask`` copy per
                generated sentence.
            websocket_message_queue: optional queue receiving a
                ``QuestionDisplayMessage`` for every processed question.
        """
        super().__init__(group, target, name, args, {} if kwargs is None else kwargs, daemon=daemon)

        self.user_question_queue = user_question_queue
        self.generated_answer_queue = generated_answer_queue
        self.websocket_message_queue = websocket_message_queue

        # Populated by run() once the model has been loaded.
        self.model_instance = None

        self.english_sentence_end_marks = {'!', '?', '.', ',', ':', ';'}
        # Fullwidth / ideographic punctuation, written as escapes so the set
        # stays disjoint from the ASCII marks above regardless of how the file
        # is re-encoded or normalised.
        self.chinese_sentence_end_marks = {
            '\uff0c',  # fullwidth comma
            '\u3002',  # ideographic full stop
            '\uff01',  # fullwidth exclamation mark
            '\uff1f',  # fullwidth question mark
            '\uff1a',  # fullwidth colon
            '\uff1b',  # fullwidth semicolon
            '\u3001',  # ideographic (enumeration) comma
        }
        self.sentence_end_marks = self.english_sentence_end_marks | self.chinese_sentence_end_marks

    def _get_prompt_by_language(self, language: str) -> str:
        """Return the system prompt configured for ``language``."""
        return get_prompt(language)

    def get_session_history(self, session_id: str) -> InMemoryChatMessageHistory:
        """Build the chat history handed to the pipeline for ``session_id``.

        Cached entries are replayed into a message history (keys ending in
        ``:human`` become user messages, keys ending in ``:ai`` become AI
        messages whose parts are joined with spaces) and then trimmed through
        a ``ConversationBufferWindowMemory`` with ``k=_HISTORY_WINDOW``.

        Returns:
            An empty history when nothing is cached for the session, otherwise
            a new history holding only the windowed messages.
        """
        message_history = InMemoryChatMessageHistory()

        # Single lookup instead of "in" followed by "get": the entry cannot
        # vanish between the two calls.
        # NOTE(review): assumes chat_history_cache.get returns None on a miss.
        cached_turns = chat_history_cache.get(session_id)
        if cached_turns is None:
            return message_history

        for cache_key, message in cached_turns.items():
            role = cache_key.rpartition(':')[2]
            if role == 'human':
                message_history.add_user_message(message)
            elif role == 'ai':
                message_history.add_ai_message(' '.join(message))

        memory = ConversationBufferWindowMemory(
            chat_memory=message_history,
            k=self._HISTORY_WINDOW,
            return_messages=True,
        )
        # Exactly one memory variable is expected; unpacking enforces that
        # even when assertions are disabled.
        (memory_key,) = memory.memory_variables
        messages = memory.load_memory_variables({})[memory_key]
        return InMemoryChatMessageHistory(messages=messages)

    def _should_end_sentence(self, sentence: str, sentence_end_mark: str, is_first_sentence: bool) -> bool:
        """Decide whether ``sentence`` is long enough to be emitted now.

        Args:
            sentence: text accumulated so far.
            sentence_end_mark: punctuation mark found in the latest chunk
                ('' when there was none).
            is_first_sentence: True until the first sentence of the answer
                has been emitted; the first sentence uses lower thresholds.

        Returns:
            True when the mark is a known sentence-end mark and the sentence
            passes the length threshold for its language.
        """
        if not sentence or sentence_end_mark not in self.sentence_end_marks:
            return False

        if sentence_end_mark in self.chinese_sentence_end_marks:
            # Chinese length is measured in characters, not in
            # whitespace-separated words.
            min_chars = 2 if is_first_sentence else 4
            return len(sentence) > min_chars

        word_count = len(sentence.split())
        if is_first_sentence:
            return word_count > 1
        # A terminal mark ends a shorter sentence than a comma/colon/semicolon.
        return word_count > 4 or (word_count > 2 and sentence_end_mark in {'.', '?', '!'})

    def _send_sentence_to_queue(self, voice_task: VoiceTask, sentence: str, answer_index: int) -> None:
        """Stamp ``voice_task`` with the sentence and enqueue a deep copy of it."""
        voice_task.answer_index = answer_index
        voice_task.answer_sentence = sentence.strip()
        voice_task.llm_end_time = time.time()
        # A copy is queued because the same task object keeps being mutated
        # for the following sentences.
        self.generated_answer_queue.put(copy.deepcopy(voice_task))
        # Restart the timer after the hand-off (presumably so the next
        # sentence is timed on its own - confirm with the consumers).
        voice_task.llm_start_time = time.time()

    def _reset_chunks(self, remain_content: str) -> list:
        """Return the chunk list to start the next sentence with."""
        return [remain_content] if remain_content else []

    def _is_punctuation(self, char: str) -> bool:
        """Return True when ``char`` is one character in a Unicode ``P*`` category."""
        if not char or len(char) != 1:
            return False
        return unicodedata.category(char).startswith('P')

    def _process_chunk_content(self, chunk_content: str) -> tuple:
        """Split ``chunk_content`` around its right-most punctuation character.

        Returns:
            ``(before, punctuation, after)``. When the chunk holds no
            punctuation the result is ``(chunk_content, '', '')``; an empty
            chunk gives ``('', '', '')``.
        """
        if not chunk_content:
            return '', '', ''

        for index in reversed(range(len(chunk_content))):
            if self._is_punctuation(chunk_content[index]):
                return chunk_content[:index], chunk_content[index], chunk_content[index + 1:]

        return chunk_content, '', ''

    def _process_voice_task(self, voice_task: VoiceTask) -> None:
        """Stream the answer for one task and enqueue it sentence by sentence.

        Processing stops silently as soon as ``is_task_valid`` reports the
        task as no longer valid. Errors raised while streaming are logged and
        swallowed so the worker loop keeps running.
        """
        chunks = []
        answer_index = 0
        is_first_sentence = True

        user_question = voice_task.transcribed_text
        logger.info(f'用户问题: {user_question}')
        if self.websocket_message_queue is not None:
            self.websocket_message_queue.put_nowait(
                QuestionDisplayMessage(
                    session_id=voice_task.session_id,
                    question=user_question,
                    task_id=voice_task.id,
                )
            )

        voice_task.llm_start_time = time.time()

        system_prompt = self._get_prompt_by_language(voice_task.language)
        pipeline = create_langchain_pipeline(self.model_instance, system_prompt, self.get_session_history)

        config = {"configurable": {"session_id": voice_task.session_id}}

        try:
            for chunk in pipeline.stream(input={'input': user_question}, config=config):
                if not self.is_task_valid(voice_task):
                    return

                if not chunk.content or chunk.content in self._IGNORED_CHUNKS:
                    continue

                before_punct, sentence_end_mark, remain_content = self._process_chunk_content(
                    str(chunk.content)
                )
                if before_punct:
                    chunks.append(before_punct)
                if sentence_end_mark:
                    chunks.append(sentence_end_mark)

                sentence = preprocess_sentence_text(chunks)

                # _should_end_sentence already rejects an empty sentence, so
                # both "nothing yet" and "not long enough" fall through to the
                # same branch that keeps accumulating.
                if self._should_end_sentence(sentence, sentence_end_mark, is_first_sentence):
                    self._send_sentence_to_queue(voice_task, sentence, answer_index)
                    chunks = self._reset_chunks(remain_content)
                    answer_index += 1
                    is_first_sentence = False
                elif remain_content:
                    chunks.append(remain_content)

            # Flush whatever is left once the stream is exhausted.
            self._handle_remaining_chunks(voice_task, chunks, answer_index)

        except Exception as e:
            logger.error(f'处理语音任务时发生错误: {e}')

    def _handle_remaining_chunks(self, voice_task: VoiceTask, chunks: list, answer_index: int) -> None:
        """Enqueue the trailing text unless it is empty or a lone end mark."""
        if not chunks:
            return

        sentence = preprocess_sentence_text(chunks)
        if not sentence or sentence.strip() in self.sentence_end_marks:
            return

        self._send_sentence_to_queue(voice_task, sentence, answer_index)

    def run(self):
        """Load and warm up the model, then serve questions until exit.

        The main loop polls ``user_question_queue`` with a one-second timeout
        so that ``is_exited`` is re-checked regularly.
        """
        model_params = get_llm_model_params()

        chip_summary = get_apple_silicon_summary()
        logger.info(f"检测到芯片: {chip_summary['chip_name']}")
        logger.info(f"性能核心数: {chip_summary['performance_cores']}")
        logger.info(f"使用线程数: {chip_summary['optimal_n_threads']} (仅使用性能核心)")
        logger.info(f"上下文窗口: {chip_summary['optimal_n_ctx']}")
        logger.info(f"配置说明: {chip_summary['config_note']}")

        self.model_instance = create_langchain_chat_llamacpp_instance(
            local_model_path=BUILTIN_LLM_MODEL_PATH, model_params=model_params
        )

        # Warm the pipeline up with the "zh" prompt before reporting ready.
        prompt = get_prompt("zh")
        pipeline = create_langchain_pipeline(self.model_instance, prompt, self.get_session_history)
        warmup_langchain_pipeline(pipeline)

        self.is_ready = True

        while not self.is_exited:
            try:
                voice_task: VoiceTask = self.user_question_queue.get(block=True, timeout=1)
                self._process_voice_task(voice_task)
            except Empty:
                continue
            except Exception as e:
                logger.error(f'AnswerGeneratorWorker 运行时发生错误: {e}')
| |
|