test_ui / src /open_llm_vtuber /conversations /conversation_utils.py
britto224's picture
Upload 130 files
5669b22 verified
import asyncio
import re
from typing import Optional, Union, Any, List, Dict
import numpy as np
import json
from loguru import logger
from ..message_handler import message_handler
from .types import WebSocketSend, BroadcastContext
from .tts_manager import TTSTaskManager
from ..agent.output_types import SentenceOutput, AudioOutput
from ..agent.input_types import BatchInput, TextData, ImageData, TextSource, ImageSource
from ..asr.asr_interface import ASRInterface
from ..live2d_model import Live2dModel
from ..tts.tts_interface import TTSInterface
from ..utils.stream_audio import prepare_audio_payload
# Convert class methods to standalone functions
def create_batch_input(
    input_text: str,
    images: Optional[List[Dict[str, Any]]],
    from_name: str,
    metadata: Optional[Dict[str, Any]] = None,
) -> BatchInput:
    """Assemble the ``BatchInput`` that is handed to the agent.

    Args:
        input_text: The user's text for this turn. It is wrapped in a single
            ``TextData`` entry tagged ``TextSource.INPUT``.
        images: Optional list of image dicts. Each entry is read through its
            ``"source"``, ``"data"`` and ``"mime_type"`` keys. ``None`` or an
            empty list results in ``images=None`` on the returned object.
        from_name: Speaker name attached to the text entry.
        metadata: Optional metadata, forwarded unchanged.

    Returns:
        The constructed ``BatchInput``.
    """
    text_items = [
        TextData(source=TextSource.INPUT, content=input_text, from_name=from_name)
    ]

    image_items = None
    if images:
        image_items = []
        for entry in images:
            image_items.append(
                ImageData(
                    source=ImageSource(entry["source"]),
                    data=entry["data"],
                    mime_type=entry["mime_type"],
                )
            )

    return BatchInput(texts=text_items, images=image_items, metadata=metadata)
async def process_agent_output(
    output: Union[AudioOutput, SentenceOutput],
    character_config: Any,
    live2d_model: Live2dModel,
    tts_engine: TTSInterface,
    websocket_send: WebSocketSend,
    tts_manager: TTSTaskManager,
    translate_engine: Optional[Any] = None,
) -> str:
    """Stamp character info onto an agent output and dispatch it.

    The character's ``character_name`` and ``avatar`` are written onto
    ``output.display_text`` before dispatch. A ``SentenceOutput`` goes
    through ``handle_sentence_output`` (TTS plus optional translation). An
    ``AudioOutput`` is forwarded through ``handle_audio_output``. Any other
    type is logged as a warning and ignored.

    Args:
        output: The agent output to process.
        character_config: Object providing ``character_name`` and ``avatar``.
        live2d_model: Live2D model passed through to sentence handling.
        tts_engine: TTS engine passed through to sentence handling.
        websocket_send: Coroutine used to send JSON strings to the client.
        tts_manager: TTS task manager passed through to sentence handling.
        translate_engine: Optional translator for the TTS text.

    Returns:
        The accumulated response text. Returns ``""`` if the output type is
        unknown or processing raised an exception.
    """
    output.display_text.name = character_config.character_name
    output.display_text.avatar = character_config.avatar

    full_response = ""
    try:
        if isinstance(output, SentenceOutput):
            full_response = await handle_sentence_output(
                output,
                live2d_model,
                tts_engine,
                websocket_send,
                tts_manager,
                translate_engine,
            )
        elif isinstance(output, AudioOutput):
            full_response = await handle_audio_output(output, websocket_send)
        else:
            logger.warning(f"Unknown output type: {type(output)}")
    except Exception as e:
        # Best-effort boundary: report the failure to the client rather than
        # propagating it. logger.exception (unlike logger.error) records the
        # traceback, which is otherwise lost because the error is swallowed.
        logger.exception(f"Error processing agent output: {e}")
        await websocket_send(
            json.dumps(
                {"type": "error", "message": f"Error processing response: {str(e)}"}
            )
        )
    return full_response
# Matches runs of whitespace and of ASCII / full-width (CJK) punctuation.
# If nothing is left after removing these, the sentence has no content worth
# translating. Full-width characters are written as \uXXXX escapes so the
# pattern cannot be corrupted by a wrong-encoding save of this file:
#   \uFF0C FULLWIDTH COMMA            \u3002 IDEOGRAPHIC FULL STOP
#   \uFF01 FULLWIDTH EXCLAMATION MARK \uFF1F FULLWIDTH QUESTION MARK
#   \u300D RIGHT CORNER BRACKET       \u300F RIGHT WHITE CORNER BRACKET
#   \uFF09 FULLWIDTH RIGHT PARENTHESIS \u3011 RIGHT BLACK LENTICULAR BRACKET
NON_TRANSLATABLE_PATTERN = re.compile(
    r'[\s.,!?\'"\uFF0C\u3002\uFF01\uFF1F\u300D\u300F\uFF09\u3011]+'
)


