keepme-backend / src /services /_websocket_service.py
ramanjitsingh1368's picture
refactor WebRTC and WebSocket services to remove user_id dependency from session management
ad024aa
import os
import asyncio
import json
import websockets
from src.utils import OpenAIClient
from src.config import logger
from ._bot_actions_service import BotActionService
class WebSocketService:
    """Relay a text chat between a client websocket and an OpenAI realtime websocket.

    Messages and function-call results are handed to ``BotActionService``;
    the per-message session event is obtained from ``OpenAIClient``.
    """

    def __init__(self):
        # Stored as the class, not an instance: it is instantiated for every
        # outgoing message via ``async with self.openai_client() as client``.
        self.openai_client = OpenAIClient
        self.bot_action_service = BotActionService()

    async def __aenter__(self):
        """Return the service itself; nothing is acquired on entry."""
        return self

    async def __aexit__(self, *args):
        """Do nothing on exit; exceptions are not suppressed."""
        pass

    @staticmethod
    def _require_env(name):
        """Return the value of environment variable *name*.

        Raises:
            RuntimeError: if the variable is not set, so a misconfiguration is
                reported by name instead of as a ``NoneType + str`` TypeError.
        """
        value = os.getenv(name)
        if value is None:
            raise RuntimeError(
                f"Required environment variable {name} is not set"
            )
        return value

    @staticmethod
    def _extract_assistant_text(message):
        """Return the assistant's text from a completed ``response.done`` event.

        Returns ``None`` for any other event, and also when the event carries
        no output item, no content part, or no ``text`` field, so that such
        events are skipped instead of raising ``IndexError``/``KeyError``.
        """
        if message.get("type") != "response.done":
            return None
        response = message.get("response") or {}
        if response.get("status") != "completed":
            return None
        output = response.get("output") or []
        if not output:
            return None
        first_item = output[0]
        if (
            first_item.get("type") != "message"
            or first_item.get("role") != "assistant"
        ):
            return None
        content = first_item.get("content") or []
        if not content:
            return None
        return content[0].get("text")

    @staticmethod
    def _log_receiver_failure(task):
        """Done-callback: log the exception that ended the receiver task, if any.

        Reading ``task.exception()`` also marks it as retrieved, so asyncio
        does not emit "Task exception was never retrieved".
        """
        if task.cancelled():
            return
        error = task.exception()
        if error is not None:
            logger.error(f"OpenAI receiver task failed: {error!r}")

    async def handle_text_conversation_with_openai_websockets(
        self,
        client_websocket,
        conversation_id,
        user_id,
        conversation_history,
        redis_client,
    ):
        """Run one text conversation until the client loop raises.

        Opens a websocket to ``OPENAI_WS_BASE_URL`` for the model named by
        ``OPENAI_REALTIME_MODEL``, starts a background task that consumes
        OpenAI events, and then forwards every client message to OpenAI.

        Args:
            client_websocket: Client connection; ``receive_text()`` must yield
                JSON with a ``"user_message"`` key, and ``send_text()`` is
                used to return ``{"ai_message": ...}`` payloads.
            conversation_id: Passed to the message handlers and used to build
                the ``session:{conversation_id}`` key looked up in Redis.
            user_id: Passed to ``handle_ai_function_call``.
            conversation_history: Prepended (followed by a newline) to the
                first user message only.
            redis_client: Async client whose ``get`` is awaited for the
                session key before each message is forwarded.

        Raises:
            RuntimeError: if a required URL environment variable is unset.
            Exception: when the Redis session key has no truthy value.
            Any exception raised by the websockets, Redis or handler calls.
        """
        openai_uri = (
            self._require_env("OPENAI_WS_BASE_URL")
            + "/v1/realtime?model="
            + self._require_env("OPENAI_REALTIME_MODEL")
        )

        async with websockets.connect(
            uri=openai_uri,
            additional_headers={
                "Authorization": f"Bearer {os.getenv('OPENAI_API_KEY')}",
                "OpenAI-Beta": "realtime=v1",
            },
        ) as openai_websocket:

            async def openai_receiver():
                """Consume OpenAI events: function calls, errors, AI replies."""
                async for data in openai_websocket:
                    message = json.loads(data)
                    message_type = message.get("type")

                    if message_type == "response.function_call_arguments.done":
                        response = (
                            await self.bot_action_service.handle_ai_function_call(
                                data=message, user_id=user_id
                            )
                        )
                        # The function result is fed back as a user message,
                        # then a new model response is requested.
                        event_response = {
                            "type": "conversation.item.create",
                            "previous_item_id": None,
                            "item": {
                                "type": "message",
                                "role": "user",
                                "content": [
                                    {
                                        "type": "input_text",
                                        "text": response,
                                    }
                                ],
                            },
                        }
                        await openai_websocket.send(json.dumps(event_response))
                        await openai_websocket.send(
                            json.dumps({"type": "response.create"})
                        )
                        continue

                    if message_type == "error":
                        logger.error(f"Error: {message.get('error')}")
                        continue

                    ai_response = self._extract_assistant_text(message)
                    if ai_response is None:
                        continue
                    await self.bot_action_service.handle_ai_message(
                        message_content=ai_response,
                        conversation_id=conversation_id,
                    )
                    await client_websocket.send_text(
                        json.dumps({"ai_message": ai_response})
                    )

            async def openai_sender(message):
                """Send the session event, the user message and a response request."""
                async with self.openai_client() as client:
                    create_session_event = await client.create_text_session_event()
                    await openai_websocket.send(json.dumps(create_session_event))
                    create_conversation_event = {
                        "type": "conversation.item.create",
                        "previous_item_id": None,
                        "item": {
                            "type": "message",
                            "role": "user",
                            "content": [
                                {
                                    "type": "input_text",
                                    "text": message,
                                }
                            ],
                        },
                    }
                    await openai_websocket.send(
                        json.dumps(create_conversation_event)
                    )
                    await openai_websocket.send(
                        json.dumps({"type": "response.create"})
                    )

            receiver_task = asyncio.create_task(openai_receiver())
            receiver_task.add_done_callback(self._log_receiver_failure)
            conversation_history_sent = False
            try:
                while True:
                    user_message = await client_websocket.receive_text()
                    user_message = json.loads(user_message)["user_message"]
                    user_status = await redis_client.get(
                        f"session:{conversation_id}"
                    )
                    if not user_status:
                        raise Exception("A max 25 minute session has ended")
                    await self.bot_action_service.handle_user_message(
                        message_content=user_message,
                        conversation_id=conversation_id,
                    )
                    if not conversation_history_sent:
                        user_message = f"{conversation_history}\n{user_message}"
                        conversation_history_sent = True
                    await openai_sender(user_message)
            finally:
                # Stop the background receiver before the connection closes so
                # it cannot outlive this call; gather(return_exceptions=True)
                # absorbs the resulting CancelledError.
                # NOTE(review): this may interrupt an in-flight AI message
                # that was still being persisted — confirm that is acceptable.
                receiver_task.cancel()
                await asyncio.gather(receiver_task, return_exceptions=True)