File size: 6,984 Bytes
8cb7ff4
7f5fade
 
 
 
 
cebae6b
2a02af0
7f5fade
 
 
 
 
 
cebae6b
 
 
7f5fade
 
 
 
 
 
 
8cb7ff4
 
7f5fade
 
 
 
 
 
 
 
 
 
 
 
8cb7ff4
7f5fade
 
 
 
 
 
 
 
 
 
 
 
 
8cb7ff4
7f5fade
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8cb7ff4
7f5fade
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8cb7ff4
7f5fade
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8cb7ff4
7f5fade
 
 
 
 
 
 
 
 
8cb7ff4
 
 
 
 
7f5fade
 
 
 
 
 
 
8cb7ff4
 
 
6761fa6
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
import gradio as gr
import os
import string
from pymongo import MongoClient
from openai import AsyncOpenAI, OpenAI
import copy

from constants import *
import asyncio
import string as st
from opik.integrations.openai import track_openai
from opik import track
from bson.objectid import ObjectId
import opik
from textwrap import dedent


# Opik client handle, constructed at import time.
oClient = opik.Opik()
# MongoDB connection; the URI comes from the MONGO_URI environment variable.
mdb = MongoClient(
    os.getenv("MONGO_URI")
)  # NOTE: the explicit "mongodb://localhost:27017/" fallback is disabled; os.getenv returns None when MONGO_URI is unset
# OpenAI clients wrapped by Opik's track_openai: one async, one sync.
aclient = track_openai(AsyncOpenAI())
client = track_openai(OpenAI())
# Database handle shared by the functions in this module.
db = mdb["Mindware"]


def purge(d):
    """
    Flatten a nested mapping into a single-level dict.

    Each ``key: value`` pair of ``d`` is handled in order:

    * dict value: flattened recursively and merged in (the key itself is
      dropped);
    * list value: stored unchanged under its key. In addition, for every dict
      element of the list, each of its values is appended to a list stored
      under that element's key, so ``[{"role": "a"}, {"role": "b"}]`` also
      produces ``"role": ["a", "b"]``. Non-dict elements contribute nothing
      extra, and dict elements are not flattened recursively;
    * any other value: stored unchanged under its key.

    Entries processed later overwrite earlier ones that land on the same key.

    Returns a new dict; ``d`` itself is not modified.
    """
    result = {}
    for key, value in d.items():
        if isinstance(value, dict):
            result.update(purge(value))
        elif isinstance(value, list):
            # Accumulate one column per key across ALL dict elements. Resetting
            # the column for every element would keep only the last element.
            columns = {}
            for item in value:
                if isinstance(item, dict):
                    for item_key, item_value in item.items():
                        columns.setdefault(item_key, []).append(item_value)
            result.update(columns)
            # The list itself is stored last so it wins over a same-named column.
            result[key] = value
        else:
            result[key] = value
    return result


def deploy(d):
    """
    Return a fresh flat dict holding everything ``purge(d)`` produces.

    Used to turn a nested record into keyword arguments for ``str.format``.
    """
    return dict(purge(d))


async def chat(prompt, model="gpt-4"):
    """
    Send ``prompt`` as a single user message and return the reply text.

    Args:
        prompt: Text placed in the one "user" message of the request.
        model: Model name passed to the chat-completions endpoint.

    Returns:
        The ``message.content`` of the first choice in the completion.
    """
    messages = [{"role": "user", "content": prompt}]
    completion = await aclient.chat.completions.create(model=model, messages=messages)
    first_choice = completion.choices[0]
    return first_choice.message.content


async def chat_generator(prompt: str, user: dict = None):
    """
    Stream the model's reply to ``prompt`` as text fragments.

    Args:
        prompt: Prompt text. When ``user`` is truthy it is treated as a
            ``str.format`` template and filled with ``deploy(user)``;
            otherwise it is sent verbatim.
        user: Optional user record supplying the template values.

    Yields:
        Each non-empty content delta of the streamed completion, in order.
    """
    content = prompt.format(**deploy(user)) if user else prompt
    stream = await aclient.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": content}],
        stream=True,
    )
    async for chunk in stream:
        # A stream chunk may carry an empty ``choices`` list (e.g. a usage-only
        # chunk); skip it instead of failing with IndexError on choices[0].
        if not chunk or not chunk.choices:
            continue
        delta = chunk.choices[0].delta.content
        if delta:
            yield delta


