chatbot_server / service /ChatService.py
kltn21110's picture
Update service/ChatService.py
33251a9 verified
from langchain_community.utilities.sql_database import SQLDatabase
from langchain_experimental.sql import SQLDatabaseChain
import sys
import os
import pymysql
from fastapi import HTTPException
from fastapi.encoders import jsonable_encoder
import re
# Make this file's own directory importable so the `function.*` imports below resolve.
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), ".")))
import function.prompt.prompt_main as prompt
import function.prompt.prompt_custom as prompt_cus
# NOTE(review): a Google API key is hard-coded and committed here. It should be
# revoked and supplied through the environment / .env file instead.
# execute_query_user() overwrites GOOGLE_API_KEY on every call with the value
# returned by get_key.get_random_api_key(), so this literal only acts as the
# import-time default.
os.environ["GOOGLE_API_KEY"] = "AIzaSyDAVIagntGC7kL93qmLgNZ-is1fsb7tsN4"
# Also put the parent directory of this file on sys.path.
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
from bson import ObjectId
import os
from dotenv import load_dotenv
import os
from dotenv import load_dotenv
import os
from dotenv import load_dotenv
from dotenv import load_dotenv, find_dotenv
# Load the .env file located by find_dotenv(); override=True asks dotenv to let
# its values replace variables that are already set in the process environment.
load_dotenv(find_dotenv(), override=True)
# MySQL connection settings. os.getenv returns strings (DB_PORT included) or
# None when a variable is unset.
DB_HOST = os.getenv("DB_HOST")
DB_USER = os.getenv("DB_USER")
DB_PASSWORD = os.getenv("DB_PASSWORD")
DB_NAME = os.getenv("DB_NAME")
DB_PORT = os.getenv("DB_PORT")
import re
# Matches the standalone keyword DELETE in any letter case.
_DELETE_KEYWORD = re.compile(r'\bdelete\b', re.IGNORECASE)


def contains_delete(sql: str) -> bool:
    """Return True when ``sql`` contains the word DELETE as a whole word.

    The match is case-insensitive and bounded by word boundaries, so
    identifiers such as ``is_deleted`` or ``date_deleted`` do not count.
    """
    return _DELETE_KEYWORD.search(sql) is not None
