Spaces:
Sleeping
Sleeping
File size: 15,207 Bytes
3973360 6d6ae78 3973360 3526a8e ef3c550 3973360 ef0145e 6b7a285 ef0145e 3973360 ef0145e 3973360 ef0145e 6b7a285 ef0145e 6b7a285 ef0145e 6b7a285 ef0145e 6b7a285 ef0145e 6b7a285 ef0145e 3973360 0171bb1 ef0145e 0171bb1 ef0145e 0171bb1 ef0145e 6b7a285 ef0145e 6b7a285 ef0145e 6b7a285 ef0145e 6b7a285 ef0145e 6b7a285 ef0145e 0171bb1 3973360 3526a8e 3973360 3526a8e 6d6ae78 3973360 3526a8e 62a29c1 3526a8e 62a29c1 3526a8e 3973360 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 |
from langchain_core.messages import HumanMessage, AIMessage
from src.utils.mongo import chat_messages_history
from src.utils.logger import logger
from src.utils.mongo import chat_history_management_crud
from src.apis.interfaces.api_interface import Chat
from src.utils.helper import handle_validator_raise
from src.utils.redis import get_key_redis, set_key_redis
import json
from langchain_core.messages.ai import AIMessageChunk
from fastapi import BackgroundTasks
from fastapi.responses import JSONResponse
from src.langgraph.multi_agent.chat.chat_flow import ChatBot
app = ChatBot()
workflow = app()
@handle_validator_raise
def post_process_history(history):
processed_history = []
for entry in history:
if entry["type"] == "human":
processed_history.append(HumanMessage(content=entry["content"]))
elif entry["type"] == "ai":
processed_history.append(AIMessage(content=entry["content"]))
return processed_history
async def save_history(user_id, human_message, ai_message, intent):
messages_add_to_history = [HumanMessage(human_message), AIMessage(ai_message)]
messages_add_to_history_dict = [
{"type": "human", "content": human_message},
{"type": "ai", "content": ai_message},
]
messages_add_to_history_cache = {
"message": messages_add_to_history_dict,
"intent": intent,
}
history = chat_messages_history(user_id)
await history.aadd_messages(messages_add_to_history)
check_exist_history = await chat_history_management_crud.read_one(
{"session_id": user_id}
)
if check_exist_history is None:
await chat_history_management_crud.create(
{"user_id": user_id, "session_id": user_id, "intent": intent}
)
logger.info("History created")
else:
await chat_history_management_crud.update(
{"session_id": user_id}, {"intent": intent}
)
logger.info("History updated")
history_cache = await get_key_redis(f"chat_history_{user_id}")
if history_cache is not None:
history_cache = eval(history_cache)
history_cache["message"] = (
history_cache["message"] + messages_add_to_history_dict
)
history_cache["intent"] = intent
await set_key_redis(
f"chat_history_{user_id}",
str(history_cache),
)
return {"message": "History updated"}
await set_key_redis(f"chat_history_{user_id}", str(messages_add_to_history_cache))
return {"message": "History created"}
async def chat_streaming_function(user, data: Chat, background_tasks: BackgroundTasks):
try:
human_message = data.message
history = data.history
lat = data.lat
long = data.long
language = data.language
logger.info(f"Language: {language}")
try:
process_history = post_process_history(history) if history is not None else None
except Exception as e:
logger.error(f"Error processing chat history: {str(e)}")
yield json.dumps({"type": "error", "content": "Error processing chat history" + str(e)}, ensure_ascii=False)
return
config = {
"configurable": {
"user_id": user["id"],
"user_email": user["email"],
"contact_number": user["contact_number"],
"session_id": user["id"],
"lat": lat,
"long": long,
}
}
initial_input = {
"messages": [("user", human_message)],
"messages_history": process_history,
"entry_message": None,
"manual_save": False,
"intent": data.intent,
"language": language,
"tool_name": None,
"ever_leave_skill": False,
}
last_output_state = None
temp = ""
try:
async for event in workflow.astream(
input=initial_input,
config=config,
stream_mode=["messages", "values"],
):
try:
event_type, event_message = event
if event_type == "messages":
message, metadata = event_message
if (
isinstance(message, AIMessageChunk)
and message.tool_calls
and message.tool_call_chunks[0]["name"] != "ClassifyUserIntent"
):
tool_name = message.tool_call_chunks[0]["name"]
message_yield = json.dumps(
{"type": "tool_call", "content": tool_name}, ensure_ascii=False
)
print(message_yield)
yield message_yield
if metadata["langgraph_node"] in [
"primary_assistant",
"scheduling_agent",
"book_hotel_agent",
]:
if message.content:
temp += message.content
message_yield = json.dumps(
{"type": "message", "content": temp}, ensure_ascii=False
)
print(message_yield)
yield message_yield
if event_type == "values":
last_output_state = event_message
except Exception as e:
logger.error(f"Error processing stream event: {str(e)}")
yield json.dumps({"type": "error", "content": "Error processing response" + str(e)}, ensure_ascii=False)
return
if last_output_state is None:
raise ValueError("No output state received from workflow")
final_ai_output = last_output_state["messages"][-1].content
final_intent = last_output_state["intent"]
tool_name_important = last_output_state["tool_name"]
final_response = json.dumps(
{
"type": "final",
"content": final_ai_output,
"intent": final_intent,
"tool_name": tool_name_important,
},
ensure_ascii=False,
)
yield final_response
except Exception as e:
logger.error(f"Error in workflow stream processing: {str(e)}")
yield json.dumps({"type": "error", "content": "Error processing chat stream" + str(e)}, ensure_ascii=False)
return
except Exception as e:
logger.error(f"Unexpected error in chat streaming: {str(e)}")
yield json.dumps({"type": "error", "content": "An unexpected error occurred" + str(e)}, ensure_ascii=False)
return
background_tasks.add_task(
save_history, user["id"], human_message, final_ai_output, final_intent
)
async def chat_streaming_no_login_function(
data: Chat, background_tasks: BackgroundTasks
):
try:
human_message = data.message
history = data.history
lat = data.lat
long = data.long
language = data.language
logger.info(f"Language: {language}")
try:
process_history = post_process_history(history) if history is not None else None
except Exception as e:
logger.error(f"Error processing chat history: {str(e)}")
yield json.dumps({"type": "error", "content": "Error processing chat history"}, ensure_ascii=False) + "\n"
return
config = {
"configurable": {
"user_id": None,
"user_email": None,
"contact_number": None,
"session_id": None,
"lat": lat,
"long": long,
}
}
initial_input = {
"messages": [("user", human_message)],
"messages_history": process_history,
"entry_message": None,
"manual_save": False,
"intent": data.intent,
"language": language,
"tool_name": None,
"ever_leave_skill": False,
}
last_output_state = None
temp = ""
try:
async for event in workflow.astream(
input=initial_input,
config=config,
stream_mode=["messages", "values"],
):
try:
event_type, event_message = event
if event_type == "messages":
message, metadata = event_message
if (
isinstance(message, AIMessageChunk)
and message.tool_calls
and message.tool_call_chunks[0]["name"] != "ClassifyUserIntent"
):
tool_name = message.tool_call_chunks[0]["name"]
message_yield = json.dumps(
{"type": "tool_call", "content": tool_name}, ensure_ascii=False
)
print(message_yield)
yield message_yield
if metadata["langgraph_node"] in [
"primary_assistant",
"scheduling_agent",
"book_hotel_agent",
]:
if message.content:
temp += message.content
message_yield = json.dumps(
{"type": "message", "content": temp}, ensure_ascii=False
)
print(message_yield)
yield message_yield
if event_type == "values":
last_output_state = event_message
except Exception as e:
logger.error(f"Error processing stream event: {str(e)}")
yield json.dumps({"type": "error", "content": "Error processing response" + str(e)}, ensure_ascii=False)
return
if last_output_state is None:
raise ValueError("No output state received from workflow")
final_ai_output = last_output_state["messages"][-1].content
final_intent = last_output_state["intent"]
tool_name_important = last_output_state["tool_name"]
final_response = json.dumps(
{
"type": "final",
"content": final_ai_output,
"intent": final_intent,
"tool_name": tool_name_important,
},
ensure_ascii=False,
)
yield final_response
except Exception as e:
logger.error(f"Error in workflow stream processing: {str(e)}")
yield json.dumps({"type": "error", "content": "Error processing chat stream" + str(e)}, ensure_ascii=False)
return
except Exception as e:
logger.error(f"Unexpected error in chat streaming: {str(e)}")
yield json.dumps({"type": "error", "content": "An unexpected error occurred" + str(e)}, ensure_ascii=False)
return
async def chat_function(user, data: Chat, background_tasks: BackgroundTasks):
message = data.message
history = data.history
lat = data.lat
long = data.long
language = data.language
logger.info(f"Language: {language}")
process_history = post_process_history(history) if history is not None else None
config = {
"configurable": {
"user_id": user["id"],
"user_email": user["email"],
"contact_number": user["contact_number"],
"session_id": user["id"],
"lat": lat,
"long": long,
}
}
# try:
initial_input = {
"messages": [("user", message)],
"messages_history": process_history,
"entry_message": None,
"manual_save": False,
"intent": data.intent,
"language": language,
"tool_name": None,
}
output = await workflow.ainvoke(initial_input, config)
final_ai_output = output["messages"][-1].content
final_intent = output["intent"]
tool_name = output["tool_name"]
if final_ai_output is None:
return JSONResponse(
content={"message": "Error in chat_function"}, status_code=500
)
background_tasks.add_task(
save_history, user["id"], data.message, final_ai_output, final_intent
)
response_ouput = {
"message": final_ai_output,
"intent": final_intent,
"tool_name": tool_name,
}
return JSONResponse(content=response_ouput, status_code=200)
async def get_intent_function(session_id):
record = await chat_history_management_crud.read_one({"session_id": session_id})
if record is None:
return None
return record["intent"]
async def get_history_function(user_id, background_tasks: BackgroundTasks):
history = chat_messages_history(user_id, 50)
try:
history_messages = await get_key_redis(f"chat_history_{user_id}")
history_messages = None
if not history_messages:
logger.info("History not found in redis")
history_messages = await history.aget_messages()
history_messages = [
i.model_dump(include=["type", "content"]) for i in history_messages
]
intent = await get_intent_function(user_id)
logger.info(f"INTENT {intent}")
result = {"message": history_messages, "intent": intent}
background_tasks.add_task(
set_key_redis, f"chat_history_{user_id}", str(result)
)
return result
history_messages = eval(history_messages)
return history_messages
except Exception as e:
logger.error(f"Error in get_history_function: {e}")
return {"message": [], "intent": None}
async def list_chat_history_function(user_id: str):
result = await chat_history_management_crud.read({"user_id": user_id})
if result is None:
return []
result = [i["session_id"] for i in result]
return result
async def delete_chat_history_function(session_id: str):
history = chat_messages_history(session_id, 50)
await history.aclear()
return {"message": "Chat history has been deleted"}
|