"""Chat service: LLM-generated SQL execution, chat-history CRUD and analysis jobs.

NOTE(review): this module was reconstructed from a whitespace-collapsed source.
Block structure was recovered from syntax; the two places where the original
nesting could not be determined with certainty are marked with NOTE(review).
"""
from langchain_community.utilities.sql_database import SQLDatabase
from langchain_experimental.sql import SQLDatabaseChain
import sys
import os
import pymysql
from fastapi import HTTPException
from fastapi.encoders import jsonable_encoder
import re

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), ".")))
import prompt.prompt_main as prompt
import prompt.prompt_custom as prompt_cus

# SECURITY(review): hard-coded credential committed to source control. It is
# kept only so module-level behaviour is unchanged; rotate this key and load it
# from the environment / a secret store instead.
os.environ["GOOGLE_API_KEY"] = "AIzaSyBoPdVLTJNxfDg9wxWDpY4QJezHiyjKbTE"
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
from bson import ObjectId
import os
from dotenv import load_dotenv
import os
from dotenv import load_dotenv
from dotenv import load_dotenv, find_dotenv

load_dotenv(find_dotenv(), override=True)

DB_HOST = os.getenv("DB_HOST")
DB_USER = os.getenv("DB_USER")
DB_PASSWORD = os.getenv("DB_PASSWORD")
DB_NAME = os.getenv("DB_NAME")
DB_PORT = os.getenv("DB_PORT")

# Build the SQLAlchemy connection string.
import os
from urllib.parse import quote

# The raw password may contain URL-reserved characters (e.g. '@'), so it is
# percent-encoded before being embedded in the URI.
password = os.getenv("DB_PASSWORD")
DB_PASSWORD = quote(password)
connection_uri = (
    f"mysql+pymysql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
    "?ssl_verify_cert=false&ssl_verify_identity=false"
)
db = SQLDatabase.from_uri(connection_uri)

from dotenv import load_dotenv
import filter.filter_role as filter_role_1
import filter.filter_sql_injection as filter_sql_injection_1
import filter.result as query_result_1
import support.get_key as get_key
import response.ResponseChat as res_chat
from datetime import datetime
import pytz
from mongoengine import connect
import sys
import os
import nltk
import function.agent.pipeline_agent as pipeline_agent

nltk.download('punkt')
from models.Database_Entity import User, ChatHistory, DetailChat
from dotenv import load_dotenv

load_dotenv()
MONGO_URI = os.getenv("MONGO_URI", "")
connect("chatbot_hmdrinks", host=MONGO_URI)
load_dotenv()

# Model setup
from bson import ObjectId
import random
from langchain_google_genai import GoogleGenerativeAIEmbeddings, ChatGoogleGenerativeAI

BASE_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../"))
sys.path.insert(0, BASE_DIR)
from repository.MySQL import UserRepository
from prompt.prompt_syntax_insert import is_insert_related_to_product_category_variant, filter_syntax_sql
import sqlparse
import re
import sys
import os

sys.path.append(os.path.abspath(os.path.join(os.getcwd(), '..')))
from prompt import prompt_detail_table

# Maps a MySQL table name to the prompt text describing that table's schema.
schema_mapping = {
    "user": prompt_detail_table.prompt_users,
    "user_voucher": prompt_detail_table.prompt_user_voucher,
    "category": prompt_detail_table.prompt_categort,
    "category_translation": prompt_detail_table.prompt_category_translation,
    "cart": prompt_detail_table.prompt_cart,
    "cart_item": prompt_detail_table.prompt_cart_item,
    "orders": prompt_detail_table.prompt_orders,
    "order_item": prompt_detail_table.prompt_order_item,
    "payment": prompt_detail_table.prompt_payments,
    "payments": prompt_detail_table.prompt_payments,
    "favourite": prompt_detail_table.prompt_favourite,
    "favourite_item": prompt_detail_table.prompt_fav_item,
    "post": prompt_detail_table.prompt_post,
    "post_translation": prompt_detail_table.prompt_post_translation,
    "product": prompt_detail_table.prompt_product,
    "product_translation": prompt_detail_table.prompt_product_translation,
    "shipment": prompt_detail_table.prompt_shipment,
    "product_variants": prompt_detail_table.prompt_product_variants,
    "review": prompt_detail_table.prompt_review,
    "user_coin": prompt_detail_table.prompt_user_coin,
    "absence": prompt_detail_table.prompt_absence,
    "cart_group": prompt_detail_table.prompt_cart_group,
    "cart_item_group": prompt_detail_table.prompt_cartitem_group,
    "group_orders": prompt_detail_table.prompt_group_orders,
    "payments_group": prompt_detail_table.prompt_payments_group,
    "group_order_members": prompt_detail_table.prompt_group_orders_member,
    "shipment_group": prompt_detail_table.prompt_shipment_group,
    "shipper_attendance": prompt_detail_table.prompt_shipper_attendance,
    "shipper_commission_detail": prompt_detail_table.prompt_shipper_commission_detail,
    "shipper_salary_summary": prompt_detail_table.prompt_shipper_salary_summary,
    "voucher": prompt_detail_table.prompt_voucher,
}

