"""Exotel telephony voice bot served over a FastAPI WebSocket.

Exposes ``GET /health`` and a ``/media`` WebSocket endpoint. Each accepted
WebSocket runs one Pipecat pipeline: Deepgram STT -> OpenAI LLM -> Cartesia
TTS, with Silero VAD and the local smart-turn analyzer (v3) deciding when the
caller has finished a turn. API keys are read from the environment (optionally
populated from a ``.env`` file).
"""

import os
import asyncio

from fastapi import FastAPI, WebSocket
from dotenv import load_dotenv
from loguru import logger

from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import (
    LocalSmartTurnAnalyzerV3,
)
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
    LLMContextAggregatorPair,
    LLMUserAggregatorParams,
)
from pipecat.runner.types import RunnerArguments, WebSocketRunnerArguments
from pipecat.runner.utils import parse_telephony_websocket
from pipecat.serializers.exotel import ExotelFrameSerializer
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport
from pipecat.transports.websocket.fastapi import (
    FastAPIWebsocketParams,
    FastAPIWebsocketTransport,
)
from pipecat.turns.user_stop import TurnAnalyzerUserTurnStopStrategy
from pipecat.turns.user_turn_strategies import UserTurnStrategies

# -------------------------------------------------------------------
# ENV
# -------------------------------------------------------------------

# override=True: values in .env win over variables already set in the
# process environment.
load_dotenv(override=True)

# -------------------------------------------------------------------
# FASTAPI APP
# -------------------------------------------------------------------

app = FastAPI()


@app.get("/health")
async def health():
    """Liveness probe; always reports ``{"status": "ok"}``."""
    return {"status": "ok"}


# -------------------------------------------------------------------
# CORE BOT LOGIC
# -------------------------------------------------------------------


async def run_bot(transport: BaseTransport, handle_sigint: bool):
    """Build the STT -> LLM -> TTS pipeline on ``transport`` and run it.

    Args:
        transport: Transport carrying call audio in and bot audio out.
        handle_sigint: Forwarded unchanged to ``PipelineRunner``.

    Returns when the pipeline task ends. The task is cancelled when the
    client disconnects.
    """
    llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
    stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
    tts = CartesiaTTSService(
        api_key=os.getenv("CARTESIA_API_KEY"),
        voice_id="07bc462a-c644-49f1-baf7-82d5599131be",
    )

    messages = [
        {
            "role": "system",
            # TODO: the agent's system prompt is currently empty; fill it in.
            "content": "",
        }
    ]

    context = LLMContext(messages)
    user_agg, assistant_agg = LLMContextAggregatorPair(
        context,
        user_params=LLMUserAggregatorParams(
            # The user's turn ends when the smart-turn model says so.
            user_turn_strategies=UserTurnStrategies(
                stop=[
                    TurnAnalyzerUserTurnStopStrategy(
                        turn_analyzer=LocalSmartTurnAnalyzerV3()
                    )
                ]
            ),
            # Short VAD stop window; end-of-turn is left to the analyzer above.
            vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
        ),
    )

    pipeline = Pipeline(
        [
            transport.input(),
            stt,
            user_agg,
            llm,
            tts,
            transport.output(),
            # After output, so the context records what was actually spoken.
            assistant_agg,
        ]
    )

    task = PipelineTask(
        pipeline,
        params=PipelineParams(
            # Telephony audio: 8 kHz in and out.
            audio_in_sample_rate=8000,
            audio_out_sample_rate=8000,
            enable_metrics=True,
            enable_usage_metrics=True,
        ),
    )

    @transport.event_handler("on_client_connected")
    async def on_connect(transport, client):
        # Seed the conversation with the Telugu opening line (roughly: "Hello,
        # this is Aisha from Spandana Sphoorty. Am I speaking with Prajwal
        # garu?") and trigger an LLM run. The line is added as a *system*
        # message, so the LLM responds to it rather than it being spoken
        # verbatim.
        messages.append(
            {
                "role": "system",
                "content": (
                    "హలో, నేను స్పందనా స్ఫూర్తి నుంచి ఐషా మాట్లాడుతున్నాను. "
                    "నేను ప్రజ్వల్ గారితో మాట్లాడుతున్నానా?"
                ),
            }
        )
        await task.queue_frames([LLMRunFrame()])

    @transport.event_handler("on_client_disconnected")
    async def on_disconnect(transport, client):
        await task.cancel()

    runner = PipelineRunner(handle_sigint=handle_sigint)
    await runner.run(task)


async def bot(runner_args: RunnerArguments):
    """Run one bot session on the telephony WebSocket in ``runner_args``.

    Args:
        runner_args: Must expose a ``websocket`` attribute (e.g. a
            ``WebSocketRunnerArguments``) and a ``handle_sigint`` flag.

    Raises:
        KeyError: if the detected stream does not provide the Exotel
            ``stream_id`` / ``call_id`` fields.
    """
    # Reads the provider's handshake messages to identify the stream.
    transport_type, call_data = await parse_telephony_websocket(
        runner_args.websocket
    )
    logger.info(f"Transport detected: {transport_type}")

    # This bot is wired for Exotel only; surface a mismatch before the
    # Exotel-specific key lookups below fail.
    if transport_type != "exotel":
        logger.warning(
            f"Expected an Exotel stream but detected {transport_type!r}; "
            "continuing with ExotelFrameSerializer"
        )

    serializer = ExotelFrameSerializer(
        stream_sid=call_data["stream_id"],
        call_sid=call_data["call_id"],
    )

    transport = FastAPIWebsocketTransport(
        websocket=runner_args.websocket,
        params=FastAPIWebsocketParams(
            audio_in_enabled=True,
            audio_out_enabled=True,
            add_wav_header=False,
            serializer=serializer,
        ),
    )

    # Honour the flag carried by the runner args instead of hard-coding it.
    await run_bot(transport, handle_sigint=runner_args.handle_sigint)


# -------------------------------------------------------------------
# WEBSOCKET ENDPOINT (/media)
# -------------------------------------------------------------------


@app.websocket("/media")
async def media_ws(websocket: WebSocket):
    """Accept a telephony media-stream WebSocket and run one bot session."""
    await websocket.accept()
    # The base ``RunnerArguments`` has no ``websocket`` field and does not
    # take ``handle_sigint`` as a constructor argument, so building it with
    # those keywords raises TypeError. The WebSocket variant carries the
    # socket; its ``handle_sigint`` defaults to False. That default suits
    # running inside a web server, which should own signal handling.
    runner_args = WebSocketRunnerArguments(websocket=websocket)
    await bot(runner_args)