# Build the SQLAlchemy connection string for the MySQL database.
import os
from urllib.parse import quote
# The password is embedded in a URL, so every reserved character must be
# percent-encoded. safe="" also encodes "/", which quote() leaves untouched by
# default and which would otherwise corrupt the URI. An unset variable is
# treated as an empty password instead of crashing inside quote().
password = os.getenv("DB_PASSWORD") or ""
DB_PASSWORD = quote(password, safe="")
connection_uri = (
    f"mysql+pymysql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
    "?ssl_verify_cert=false&ssl_verify_identity=false"
)
# NOTE(review): this connects to the database as an import-time side effect.
db = SQLDatabase.from_uri(connection_uri)
# db = SQLDatabase.from_uri("mysql+pymysql://root@127.0.0.1:3306/demohmdrinks")
from dotenv import load_dotenv
import function.filter.filter_role as filter_role_1
import function.filter.filter_sql_injection as filter_sql_injection_1
import function.filter.result as query_result_1
import support.get_key as get_key
import response.ResponseChat as res_chat
from datetime import datetime
import pytz
from mongoengine import connect
import sys
import os
import nltk
import function.agent.pipeline_agent as pipeline_agent
# Download the NLTK 'punkt' resource; this runs every time the module is imported.
nltk.download('punkt')
from models.Database_Entity import User, ChatHistory, DetailChat
from dotenv import load_dotenv
load_dotenv()
# Connect mongoengine to the "chatbot_hmdrinks" database. MONGO_URI falls back
# to an empty string when the environment variable is unset.
MONGO_URI = os.getenv("MONGO_URI", "")
connect("chatbot_hmdrinks", host=MONGO_URI)
load_dotenv()
# Set up model imports
from bson import ObjectId
import random
from langchain_google_genai import GoogleGenerativeAIEmbeddings, ChatGoogleGenerativeAI
# Directory two levels above this file, placed first on sys.path so the
# `repository` package import below can resolve.
BASE_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../"))
sys.path.insert(0, BASE_DIR)
from repository.MySQL import UserRepository
from function.prompt.prompt_syntax_insert import is_insert_related_to_product_category_variant, filter_syntax_sql
import sqlparse
import sqlparse
import sys
import os
# Also make the parent of the current working directory importable.
# NOTE(review): this depends on where the process was started from.
sys.path.append(os.path.abspath(os.path.join(os.getcwd(), '..')))
from function.prompt import prompt_detail_table
# Maps a table name, as it may appear in generated SQL, to the schema
# description exposed for that table by function.prompt.prompt_detail_table.
# NOTE(review): `prompt_categort` is spelled this way in the original lookup;
# presumably it matches the attribute name in prompt_detail_table — verify.
schema_mapping = {
    "user": prompt_detail_table.prompt_users,
    "user_voucher": prompt_detail_table.prompt_user_voucher,
    "category": prompt_detail_table.prompt_categort,
    "category_translation": prompt_detail_table.prompt_category_translation,
    "cart": prompt_detail_table.prompt_cart,
    "cart_item": prompt_detail_table.prompt_cart_item,
    "orders": prompt_detail_table.prompt_orders,
    "order_item": prompt_detail_table.prompt_order_item,
    "payment": prompt_detail_table.prompt_payments,
    "favourite": prompt_detail_table.prompt_favourite,
    "favourite_item": prompt_detail_table.prompt_fav_item,
    "post": prompt_detail_table.prompt_post,
    "post_translation": prompt_detail_table.prompt_post_translation,
    "product": prompt_detail_table.prompt_product,
    "product_translation": prompt_detail_table.prompt_product_translation,
    "shipment": prompt_detail_table.prompt_shipment,
    "product_variants": prompt_detail_table.prompt_product_variants,
    "review": prompt_detail_table.prompt_review,
    "user_coin": prompt_detail_table.prompt_user_coin,
    "absence": prompt_detail_table.prompt_absence,
    "cart_group": prompt_detail_table.prompt_cart_group,
    "cart_item_group": prompt_detail_table.prompt_cartitem_group,
    "group_orders": prompt_detail_table.prompt_group_orders,
    "payments_group": prompt_detail_table.prompt_payments_group,
    "group_order_members": prompt_detail_table.prompt_group_orders_member,
    "shipment_group": prompt_detail_table.prompt_shipment_group,
    "shipper_attendance": prompt_detail_table.prompt_shipper_attendance,
    "shipper_commission_detail": prompt_detail_table.prompt_shipper_commission_detail,
    "shipper_salary_summary": prompt_detail_table.prompt_shipper_salary_summary,
    "voucher": prompt_detail_table.prompt_voucher,
}
def get_schemas_from_sql(sql_query: str, schema_mapping: dict):
    """Collect the schema descriptions of every table referenced by ``sql_query``.

    Args:
        sql_query: One or more SQL statements.
        schema_mapping: Maps a table name to its schema description text.

    Returns:
        One string containing a ``Schema for table '<name>':`` section per
        referenced table that is present in ``schema_mapping``, separated by
        blank lines. Empty string when no referenced table is known. Tables
        missing from the mapping are reported with a printed warning.

    Raises:
        sqlglot.errors.ParseError: if the SQL cannot be parsed or contains no
            statement at all.
    """
    import sqlglot
    from sqlglot.errors import ParseError

    # parse() yields one syntax tree per statement (None for empty statements).
    # parse_one() would only look at the first statement, so tables used by
    # later statements of a multi-statement query would be missed.
    # NOTE(review): parsed with the "sqlite" dialect although connection_uri
    # targets MySQL — confirm this is intended.
    statements = [
        tree for tree in sqlglot.parse(sql_query, read="sqlite") if tree is not None
    ]
    if not statements:
        raise ParseError(f"No expression was parsed from '{sql_query}'")

    # Unique table names in first-seen order, so the generated text is stable
    # from run to run (a plain set has no guaranteed order).
    table_names = list(dict.fromkeys(
        table.name
        for tree in statements
        for table in tree.find_all(sqlglot.exp.Table)
    ))

    sections = []
    for table in table_names:
        if table in schema_mapping:
            sections.append(f"Schema for table '{table}':\n{schema_mapping[table]}")
        else:
            print(f"⚠️ Warning: Table '{table}' not found in schema_mapping")

    # Merge all schema sections into one string.
    return "\n\n".join(sections)
def build_sql_fix_prompt(schemas_result: str, sql: str) -> str:
    """Build the LLM prompt that asks for a schema-correct version of ``sql``.

    Args:
        schemas_result: Schema description text for the tables involved, as
            produced by ``get_schemas_from_sql``.
        sql: The SQL statement to repair; surrounding whitespace is removed.

    Returns:
        The prompt text with leading and trailing whitespace stripped.
    """
    # The SQL is wrapped in a complete ```sql ... ``` fence. Without the closing
    # fence the instructions that follow would be read as part of the code block.
    fix_prompt = f"""
Bạn là một chuyên gia cơ sở dữ liệu.
Dưới đây là mô tả schema chi tiết của các bảng có trong hệ thống:
{schemas_result}
---
Dưới đây là một câu SQL đang bị lỗi do không đúng tên bảng hoặc tên cột:
```sql
{sql.strip()}
```
Yêu cầu của bạn là:
Dựa trên các schema ở trên, hãy kiểm tra và chỉnh sửa câu SQL sao cho:
Tên bảng, tên cột phải chính xác theo schema.
Logic và mục đích của truy vấn được giữ nguyên.
Chỉ trả lại phần SQL đã được chỉnh sửa (không giải thích, không chú thích, không thêm nhận xét).
Trả lời dưới dạng một truy vấn SQL duy nhất.
""".strip()
    return fix_prompt
