# SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: BSD 2-Clause License

"""Transcript synchronization processor."""

from loguru import logger
from pipecat.frames.frames import (
    BotStartedSpeakingFrame,
    BotStoppedSpeakingFrame,
    Frame,
    InterimTranscriptionFrame,
    StartInterruptionFrame,
    TranscriptionFrame,
    TTSStartedFrame,
    TTSTextFrame,
    UserStartedSpeakingFrame,
    UserStoppedSpeakingFrame,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor

from nvidia_pipecat.frames.transcripts import (
    BotUpdatedSpeakingTranscriptFrame,
    UserStoppedSpeakingTranscriptFrame,
    UserUpdatedSpeakingTranscriptFrame,
)


class UserTranscriptSynchronization(FrameProcessor):
    """Synchronizes user speech transcription events across the pipeline.

    This class synchronizes and exposes user speech transcripts by generating
    UserUpdatedSpeakingTranscriptFrame and UserStoppedSpeakingTranscriptFrame events.
    It filters high frequency ASR frames to only forward transcripts that contain
    actual changes. The final transcript is sent with a UserStoppedSpeakingTranscriptFrame.

    Every incoming frame is forwarded unchanged before any transcript frame derived
    from it is pushed.

    Input Frames:
        InterimTranscriptionFrame: ASR partial transcript
        TranscriptionFrame: ASR final transcript
        UserStartedSpeakingFrame: User starts speaking
        UserStoppedSpeakingFrame: User stops speaking
        StartInterruptionFrame: Resets processor state

    Attributes:
        user_started_speaking_message (str | None): Optional transcript text emitted
            when the user starts speaking.
        _partial_transcript (str | None): Current partial ASR transcript.
        _final_transcript (str): Final ASR transcript accumulated for the current turn.
        _stopped_speaking (bool | None): Whether user has stopped speaking.
    """

    def __init__(self, user_started_speaking_message: str | None = None):
        """Initializes the processor with default state values.

        Args:
            user_started_speaking_message (str | None): Optional message to send
                when user starts speaking.
        """
        super().__init__()
        self.user_started_speaking_message: str | None = user_started_speaking_message
        self._partial_transcript: str | None = None
        self._final_transcript: str = ""
        self._stopped_speaking: bool | None = None

    def _reset_transcript_state(self) -> None:
        """Clears the per-turn transcript state."""
        self._partial_transcript = None
        self._final_transcript = ""
        self._stopped_speaking = None

    async def process_frame(self, frame: Frame, direction: FrameDirection) -> None:
        """Processes frames and updates transcript state.

        Args:
            frame (Frame): Incoming frame to process.
            direction (FrameDirection): Frame flow direction.
        """
        await super().process_frame(frame, direction)

        # The incoming frame is always forwarded first; derived transcript frames follow it.
        await self.push_frame(frame, direction)

        if isinstance(frame, InterimTranscriptionFrame):
            # Only forward partial transcripts that differ from the previous one.
            # TODO: We need filter out more of the duplicated or very similar transcripts with Riva TTS
            # NOTE(review): the TODO above probably means Riva ASR - confirm.
            if not self._partial_transcript or self._partial_transcript != frame.text:
                self._partial_transcript = frame.text
                # Prefix the partial with whatever has already been finalized in this turn.
                updated_transcript = (
                    (self._final_transcript.rstrip() + " " + frame.text) if self._final_transcript else frame.text
                )
                await self.push_frame(UserUpdatedSpeakingTranscriptFrame(transcript=updated_transcript), direction)
        elif isinstance(frame, TranscriptionFrame):
            # Several final transcripts may arrive in one turn; they are joined with single spaces.
            self._final_transcript += f"{frame.text} "
        elif isinstance(frame, UserStartedSpeakingFrame):
            self._stopped_speaking = False
            if self.user_started_speaking_message:
                await self.push_frame(
                    UserUpdatedSpeakingTranscriptFrame(transcript=self.user_started_speaking_message), direction
                )
        elif isinstance(frame, UserStoppedSpeakingFrame):
            self._stopped_speaking = True
        elif isinstance(frame, StartInterruptionFrame):
            self._reset_transcript_state()

        # We wait until we received both, TranscriptionFrame and UserStoppedSpeakingFrame
        # NOTE(review): a TranscriptionFrame with empty text still makes `_final_transcript`
        # truthy (" "), so an empty final transcript can be emitted - confirm this is intended.
        if self._final_transcript and self._stopped_speaking:
            await self.push_frame(UserStoppedSpeakingTranscriptFrame(self._final_transcript.strip()), direction)
            self._reset_transcript_state()


class BotTranscriptSynchronization(FrameProcessor):
    """Synchronizes bot speech transcripts with audio playback.

    Synchronizes TTSTextFrames with BotStartedSpeakingFrame events that indicate
    the start of speech audio playback. Creates BotUpdatedSpeakingTranscriptFrame
    frames containing partial and final transcripts.

    The bot transcription synchronization is based on the assumption that there is
    a pair of BotStartedSpeakingFrame and BotStoppedSpeakingFrame for each
    TTSStartedFrame. This processor will not work correctly if these assumptions
    are not met.

    Each BotStartedSpeakingFrame will trigger a BotUpdatedSpeakingTranscriptFrame if
    a transcript is available. It is ok that multiple [TTSStartedFrame, TTSTextFrame, ...]
    sequences come before the corresponding BotStoppedSpeakingFrame.

    Input Frames:
        TTSStartedFrame: Speech audio playback starts
        TTSTextFrame: Text to be spoken
        BotStartedSpeakingFrame: Bot starts speaking
        BotStoppedSpeakingFrame: Bot stops speaking
        StartInterruptionFrame: Resets processor state

    Attributes:
        _bot_started_speaking (bool): Indicates if the bot has started speaking.
        _bot_transcripts_buffer (list[str]): Buffer for bot transcripts to synchronize
            with audio playback.
    """

    def __init__(self):
        """Initialize the BotTranscriptSynchronization processor."""
        super().__init__()
        self._bot_started_speaking: bool = False
        self._bot_transcripts_buffer: list[str] = []

    async def process_frame(self, frame: Frame, direction: FrameDirection) -> None:
        """Processes frames and manages transcript synchronization.

        Args:
            frame (Frame): Incoming frame to process.
            direction (FrameDirection): Frame flow direction.
        """
        await super().process_frame(frame, direction)

        if isinstance(frame, StartInterruptionFrame):
            # Reset transcript buffer
            self._bot_transcripts_buffer = []
            self._bot_started_speaking = False
            await self.push_frame(frame, direction)
        elif isinstance(frame, TTSStartedFrame):
            # TODO Hacky solution, need proper solution
            # NOTE(review): dropping the oldest entry here means the buffer never holds
            # more than one transcript at a time.
            if self._bot_transcripts_buffer:
                self._bot_transcripts_buffer.pop(0)
            # Start buffering the next transcript
            self._bot_transcripts_buffer.append("")
            await self.push_frame(frame, direction)
        elif isinstance(frame, TTSTextFrame):
            # Aggregate partial transcripts
            if not self._bot_transcripts_buffer:
                logger.warning("TTSTextFrame received before TTSStartedFrame!")
                # It looks like some TTS processors keep on sending TTSTextFrame even after a StartInterruptionFrame.
            elif frame.text:
                if self._bot_transcripts_buffer[-1]:
                    self._bot_transcripts_buffer[-1] += f" {frame.text}"
                else:
                    self._bot_transcripts_buffer[-1] = frame.text

                # TODO: We need to figure out how to align the partial transcripts
                # with the audio. Currently, they are shown as soon as they come in.
                # This is needed to get the full transcript from Elevenlabs TTS
                # since it creates TTSStartedFrame right with the first incoming word
                # and does not stop until all the text is generated!
                if self._bot_started_speaking and len(self._bot_transcripts_buffer) == 1:
                    # Only create BotUpdatedSpeakingTranscriptFrame if
                    # BotStartedSpeakingFrame is related to the first active transcript.
                    # An update is only sent when this frame actually extended the transcript.
                    await self.push_frame(
                        BotUpdatedSpeakingTranscriptFrame(transcript=self._bot_transcripts_buffer[-1].strip()),
                        direction,
                    )
            await self.push_frame(frame, direction)
        elif isinstance(frame, BotStartedSpeakingFrame):
            # Start synchronizing transcript from buffer with BotStartedSpeakingFrame
            if self._bot_started_speaking:
                logger.warning("BotStartedSpeakingFrame received before BotStoppedSpeakingFrame!")
            self._bot_started_speaking = True
            await self.push_frame(frame, direction)

            # If already available push the transcript. It is normalized the same way as the
            # updates sent for TTSTextFrame, and is always sent downstream regardless of the
            # direction the BotStartedSpeakingFrame arrived from.
            transcript = self._bot_transcripts_buffer[0].strip() if self._bot_transcripts_buffer else ""
            if transcript:
                await self.push_frame(
                    BotUpdatedSpeakingTranscriptFrame(transcript=transcript), FrameDirection.DOWNSTREAM
                )
        elif isinstance(frame, BotStoppedSpeakingFrame):
            # Remove the shown transcript from the buffer
            if not self._bot_started_speaking:
                logger.warning("BotStoppedSpeakingFrame received before BotStartedSpeakingFrame!")
            self._bot_started_speaking = False
            if self._bot_transcripts_buffer:
                self._bot_transcripts_buffer.pop(0)
            await self.push_frame(frame, direction)
        else:
            await self.push_frame(frame, direction)