"""Chat service layer: text-to-SQL execution, chat-history CRUD and analysis-code generation.

Importing this module has side effects that are kept in their original order:
it extends ``sys.path``, loads ``.env``, opens the SQL and MongoDB connections
and downloads the NLTK ``punkt`` resource.
"""
import asyncio
import os
import random
import re
import subprocess
import sys
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional
from urllib.parse import quote

import nltk
import pymysql
import pytz
import sqlparse
from bson import ObjectId
from bson.errors import InvalidId
from dotenv import load_dotenv, find_dotenv
from fastapi import HTTPException
from fastapi.encoders import jsonable_encoder
from langchain_community.utilities.sql_database import SQLDatabase
from langchain_experimental.sql import SQLDatabaseChain
from langchain_google_genai import GoogleGenerativeAIEmbeddings, ChatGoogleGenerativeAI
from mongoengine import connect

# Project-local imports depend on the sys.path entries and environment set up
# between them, so they are deliberately interleaved with that setup.
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), ".")))
import function.prompt.prompt_main as prompt
import function.prompt.prompt_custom as prompt_cus

# SECURITY(review): hard-coded credential committed to source. Kept so import-time
# behavior is unchanged; rotate this key and load it from the environment instead.
os.environ["GOOGLE_API_KEY"] = "AIzaSyDAVIagntGC7kL93qmLgNZ-is1fsb7tsN4"
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))

load_dotenv(find_dotenv(), override=True)

DB_HOST = os.getenv("DB_HOST")
DB_USER = os.getenv("DB_USER")
DB_PASSWORD = os.getenv("DB_PASSWORD")
DB_NAME = os.getenv("DB_NAME")
DB_PORT = os.getenv("DB_PORT")


def contains_delete(sql: str) -> bool:
    """Return True when *sql* contains the standalone word ``delete`` (any case)."""
    return bool(re.search(r'\bdelete\b', sql, re.IGNORECASE))


# Build the SQLAlchemy connection string.
password = os.getenv("DB_PASSWORD")  # e.g. 'Yahana0509@'
# safe="" also escapes "/" so it cannot terminate the URI authority part;
# `or ""` avoids a TypeError at import time when DB_PASSWORD is unset.
DB_PASSWORD = quote(password or "", safe="")
connection_uri = (
    f"mysql+pymysql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
    "?ssl_verify_cert=false&ssl_verify_identity=false"
)
db = SQLDatabase.from_uri(connection_uri)
# db = SQLDatabase.from_uri("mysql+pymysql://root@127.0.0.1:3306/demohmdrinks")

import function.filter.filter_role as filter_role_1
import function.filter.filter_sql_injection as filter_sql_injection_1
import function.filter.result as query_result_1
import support.get_key as get_key
import response.ResponseChat as res_chat
import function.agent.pipeline_agent as pipeline_agent

nltk.download('punkt')
from models.Database_Entity import User, ChatHistory, DetailChat, StopSignal

load_dotenv()
MONGO_URI = os.getenv("MONGO_URI", "")
connect("chatbot_hmdrinks", host=MONGO_URI)

# Model / repository setup.
BASE_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../"))
sys.path.insert(0, BASE_DIR)
from repository.MySQL import UserRepository
from function.prompt.prompt_syntax_insert import is_insert_related_to_product_category_variant, filter_syntax_sql

sys.path.append(os.path.abspath(os.path.join(os.getcwd(), '..')))
from function.prompt import prompt_detail_table

