File size: 15,207 Bytes
3973360
 
 
 
 
 
 
 
 
 
 
6d6ae78
 
 
 
3973360
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3526a8e
 
 
 
 
 
 
 
 
 
 
 
 
ef3c550
3973360
 
 
ef0145e
 
 
 
 
 
 
 
 
 
 
 
6b7a285
ef0145e
 
 
 
 
 
 
 
 
 
 
3973360
ef0145e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3973360
ef0145e
 
 
 
 
 
 
 
 
 
 
 
 
 
6b7a285
ef0145e
 
 
 
 
 
 
 
 
 
 
 
6b7a285
ef0145e
 
 
 
6b7a285
ef0145e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6b7a285
ef0145e
 
 
 
6b7a285
ef0145e
3973360
 
 
 
 
 
0171bb1
 
 
ef0145e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0171bb1
ef0145e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0171bb1
ef0145e
 
 
 
 
 
 
 
 
 
 
 
 
 
6b7a285
ef0145e
 
 
 
 
 
 
 
 
 
 
 
6b7a285
ef0145e
 
 
 
6b7a285
ef0145e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6b7a285
ef0145e
 
 
 
6b7a285
ef0145e
0171bb1
 
3973360
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3526a8e
 
3973360
3526a8e
6d6ae78
3973360
 
 
 
 
 
3526a8e
62a29c1
3526a8e
62a29c1
 
 
3526a8e
3973360
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
import ast
import json

from fastapi import BackgroundTasks
from fastapi.responses import JSONResponse
from langchain_core.messages import HumanMessage, AIMessage
from langchain_core.messages.ai import AIMessageChunk

from src.apis.interfaces.api_interface import Chat
from src.langgraph.multi_agent.chat.chat_flow import ChatBot
from src.utils.helper import handle_validator_raise
from src.utils.logger import logger
from src.utils.mongo import chat_messages_history, chat_history_management_crud
from src.utils.redis import get_key_redis, set_key_redis

# Module-level singletons built once at import time: `app` is the multi-agent
# ChatBot, and calling it yields the runnable LangGraph workflow used by every
# handler below.
app = ChatBot()
workflow = app()


@handle_validator_raise
def post_process_history(history):
    """Convert raw history dicts into LangChain message objects.

    Each entry is expected to look like ``{"type": "human"|"ai", "content": str}``.
    Entries of any other type are silently dropped, matching the original
    behavior of only handling "human" and "ai".
    """
    message_classes = {"human": HumanMessage, "ai": AIMessage}
    return [
        message_classes[entry["type"]](content=entry["content"])
        for entry in history
        if entry["type"] in message_classes
    ]


async def save_history(user_id, human_message, ai_message, intent):
    """Persist one human/AI exchange to Mongo and mirror it into Redis.

    Appends both messages to the per-user Mongo chat history, upserts the
    session's intent record, then updates (or creates) the Redis cache key
    ``chat_history_{user_id}``.

    Returns a small status dict saying whether the cache entry was created
    or updated.
    """
    messages_add_to_history = [HumanMessage(human_message), AIMessage(ai_message)]
    messages_add_to_history_dict = [
        {"type": "human", "content": human_message},
        {"type": "ai", "content": ai_message},
    ]
    messages_add_to_history_cache = {
        "message": messages_add_to_history_dict,
        "intent": intent,
    }
    history = chat_messages_history(user_id)
    await history.aadd_messages(messages_add_to_history)
    check_exist_history = await chat_history_management_crud.read_one(
        {"session_id": user_id}
    )
    if check_exist_history is None:
        # NOTE(review): user_id doubles as session_id here — presumably one
        # chat session per user; confirm against the session model.
        await chat_history_management_crud.create(
            {"user_id": user_id, "session_id": user_id, "intent": intent}
        )
        logger.info("History created")
    else:
        await chat_history_management_crud.update(
            {"session_id": user_id}, {"intent": intent}
        )
        logger.info("History updated")
    history_cache = await get_key_redis(f"chat_history_{user_id}")
    if history_cache is not None:
        # Security fix: the cached value is the repr of a plain dict, so
        # ast.literal_eval parses it identically to eval() but cannot execute
        # arbitrary expressions if the Redis value is ever tampered with.
        history_cache = ast.literal_eval(history_cache)
        history_cache["message"] = (
            history_cache["message"] + messages_add_to_history_dict
        )
        history_cache["intent"] = intent
        await set_key_redis(
            f"chat_history_{user_id}",
            str(history_cache),
        )
        return {"message": "History updated"}
    await set_key_redis(f"chat_history_{user_id}", str(messages_add_to_history_cache))
    return {"message": "History created"}


