Spaces:
Running
Running
import ast
import json

from fastapi import BackgroundTasks
from fastapi.responses import JSONResponse
from langchain_core.messages import HumanMessage, AIMessage
from langchain_core.messages.ai import AIMessageChunk

from src.apis.interfaces.api_interface import Chat
from src.langgraph.multi_agent.chat.chat_flow import ChatBot
from src.utils.helper import handle_validator_raise
from src.utils.logger import logger
from src.utils.mongo import chat_messages_history
from src.utils.mongo import chat_history_management_crud
from src.utils.redis import get_key_redis, set_key_redis
# Build the LangGraph chat graph once at import time; every handler in this
# module streams/invokes against this single shared compiled workflow.
app = ChatBot()
workflow = app()
def post_process_history(history):
    """Deserialize client-side chat history into LangChain message objects.

    Each entry is a dict with "type" ("human" or "ai") and "content" keys.
    Entries with any other "type" are silently skipped, matching the
    original behavior.
    """
    message_types = {"human": HumanMessage, "ai": AIMessage}
    return [
        message_types[entry["type"]](content=entry["content"])
        for entry in history
        if entry["type"] in message_types
    ]
async def save_history(user_id, human_message, ai_message, intent):
    """Persist one human/AI exchange to Mongo and keep the Redis cache in sync.

    Args:
        user_id: User identifier; also used as the chat session id.
        human_message: Raw text of the user's message.
        ai_message: Raw text of the assistant's reply.
        intent: Latest intent label to store alongside the session.

    Returns:
        dict with a "message" describing whether the cache entry was
        created or updated.
    """
    messages_add_to_history = [HumanMessage(human_message), AIMessage(ai_message)]
    messages_add_to_history_dict = [
        {"type": "human", "content": human_message},
        {"type": "ai", "content": ai_message},
    ]
    # Append the exchange to the durable Mongo-backed message store.
    history = chat_messages_history(user_id)
    await history.aadd_messages(messages_add_to_history)
    # Upsert the session-management record carrying the current intent.
    check_exist_history = await chat_history_management_crud.read_one(
        {"session_id": user_id}
    )
    if check_exist_history is None:
        await chat_history_management_crud.create(
            {"user_id": user_id, "session_id": user_id, "intent": intent}
        )
        logger.info("History created")
    else:
        await chat_history_management_crud.update(
            {"session_id": user_id}, {"intent": intent}
        )
        logger.info("History updated")
    cache_key = f"chat_history_{user_id}"
    history_cache = await get_key_redis(cache_key)
    if history_cache is not None:
        # ast.literal_eval safely parses the str(dict) payload stored in Redis;
        # eval() here would execute arbitrary code if the cache were tampered with.
        history_cache = ast.literal_eval(history_cache)
        history_cache["message"] = (
            history_cache["message"] + messages_add_to_history_dict
        )
        history_cache["intent"] = intent
        await set_key_redis(cache_key, str(history_cache))
        return {"message": "History updated"}
    messages_add_to_history_cache = {
        "message": messages_add_to_history_dict,
        "intent": intent,
    }
    await set_key_redis(cache_key, str(messages_add_to_history_cache))
    return {"message": "History created"}