# Maps a table name to the prompt text describing that table's schema.
schema_mapping = {
    "user": prompt_detail_table.prompt_users,
    "user_voucher": prompt_detail_table.prompt_user_voucher,
    "category": prompt_detail_table.prompt_categort,
    "category_translation": prompt_detail_table.prompt_category_translation,
    "cart": prompt_detail_table.prompt_cart,
    "cart_item": prompt_detail_table.prompt_cart_item,
    "orders": prompt_detail_table.prompt_orders,
    "order_item": prompt_detail_table.prompt_order_item,
    "payment": prompt_detail_table.prompt_payments,
    "favourite": prompt_detail_table.prompt_favourite,
    "favourite_item": prompt_detail_table.prompt_fav_item,
    "post": prompt_detail_table.prompt_post,
    "post_translation": prompt_detail_table.prompt_post_translation,
    "product": prompt_detail_table.prompt_product,
    "product_translation": prompt_detail_table.prompt_product_translation,
    "shipment": prompt_detail_table.prompt_shipment,
    "product_variants": prompt_detail_table.prompt_product_variants,
    "review": prompt_detail_table.prompt_review,
    "user_coin": prompt_detail_table.prompt_user_coin,
    "absence": prompt_detail_table.prompt_absence,
    "cart_group": prompt_detail_table.prompt_cart_group,
    "cart_item_group": prompt_detail_table.prompt_cartitem_group,
    "group_orders": prompt_detail_table.prompt_group_orders,
    "payments_group": prompt_detail_table.prompt_payments_group,
    "group_order_members": prompt_detail_table.prompt_group_orders_member,
    "shipment_group": prompt_detail_table.prompt_shipment_group,
    "shipper_attendance": prompt_detail_table.prompt_shipper_attendance,
    "shipper_commission_detail": prompt_detail_table.prompt_shipper_commission_detail,
    "shipper_salary_summary": prompt_detail_table.prompt_shipper_salary_summary,
    "voucher": prompt_detail_table.prompt_voucher,
}

# Note appended to the SQL-generation prompt template (at most once per template).
_NO_IF_THEN_NOTE = (
    "\n\n⚠️ Note: Do NOT use IF...THEN...ELSE...END in SQL. "
    "Only use plain SELECT, INSERT, UPDATE, DELETE, SET statements."
)

# Upper bound on "regenerate the SQL and try again" rounds for insert-type requests.
_MAX_SQL_REGENERATIONS = 3

_CANCELLED_MESSAGE = "⛔ Task was cancelled before processing"


def get_schemas_from_sql(sql_query: str, schema_mapping: dict):
    """Return the schema descriptions of every table referenced by *sql_query*.

    The query is parsed with sqlglot; each distinct table name found is looked up
    in *schema_mapping*. Tables without an entry are reported with a printed
    warning and skipped. The result is one string with a
    ``Schema for table '<name>':`` section per known table, separated by blank lines.

    NOTE(review): the query is parsed with the "sqlite" dialect although the
    connection URI targets MySQL — confirm this is intentional.
    """
    import sqlglot

    parsed_query = sqlglot.parse_one(sql_query, read="sqlite")

    # De-duplicate while keeping first-seen order so the generated prompt is
    # stable between runs (set iteration order of strings is not).
    table_names = list(
        dict.fromkeys(t.name for t in parsed_query.find_all(sqlglot.exp.Table))
    )

    schemas_used = {}
    for table in table_names:
        if table in schema_mapping:
            schemas_used[table] = schema_mapping[table]
        else:
            print(f"⚠️ Warning: Table '{table}' not found in schema_mapping")

    # Merge all schemas into a single string.
    return "\n\n".join(
        f"Schema for table '{table}':\n{schema}" for table, schema in schemas_used.items()
    )


def build_sql_fix_prompt(schemas_result: str, sql: str) -> str:
    """Build the (Vietnamese) LLM prompt asking to correct table/column names in *sql*.

    Args:
        schemas_result: Schema description text, as produced by ``get_schemas_from_sql``.
        sql: The SQL statement to be corrected; surrounding whitespace is stripped.

    Returns:
        The prompt text with leading/trailing whitespace removed.
    """
    prompt = f"""
Bạn là một chuyên gia cơ sở dữ liệu. Dưới đây là mô tả schema chi tiết của các bảng có trong hệ thống:

{schemas_result}

---

Dưới đây là một câu SQL đang bị lỗi do không đúng tên bảng hoặc tên cột:

```sql
{sql.strip()}
```

Yêu cầu của bạn là:
Dựa trên các schema ở trên, hãy kiểm tra và chỉnh sửa câu SQL sao cho:
Tên bảng, tên cột phải chính xác theo schema.
Logic và mục đích của truy vấn được giữ nguyên.
Chỉ trả lại phần SQL đã được chỉnh sửa (không giải thích, không chú thích, không thêm nhận xét).
Trả lời dưới dạng một truy vấn SQL duy nhất.
""".strip()
    return prompt


