Spaces:
Sleeping
Sleeping
| import asyncio | |
| import json | |
| import os,sys | |
| BASE_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../")) | |
| sys.path.insert(0, BASE_DIR) | |
| from function.file import extract_file as extract_file | |
| import function.filter.filter_role as filter_role_1 | |
| import function.filter.filter_sql_injection as filter_sql_injection_1 | |
| import function.filter.result as query_result_1 | |
| import function.chat as chat_sql | |
| import function.gemini_response.response_general as res_general | |
| import function.gemini_response.response_hello as res_hello | |
| import cloudinary | |
| import cloudinary.uploader | |
| from typing import List,Any | |
| import cloudinary | |
| from cloudinary.utils import cloudinary_url | |
| from cloudinary import uploader | |
| from typing import Optional | |
| import asyncio | |
| from models.Database_Entity import StopSignal | |
| async def check_should_stop(chat_id: str, stop_event:Optional[asyncio.Event]): | |
| # Trường hợp dừng qua RAM (in-memory) | |
| await asyncio.sleep(0.1) | |
| if stop_event and stop_event.is_set(): | |
| print("🛑 Dừng qua stop_event.") | |
| return {"status": "cancelled"} | |
| # Trường hợp dừng qua MongoDB | |
| await asyncio.sleep(0.1) | |
| if StopSignal.objects(chat_history= chat_id, is_stopped=True).first(): | |
| print("🛑 Dừng vì có StopSignal trong DB.") | |
| return {"status": "cancelled"} | |
| return None | |
| # Dán nguyên chuỗi URL này vào | |
| cloudinary.config( | |
| cloud_name = "dgj8n2ggn", | |
| api_key = "287331723817617", | |
| api_secret = "MAI7dpoikIy-ap7IjPCiwfRgMQ0", | |
| secure=True | |
| ) | |
| def upload_images_to_cloudinary_and_markdown(folder_path): | |
| markdown_images = "" | |
| image_files = [ | |
| f for f in os.listdir(folder_path) | |
| if f.lower().endswith(('.png', '.jpg', '.jpeg')) | |
| ] | |
| for img in image_files: | |
| img_path = os.path.join(folder_path, img) | |
| try: | |
| upload_result = cloudinary.uploader.upload(img_path) | |
| print(img_path) | |
| image_url = upload_result.get("secure_url") | |
| print(image_url) | |
| if image_url: | |
| markdown_images += f"\n\n" | |
| except Exception as e: | |
| print(f"❌ Upload lỗi với ảnh {img}: {e}") | |
| return markdown_images | |
| async def agent_information(task,user_id,languages, role, chat_id,token,stop_event: Optional[asyncio.Event]): | |
| result_check = await check_should_stop(chat_id,stop_event) | |
| if result_check: | |
| return result_check | |
| text_all= extract_file.extract_data2() | |
| result_check = await check_should_stop(chat_id,stop_event) | |
| if result_check: | |
| return result_check | |
| task = filter_sql_injection_1.normalize_query(task) | |
| print("text: ",text_all) | |
| result_check = await check_should_stop(chat_id,stop_event) | |
| if result_check: | |
| return result_check | |
| return extract_file.handle_query_upgrade_keyword_old(f"{task}",text_all,"demohmdrinks") | |
| async def agent_hello(task,user_id,languages, role,chat_id,token,stop_event: Optional[asyncio.Event]): | |
| result_check = await check_should_stop(chat_id,stop_event) | |
| if result_check: | |
| return result_check | |
| result = await res_hello.response_hello(task) | |
| result_check = await check_should_stop(chat_id,stop_event) | |
| if result_check: | |
| return result_check | |
| return result | |
| from mongoengine import connect | |
| from dotenv import load_dotenv | |
| load_dotenv() | |
| MONGO_URI = os.getenv("MONGO_URI", "") | |
| connect("chatbot_hmdrinks", host=MONGO_URI) | |
| from models.Database_Entity import User, ChatHistory, DetailChat | |
| def get_chat_history(chat_history_id): | |
| messages = DetailChat.objects(chat_history=chat_history_id).order_by('timestamp') | |
| history_text = "" | |
| for msg in messages: | |
| history_text += f"Người dùng hỏi: {msg.you_message}\n" | |
| history_text += f"Trợ lý: {msg.ai_message}\n\n" | |
| return history_text.strip() | |
| from mongoengine import connect | |
| from dotenv import load_dotenv | |
| load_dotenv() | |
| MONGO_URI = os.getenv("MONGO_URI", "") | |
| connect("chatbot_hmdrinks", host=MONGO_URI) | |
| import sys | |
| import os | |
| project_root = os.path.abspath(os.path.join(os.getcwd(), '..', '..', '..')) | |
| if project_root not in sys.path: | |
| sys.path.append(project_root) | |
| from models.Database_Entity import User, ChatHistory, DetailChat,ChatCart, CartProduct | |
| import base64 | |
| from advance_shopping.prompt import context_prompt, multitool_prompt, user_intent | |
| from advance_shopping.call_gemini import tool_call | |
| from advance_shopping.server_java import server_java | |
| from advance_shopping.function import test as test_func | |
| def get_chat_history(chat_history_id): | |
| messages = DetailChat.objects(chat_history=chat_history_id).order_by('timestamp') | |
| print(DetailChat._meta) # in ra cấu hình model | |
| history_text = "" | |
| for msg in messages: | |
| history_text += f"Người dùng: {msg.you_message}\n" | |
| history_text += f"Trợ lý: {msg.ai_message}\n\n" | |
| return history_text.strip() | |
| from bson import ObjectId | |
| def chat_history_has_unconfirmed_cart(chat_history_id: str) -> bool: | |
| """ | |
| Trả về True nếu trong đoạn chat có giỏ hàng chưa được xác nhận đặt hàng. | |
| Trả về False nếu không có giỏ hàng, hoặc tất cả giỏ hàng đã được xác nhận. | |
| """ | |
| return ChatCart.objects( | |
| chat_history=ObjectId(chat_history_id), | |
| status="pending" | |
| ).count() > 0 | |
| from bson import ObjectId | |
| def process_data(structured_data, data): | |
| list_data1 = structured_data[:] | |
| print("data",data) | |
| if data == "None": | |
| return structured_data | |
| if isinstance(data, list) and not data: | |
| print("✅ Đây là một list rỗng.") | |
| return structured_data | |
| if data: | |
| for item in data: | |
| action = item.get("action", "").lower() | |
| name = item.get("name", "").strip().lower() | |
| size = item.get("size", "").strip().lower() | |
| size_old = item.get("size_old", "").strip().lower() if "size_old" in item else None | |
| matched = False | |
| if action == "update" and size_old: | |
| for item_old in list_data1: | |
| name_old = item_old.get("name", "").strip().lower() | |
| size_old_in_list = item_old.get("size", "").strip().lower() | |
| if name == name_old and size_old == size_old_in_list: | |
| matched = True | |
| # Cập nhật các trường | |
| for key in ["quantity", "size"]: | |
| if key in item: | |
| item_old[key] = item[key] | |
| break | |
| else: | |
| for item_old in list_data1: | |
| name_old = item_old.get("name", "").strip().lower() | |
| size_old_in_list = item_old.get("size", "").strip().lower() | |
| if name == name_old and size == size_old_in_list: | |
| matched = True | |
| if action == "delete": | |
| list_data1.remove(item_old) | |
| break | |
| elif action == "update": | |
| for key in ["quantity", "size"]: | |
| if key in item: | |
| item_old[key] = item[key] | |
| break | |
| elif action == "add": | |
| if "quantity" in item: | |
| item_old["quantity"] = int(item_old.get("quantity", 0)) + int(item["quantity"]) | |
| break | |
| if not matched and action == "add": | |
| item_to_add = {k: v for k, v in item.items() if k != "action"} | |
| list_data1.append(item_to_add) | |
| filtered_list = [] | |
| for item in list_data1: | |
| if (item.get("name") not in [None, "", "None"]): | |
| filtered_list.append(item) | |
| return filtered_list | |
| def get_last_ai_message(chat_history_id: str) -> str: | |
| last_detail = DetailChat.objects( | |
| chat_history=ObjectId(chat_history_id), | |
| ai_message__ne=None | |
| ).order_by('-timestamp').first() | |
| return last_detail.ai_message if last_detail else "None" | |
| # async def agent_sql(task,user_id,languages, role, chat_id,token,stop_event: Optional[asyncio.Event] = None): | |
| # check_analyze = filter_sql_injection_1.filter_task_analyze(task) | |
| # check_restore_delete = filter_sql_injection_1.filter_question_remove_restore(task) | |
| # prompt_check_shopping = "" | |
| # print(check_restore_delete) | |
| # if check_restore_delete == True: | |
| # return "Bạn không được phép khôi phục hoặc xóa dữ liệu. Vui lòng kiểm tra lại câu hỏi của bạn." | |
| # print(check_analyze) | |
| # if check_analyze == False: | |
| # result_final = "" | |
| # cart_status = "" | |
| # if chat_history_has_unconfirmed_cart(chat_id) is True: | |
| # cart_status = "Yes (Đang Pending)" | |
| # else: | |
| # cart_status = "No (Không có)" | |
| # print(cart_status) | |
| # prompt_check_shopping = user_intent.build_detailed_user_intent_prompt(user_question=task, | |
| # chat_history=get_chat_history(chat_id), | |
| # cart_status=cart_status) | |
| # check_shopping = tool_call.generate(prompt_check_shopping) | |
| # print(check_shopping) | |
| # type = check_shopping["type"] | |
| # if str(type) == "shopping_question": | |
| # tools = ["confirm_order", "insert_cart", "update_cart", "delete_item_cart", "deleted all cart"] | |
| # last_ai_message = get_last_ai_message(chat_id) | |
| # context_prompt = multitool_prompt.build_multi_tool_prompt(question=last_ai_message,tools=tools,user_answer=task) | |
| # data_context = {} | |
| # try: | |
| # data_context = tool_call.generate(context_prompt) | |
| # if not data_context["question"]: | |
| # data_context = tool_call.generate(context_prompt) | |
| # except: | |
| # data_context = tool_call.generate(context_prompt) | |
| # if not data_context["question"]: | |
| # data_context = tool_call.generate(context_prompt) | |
| # print("data_context: ",data_context) | |
| # question_new = data_context["question"] | |
| # latest_unconfirmed_cart = ChatCart.objects( | |
| # chat_history=chat_id, | |
| # status="pending" | |
| # ).order_by('-created_at').first() | |
| # previous_data = [] | |
| # if latest_unconfirmed_cart: | |
| # for product in latest_unconfirmed_cart.cart_products: | |
| # product_dict = product.to_mongo().to_dict() | |
| # if "_id" in product_dict: | |
| # product_dict["_id"] = str(product_dict["_id"]) | |
| # previous_data.append(product_dict) | |
| # data = test_func.classify_data(input = f"""{question_new}""", structured_data=previous_data) | |
| # list_data = list() | |
| # print(data) | |
| # if data == "None" or data == '': | |
| # data = list() | |
| # else: | |
| # try: | |
| # list_data = json.loads(data) | |
| # except: | |
| # list_data = list() | |
| # print("List data:", list_data) | |
| # print("Preious_data", previous_data) | |
| # previous_data = list(previous_data) | |
| # list_data1 = previous_data | |
| # clean_list = list() | |
| # full_test = "" | |
| # result = [] | |
| # print(list_data) | |
| # try: | |
| # print("Nhay vo test2") | |
| # result = process_data(list_data1, list_data) | |
| # except: | |
| # if data is None or data == "None": | |
| # print("Nhay vo test1") | |
| # result = process_data(list_data1, []) | |
| # clean_cart_products = [] | |
| # chat_cart = ChatCart() | |
| # if list_data: | |
| # annotated_list, clean_list, full_test = test_func.validate_product_list(data_list=result,server_java=server_java) | |
| # print(result) | |
| # print(clean_list) | |
| # results = [] | |
| # if data_context["question_normal"]: | |
| # for key, q in data_context["question_normal"].items(): | |
| # task_1 = filter_sql_injection_1.normalize_query(q) | |
| # filtered_input = filter_sql_injection_1.filter_sql_injection(task_1) | |
| # filtered_role_input = filter_role_1.filter_role(filtered_input) | |
| # result = await chat_sql.execute_query_user(filtered_role_input, user_id, languages, role) | |
| # result_final = query_result_1.query_result(task, result) | |
| # results.append({"question": q, "result": result_final}) | |
| # result_final += str(results) | |
| # for item in clean_list: | |
| # # Đảm bảo quantity là số nguyên | |
| # product = CartProduct( | |
| # name=item.get('name'), | |
| # size=item.get('size'), | |
| # quantity=int(item.get('quantity', 1)) | |
| # ) | |
| # clean_cart_products.append(product) | |
| # print("Clean_cart", clean_cart_products) | |
| # print("Full test", full_test) | |
| # if latest_unconfirmed_cart: | |
| # print("🛒 Đã có giỏ hàng 'pending' trước đó. Đang cập nhật lại sản phẩm...") | |
| # latest_unconfirmed_cart.cart_products = clean_cart_products | |
| # latest_unconfirmed_cart.save() | |
| # chat_cart = latest_unconfirmed_cart | |
| # else: | |
| # print("Chưa có giỏ hàng 'pending'. Đang tạo mới...") | |
| # chat_cart = ChatCart( | |
| # chat_history=chat_id, | |
| # cart_products=clean_cart_products, | |
| # status="pending" | |
| # ) | |
| # chat_cart.save() | |
| # if full_test: | |
| # return full_test + f"Hãy tạo ra câu hỏi để người dùng thay đổi ý định mua sản phẩm. Ngoài ra luôn hiển thị lại tình trạng giỏ của người dùng: giỏ hàng{clean_list}" + str(results) | |
| # if check_shopping.get("is_delete_cart") == "Yes": | |
| # print("🗑️ Đang xóa toàn bộ sản phẩm trong giỏ hàng...") | |
| # chat_cart.cart_products = [] | |
| # chat_cart.status = "failed" | |
| # chat_cart.save() | |
| # result_final += "Đã xóa toàn bộ sản phẩm trong giỏ hàng..." | |
| # if check_shopping.get("is_view_cart") == "Yes": | |
| # print("👀 Đang xem giỏ hàng...") | |
| # chat_cart = ChatCart.objects(chat_history=chat_id, status="pending").first() | |
| # if not chat_cart or not chat_cart.cart_products: | |
| # print("🛒 Giỏ hàng hiện đang trống.") | |
| # result_final += "🛒 Giỏ hàng hiện đang trống.\n" | |
| # else: | |
| # message_lines = ["🛒 Danh sách sản phẩm trong giỏ:"] | |
| # for product in chat_cart.cart_products: | |
| # print(f"- Tên sản phẩm: {product.name} | Số Lượng: {product.quantity} | Size(Kích cỡ): {product.size}") | |
| # message_lines.append( | |
| # f"- Tên sản phẩm: {product.name} | Số Lượng: {product.quantity} | Size(Kích cỡ): {product.size}" | |
| # ) | |
| # data = "\n".join(message_lines) | |
| # result_final += data + "\n" | |
| # if check_shopping["is_confirm_order"] == "Yes": | |
| # try: | |
| # print("🔹 [STEP 1] Tạo giỏ hàng mới...") | |
| # data = server_java.create_cart(int(user_id), token) | |
| # cart_id = data.get("cartId") | |
| # print(f"✅ Giỏ hàng tạo thành công: cart_id = {cart_id}") | |
| # except Exception as e: | |
| # print(f"❌ Lỗi khi tạo giỏ hàng: {e}") | |
| # cart_id = None | |
| # if cart_id: | |
| # for idx, item in enumerate(clean_cart_products, 1): | |
| # try: | |
| # print(f"\n🔹 [STEP 2.{idx}] Tìm kiếm sản phẩm: {item['name']} (size: {item['size']})...") | |
| # pro_id, pro_name, size_check, stock_check, price_check = server_java.search_product( | |
| # keyword=item["name"], size=item["size"] | |
| # ) | |
| # print(f"✅ Tìm thấy sản phẩm '{pro_name}' (ID: {pro_id}, Size: {size_check})") | |
| # print(f"🔹 [STEP 3.{idx}] Thêm sản phẩm vào giỏ hàng...") | |
| # data1 = server_java.insert_cartItem(user_id, cart_id, pro_id, size_check, item["quantity"], token) | |
| # print(f"✅ Đã thêm sản phẩm '{pro_name}' vào giỏ hàng.") | |
| # except Exception as e: | |
| # print(f"❌ Lỗi khi xử lý sản phẩm thứ {idx}: {e}") | |
| # continue # vẫn tiếp tục vòng lặp cho sản phẩm tiếp theo | |
| # try: | |
| # print("\n🔹 [STEP 4] Tạo đơn hàng từ giỏ hàng...") | |
| # create_orders = server_java.create_orders(user_id=user_id, cartId=cart_id, token=token) | |
| # order_id = create_orders["body"]["orderId"] | |
| # print(f"✅ Đơn hàng được tạo: order_id = {order_id}") | |
| # except Exception as e: | |
| # print(f"❌ Lỗi khi tạo đơn hàng: {e}") | |
| # order_id = None | |
| # if order_id: | |
| # try: | |
| # print("🔹 [STEP 5] Xác nhận đơn hàng với AI...") | |
| # confirm_order = server_java.confirm_orders_AI(user_id=user_id, token=token, orderId=order_id) | |
| # print("✅ Phản hồi xác nhận đơn hàng từ AI:") | |
| # print(confirm_order["body"]) | |
| # if confirm_order["body"] == "Confirm success": | |
| # chat_cart.status = "confirmed" | |
| # chat_cart.confirmed_order = True | |
| # chat_cart.save() | |
| # from urllib.parse import urlencode | |
| # base_url = "https://vnexpress.net/tin-tuc-24h" | |
| # params = { | |
| # "order_id": order_id, | |
| # "token": token, | |
| # "user_id": user_id | |
| # } | |
| # link_payment = f"https://vnexpress.net/tin-tuc-24h?order_id={order_id}&token={token}&user_id={user_id}" | |
| # return "Yêu cầu: " + str(task) + f". Yêu cầu của bạn đã thành công. Tình trạng giỏ hiện tại (Luôn luôn hiển thị lại trạng thái này để người dùng biết giỏ của họ) {clean_list}. Bạn đã xác nhận thanh toán thành công sẽ tiến hành chuyển sang link thanh toán{link_payment}. Vui lòng thực hiện thanh toán trên trang này trong vòng 30 phút tối đa" + str(results) | |
| # else: | |
| # chat_cart.status = "failed" | |
| # chat_cart.confirmed_order = False | |
| # chat_cart.save() | |
| # except Exception as e: | |
| # print(f"❌ Lỗi khi xác nhận đơn hàng với AI: {e}") | |
| # chat_cart.status = "failed" | |
| # chat_cart.confirmed_order = False | |
| # chat_cart.save() | |
| # if check_shopping["is_confirm_order"] == "No" and len(clean_cart_products) != 0 : | |
| # result_final += "Yêu cầu: " + str(task) + f". Yêu cầu của bạn đã thành công. Tình trạng giỏ hiện tại(Luôn luôn hiển thị lại trạng thái này để người dùng biết giỏ của họ): {clean_list}" + "kết quả thực thi:" + str(results) | |
| # return result_final | |
| # else: | |
| # task = filter_sql_injection_1.normalize_query(task) | |
| # filtered_input = filter_sql_injection_1.filter_sql_injection(task) | |
| # filtered_role_input = filter_role_1.filter_role(filtered_input) | |
| # result = await chat_sql.execute_query_user(filtered_role_input, user_id, languages, role) | |
| # result_final = query_result_1.query_result(task, result) | |
| # return result_final | |
| # else: | |
| # result = "" | |
| # task = filter_sql_injection_1.normalize_query(task) | |
| # filtered_input = filter_sql_injection_1.filter_sql_injection(task) | |
| # filtered_role_input = filter_role_1.filter_role(filtered_input) | |
| # output = await chat_sql.generate_and_save_code(filtered_role_input, user_id, role, languages) | |
| # if isinstance(output, tuple): | |
| # result, path_image = output | |
| # else: | |
| # result = output | |
| # path_image = None | |
| # if path_image and os.path.exists(path_image): | |
| # images_md = upload_images_to_cloudinary_and_markdown(path_image) | |
| # if images_md: | |
| # result = f"### Tất cả Hình ảnh sinh ra:\n\n{images_md}\n\n{result}" | |
| # return result | |
| # | |
| # ----- PHẦN 2: HÀM AGENT_SQL ĐÃ TÁI CẤU TRÚC ----- | |
| # | |
| # --- Các hàm trợ giúp (Helper Functions) --- | |
| from typing import Optional | |
| async def _handle_shopping_intent(task: str, user_id: int, chat_id: Any, token: str, languages,role, stop_event: Optional[asyncio.Event]) -> str: | |
| """ | |
| Xử lý các tác vụ liên quan đến mua sắm: xem giỏ, thêm/sửa/xóa, xác nhận đơn hàng. | |
| """ | |
| result_check = await check_should_stop(chat_id, stop_event) | |
| if result_check: | |
| return result_check | |
| try: | |
| result_check = await check_should_stop(chat_id, stop_event) | |
| if result_check: | |
| return result_check | |
| cart_status = "Yes (Đang Pending)" if chat_history_has_unconfirmed_cart(chat_id) else "No (Không có)" | |
| prompt_context = multitool_prompt.build_multi_tool_prompt( | |
| question=get_last_ai_message(chat_id), | |
| tools=["confirm_order", "insert_cart", "update_cart", "delete_item_cart", "deleted all cart"], | |
| user_answer=task | |
| ) | |
| data_context = tool_call.generate(prompt_context) | |
| print(f"DEBUG: data_context: {data_context}") | |
| # 2. Lấy giỏ hàng đang chờ xử lý hoặc tạo mới | |
| result_check = await check_should_stop(chat_id, stop_event) | |
| if result_check: | |
| return result_check | |
| latest_unconfirmed_cart = ChatCart.objects(chat_history=chat_id, status="pending").order_by('-created_at').first() | |
| if latest_unconfirmed_cart: | |
| chat_cart = latest_unconfirmed_cart | |
| previous_cart_items = [p.to_mongo().to_dict() for p in chat_cart.cart_products] | |
| print("INFO: Đã tìm thấy giỏ hàng 'pending', sẽ cập nhật.") | |
| else: | |
| chat_cart = ChatCart(chat_history=chat_id, status="pending") | |
| previous_cart_items = [] | |
| print("INFO: Không tìm thấy giỏ hàng 'pending', sẽ tạo mới.") | |
| #Backup data | |
| chat_cart.backup_cart_products = chat_cart.cart_products.copy() | |
| chat_cart.backup_status = chat_cart.status | |
| chat_cart.backup_confirmed_order = chat_cart.confirmed_order | |
| chat_cart.save() | |
| # 3. Trích xuất và xử lý thông tin sản phẩm từ câu hỏi của người dùng | |
| result_check = await check_should_stop(chat_id, stop_event) | |
| if result_check: | |
| return result_check | |
| new_items_str = test_func.classify_data(input=f"{data_context.get('question', '')}", structured_data=previous_cart_items) | |
| new_items_list = json.loads(new_items_str) if new_items_str and new_items_str != "None" else [] | |
| updated_cart_list = process_data(previous_cart_items, new_items_list) | |
| # 4. Xác thực sản phẩm và cập nhật giỏ hàng trong DB | |
| result_check = await check_should_stop(chat_id, stop_event) | |
| if result_check: | |
| return result_check | |
| annotated_list, clean_list, validation_message = test_func.validate_product_list(data_list=updated_cart_list, server_java=server_java) | |
| # clean_cart_products = [CartProduct(**item) for item in clean_list] | |
| clean_cart_products = [] | |
| for item in clean_list: | |
| # Đảm bảo quantity là số nguyên | |
| product = CartProduct( | |
| name=item.get('name'), | |
| size=item.get('size'), | |
| quantity=int(item.get('quantity', 1)) | |
| ) | |
| clean_cart_products.append(product) | |
| print("Clean_cart", clean_cart_products) | |
| chat_cart.cart_products = clean_cart_products | |
| chat_cart.save() | |
| print(f"INFO: Giỏ hàng đã được cập nhật với {len(clean_cart_products)} sản phẩm.") | |
| # 5. Xử lý các câu hỏi phụ (nếu có) | |
| result_check = await check_should_stop(chat_id, stop_event) | |
| if result_check: | |
| return result_check | |
| other_results = [] | |
| if data_context.get("question_normal"): | |
| for q_key, q_text in data_context["question_normal"].items(): | |
| result_check = await check_should_stop(chat_id, stop_event) | |
| if result_check: | |
| return result_check | |
| result = await _handle_simple_query(q_text, user_id, languages, role, stop_event, chat_id) | |
| other_results.append({"question": q_text, "result": result}) | |
| result_check = await check_should_stop(chat_id, stop_event) | |
| if result_check: | |
| return result_check | |
| response_parts = [] | |
| if validation_message: | |
| response_parts.append(validation_message) | |
| final_cart_state_msg = f"Tình trạng giỏ hàng hiện tại của bạn: {clean_list}" | |
| result_check = await check_should_stop(chat_id, stop_event) | |
| if result_check: | |
| return result_check | |
| shopping_intent = tool_call.generate(user_intent.build_detailed_user_intent_prompt( | |
| user_question=task, chat_history=get_chat_history(chat_id), cart_status=cart_status | |
| )) | |
| result_check = await check_should_stop(chat_id, stop_event) | |
| if result_check: | |
| return result_check | |
| if shopping_intent.get("is_delete_cart") == "Yes": | |
| chat_cart.cart_products = [] | |
| chat_cart.status = "failed" | |
| chat_cart.save() | |
| response_parts.append("Đã xóa toàn bộ sản phẩm trong giỏ hàng.") | |
| final_cart_state_msg = "Tình trạng giỏ hàng hiện tại của bạn: Giỏ hàng trống." | |
| elif shopping_intent.get("is_view_cart") == "Yes": | |
| result_check = await check_should_stop(chat_id, stop_event) | |
| if result_check: | |
| return result_check | |
| response_parts.append("Đây là giỏ hàng của bạn.") | |
| elif shopping_intent.get("is_confirm_order") == "Yes": | |
| result_check = await check_should_stop(chat_id, stop_event) | |
| if result_check: | |
| return result_check | |
| if not chat_cart.cart_products: | |
| return "Giỏ hàng của bạn đang trống. Vui lòng thêm sản phẩm trước khi xác nhận đơn hàng." | |
| result_check = await check_should_stop(chat_id, stop_event) | |
| if result_check: | |
| return result_check | |
| confirmation_result = await _execute_order_confirmation(user_id, token, chat_cart, stop_event,chat_id) | |
| confirmation_result_str = "" | |
| result_check = await check_should_stop(chat_id, stop_event) | |
| if result_check: | |
| chat_cart.status = chat_cart.backup_status | |
| chat_cart.confirmed_order = chat_cart.backup_confirmed_order | |
| chat_cart.save() | |
| if confirmation_result[1]: | |
| server_java.confirm_cancel_AI(int(user_id),token,confirmation_result[1]) | |
| server_java.remove_all_cartItem(int(user_id), confirmation_result[2], token) | |
| confirmation_result_str = confirmation_result[0] | |
| else: | |
| confirmation_result_str = confirmation_result | |
| return result_check | |
| response_parts.append(str(confirmation_result)) | |
| else: | |
| response_parts.append(f"Yêu cầu '{task}' của bạn đã được xử lý.") | |
| response_parts.append(final_cart_state_msg) | |
| if other_results: | |
| response_parts.append(f"Kết quả cho các câu hỏi khác: {other_results}") | |
| return " ".join(response_parts) | |
| except asyncio.CancelledError as e: | |
| print(f"STOP: {e}") | |
| if 'chat_cart' in locals(): | |
| if chat_cart.backup_cart_products: | |
| chat_cart.cart_products = chat_cart.backup_cart_products | |
| chat_cart.status = chat_cart.backup_status | |
| chat_cart.confirmed_order = chat_cart.backup_confirmed_order | |
| chat_cart.save() | |
| print("INFO: Đã khôi phục giỏ hàng từ bản backup do bị hủy.") | |
| return "Quá trình đã bị dừng theo yêu cầu. Giỏ hàng đã được khôi phục như ban đầu." | |
| # except Exception as e: | |
| # print(f"ERROR in _handle_shopping_intent: {e}") | |
| # return "Đã có lỗi xảy ra trong quá trình xử lý yêu cầu mua sắm của bạn. Vui lòng thử lại." | |
| # async def rollback_cart_and_order(chat_cart: ChatCart, user_id: int, token: str): | |
| # try: | |
| # if chat_cart.temp_order_id: | |
| # server_java.delete_order(chat_cart.temp_order_id, token) | |
| # print(f"INFO: Đã rollback đơn hàng {chat_cart.temp_order_id}") | |
| # if chat_cart.temp_cart_id: | |
| # server_java.delete_cart(chat_cart.temp_cart_id, token) | |
| # print(f"INFO: Đã rollback giỏ hàng {chat_cart.temp_cart_id}") | |
| # chat_cart.status = "cancelled" | |
| # chat_cart.save() | |
| # except Exception as e: | |
| # print(f"WARNING: Lỗi khi rollback: {e}") | |
| async def rollback_cart_and_order(chat_cart: ChatCart, user_id: int, token: str,order_id): | |
| try: | |
| if chat_cart.temp_order_id: | |
| server_java.delete_order(chat_cart.temp_order_id, token) | |
| print(f"INFO: Đã rollback đơn hàng {chat_cart.temp_order_id}") | |
| if chat_cart.temp_cart_id: | |
| server_java.delete_cart(chat_cart.temp_cart_id, token) | |
| print(f"INFO: Đã rollback giỏ hàng {chat_cart.temp_cart_id}") | |
| chat_cart.status = "cancelled" | |
| chat_cart.save() | |
| except Exception as e: | |
| print(f"WARNING: Lỗi khi rollback: {e}") | |
| async def _execute_order_confirmation(user_id: int, token: str, chat_cart: ChatCart, stop_event: Optional[asyncio.Event],chat_id:str= ""): | |
| """ | |
| Thực hiện quy trình 5 bước để xác nhận đơn hàng với server_java. | |
| """ | |
| try: | |
| #Backup data | |
| chat_cart.backup_status = chat_cart.status | |
| chat_cart.backup_confirmed_order = chat_cart.confirmed_order | |
| chat_cart.save() | |
| print("INFO: Bắt đầu quy trình xác nhận đơn hàng.") | |
| result_check = await check_should_stop(chat_id, stop_event) | |
| if result_check: | |
| chat_cart.status = chat_cart.backup_status | |
| chat_cart.confirmed_order = chat_cart.backup_confirmed_order | |
| chat_cart.save() | |
| return result_check | |
| cart_data = server_java.create_cart(int(user_id), token) | |
| cart_id = cart_data.get("cartId") | |
| if not cart_id: raise Exception("Không thể tạo giỏ hàng trên hệ thống.") | |
| print(f"SUCCESS: Tạo giỏ hàng tạm thành công: cart_id = {cart_id}") | |
| # Step 2 & 3: Add items to cart | |
| for item in chat_cart.cart_products: | |
| result_check = await check_should_stop(chat_id, stop_event) | |
| if result_check: | |
| chat_cart.status = chat_cart.backup_status | |
| chat_cart.confirmed_order = chat_cart.backup_confirmed_order | |
| chat_cart.save() | |
| rollback = server_java.remove_all_cartItem(int(user_id), cart_id, token) | |
| return result_check | |
| pro_id, _, size_check, _, _ = server_java.search_product(keyword=item.name, size=item.size) | |
| server_java.insert_cartItem(user_id, cart_id, pro_id, size_check, item.quantity, token) | |
| print(f"SUCCESS: Đã thêm '{item.name}' vào giỏ hàng tạm.") | |
| # Step 4: Create Order | |
| result_check = await check_should_stop(chat_id, stop_event) | |
| if result_check: | |
| chat_cart.status = "pending" | |
| chat_cart.confirmed_order = False | |
| chat_cart.save() | |
| server_java.remove_all_cartItem(int(user_id), cart_id, token) | |
| return result_check | |
| order_data = server_java.create_orders(user_id=user_id, cartId=cart_id, token=token) | |
| order_id = order_data["body"]["orderId"] | |
| if not order_id: raise Exception("Không thể tạo đơn hàng từ giỏ hàng.") | |
| print(f"SUCCESS: Tạo đơn hàng thành công: order_id = {order_id}") | |
| # Step 5: Confirm Order | |
| result_check = await check_should_stop(chat_id, stop_event) | |
| if result_check: | |
| chat_cart.status = chat_cart.backup_status | |
| chat_cart.confirmed_order = chat_cart.backup_confirmed_order | |
| chat_cart.save() | |
| server_java.confirm_cancel_AI(int(user_id),token,order_id) | |
| server_java.remove_all_cartItem(int(user_id), cart_id, token) | |
| return result_check | |
| confirm_data = server_java.confirm_orders_AI(user_id=user_id, token=token, orderId=order_id) | |
| if confirm_data.get("body") == "Confirm success": | |
| chat_cart.status = "confirmed" | |
| chat_cart.confirmed_order = True | |
| chat_cart.order_id = order_id | |
| chat_cart.cart_id = cart_id | |
| chat_cart.save() | |
| link_payment = f"https://hmdrinks-t8e3-giahans-projects.vercel.app/payment?order_id={order_id}&token={token}&user_id={user_id}" | |
| data = f"Đơn hàng của bạn đã được xác nhận thành công. Vui lòng truy cập link sau để thanh toán trong vòng 30 phút: {link_payment}(Vui lòng không bỏ sót link này)" | |
| return data, order_id, cart_id | |
| else: | |
| print(f"Hệ thống không thể xác nhận đơn hàng. Phản hồi: {confirm_data.get('body')}") | |
| result_check = await check_should_stop(chat_id, stop_event) | |
| if result_check: | |
| chat_cart.status = chat_cart.backup_status | |
| chat_cart.confirmed_order = chat_cart.backup_confirmed_order | |
| chat_cart.save() | |
| server_java.confirm_cancel_AI(int(user_id),token,order_id) | |
| server_java.remove_all_cartItem(int(user_id), cart_id, token) | |
| except Exception as e: | |
| print(f"ERROR in _execute_order_confirmation: {e}") | |
| chat_cart.status = "failed" | |
| chat_cart.save() | |
| return f"Đã xảy ra lỗi khi xác nhận đơn hàng: {e}. Giỏ hàng của bạn vẫn được lưu." | |
| async def _handle_simple_query(task: str, user_id: int, languages: List[str], role: str,stop_event: Optional[asyncio.Event],chat_id: str = "") -> str: | |
| """Xử lý một câu hỏi SQL đơn giản.""" | |
| result_check = await check_should_stop(chat_id, stop_event) | |
| if result_check: | |
| return result_check | |
| normalized_task = filter_sql_injection_1.normalize_query(task) | |
| result_check = await check_should_stop(chat_id, stop_event) | |
| if result_check: | |
| return result_check | |
| filtered_input = filter_sql_injection_1.filter_sql_injection(normalized_task) | |
| result_check = await check_should_stop(chat_id, stop_event) | |
| if result_check: | |
| return result_check | |
| filtered_role_input = filter_role_1.filter_role(filtered_input) | |
| result_check = await check_should_stop(chat_id, stop_event) | |
| if result_check: | |
| return result_check | |
| query_result = await chat_sql.execute_query_user(filtered_role_input, user_id, languages, role) | |
| result_check = await check_should_stop(chat_id, stop_event) | |
| if result_check: | |
| return result_check | |
| return query_result_1.query_result(task, query_result) | |
| async def _handle_analysis_intent(task: str, user_id: int, languages: List[str], role: str, stop_event: Optional[asyncio.Event],chat_id: str = "") -> str: | |
| """Xử lý các tác vụ yêu cầu phân tích, sinh code và vẽ biểu đồ.""" | |
| result_check = await check_should_stop(chat_id, stop_event) | |
| if result_check: | |
| return result_check | |
| normalized_task = filter_sql_injection_1.normalize_query(task) | |
| filtered_input = filter_sql_injection_1.filter_sql_injection(normalized_task) | |
| filtered_role_input = filter_role_1.filter_role(filtered_input) | |
| result_check = await check_should_stop(chat_id, stop_event) | |
| if result_check: | |
| return result_check | |
| output = await chat_sql.generate_and_save_code( | |
| filtered_role_input, | |
| user_id, | |
| role, | |
| languages, | |
| stop_event, | |
| chat_id=chat_id | |
| ) | |
| result_check = await check_should_stop(chat_id, stop_event) | |
| if result_check: | |
| return result_check | |
| result, path_image = output if isinstance(output, tuple) else (output, None) | |
| if path_image and os.path.exists(path_image): | |
| result_check = await check_should_stop(chat_id, stop_event) | |
| if result_check: | |
| return result_check # Trả về kết quả đã có | |
| images_md = upload_images_to_cloudinary_and_markdown(path_image) | |
| if images_md: | |
| result = f"### Biểu đồ được tạo:\n\n{images_md}\n\n{result}" | |
| return result | |
| # --- Hàm chính (Main Function) --- | |
| async def agent_sql(task: str, user_id: int, languages: str, role: str, chat_id: Any, token: str, stop_event: Optional[asyncio.Event] = None): | |
| """ | |
| Hàm chính điều phối các tác vụ, có khả năng dừng giữa chừng. | |
| """ | |
| # KIỂM TRA DỪNG NGAY LẬP TỨC | |
| result_check = await check_should_stop(chat_id, stop_event) | |
| if result_check: | |
| return result_check | |
| # 1. KIỂM TRA AN TOÀN BAN ĐẦU | |
| if filter_sql_injection_1.filter_question_remove_restore(task): | |
| return "Bạn không được phép khôi phục hoặc xóa dữ liệu. Vui lòng kiểm tra lại câu hỏi của bạn." | |
| # 2. PHÂN LOẠI Ý ĐỊNH CHÍNH: PHÂN TÍCH (cần sinh code) vs HỎI ĐÁP/MUA SẮM | |
| is_analysis_task = filter_sql_injection_1.filter_task_analyze(task) | |
| result_check = await check_should_stop(chat_id, stop_event) | |
| if result_check: | |
| return result_check | |
| if is_analysis_task: | |
| print("INFO: Nhận diện ý định PHÂN TÍCH.") | |
| if result_check: | |
| return result_check | |
| data = await _handle_analysis_intent(task, user_id, languages, role, stop_event,chat_id) | |
| if result_check: | |
| return result_check | |
| return data, False | |
| # 3. PHÂN LOẠI Ý ĐỊNH PHỤ: MUA SẮM vs HỎI ĐÁP THÔNG THƯỜNG | |
| result_check = await check_should_stop(chat_id, stop_event) | |
| if result_check: | |
| return result_check | |
| cart_status = "Yes (Đang Pending)" if chat_history_has_unconfirmed_cart(chat_id) else "No (Không có)" | |
| prompt_check_shopping = user_intent.build_detailed_user_intent_prompt( | |
| user_question=task, | |
| chat_history=get_chat_history(chat_id), | |
| cart_status=cart_status | |
| ) | |
| result_check = await check_should_stop(chat_id, stop_event) | |
| if result_check: | |
| return result_check | |
| intent_result = tool_call.generate(prompt_check_shopping) | |
| result_check = await check_should_stop(chat_id, stop_event) | |
| if result_check: | |
| return result_check | |
| if str(intent_result.get("type")) == "shopping_question": | |
| print("INFO: Nhận diện ý định MUA SẮM.") | |
| result = await _handle_shopping_intent(task, user_id, chat_id, token, languages, role, stop_event) | |
| result = query_result_1.query_result(task, result) | |
| return result, True | |
| else: | |
| print("INFO: Nhận diện ý định HỎI ĐÁP THÔNG THƯỜNG.") | |
| result = await _handle_simple_query(task, user_id, languages, role, stop_event,chat_id) | |
| result = query_result_1.query_result(task, result) | |
| return result, False | |
| async def agent_general(task, user_id, languages, role, chat_id, token, stop_event): | |
| task = filter_sql_injection_1.normalize_query(task) | |
| # Kiểm tra stop event trước khi gọi xử lý nặng | |
| result_check = await check_should_stop(chat_id, stop_event) | |
| if result_check: | |
| return result_check | |
| # Gọi xử lý chính | |
| result_task = asyncio.create_task(res_general.response_general(task)) | |
| # Chờ xử lý, nhưng cũng kiểm tra stop event | |
| while not result_task.done(): | |
| if stop_event.is_set(): | |
| print("Tín hiệu stop trong quá trình xử lý. Hủy task.") | |
| result_task.cancel() | |
| try: | |
| await result_task | |
| except asyncio.CancelledError: | |
| print("Task đã bị huỷ.") | |
| return None | |
| await asyncio.sleep(0.1) # Cho các task khác thời gian chạy | |
| function_mapping = { | |
| "question_hello": agent_hello, | |
| "question_information": agent_information, | |
| "question_sql": agent_sql, | |
| "question_general": agent_general | |
| } | |
| from typing import Optional | |
| async def process_toolcalls_with_order(toolcalls,token, stop_event: Optional[asyncio.Event] = None, chat_id:str = ""): | |
| task_results = {} | |
| ordered_results = {} | |
| filtered_toolcalls = [tc for tc in toolcalls if tc["name"] in function_mapping] | |
| for tc in filtered_toolcalls: | |
| orig_name = tc["name"] | |
| task_func = function_mapping.get(orig_name) | |
| if task_func is None: | |
| continue | |
| task_param = tc["args"].get("query", "") | |
| user_id = tc.get("user_id", "unknown_user") | |
| role = tc.get("role", "unknown_role") | |
| language = tc.get("language", "unknown_language") | |
| chat_id = tc.get("chat_id","unknow") | |
| if asyncio.iscoroutinefunction(task_func): | |
| result = await task_func(task_param, user_id, language,role,chat_id,token,stop_event) | |
| else: | |
| result = task_func(task_param, user_id, language,role,chat_id,token,stop_event) | |
| if isinstance(result, tuple) and len(result) == 2: | |
| result, is_shopping = result | |
| else: | |
| result = result | |
| is_shopping = False | |
| # Lưu kết quả theo id của tool_call | |
| task_results[tc["id"]] = { | |
| "result": result, | |
| "user_id": user_id, | |
| "role": role, | |
| "language": language, | |
| "is_shopping": is_shopping | |
| } | |
| ordered_results = {tc["id"]: task_results.get(tc["id"], {"result": "Không có kết quả"}) for tc in filtered_toolcalls} | |
| return ordered_results |