# test1/bot.py — last commit 9cd569a ("Update bot.py") by MFF212
import os
import asyncio
from fastapi import FastAPI, WebSocket
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import (
LocalSmartTurnAnalyzerV3,
)
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import parse_telephony_websocket
from pipecat.serializers.exotel import ExotelFrameSerializer
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport
from pipecat.transports.websocket.fastapi import (
FastAPIWebsocketParams,
FastAPIWebsocketTransport,
)
from pipecat.turns.user_stop import TurnAnalyzerUserTurnStopStrategy
from pipecat.turns.user_turn_strategies import UserTurnStrategies
# -------------------------------------------------------------------
# ENV
# -------------------------------------------------------------------
# Load settings such as provider API keys from a local .env file.
# override=True makes values from that file win over variables that are
# already present in the process environment.
load_dotenv(override=True)

# -------------------------------------------------------------------
# FASTAPI APP
# -------------------------------------------------------------------
app = FastAPI()


@app.get("/health")
async def health():
    """Health-check endpoint; always answers with a fixed ``ok`` payload."""
    payload = dict(status="ok")
    return payload
# -------------------------------------------------------------------
# CORE BOT LOGIC
# -------------------------------------------------------------------
# Cartesia voice used for every call.
CARTESIA_VOICE_ID = "07bc462a-c644-49f1-baf7-82d5599131be"

# TODO: replace this placeholder with the real agent prompt before deploying;
# as written, the LLM receives the literal placeholder text below.
SYSTEM_PROMPT = """<YOUR FULL SYSTEM PROMPT EXACTLY AS YOU WROTE IT>"""

# Opening line (Telugu): "Hello, this is Aisha speaking from Spandana
# Sphoorthi. Am I speaking with Prajwal garu?"
GREETING = "హలో, నేను స్పందనా స్ఫూర్తి నుంచి ఐషా మాట్లాడుతున్నాను. నేను ప్రజ్వల్ గారితో మాట్లాడుతున్నానా?"


def _require_env(name: str) -> str:
    """Return the value of environment variable *name*.

    Raises:
        RuntimeError: If the variable is unset or empty. A missing credential
            then fails immediately with a clear message, instead of being
            passed to a provider SDK as ``None`` and failing later mid-call.
    """
    value = os.getenv(name)
    if not value:
        raise RuntimeError(f"Missing required environment variable: {name}")
    return value


async def run_bot(transport: BaseTransport, handle_sigint: bool):
    """Run one voice-agent session over *transport* until it ends.

    Builds a Deepgram STT -> OpenAI LLM -> Cartesia TTS pipeline at 8 kHz.
    It starts the conversation when the client connects and cancels the
    pipeline task when the client disconnects.

    Args:
        transport: Transport carrying the call's audio in both directions.
        handle_sigint: Forwarded to ``PipelineRunner``.

    Raises:
        RuntimeError: If OPENAI_API_KEY, DEEPGRAM_API_KEY or CARTESIA_API_KEY
            is missing or empty.
    """
    llm = OpenAILLMService(api_key=_require_env("OPENAI_API_KEY"))
    stt = DeepgramSTTService(api_key=_require_env("DEEPGRAM_API_KEY"))
    tts = CartesiaTTSService(
        api_key=_require_env("CARTESIA_API_KEY"),
        voice_id=CARTESIA_VOICE_ID,
    )

    context = LLMContext([{"role": "system", "content": SYSTEM_PROMPT}])
    user_agg, assistant_agg = LLMContextAggregatorPair(
        context,
        user_params=LLMUserAggregatorParams(
            # The user's turn is ended by the locally run Smart Turn v3
            # analyzer (the "stop" strategy). Silero VAD supplies the
            # speech/silence signal.
            user_turn_strategies=UserTurnStrategies(
                stop=[
                    TurnAnalyzerUserTurnStopStrategy(
                        turn_analyzer=LocalSmartTurnAnalyzerV3()
                    )
                ]
            ),
            vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
        ),
    )

    pipeline = Pipeline(
        [
            transport.input(),
            stt,
            user_agg,
            llm,
            tts,
            transport.output(),
            assistant_agg,
        ]
    )

    task = PipelineTask(
        pipeline,
        params=PipelineParams(
            # 8 kHz in and out, matching the telephony media stream.
            audio_in_sample_rate=8000,
            audio_out_sample_rate=8000,
            enable_metrics=True,
            enable_usage_metrics=True,
        ),
    )

    @transport.event_handler("on_client_connected")
    async def on_connect(transport, client):
        logger.info("Client connected; starting conversation")
        # NOTE(review): the greeting is added as a *system* message and the
        # LLM is then run. The model therefore chooses the actual opening
        # words. If this exact sentence must be spoken verbatim, send it
        # straight to TTS instead. Confirm intent.
        context.add_message({"role": "system", "content": GREETING})
        await task.queue_frames([LLMRunFrame()])

    @transport.event_handler("on_client_disconnected")
    async def on_disconnect(transport, client):
        logger.info("Client disconnected; cancelling pipeline task")
        await task.cancel()

    runner = PipelineRunner(handle_sigint=handle_sigint)
    await runner.run(task)
async def bot(runner_args: RunnerArguments):
    """Serve one call arriving on ``runner_args.websocket`` with the Exotel serializer.

    Reads the provider's opening messages via ``parse_telephony_websocket`` to
    get the stream and call ids. It then wraps the websocket in a FastAPI
    websocket transport and runs the bot until the session ends.

    Args:
        runner_args: Runner arguments exposing an already-accepted
            ``websocket``.
    """
    transport_type, call_data = await parse_telephony_websocket(
        runner_args.websocket
    )
    logger.info(f"Transport detected: {transport_type}")

    # Only an Exotel serializer is wired up below. Make a provider mismatch
    # visible in the logs instead of silently framing another provider's
    # stream as Exotel.
    # NOTE(review): assumes pipecat reports Exotel streams as "exotel" — confirm.
    if transport_type != "exotel":
        logger.warning(
            f"Expected an Exotel stream but detected {transport_type!r}; "
            "continuing with the Exotel serializer"
        )

    serializer = ExotelFrameSerializer(
        stream_sid=call_data["stream_id"],
        call_sid=call_data["call_id"],
    )
    transport = FastAPIWebsocketTransport(
        websocket=runner_args.websocket,
        params=FastAPIWebsocketParams(
            audio_in_enabled=True,
            audio_out_enabled=True,
            add_wav_header=False,
            serializer=serializer,
        ),
    )
    # SIGINT handling is presumably left to the hosting ASGI server rather
    # than the pipeline runner.
    await run_bot(transport, handle_sigint=False)
# -------------------------------------------------------------------
# WEBSOCKET ENDPOINT (/media)
# -------------------------------------------------------------------
@app.websocket("/media")
async def media_ws(websocket: WebSocket):
    """Accept a telephony media-stream websocket on ``/media`` and run the bot on it."""
    # Local import keeps this fix self-contained within the endpoint.
    from pipecat.runner.types import WebSocketRunnerArguments

    await websocket.accept()
    # NOTE(review): pipecat declares ``websocket`` on the
    # WebSocketRunnerArguments subclass, and its own dev runner builds
    # websocket sessions this way. The base RunnerArguments does not accept
    # ``websocket=``. ``handle_sigint`` is not passed here because bot()
    # already forwards handle_sigint=False to run_bot().
    runner_args = WebSocketRunnerArguments(websocket=websocket)
    await bot(runner_args)