# Un-substituted DB-API placeholders ("%s" / "%%s") left in generated SQL.
_PLACEHOLDER_RE = re.compile(r"%{1,2}s")
# Procedural IF ... THEN. The IF(...) *function* is excluded by the lookahead.
_IF_THEN_RE = re.compile(r"\bIF\b(?!\s*\().*?\bTHEN\b", re.IGNORECASE | re.DOTALL)
# Upper bound on "regenerate and retry" rounds inside execute_query_user.
_MAX_SQL_REGENERATIONS = 5
_NO_IF_THEN_NOTE = (
    "\n\n⚠️ Note: Do NOT use IF...THEN...ELSE...END in SQL. "
    "Only use plain SELECT, INSERT, UPDATE, DELETE, SET statements."
)
_NO_SQL_FOUND_MESSAGE = "❌ Lỗi không thể thực thi truy vấn: Không tìm thấy SQL trong phản hồi."
_DELETE_FORBIDDEN_MESSAGE = "Lỗi: Bạn không dược phép thực hiện truy vấn DELETE trong hệ thống này."


def get_schemas_from_sql(sql_query: str, schema_mapping: dict):
    """Return the schema descriptions of every table referenced by ``sql_query``.

    All statements of a multi-statement script are inspected. Tables that have
    no entry in ``schema_mapping`` are reported on stdout and skipped.

    Args:
        sql_query: SQL text, parsed with the MySQL dialect.
        schema_mapping: Mapping of table name to schema description.

    Returns:
        One string with a "Schema for table ..." section per known table,
        separated by blank lines (empty when no known table is referenced).

    Raises:
        sqlglot.errors.ParseError: if the SQL cannot be parsed.
    """
    import sqlglot

    # Unique table names across all statements; sorted so the resulting prompt
    # text is deterministic.
    table_names = sorted(
        {
            table.name
            for statement in sqlglot.parse(sql_query, read="mysql")
            if statement is not None
            for table in statement.find_all(sqlglot.exp.Table)
        }
    )

    schemas_used = {}
    for table in table_names:
        if table in schema_mapping:
            schemas_used[table] = schema_mapping[table]
        else:
            print(f"⚠️ Warning: Table '{table}' not found in schema_mapping")

    # Merge every schema into a single string.
    return "\n\n".join(
        f"Schema for table '{table}':\n{schema}" for table, schema in schemas_used.items()
    )


def build_sql_fix_prompt(schemas_result: str, sql: str) -> str:
    """Build the LLM prompt asking to correct table/column names in ``sql``.

    Args:
        schemas_result: Schema text as produced by ``get_schemas_from_sql``.
        sql: The SQL statement to be corrected.

    Returns:
        The prompt text, stripped of surrounding whitespace.
    """
    # The ```sql fence is closed explicitly; leaving it open made the
    # instructions that follow part of the code block.
    fix_prompt = f"""
Bạn là một chuyên gia cơ sở dữ liệu.
Dưới đây là mô tả schema chi tiết của các bảng có trong hệ thống:
Đọc rõ và ghi nhớ tùng thuộc tính của mỗi bảng mà bạn truy vấn:
{schemas_result}
---
Dưới đây là một câu SQL đang bị lỗi do không đúng tên bảng hoặc tên cột:
```sql
{sql.strip()}
```
Yêu cầu của bạn là:
Dựa trên các schema ở trên, hãy kiểm tra và chỉnh sửa câu SQL sao cho:
Tên bảng, tên cột(thuộc tính) phải chính xác theo trình bày bên trong schema. Nếu tên cột trong bảng đó có mô tả trong schema mà trình bày khác thì phải thay đổi sao cho cú pháp sql chính xác.
Logic và mục đích của truy vấn được giữ nguyên.
Chỉ trả lại phần SQL đã được chỉnh sửa (không giải thích, không chú thích, không thêm nhận xét).
Trả lời dưới dạng một truy vấn SQL duy nhất.
Không dùng pt.`language_ code` = 'EN
Với các câu hỏi liên quan đến product luôn trả về kèm theo pro_id cho mình.
- "- Tránh các lỗi như :\n" " (1054, \"Unknown column 'oi.pro_id' in 'field list'\")\n . Luôn đảm bảo bạn không bao giờ bị lỗi này" " (1054, \"Unknown column 'oi.note' in 'field list'\") . Luôn đảm bảo bạn không bao giờ bị lỗi này\n" " (1054, \"Unknown column 'oi.size' in 'field list'\") . Luôn đảm bảo bạn không bao giờ bị lỗi này \n" " (1054, \"Unknown column 'c.is_deleted' in 'on clause'\"). Luôn đảm bảo bạn không bao giờ bị lỗi này\n"
""".strip()
    return fix_prompt


def contains_delete(sql: str) -> bool:
    """Return True when ``sql`` contains the word DELETE (case-insensitive)."""
    return bool(re.search(r'\bdelete\b', sql, re.IGNORECASE))