async def chat_streaming_function(user, data: Chat, background_tasks: BackgroundTasks):
    """Stream a chat answer for an authenticated user as a series of JSON strings.

    Yields JSON-encoded chunks of shape {"type": ..., "content": ...}:
      - "tool_call": name of a tool the model is invoking
      - "message": the assistant text accumulated so far
      - "final": final text plus "intent" and "tool_name" from the last graph state
      - "error": a failure description (generator stops after yielding it)
    On a fully successful stream, schedules save_history as a background task.
    """
    try:
        human_message = data.message
        history = data.history
        lat = data.lat
        long = data.long
        language = data.language
        logger.info(f"Language: {language}")
        try:
            # Rebuild LangChain message objects from the serialized client history.
            process_history = post_process_history(history) if history is not None else None
        except Exception as e:
            logger.error(f"Error processing chat history: {str(e)}")
            yield json.dumps({"type": "error", "content": "Error processing chat history" + str(e)}, ensure_ascii=False)
            return
        # Per-request graph configuration; the user id doubles as the session id.
        config = {
            "configurable": {
                "user_id": user["id"],
                "user_email": user["email"],
                "contact_number": user["contact_number"],
                "session_id": user["id"],
                "lat": lat,
                "long": long,
            }
        }
        initial_input = {
            "messages": [("user", human_message)],
            "messages_history": process_history,
            "entry_message": None,
            "manual_save": False,
            "intent": data.intent,
            "language": language,
            "tool_name": None,
            "ever_leave_skill": False,
        }
        last_output_state = None  # most recent full graph state (from "values" events)
        temp = ""  # assistant text accumulated across streamed chunks
        try:
            # "messages" events carry token chunks; "values" carry state snapshots.
            async for event in workflow.astream(
                input=initial_input,
                config=config,
                stream_mode=["messages", "values"],
            ):
                try:
                    event_type, event_message = event
                    if event_type == "messages":
                        message, metadata = event_message
                        # Surface tool invocations, except the internal intent classifier.
                        if (
                            isinstance(message, AIMessageChunk)
                            and message.tool_calls
                            and message.tool_call_chunks[0]["name"] != "ClassifyUserIntent"
                        ):
                            tool_name = message.tool_call_chunks[0]["name"]
                            message_yield = json.dumps(
                                {"type": "tool_call", "content": tool_name}, ensure_ascii=False
                            )
                            print(message_yield)
                            yield message_yield
                        # Only stream text produced by user-facing graph nodes.
                        if metadata["langgraph_node"] in [
                            "primary_assistant",
                            "scheduling_agent",
                            "book_hotel_agent",
                        ]:
                            if message.content:
                                # Each "message" chunk carries the whole text so far, not a delta.
                                temp += message.content
                                message_yield = json.dumps(
                                    {"type": "message", "content": temp}, ensure_ascii=False
                                )
                                print(message_yield)
                                yield message_yield
                    if event_type == "values":
                        last_output_state = event_message
                except Exception as e:
                    logger.error(f"Error processing stream event: {str(e)}")
                    yield json.dumps({"type": "error", "content": "Error processing response" + str(e)}, ensure_ascii=False)
                    return
            if last_output_state is None:
                raise ValueError("No output state received from workflow")
            final_ai_output = last_output_state["messages"][-1].content
            final_intent = last_output_state["intent"]
            tool_name_important = last_output_state["tool_name"]
            final_response = json.dumps(
                {
                    "type": "final",
                    "content": final_ai_output,
                    "intent": final_intent,
                    "tool_name": tool_name_important,
                },
                ensure_ascii=False,
            )
            yield final_response
        except Exception as e:
            logger.error(f"Error in workflow stream processing: {str(e)}")
            yield json.dumps({"type": "error", "content": "Error processing chat stream" + str(e)}, ensure_ascii=False)
            return
    except Exception as e:
        logger.error(f"Unexpected error in chat streaming: {str(e)}")
        yield json.dumps({"type": "error", "content": "An unexpected error occurred" + str(e)}, ensure_ascii=False)
        return
    # Persist the exchange only after a fully successful stream; every error
    # path above returns before reaching this point.
    background_tasks.add_task(
        save_history, user["id"], human_message, final_ai_output, final_intent
    )
