Spaces:
Running
Running
| # | |
| # Copyright (c) 2024–2025, Daily | |
| # | |
| # SPDX-License-Identifier: BSD 2-Clause License | |
| # | |
| """Blingfire-based text aggregator for sentence detection.""" | |
| import blingfire as bf | |
| from loguru import logger | |
| from pipecat.utils.text.base_text_aggregator import BaseTextAggregator | |
| class BlingfireTextAggregator(BaseTextAggregator): | |
| """This is a text aggregator that uses blingfire for sentence detection. | |
| It aggregates text until a complete sentence is detected using blingfire's | |
| sentence segmentation capabilities. | |
| """ | |
| def __init__(self): | |
| """Initialize the BlingfireTextAggregator with an empty text buffer.""" | |
| self._text = "" | |
| logger.debug("BlingfireTextAggregator: Initialized new instance") | |
| def text(self) -> str: | |
| """Return the currently aggregated text. | |
| Returns: | |
| str: The text currently being aggregated. | |
| """ | |
| return self._text | |
| async def aggregate(self, text: str) -> str | None: | |
| """Aggregate text and return a complete sentence when detected. | |
| Args: | |
| text (str): The text to be aggregated. | |
| Returns: | |
| str | None: A complete sentence if one is detected, otherwise None. | |
| """ | |
| result: str | None = None | |
| self._text += text | |
| logger.debug(f"BlingfireTextAggregator: Aggregating text: '{self._text}'") | |
| # Use blingfire to split text into sentences | |
| sentences_text = bf.text_to_sentences(self._text) | |
| sentences = [s.strip() for s in sentences_text.split("\n") if s.strip()] | |
| # If we have multiple sentences, return the first complete one | |
| if len(sentences) > 1: | |
| result = sentences[0] | |
| if self._text.lstrip().startswith(sentences[0].strip()): | |
| self._text = self._text[len(sentences[0]) :] | |
| return result | |
| async def handle_interruption(self): | |
| """Handle interruption by clearing the aggregated text buffer.""" | |
| logger.debug("BlingfireTextAggregator: Handling interruption") | |
| self._text = "" | |
| async def reset(self): | |
| """Reset the aggregated text buffer to empty.""" | |
| logger.debug("BlingfireTextAggregator: Resetting text buffer") | |
| self._text = "" | |