| | |
| | |
| | |
| | |
| | |
| | import argparse |
| | import asyncio |
| | import os |
| | import sys |
| | from loguru import logger |
| |
|
| | from call_connection_manager import CallConfigManager, SessionManager |
| | from pipecat.audio.vad.silero import SileroVADAnalyzer |
| | from pipecat.frames.frames import ( |
| | AudioRawFrame, |
| | EndTaskFrame, |
| | Frame, |
| | LLMMessagesFrame, |
| | TranscriptionFrame, |
| | ) |
| | from pipecat.pipeline.pipeline import Pipeline |
| | from pipecat.pipeline.runner import PipelineRunner |
| | from pipecat.pipeline.task import PipelineParams, PipelineTask |
| | from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext |
| | from pipecat.processors.frame_processor import FrameDirection, FrameProcessor |
| | from pipecat.services.cartesia.tts import CartesiaTTSService |
| | from pipecat.services.deepgram.stt import DeepgramSTTService |
| | from pipecat.services.openai.llm import OpenAILLMService |
| | from pipecat.transports.services.daily import DailyParams, DailyTransport |
| |
|
# Swap loguru's default handler (id 0) for a stderr sink at DEBUG level.
try:
    logger.remove(0)
except ValueError:
    # loguru raises ValueError when the handler id does not exist, which
    # happens if the default handler was already removed before this module
    # ran. There is nothing left to remove in that case.
    pass
logger.add(sys.stderr, level="DEBUG")
| |
|
class VoicemailDetectionProcessor(FrameProcessor):
    """Frame processor that classifies the callee as voicemail or human.

    Downstream ``TranscriptionFrame`` text is searched for the phrases
    "voicemail" and "leave a message". The outcome is recorded on
    ``session_manager.call_flow_state``; on a voicemail match a system
    message is queued on the pipeline task. Every frame is forwarded
    unchanged.

    NOTE(review): the nesting below was reconstructed from a
    whitespace-mangled source; confirm that the ``else`` branch belongs to
    the keyword check and that ``push_frame`` runs for every frame.
    """

    def __init__(self, session_manager, call_config_manager, task):
        """Store the collaborators used while processing frames.

        Args:
            session_manager: Object exposing ``call_flow_state``.
            call_config_manager: Object providing ``create_system_message``.
            task: Pipeline task whose ``queue_frames`` receives the
                voicemail notification.
        """
        super().__init__()
        self.session_manager = session_manager
        self.call_config_manager = call_config_manager
        self.task = task

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        """Inspect downstream transcriptions, then pass the frame along."""
        await super().process_frame(frame, direction)

        is_downstream_transcript = direction == FrameDirection.DOWNSTREAM and isinstance(
            frame, TranscriptionFrame
        )
        if is_downstream_transcript:
            await self._classify_transcription(frame.text)

        await self.push_frame(frame, direction)

    async def _classify_transcription(self, text):
        """Update the call-flow state from one transcription's text."""
        logger.debug(f"Transcription: {text}")

        call_state = self.session_manager.call_flow_state
        if call_state.voicemail_detected:
            # Already classified as voicemail; nothing more to decide.
            return

        lowered = text.lower()
        if not any(phrase in lowered for phrase in ("voicemail", "leave a message")):
            logger.info("Human detected")
            call_state.set_human_detected()
            return

        logger.info("Voicemail detected")
        call_state.set_voicemail_detected()
        notice = self.call_config_manager.create_system_message(
            "Voicemail detected, leaving a message."
        )
        await self.task.queue_frames([LLMMessagesFrame([notice])])