def _clean_sql(sql: str) -> str:
    """Strip Markdown code fences from *sql* and turn ``%%s`` back into ``%s``."""
    sql = re.sub(r"```sql", "", sql, flags=re.IGNORECASE)
    sql = re.sub(r'%%s', r'%s', sql)
    sql = re.sub(r"```", "", sql)
    return sql.strip()


def _extract_sql_from_response(data):
    """Return the cleaned text following ``SQLQuery:`` in *data*, or None if absent."""
    match = re.search(r"SQLQuery:\s*(.*)", data, re.DOTALL)
    return _clean_sql(match.group(1)) if match else None


def _extract_sql_from_error(error_msg):
    """Return the cleaned content of the first ```sql fenced block in *error_msg*, or None."""
    match = re.search(r"```sql\n(.*?)```", error_msg, re.DOTALL)
    return _clean_sql(match.group(1)) if match else None


async def execute_query_user(user_input: str, user_id: int, languages: str, role: str):
    """Turn a natural-language request into SQL via the LLM chain and execute it.

    Flow: the SQLDatabaseChain produces SQL; that SQL is passed through a second
    LLM step that corrects table/column names against ``schema_mapping``; the
    result is rejected if it contains DELETE or IF...THEN, otherwise executed
    with PyMySQL.

    Args:
        user_input: The user's question.
        user_id: Id embedded in the role description sent to the chain.
        languages: Language name forwarded to the chain.
        role: User role; ``"ADMIN"`` gets a different role description.

    Returns:
        Query results (a list with one entry per statement), the raw chain output
        when it contains no ``SQLQuery:`` section, or an error-message string.
    """
    api_key = get_key.get_random_api_key()
    os.environ["GOOGLE_API_KEY"] = api_key
    llm1 = ChatGoogleGenerativeAI(model='gemini-2.5-flash-preview-04-17', temperature=0.6)
    # db = SQLDatabase.from_uri("mysql+pymysql://root@127.0.0.1:3306/demohmdrinks")
    db = SQLDatabase.from_uri(connection_uri)
    PROMPT_CUSTOM = await prompt_cus.get_prompt_custom(user_input)
    check_insert = is_insert_related_to_product_category_variant(user_input)

    db_config = {
        "host": os.getenv("DB_HOST"),
        "user": os.getenv("DB_USER"),
        "database": os.getenv("DB_NAME"),
        "password": os.getenv("DB_PASSWORD"),
        "port": int(os.getenv("DB_PORT", 3306)),
        "charset": "utf8mb4",
        "cursorclass": pymysql.cursors.DictCursor,
    }

    def execute_query_with_pymysql(query, multi=False):
        """Run *query* (best effort per statement when *multi*) and commit."""
        connection = pymysql.connect(**db_config)
        try:
            with connection.cursor() as cursor:
                results = []
                if multi:
                    for stmt in sqlparse.split(query):
                        stmt = stmt.strip()
                        if not stmt:
                            continue
                        try:
                            cursor.execute(stmt)
                            try:
                                results.append(cursor.fetchall())
                            except pymysql.ProgrammingError:
                                results.append("✅ Executed")
                        except Exception as e:
                            # A failing statement is recorded; the rest still run.
                            results.append(f"❌ Error in query: {stmt}\n{str(e)}")
                else:
                    try:
                        cursor.execute(query)
                        results = cursor.fetchall()
                    except Exception as e:
                        return f"❌ Error executing query: {str(e)}"
            connection.commit()
            return results
        except pymysql.MySQLError as e:
            return f"❌ MySQL Error: {str(e)}"
        finally:
            connection.close()

    def execute_statements_strict(sql_text):
        """Run every statement of *sql_text*; stop without committing on the first failure."""
        connection = None  # stays None when connect() itself fails
        try:
            connection = pymysql.connect(**db_config)
            with connection.cursor() as cursor:
                results = []
                for stmt in sqlparse.split(sql_text):
                    stmt = stmt.strip()
                    if not stmt:
                        continue
                    try:
                        cursor.execute(stmt)
                        try:
                            results.append(cursor.fetchall())
                        except Exception:
                            results.append("✅ OK")
                    except Exception as e:
                        return f"❌ Lỗi tại truy vấn: `{stmt}`\nChi tiết: {str(e)}"
            connection.commit()
            return results
        except Exception as e:
            return f"❌ Lỗi khi thực thi từng truy vấn: {str(e)}"
        finally:
            if connection is not None:
                connection.close()

    def process_and_execute_sql(sql, attempt=0):
        """Fix *sql* against the schemas with the LLM, validate it, then execute it."""
        sql_clean = _clean_sql(sql)
        schemas_text = get_schemas_from_sql(sql_clean, schema_mapping)
        fix_prompt = build_sql_fix_prompt(schemas_text, sql=sql_clean)
        from function.advance_shopping.call_gemini import tool_call
        data = tool_call.generate(prompt=fix_prompt)
        sql_clean = _clean_sql(data)
        print("SQL step2: ", sql_clean)

        if contains_delete(sql_clean):
            return "Lỗi: Bạn không dược phép thực hiện truy vấn DELETE trong hệ thống này."

        # Word-boundary match; the previous pattern escaped the backslashes twice
        # inside a raw string and therefore could never match.
        if re.search(r"\bIF\b.*\bTHEN\b", sql_clean, re.IGNORECASE | re.DOTALL):
            return "❌ Lỗi: Không được dùng IF...THEN trong SQL. Vui lòng chia nhỏ truy vấn."

        if not check_insert:
            return execute_query_with_pymysql(sql_clean, multi=True)

        check_syntax = filter_syntax_sql(sql_clean, PROMPT_CUSTOM, user_input)
        if check_syntax is True:
            return execute_statements_strict(sql_clean)

        # Syntax check failed: ask the chain for a new query, a bounded number of times.
        if attempt >= _MAX_SQL_REGENERATIONS:
            return "❌ Lỗi: Không thể tạo lại truy vấn hợp lệ."
        try:
            regenerated_data = db_chain.run(chain_question)
            regenerated_sql = _extract_sql_from_response(regenerated_data)
            if regenerated_sql:
                return process_and_execute_sql(regenerated_sql, attempt + 1)
            return "❌ Lỗi: Không thể tạo lại truy vấn hợp lệ."
        except Exception as regen_error:
            return f"❌ Lỗi khi tạo lại truy vấn: {str(regen_error)}"

    # Append the note only once, even when the same prompt object is reused between
    # calls (the old guard string differed in case from the appended text).
    template = PROMPT_CUSTOM.template
    if _NO_IF_THEN_NOTE not in template and "Do not use IF...THEN" not in template:
        PROMPT_CUSTOM.template += _NO_IF_THEN_NOTE

    db_chain = SQLDatabaseChain.from_llm(llm=llm1, db=db, prompt=PROMPT_CUSTOM)
    text_role = (
        f"{role} (userId = {user_id})"
        if role == "ADMIN"
        else f"{role} (userId = {user_id}), not role ADMIN"
    )
    chain_question = f"""
        Role: {text_role}
        Language: {languages}
        Question: {user_input}.
        """

    try:
        data = db_chain.run(chain_question)
        extracted_sql = _extract_sql_from_response(data)
        if extracted_sql:
            return process_and_execute_sql(extracted_sql)
        return data
    except Exception as e:
        # The chain's error text may embed the generated SQL in a ```sql fence.
        error_message = str(e)
        fix_sql = _extract_sql_from_error(error_message)
        if not fix_sql:
            return f"❌ Lỗi không thể thực thi truy vấn: {error_message}"
        if contains_delete(fix_sql):
            return "Lỗi: Bạn không dược phép thực hiện truy vấn DELETE trong hệ thống này."
        return process_and_execute_sql(fix_sql)


