Spaces:
Runtime error
Runtime error
File size: 5,682 Bytes
d4f6849 2efe331 d4f6849 2efe331 1e0a76d 3f2f6d9 2efe331 3f2f6d9 2efe331 d4f6849 00662bb d4f6849 e234fba d4f6849 cc5642a e234fba cc5642a d4f6849 cc5642a 3f2f6d9 cc5642a d4f6849 cc5642a 3f2f6d9 cc5642a 9342992 d4f6849 cc5642a d4f6849 cc5642a d4f6849 cc5642a d4f6849 cc5642a d4f6849 33fe7cc cc5642a 9342992 33fe7cc a3aa6c1 ad024aa a3aa6c1 00662bb 3f2f6d9 cc5642a 33fe7cc cc5642a |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 |
import os
import asyncio
import json
import websockets
from src.utils import OpenAIClient
from src.config import logger
from ._bot_actions_service import BotActionService
class WebSocketService:
def __init__(self):
self.openai_client = OpenAIClient
self.bot_action_service = BotActionService()
async def __aenter__(self):
return self
async def __aexit__(self, *args):
pass
async def handle_text_conversation_with_openai_websockets(
self,
client_websocket,
conversation_id,
user_id,
conversation_history,
redis_client,
):
openai_uri = (
os.getenv("OPENAI_WS_BASE_URL")
+ "/v1/realtime?model="
+ os.getenv("OPENAI_REALTIME_MODEL")
)
openai_websocket = None
async with websockets.connect(
uri=openai_uri,
additional_headers={
"Authorization": f"Bearer {os.getenv('OPENAI_API_KEY')}",
"OpenAI-Beta": "realtime=v1",
},
) as openai_websocket:
async def openai_receiver():
async for data in openai_websocket:
message = json.loads(data)
openai_server_events = {
"session_created": message["type"] == "session.created",
"function_call": (
message["type"] == "response.function_call_arguments.done"
),
"ai_response": (
message["type"] == "response.done"
and message["response"]["status"] == "completed"
and message["response"]["output"][0]["type"] == "message"
and message["response"]["output"][0]["role"] == "assistant"
),
}
if openai_server_events["function_call"]:
response = (
await self.bot_action_service.handle_ai_function_call(
data=message, user_id=user_id
)
)
event_response = {
"type": "conversation.item.create",
"previous_item_id": None,
"item": {
"type": "message",
"role": "user",
"content": [
{
"type": "input_text",
"text": response,
}
],
},
}
await openai_websocket.send(json.dumps(event_response))
event_response_2 = {"type": "response.create"}
await openai_websocket.send(json.dumps(event_response_2))
if openai_server_events["ai_response"]:
ai_response = message["response"]["output"][0]["content"][0][
"text"
]
await self.bot_action_service.handle_ai_message(
message_content=ai_response,
conversation_id=conversation_id,
)
await client_websocket.send_text(
json.dumps({"ai_message": ai_response})
)
elif message["type"] == "error":
logger.error(f"Error: {message['error']}")
async def openai_sender(message):
async with self.openai_client() as client:
create_session_event = await client.create_text_session_event()
await openai_websocket.send(json.dumps(create_session_event))
create_conversation_event = {
"type": "conversation.item.create",
"previous_item_id": None,
"item": {
"type": "message",
"role": "user",
"content": [
{
"type": "input_text",
"text": message,
}
],
},
}
await openai_websocket.send(json.dumps(create_conversation_event))
await openai_websocket.send(json.dumps({"type": "response.create"}))
receiver_task = asyncio.create_task(openai_receiver())
conversation_history_sent = False
while True:
user_message = await client_websocket.receive_text()
user_message = json.loads(user_message)["user_message"]
user_status = await redis_client.get(
f"session:{conversation_id}"
)
if not user_status:
raise Exception("A max 25 minute session has ended")
await self.bot_action_service.handle_user_message(
message_content=user_message,
conversation_id=conversation_id,
)
if not conversation_history_sent:
user_message = f"{conversation_history}\n{user_message}"
conversation_history_sent = True
await openai_sender(user_message)
|