Spaces:
Sleeping
Sleeping
| from langchain_community.utilities.sql_database import SQLDatabase | |
| from langchain_experimental.sql import SQLDatabaseChain | |
| import sys | |
| import os | |
| import pymysql | |
| from fastapi import HTTPException | |
| from fastapi.encoders import jsonable_encoder | |
| import re | |
| sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "."))) | |
| import function.prompt.prompt_main as prompt | |
| import function.prompt.prompt_custom as prompt_cus | |
# SECURITY: a Google API key used to be hard-coded on this line. Secrets must not
# be committed to source control: revoke/rotate the leaked key and provide
# GOOGLE_API_KEY through the process environment or the .env file that
# load_dotenv() reads further down. execute_query_user() additionally sets the
# variable per request from get_key.get_random_api_key().
| sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) | |
| from bson import ObjectId | |
| import os | |
| from dotenv import load_dotenv | |
| import os | |
| from dotenv import load_dotenv | |
| import os | |
| from dotenv import load_dotenv | |
| from dotenv import load_dotenv, find_dotenv | |
# Load variables from the .env file located by find_dotenv(); override=True lets
# values from that file replace variables already present in the environment.
load_dotenv(find_dotenv(), override=True)
# MySQL connection settings, read from the environment (None when a variable is unset).
DB_HOST = os.getenv("DB_HOST")
DB_USER = os.getenv("DB_USER")
DB_PASSWORD = os.getenv("DB_PASSWORD")
DB_NAME = os.getenv("DB_NAME")
DB_PORT = os.getenv("DB_PORT")
| import re | |
def contains_delete(sql: str) -> bool:
    """Report whether *sql* contains the standalone word ``delete`` (any letter case)."""
    delete_keyword = re.compile(r'\bdelete\b', re.IGNORECASE)
    return delete_keyword.search(sql) is not None