async def execute_query_user(user_input: str, user_id: int, languages: str, role: str):
    """Turn a natural-language question into SQL with an LLM and execute it.

    Flow:
        1. A SQLDatabaseChain built from a Gemini model and the custom prompt
           generates SQL for the question.
        2. The SQL is sent through a second LLM pass that corrects table and
           column names against the known schemas.
        3. Statements containing DELETE or IF...THEN are rejected.
        4. The remaining SQL is executed with pymysql.

    Args:
        user_input: The user's question.
        user_id: Id of the requesting user; embedded in the prompt.
        languages: Language name passed to the prompt.
        role: Role name; anything other than "ADMIN" is marked as non-admin
            in the prompt.

    Returns:
        The query results (a list, one entry per statement in multi-statement
        mode), the raw chain output when no SQL could be extracted from it, or
        a human-readable error string.
    """
    # Upper bound on how often a rejected INSERT is regenerated. Without it a
    # query that keeps failing the syntax check would recurse until
    # RecursionError, issuing one LLM call per level.
    max_regenerations = 3

    api_key = get_key.get_random_api_key()
    # NOTE(review): this mutates the process-wide environment, so concurrent
    # requests share (and may overwrite) the selected key.
    os.environ["GOOGLE_API_KEY"] = api_key
    llm1 = ChatGoogleGenerativeAI(model='gemini-2.5-flash-preview-04-17', temperature=0.6)
    db = SQLDatabase.from_uri(connection_uri)
    PROMPT_CUSTOM = await prompt_cus.get_prompt_custom(user_input)
    check_insert = is_insert_related_to_product_category_variant(user_input)
    db_config = {
        "host": os.getenv("DB_HOST"),
        "user": os.getenv("DB_USER"),
        "database": os.getenv("DB_NAME"),
        "password": os.getenv("DB_PASSWORD"),
        "port": int(os.getenv("DB_PORT", 3306)),
        "charset": "utf8mb4",
        "cursorclass": pymysql.cursors.DictCursor,
    }

    def execute_query_with_pymysql(query, multi=False):
        """Run ``query`` and commit.

        With ``multi=True`` the text is split into statements; a failing
        statement is recorded in the result list and the remaining ones still
        run. With ``multi=False`` a failure returns an error string.
        """
        connection = pymysql.connect(**db_config)
        try:
            with connection.cursor() as cursor:
                results = []
                if multi:
                    for stmt in sqlparse.split(query):
                        stmt = stmt.strip()
                        if not stmt:
                            continue
                        try:
                            cursor.execute(stmt)
                            try:
                                results.append(cursor.fetchall())
                            except pymysql.ProgrammingError:
                                results.append("✅ Executed")
                        except Exception as e:
                            results.append(f"❌ Error in query: {stmt}\n{str(e)}")
                else:
                    try:
                        cursor.execute(query)
                        results = cursor.fetchall()
                    except Exception as e:
                        return f"❌ Error executing query: {str(e)}"
            connection.commit()
            return results
        except pymysql.MySQLError as e:
            return f"❌ MySQL Error: {str(e)}"
        finally:
            connection.close()

    def execute_statements_strict(sql):
        """Run every statement of ``sql``; stop at the first failure without committing."""
        connection = None
        try:
            connection = pymysql.connect(**db_config)
            with connection.cursor() as cursor:
                results = []
                for stmt in sqlparse.split(sql):
                    stmt = stmt.strip()
                    if not stmt:
                        continue
                    try:
                        cursor.execute(stmt)
                    except Exception as e:
                        return f"❌ Lỗi tại truy vấn: `{stmt}`\nChi tiết: {str(e)}"
                    try:
                        results.append(cursor.fetchall())
                    except Exception:
                        results.append("✅ OK")
            connection.commit()
            return results
        except Exception as e:
            return f"❌ Lỗi khi thực thi từng truy vấn: {str(e)}"
        finally:
            # connect() itself may have failed, in which case there is nothing to close.
            if connection is not None:
                connection.close()

    def clean_sql(sql: str) -> str:
        """Strip markdown code fences and collapse ``%%s`` into ``%s``."""
        sql = re.sub(r"```sql", "", sql, flags=re.IGNORECASE)
        sql = re.sub(r'%%s', r'%s', sql)
        sql = re.sub(r"```", "", sql)
        return sql.strip()

    def extract_sql_from_response(data):
        """Return the cleaned text following ``SQLQuery:`` in ``data``, or None."""
        match = re.search(r"SQLQuery:\s*(.*)", data, re.DOTALL)
        return clean_sql(match.group(1)) if match else None

    def extract_sql_from_error(error_msg):
        """Return the cleaned body of the first ```sql block in ``error_msg``, or None."""
        match = re.search(r"```sql\n(.*?)```", error_msg, re.DOTALL)
        return clean_sql(match.group(1)) if match else None

    def process_and_execute_sql(sql, regenerations_left=max_regenerations):
        """Schema-correct ``sql`` through the LLM, validate it and execute it."""
        sql_clean = clean_sql(sql)
        schemas = get_schemas_from_sql(sql_clean, schema_mapping)
        fix_prompt = build_sql_fix_prompt(schemas, sql=sql_clean)
        from function.advance_shopping.call_gemini import tool_call
        data = tool_call.generate(prompt=fix_prompt)
        sql_clean = clean_sql(data)
        print("SQL step2: ", sql_clean)

        if contains_delete(sql_clean):
            return "Lỗi: Bạn không dược phép thực hiện truy vấn DELETE trong hệ thống này."
        # Single backslashes: in a raw string "\\b" is a literal backslash
        # followed by "b", which can never match a word boundary.
        if re.search(r"\bIF\b.*\bTHEN\b", sql_clean, re.IGNORECASE):
            return "❌ Lỗi: Không được dùng IF...THEN trong SQL. Vui lòng chia nhỏ truy vấn."

        if not check_insert:
            return execute_query_with_pymysql(sql_clean, multi=True)

        if filter_syntax_sql(sql_clean, PROMPT_CUSTOM, user_input) is True:
            return execute_statements_strict(sql_clean)

        # The syntax check rejected the SQL: ask the chain for a fresh query,
        # but only a bounded number of times.
        if regenerations_left <= 0:
            return "❌ Lỗi: Không thể tạo lại truy vấn hợp lệ."
        try:
            regenerated_data = db_chain.run(chain_question)
            regenerated_sql = extract_sql_from_response(regenerated_data)
            if regenerated_sql:
                return process_and_execute_sql(regenerated_sql, regenerations_left - 1)
            return "❌ Lỗi: Không thể tạo lại truy vấn hợp lệ."
        except Exception as regen_error:
            return f"❌ Lỗi khi tạo lại truy vấn: {str(regen_error)}"

    # Append the note only once. The comparison ignores case because the
    # appended text reads "Do NOT use IF...THEN", which a case-sensitive check
    # for "Do not use IF...THEN" never finds.
    # NOTE(review): the note lists DELETE as allowed although DELETE statements
    # are rejected above — consider aligning the wording.
    if "do not use if...then" not in PROMPT_CUSTOM.template.lower():
        PROMPT_CUSTOM.template += (
            "\n\n⚠️ Note: Do NOT use IF...THEN...ELSE...END in SQL. "
            "Only use plain SELECT, INSERT, UPDATE, DELETE, SET statements."
        )
    db_chain = SQLDatabaseChain.from_llm(llm=llm1, db=db, prompt=PROMPT_CUSTOM)
    text_role = f"{role} (userId = {user_id})" if role == "ADMIN" else f"{role} (userId = {user_id}), not role ADMIN"
    chain_question = f"\nRole: {text_role}\nLanguage: {languages}\nQuestion: {user_input}.\n"

    try:
        data = db_chain.run(chain_question)
        extracted_sql = extract_sql_from_response(data)
        if extracted_sql:
            return process_and_execute_sql(extracted_sql)
        return data
    except Exception as e:
        error_message = str(e)
        # The exception text may itself contain the generated SQL in a code fence.
        extracted_sql = extract_sql_from_error(error_message)
        if not extracted_sql:
            return f"❌ Lỗi không thể thực thi truy vấn: {error_message}"
        fix_sql = clean_sql(extracted_sql)
        if contains_delete(fix_sql):
            return "Lỗi: Bạn không dược phép thực hiện truy vấn DELETE trong hệ thống này."
        return process_and_execute_sql(fix_sql)