async def execute_query_user(user_input: str, user_id: int, languages: str, role: str):
    """Turn a natural-language question into SQL via the LLM and execute it.

    Flow: ask the SQL chain for a query, have a second LLM pass correct table
    and column names against the known schemas, reject DELETE and procedural
    IF...THEN, then execute on MySQL with PyMySQL.

    Args:
        user_input: The user's question.
        user_id: Id of the requesting user (embedded in the role description).
        languages: Language name passed to the LLM.
        role: Role name; "ADMIN" is described differently from other roles.

    Returns:
        Query results (a list with one entry per executed statement), the raw
        chain output when it contains no SQL, or a human-readable error string.
    """
    api_key = get_key.get_random_api_key()
    os.environ["GOOGLE_API_KEY"] = api_key
    llm1 = ChatGoogleGenerativeAI(model='gemini-2.0-flash-thinking-exp-01-21', temperature=0.2)
    db = SQLDatabase.from_uri(connection_uri)
    PROMPT_CUSTOM = await prompt_cus.get_prompt_custom(user_input)
    check_insert = is_insert_related_to_product_category_variant(user_input)

    db_config = {
        "host": os.getenv("DB_HOST"),
        "user": os.getenv("DB_USER"),
        "database": os.getenv("DB_NAME"),
        "password": os.getenv("DB_PASSWORD"),
        "port": int(os.getenv("DB_PORT", 3306)),
        "charset": "utf8mb4",
        "cursorclass": pymysql.cursors.DictCursor,
    }

    # Append the note only once. The legacy lower-case marker is still honoured
    # in case the template itself already carries that wording.
    template = PROMPT_CUSTOM.template
    if "Do not use IF...THEN" not in template and _NO_IF_THEN_NOTE not in template:
        PROMPT_CUSTOM.template += _NO_IF_THEN_NOTE

    db_chain = SQLDatabaseChain.from_llm(llm=llm1, db=db, prompt=PROMPT_CUSTOM)
    text_role = (
        f"{role} (userId = {user_id})"
        if role == "ADMIN"
        else f"{role} (userId = {user_id}), not role ADMIN"
    )

    def build_chain_input() -> str:
        """Return the text sent to the SQL chain for this request."""
        return f"""
            Role: {text_role}
            Language: {languages}
            Question: {user_input}.
            """

    def clean_sql(sql):
        """Strip Markdown code fences from ``sql``.

        A non-empty dict is replaced by its first value. ``None`` is returned
        unchanged so callers can report "no SQL found" instead of crashing.
        """
        if isinstance(sql, dict) and sql:
            sql = next(iter(sql.values()))
        if sql is None:
            return None
        sql = re.sub(r"```sql", "", sql, flags=re.IGNORECASE)
        sql = re.sub(r"```", "", sql)
        return sql.strip()

    def extract_sql_from_response(data):
        """Extract SQL from chain output, or return None when none is found."""
        # Markdown block form: [SQL: ```sql ... ```]
        match = re.search(r"\[SQL:\s*```sql\s*(.*?)\s*```]", data, re.DOTALL)
        if match:
            return clean_sql(match.group(1))
        # Simple form: SQLQuery: ...
        match = re.search(r"SQLQuery:\s*(.*)", data, re.DOTALL)
        if match:
            return clean_sql(match.group(1))
        return None

    def extract_sql_from_error(error_msg):
        """Extract SQL embedded in an exception message, or return None."""
        # Case 1: [SQL: ```sql ... ```]
        match = re.search(r"\[SQL:\s*```sql\s*(.*?)\s*```]", error_msg, re.DOTALL)
        if match:
            return clean_sql(match.group(1))
        # Case 2: any fenced block, with or without the "sql" tag.
        match = re.search(r"```(?:sql)?\s*\r?\n(.*?)```", error_msg, re.DOTALL)
        if match:
            return clean_sql(match.group(1))
        return None

    def regenerate_sql_until_safe():
        """Re-ask the chain until the SQL has no ``%s`` placeholders.

        Returns:
            The clean SQL, or an error string starting with "❌".
        """
        retry_count = 0
        while retry_count < _MAX_SQL_REGENERATIONS:
            try:
                regenerated_data = db_chain.run(build_chain_input())
                regenerated_sql = extract_sql_from_response(regenerated_data)
                if regenerated_sql:
                    regenerated_sql = clean_sql(regenerated_sql)
                    if not _PLACEHOLDER_RE.search(regenerated_sql):
                        return regenerated_sql
                retry_count += 1
            except Exception as e:
                return f"❌ Lỗi khi tạo lại truy vấn lần {retry_count + 1}: {str(e)}"
        return "❌ Lỗi: Không thể tạo được truy vấn an toàn sau nhiều lần thử."

    def execute_query_with_pymysql(query, multi=False):
        """Execute ``query`` on MySQL and commit.

        With ``multi=True`` the text is split into statements; a failing
        statement is recorded in the result list and the remaining statements
        still run. Errors are returned as strings, never raised.
        """
        try:
            connection = pymysql.connect(**db_config)
        except pymysql.MySQLError as e:
            return f"❌ MySQL Error: {str(e)}"
        try:
            with connection.cursor() as cursor:
                results = []
                if multi:
                    for stmt in sqlparse.split(query):
                        stmt = stmt.strip()
                        if not stmt:
                            continue
                        try:
                            cursor.execute(stmt)
                            try:
                                results.append(cursor.fetchall())
                            except pymysql.ProgrammingError:
                                results.append("✅ Executed")
                        except Exception as e:
                            results.append(f"❌ Error in query: {stmt}\n{str(e)}")
                else:
                    try:
                        cursor.execute(query)
                        results = cursor.fetchall()
                    except Exception as e:
                        return f"❌ Error executing query: {str(e)}"
            connection.commit()
            return results
        except pymysql.MySQLError as e:
            return f"❌ MySQL Error: {str(e)}"
        finally:
            connection.close()

    def execute_insert_statements(sql_text):
        """Execute each statement of ``sql_text``; abort without commit on error."""
        try:
            connection = pymysql.connect(**db_config)
        except Exception as e:
            # Connecting is kept outside the try/finally below so that close()
            # is never called on an unbound name.
            return f"❌ Lỗi khi thực thi từng truy vấn: {str(e)}"
        try:
            with connection.cursor() as cursor:
                results = []
                for stmt in sqlparse.split(sql_text):
                    stmt = stmt.strip()
                    if not stmt:
                        continue
                    try:
                        cursor.execute(stmt)
                        try:
                            results.append(cursor.fetchall())
                        except Exception:
                            results.append("✅ OK")
                    except Exception as e:
                        return f"❌ Lỗi tại truy vấn: `{stmt}`\nChi tiết: {str(e)}"
            connection.commit()
            return results
        except Exception as e:
            return f"❌ Lỗi khi thực thi từng truy vấn: {str(e)}"
        finally:
            connection.close()

    def process_and_execute_sql(sql, attempt: int = 0):
        """Validate, correct and execute LLM-produced SQL.

        Args:
            sql: SQL text, a list of SQL strings, or a dict whose first value
                is the SQL.
            attempt: Number of regeneration rounds already spent; bounds the
                recursion used when the INSERT syntax filter rejects the SQL.

        Returns:
            Execution results, or an error string.
        """
        if isinstance(sql, dict) and sql:
            sql = next(iter(sql.values()))
        elif isinstance(sql, list) and sql:
            sql = "\n\n".join(sql)

        sql_clean = clean_sql(sql)
        if not sql_clean:
            return _NO_SQL_FOUND_MESSAGE

        if _PLACEHOLDER_RE.search(sql_clean):
            regenerated_sql = regenerate_sql_until_safe()
            # The helper reports failure as a "❌ ..." string; surface it
            # instead of treating it as SQL.
            if regenerated_sql.startswith("❌"):
                return regenerated_sql
            sql_clean = clean_sql(regenerated_sql)

        # NOTE(review): the name-correction pass below is assumed to run for
        # every query (not only after a placeholder regeneration); the original
        # nesting could not be recovered from the collapsed source.
        schemas_text = get_schemas_from_sql(sql_clean, schema_mapping)
        print("result", schemas_text)
        fix_prompt = build_sql_fix_prompt(schemas_text, sql=sql_clean)
        from advance_shopping.call_gemini import tool_call

        data = tool_call.generate(prompt=fix_prompt)
        sql_clean = clean_sql(data)
        print("SQL step2: ", sql_clean)

        if not sql_clean:
            return _NO_SQL_FOUND_MESSAGE
        if contains_delete(sql_clean):
            return _DELETE_FORBIDDEN_MESSAGE
        if _IF_THEN_RE.search(sql_clean):
            return "❌ Lỗi: Không được dùng IF...THEN trong SQL. Vui lòng chia nhỏ truy vấn."

        if not check_insert:
            return execute_query_with_pymysql(sql_clean, multi=True)

        check_syntax = filter_syntax_sql(sql_clean, PROMPT_CUSTOM, user_input)
        if check_syntax is True:
            return execute_insert_statements(sql_clean)

        # The syntax filter rejected the SQL: regenerate, a bounded number of times.
        if attempt >= _MAX_SQL_REGENERATIONS:
            return "❌ Lỗi: Không thể tạo lại truy vấn hợp lệ."
        try:
            regenerated_data = db_chain.run(build_chain_input())
            regenerated_sql = extract_sql_from_response(regenerated_data)
            if regenerated_sql:
                return process_and_execute_sql(regenerated_sql, attempt + 1)
            return "❌ Lỗi: Không thể tạo lại truy vấn hợp lệ."
        except Exception as regen_error:
            return f"❌ Lỗi khi tạo lại truy vấn: {str(regen_error)}"

    try:
        data = db_chain.run(build_chain_input())
        extracted_sql = extract_sql_from_response(data)
        if extracted_sql:
            print("SQL:", extracted_sql)
            print(extracted_sql)
            return process_and_execute_sql(extracted_sql)
        return data
    except Exception as e:
        # The chain often fails while executing its own SQL; the statement is
        # then recovered from the exception text and retried through our path.
        error_message = str(e)
        print("Lỗi: ", error_message)
        extracted_sql = extract_sql_from_error(error_message)
        print("SQL lỗi: ", extracted_sql)
        if extracted_sql is None:
            return _NO_SQL_FOUND_MESSAGE
        fix_sql = re.sub(r"```sql", "", extracted_sql, flags=re.IGNORECASE)
        fix_sql = re.sub(r"```", "", fix_sql)
        fix_sql = fix_sql.replace("%%s", "%s")
        print(fix_sql)
        print("SQL fix:", fix_sql)
        if contains_delete(fix_sql):
            return _DELETE_FORBIDDEN_MESSAGE
        if extracted_sql:
            return process_and_execute_sql(fix_sql)
        return f"❌ Lỗi không thể thực thi truy vấn: {error_message}"