# Build the SQLAlchemy connection string.
import os
from urllib.parse import quote
password = os.getenv("DB_PASSWORD")  # may contain URL-reserved characters such as '@'
# Percent-encode the password before embedding it in the URI. safe="" is required
# because quote() leaves "/" unescaped by default, which would corrupt the URI for
# passwords containing a slash. An unset DB_PASSWORD is treated as an empty
# password instead of failing with a TypeError at import time.
# NOTE: this rebinds the module-level DB_PASSWORD to the *encoded* value.
DB_PASSWORD = quote(password or "", safe="")
# NOTE(review): the query string appears to turn off TLS certificate and identity
# verification for the MySQL connection -- confirm this is intended outside development.
connection_uri = (
    f"mysql+pymysql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
    "?ssl_verify_cert=false&ssl_verify_identity=false"
)
# Module-level LangChain database handle, created when this module is imported.
db = SQLDatabase.from_uri(connection_uri)
# db = SQLDatabase.from_uri("mysql+pymysql://root@127.0.0.1:3306/demohmdrinks")
| from dotenv import load_dotenv | |
| import function.filter.filter_role as filter_role_1 | |
| import function.filter.filter_sql_injection as filter_sql_injection_1 | |
| import function.filter.result as query_result_1 | |
| import support.get_key as get_key | |
| import response.ResponseChat as res_chat | |
| from datetime import datetime | |
| import pytz | |
| from mongoengine import connect | |
| import sys | |
| import os | |
| import nltk | |
| import function.agent.pipeline_agent as pipeline_agent | |
| nltk.download('punkt') | |
| from models.Database_Entity import User, ChatHistory, DetailChat | |
| from dotenv import load_dotenv | |
# Load variables from .env (without overriding variables that are already set).
load_dotenv()
# MongoDB connection string; falls back to an empty string when MONGO_URI is unset.
MONGO_URI = os.getenv("MONGO_URI", "")
# Open the mongoengine connection to the "chatbot_hmdrinks" database at import time.
connect("chatbot_hmdrinks", host=MONGO_URI)
load_dotenv()
| #setup model | |
| from bson import ObjectId | |
| import random | |
| from langchain_google_genai import GoogleGenerativeAIEmbeddings, ChatGoogleGenerativeAI | |
| BASE_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../")) | |
| sys.path.insert(0, BASE_DIR) | |
| from repository.MySQL import UserRepository | |
| from function.prompt.prompt_syntax_insert import is_insert_related_to_product_category_variant, filter_syntax_sql | |
| import sqlparse | |
| import sqlparse | |
| import sys | |
| import os | |
| sys.path.append(os.path.abspath(os.path.join(os.getcwd(), '..'))) | |
| from function.prompt import prompt_detail_table | |
# Maps a MySQL table name to the schema description held in prompt_detail_table.
# get_schemas_from_sql() looks tables up here to assemble the schema text that is
# sent along with a query to be repaired.
# NOTE(review): "category" refers to `prompt_categort` -- presumably that is how the
# attribute is spelled in prompt_detail_table; confirm before renaming.
schema_mapping = {
    "user": prompt_detail_table.prompt_users,
    "user_voucher": prompt_detail_table.prompt_user_voucher,
    "category": prompt_detail_table.prompt_categort,
    "category_translation": prompt_detail_table.prompt_category_translation,
    "cart": prompt_detail_table.prompt_cart,
    "cart_item": prompt_detail_table.prompt_cart_item,
    "orders":prompt_detail_table.prompt_orders,
    "order_item": prompt_detail_table.prompt_order_item,
    "payment": prompt_detail_table.prompt_payments,
    "favourite": prompt_detail_table.prompt_favourite,
    "favourite_item": prompt_detail_table.prompt_fav_item,
    "post": prompt_detail_table.prompt_post,
    "post_translation": prompt_detail_table.prompt_post_translation,
    "product": prompt_detail_table.prompt_product,
    "product_translation": prompt_detail_table.prompt_product_translation,
    "shipment": prompt_detail_table.prompt_shipment,
    "product_variants": prompt_detail_table.prompt_product_variants,
    "review": prompt_detail_table.prompt_review,
    "user_coin": prompt_detail_table.prompt_user_coin,
    "absence": prompt_detail_table.prompt_absence,
    "cart_group": prompt_detail_table.prompt_cart_group,
    "cart_item_group": prompt_detail_table.prompt_cartitem_group,
    "group_orders": prompt_detail_table.prompt_group_orders,
    "payments_group": prompt_detail_table.prompt_payments_group,
    "group_order_members":prompt_detail_table.prompt_group_orders_member,
    "shipment_group":prompt_detail_table.prompt_shipment_group,
    "shipper_attendance":prompt_detail_table.prompt_shipper_attendance,
    "shipper_commission_detail":prompt_detail_table.prompt_shipper_commission_detail,
    "shipper_salary_summary":prompt_detail_table.prompt_shipper_salary_summary,
    "voucher": prompt_detail_table.prompt_voucher
}
def get_schemas_from_sql(sql_query: str, schema_mapping: dict) -> str:
    """Collect the schema descriptions of every table referenced by ``sql_query``.

    All statements in ``sql_query`` are inspected (not only the first one), and
    tables are reported once each, in order of first appearance, so the result is
    deterministic for a given input.

    Args:
        sql_query: One or more SQL statements.
        schema_mapping: Mapping of table name -> schema description.

    Returns:
        A single string with one "Schema for table ..." section per known table,
        separated by blank lines. Tables missing from ``schema_mapping`` are
        skipped after printing a warning.

    Raises:
        sqlglot.errors.ParseError: If the SQL cannot be parsed or contains no
            statement.
    """
    import sqlglot

    # NOTE(review): the SQL is parsed with the "sqlite" dialect although the
    # application connects to MySQL -- confirm this choice is intentional.
    statements = [
        statement
        for statement in sqlglot.parse(sql_query, read="sqlite")
        if statement is not None
    ]
    if not statements:
        raise sqlglot.errors.ParseError(f"No expression was parsed from '{sql_query}'")

    # Unique table names, preserving first-seen order across all statements.
    table_names = []
    seen = set()
    for statement in statements:
        for table_expr in statement.find_all(sqlglot.exp.Table):
            if table_expr.name not in seen:
                seen.add(table_expr.name)
                table_names.append(table_expr.name)

    schemas_used = {}
    for table in table_names:
        if table in schema_mapping:
            schemas_used[table] = schema_mapping[table]
        else:
            print(f"⚠️ Warning: Table '{table}' not found in schema_mapping")

    # Merge every schema into one string.
    return "\n\n".join(
        f"Schema for table '{table}':\n{schema}" for table, schema in schemas_used.items()
    )
def build_sql_fix_prompt(schemas_result: str, sql: str) -> str:
    """Build the LLM prompt that asks for a corrected version of a failing SQL query.

    Args:
        schemas_result: Schema description text to show to the model (the string
            produced by ``get_schemas_from_sql``).
        sql: The SQL statement to repair; surrounding whitespace is removed.

    Returns:
        The prompt text, stripped of leading/trailing whitespace. The SQL is
        wrapped in a properly closed ```sql fenced block so the instructions that
        follow it are not read as part of the query.
    """
    fix_prompt = f"""
Bạn là một chuyên gia cơ sở dữ liệu.
Dưới đây là mô tả schema chi tiết của các bảng có trong hệ thống:
{schemas_result}
---
Dưới đây là một câu SQL đang bị lỗi do không đúng tên bảng hoặc tên cột:
```sql
{sql.strip()}
```
Yêu cầu của bạn là:
Dựa trên các schema ở trên, hãy kiểm tra và chỉnh sửa câu SQL sao cho:
Tên bảng, tên cột phải chính xác theo schema.
Logic và mục đích của truy vấn được giữ nguyên.
Chỉ trả lại phần SQL đã được chỉnh sửa (không giải thích, không chú thích, không thêm nhận xét).
Trả lời dưới dạng một truy vấn SQL duy nhất.
""".strip()
    return fix_prompt