async def chat_streaming_function(user, data: Chat, background_tasks: BackgroundTasks):
    """Stream a chat answer for a logged-in user as JSON event strings.

    Yields, in order:
      * "tool_call" events when the model starts a tool call (the internal
        ClassifyUserIntent router is suppressed),
      * "message" events carrying the full accumulated assistant text so far,
      * one terminal "final" event with the answer, intent, and tool name.
    Any failure yields a single "error" event and stops the stream.  On full
    success the exchange is persisted via save_history() as a background task.

    Fix: the debug ``print()`` of every streamed chunk was replaced with
    ``logger.debug`` so raw chunks no longer spam stdout in production.
    """
    try:
        human_message = data.message
        history = data.history
        lat = data.lat
        long = data.long
        language = data.language
        logger.info(f"Language: {language}")

        try:
            process_history = post_process_history(history) if history is not None else None
        except Exception as e:
            logger.error(f"Error processing chat history: {str(e)}")
            yield json.dumps({"type": "error", "content": "Error processing chat history" + str(e)}, ensure_ascii=False)
            return

        # Per-request settings threaded through the LangGraph runnable; the
        # user's id doubles as the session id.
        config = {
            "configurable": {
                "user_id": user["id"],
                "user_email": user["email"],
                "contact_number": user["contact_number"],
                "session_id": user["id"],
                "lat": lat,
                "long": long,
            }
        }

        initial_input = {
            "messages": [("user", human_message)],
            "messages_history": process_history,
            "entry_message": None,
            "manual_save": False,
            "intent": data.intent,
            "language": language,
            "tool_name": None,
            "ever_leave_skill": False,
        }

        last_output_state = None
        temp = ""  # accumulated assistant text, re-sent in full on each chunk

        try:
            async for event in workflow.astream(
                input=initial_input,
                config=config,
                stream_mode=["messages", "values"],
            ):
                try:
                    event_type, event_message = event
                    if event_type == "messages":
                        message, metadata = event_message
                        # Surface tool invocations to the client, skipping the
                        # intent-classification pseudo-tool.
                        if (
                            isinstance(message, AIMessageChunk)
                            and message.tool_calls
                            and message.tool_call_chunks[0]["name"] != "ClassifyUserIntent"
                        ):
                            tool_name = message.tool_call_chunks[0]["name"]
                            message_yield = json.dumps(
                                {"type": "tool_call", "content": tool_name}, ensure_ascii=False
                            )
                            logger.debug(message_yield)
                            yield message_yield

                        # Only user-facing nodes stream text to the client.
                        if metadata["langgraph_node"] in [
                            "primary_assistant",
                            "scheduling_agent",
                            "book_hotel_agent",
                        ]:
                            if message.content:
                                temp += message.content
                                message_yield = json.dumps(
                                    {"type": "message", "content": temp}, ensure_ascii=False
                                )
                                logger.debug(message_yield)
                                yield message_yield
                    if event_type == "values":
                        # Keep the most recent full graph state; the last one
                        # seen holds the final answer/intent/tool_name.
                        last_output_state = event_message
                except Exception as e:
                    logger.error(f"Error processing stream event: {str(e)}")
                    yield json.dumps({"type": "error", "content": "Error processing response" + str(e)}, ensure_ascii=False)
                    return

            if last_output_state is None:
                raise ValueError("No output state received from workflow")

            final_ai_output = last_output_state["messages"][-1].content
            final_intent = last_output_state["intent"]
            tool_name_important = last_output_state["tool_name"]

            final_response = json.dumps(
                {
                    "type": "final",
                    "content": final_ai_output,
                    "intent": final_intent,
                    "tool_name": tool_name_important,
                },
                ensure_ascii=False,
            )
            yield final_response

        except Exception as e:
            logger.error(f"Error in workflow stream processing: {str(e)}")
            yield json.dumps({"type": "error", "content": "Error processing chat stream" + str(e)}, ensure_ascii=False)
            return

    except Exception as e:
        logger.error(f"Unexpected error in chat streaming: {str(e)}")
        yield json.dumps({"type": "error", "content": "An unexpected error occurred" + str(e)}, ensure_ascii=False)
        return

    # Reached only on full success — every error path above returns early,
    # so final_ai_output / final_intent are guaranteed to be bound here.
    background_tasks.add_task(
        save_history, user["id"], human_message, final_ai_output, final_intent
    )