async def create_new_chat_history(user_id: int):
    """Create a new chat for ``user_id`` and return its id and generated name.

    The MongoDB ``User`` document is created on first use.

    Returns:
        ``res_chat.CreateNewChat`` with the new chat's ObjectId (as a string)
        and name.

    Raises:
        HTTPException: 404 when the user does not exist in MySQL.
    """
    check = UserRepository.getUserByUserId(user_id)
    if check is None:
        raise HTTPException(status_code=404, detail="User not found or has been deleted in MySQL")

    user = User.objects(user_id=user_id).first()
    if not user:
        user = User(id=ObjectId(), user_id=user_id, user_name=f"User_{user_id}")
        user.save()

    random_name_chat = str(random.randint(10**14, 10**15 - 1))
    name_chat = f"Chat_{random_name_chat}"
    new_chat = ChatHistory(id=ObjectId(), user=user, name_chat=name_chat)
    new_chat.save()
    return res_chat.CreateNewChat(idMongo=str(new_chat.id), chat_name=name_chat)


async def update_chat_name(chat_id: str, new_name: str, user_id: int) -> str:
    """Rename the ``ChatHistory`` identified by ``chat_id``.

    The chat is looked up among the requesting user's chats only, so a user
    cannot rename somebody else's chat.

    Raises:
        HTTPException: 404 when the MySQL user or the chat is missing, 400 when
            the MongoDB user is missing.
    """
    check = UserRepository.getUserByUserId(user_id)
    if check is None:
        raise HTTPException(status_code=404, detail="User not found or has been deleted in MySQL")

    user = User.objects(user_id=user_id, is_deleted=False).first()
    if not user:
        raise HTTPException(status_code=400, detail="User not found or has been deleted in MongoDB")

    chat = ChatHistory.objects(_id=ObjectId(chat_id), user=user).first()
    if not chat:
        raise HTTPException(status_code=404, detail="Chat not found in MongoDB")

    chat.name_chat = new_name
    chat.save()
    return f"Updated chat name to {new_name}"