async def chat_streaming_no_login_function(
    data: Chat, background_tasks: BackgroundTasks
):
    """Stream a chat answer for an anonymous (not logged in) user.

    Same streaming protocol as chat_streaming_function — yields JSON-encoded
    chunks with "tool_call" / "message" / "final" / "error" types — but all
    user/session identifiers in the graph config are None and nothing is
    persisted afterwards.

    NOTE(review): background_tasks is accepted but never used here (no
    save_history for anonymous sessions) — confirm this is intentional.
    """
    try:
        human_message = data.message
        history = data.history
        lat = data.lat
        long = data.long
        language = data.language
        logger.info(f"Language: {language}")
        try:
            # Rebuild LangChain message objects from the serialized client history.
            process_history = post_process_history(history) if history is not None else None
        except Exception as e:
            logger.error(f"Error processing chat history: {str(e)}")
            # NOTE(review): this error chunk appends "\n" unlike the
            # authenticated variant — confirm which framing the client expects.
            yield json.dumps({"type": "error", "content": "Error processing chat history"}, ensure_ascii=False) + "\n"
            return
        # Anonymous session: no user identity or session id for the graph.
        config = {
            "configurable": {
                "user_id": None,
                "user_email": None,
                "contact_number": None,
                "session_id": None,
                "lat": lat,
                "long": long,
            }
        }
        initial_input = {
            "messages": [("user", human_message)],
            "messages_history": process_history,
            "entry_message": None,
            "manual_save": False,
            "intent": data.intent,
            "language": language,
            "tool_name": None,
            "ever_leave_skill": False,
        }
        last_output_state = None  # most recent full graph state (from "values" events)
        temp = ""  # assistant text accumulated across streamed chunks
        try:
            # "messages" events carry token chunks; "values" carry state snapshots.
            async for event in workflow.astream(
                input=initial_input,
                config=config,
                stream_mode=["messages", "values"],
            ):
                try:
                    event_type, event_message = event
                    if event_type == "messages":
                        message, metadata = event_message
                        # Surface tool invocations, except the internal intent classifier.
                        if (
                            isinstance(message, AIMessageChunk)
                            and message.tool_calls
                            and message.tool_call_chunks[0]["name"] != "ClassifyUserIntent"
                        ):
                            tool_name = message.tool_call_chunks[0]["name"]
                            message_yield = json.dumps(
                                {"type": "tool_call", "content": tool_name}, ensure_ascii=False
                            )
                            print(message_yield)
                            yield message_yield
                        # Only stream text produced by user-facing graph nodes.
                        if metadata["langgraph_node"] in [
                            "primary_assistant",
                            "scheduling_agent",
                            "book_hotel_agent",
                        ]:
                            if message.content:
                                # Each "message" chunk carries the whole text so far, not a delta.
                                temp += message.content
                                message_yield = json.dumps(
                                    {"type": "message", "content": temp}, ensure_ascii=False
                                )
                                print(message_yield)
                                yield message_yield
                    if event_type == "values":
                        last_output_state = event_message
                except Exception as e:
                    logger.error(f"Error processing stream event: {str(e)}")
                    yield json.dumps({"type": "error", "content": "Error processing response" + str(e)}, ensure_ascii=False)
                    return
            if last_output_state is None:
                raise ValueError("No output state received from workflow")
            final_ai_output = last_output_state["messages"][-1].content
            final_intent = last_output_state["intent"]
            tool_name_important = last_output_state["tool_name"]
            final_response = json.dumps(
                {
                    "type": "final",
                    "content": final_ai_output,
                    "intent": final_intent,
                    "tool_name": tool_name_important,
                },
                ensure_ascii=False,
            )
            yield final_response
        except Exception as e:
            logger.error(f"Error in workflow stream processing: {str(e)}")
            yield json.dumps({"type": "error", "content": "Error processing chat stream" + str(e)}, ensure_ascii=False)
            return
    except Exception as e:
        logger.error(f"Unexpected error in chat streaming: {str(e)}")
        yield json.dumps({"type": "error", "content": "An unexpected error occurred" + str(e)}, ensure_ascii=False)
        return