async def handle_sentence_output(
    output: SentenceOutput,
    live2d_model: Live2dModel,
    tts_engine: TTSInterface,
    websocket_send: WebSocketSend,
    tts_manager: TTSTaskManager,
    translate_engine: Optional[Any] = None,
) -> str:
    """Send each sentence of a ``SentenceOutput`` through optional translation and TTS.

    For every ``(display_text, tts_text, actions)`` item yielded by
    ``output``:

    * if ``translate_engine`` is given and ``tts_text`` contains anything
      besides whitespace and punctuation, ``tts_text`` is replaced by
      ``translate_engine.translate(tts_text)``. Only the spoken text is
      translated; ``display_text`` is left untouched;
    * the item is then passed to ``tts_manager.speak``.

    Args:
        output: Async iterable of ``(display_text, tts_text, actions)``.
        live2d_model: Forwarded to ``tts_manager.speak``.
        tts_engine: Forwarded to ``tts_manager.speak``.
        websocket_send: Forwarded to ``tts_manager.speak``.
        tts_manager: Manager whose ``speak`` coroutine is awaited per item.
        translate_engine: Optional object with a synchronous ``translate``.

    Returns:
        The concatenation of ``display_text.text`` over all items.
    """
    full_response = ""
    async for display_text, tts_text, actions in output:
        logger.debug(f"🏃 Processing output: '''{tts_text}'''...")
        if translate_engine:
            # Skip fragments that are only whitespace/punctuation.
            if NON_TRANSLATABLE_PATTERN.sub("", tts_text):
                # NOTE(review): translate() is called synchronously, so it
                # blocks the event loop while it runs; consider
                # asyncio.to_thread if the engine does network I/O.
                tts_text = translate_engine.translate(tts_text)
                logger.info(f"🏃 Text after translation: '''{tts_text}'''...")
        else:
            logger.debug("🚫 No translation engine available. Skipping translation.")

        full_response += display_text.text
        await tts_manager.speak(
            tts_text=tts_text,
            display_text=display_text,
            actions=actions,
            live2d_model=live2d_model,
            tts_engine=tts_engine,
            websocket_send=websocket_send,
        )
    return full_response
async def handle_audio_output(
    output: AudioOutput,
    websocket_send: WebSocketSend,
) -> str:
    """Forward pre-rendered audio chunks directly to the client.

    For every ``(audio_path, display_text, transcript, actions)`` item
    yielded by ``output``, a payload is built with ``prepare_audio_payload``
    and sent to the client as JSON via ``websocket_send``.

    Args:
        output: Async iterable of audio chunks.
        websocket_send: Coroutine that sends a JSON string to the client.

    Returns:
        The concatenation of every chunk's transcript.
    """
    transcript_so_far = ""
    async for path, shown_text, spoken_text, action_set in output:
        transcript_so_far = transcript_so_far + spoken_text

        if action_set:
            serialized_actions = action_set.to_dict()
        else:
            serialized_actions = None

        payload = prepare_audio_payload(
            audio_path=path,
            display_text=shown_text,
            actions=serialized_actions,
        )
        await websocket_send(json.dumps(payload))
    return transcript_so_far
async def send_conversation_start_signals(websocket_send: WebSocketSend) -> None:
    """Tell the client that a new conversation chain has started.

    Sends two JSON messages, in order: a ``control`` message with text
    ``"conversation-chain-start"``, then a ``full-text`` placeholder that
    reads ``"Thinking..."``.
    """
    opening_messages = (
        {"type": "control", "text": "conversation-chain-start"},
        {"type": "full-text", "text": "Thinking..."},
    )
    for message in opening_messages:
        await websocket_send(json.dumps(message))
async def process_user_input(
    user_input: Union[str, np.ndarray],
    asr_engine: ASRInterface,
    websocket_send: WebSocketSend,
) -> str:
    """Return the user's turn as text, transcribing audio when needed.

    Anything that is not a NumPy array is returned unchanged. A NumPy array
    is transcribed with ``asr_engine.async_transcribe_np``. The resulting
    text is echoed to the client as a ``user-input-transcription`` message
    and then returned.
    """
    if not isinstance(user_input, np.ndarray):
        return user_input

    logger.info("Transcribing audio input...")
    transcription = await asr_engine.async_transcribe_np(user_input)
    echo = {"type": "user-input-transcription", "text": transcription}
    await websocket_send(json.dumps(echo))
    return transcription