async def execute_query_user(user_input: str, user_id: int, languages: str, role: str):
    """Translate a user request into SQL with an LLM chain and run it against MySQL.

    Flow: build an ``SQLDatabaseChain`` with the custom prompt, ask it for a query,
    pull the SQL out of the chain output (or out of the error text when the chain
    raises), have a second LLM call repair table/column names against the known
    schemas, reject DELETE and IF...THEN statements, then execute with pymysql.

    Args:
        user_input: The user's request in natural language.
        user_id: Id of the requesting user; embedded in the role line of the prompt.
        languages: Language name placed in the prompt.
        role: Role name; anything other than "ADMIN" is marked "not role ADMIN".

    Returns:
        One of: a list of per-statement results, an error/refusal message string,
        or the raw chain output when no "SQLQuery:" section could be extracted.
    """
    # NOTE(review): os.environ is process-wide, so concurrent requests overwrite
    # each other's key here.
    api_key = get_key.get_random_api_key()
    os.environ["GOOGLE_API_KEY"] = api_key
    llm1 = ChatGoogleGenerativeAI(model='gemini-2.5-flash-preview-04-17', temperature=0.6)
    # db = SQLDatabase.from_uri("mysql+pymysql://root@127.0.0.1:3306/demohmdrinks")
    db = SQLDatabase.from_uri(connection_uri)
    PROMPT_CUSTOM = await prompt_cus.get_prompt_custom(user_input)
    check_insert = is_insert_related_to_product_category_variant(user_input)
    # Raw (un-encoded) credentials for direct pymysql connections.
    db_config = {
        "host": os.getenv("DB_HOST"),
        "user": os.getenv("DB_USER"),
        "database": os.getenv("DB_NAME"),
        "password": os.getenv("DB_PASSWORD"),
        "port": int(os.getenv("DB_PORT", 3306)),
        "charset": "utf8mb4",
        "cursorclass": pymysql.cursors.DictCursor,
    }
    delete_refused = "Lỗi: Bạn không dược phép thực hiện truy vấn DELETE trong hệ thống này."

    def execute_query_with_pymysql(query, multi=False):
        """Run ``query`` (optionally split into statements) and commit.

        In multi mode a failing statement is recorded in the result list and the
        remaining statements still run.
        """
        connection = pymysql.connect(**db_config)
        try:
            with connection.cursor() as cursor:
                results = []
                if multi:
                    for stmt in sqlparse.split(query):
                        stmt = stmt.strip()
                        if not stmt:
                            continue
                        try:
                            cursor.execute(stmt)
                            try:
                                results.append(cursor.fetchall())
                            except pymysql.ProgrammingError:
                                results.append("✅ Executed")
                        except Exception as e:
                            results.append(f"❌ Error in query: {stmt}\n{str(e)}")
                else:
                    try:
                        cursor.execute(query)
                        results = cursor.fetchall()
                    except Exception as e:
                        return f"❌ Error executing query: {str(e)}"
                connection.commit()
            return results
        except pymysql.MySQLError as e:
            return f"❌ MySQL Error: {str(e)}"
        finally:
            connection.close()

    def clean_sql(sql: str) -> str:
        """Strip markdown code fences and collapse ``%%s`` placeholders to ``%s``."""
        sql = re.sub(r"```sql", "", sql, flags=re.IGNORECASE)
        sql = re.sub(r'%%s', r'%s', sql)
        sql = re.sub(r"```", "", sql)
        return sql.strip()

    def extract_sql_from_response(data):
        """Return the cleaned text following "SQLQuery:" in the chain output, or None."""
        match = re.search(r"SQLQuery:\s*(.*)", data, re.DOTALL)
        return clean_sql(match.group(1)) if match else None

    def extract_sql_from_error(error_msg):
        """Return the cleaned contents of the first ```sql fence in an error text, or None."""
        match = re.search(r"```sql\n(.*?)```", error_msg, re.DOTALL)
        return clean_sql(match.group(1)) if match else None

    def process_and_execute_sql(sql):
        """Repair ``sql`` against the schemas, apply the safety checks and execute it."""
        sql_clean = clean_sql(sql)
        result = get_schemas_from_sql(sql_clean, schema_mapping)
        fix_prompt = build_sql_fix_prompt(result, sql=sql_clean)
        from function.advance_shopping.call_gemini import tool_call
        data = tool_call.generate(prompt=fix_prompt)
        sql_clean = clean_sql(data)
        print("SQL step2: ", sql_clean)
        if contains_delete(sql_clean):
            return delete_refused
        # Word-boundary match on IF ... THEN. The previous pattern doubled the
        # backslashes inside a raw string and therefore could never match.
        if re.search(r"\bIF\b.*\bTHEN\b", sql_clean, re.IGNORECASE):
            return "❌ Lỗi: Không được dùng IF...THEN trong SQL. Vui lòng chia nhỏ truy vấn."
        if not check_insert:
            return execute_query_with_pymysql(sql_clean, multi=True)

        check_syntax = filter_syntax_sql(sql_clean, PROMPT_CUSTOM, user_input)
        if check_syntax is True:
            # Bound before the try so `finally` is safe when connect() itself fails.
            connection = None
            try:
                connection = pymysql.connect(**db_config)
                with connection.cursor() as cursor:
                    results = []
                    for stmt in sqlparse.split(sql_clean):
                        stmt = stmt.strip()
                        if not stmt:
                            continue
                        try:
                            cursor.execute(stmt)
                            try:
                                results.append(cursor.fetchall())
                            except Exception:
                                results.append("✅ OK")
                        except Exception as e:
                            # Fail fast: return before commit() is reached.
                            return f"❌ Lỗi tại truy vấn: `{stmt}`\nChi tiết: {str(e)}"
                    connection.commit()
                return results
            except Exception as e:
                return f"❌ Lỗi khi thực thi từng truy vấn: {str(e)}"
            finally:
                if connection is not None:
                    connection.close()

        # Syntax check failed: ask the chain for a new query and start over.
        # NOTE(review): this retry has no upper bound; consider capping it.
        try:
            regenerated_data = db_chain.run(f"""
Role: {text_role}
Language: {languages}
Question: {user_input}.
""")
            regenerated_sql = extract_sql_from_response(regenerated_data)
            if regenerated_sql:
                return process_and_execute_sql(regenerated_sql)
            return "❌ Lỗi: Không thể tạo lại truy vấn hợp lệ."
        except Exception as regen_error:
            return f"❌ Lỗi khi tạo lại truy vấn: {str(regen_error)}"

    if_then_note = (
        "\n\n⚠️ Note: Do NOT use IF...THEN...ELSE...END in SQL. "
        "Only use plain SELECT, INSERT, UPDATE, DELETE, SET statements."
    )
    # Append the note once. The second test matches the note itself; the first
    # (original) test never matched it, so the note could pile up if the template
    # object is reused between calls (not visible from here).
    current_template = PROMPT_CUSTOM.template
    if "Do not use IF...THEN" not in current_template and if_then_note not in current_template:
        PROMPT_CUSTOM.template += if_then_note
    db_chain = SQLDatabaseChain.from_llm(llm=llm1, db=db, prompt=PROMPT_CUSTOM)
    text_role = f"{role} (userId = {user_id})" if role == "ADMIN" else f"{role} (userId = {user_id}), not role ADMIN"
    try:
        data = db_chain.run(f"""
Role: {text_role}
Language: {languages}
Question: {user_input}.
""")
        extracted_sql = extract_sql_from_response(data)
        if extracted_sql:
            return process_and_execute_sql(extracted_sql)
        return data
    except Exception as e:
        error_message = str(e)
        extracted_sql = extract_sql_from_error(error_message)
        # Check for a missing query first: the old code called .replace() on None
        # whenever the error text carried no ```sql block.
        if not extracted_sql:
            return f"❌ Lỗi không thể thực thi truy vấn: {error_message}"
        fix_sql = clean_sql(extracted_sql)
        if contains_delete(fix_sql):
            return delete_refused
        return process_and_execute_sql(fix_sql)