async def soft_delete_chat(chat_id: str, user_id: int):
    """Mark a ``ChatHistory`` and its ``DetailChat`` records as deleted.

    Sets ``is_deleted=True`` and ``date_deleted`` on the chat and on every
    related detail record.

    Raises:
        HTTPException: 400 when the user or the chat cannot be found.
    """
    check = UserRepository.getUserByUserId(user_id)
    if check is None:
        raise HTTPException(status_code=400, detail="User not found or has been deleted in MySQL")

    check_history_id = UserRepository.getChatHistory(user_id, chat_id)
    if check_history_id is None:
        raise HTTPException(status_code=400, detail="Chat history not found or has been deleted in MySQL")

    user = User.objects(user_id=user_id, is_deleted=False).first()
    if not user:
        raise HTTPException(status_code=400, detail="User not found or has been deleted in MongoDB")

    chat = ChatHistory.objects(_id=ObjectId(chat_id)).first()
    if not chat:
        raise HTTPException(status_code=400, detail="Chat not found or has been deleted in MongoDB")

    # One timestamp for the chat and all of its details.
    deleted_at = datetime.now(pytz.UTC)
    chat.is_deleted = True
    chat.date_deleted = deleted_at
    chat.save()
    DetailChat.objects(chat_history=chat).update(
        set__is_deleted=True,
        set__date_deleted=deleted_at,
    )
    return {"message": "Chat and related details marked as deleted"}


from datetime import datetime, timedelta

_VALID_ROLES = ("ADMIN", "CUSTOMER", "SHIPPER")
# Language code accepted by the API -> language name given to the LLM.
_LANGUAGE_NAMES = {"VN": "Vietnamese", "EN": "English"}


def _to_vn_time(timestamp):
    """Format ``timestamp`` shifted by +7 hours as ``YYYY-MM-DDTHH:MM:SS``."""
    return (timestamp + timedelta(hours=7)).strftime('%Y-%m-%dT%H:%M:%S')