async def create_new_chat_history(user_id: int) -> str:
    """Create a new chat session for ``user_id``.

    The user must exist according to ``UserRepository``; a matching MongoDB
    ``User`` document is created on first use. The chat receives a name of the
    form ``Chat_<15 random digits>``.

    Returns:
        A ``res_chat.CreateNewChat`` carrying the new chat's id and name
        (despite the ``str`` annotation).

    Raises:
        HTTPException: 404 when ``UserRepository`` does not know the user.
    """
    if UserRepository.getUserByUserId(user_id) is None:
        raise HTTPException(status_code=404, detail="User not found or has been deleted in MySQL")

    owner = User.objects(user_id=user_id).first()
    if not owner:
        owner = User(id=ObjectId(), user_id=user_id, user_name=f"User_{user_id}")
        owner.save()

    chat_label = "Chat_" + str(random.randint(10**14, 10**15 - 1))
    chat_doc = ChatHistory(id=ObjectId(), user=owner, name_chat=chat_label)
    chat_doc.save()
    return res_chat.CreateNewChat(idMongo=str(chat_doc.id), chat_name=chat_label)
async def update_chat_name(chat_id: str, new_name: str, user_id: int) -> str:
    """Rename a chat session that belongs to ``user_id``.

    Args:
        chat_id: String form of the chat's ObjectId.
        new_name: New value for ``name_chat``.
        user_id: Id of the user requesting the rename.

    Returns:
        A confirmation message containing the new name.

    Raises:
        HTTPException: 404 when the user is unknown to ``UserRepository``, or
            when no chat with that id belongs to the user (including a
            malformed ``chat_id``); 400 when the user has no active MongoDB
            document.
    """
    from bson.errors import InvalidId

    if UserRepository.getUserByUserId(user_id) is None:
        raise HTTPException(status_code=404, detail="User not found or has been deleted in MySQL")
    user = User.objects(user_id=user_id, is_deleted=False).first()
    if not user:
        raise HTTPException(status_code=400, detail="User not found or has been deleted in MongoDB")

    # A malformed id cannot match any chat; report it as "not found" instead
    # of letting the conversion error surface as a server error.
    try:
        chat_object_id = ObjectId(chat_id)
    except (InvalidId, TypeError):
        raise HTTPException(status_code=404, detail="Chat not found in MongoDB") from None

    # Restrict the lookup to the caller's own chats so one user cannot rename
    # another user's chat.
    chat = ChatHistory.objects(_id=chat_object_id, user=user).first()
    if not chat:
        raise HTTPException(status_code=404, detail="Chat not found in MongoDB")

    chat.name_chat = new_name
    chat.save()
    return f"Updated chat name to {new_name}"
