| import asyncio
|
| import re
|
| from typing import Optional, Union, Any, List, Dict
|
| import numpy as np
|
| import json
|
| from loguru import logger
|
|
|
| from ..message_handler import message_handler
|
| from .types import WebSocketSend, BroadcastContext
|
| from .tts_manager import TTSTaskManager
|
| from ..agent.output_types import SentenceOutput, AudioOutput
|
| from ..agent.input_types import BatchInput, TextData, ImageData, TextSource, ImageSource
|
| from ..asr.asr_interface import ASRInterface
|
| from ..live2d_model import Live2dModel
|
| from ..tts.tts_interface import TTSInterface
|
| from ..utils.stream_audio import prepare_audio_payload
|
|
|
|
|
|
|
def create_batch_input(
    input_text: str,
    images: Optional[List[Dict[str, Any]]],
    from_name: str,
    metadata: Optional[Dict[str, Any]] = None,
) -> BatchInput:
    """Bundle one utterance and its optional images into a ``BatchInput``.

    Args:
        input_text: Text of the utterance; stored as a single ``TextData``
            tagged with ``TextSource.INPUT``.
        images: Image descriptors. Each one is read through its ``"source"``,
            ``"data"`` and ``"mime_type"`` keys. ``None`` or an empty list
            yields ``images=None`` on the resulting batch.
        from_name: Sender name attached to the text entry.
        metadata: Handed to ``BatchInput`` unchanged.

    Returns:
        The assembled ``BatchInput``.
    """
    text_entry = TextData(
        source=TextSource.INPUT, content=input_text, from_name=from_name
    )

    # An absent or empty image list is represented as None, not as [].
    image_entries = None
    if images:
        image_entries = []
        for descriptor in images:
            image_entries.append(
                ImageData(
                    source=ImageSource(descriptor["source"]),
                    data=descriptor["data"],
                    mime_type=descriptor["mime_type"],
                )
            )

    return BatchInput(
        texts=[text_entry],
        images=image_entries,
        metadata=metadata,
    )
|
|
|
|
|
async def process_agent_output(
    output: Union[AudioOutput, SentenceOutput],
    character_config: Any,
    live2d_model: Live2dModel,
    tts_engine: TTSInterface,
    websocket_send: WebSocketSend,
    tts_manager: TTSTaskManager,
    translate_engine: Optional[Any] = None,
) -> str:
    """Stamp character info onto an agent output and dispatch it by type.

    Args:
        output: Agent output. Its ``display_text`` gets ``name`` and ``avatar``
            overwritten from ``character_config`` before dispatch.
        character_config: Object providing ``character_name`` and ``avatar``.
        live2d_model: Forwarded to ``handle_sentence_output``.
        tts_engine: Forwarded to ``handle_sentence_output``.
        websocket_send: Coroutine used to send JSON strings to the client.
        tts_manager: Forwarded to ``handle_sentence_output``.
        translate_engine: Optional translator forwarded to
            ``handle_sentence_output``.

    Returns:
        The text returned by the type-specific handler. An empty string is
        returned for an unknown output type or when the handler raised.
    """
    output.display_text.name = character_config.character_name
    output.display_text.avatar = character_config.avatar

    full_response = ""
    try:
        if isinstance(output, SentenceOutput):
            full_response = await handle_sentence_output(
                output,
                live2d_model,
                tts_engine,
                websocket_send,
                tts_manager,
                translate_engine,
            )
        elif isinstance(output, AudioOutput):
            full_response = await handle_audio_output(output, websocket_send)
        else:
            logger.warning(f"Unknown output type: {type(output)}")
    except Exception as e:
        # Boundary handler: the failure is reported to the client instead of
        # propagating. logger.exception keeps the traceback, which a plain
        # error log of str(e) would drop.
        logger.exception(f"Error processing agent output: {e}")
        await websocket_send(
            json.dumps(
                {"type": "error", "message": f"Error processing response: {e}"}
            )
        )

    return full_response
|
|
|
|
|
# Whitespace plus ASCII and full-width CJK punctuation. The CJK characters are
# written as escapes so the pattern survives a re-encoding of this file:
#   U+FF0C ，  U+3002 。  U+FF01 ！  U+FF1F ？
#   U+300F 』  U+300D 」  U+FF09 ）  U+3011 】
# NOTE(review): these code points were reconstructed from mis-decoded text;
# confirm them against the repository history.
_NON_SPEECH_CHARS = re.compile(
    r"""[\s.,!?\uFF0C\u3002\uFF01\uFF1F'"\u300F\u300D\uFF09\u3011]+"""
)