def _validate_chat_inputs(message, user_id, languages, role):
    """Validate request fields shared by ``chat_with_user`` and ``regenerate``.

    Returns:
        ``(user_id as int, language name)``.

    Raises:
        HTTPException: 400 on the first invalid field.
    """
    if role not in _VALID_ROLES:
        raise HTTPException(status_code=400, detail="ROLE not valid")
    try:
        user_id = int(user_id)
    except (TypeError, ValueError):
        # Previously an unhandled ValueError (HTTP 500).
        raise HTTPException(
            status_code=400, detail="Invalid user_id: must be a positive integer"
        ) from None
    if languages not in _LANGUAGE_NAMES:
        raise HTTPException(status_code=400, detail="Language not valid")
    if not message:
        raise HTTPException(status_code=400, detail="User input empty")
    if user_id <= 0:
        raise HTTPException(status_code=400, detail="Invalid user_id: must be a positive integer")
    return user_id, _LANGUAGE_NAMES[languages]


def _load_chat_history(chat_history_id, user):
    """Return the user's ``ChatHistory`` or raise HTTP 400 when it is missing.

    NOTE(review): a missing/empty ``chat_history_id`` is rejected here as well;
    the original nesting of this check could not be recovered, and continuing
    without a chat history would store an orphan message.
    """
    chat_history = None
    if chat_history_id:
        try:
            chat_history = ChatHistory.objects(_id=ObjectId(chat_history_id), user=user).first()
        except Exception as e:
            print(f"⚠️ Invalid chat_history_id: {e}")
    if not chat_history:
        raise HTTPException(status_code=400, detail="Chat history not found or has been deleted in MongoDB")
    return chat_history


def _format_history(chat_history):
    """Return all messages of ``chat_history``, newest first, numbered from 1."""
    stored = DetailChat.objects(chat_history=chat_history).order_by('timestamp')
    newest_first = sorted(stored, key=lambda msg: msg.timestamp, reverse=True)
    return [
        {
            "index": position,
            "id": str(msg.id),
            "you_message": msg.you_message,
            "ai_message": msg.ai_message,
            "timestamp": _to_vn_time(msg.timestamp),
        }
        for position, msg in enumerate(newest_first, start=1)
    ]


def _message_payload(message):
    """Return the JSON-ready representation of one ``DetailChat``."""
    return {
        "id": str(message.id),
        "you_message": message.you_message,
        "ai_message": message.ai_message,
        "timestamp": _to_vn_time(message.timestamp),
    }


async def chat_with_user(
    user_input: str,
    user_id: int,
    languages: str,
    role: str,
    token: str,
    chat_history_id: str = None,
):
    """Process a user message, store it in the chat history and return the reply.

    Args:
        user_input: The user's message.
        user_id: Positive integer user id (numeric strings are accepted).
        languages: "VN" or "EN".
        role: "ADMIN", "CUSTOMER" or "SHIPPER".
        token: Passed through to ``pipeline_agent.multi_query_user``.
        chat_history_id: ObjectId string of the chat to append to.

    Returns:
        JSON-encodable dict with ``new_message`` and ``previous_messages``
        (newest first), or ``{"error": ...}`` when the MongoDB user is missing.

    Raises:
        HTTPException: 400 on invalid input or unknown user/chat.
    """
    user_id, languages = _validate_chat_inputs(user_input, user_id, languages, role)

    check = UserRepository.getUserByUserId(user_id)
    if check is None:
        raise HTTPException(status_code=400, detail="User not found or has been deleted in MySQL")

    check_history_id = UserRepository.getChatHistory(user_id, chat_history_id)
    if check_history_id is None:
        raise HTTPException(status_code=400, detail="Chat not found or has been deleted in MySQL")

    user = User.objects(user_id=user_id).first()
    if not user:
        return {"error": "User not found or has been deleted in MongoDB"}

    chat_history = _load_chat_history(chat_history_id, user)

    # The agent pipeline replaced the earlier filter -> execute_query_user ->
    # query_result chain.
    result_final = await pipeline_agent.multi_query_user(
        user_input, user_id, role, languages, chat_history_id, token
    )

    detail_chat = DetailChat(
        id=ObjectId(),
        chat_history=chat_history,
        you_message=user_input,
        ai_message=result_final,
        timestamp=datetime.now(pytz.UTC),
    )
    detail_chat.save()

    return jsonable_encoder({
        "new_message": _message_payload(detail_chat),
        "previous_messages": _format_history(chat_history),
    })


from bson import ObjectId