def _parse_object_id(raw_id):
    """Return ``ObjectId(raw_id)``, or None when *raw_id* is not a valid ObjectId."""
    try:
        return ObjectId(raw_id)
    except (InvalidId, TypeError):
        return None


def _find_chat(chat_id, **filters):
    """Return the first ChatHistory with id *chat_id* matching *filters*, or None."""
    object_id = _parse_object_id(chat_id)
    if object_id is None:
        return None
    return ChatHistory.objects(_id=object_id, **filters).first()


def _convert_to_vn_time(timestamp):
    """Format *timestamp* shifted by +7 hours as ``YYYY-MM-DDTHH:MM:SS``."""
    return (timestamp + timedelta(hours=7)).strftime('%Y-%m-%dT%H:%M:%S')


def _validate_chat_request(user_input, user_id, languages, role):
    """Validate chat parameters; return ``(user_id as int, language name)``.

    Raises:
        HTTPException: 400 for an unknown role or language, empty input, or a
            user_id that is not a positive integer.
    """
    if role not in ["ADMIN", "CUSTOMER", "SHIPPER"]:
        raise HTTPException(status_code=400, detail="ROLE not valid")
    try:
        user_id = int(user_id)
    except (TypeError, ValueError):
        raise HTTPException(status_code=400, detail="Invalid user_id: must be a positive integer")
    if languages not in ["VN", "EN"]:
        raise HTTPException(status_code=400, detail="Language not valid")
    if not user_input:
        raise HTTPException(status_code=400, detail="User input empty")
    if user_id <= 0:
        raise HTTPException(status_code=400, detail="Invalid user_id: must be a positive integer")
    return user_id, ("Vietnamese" if languages == "VN" else "English")