async def soft_delete_chat(chat_id: str, user_id: int):
    """Mark a chat and all of its detail messages as deleted.

    Sets ``is_deleted=True`` and ``date_deleted`` on the ``ChatHistory`` and on
    every ``DetailChat`` that references it. Nothing is physically removed.

    Args:
        chat_id: String form of the chat's ObjectId.
        user_id: Id of the user requesting the deletion.

    Returns:
        A dict with a confirmation ``message``.

    Raises:
        HTTPException: 400 when the user or the chat cannot be found by
            ``UserRepository`` or in MongoDB.
    """
    if UserRepository.getUserByUserId(user_id) is None:
        raise HTTPException(status_code=400, detail="User not found or has been deleted in MySQL")
    if UserRepository.getChatHistory(user_id, chat_id) is None:
        raise HTTPException(status_code=400, detail="Chat history not found or has been deleted in MySQL")
    user = User.objects(user_id=user_id, is_deleted=False).first()
    if not user:
        raise HTTPException(status_code=400, detail="User not found or has been deleted in MongoDB")
    chat = ChatHistory.objects(_id=ObjectId(chat_id)).first()
    if not chat:
        raise HTTPException(status_code=400, detail="Chat not found or has been deleted in MongoDB")

    # One timestamp for the chat and all of its details, so the records of a
    # single deletion carry the same date_deleted value.
    deleted_at = datetime.now(pytz.UTC)
    chat.is_deleted = True
    chat.date_deleted = deleted_at
    chat.save()
    DetailChat.objects(chat_history=chat).update(
        set__is_deleted=True,
        set__date_deleted=deleted_at,
    )
    return {"message": "Chat and related details marked as deleted"}
from typing import Optional
from models.Database_Entity import StopSignal
from datetime import datetime, timedelta
import asyncio
async def chat_with_user(
    user_input: str,
    user_id: int,
    languages: str,
    role: str,
    token: str,
    chat_history_id: str = None,
    stop_event: Optional[asyncio.Event] = None
) -> str:
    """Answer a user message, store the exchange and return the chat transcript.

    Args:
        user_input: The user's message; must be non-empty.
        user_id: Positive integer id (numeric strings are accepted).
        languages: "VN" or "EN".
        role: "ADMIN", "CUSTOMER" or "SHIPPER".
        token: Passed through to ``pipeline_agent.multi_query_user``.
        chat_history_id: String ObjectId of the chat to append to. Although it
            defaults to None, a chat that cannot be resolved is rejected.
        stop_event: Optional event; when set before processing starts the
            request is cancelled.

    Returns:
        A JSON-compatible dict with ``new_message`` and ``previous_messages``
        (all messages of the chat, newest first, timestamps shifted by +7h).
        NOTE(review): when the user has no MongoDB document an ``{"error": ...}``
        dict is returned instead of raising — kept for existing callers.

    Raises:
        HTTPException: 400 on invalid arguments or unknown user/chat.
        asyncio.CancelledError: when a stop was requested before processing.
    """
    if role not in ["ADMIN", "CUSTOMER", "SHIPPER"]:
        raise HTTPException(status_code=400, detail="ROLE not valid")
    # Convert defensively: a non-numeric id must yield the 400 below rather
    # than an unhandled ValueError.
    try:
        user_id = int(user_id)
    except (TypeError, ValueError):
        raise HTTPException(status_code=400, detail="Invalid user_id: must be a positive integer") from None
    if languages not in ["VN", "EN"]:
        raise HTTPException(status_code=400, detail="Language not valid")
    if not user_input:
        raise HTTPException(status_code=400, detail="User input empty")
    if user_id <= 0:
        raise HTTPException(status_code=400, detail="Invalid user_id: must be a positive integer")
    languages = "Vietnamese" if languages == "VN" else "English"

    if UserRepository.getUserByUserId(user_id) is None:
        raise HTTPException(status_code=400, detail="User not found or has been deleted in MySQL")
    if UserRepository.getChatHistory(user_id, chat_history_id) is None:
        raise HTTPException(status_code=400, detail="Chat not found or has been deleted in MySQL")
    user = User.objects(user_id=user_id).first()
    if not user:
        return {"error": "User not found or has been deleted in MongoDB"}

    chat_history = None
    if chat_history_id:
        try:
            chat_history_obj_id = ObjectId(chat_history_id)  # convert to ObjectId
            chat_history = ChatHistory.objects(_id=chat_history_obj_id, user=user).first()
        except Exception as e:
            print(f"⚠️ Invalid chat_history_id: {e}")
    if not chat_history:
        raise HTTPException(status_code=400, detail="Chat history not found or has been deleted in MongoDB")

    # Yield to the event loop, then check both the in-process stop event and
    # the stop flag persisted in the database.
    await asyncio.sleep(0.1)
    if stop_event and stop_event.is_set():
        raise asyncio.CancelledError("⛔ Task was cancelled before processing")
    await asyncio.sleep(0.1)
    if StopSignal.objects(chat_history=chat_history_id, is_stopped=True).first():
        print("🛑 Dừng vì có StopSignal trong DB.")
        raise asyncio.CancelledError("⛔ Task was cancelled before processing")

    result_final = await pipeline_agent.multi_query_user(
        user_input, user_id, role, languages, chat_history_id, token, stop_event
    )
    detail_chat = DetailChat(
        id=ObjectId(),
        chat_history=chat_history,
        you_message=user_input,
        ai_message=result_final,
        timestamp=datetime.now(pytz.UTC)
    )
    detail_chat.save()

    chat_history_messages = DetailChat.objects(chat_history=chat_history).order_by('timestamp')

    def convert_to_vn_time(timestamp):
        # Shift by +7 hours and format without a timezone suffix.
        return (timestamp + timedelta(hours=7)).strftime('%Y-%m-%dT%H:%M:%S')

    sorted_messages = sorted(chat_history_messages, key=lambda msg: msg.timestamp, reverse=True)
    formatted_messages = [
        {
            "index": i + 1,  # numbering starts at 1
            "id": str(msg.id),
            "you_message": msg.you_message,
            "ai_message": msg.ai_message,
            "timestamp": convert_to_vn_time(msg.timestamp)
        }
        for i, msg in enumerate(sorted_messages)
    ]
    return jsonable_encoder({
        "new_message": {
            "id": str(detail_chat.id),
            "you_message": detail_chat.you_message,
            "ai_message": detail_chat.ai_message,
            "timestamp": convert_to_vn_time(detail_chat.timestamp)
        },
        "previous_messages": formatted_messages
    })