async def create_new_chat_history(user_id: int) -> str:
    """
    Create a new chat for ``user_id``, creating the MongoDB user document first if
    it does not exist yet, and return a ``res_chat.CreateNewChat`` built from the
    new chat's id and its generated name.

    Raises HTTPException(404) when the user is unknown to ``UserRepository``.
    """
    if UserRepository.getUserByUserId(user_id) is None:
        raise HTTPException(status_code=404, detail="User not found or has been deleted in MySQL")

    owner = User.objects(user_id=user_id).first()
    if not owner:
        owner = User(id=ObjectId(), user_id=user_id, user_name=f"User_{user_id}")
        owner.save()

    # Chat name: "Chat_" followed by a random 15-digit number.
    name_chat = "Chat_" + str(random.randint(10**14, 10**15 - 1))
    created_chat = ChatHistory(id=ObjectId(), user=owner, name_chat=name_chat)
    created_chat.save()
    return res_chat.CreateNewChat(idMongo=str(created_chat.id), chat_name=name_chat)
async def update_chat_name(chat_id: str, new_name: str,user_id:int) -> str:
    """
    Rename the ChatHistory identified by ``chat_id``.

    The chat is looked up together with its owner, so a user can only rename
    their own chats (the same ``user=user`` filter used by ``chat_with_user`` and
    ``get_chat_details_text``).

    Args:
        chat_id: String form of the chat's ObjectId.
        new_name: New value for ``name_chat``.
        user_id: Id of the requesting user.

    Returns:
        A confirmation message containing the new name.

    Raises:
        HTTPException: 404 when the user is missing from ``UserRepository`` or
            the chat is not found for this user; 400 when the user is missing
            (or flagged deleted) in MongoDB.
    """
    check = UserRepository.getUserByUserId(user_id)
    if check is None:
        raise HTTPException(status_code=404, detail="User not found or has been deleted in MySQL")
    user = User.objects(user_id=user_id, is_deleted=False).first()
    if not user:
        raise HTTPException(status_code=400, detail="User not found or has been deleted in MongoDB")
    # Restrict the lookup to the caller's own chats; without the owner filter any
    # valid user could rename any chat whose id they know.
    chat = ChatHistory.objects(_id=ObjectId(chat_id), user=user).first()
    if not chat:
        raise HTTPException(status_code=404, detail="Chat not found in MongoDB")
    chat.name_chat = new_name
    chat.save()
    return f"Updated chat name to {new_name}"
