import asyncio
import json
import os

import websockets

from src.utils import OpenAIClient
from src.config import logger

from ._bot_actions_service import BotActionService


class WebSocketService:
    """Relay a text conversation between a client websocket and OpenAI realtime."""

    def __init__(self):
        # The client class itself is stored; an instance is opened (as an async
        # context manager) each time a user message is forwarded.
        self.openai_client = OpenAIClient
        self.bot_action_service = BotActionService()

    async def __aenter__(self):
        return self

    async def __aexit__(self, *args):
        """No resources to release; exceptions are never suppressed."""

    @staticmethod
    def _require_env(name):
        """Return the value of environment variable *name* or raise RuntimeError."""
        value = os.getenv(name)
        if not value:
            raise RuntimeError(f"Missing required environment variable: {name}")
        return value

    @staticmethod
    def _user_text_item_event(text):
        """Build a ``conversation.item.create`` event carrying *text* as user input."""
        return {
            "type": "conversation.item.create",
            "previous_item_id": None,
            "item": {
                "type": "message",
                "role": "user",
                "content": [
                    {
                        "type": "input_text",
                        "text": text,
                    }
                ],
            },
        }

    @staticmethod
    def _completed_assistant_text(message):
        """Return the assistant text of a completed ``response.done`` event.

        Returns ``None`` when *message* is any other event, when the response
        did not complete, or when its first output item is not an assistant
        message with a text part.
        """
        if message.get("type") != "response.done":
            return None
        response = message.get("response") or {}
        if response.get("status") != "completed":
            return None
        outputs = response.get("output")
        if not outputs:
            # A completed response without output items carries nothing to relay.
            return None
        first_item = outputs[0]
        if first_item.get("type") != "message":
            return None
        if first_item.get("role") != "assistant":
            return None
        content = first_item.get("content")
        if not content:
            return None
        return content[0].get("text")

    async def handle_text_conversation_with_openai_websockets(
        self,
        client_websocket,
        conversation_id,
        user_id,
        conversation_history,
        redis_client,
    ):
        """Run the relay loop between the client and the OpenAI realtime socket.

        Each client message is recorded through the bot action service and
        forwarded to OpenAI; the first forwarded message is prefixed with
        ``conversation_history``. A background task relays assistant replies
        to the client and feeds function-call results back to OpenAI.

        Args:
            client_websocket: Client connection; must provide awaitable
                ``receive_text()`` and ``send_text()``.
            conversation_id: Identifier used for persistence and for the
                ``session:{conversation_id}`` lookup.
            user_id: Passed through to the function-call handler.
            conversation_history: Text prepended to the first user message.
            redis_client: Object with an awaitable ``get(key)``.

        Raises:
            RuntimeError: A required environment variable is not set.
            Exception: The ``session:{conversation_id}`` key is missing or
                empty (presumably an expired session TTL; the TTL itself is
                set elsewhere).

        The loop only ends through an exception (including one raised by
        ``client_websocket`` when the client goes away).
        """
        # Fail early with a clear message instead of a TypeError from
        # concatenating None, or an Authorization header of "Bearer None".
        base_url = self._require_env("OPENAI_WS_BASE_URL")
        model = self._require_env("OPENAI_REALTIME_MODEL")
        api_key = self._require_env("OPENAI_API_KEY")
        openai_uri = f"{base_url}/v1/realtime?model={model}"

        async with websockets.connect(
            uri=openai_uri,
            additional_headers={
                "Authorization": f"Bearer {api_key}",
                "OpenAI-Beta": "realtime=v1",
            },
        ) as openai_websocket:

            async def openai_receiver():
                """Consume OpenAI server events until the socket closes."""
                async for data in openai_websocket:
                    message = json.loads(data)
                    event_type = message.get("type")

                    if event_type == "response.function_call_arguments.done":
                        response = (
                            await self.bot_action_service.handle_ai_function_call(
                                data=message, user_id=user_id
                            )
                        )
                        # The function result is fed back as a user text item,
                        # then a new model response is requested.
                        await openai_websocket.send(
                            json.dumps(self._user_text_item_event(response))
                        )
                        await openai_websocket.send(
                            json.dumps({"type": "response.create"})
                        )
                        continue

                    if event_type == "error":
                        logger.error(f"Error: {message['error']}")
                        continue

                    ai_response = self._completed_assistant_text(message)
                    if ai_response is None:
                        continue
                    await self.bot_action_service.handle_ai_message(
                        message_content=ai_response,
                        conversation_id=conversation_id,
                    )
                    await client_websocket.send_text(
                        json.dumps({"ai_message": ai_response})
                    )

            async def openai_sender(message):
                """Send the session event, the user item and a response request."""
                async with self.openai_client() as client:
                    create_session_event = await client.create_text_session_event()
                    await openai_websocket.send(json.dumps(create_session_event))
                    await openai_websocket.send(
                        json.dumps(self._user_text_item_event(message))
                    )
                    await openai_websocket.send(
                        json.dumps({"type": "response.create"})
                    )

            receiver_task = asyncio.create_task(openai_receiver())
            conversation_history_sent = False
            try:
                while True:
                    user_message = await client_websocket.receive_text()
                    user_message = json.loads(user_message)["user_message"]

                    user_status = await redis_client.get(
                        f"session:{conversation_id}"
                    )
                    if not user_status:
                        raise Exception("A max 25 minute session has ended")

                    # Persist the raw message before the history prefix is added.
                    await self.bot_action_service.handle_user_message(
                        message_content=user_message,
                        conversation_id=conversation_id,
                    )
                    if not conversation_history_sent:
                        user_message = f"{conversation_history}\n{user_message}"
                        conversation_history_sent = True
                    await openai_sender(user_message)
            finally:
                # Always stop the background receiver so it is not orphaned,
                # and retrieve its outcome so a failure is logged instead of
                # surfacing as "Task exception was never retrieved".
                receiver_task.cancel()
                (outcome,) = await asyncio.gather(
                    receiver_task, return_exceptions=True
                )
                if isinstance(outcome, Exception) and not isinstance(
                    outcome, asyncio.CancelledError
                ):
                    logger.error(f"OpenAI receiver task failed: {outcome!r}")