def get_or_init_user(reply, userId):
    """
    Load the stored record for ``userId`` or build a new one from USER_TEMPLATE.

    In both cases the returned dict has ``"user_query"`` set to ``reply``.
    A stored record is returned as a shallow copy with its ``"_id"`` removed;
    a new record is a deep copy of USER_TEMPLATE with ``"userId"`` filled in
    (it is only printed here, not written to the database).
    """
    matches = list(db["users"].find({"userId": userId}))

    if matches:
        stored = matches[0]
        # Drop Mongo's primary key so the record can be re-inserted later.
        stored.pop("_id")
        user = dict(stored)
        user["user_query"] = reply
        return user

    user = dict(**copy.deepcopy(USER_TEMPLATE))
    user["userId"] = userId
    user["user_query"] = reply
    print("user created:", user)
    return user


async def search(query, n=5):
    """
    Run a vector search over the "runway" collection for ``query``.

    The query is embedded with "text-embedding-3-small" and matched through
    the "arrestor" index on the "embedding" path, restricted to documents
    whose "class" is "THERAPIST", with 850 candidates and at most ``n`` hits.
    The "embedding" field is projected out of the results.

    Returns:
        The aggregation results as a list of documents.
    """
    embedding_response = await aclient.embeddings.create(
        input=query, model="text-embedding-3-small"
    )

    # NOTE(review): "score" is passed inside the $vectorSearch stage itself;
    # confirm against the Atlas Vector Search docs that this is accepted.
    vector_stage = {
        "$vectorSearch": {
            "queryVector": embedding_response.data[0].embedding,
            "path": "embedding",
            "index": "arrestor",
            "score": {"$meta": "vectorSearchScore"},
            "filter": {"class": "THERAPIST"},
            "numCandidates": 850,
            "limit": n,
        }
    }
    drop_embedding_stage = {"$project": {"embedding": 0}}

    cursor = db["runway"].aggregate([vector_stage, drop_embedding_stage])
    return list(cursor)


async def agentic_search(query, n=5):
    """
    Vector-search for ``query`` and keep only hits the model judges relevant.

    Fetches ``n * 5`` candidates via ``search``, asks the model about each one
    concurrently using ARAG_PROMPT, and keeps candidates whose verdict is
    "True".

    Args:
        query: The search text.
        n: Maximum number of documents to return.

    Returns:
        Up to ``n`` accepted documents, in the order ``search`` returned them.
    """
    results = await search(query, n * 5)
    # gather() returns a Future; it must be awaited to get the list of replies.
    verdicts = await asyncio.gather(
        *(chat(ARAG_PROMPT.format(query=query, doc=doc)) for doc in results)
    )

    docs = []
    for doc, verdict in zip(results, verdicts):
        # Tolerate a missing reply, surrounding whitespace, case differences and
        # a trailing period ("True." / "true") in the model's answer.
        if (verdict or "").strip().strip(".").lower() == "true":
            docs.append(doc)
            if len(docs) >= n:
                break

    return docs


async def update_user(response, user):
    """
    Record the finished exchange on ``user`` and persist it.

    Sets ``user["last_question"]`` to ``response``, appends the user query and
    the assistant response to ``user["chat_history"]``, then replaces every
    stored document with the same ``userId`` by this record (delete, then
    insert) and prints a summary line.
    """
    user["last_question"] = response
    user["chat_history"].extend(
        [
            {"role": "user", "content": user["user_query"]},
            {"role": "assistant", "content": response},
        ]
    )

    # NOTE(review): delete-then-insert is two separate writes; a failure in
    # between would leave no stored record for this user.
    users = db["users"]
    users.delete_many({"userId": user["userId"]})
    users.insert_one(user)

    print("Updated", user["userId"], user["user_query"], "reply:", response)


async def add_background_tasks(task):
    """
    Placeholder for FastAPI background tasks.

    For now the given awaitable is simply awaited inline, so the caller waits
    for it to finish; its result is discarded.
    """
    await task
    return None


# Translation table that deletes every ASCII punctuation character except "_",
# so labels such as "HIGH_RISK" keep their underscore.
punc_removal = str.maketrans(
    {ord(ch): None for ch in string.punctuation if ch != "_"}
)


async def escalate(user):
    """
    Announce that ``user`` is being escalated to a clinician.

    Currently this only prints a message; a missing "name" is shown as None.
    """
    message = f"user {user.get('name')} is not working, escalating to clinician"
    print(message)