def _load_user_and_chat(user_id, chat_history_id):
    """Return ``(user, chat_history)``, or ``(None, None)`` when the user is not in MongoDB.

    Raises:
        HTTPException: 400 when the user or chat is missing in MySQL, or the chat
            cannot be found in MongoDB for this user.
    """
    if UserRepository.getUserByUserId(user_id) is None:
        raise HTTPException(status_code=400, detail="User not found or has been deleted in MySQL")
    if UserRepository.getChatHistory(user_id, chat_history_id) is None:
        raise HTTPException(status_code=400, detail="Chat not found or has been deleted in MySQL")

    user = User.objects(user_id=user_id).first()
    if not user:
        return None, None

    chat_history = None
    if chat_history_id:
        try:
            chat_history_obj_id = ObjectId(chat_history_id)  # convert to ObjectId
            chat_history = ChatHistory.objects(_id=chat_history_obj_id, user=user).first()
        except Exception as e:
            print(f"⚠️ Invalid chat_history_id: {e}")
    if not chat_history:
        raise HTTPException(status_code=400, detail="Chat history not found or has been deleted in MongoDB")
    return user, chat_history


async def _raise_if_stopped(chat_history_id, stop_event):
    """Raise ``asyncio.CancelledError`` when *stop_event* is set or a StopSignal is stored."""
    await asyncio.sleep(0.1)
    if stop_event and stop_event.is_set():
        raise asyncio.CancelledError(_CANCELLED_MESSAGE)
    await asyncio.sleep(0.1)
    if StopSignal.objects(chat_history=chat_history_id, is_stopped=True).first():
        print("🛑 Dừng vì có StopSignal trong DB.")
        raise asyncio.CancelledError(_CANCELLED_MESSAGE)


def _build_chat_payload(chat_history, newest):
    """Return the JSON-encodable response: *newest* plus every message, newest first."""
    chat_history_messages = DetailChat.objects(chat_history=chat_history).order_by('timestamp')
    sorted_messages = sorted(chat_history_messages, key=lambda msg: msg.timestamp, reverse=True)
    formatted_messages = [
        {
            "index": position,  # numbered from 1
            "id": str(msg.id),
            "you_message": msg.you_message,
            "ai_message": msg.ai_message,
            "timestamp": _convert_to_vn_time(msg.timestamp),
        }
        for position, msg in enumerate(sorted_messages, start=1)
    ]
    return jsonable_encoder({
        "new_message": {
            "id": str(newest.id),
            "you_message": newest.you_message,
            "ai_message": newest.ai_message,
            "timestamp": _convert_to_vn_time(newest.timestamp),
        },
        "previous_messages": formatted_messages,
    })


async def create_new_chat_history(user_id: int) -> str:
    """Create a new chat for *user_id* and return its id and generated name.

    A MongoDB ``User`` document is created on first use. The value returned is a
    ``res_chat.CreateNewChat`` carrying the new chat's id (as string) and name.

    Raises:
        HTTPException: 404 when the user does not exist in MySQL.
    """
    check = UserRepository.getUserByUserId(user_id)
    if check is None:
        raise HTTPException(status_code=404, detail="User not found or has been deleted in MySQL")

    user = User.objects(user_id=user_id).first()
    if not user:
        user = User(id=ObjectId(), user_id=user_id, user_name=f"User_{user_id}")
        user.save()

    random_name_chat = str(random.randint(10**14, 10**15 - 1))
    name_chat = f"Chat_{random_name_chat}"
    new_chat = ChatHistory(id=ObjectId(), user=user, name_chat=name_chat)
    new_chat.save()
    return res_chat.CreateNewChat(idMongo=str(new_chat.id), chat_name=name_chat)