async def chat_function(user, data: Chat, background_tasks: BackgroundTasks):
    """Run one non-streaming chat turn and return the full answer as JSON.

    Args:
        user: Authenticated user dict with "id", "email", "contact_number".
        data: Chat payload (message, optional history, coordinates, language, intent).
        background_tasks: FastAPI task queue used to persist the exchange
            after the response is sent.

    Returns:
        JSONResponse with {"message", "intent", "tool_name"} on success,
        or a 500 response if the workflow produced no answer text.
    """
    message = data.message
    history = data.history
    lat = data.lat
    long = data.long
    language = data.language
    logger.info(f"Language: {language}")
    # Rebuild LangChain message objects from the serialized client history.
    process_history = post_process_history(history) if history is not None else None
    # Per-request graph configuration; the user id doubles as the session id.
    config = {
        "configurable": {
            "user_id": user["id"],
            "user_email": user["email"],
            "contact_number": user["contact_number"],
            "session_id": user["id"],
            "lat": lat,
            "long": long,
        }
    }
    initial_input = {
        "messages": [("user", message)],
        "messages_history": process_history,
        "entry_message": None,
        "manual_save": False,
        "intent": data.intent,
        "language": language,
        "tool_name": None,
        # Consistency with the streaming entry points, which seed this flag
        # explicitly — TODO confirm the graph's default matches False.
        "ever_leave_skill": False,
    }
    output = await workflow.ainvoke(initial_input, config)
    final_ai_output = output["messages"][-1].content
    final_intent = output["intent"]
    tool_name = output["tool_name"]
    if final_ai_output is None:
        return JSONResponse(
            content={"message": "Error in chat_function"}, status_code=500
        )
    # Persist the exchange without delaying the HTTP response.
    background_tasks.add_task(
        save_history, user["id"], data.message, final_ai_output, final_intent
    )
    response_output = {
        "message": final_ai_output,
        "intent": final_intent,
        "tool_name": tool_name,
    }
    return JSONResponse(content=response_output, status_code=200)
async def get_intent_function(session_id):
    """Return the stored intent for a chat session, or None if no record exists."""
    record = await chat_history_management_crud.read_one({"session_id": session_id})
    return None if record is None else record["intent"]
async def get_history_function(user_id, background_tasks: BackgroundTasks):
    """Fetch a user's chat history, preferring the Redis cache over Mongo.

    On a cache miss, reads up to 50 messages from the durable store, looks up
    the session intent, schedules a background task to warm the cache, and
    returns {"message": [...], "intent": ...}. Any failure degrades to an
    empty history rather than raising.
    """
    history = chat_messages_history(user_id, 50)
    try:
        history_messages = await get_key_redis(f"chat_history_{user_id}")
        # Fix: the original assigned history_messages = None right after the
        # cache read, which unconditionally bypassed the Redis cache
        # (apparently leftover debug code).
        if not history_messages:
            logger.info("History not found in redis")
            history_messages = await history.aget_messages()
            history_messages = [
                i.model_dump(include=["type", "content"]) for i in history_messages
            ]
            intent = await get_intent_function(user_id)
            logger.info(f"INTENT {intent}")
            result = {"message": history_messages, "intent": intent}
            # Warm the cache without delaying this response.
            background_tasks.add_task(
                set_key_redis, f"chat_history_{user_id}", str(result)
            )
            return result
        # ast.literal_eval safely parses the str(dict) payload stored in Redis;
        # eval() here would execute arbitrary code if the cache were tampered with.
        history_messages = ast.literal_eval(history_messages)
        return history_messages
    except Exception as e:
        logger.error(f"Error in get_history_function: {e}")
        return {"message": [], "intent": None}
async def list_chat_history_function(user_id: str):
    """List every chat session id belonging to a user (empty list when none)."""
    records = await chat_history_management_crud.read({"user_id": user_id})
    if records is None:
        return []
    return [record["session_id"] for record in records]
async def delete_chat_history_function(session_id: str):
    """Erase all stored messages for a chat session.

    NOTE(review): the Redis cache entry (chat_history_<session_id>) is not
    invalidated here, so cached reads may briefly return deleted messages —
    confirm whether that is acceptable.
    """
    message_store = chat_messages_history(session_id, 50)
    await message_store.aclear()
    return {"message": "Chat history has been deleted"}