from bson import ObjectId
async def get_chat_details(chat_id: str,user_id:int):
    """Return the non-deleted messages of one chat, newest first.

    Each message is numbered from 1 in the returned order and its timestamp is
    shifted by +7 hours before formatting.

    Returns:
        A ``res_chat.ListDetailResponse`` with the chat id, chat name and the
        list of ``res_chat.DetailResponse`` items.

    Raises:
        HTTPException: 400 when the user or the chat cannot be found by
            ``UserRepository`` or in MongoDB.
    """
    if UserRepository.getUserByUserId(user_id) is None:
        raise HTTPException(status_code=400, detail="User not found or has been deleted in MySQL")
    if UserRepository.getChatHistory(user_id,chat_id) is None:
        raise HTTPException(status_code=400, detail="Chat history not found or has been deleted in MySQL")

    owner = User.objects(user_id=user_id, is_deleted=False).first()
    if not owner:
        raise HTTPException(status_code=400, detail="User not found or has been deleted in MongoDB")

    chat = ChatHistory.objects(_id=ObjectId(chat_id), is_deleted=False).first()
    if not chat:
        raise HTTPException(status_code=400, detail="Chat history not found or has been deleted in MongoDB")

    active_details = DetailChat.objects(chat_history=chat, is_deleted=False)
    newest_first = sorted(active_details, key=lambda d: d.timestamp, reverse=True)

    vn_offset = timedelta(hours=7)
    responses = []
    for position, detail in enumerate(newest_first, start=1):
        local_stamp = (detail.timestamp + vn_offset).strftime('%Y-%m-%dT%H:%M:%S')
        responses.append(
            res_chat.DetailResponse(
                id=str(position),
                you_message=detail.you_message,
                ai_message=detail.ai_message,
                timestamp=local_stamp,
            )
        )

    return res_chat.ListDetailResponse(
        chat_id=str(chat.id),
        chat_name=chat.name_chat,
        list_detail_response=responses,
    )