async def soft_delete_chat(chat_id: str,user_id:int):
    """
    Soft-delete a chat: set ``is_deleted=True`` and ``date_deleted`` on the
    ``ChatHistory`` and on every ``DetailChat`` that references it.

    Raises HTTPException(400) when the user or the chat cannot be found through
    ``UserRepository`` or in MongoDB.
    """
    if UserRepository.getUserByUserId(user_id) is None:
        raise HTTPException(status_code=400, detail="User not found or has been deleted in MySQL")
    if UserRepository.getChatHistory(user_id, chat_id) is None:
        raise HTTPException(status_code=400, detail="Chat history not found or has been deleted in MySQL")

    mongo_user = User.objects(user_id=user_id, is_deleted=False).first()
    if not mongo_user:
        raise HTTPException(status_code=400, detail="User not found or has been deleted in MongoDB")

    target_chat = ChatHistory.objects(_id=ObjectId(chat_id)).first()
    if not target_chat:
        raise HTTPException(status_code=400, detail="Chat not found or has been deleted in MongoDB")

    # Flag the chat itself ...
    target_chat.is_deleted = True
    target_chat.date_deleted = datetime.now(pytz.UTC)
    target_chat.save()

    # ... then all of its messages in one bulk update.
    detail_changes = {
        "set__is_deleted": True,
        "set__date_deleted": datetime.now(pytz.UTC),
    }
    DetailChat.objects(chat_history=target_chat).update(**detail_changes)
    return {"message": "Chat and related details marked as deleted"}
| from typing import Optional | |
| from models.Database_Entity import StopSignal | |
| from datetime import datetime, timedelta | |
| import asyncio | |
async def chat_with_user(
    user_input: str,
    user_id: int,
    languages: str,
    role: str,
    token: str,
    chat_history_id: str = None,
    stop_event: Optional[asyncio.Event] = None
) -> str:
    """
    Handle one user message: validate the request, obtain the AI answer, store the
    exchange as a ``DetailChat`` and return it together with the chat's messages.

    Args:
        user_input: The user's message; must be non-empty.
        user_id: Positive integer id (a numeric string is accepted and converted).
        languages: "VN" or "EN".
        role: "ADMIN", "CUSTOMER" or "SHIPPER".
        token: Passed through to ``pipeline_agent.multi_query_user``.
        chat_history_id: String ObjectId of the chat the message belongs to.
        stop_event: Optional event; when set, processing is cancelled.

    Returns:
        A JSON-encodable dict with ``new_message`` and ``previous_messages``
        (newest first, timestamps shifted by +7 hours). Note that the ``-> str``
        annotation does not match this, and that a missing MongoDB user yields an
        ``{"error": ...}`` dict rather than an exception.

    Raises:
        HTTPException: 400 on invalid input or when the user/chat is not found.
        asyncio.CancelledError: When ``stop_event`` is set or a ``StopSignal``
            with ``is_stopped=True`` exists for the chat.
    """
    if role not in ["ADMIN", "CUSTOMER", "SHIPPER"]:
        raise HTTPException(status_code=400, detail="ROLE not valid")
    # A non-numeric user_id must produce a 400, not an unhandled ValueError.
    try:
        user_id = int(user_id)
    except (TypeError, ValueError):
        raise HTTPException(status_code=400, detail="Invalid user_id: must be a positive integer")
    if languages not in ["VN", "EN"]:
        raise HTTPException(status_code=400, detail="Language not valid")
    if not user_input:
        raise HTTPException(status_code=400, detail="User input empty")
    if user_id <= 0:
        raise HTTPException(status_code=400, detail="Invalid user_id: must be a positive integer")
    languages = "Vietnamese" if languages == "VN" else "English"

    check = UserRepository.getUserByUserId(user_id)
    if check is None:
        raise HTTPException(status_code=400, detail="User not found or has been deleted in MySQL")
    check_history_id = UserRepository.getChatHistory(user_id,chat_history_id)
    if check_history_id is None:
        raise HTTPException(status_code=400, detail="Chat not found or has been deleted in MySQL")
    user = User.objects(user_id=user_id).first()
    if not user:
        return {"error": "User not found or has been deleted in MongoDB"}

    chat_history = None
    if chat_history_id:
        try:
            chat_history_obj_id = ObjectId(chat_history_id)  # convert to ObjectId
            chat_history = ChatHistory.objects(_id=chat_history_obj_id, user=user).first()
        except Exception as e:
            print(f"⚠️ Invalid chat_history_id: {e}")
    if not chat_history:
        raise HTTPException(status_code=400, detail="Chat history not found or has been deleted in MongoDB")

    # Yield to the event loop, then honour an in-process or persisted stop request.
    await asyncio.sleep(0.1)
    if stop_event and stop_event.is_set():
        raise asyncio.CancelledError("⛔ Task was cancelled before processing")
    await asyncio.sleep(0.1)
    if StopSignal.objects(chat_history=chat_history_id, is_stopped=True).first():
        print("🛑 Dừng vì có StopSignal trong DB.")
        raise asyncio.CancelledError("⛔ Task was cancelled before processing")

    result_final = await pipeline_agent.multi_query_user(
        user_input, user_id, role, languages, chat_history_id, token,stop_event
    )
    detail_chat = DetailChat(
        id=ObjectId(),
        chat_history=chat_history,
        you_message=user_input,
        ai_message=result_final,
        timestamp=datetime.now(pytz.UTC)
    )
    detail_chat.save()
    chat_history_messages = DetailChat.objects(chat_history=chat_history).order_by('timestamp')

    def convert_to_vn_time(timestamp):
        # Shift by +7 hours and format without timezone information.
        return (timestamp + timedelta(hours=7)).strftime('%Y-%m-%dT%H:%M:%S')

    sorted_messages = sorted(chat_history_messages, key=lambda msg: msg.timestamp, reverse=True)
    formatted_messages = [
        {
            "index": i + 1,  # numbered from 1
            "id": str(msg.id),
            "you_message": msg.you_message,
            "ai_message": msg.ai_message,
            "timestamp": convert_to_vn_time(msg.timestamp)
        }
        for i, msg in enumerate(sorted_messages)
    ]
    return jsonable_encoder({
        "new_message": {
            "id": str(detail_chat.id),
            "you_message": detail_chat.you_message,
            "ai_message": detail_chat.ai_message,
            "timestamp": convert_to_vn_time(detail_chat.timestamp)
        },
        "previous_messages": formatted_messages
    })
