Spaces:
Runtime error
Runtime error
| import os | |
| import asyncio | |
| import json | |
| import websockets | |
| from src.utils import OpenAIClient | |
| from src.config import logger | |
| from ._bot_actions_service import BotActionService | |
| class WebSocketService: | |
| def __init__(self): | |
| self.openai_client = OpenAIClient | |
| self.bot_action_service = BotActionService() | |
| async def __aenter__(self): | |
| return self | |
| async def __aexit__(self, *args): | |
| pass | |
| async def handle_text_conversation_with_openai_websockets( | |
| self, | |
| client_websocket, | |
| conversation_id, | |
| user_id, | |
| conversation_history, | |
| redis_client, | |
| ): | |
| openai_uri = ( | |
| os.getenv("OPENAI_WS_BASE_URL") | |
| + "/v1/realtime?model=" | |
| + os.getenv("OPENAI_REALTIME_MODEL") | |
| ) | |
| openai_websocket = None | |
| async with websockets.connect( | |
| uri=openai_uri, | |
| additional_headers={ | |
| "Authorization": f"Bearer {os.getenv('OPENAI_API_KEY')}", | |
| "OpenAI-Beta": "realtime=v1", | |
| }, | |
| ) as openai_websocket: | |
| async def openai_receiver(): | |
| async for data in openai_websocket: | |
| message = json.loads(data) | |
| openai_server_events = { | |
| "session_created": message["type"] == "session.created", | |
| "function_call": ( | |
| message["type"] == "response.function_call_arguments.done" | |
| ), | |
| "ai_response": ( | |
| message["type"] == "response.done" | |
| and message["response"]["status"] == "completed" | |
| and message["response"]["output"][0]["type"] == "message" | |
| and message["response"]["output"][0]["role"] == "assistant" | |
| ), | |
| } | |
| if openai_server_events["function_call"]: | |
| response = ( | |
| await self.bot_action_service.handle_ai_function_call( | |
| data=message, user_id=user_id | |
| ) | |
| ) | |
| event_response = { | |
| "type": "conversation.item.create", | |
| "previous_item_id": None, | |
| "item": { | |
| "type": "message", | |
| "role": "user", | |
| "content": [ | |
| { | |
| "type": "input_text", | |
| "text": response, | |
| } | |
| ], | |
| }, | |
| } | |
| await openai_websocket.send(json.dumps(event_response)) | |
| event_response_2 = {"type": "response.create"} | |
| await openai_websocket.send(json.dumps(event_response_2)) | |
| if openai_server_events["ai_response"]: | |
| ai_response = message["response"]["output"][0]["content"][0][ | |
| "text" | |
| ] | |
| await self.bot_action_service.handle_ai_message( | |
| message_content=ai_response, | |
| conversation_id=conversation_id, | |
| ) | |
| await client_websocket.send_text( | |
| json.dumps({"ai_message": ai_response}) | |
| ) | |
| elif message["type"] == "error": | |
| logger.error(f"Error: {message['error']}") | |
| async def openai_sender(message): | |
| async with self.openai_client() as client: | |
| create_session_event = await client.create_text_session_event() | |
| await openai_websocket.send(json.dumps(create_session_event)) | |
| create_conversation_event = { | |
| "type": "conversation.item.create", | |
| "previous_item_id": None, | |
| "item": { | |
| "type": "message", | |
| "role": "user", | |
| "content": [ | |
| { | |
| "type": "input_text", | |
| "text": message, | |
| } | |
| ], | |
| }, | |
| } | |
| await openai_websocket.send(json.dumps(create_conversation_event)) | |
| await openai_websocket.send(json.dumps({"type": "response.create"})) | |
| receiver_task = asyncio.create_task(openai_receiver()) | |
| conversation_history_sent = False | |
| while True: | |
| user_message = await client_websocket.receive_text() | |
| user_message = json.loads(user_message)["user_message"] | |
| user_status = await redis_client.get( | |
| f"session:{conversation_id}" | |
| ) | |
| if not user_status: | |
| raise Exception("A max 25 minute session has ended") | |
| await self.bot_action_service.handle_user_message( | |
| message_content=user_message, | |
| conversation_id=conversation_id, | |
| ) | |
| if not conversation_history_sent: | |
| user_message = f"{conversation_history}\n{user_message}" | |
| conversation_history_sent = True | |
| await openai_sender(user_message) | |