async def regenerate(
    user_question_new: str,
    user_id: int,
    languages: str,
    role: str,
    token: str,
    chat_history_id: str = None,
    stop_event: Optional[asyncio.Event] = None
) -> str:
    """Re-answer the most recent message of a chat with a new question.

    The newest non-deleted ``DetailChat`` of the chat is overwritten in place
    with ``user_question_new``, the freshly generated answer and the current
    time; no new message is added.

    Args:
        user_question_new: Replacement question; must be non-empty.
        user_id: Positive integer id (numeric strings are accepted).
        languages: "VN" or "EN".
        role: "ADMIN", "CUSTOMER" or "SHIPPER".
        token: Passed through to ``pipeline_agent.multi_query_user``.
        chat_history_id: String ObjectId of the chat. Although it defaults to
            None, a chat that cannot be resolved is rejected.
        stop_event: Optional event; when set before processing starts the
            request is cancelled.

    Returns:
        A JSON-compatible dict with ``new_message`` (the updated message) and
        ``previous_messages`` (all messages of the chat, newest first,
        timestamps shifted by +7h).
        NOTE(review): when the user has no MongoDB document an ``{"error": ...}``
        dict is returned instead of raising — kept for existing callers.

    Raises:
        HTTPException: 400 on invalid arguments, unknown user/chat, or when
            the chat has no message to regenerate.
        asyncio.CancelledError: when a stop was requested before processing.
    """
    # NOTE(review): the returned prompt is not used in this function; the call
    # is kept in case it is relied on for side effects — confirm and remove.
    await prompt_cus.get_prompt_custom(user_question_new)

    if role not in ["ADMIN", "CUSTOMER", "SHIPPER"]:
        raise HTTPException(status_code=400, detail="ROLE not valid")
    # Convert defensively: a non-numeric id must yield the 400 below rather
    # than an unhandled ValueError.
    try:
        user_id = int(user_id)
    except (TypeError, ValueError):
        raise HTTPException(status_code=400, detail="Invalid user_id: must be a positive integer") from None
    if languages not in ["VN", "EN"]:
        raise HTTPException(status_code=400, detail="Language not valid")
    if not user_question_new:
        raise HTTPException(status_code=400, detail="User input empty")
    if user_id <= 0:
        raise HTTPException(status_code=400, detail="Invalid user_id: must be a positive integer")
    languages = "Vietnamese" if languages == "VN" else "English"

    if UserRepository.getUserByUserId(user_id) is None:
        raise HTTPException(status_code=400, detail="User not found or has been deleted in MySQL")
    if UserRepository.getChatHistory(user_id, chat_history_id) is None:
        raise HTTPException(status_code=400, detail="Chat not found or has been deleted in MySQL")
    user = User.objects(user_id=user_id).first()
    if not user:
        return {"error": "User not found or has been deleted in MongoDB"}

    chat_history = None
    if chat_history_id:
        try:
            chat_history_obj_id = ObjectId(chat_history_id)  # convert to ObjectId
            chat_history = ChatHistory.objects(_id=chat_history_obj_id, user=user).first()
        except Exception as e:
            print(f"⚠️ Invalid chat_history_id: {e}")
    if not chat_history:
        raise HTTPException(status_code=400, detail="Chat history not found or has been deleted in MongoDB")

    last_chat = (
        DetailChat.objects(chat_history=chat_history, is_deleted=False)
        .order_by("-timestamp")
        .first()
    )
    # An empty chat has nothing to regenerate. Fail before the expensive
    # pipeline call instead of crashing on `None.update` afterwards.
    if last_chat is None:
        raise HTTPException(status_code=400, detail="No message to regenerate in this chat")

    # Yield to the event loop, then check both the in-process stop event and
    # the stop flag persisted in the database.
    await asyncio.sleep(0.1)
    if stop_event and stop_event.is_set():
        raise asyncio.CancelledError("⛔ Task was cancelled before processing")
    await asyncio.sleep(0.1)
    if StopSignal.objects(chat_history=chat_history_id, is_stopped=True).first():
        print("🛑 Dừng vì có StopSignal trong DB.")
        raise asyncio.CancelledError("⛔ Task was cancelled before processing")

    result_final = await pipeline_agent.multi_query_user(
        user_question_new, user_id, role, languages, chat_history_id, token, stop_event
    )
    last_chat.update(
        set__you_message=user_question_new,
        set__ai_message=result_final,
        set__timestamp=datetime.now(pytz.UTC),
    )
    # Re-read the newest message so the response reflects the stored values.
    last_chat_result = (
        DetailChat.objects(chat_history=chat_history, is_deleted=False)
        .order_by("-timestamp")
        .first()
    )

    chat_history_messages = DetailChat.objects(chat_history=chat_history).order_by('timestamp')

    def convert_to_vn_time(timestamp):
        # Shift by +7 hours and format without a timezone suffix.
        return (timestamp + timedelta(hours=7)).strftime('%Y-%m-%dT%H:%M:%S')

    sorted_messages = sorted(chat_history_messages, key=lambda msg: msg.timestamp, reverse=True)
    formatted_messages = [
        {
            "index": i + 1,  # numbering starts at 1
            "id": str(msg.id),
            "you_message": msg.you_message,
            "ai_message": msg.ai_message,
            "timestamp": convert_to_vn_time(msg.timestamp)
        }
        for i, msg in enumerate(sorted_messages)
    ]
    return jsonable_encoder({
        "new_message": {
            "id": str(last_chat_result.id),
            "you_message": last_chat_result.you_message,
            "ai_message": last_chat_result.ai_message,
            "timestamp": convert_to_vn_time(last_chat_result.timestamp)
        },
        "previous_messages": formatted_messages
    })