| from bson import ObjectId | |
async def get_chat_details(chat_id: str,user_id:int):
    """
    Return every non-deleted ``DetailChat`` of the non-deleted ``ChatHistory``
    ``chat_id`` as a ``res_chat.ListDetailResponse``, newest message first, with
    entries renumbered from 1 and timestamps shifted by +7 hours.

    Raises HTTPException(400) when the user or the chat cannot be found through
    ``UserRepository`` or in MongoDB.
    """
    if UserRepository.getUserByUserId(user_id) is None:
        raise HTTPException(status_code=400, detail="User not found or has been deleted in MySQL")
    if UserRepository.getChatHistory(user_id, chat_id) is None:
        raise HTTPException(status_code=400, detail="Chat history not found or has been deleted in MySQL")

    mongo_user = User.objects(user_id=user_id, is_deleted=False).first()
    if not mongo_user:
        raise HTTPException(status_code=400, detail="User not found or has been deleted in MongoDB")

    chat = ChatHistory.objects(_id=ObjectId(chat_id), is_deleted=False).first()
    if not chat:
        raise HTTPException(status_code=400, detail="Chat history not found or has been deleted in MongoDB")

    # Newest message first.
    newest_first = sorted(
        DetailChat.objects(chat_history=chat, is_deleted=False),
        key=lambda d: d.timestamp,
        reverse=True,
    )
    vn_offset = timedelta(hours=7)  # GMT+7

    list_detail_response = []
    for position, detail in enumerate(newest_first, start=1):
        shifted = detail.timestamp + vn_offset
        list_detail_response.append(
            res_chat.DetailResponse(
                id=str(position),  # renumbered from 1
                you_message=detail.you_message,
                ai_message=detail.ai_message,
                timestamp=shifted.strftime('%Y-%m-%dT%H:%M:%S'),
            )
        )

    return res_chat.ListDetailResponse(
        chat_id=str(chat.id),
        chat_name=chat.name_chat,
        list_detail_response=list_detail_response
    )