async def handle_sentence_output(
    output: SentenceOutput,
    live2d_model: Live2dModel,
    tts_engine: TTSInterface,
    websocket_send: WebSocketSend,
    tts_manager: TTSTaskManager,
    translate_engine: Optional[Any] = None,
) -> str:
    """Speak every sentence of a ``SentenceOutput``, translating if configured.

    Each item of ``output`` is unpacked as ``(display_text, tts_text,
    actions)``. When ``translate_engine`` is truthy and the TTS text contains
    anything besides whitespace and punctuation, the TTS text is replaced by
    ``translate_engine.translate(tts_text)``. The display text is never
    translated.

    Args:
        output: Async-iterable sentence stream.
        live2d_model: Passed through to ``tts_manager.speak``.
        tts_engine: Passed through to ``tts_manager.speak``.
        websocket_send: Passed through to ``tts_manager.speak``.
        tts_manager: Its ``speak`` coroutine is awaited once per sentence.
        translate_engine: Optional object with a synchronous
            ``translate(text)`` method.

    Returns:
        Concatenation of ``display_text.text`` over all sentences.
    """
    full_response = ""
    async for display_text, tts_text, actions in output:
        logger.debug(f"🏃 Processing output: '''{tts_text}'''...")

        if translate_engine:
            # Punctuation-only fragments are not worth a translator call.
            if _NON_SPEECH_CHARS.sub("", tts_text):
                tts_text = translate_engine.translate(tts_text)
            logger.info(f"🏃 Text after translation: '''{tts_text}'''...")
        else:
            logger.debug("🚫 No translation engine available. Skipping translation.")

        full_response += display_text.text
        await tts_manager.speak(
            tts_text=tts_text,
            display_text=display_text,
            actions=actions,
            live2d_model=live2d_model,
            tts_engine=tts_engine,
            websocket_send=websocket_send,
        )
    return full_response
|
|
|
|
|
async def handle_audio_output(
    output: AudioOutput,
    websocket_send: WebSocketSend,
) -> str:
    """Forward each ready-made audio segment of ``output`` to the client.

    Every item of ``output`` is unpacked as ``(audio_path, display_text,
    transcript, actions)``, turned into a payload by
    ``prepare_audio_payload`` and sent as a JSON string.

    Args:
        output: Async-iterable stream of audio segments.
        websocket_send: Coroutine used to send JSON strings to the client.

    Returns:
        Concatenation of the transcripts of all segments.
    """
    collected_transcript = ""
    async for audio_path, display_text, transcript, actions in output:
        collected_transcript = collected_transcript + transcript

        # Falsy actions are sent as None rather than being serialized.
        if actions:
            serialized_actions = actions.to_dict()
        else:
            serialized_actions = None

        payload = prepare_audio_payload(
            audio_path=audio_path,
            display_text=display_text,
            actions=serialized_actions,
        )
        await websocket_send(json.dumps(payload))
    return collected_transcript
|
|
|
|
|
async def send_conversation_start_signals(websocket_send: WebSocketSend) -> None:
    """Send the two messages that open a conversation turn.

    A ``conversation-chain-start`` control message goes out first, followed
    by a ``full-text`` message carrying the placeholder "Thinking...".

    Args:
        websocket_send: Coroutine used to send JSON strings to the client.
    """
    opening_messages = (
        {"type": "control", "text": "conversation-chain-start"},
        {"type": "full-text", "text": "Thinking..."},
    )
    for message in opening_messages:
        await websocket_send(json.dumps(message))
|
|
|
|
|
async def process_user_input(
    user_input: Union[str, np.ndarray],
    asr_engine: ASRInterface,
    websocket_send: WebSocketSend,
) -> str:
    """Return the user's input as text, transcribing it first if it is audio.

    Args:
        user_input: Either text, which is returned untouched, or a NumPy
            array, which is passed to ``asr_engine.async_transcribe_np``.
        asr_engine: Speech recognizer used for array input.
        websocket_send: Coroutine used to send JSON strings to the client.
            It receives a ``user-input-transcription`` message only when a
            transcription took place.

    Returns:
        The original text, or the transcription of the audio.
    """
    if not isinstance(user_input, np.ndarray):
        return user_input

    logger.info("Transcribing audio input...")
    transcript = await asr_engine.async_transcribe_np(user_input)
    notification = json.dumps(
        {"type": "user-input-transcription", "text": transcript}
    )
    await websocket_send(notification)
    return transcript