async def update_chat_name(chat_id: str, new_name: str, user_id: int) -> str:
    """Rename the chat *chat_id* owned by *user_id* and return a confirmation message.

    Raises:
        HTTPException: 404 when the user is missing in MySQL or the chat is not
            found for this user in MongoDB; 400 when the user is missing in MongoDB.
    """
    check = UserRepository.getUserByUserId(user_id)
    if check is None:
        raise HTTPException(status_code=404, detail="User not found or has been deleted in MySQL")

    user = User.objects(user_id=user_id, is_deleted=False).first()
    if not user:
        raise HTTPException(status_code=400, detail="User not found or has been deleted in MongoDB")

    # Scope the lookup to the requesting user so one user cannot rename another's chat.
    chat = _find_chat(chat_id, user=user)
    if not chat:
        raise HTTPException(status_code=404, detail="Chat not found in MongoDB")

    chat.name_chat = new_name
    chat.save()
    return f"Updated chat name to {new_name}"


async def soft_delete_chat(chat_id: str, user_id: int):
    """Mark the chat and all of its ``DetailChat`` records as deleted.

    Sets ``is_deleted=True`` and ``date_deleted`` (UTC now) on the ``ChatHistory``
    and on every related ``DetailChat``.

    Raises:
        HTTPException: 400 when the user or chat cannot be found.
    """
    check = UserRepository.getUserByUserId(user_id)
    if check is None:
        raise HTTPException(status_code=400, detail="User not found or has been deleted in MySQL")

    check_history_id = UserRepository.getChatHistory(user_id, chat_id)
    if check_history_id is None:
        raise HTTPException(status_code=400, detail="Chat history not found or has been deleted in MySQL")

    user = User.objects(user_id=user_id, is_deleted=False).first()
    if not user:
        raise HTTPException(status_code=400, detail="User not found or has been deleted in MongoDB")

    chat = _find_chat(chat_id)
    if not chat:
        raise HTTPException(status_code=400, detail="Chat not found or has been deleted in MongoDB")

    deleted_at = datetime.now(pytz.UTC)
    chat.is_deleted = True
    chat.date_deleted = deleted_at
    chat.save()
    DetailChat.objects(chat_history=chat).update(
        set__is_deleted=True,
        set__date_deleted=deleted_at,
    )
    return {"message": "Chat and related details marked as deleted"}


async def chat_with_user(
    user_input: str,
    user_id: int,
    languages: str,
    role: str,
    token: str,
    chat_history_id: str = None,
    stop_event: Optional[asyncio.Event] = None
) -> str:
    """Process a user message, store it in the chat history and return the AI reply.

    Returns:
        A JSON-encodable dict with ``new_message`` and ``previous_messages``
        (all messages of the chat, newest first), or ``{"error": ...}`` when the
        user has no MongoDB record.

    Raises:
        HTTPException: 400 on invalid parameters or a missing user/chat.
        asyncio.CancelledError: when a stop was requested before processing.
    """
    user_id, languages = _validate_chat_request(user_input, user_id, languages, role)

    user, chat_history = _load_user_and_chat(user_id, chat_history_id)
    if user is None:
        return {"error": "User not found or has been deleted in MongoDB"}

    await _raise_if_stopped(chat_history_id, stop_event)

    result_final = await pipeline_agent.multi_query_user(
        user_input, user_id, role, languages, chat_history_id, token, stop_event
    )

    detail_chat = DetailChat(
        id=ObjectId(),
        chat_history=chat_history,
        you_message=user_input,
        ai_message=result_final,
        timestamp=datetime.now(pytz.UTC)
    )
    detail_chat.save()

    return _build_chat_payload(chat_history, detail_chat)