async def regenerate(
    user_question_new: str,
    user_id: int,
    languages: str,
    role: str,
    token: str,
    chat_history_id: str = None,
    stop_event: Optional[asyncio.Event] = None
) -> str:
    """
    Re-answer the most recent message of a chat: the latest non-deleted
    ``DetailChat`` is overwritten with ``user_question_new``, the new AI answer and
    a fresh timestamp.

    Args:
        user_question_new: Replacement question; must be non-empty.
        user_id: Positive integer id (a numeric string is accepted and converted).
        languages: "VN" or "EN".
        role: "ADMIN", "CUSTOMER" or "SHIPPER".
        token: Passed through to ``pipeline_agent.multi_query_user``.
        chat_history_id: String ObjectId of the chat.
        stop_event: Optional event; when set, processing is cancelled.

    Returns:
        A JSON-encodable dict with ``new_message`` and ``previous_messages``
        (newest first, timestamps shifted by +7 hours). Note that the ``-> str``
        annotation does not match this, and that a missing MongoDB user yields an
        ``{"error": ...}`` dict rather than an exception.

    Raises:
        HTTPException: 400 on invalid input, when the user/chat is not found, or
            when the chat has no message to regenerate.
        asyncio.CancelledError: When ``stop_event`` is set or a ``StopSignal``
            with ``is_stopped=True`` exists for the chat.
    """
    # NOTE(review): the result is never used in this function; the call is kept
    # in case get_prompt_custom has side effects -- confirm and remove if not.
    PROMPT_CUSTOM = await prompt_cus.get_prompt_custom(user_question_new)
    if role not in ["ADMIN", "CUSTOMER", "SHIPPER"]:
        raise HTTPException(status_code=400, detail="ROLE not valid")
    # A non-numeric user_id must produce a 400, not an unhandled ValueError.
    try:
        user_id = int(user_id)
    except (TypeError, ValueError):
        raise HTTPException(status_code=400, detail="Invalid user_id: must be a positive integer")
    if languages not in ["VN", "EN"]:
        raise HTTPException(status_code=400, detail="Language not valid")
    if not user_question_new:
        raise HTTPException(status_code=400, detail="User input empty")
    if user_id <= 0:
        raise HTTPException(status_code=400, detail="Invalid user_id: must be a positive integer")
    languages = "Vietnamese" if languages == "VN" else "English"

    check = UserRepository.getUserByUserId(user_id)
    if check is None:
        raise HTTPException(status_code=400, detail="User not found or has been deleted in MySQL")
    check_history_id = UserRepository.getChatHistory(user_id,chat_history_id)
    if check_history_id is None:
        raise HTTPException(status_code=400, detail="Chat not found or has been deleted in MySQL")
    user = User.objects(user_id=user_id).first()
    if not user:
        return {"error": "User not found or has been deleted in MongoDB"}

    chat_history = None
    if chat_history_id:
        try:
            chat_history_obj_id = ObjectId(chat_history_id)  # convert to ObjectId
            chat_history = ChatHistory.objects(_id=chat_history_obj_id, user=user).first()
        except Exception as e:
            print(f"⚠️ Invalid chat_history_id: {e}")
    if not chat_history:
        raise HTTPException(status_code=400, detail="Chat history not found or has been deleted in MongoDB")

    # The message that will be overwritten with the regenerated answer.
    last_chat = (
        DetailChat.objects(chat_history=chat_history, is_deleted=False)
        .order_by("-timestamp")
        .first()
    )
    # Fail early: without this guard an empty chat crashed with AttributeError on
    # last_chat.update(), and only after the pipeline call had already run.
    if last_chat is None:
        raise HTTPException(status_code=400, detail="No message to regenerate in this chat")

    # Yield to the event loop, then honour an in-process or persisted stop request.
    await asyncio.sleep(0.1)
    if stop_event and stop_event.is_set():
        raise asyncio.CancelledError("⛔ Task was cancelled before processing")
    await asyncio.sleep(0.1)
    if StopSignal.objects(chat_history=chat_history_id, is_stopped=True).first():
        print("🛑 Dừng vì có StopSignal trong DB.")
        raise asyncio.CancelledError("⛔ Task was cancelled before processing")

    result_final = await pipeline_agent.multi_query_user(user_question_new,user_id,role,languages,chat_history_id,token,stop_event)
    last_chat.update(set__you_message=user_question_new, set__ai_message=result_final, set__timestamp = datetime.now(pytz.UTC))
    # Read the latest message back from the database after the update.
    last_chat_result = (
        DetailChat.objects(chat_history=chat_history, is_deleted=False)
        .order_by("-timestamp")
        .first()
    )
    chat_history_messages = DetailChat.objects(chat_history=chat_history).order_by('timestamp')

    def convert_to_vn_time(timestamp):
        # Shift by +7 hours and format without timezone information.
        return (timestamp + timedelta(hours=7)).strftime('%Y-%m-%dT%H:%M:%S')

    sorted_messages = sorted(chat_history_messages, key=lambda msg: msg.timestamp, reverse=True)
    formatted_messages = [
        {
            "index": i + 1,  # numbered from 1
            "id": str(msg.id),
            "you_message": msg.you_message,
            "ai_message": msg.ai_message,
            "timestamp": convert_to_vn_time(msg.timestamp)
        }
        for i, msg in enumerate(sorted_messages)
    ]
    return jsonable_encoder({
        "new_message": {
            "id": str(last_chat_result.id),
            "you_message": last_chat_result.you_message,
            "ai_message": last_chat_result.ai_message,
            "timestamp": convert_to_vn_time(last_chat_result.timestamp)
        },
        "previous_messages": formatted_messages
    })
| from bson import ObjectId | |
async def get_user_chat_history(user_id: int):
    """
    List all non-deleted chats of ``user_id`` as a
    ``res_chat.UserChatHistoryResponse``.

    Raises HTTPException(400) when the user cannot be found through
    ``UserRepository`` or in MongoDB.
    """
    if UserRepository.getUserByUserId(user_id) is None:
        raise HTTPException(status_code=400, detail="User not found or has been deleted in MySQL")

    user = User.objects(user_id=user_id, is_deleted=False).first()
    if not user:
        raise HTTPException(status_code=400, detail="User not found or has been deleted in MongoDB")

    chat_list = []
    for chat in ChatHistory.objects(user=user, is_deleted=False):
        # Deletion date for deleted chats, otherwise the time encoded in the ObjectId.
        if chat.is_deleted:
            shown_time = chat.date_deleted
        else:
            shown_time = chat.id.generation_time
        chat_list.append(
            res_chat.ChatResponse(
                chat_id=str(chat.id),
                chat_name=chat.name_chat,
                timestamp=shown_time,
            )
        )

    return res_chat.UserChatHistoryResponse(
        user_id=user.user_id,
        user_name=user.user_name,
        chat_list=chat_list
    )