from bson import ObjectId
async def get_user_chat_history(user_id: int):
    """List every non-deleted chat session of ``user_id``.

    Each entry carries the chat id, its name and a timestamp: the deletion
    date when the chat is flagged deleted, otherwise the creation time encoded
    in its ObjectId.

    Returns:
        A ``res_chat.UserChatHistoryResponse`` with the user's id, name and
        chat list.

    Raises:
        HTTPException: 400 when the user is unknown to ``UserRepository`` or
            has no active MongoDB document.
    """
    if UserRepository.getUserByUserId(user_id) is None:
        raise HTTPException(status_code=400, detail="User not found or has been deleted in MySQL")

    owner = User.objects(user_id=user_id, is_deleted=False).first()
    if not owner:
        raise HTTPException(status_code=400, detail="User not found or has been deleted in MongoDB")

    chat_list = []
    for chat in ChatHistory.objects(user=owner, is_deleted=False):
        if chat.is_deleted:
            stamp = chat.date_deleted
        else:
            stamp = chat.id.generation_time
        chat_list.append(
            res_chat.ChatResponse(
                chat_id=str(chat.id),
                chat_name=chat.name_chat,
                timestamp=stamp,
            )
        )

    return res_chat.UserChatHistoryResponse(
        user_id=owner.user_id,
        user_name=owner.user_name,
        chat_list=chat_list,
    )
from bson import ObjectId
async def get_chat_details_text(chat_id: str, user_id: int):
    """Return the non-deleted messages of a chat as plain dicts, oldest first.

    Each item has the keys ``order`` (1-based position as a string),
    ``timestamp`` (``YYYY-MM-DD HH:MM:SS``), ``you_message`` and ``ai_message``.
    An empty list is returned when the chat has no messages.

    Raises:
        HTTPException: 400 when the user has no active MongoDB document or the
            chat does not exist for that user.
    """
    # Make sure the user exists.
    owner = User.objects(user_id=user_id, is_deleted=False).first()
    if not owner:
        raise HTTPException(status_code=400, detail="User not found or has been deleted in MongoDB")

    # Make sure the chat exists and belongs to that user.
    chat = ChatHistory.objects(_id=ObjectId(chat_id), user=owner, is_deleted=False).first()
    if not chat:
        raise HTTPException(status_code=400, detail="Chat not found or has been deleted in MongoDB")

    ordered_details = DetailChat.objects(chat_history=chat, is_deleted=False).order_by('timestamp')
    if not ordered_details:
        return []

    # One dict per question/answer pair, numbered from 1.
    return [
        {
            "order": str(position),
            "timestamp": detail.timestamp.strftime("%Y-%m-%d %H:%M:%S"),
            "you_message": detail.you_message,
            "ai_message": detail.ai_message,
        }
        for position, detail in enumerate(ordered_details, start=1)
    ]
import asyncio
import os
import subprocess
from datetime import datetime
from pathlib import Path
from function.analyze import main
from function.analyze.gemini import result_analyze
async def generate_and_save_code(question: str, user_id: int, role, languages: str, filename: str = "analyze_result.py"):
    """Generate an analysis script for ``question``, run it and summarise its output.

    The code returned by ``main.analyze`` is stripped of a surrounding
    ```python fence, prefixed with a UTF-8 stdout fix and a numpy import,
    written to ``./Temp/<folder_name>_<timestamp>/<filename>`` and executed in
    that directory with ``OUTPUT_DIR`` set to it. The ``test5`` sub-folder of
    that directory is then handed to ``result_analyze.generate``.

    Args:
        question: The user's analysis question.
        user_id: Passed to ``main.analyze``.
        role: Passed to ``main.analyze``.
        languages: Passed to ``main.analyze``.
        filename: Name of the script file to create.

    Returns:
        Tuple ``(result_final, final_path)``: the value returned by
        ``result_analyze.generate`` and the absolute path of the ``test5`` folder.
    """
    import sys

    code_test, folder_name = await main.analyze(
        question,
        user_id,
        languages,
        role
    )

    code_clean = code_test.strip()
    if code_clean.startswith("```python"):
        code_clean = code_clean[9:].strip()
    if code_clean.endswith("```"):
        code_clean = code_clean[:-3].strip()
    code_clean = code_clean.replace("os_path:", "os.path").strip()

    encoding_fix = 'import sys\nsys.stdout.reconfigure(encoding="utf-8")\n\n'
    encoding_fix1 = 'import numpy as np\n\n'
    code_clean = encoding_fix + encoding_fix1 + code_clean

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    output_dir = Path(f"./Temp/{folder_name}_{timestamp}")
    output_dir.mkdir(parents=True, exist_ok=True)
    file_path = output_dir / filename
    with open(file_path, "w", encoding="utf-8") as f:
        f.write(code_clean)

    env = os.environ.copy()
    env["OUTPUT_DIR"] = str(output_dir)
    # NOTE(review): this executes generated code with the server's privileges,
    # without a sandbox or timeout, and blocks the event loop while it runs.
    # sys.executable runs the script with the same interpreter (and installed
    # packages) as the server instead of whatever "python" resolves to on PATH.
    result = subprocess.run(
        [sys.executable, filename],
        capture_output=True,
        text=True,
        env=env,
        cwd=output_dir,
        encoding="utf-8",
        errors="replace"
    )
    if result.returncode != 0:
        # Surface script failures instead of silently discarding them.
        print(f"⚠️ Generated script exited with code {result.returncode}:\n{result.stderr}")

    absolute_path = os.path.abspath(str(output_dir))
    # Presumably the generated script writes its images into "test5" — TODO confirm.
    final_path = os.path.join(absolute_path, "test5")
    result_final = result_analyze.generate(image_folder=final_path, question=question)
    return result_final, final_path