| from typing import AsyncIterator, Tuple, Callable, List, Union, Dict, Any
|
| from functools import wraps
|
| from .output_types import Actions, SentenceOutput, DisplayText
|
| from ..utils.tts_preprocessor import tts_filter as filter_text
|
| from ..live2d_model import Live2dModel
|
| from ..config_manager import TTSPreprocessorConfig
|
| from ..utils.sentence_divider import SentenceDivider
|
| from ..utils.sentence_divider import SentenceWithTags, TagState
|
| from loguru import logger
|
|
|
|
|
| def sentence_divider(
|
| faster_first_response: bool = True,
|
| segment_method: str = "pysbd",
|
| valid_tags: List[str] = None,
|
| ):
|
| """
|
| Decorator that transforms token stream into sentences with tags
|
|
|
| Args:
|
| faster_first_response: bool - Whether to enable faster first response
|
| segment_method: str - Method for sentence segmentation
|
| valid_tags: List[str] - List of valid tags to process
|
| """
|
|
|
| def decorator(
|
| func: Callable[
|
| ..., AsyncIterator[Union[str, Dict[str, Any]]]
|
| ],
|
| ) -> Callable[
|
| ..., AsyncIterator[Union[SentenceWithTags, Dict[str, Any]]]
|
| ]:
|
| @wraps(func)
|
| async def wrapper(
|
| *args, **kwargs
|
| ) -> AsyncIterator[Union[SentenceWithTags, Dict[str, Any]]]:
|
| divider = SentenceDivider(
|
| faster_first_response=faster_first_response,
|
| segment_method=segment_method,
|
| valid_tags=valid_tags or [],
|
| )
|
| stream_from_func = func(*args, **kwargs)
|
|
|
|
|
| async for item in divider.process_stream(stream_from_func):
|
| if isinstance(item, SentenceWithTags):
|
| logger.debug(f"sentence_divider yielding sentence: {item}")
|
| elif isinstance(item, dict):
|
| logger.debug(f"sentence_divider yielding dict: {item}")
|
| yield item
|
|
|
|
|
| return wrapper
|
|
|
| return decorator
|
|
|
|
|
| def actions_extractor(live2d_model: Live2dModel):
|
| """
|
| Decorator that extracts actions from sentences, passing through dicts.
|
| """
|
|
|
| def decorator(
|
| func: Callable[
|
| ..., AsyncIterator[Union[SentenceWithTags, Dict[str, Any]]]
|
| ],
|
| ) -> Callable[
|
| ..., AsyncIterator[Union[Tuple[SentenceWithTags, Actions], Dict[str, Any]]]
|
| ]:
|
| @wraps(func)
|
| async def wrapper(
|
| *args, **kwargs
|
| ) -> AsyncIterator[
|
| Union[Tuple[SentenceWithTags, Actions], Dict[str, Any]]
|
| ]:
|
| stream = func(*args, **kwargs)
|
| async for item in stream:
|
| if isinstance(item, SentenceWithTags):
|
| sentence = item
|
| actions = Actions()
|
|
|
| if not any(
|
| tag.state in [TagState.START, TagState.END]
|
| for tag in sentence.tags
|
| ):
|
| expressions = live2d_model.extract_emotion(sentence.text)
|
| if expressions:
|
| actions.expressions = expressions
|
| yield sentence, actions
|
| elif isinstance(item, dict):
|
|
|
| yield item
|
| else:
|
| logger.warning(
|
| f"actions_extractor received unexpected type: {type(item)}"
|
| )
|
|
|
| return wrapper
|
|
|
| return decorator
|
|
|
|
|
| def display_processor():
|
| """
|
| Decorator that processes text for display, passing through dicts.
|
| """
|
|
|
| def decorator(
|
| func: Callable[
|
| ..., AsyncIterator[Union[Tuple[SentenceWithTags, Actions], Dict[str, Any]]]
|
| ],
|
| ) -> Callable[
|
| ...,
|
| AsyncIterator[
|
| Union[Tuple[SentenceWithTags, DisplayText, Actions], Dict[str, Any]]
|
| ],
|
| ]:
|
| @wraps(func)
|
| async def wrapper(
|
| *args, **kwargs
|
| ) -> AsyncIterator[
|
| Union[Tuple[SentenceWithTags, DisplayText, Actions], Dict[str, Any]]
|
| ]:
|
| stream = func(*args, **kwargs)
|
|
|
| async for item in stream:
|
| if (
|
| isinstance(item, tuple)
|
| and len(item) == 2
|
| and isinstance(item[0], SentenceWithTags)
|
| ):
|
| sentence, actions = item
|
| text = sentence.text
|
|
|
| for tag in sentence.tags:
|
| if tag.name == "think":
|
| if tag.state == TagState.START:
|
| text = "("
|
| elif tag.state == TagState.END:
|
| text = ")"
|
|
|
| display = DisplayText(text=text)
|
| yield sentence, display, actions
|
| elif isinstance(item, dict):
|
|
|
| yield item
|
| else:
|
| logger.warning(
|
| f"display_processor received unexpected type: {type(item)}"
|
| )
|
|
|
| return wrapper
|
|
|
| return decorator
|
|
|
|
|
| def tts_filter(
|
| tts_preprocessor_config: TTSPreprocessorConfig = None,
|
| ):
|
| """
|
| Decorator that filters text for TTS, passing through dicts.
|
| Skips TTS for think tag content.
|
| """
|
|
|
| def decorator(
|
| func: Callable[
|
| ...,
|
| AsyncIterator[
|
| Union[Tuple[SentenceWithTags, DisplayText, Actions], Dict[str, Any]]
|
| ],
|
| ],
|
| ) -> Callable[
|
| ..., AsyncIterator[Union[SentenceOutput, Dict[str, Any]]]
|
| ]:
|
| @wraps(func)
|
| async def wrapper(
|
| *args, **kwargs
|
| ) -> AsyncIterator[Union[SentenceOutput, Dict[str, Any]]]:
|
| stream = func(*args, **kwargs)
|
| config = tts_preprocessor_config or TTSPreprocessorConfig()
|
|
|
| async for item in stream:
|
| if (
|
| isinstance(item, tuple)
|
| and len(item) == 3
|
| and isinstance(item[1], DisplayText)
|
| ):
|
| sentence, display, actions = item
|
| if any(tag.name == "think" for tag in sentence.tags):
|
| tts = ""
|
| else:
|
| tts = filter_text(
|
| text=display.text,
|
| remove_special_char=config.remove_special_char,
|
| ignore_brackets=config.ignore_brackets,
|
| ignore_parentheses=config.ignore_parentheses,
|
| ignore_asterisks=config.ignore_asterisks,
|
| ignore_angle_brackets=config.ignore_angle_brackets,
|
| )
|
|
|
| logger.debug(f"[{display.name}] display: {display.text}")
|
| logger.debug(f"[{display.name}] tts: {tts}")
|
|
|
| yield SentenceOutput(
|
| display_text=display,
|
| tts_text=tts,
|
| actions=actions,
|
| )
|
| elif isinstance(item, dict):
|
|
|
| yield item
|
| else:
|
| logger.warning(f"tts_filter received unexpected type: {type(item)}")
|
|
|
| return wrapper
|
|
|
| return decorator
|
|
|