| |
|
async def main(room_url: str, token: str, body: dict):
    """Build and run the voicemail-detection pipeline for one call.

    Args:
        room_url: Daily room URL handed to ``DailyTransport``.
        token: Daily room token handed to ``DailyTransport``.
        body: Call configuration. It is passed to
            ``CallConfigManager.from_json_string``, so it is presumably a
            JSON string despite the ``dict`` annotation (TODO confirm).
            A falsy value selects a default ``CallConfigManager()``.

    The coroutine returns once ``PipelineRunner.run`` finishes.
    """
    call_config_manager = CallConfigManager.from_json_string(body) if body else CallConfigManager()
    dialout_settings = call_config_manager.get_dialout_settings()
    test_mode = call_config_manager.is_test_mode()
    session_manager = SessionManager()

    # Transport: audio in/out only, Silero VAD, Daily transcription off
    # (speech-to-text is done by the Deepgram service below).
    transport_params = DailyParams(
        api_url=os.environ.get("DAILY_API_URL", "https://api.daily.co/v1"),
        api_key=os.environ.get("HF_DAILY_API_KEY", ""),
        audio_in_enabled=True,
        audio_out_enabled=True,
        video_out_enabled=False,
        vad_analyzer=SileroVADAnalyzer(),
        transcription_enabled=False,
    )

    transport = DailyTransport(room_url, token, "Voicemail Detection Bot", transport_params)
    tts = CartesiaTTSService(
        api_key=os.environ.get("HF_CARTESIA_API_KEY", ""),
        voice_id="b7d50908-b17c-442d-ad8d-810c63997ed9",
    )
    stt = DeepgramSTTService(
        api_key=os.environ.get("HF_DEEPGRAM_API_KEY", ""),
        model="nova-2",
    )
    llm = OpenAILLMService(api_key=os.environ.get("HF_OPENAI_API_KEY"))

    # LLM context seeded with the system prompt.
    system_instruction = """You are a friendly, helpful robot. If a human answers, greet them and ask how you can assist. If a voicemail is detected, leave a brief message: 'Hello, this is a test call from Pipecat. Please call us back at your convenience.'"""
    messages = [call_config_manager.create_system_message(system_instruction)]
    context = OpenAILLMContext(messages)
    context_aggregator = llm.create_context_aggregator(context)

    # The detector has to exist before the pipeline, and the pipeline before
    # the task, so the detector is created without a task and the task is
    # attached once it exists. Passing `task` here would raise
    # UnboundLocalError because the name is only bound further down.
    voicemail_detector = VoicemailDetectionProcessor(session_manager, call_config_manager, None)

    pipeline = Pipeline([
        transport.input(),
        stt,
        voicemail_detector,
        context_aggregator.user(),
        llm,
        tts,
        transport.output(),
        context_aggregator.assistant(),
    ])

    task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
    voicemail_detector.task = task

    @transport.event_handler("on_first_participant_joined")
    async def on_first_participant_joined(transport, participant):
        # Queue the initial context frame once someone is on the call.
        await task.queue_frames([context_aggregator.user().get_context_frame()])

    @transport.event_handler("on_participant_left")
    async def on_participant_left(transport, participant, reason):
        logger.debug(f"Participant left: {participant}, reason: {reason}")
        await task.cancel()

    # Dial out unless the configuration marks this run as a test.
    # NOTE(review): this runs before the runner starts the pipeline; confirm
    # start_dialout does not require an already-joined transport.
    if not test_mode:
        await call_config_manager.start_dialout(transport, dialout_settings)

    runner = PipelineRunner()
    await runner.run(task)
| |
|
if __name__ == "__main__":
    # Command-line entry point: parse the room URL, token and optional JSON
    # configuration, then run the bot until the pipeline finishes.
    parser = argparse.ArgumentParser(description="Pipecat Voicemail Detection Bot")
    parser.add_argument("-u", "--url", type=str, help="Room URL")
    parser.add_argument("-t", "--token", type=str, help="Room Token")
    parser.add_argument("-b", "--body", type=str, help="JSON configuration string")
    args = parser.parse_args()
    logger.info(f"Room URL: {args.url}")
    # The token is a credential, so only log whether one was supplied
    # instead of writing its value to the log.
    logger.info(f"Token provided: {bool(args.token)}")
    logger.info(f"Body provided: {bool(args.body)}")
    asyncio.run(main(args.url, args.token, args.body))