async def get_chat_details(chat_id: str, user_id: int):
    """Return every non-deleted ``DetailChat`` of chat *chat_id*, newest first.

    Entries are renumbered from 1 and timestamps are shifted to GMT+7.

    Raises:
        HTTPException: 400 when the user or chat cannot be found.
    """
    check = UserRepository.getUserByUserId(user_id)
    if check is None:
        raise HTTPException(status_code=400, detail="User not found or has been deleted in MySQL")

    check_history_id = UserRepository.getChatHistory(user_id, chat_id)
    if check_history_id is None:
        raise HTTPException(status_code=400, detail="Chat history not found or has been deleted in MySQL")

    user = User.objects(user_id=user_id, is_deleted=False).first()
    if not user:
        raise HTTPException(status_code=400, detail="User not found or has been deleted in MongoDB")

    chat = _find_chat(chat_id, is_deleted=False)
    if not chat:
        raise HTTPException(status_code=400, detail="Chat history not found or has been deleted in MongoDB")

    chat_details = DetailChat.objects(chat_history=chat, is_deleted=False)
    newest_first = sorted(chat_details, key=lambda d: d.timestamp, reverse=True)
    list_detail_response = [
        res_chat.DetailResponse(
            id=str(position),  # renumbered from 1
            you_message=detail.you_message,
            ai_message=detail.ai_message,
            timestamp=_convert_to_vn_time(detail.timestamp)  # converted to GMT+7
        )
        for position, detail in enumerate(newest_first, start=1)
    ]

    return res_chat.ListDetailResponse(
        chat_id=str(chat.id),
        chat_name=chat.name_chat,
        list_detail_response=list_detail_response
    )


async def regenerate(
    user_question_new: str,
    user_id: int,
    languages: str,
    role: str,
    token: str,
    chat_history_id: str = None,
    stop_event: Optional[asyncio.Event] = None
) -> str:
    """Re-answer the most recent message of a chat with a new question.

    The latest non-deleted ``DetailChat`` of the chat is overwritten with
    *user_question_new*, the new AI reply and the current UTC timestamp.

    Returns:
        Same shape as ``chat_with_user``.

    Raises:
        HTTPException: 400 on invalid parameters, a missing user/chat, or when
            the chat has no message to regenerate.
        asyncio.CancelledError: when a stop was requested before processing.
    """
    # NOTE(review): the result of this call is not used here; kept in case the
    # call is relied on for side effects — confirm and remove if not.
    await prompt_cus.get_prompt_custom(user_question_new)

    user_id, languages = _validate_chat_request(user_question_new, user_id, languages, role)

    user, chat_history = _load_user_and_chat(user_id, chat_history_id)
    if user is None:
        return {"error": "User not found or has been deleted in MongoDB"}

    # filtered_input = filter_sql_injection_1.filter_sql_injection(user_input)
    # filtered_role_input = filter_role_1.filter_role(filtered_input)
    # result = await execute_query_user(filtered_role_input, user_id, languages, role)
    # result_final = query_result_1.query_result(user_input, result)

    last_chat = (
        DetailChat.objects(chat_history=chat_history, is_deleted=False)
        .order_by("-timestamp")
        .first()
    )
    if last_chat is None:
        # Fail before the expensive pipeline call instead of crashing after it.
        raise HTTPException(status_code=400, detail="No message to regenerate in this chat")

    await _raise_if_stopped(chat_history_id, stop_event)

    result_final = await pipeline_agent.multi_query_user(
        user_question_new, user_id, role, languages, chat_history_id, token, stop_event
    )
    last_chat.update(
        set__you_message=user_question_new,
        set__ai_message=result_final,
        set__timestamp=datetime.now(pytz.UTC),
    )

    # Re-read so the response reflects the stored values.
    last_chat_result = (
        DetailChat.objects(chat_history=chat_history, is_deleted=False)
        .order_by("-timestamp")
        .first()
    )
    return _build_chat_payload(chat_history, last_chat_result)


async def get_user_chat_history(user_id: int):
    """Return the list of all non-deleted chats belonging to *user_id*.

    Raises:
        HTTPException: 400 when the user is missing in MySQL or MongoDB.
    """
    check = UserRepository.getUserByUserId(user_id)
    if check is None:
        raise HTTPException(status_code=400, detail="User not found or has been deleted in MySQL")

    user = User.objects(user_id=user_id, is_deleted=False).first()
    if not user:
        raise HTTPException(status_code=400, detail="User not found or has been deleted in MongoDB")

    chat_histories = ChatHistory.objects(user=user, is_deleted=False)
    chat_list = [
        res_chat.ChatResponse(
            chat_id=str(chat.id),
            chat_name=chat.name_chat,
            timestamp=chat.date_deleted if chat.is_deleted else chat.id.generation_time
        )
        for chat in chat_histories
    ]

    return res_chat.UserChatHistoryResponse(
        user_id=user.user_id,
        user_name=user.user_name,
        chat_list=chat_list
    )


