File size: 5,682 Bytes
d4f6849
 
2efe331
d4f6849
2efe331
 
 
1e0a76d
3f2f6d9
2efe331
 
 
 
 
3f2f6d9
2efe331
 
 
 
 
 
 
d4f6849
00662bb
 
 
 
 
 
d4f6849
e234fba
 
 
 
 
d4f6849
cc5642a
e234fba
cc5642a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d4f6849
cc5642a
3f2f6d9
 
 
 
 
cc5642a
 
 
 
 
 
 
 
 
 
 
 
 
d4f6849
cc5642a
 
 
 
 
 
 
 
3f2f6d9
cc5642a
 
 
9342992
 
 
d4f6849
cc5642a
 
d4f6849
cc5642a
 
 
 
d4f6849
cc5642a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d4f6849
cc5642a
d4f6849
33fe7cc
cc5642a
 
9342992
33fe7cc
a3aa6c1
ad024aa
a3aa6c1
00662bb
 
 
3f2f6d9
cc5642a
 
 
33fe7cc
 
 
 
 
cc5642a
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
import os
import asyncio
import json
import websockets

from src.utils import OpenAIClient
from src.config import logger

from ._bot_actions_service import BotActionService


class WebSocketService:
    """Relay a text chat between a client websocket and the OpenAI realtime API.

    The service can be used as an async context manager: entering returns the
    service itself and exiting performs no cleanup.
    """

    def __init__(self):
        # The client class itself is stored (not an instance); a fresh client
        # is opened with ``async with self.openai_client()`` when needed.
        self.openai_client = OpenAIClient
        self.bot_action_service = BotActionService()

    async def __aenter__(self):
        return self

    async def __aexit__(self, *args):
        pass

    @staticmethod
    def _user_text_item_event(text):
        """Build a ``conversation.item.create`` event with *text* as user input."""
        return {
            "type": "conversation.item.create",
            "previous_item_id": None,
            "item": {
                "type": "message",
                "role": "user",
                "content": [
                    {
                        "type": "input_text",
                        "text": text,
                    }
                ],
            },
        }

    @staticmethod
    def _completed_assistant_text(message):
        """Return the assistant text of a completed ``response.done`` event.

        Returns ``None`` for every other event, and also when the response
        carries no output items or no content parts, so that an unexpected
        payload shape is skipped instead of raising inside the receiver.
        """
        if message.get("type") != "response.done":
            return None
        response = message.get("response") or {}
        if response.get("status") != "completed":
            return None
        output = response.get("output") or []
        if not output:
            return None
        first_item = output[0]
        if first_item.get("type") != "message":
            return None
        if first_item.get("role") != "assistant":
            return None
        content = first_item.get("content") or []
        if not content:
            return None
        return content[0].get("text")

    @staticmethod
    def _log_receiver_outcome(task):
        """Log the exception (if any) that terminated the receiver task."""
        if task.cancelled():
            return
        error = task.exception()
        if error is not None:
            logger.error(f"OpenAI receiver stopped unexpectedly: {error!r}")

    async def handle_text_conversation_with_openai_websockets(
        self,
        client_websocket,
        conversation_id,
        user_id,
        conversation_history,
        redis_client,
    ):
        """Run a text conversation between the client and the realtime model.

        Opens a websocket to ``OPENAI_WS_BASE_URL`` for the model named by
        ``OPENAI_REALTIME_MODEL``, starts a background task that consumes the
        OpenAI events, and then forwards client messages until an exception
        ends the loop (this coroutine never returns normally).

        Args:
            client_websocket: Connection to the end user; must provide
                awaitable ``receive_text()`` and ``send_text()``. Incoming
                frames are JSON objects with a ``"user_message"`` key.
            conversation_id: Identifier passed to the bot action service and
                used to build the ``session:{conversation_id}`` lookup key.
            user_id: Identifier passed along with AI function calls.
            conversation_history: Text prepended to the first user message
                only.
            redis_client: Object with an awaitable ``get(key)``; a falsy
                result for the session key ends the conversation.

        Raises:
            Exception: When the session key is no longer present.
            TypeError: When one of the URL environment variables is unset.
        """
        openai_uri = (
            os.getenv("OPENAI_WS_BASE_URL")
            + "/v1/realtime?model="
            + os.getenv("OPENAI_REALTIME_MODEL")
        )
        async with websockets.connect(
            uri=openai_uri,
            additional_headers={
                "Authorization": f"Bearer {os.getenv('OPENAI_API_KEY')}",
                "OpenAI-Beta": "realtime=v1",
            },
        ) as openai_websocket:

            async def openai_receiver():
                """Consume OpenAI events and react to the ones we care about."""
                async for data in openai_websocket:
                    message = json.loads(data)
                    event_type = message.get("type")

                    if event_type == "response.function_call_arguments.done":
                        response = (
                            await self.bot_action_service.handle_ai_function_call(
                                data=message, user_id=user_id
                            )
                        )
                        # The function result is fed back as a user text item,
                        # followed by a request for a new model response.
                        await openai_websocket.send(
                            json.dumps(self._user_text_item_event(response))
                        )
                        await openai_websocket.send(
                            json.dumps({"type": "response.create"})
                        )
                        continue

                    if event_type == "error":
                        logger.error(f"Error: {message['error']}")
                        continue

                    ai_response = self._completed_assistant_text(message)
                    if ai_response is None:
                        continue
                    await self.bot_action_service.handle_ai_message(
                        message_content=ai_response,
                        conversation_id=conversation_id,
                    )
                    await client_websocket.send_text(
                        json.dumps({"ai_message": ai_response})
                    )

            async def openai_sender(message):
                """Send the session event, the user text and a response request."""
                # NOTE: the session event is re-sent with every user message.
                async with self.openai_client() as client:
                    create_session_event = await client.create_text_session_event()
                    await openai_websocket.send(json.dumps(create_session_event))

                await openai_websocket.send(
                    json.dumps(self._user_text_item_event(message))
                )
                await openai_websocket.send(json.dumps({"type": "response.create"}))

            receiver_task = asyncio.create_task(openai_receiver())
            # Report a crashed receiver right away instead of losing the error.
            receiver_task.add_done_callback(self._log_receiver_outcome)

            try:
                conversation_history_sent = False
                while True:
                    user_message = await client_websocket.receive_text()
                    user_message = json.loads(user_message)["user_message"]

                    user_status = await redis_client.get(
                        f"session:{conversation_id}"
                    )
                    if not user_status:
                        raise Exception("A max 25 minute session has ended")

                    await self.bot_action_service.handle_user_message(
                        message_content=user_message,
                        conversation_id=conversation_id,
                    )

                    if not conversation_history_sent:
                        user_message = f"{conversation_history}\n{user_message}"
                        conversation_history_sent = True

                    await openai_sender(user_message)
            finally:
                # Stop the background reader before the OpenAI socket closes so
                # it is never left orphaned; gather() absorbs its cancellation
                # while still letting a cancellation of *this* coroutine through.
                receiver_task.cancel()
                await asyncio.gather(receiver_task, return_exceptions=True)