|
|
|
|
|
async def finalize_conversation_turn(
    tts_manager: TTSTaskManager,
    websocket_send: WebSocketSend,
    client_uid: str,
    broadcast_ctx: Optional[BroadcastContext] = None,
) -> None:
    """Wait for speech to finish, then close out the conversation turn.

    If ``tts_manager.task_list`` is non-empty, the tasks are awaited, a
    ``backend-synth-complete`` message is sent, and the function waits for a
    ``frontend-playback-complete`` response from ``client_uid``. A falsy
    response logs a warning and returns immediately, so none of the later
    messages are sent.

    Otherwise a ``force-new-message`` message is sent to the client, and to
    the group when a broadcast function is available, followed by the
    conversation end signal.

    Args:
        tts_manager: Holder of the pending TTS tasks (``task_list``).
        websocket_send: Coroutine used to send JSON strings to the client.
        client_uid: Identifier of the client whose playback is awaited.
        broadcast_ctx: Optional group broadcast context.
    """
    pending_tts = tts_manager.task_list
    if pending_tts:
        await asyncio.gather(*pending_tts)
        synth_done = json.dumps({"type": "backend-synth-complete"})
        await websocket_send(synth_done)

        playback_ack = await message_handler.wait_for_response(
            client_uid, "frontend-playback-complete"
        )
        if not playback_ack:
            logger.warning(f"No playback completion response from {client_uid}")
            return

    new_message_signal = {"type": "force-new-message"}
    await websocket_send(json.dumps(new_message_signal))

    can_broadcast = broadcast_ctx and broadcast_ctx.broadcast_func
    if can_broadcast:
        await broadcast_ctx.broadcast_func(
            broadcast_ctx.group_members,
            new_message_signal,
            broadcast_ctx.current_client_uid,
        )

    await send_conversation_end_signal(websocket_send, broadcast_ctx)
|
|
|
|
|
async def send_conversation_end_signal(
    websocket_send: WebSocketSend,
    broadcast_ctx: Optional[BroadcastContext],
    session_emoji: str = "😊",
) -> None:
    """Send the ``conversation-chain-end`` control message.

    The message is sent to the client through ``websocket_send``. It is also
    broadcast when ``broadcast_ctx`` has both a broadcast function and a
    non-empty member list.

    Args:
        websocket_send: Coroutine used to send JSON strings to the client.
        broadcast_ctx: Optional group broadcast context.
        session_emoji: Marker that only appears in the completion log line.
    """
    chain_end_msg = {
        "type": "control",
        "text": "conversation-chain-end",
    }

    await websocket_send(json.dumps(chain_end_msg))

    if broadcast_ctx and broadcast_ctx.broadcast_func and broadcast_ctx.group_members:
        # NOTE(review): unlike the force-new-message broadcast in
        # finalize_conversation_turn, no current_client_uid is passed here;
        # confirm whether the sender is meant to be excluded.
        await broadcast_ctx.broadcast_func(
            broadcast_ctx.group_members,
            chain_end_msg,
        )

    logger.info(f"😎👍✅ Conversation Chain {session_emoji} completed!")
|
|
|
|
|
def cleanup_conversation(tts_manager: TTSTaskManager, session_emoji: str) -> None:
    """Clear the TTS manager's state and log the cleanup.

    Args:
        tts_manager: Manager whose ``clear()`` method is called.
        session_emoji: Marker that only appears in the debug log line.
    """
    tts_manager.clear()
    logger.debug(f"🧹 Clearing up conversation {session_emoji}.")
|
|
|
|
|
# Pool of emoji. Nothing in the visible part of this file reads it; presumably
# callers pick from it to label a conversation session (see the
# `session_emoji` parameters) -- verify against the call sites.
# NOTE(review): the entries were restored from mis-decoded UTF-8 text; confirm
# them against the repository history.
EMOJI_LIST = [
    "🐶",
    "🐱",
    "🐭",
    "🐹",
    "🐰",
    "🦊",
    "🐻",
    "🐼",
    "🐨",
    "🐯",
    "🦁",
    "🐮",
    "🐷",
    "🐸",
    "🐵",
    "🐔",
    "🐧",
    "🐦",
    "🐤",
    "🐣",
    "🐥",
    "🦆",
    "🦅",
    "🦉",
    "🦇",
    "🐺",
    "🐗",
    "🐴",
    "🦄",
    "🐝",
    "🌵",
    "🎄",
    "🌲",
    "🌳",
    "🌴",
    "🌱",
    "🌿",
    "☘️",
    "🍀",
    "🍂",
    "🍁",
    "🍄",
    "🌾",
    "💐",
    "🌹",
    "🌸",
    "🌛",
    "🌍",
    "⭐️",
    "🔥",
    "🌈",
    "🌩",
    "⛄️",
    "🎃",
    "🎄",
    "🎉",
    "🎏",
    "🎗",
    "🀄️",
    "🎭",
    "🎨",
    "🧵",
    "🪡",
    "🧶",
    "🥽",
    "🥼",
    "🦺",
    "👔",
    "👕",
    "👜",
    "👑",
]
|
|
|