async def update_docs(user):
    """
    Announce a document update for ``user``.

    Currently this only prints the user's "name" and "cache" values; both keys
    must be present.
    """
    message = f"updating docs for {user['name']}: {user['cache']}"
    print(message)


def _normalize_label(text):
    """Upper-case ``text``, strip punctuation except "_", and turn spaces into "_"."""
    return (text or "").upper().translate(punc_removal).replace(" ", "_")


@track
async def handle_chat(reply, userId):
    """
    Produce a streamed assistant reply to ``reply`` for the given user.

    Steps:
      1. Load or create the user record and attach ``reply`` as the query.
      2. Run the intent, risk, cache and intensity prompts concurrently.
      3. If the cache prompt returned text, store it on the user and call
         ``update_docs``.
      4. Extend BASE_PROMPT with the section matching the detected intent,
         plus HIGH_RISK_PROMPT (and ``escalate``) when risk is "HIGH_RISK".
      5. Stream the final completion, then store the exchange via
         ``update_user``.

    Args:
        reply: The user's latest message.
        userId: Identifier used to look up or create the user record.

    Yields:
        Each non-empty text fragment of the streamed reply.
    """
    user = get_or_init_user(reply, userId)
    prompt = BASE_PROMPT

    # Flatten the user record once; all four classifier prompts share it.
    fields = deploy(user)
    intent_raw, risk_raw, cache_raw, _intensity_raw = await asyncio.gather(
        *(
            chat(prompt=template.format(**fields))
            for template in (INTENT_PROMPT, RISK_PROMPT, CACHE_PROMPT, INTENSITY_PROMPT)
        )
    )

    if cache_raw:
        user["cache"] = cache_raw
        await add_background_tasks(update_docs(user))

    intent = _normalize_label(intent_raw)

    if intent == "ACTIVE_SPEAKING":
        prompt += SPEAKING_PROMPT
    elif intent == "VALIDATION_SEEK":
        prompt += VALIDATION_PROMPT
    elif intent == "OVERWHELMED":
        prompt += OVERWHELMED_PROMPT
        # NOTE(review): fixed 5-second pause before replying; presumably
        # deliberate pacing for an overwhelmed user -- confirm.
        await asyncio.sleep(5)
    elif intent == "REMOTE_REFERRAL":
        results = await search(user["cache"], n=5)
        # Re-flatten here: user["cache"] may have changed since ``fields``.
        remote_section = REMOTE_PROMPT.format(results=results, **deploy(user))
        # chat_generator runs str.format over the whole prompt again. The text
        # rendered here contains literal braces (search results are dicts), so
        # escape them to survive that second pass instead of raising.
        prompt += remote_section.replace("{", "{{").replace("}", "}}")
    elif intent == "NEUTRAL_STOP":
        prompt += STOP_PROMPT
    elif intent == "END_OF_NARRATIVE":
        prompt += END_PROMPT
    else:
        print("Unknown response of intent detection:", intent_raw)

    if _normalize_label(risk_raw) == "HIGH_RISK":
        prompt += HIGH_RISK_PROMPT
        await add_background_tasks(escalate(user))

    fragments = []
    async for word in chat_generator(prompt, user):
        if word:
            fragments.append(word)
            yield word

    await add_background_tasks(update_user("".join(fragments), user))


async def respond(message, history, id):
    """
    Gradio chat callback: stream the growing reply to ``message``.

    ``history`` is accepted for the ChatInterface signature but not used.
    ``id`` is the user identifier forwarded to ``handle_chat``.

    Yields:
        The reply accumulated so far, once per non-empty fragment received.
    """
    transcript = ""
    async for piece in handle_chat(message, id):
        if not piece:
            continue
        transcript = transcript + piece
        yield transcript


"""
For information on how to customize the ChatInterface, peruse the gradio docs: https://www.gradio.app/docs/chatinterface
"""
with gr.Blocks() as demo:
    # Pass a callable so a fresh ObjectId is generated on every page load.
    # A plain string would be computed once at import time, giving every
    # visitor the same default userID and therefore the same stored history.
    id = gr.Textbox(value=lambda: str(ObjectId()), label="userID")
    gr.ChatInterface(
        fn=respond,
        type="messages",
        additional_inputs=[id],
    )


# Launch the Gradio app only when this file is executed as a script;
# ssr_mode=False is passed to turn off server-side rendering.
if __name__ == "__main__":
    demo.launch(ssr_mode=False)