async def finalize_conversation_turn(
    tts_manager: TTSTaskManager,
    websocket_send: WebSocketSend,
    client_uid: str,
    broadcast_ctx: Optional[BroadcastContext] = None,
) -> None:
    """Finalize a conversation turn.

    If ``tts_manager.task_list`` is non-empty:

    * await every task;
    * send ``backend-synth-complete`` to the client;
    * wait for the client's ``frontend-playback-complete`` reply. If no
      reply arrives, log a warning and return early.

    Otherwise, or once the reply has arrived:

    * send ``force-new-message`` to this client;
    * broadcast the same message to the group when a ``broadcast_func`` is
      available;
    * finish with ``send_conversation_end_signal``.

    NOTE(review): this block was reviewed from a copy whose indentation had
    been stripped. The nesting below (synth-complete message and playback
    wait inside the ``if tts_manager.task_list`` branch) is a reconstruction.
    Confirm it against version control.

    Args:
        tts_manager: Manager whose ``task_list`` holds the pending TTS tasks.
        websocket_send: Coroutine that sends a JSON string to this client.
        client_uid: Client whose playback acknowledgement is awaited.
        broadcast_ctx: Optional group-broadcast context; may be ``None``.
    """
    if tts_manager.task_list:
        # asyncio.gather (default return_exceptions=False) propagates the
        # first exception raised by any task.
        await asyncio.gather(*tts_manager.task_list)
        await websocket_send(json.dumps({"type": "backend-synth-complete"}))
        response = await message_handler.wait_for_response(
            client_uid, "frontend-playback-complete"
        )
        if not response:
            # No acknowledgement: force-new-message and the chain-end signal
            # below are skipped for this turn.
            logger.warning(f"No playback completion response from {client_uid}")
            return
    await websocket_send(json.dumps({"type": "force-new-message"}))
    if broadcast_ctx and broadcast_ctx.broadcast_func:
        # NOTE(review): unlike the broadcast in send_conversation_end_signal,
        # this call passes current_client_uid as a third argument (presumably
        # the sender to exclude) and does not check group_members. Confirm
        # this is intended.
        await broadcast_ctx.broadcast_func(
            broadcast_ctx.group_members,
            {"type": "force-new-message"},
            broadcast_ctx.current_client_uid,
        )
    await send_conversation_end_signal(websocket_send, broadcast_ctx)
async def send_conversation_end_signal(
    websocket_send: WebSocketSend,
    broadcast_ctx: Optional[BroadcastContext],
    session_emoji: str = "๐Ÿ˜Š",
) -> None:
    """Notify the client, and the group if any, that the conversation chain ended.

    A ``control`` / ``conversation-chain-end`` message is always sent to this
    client. It is also broadcast (as a dict) when ``broadcast_ctx`` is
    truthy, has a ``broadcast_func`` and has non-empty ``group_members``.
    A completion line tagged with ``session_emoji`` is logged at INFO level.
    """
    end_message = {"type": "control", "text": "conversation-chain-end"}
    await websocket_send(json.dumps(end_message))

    ctx = broadcast_ctx
    if ctx and ctx.broadcast_func and ctx.group_members:
        await ctx.broadcast_func(ctx.group_members, end_message)

    logger.info(f"๐Ÿ˜Ž๐Ÿ‘โœ… Conversation Chain {session_emoji} completed!")
def cleanup_conversation(tts_manager: TTSTaskManager, session_emoji: str) -> None:
    """Reset the TTS manager's per-conversation state and log the cleanup.

    Calls ``tts_manager.clear()``, then emits a DEBUG log line tagged with
    ``session_emoji``.
    """
    clear_tts_state = tts_manager.clear
    clear_tts_state()
    logger.debug(f"๐Ÿงน Clearing up conversation {session_emoji}.")
# Pool of decorative emoji, presumably used to tag individual conversation
# sessions in log output (e.g. as a ``session_emoji``). Confirm at the call
# site. Entries are real Unicode emoji (several carry a U+FE0F variation
# selector), so this file must be read and written as UTF-8. Entries are
# unique.
EMOJI_LIST = [
    # Animals
    "🐶", "🐱", "🐭", "🐹", "🐰", "🦊", "🐻", "🐼", "🐨", "🐯",
    "🦁", "🐮", "🐷", "🐸", "🐵", "🐔", "🐧", "🐦", "🐤", "🐣",
    "🐥", "🦆", "🦅", "🦉", "🦇", "🐺", "🐗", "🐴", "🦄", "🐝",
    # Plants and nature
    "🌵", "🎄", "🌲", "🌳", "🌴", "🌱", "🌿", "☘️", "🍀", "🍂",
    "🍁", "🍄", "🌾", "💐", "🌹", "🌸", "🌛", "🌍", "⭐️", "🔥",
    "🌈", "🌩", "⛄️",
    # Celebrations, crafts and clothing
    "🎃", "🎉", "🎏", "🎗", "🀄️", "🎭", "🎨", "🧵", "🪡", "🧶",
    "🥽", "🥼", "🦺", "👔", "👕", "👜", "👑",
]