async def get_chat_details(chat_id: str, user_id: int):
    """Return the non-deleted ``DetailChat`` records of a chat, newest first.

    Returns:
        ``res_chat.ListDetailResponse``; entries are renumbered from 1 and
        their timestamps are shifted to GMT+7.

    Raises:
        HTTPException: 400 when the user or the chat cannot be found.
    """
    check = UserRepository.getUserByUserId(user_id)
    if check is None:
        raise HTTPException(status_code=400, detail="User not found or has been deleted in MySQL")

    check_history_id = UserRepository.getChatHistory(user_id, chat_id)
    if check_history_id is None:
        raise HTTPException(status_code=400, detail="Chat history not found or has been deleted in MySQL")

    user = User.objects(user_id=user_id, is_deleted=False).first()
    if not user:
        raise HTTPException(status_code=400, detail="User not found or has been deleted in MongoDB")

    chat = ChatHistory.objects(_id=ObjectId(chat_id), is_deleted=False).first()
    if not chat:
        raise HTTPException(status_code=400, detail="Chat history not found or has been deleted in MongoDB")

    chat_details = DetailChat.objects(chat_history=chat, is_deleted=False)
    newest_first = sorted(chat_details, key=lambda d: d.timestamp, reverse=True)
    list_detail_response = [
        res_chat.DetailResponse(
            id=str(position),
            you_message=detail.you_message,
            ai_message=detail.ai_message,
            timestamp=_to_vn_time(detail.timestamp),
        )
        for position, detail in enumerate(newest_first, start=1)
    ]
    return res_chat.ListDetailResponse(
        chat_id=str(chat.id),
        chat_name=chat.name_chat,
        list_detail_response=list_detail_response,
    )


async def regenerate(
    user_question_new: str,
    user_id: int,
    languages: str,
    role: str,
    chat_history_id: str = None,
):
    """Replace the latest message of a chat with a new question and a fresh reply.

    Args:
        user_question_new: The replacement question.
        user_id: Positive integer user id (numeric strings are accepted).
        languages: "VN" or "EN".
        role: "ADMIN", "CUSTOMER" or "SHIPPER".
        chat_history_id: ObjectId string of the chat.

    Returns:
        JSON-encodable dict with ``new_message`` and ``previous_messages``
        (newest first), or ``{"error": ...}`` when the MongoDB user is missing.

    Raises:
        HTTPException: 400 on invalid input, unknown user/chat, or when the
            chat has no message to regenerate.
    """
    # NOTE(review): the result is unused; the call is kept in case
    # get_prompt_custom has side effects — confirm and remove if not.
    PROMPT_CUSTOM = await prompt_cus.get_prompt_custom(user_question_new)

    user_id, languages = _validate_chat_inputs(user_question_new, user_id, languages, role)

    check = UserRepository.getUserByUserId(user_id)
    if check is None:
        raise HTTPException(status_code=400, detail="User not found or has been deleted in MySQL")

    check_history_id = UserRepository.getChatHistory(user_id, chat_history_id)
    if check_history_id is None:
        raise HTTPException(status_code=400, detail="Chat not found or has been deleted in MySQL")

    user = User.objects(user_id=user_id).first()
    if not user:
        return {"error": "User not found or has been deleted in MongoDB"}

    chat_history = _load_chat_history(chat_history_id, user)

    last_chat = (
        DetailChat.objects(chat_history=chat_history, is_deleted=False)
        .order_by("-timestamp")
        .first()
    )
    if last_chat is None:
        # Checked before the (expensive) agent call; previously this path
        # crashed with AttributeError after the call.
        raise HTTPException(status_code=400, detail="No chat message found to regenerate")

    result_final = await pipeline_agent.multi_query_user(
        user_question_new, user_id, role, languages, chat_history_id
    )
    last_chat.update(
        set__you_message=user_question_new,
        set__ai_message=result_final,
        set__timestamp=datetime.now(pytz.UTC),
    )

    # Re-read so the response reflects what is stored.
    last_chat_result = (
        DetailChat.objects(chat_history=chat_history, is_deleted=False)
        .order_by("-timestamp")
        .first()
    )

    return jsonable_encoder({
        "new_message": _message_payload(last_chat_result),
        "previous_messages": _format_history(chat_history),
    })


from bson import ObjectId


async def get_user_chat_history(user_id: int):
    """Return all non-deleted chats of ``user_id``.

    Returns:
        ``res_chat.UserChatHistoryResponse`` with one ``ChatResponse`` per chat.

    Raises:
        HTTPException: 400 when the user cannot be found.
    """
    check = UserRepository.getUserByUserId(user_id)
    if check is None:
        raise HTTPException(status_code=400, detail="User not found or has been deleted in MySQL")

    user = User.objects(user_id=user_id, is_deleted=False).first()
    if not user:
        raise HTTPException(status_code=400, detail="User not found or has been deleted in MongoDB")

    chat_histories = ChatHistory.objects(user=user, is_deleted=False)
    chat_list = [
        res_chat.ChatResponse(
            chat_id=str(chat.id),
            chat_name=chat.name_chat,
            # Creation time is taken from the ObjectId of a live chat.
            timestamp=chat.date_deleted if chat.is_deleted else chat.id.generation_time,
        )
        for chat in chat_histories
    ]
    return res_chat.UserChatHistoryResponse(
        user_id=user.user_id,
        user_name=user.user_name,
        chat_list=chat_list,
    )


from bson import ObjectId