async def chat_streaming_no_login_function(data: Chat, background_tasks: BackgroundTasks):
    """Stream a chat answer for an anonymous (not logged-in) user.

    Same event protocol as chat_streaming_function — "tool_call", "message",
    then a terminal "final" event, with a single "error" event on failure —
    but all user/session identifiers are None and nothing is persisted.

    Fixes: debug ``print()`` calls replaced with ``logger.debug``; the
    history-error event now matches the logged-in variant (includes the
    exception text, no stray trailing newline); signature reformatted.
    """
    try:
        human_message = data.message
        history = data.history
        lat = data.lat
        long = data.long
        language = data.language
        logger.info(f"Language: {language}")

        try:
            process_history = post_process_history(history) if history is not None else None
        except Exception as e:
            logger.error(f"Error processing chat history: {str(e)}")
            yield json.dumps({"type": "error", "content": "Error processing chat history" + str(e)}, ensure_ascii=False)
            return

        # Anonymous session: no user identity or session id is available.
        config = {
            "configurable": {
                "user_id": None,
                "user_email": None,
                "contact_number": None,
                "session_id": None,
                "lat": lat,
                "long": long,
            }
        }

        initial_input = {
            "messages": [("user", human_message)],
            "messages_history": process_history,
            "entry_message": None,
            "manual_save": False,
            "intent": data.intent,
            "language": language,
            "tool_name": None,
            "ever_leave_skill": False,
        }

        last_output_state = None
        temp = ""  # accumulated assistant text, re-sent in full on each chunk

        try:
            async for event in workflow.astream(
                input=initial_input,
                config=config,
                stream_mode=["messages", "values"],
            ):
                try:
                    event_type, event_message = event
                    if event_type == "messages":
                        message, metadata = event_message
                        # Surface tool invocations to the client, skipping the
                        # intent-classification pseudo-tool.
                        if (
                            isinstance(message, AIMessageChunk)
                            and message.tool_calls
                            and message.tool_call_chunks[0]["name"] != "ClassifyUserIntent"
                        ):
                            tool_name = message.tool_call_chunks[0]["name"]
                            message_yield = json.dumps(
                                {"type": "tool_call", "content": tool_name}, ensure_ascii=False
                            )
                            logger.debug(message_yield)
                            yield message_yield

                        # Only user-facing nodes stream text to the client.
                        if metadata["langgraph_node"] in [
                            "primary_assistant",
                            "scheduling_agent",
                            "book_hotel_agent",
                        ]:
                            if message.content:
                                temp += message.content
                                message_yield = json.dumps(
                                    {"type": "message", "content": temp}, ensure_ascii=False
                                )
                                logger.debug(message_yield)
                                yield message_yield
                    if event_type == "values":
                        # Keep the most recent full graph state; the last one
                        # seen holds the final answer/intent/tool_name.
                        last_output_state = event_message
                except Exception as e:
                    logger.error(f"Error processing stream event: {str(e)}")
                    yield json.dumps({"type": "error", "content": "Error processing response" + str(e)}, ensure_ascii=False)
                    return

            if last_output_state is None:
                raise ValueError("No output state received from workflow")

            final_ai_output = last_output_state["messages"][-1].content
            final_intent = last_output_state["intent"]
            tool_name_important = last_output_state["tool_name"]

            final_response = json.dumps(
                {
                    "type": "final",
                    "content": final_ai_output,
                    "intent": final_intent,
                    "tool_name": tool_name_important,
                },
                ensure_ascii=False,
            )
            yield final_response

        except Exception as e:
            logger.error(f"Error in workflow stream processing: {str(e)}")
            yield json.dumps({"type": "error", "content": "Error processing chat stream" + str(e)}, ensure_ascii=False)
            return

    except Exception as e:
        logger.error(f"Unexpected error in chat streaming: {str(e)}")
        yield json.dumps({"type": "error", "content": "An unexpected error occurred" + str(e)}, ensure_ascii=False)
        return


