import os import motor.motor_asyncio from dotenv import load_dotenv import logging from bson import ObjectId import json from pymongo import MongoClient import datetime from typing import List, Dict, Any, Optional, Union # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) # Load environment variables load_dotenv() # MongoDB connection parameters MONGODB_URI = os.getenv("MONGODB_URI") MONGODB_DB = os.getenv("MONGODB_DB") MONGODB_COLLECTION = os.getenv("MONGODB_COLLECTION") # Create MongoDB client try: client = motor.motor_asyncio.AsyncIOMotorClient(MONGODB_URI) db = client[MONGODB_DB] collection = db[MONGODB_COLLECTION] logger.info("MongoDB connection established successfully") except Exception as e: logger.error(f"Failed to connect to MongoDB: {e}") raise def convert_mongo_doc(doc): """Chuyển đổi document từ MongoDB thành định dạng JSON hợp lệ""" if isinstance(doc, dict): return {k: convert_mongo_doc(v) for k, v in doc.items()} elif isinstance(doc, list): return [convert_mongo_doc(item) for item in doc] elif isinstance(doc, ObjectId): return str(doc) elif isinstance(doc, datetime.datetime): return doc.isoformat() else: return doc async def save_session(session_data): """ Save a new session chat to the database Args: session_data (dict): Session data to be saved Returns: str: ID of the inserted document """ try: # Đảm bảo các trường cần thiết tồn tại required_fields = ["session_id", "user_id", "action", "factor"] for field in required_fields: if field not in session_data: raise ValueError(f"Missing required field: {field}") # Chuyển đổi user_id thành số nếu là chuỗi if isinstance(session_data.get("user_id"), str) and session_data["user_id"].isdigit(): session_data["user_id"] = int(session_data["user_id"]) # Đảm bảo các trường văn bản không phải None text_fields = ["username", "first_name", "last_name", "message", "action", "factor"] for field in text_fields: if field in session_data and session_data[field] is None: session_data[field] = "" # Log mức độ debug chi tiết hơn logger.debug(f"Saving session data: {session_data}") result = await collection.insert_one(session_data) logger.info(f"Session saved with ID: {result.inserted_id}, session_id: {session_data.get('session_id')}") return str(result.inserted_id) except Exception as e: logger.error(f"Error saving session: {e}") raise async def get_unanswered_questions(limit: int = 10) -> List[Dict[str, Any]]: """ Lấy danh sách câu hỏi chưa trả lời (RAG đáp ứng với "I don't know") """ try: logger.info(f"Đang truy vấn {limit} câu hỏi chưa được trả lời...") # Lọc session có tin nhắn của RAG là "I don't know" rag_filter = { "factor": "RAG", "message": {"$regex": "^I don't know"} } # Sử dụng to_list() để lấy danh sách từ cursor sessions = await db.rag_sessions.find(rag_filter).sort("timestamp", -1).to_list(limit) if not sessions: logger.info("Không tìm thấy session nào với tin nhắn RAG 'I don't know'") return [] logger.info(f"Tìm thấy {len(sessions)} session với RAG trả lời 'I don't know'") result = [] for session in sessions: session_id = session.get("session_id") if not session_id: logger.warning(f"Session không có session_id: {session.get('_id')}") continue # Tìm câu hỏi tương ứng của người dùng user_question = db.user_questions.find_one({"session_id": session_id}) if not user_question: logger.warning(f"Không tìm thấy câu hỏi người dùng cho session {session_id}") continue # Chuyển đổi dữ liệu trước khi đưa vào kết quả session_data = convert_mongo_doc(session) user_question_data = convert_mongo_doc(user_question) result.append({ "session": session_data, "user_question": user_question_data }) logger.info(f"Tìm thấy {len(result)} câu hỏi chưa được trả lời với đầy đủ thông tin") return result except Exception as e: logger.error(f"Lỗi khi lấy câu hỏi chưa trả lời: {e}") import traceback logger.error(traceback.format_exc()) return [] async def get_unanswered_question_by_session_id(session_id: str) -> Optional[Dict[str, Any]]: """ Lấy câu hỏi chưa trả lời theo session_id cụ thể """ try: logger.info(f"Đang tìm câu hỏi chưa trả lời cho session {session_id}") # Tìm session RAG với "I don't know" của session_id cụ thể rag_session = await collection.find_one({ "factor": "RAG", "session_id": session_id, "message": {"$regex": "^I don't know", "$options": "i"} }) if not rag_session: logger.info(f"Không tìm thấy session RAG nào với 'I don't know' cho session_id {session_id}") return None logger.info(f"Tìm thấy session RAG: {rag_session.get('_id')}") # Tìm câu hỏi tương ứng của người dùng user_question = await collection.find_one({ "session_id": session_id, "factor": "user" }) if not user_question: logger.warning(f"Không tìm thấy câu hỏi người dùng cho session {session_id}") return None logger.info(f"Tìm thấy câu hỏi người dùng: {user_question.get('_id')}") # Chuyển đổi dữ liệu session_data = convert_mongo_doc(rag_session) user_question_data = convert_mongo_doc(user_question) result = { "session": session_data, "user_question": user_question_data } logger.info(f"Đã tìm thấy đầy đủ thông tin cho câu hỏi chưa trả lời với session_id {session_id}") return result except Exception as e: logger.error(f"Lỗi khi tìm câu hỏi chưa trả lời theo session_id: {e}") import traceback logger.error(traceback.format_exc()) return None async def find_user_question(session_id: str) -> Optional[Dict[str, Any]]: """ Tìm câu hỏi của người dùng theo session_id """ try: user_question = await collection.find_one({ "session_id": session_id, "factor": "user" }) if user_question: logger.info(f"Tìm thấy câu hỏi người dùng cho session {session_id}") return convert_mongo_doc(user_question) else: logger.warning(f"Không tìm thấy câu hỏi người dùng cho session {session_id}") return None except Exception as e: logger.error(f"Lỗi khi tìm câu hỏi người dùng: {e}") return None