async def get_chat_details_text(chat_id: str, user_id: int):
    """Return the non-deleted messages of a chat, oldest first, as a list of dicts.

    Each entry has ``order`` (1-based, as string), ``timestamp``, ``you_message``
    and ``ai_message``. An empty list is returned when the chat has no messages.

    Raises:
        HTTPException: 400 when the user or the chat cannot be found in MongoDB.
    """
    # Check that the user exists.
    user = User.objects(user_id=user_id, is_deleted=False).first()
    if not user:
        raise HTTPException(status_code=400, detail="User not found or has been deleted in MongoDB")

    # Check that the chat history exists for this user.
    chat = _find_chat(chat_id, user=user, is_deleted=False)
    if not chat:
        raise HTTPException(status_code=400, detail="Chat not found or has been deleted in MongoDB")

    chat_details = DetailChat.objects(chat_history=chat, is_deleted=False).order_by('timestamp')
    if not chat_details:
        return list()

    # Collect every question/answer pair.
    return [
        {
            "order": str(index),
            "timestamp": detail.timestamp.strftime("%Y-%m-%d %H:%M:%S"),
            "you_message": detail.you_message,
            "ai_message": detail.ai_message,
        }
        for index, detail in enumerate(chat_details, start=1)
    ]


# Imported here (not at the top) to keep the original import position.
from function.analyze import main
from function.analyze.gemini import result_analyze


async def generate_and_save_code(question: str, user_id: int, role, languages: str, filename: str = "analyze_result.py"):
    """Generate analysis code for *question*, run it, and summarise its output.

    The code returned by ``main.analyze`` is stripped of Markdown fences, prefixed
    with a UTF-8 stdout fix and a numpy import, written to
    ``./Temp/<folder>_<timestamp>/<filename>`` and executed in that directory with
    ``OUTPUT_DIR`` set to it. ``result_analyze.generate`` is then called on the
    ``test5`` sub-folder of that directory.

    Returns:
        ``(result_final, final_path)``: the analysis result and the absolute path
        of the ``test5`` folder.
    """
    code_test, folder_name = await main.analyze(question, user_id, languages, role)

    code_clean = code_test.strip()
    if code_clean.startswith("```python"):
        code_clean = code_clean[9:].strip()
    if code_clean.endswith("```"):
        code_clean = code_clean[:-3].strip()
    # code_clean = code_clean.replace("else:", "").strip()
    code_clean = code_clean.replace("os_path:", "os.path").strip()

    encoding_fix = 'import sys\nsys.stdout.reconfigure(encoding="utf-8")\n\n'
    encoding_fix1 = 'import numpy as np\n\n'
    code_clean = encoding_fix + encoding_fix1 + code_clean

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    output_dir = Path(f"./Temp/{folder_name}_{timestamp}")
    output_dir.mkdir(parents=True, exist_ok=True)
    file_path = output_dir / filename
    with open(file_path, "w", encoding="utf-8") as f:
        f.write(code_clean)

    env = os.environ.copy()
    env["OUTPUT_DIR"] = str(output_dir)

    # SECURITY(review): this executes LLM-generated code with the service's
    # privileges and full environment; consider sandboxing.
    # sys.executable runs the script with the same interpreter (and installed
    # packages) as this service rather than whichever "python" is first on PATH.
    result = subprocess.run(
        [sys.executable or "python", filename],
        capture_output=True,
        text=True,
        env=env,
        cwd=output_dir,
        encoding="utf-8",
        errors="replace"
    )
    if result.returncode != 0:
        print(f"⚠️ Generated script exited with code {result.returncode}:\n{result.stderr}")

    absolute_path = os.path.abspath(str(output_dir))
    final_path = os.path.join(absolute_path, "test5")
    result_final = result_analyze.generate(image_folder=final_path, question=question)
    return result_final, final_path