async def chat_function(user, data: Chat, background_tasks: BackgroundTasks):
    """Non-streaming chat endpoint for a logged-in user.

    Runs the full workflow once via ainvoke and returns a JSONResponse with
    the final message, intent, and tool name.  On success the exchange is
    persisted via save_history() as a background task; a missing answer
    yields a 500 response instead.

    Fixes: `response_ouput` typo corrected; dead `# try:` comment removed.
    """
    message = data.message
    history = data.history
    lat = data.lat
    long = data.long
    language = data.language
    logger.info(f"Language: {language}")
    process_history = post_process_history(history) if history is not None else None
    # Per-request settings threaded through the LangGraph runnable; the
    # user's id doubles as the session id.
    config = {
        "configurable": {
            "user_id": user["id"],
            "user_email": user["email"],
            "contact_number": user["contact_number"],
            "session_id": user["id"],
            "lat": lat,
            "long": long,
        }
    }
    # NOTE(review): unlike the streaming variants, this input omits
    # "ever_leave_skill" — presumably the graph state has a default; confirm.
    initial_input = {
        "messages": [("user", message)],
        "messages_history": process_history,
        "entry_message": None,
        "manual_save": False,
        "intent": data.intent,
        "language": language,
        "tool_name": None,
    }
    output = await workflow.ainvoke(initial_input, config)

    final_ai_output = output["messages"][-1].content
    final_intent = output["intent"]
    tool_name = output["tool_name"]

    if final_ai_output is None:
        return JSONResponse(
            content={"message": "Error in chat_function"}, status_code=500
        )
    background_tasks.add_task(
        save_history, user["id"], data.message, final_ai_output, final_intent
    )

    response_output = {
        "message": final_ai_output,
        "intent": final_intent,
        "tool_name": tool_name,
    }
    return JSONResponse(content=response_output, status_code=200)


async def get_intent_function(session_id):
    """Return the stored intent for a chat session, or None if no record exists."""
    record = await chat_history_management_crud.read_one({"session_id": session_id})
    return record["intent"] if record is not None else None


async def get_history_function(user_id, background_tasks: BackgroundTasks):
    """Return ``{"message": [...], "intent": ...}`` for a user, cache-first.

    Tries the Redis key ``chat_history_{user_id}`` first; on a miss, loads
    the last 50 messages from Mongo, resolves the intent, and repopulates
    the cache in the background.  Any failure returns an empty history
    instead of raising.

    Fixes: removed a leftover ``history_messages = None`` that unconditionally
    discarded the Redis value, forcing a Mongo round-trip on every call;
    replaced ``eval()`` on cached data with ``ast.literal_eval``.
    """
    history = chat_messages_history(user_id, 50)
    try:
        history_messages = await get_key_redis(f"chat_history_{user_id}")
        if not history_messages:
            logger.info("History not found in redis")
            history_messages = await history.aget_messages()
            history_messages = [
                i.model_dump(include=["type", "content"]) for i in history_messages
            ]
            intent = await get_intent_function(user_id)
            logger.info(f"INTENT {intent}")
            result = {"message": history_messages, "intent": intent}
            # Warm the cache without blocking the response.
            background_tasks.add_task(
                set_key_redis, f"chat_history_{user_id}", str(result)
            )
            return result
        # ast.literal_eval safely parses the repr-formatted dict written by
        # save_history(); eval() would execute arbitrary code from Redis.
        return ast.literal_eval(history_messages)
    except Exception as e:
        logger.error(f"Error in get_history_function: {e}")
        return {"message": [], "intent": None}


async def list_chat_history_function(user_id: str):
    """List every session id recorded for the given user (empty list if none)."""
    records = await chat_history_management_crud.read({"user_id": user_id})
    if records is None:
        return []
    return [record["session_id"] for record in records]


async def delete_chat_history_function(session_id: str):
    """Erase every stored message for the given chat session and confirm."""
    session_history = chat_messages_history(session_id, 50)
    await session_history.aclear()
    return {"message": "Chat history has been deleted"}