| from bson import ObjectId | |
async def get_chat_details_text(chat_id: str, user_id: int):
    """
    Return the non-deleted messages of chat ``chat_id`` (owned by ``user_id``) as a
    list of dicts ordered by timestamp, each with ``order``, ``timestamp``,
    ``you_message`` and ``ai_message``. An empty list is returned when the chat
    has no messages.

    Raises HTTPException(400) when the user or the chat is not found in MongoDB.
    """
    # The user must exist and not be flagged deleted.
    owner = User.objects(user_id=user_id, is_deleted=False).first()
    if not owner:
        raise HTTPException(status_code=400, detail="User not found or has been deleted in MongoDB")

    # The chat must exist, belong to that user and not be flagged deleted.
    chat = ChatHistory.objects(_id=ObjectId(chat_id), user=owner, is_deleted=False).first()
    if not chat:
        raise HTTPException(status_code=400, detail="Chat not found or has been deleted in MongoDB")

    messages = DetailChat.objects(chat_history=chat, is_deleted=False).order_by('timestamp')
    if not messages:
        return list()

    # One entry per question/answer pair, numbered from 1.
    return [
        {
            "order": str(position),
            "timestamp": message.timestamp.strftime("%Y-%m-%d %H:%M:%S"),
            "you_message": message.you_message,
            "ai_message": message.ai_message,
        }
        for position, message in enumerate(messages, start=1)
    ]
| import asyncio | |
| import os | |
| import subprocess | |
| from datetime import datetime | |
| from pathlib import Path | |
| from function.analyze import main | |
| from function.analyze.gemini import result_analyze | |
async def generate_and_save_code(question: str, user_id: int, role, languages: str, filename: str = "analyze_result.py"):
    """Generate analysis code for ``question``, run it, and summarise its output.

    The code returned by ``main.analyze`` is stripped of its markdown fence, given
    a small import/encoding preamble, written to ``./Temp/<folder>_<timestamp>/``
    and executed there as a separate Python process with ``OUTPUT_DIR`` pointing
    at that directory. ``result_analyze.generate`` is then called on the
    ``test5`` sub-folder of the output directory.

    Args:
        question: The analysis question.
        user_id: Id of the requesting user (forwarded to ``main.analyze``).
        role: Role of the requesting user (forwarded to ``main.analyze``).
        languages: Language setting (forwarded to ``main.analyze``).
        filename: Name of the script file written into the output directory.

    Returns:
        Tuple ``(result_final, final_path)``: the value returned by
        ``result_analyze.generate`` and the absolute path of the ``test5`` folder.
    """
    code_test, folder_name = await main.analyze(
        question,
        user_id,
        languages,
        role
    )
    code_clean = code_test.strip()
    # Drop a surrounding ```python ... ``` fence if present.
    if code_clean.startswith("```python"):
        code_clean = code_clean[9:].strip()
    if code_clean.endswith("```"):
        code_clean = code_clean[:-3].strip()
    # code_clean = code_clean.replace("else:", "").strip()
    code_clean = code_clean.replace("os_path:", "os.path").strip()
    encoding_fix = 'import sys\nsys.stdout.reconfigure(encoding="utf-8")\n\n'
    encoding_fix1= 'import numpy as np\n\n'
    code_clean = encoding_fix + encoding_fix1 + code_clean

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    output_dir = Path(f"./Temp/{folder_name}_{timestamp}")
    output_dir.mkdir(parents=True, exist_ok=True)
    file_path = output_dir / filename
    with open(file_path, "w", encoding="utf-8") as f:
        f.write(code_clean)

    env = os.environ.copy()
    env["OUTPUT_DIR"] = str(output_dir)
    # SECURITY(review): this executes model-generated code with the service's full
    # environment and privileges, without a sandbox or timeout. Flagged, not changed.
    # NOTE(review): subprocess.run blocks the event loop while the script runs.
    # Use the interpreter running this service (same virtualenv/packages) instead
    # of whatever "python" resolves to on PATH; fall back if it is unavailable.
    interpreter = sys.executable or "python"
    result = subprocess.run(
        [interpreter, filename],
        capture_output=True,
        text=True,
        env=env,
        cwd=output_dir,
        encoding="utf-8",
        errors="replace"
    )
    if result.returncode != 0:
        # Surface failures instead of silently continuing with missing output.
        print(f"⚠️ Generated script exited with code {result.returncode}:\n{result.stderr}")

    output_folder = str(output_dir)
    absolute_path = os.path.abspath(output_folder)
    # NOTE(review): "test5" is hard-coded; presumably the generated script writes
    # its images there -- confirm against function.analyze.
    final_path = os.path.join(absolute_path, "test5")
    result_final = result_analyze.generate(image_folder=final_path,question=question)
    return result_final,final_path