async def get_chat_details_text(chat_id: str, user_id: int):
    """Return every question/answer pair of a chat, oldest first.

    Returns:
        List of dicts with ``order`` (1-based, as a string), ``timestamp``,
        ``you_message`` and ``ai_message``; an empty list when the chat has no
        messages.

    Raises:
        HTTPException: 400 when the user or the chat cannot be found.
    """
    user = User.objects(user_id=user_id, is_deleted=False).first()
    if not user:
        raise HTTPException(status_code=400, detail="User not found or has been deleted in MongoDB")

    chat = ChatHistory.objects(_id=ObjectId(chat_id), user=user, is_deleted=False).first()
    if not chat:
        raise HTTPException(status_code=400, detail="Chat not found or has been deleted in MongoDB")

    chat_details = DetailChat.objects(chat_history=chat, is_deleted=False).order_by('timestamp')
    if not chat_details:
        return list()

    return [
        {
            "order": str(index),
            "timestamp": detail.timestamp.strftime("%Y-%m-%d %H:%M:%S"),
            "you_message": detail.you_message,
            "ai_message": detail.ai_message,
        }
        for index, detail in enumerate(chat_details, start=1)
    ]


import asyncio
import os
import subprocess
from datetime import datetime
from pathlib import Path
from function.analyze import main
import asyncio
from models.Database_Entity import StopSignal
from function.analyze.gemini import result_analyze


async def check_should_stop(chat_id: str, stop_event: object = None):
    """Return ``{"status": "cancelled"}`` when the job must stop, else ``None``.

    Two signals are checked: the in-memory ``stop_event`` and a ``StopSignal``
    document with ``is_stopped=True`` for this chat in MongoDB.
    """
    # In-memory stop.
    await asyncio.sleep(0.1)
    if stop_event and stop_event.is_set():
        print("🛑 Dừng qua stop_event.")
        return {"status": "cancelled"}

    # Stop requested through MongoDB.
    await asyncio.sleep(0.1)
    if StopSignal.objects(chat_history=chat_id, is_stopped=True).first():
        print("🛑 Dừng vì có StopSignal trong DB.")
        return {"status": "cancelled"}
    return None


from typing import Optional


async def generate_and_save_code(question: str, user_id: int, role, languages: str, stop_event: Optional[asyncio.Event], filename: str = "analyze_result.py", chat_id: str = ""):
    """Generate analysis code for ``question``, run it and summarise its output.

    The generated script is written to ``./Temp/<folder>_<timestamp>/<filename>``
    and executed with that directory as working directory; the directory is
    also exposed to the script as ``OUTPUT_DIR``. Cancellation is checked
    between every step.

    Returns:
        ``(analysis result, path of the "test5" output folder)``, or
        ``{"status": "cancelled"}`` when a stop was requested.
    """
    result_check = await check_should_stop(chat_id, stop_event)
    if result_check:
        return result_check

    code_test, folder_name = await main.analyze(
        question, user_id, languages, role, chat_id, stop_event
    )

    result_check = await check_should_stop(chat_id, stop_event)
    if result_check:
        return result_check

    # Strip the Markdown fence the model may wrap around the code.
    code_clean = code_test.strip()
    if code_clean.startswith("```python"):
        code_clean = code_clean[9:].strip()
    if code_clean.endswith("```"):
        code_clean = code_clean[:-3].strip()

    result_check = await check_should_stop(chat_id, stop_event)
    if result_check:
        return result_check

    code_clean = code_clean.replace("os_path:", "os.path").strip()
    # Force UTF-8 stdout in the child and make numpy available as np.
    encoding_fix = 'import sys\nsys.stdout.reconfigure(encoding="utf-8")\n\n'
    encoding_fix1 = 'import numpy as np\n\n'
    code_clean = encoding_fix + encoding_fix1 + code_clean

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    output_dir = Path(f"./Temp/{folder_name}_{timestamp}")
    output_dir.mkdir(parents=True, exist_ok=True)
    file_path = output_dir / filename
    with open(file_path, "w", encoding="utf-8") as f:
        f.write(code_clean)

    env = os.environ.copy()

    result_check = await check_should_stop(chat_id, stop_event)
    if result_check:
        return result_check

    env["OUTPUT_DIR"] = str(output_dir)
    # SECURITY(review): this executes LLM-generated code with the service's
    # privileges and full environment, with no timeout — consider sandboxing.
    # sys.executable runs the same interpreter/virtualenv as the service
    # instead of whatever "python" resolves to on PATH.
    result = subprocess.run(
        [sys.executable or "python", filename],
        capture_output=True,
        text=True,
        env=env,
        cwd=output_dir,
        encoding="utf-8",
        errors="replace",
    )
    if result.returncode != 0:
        print(f"⚠️ Generated script exited with code {result.returncode}: {result.stderr}")

    result_check = await check_should_stop(chat_id, stop_event)
    if result_check:
        return result_check

    final_path = os.path.join(os.path.abspath(str(output_dir)), "test5")

    result_check = await check_should_stop(chat_id, stop_event)
    if result_check:
        return result_check

    result_final = result_analyze.generate(image_folder=final_path, question=question)

    result_check = await check_should_stop(chat_id, stop_event)
    if result_check:
        return result_check

    return result_final, final_path