MongoDB / db_connector.py
Zok213
Refactor app.py to transition from Flask to FastAPI, enhancing the API with WebSocket support for real-time notifications, improved session handling, and background processing for unanswered questions. Update Dockerfile for compatibility with FastAPI and streamline application setup. Revise README.md to reflect new features and API endpoints.
a52d88d
import os
import motor.motor_asyncio
from dotenv import load_dotenv
import logging
from bson import ObjectId
import json
from pymongo import MongoClient
import datetime
from typing import List, Dict, Any, Optional, Union
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# Load environment variables
load_dotenv()
# MongoDB connection parameters
MONGODB_URI = os.getenv("MONGODB_URI")
MONGODB_DB = os.getenv("MONGODB_DB")
MONGODB_COLLECTION = os.getenv("MONGODB_COLLECTION")
# Create MongoDB client
try:
client = motor.motor_asyncio.AsyncIOMotorClient(MONGODB_URI)
db = client[MONGODB_DB]
collection = db[MONGODB_COLLECTION]
logger.info("MongoDB connection established successfully")
except Exception as e:
logger.error(f"Failed to connect to MongoDB: {e}")
raise
def convert_mongo_doc(doc):
"""Chuyển đổi document từ MongoDB thành định dạng JSON hợp lệ"""
if isinstance(doc, dict):
return {k: convert_mongo_doc(v) for k, v in doc.items()}
elif isinstance(doc, list):
return [convert_mongo_doc(item) for item in doc]
elif isinstance(doc, ObjectId):
return str(doc)
elif isinstance(doc, datetime.datetime):
return doc.isoformat()
else:
return doc
async def save_session(session_data):
"""
Save a new session chat to the database
Args:
session_data (dict): Session data to be saved
Returns:
str: ID of the inserted document
"""
try:
# Đảm bảo các trường cần thiết tồn tại
required_fields = ["session_id", "user_id", "action", "factor"]
for field in required_fields:
if field not in session_data:
raise ValueError(f"Missing required field: {field}")
# Chuyển đổi user_id thành số nếu là chuỗi
if isinstance(session_data.get("user_id"), str) and session_data["user_id"].isdigit():
session_data["user_id"] = int(session_data["user_id"])
# Đảm bảo các trường văn bản không phải None
text_fields = ["username", "first_name", "last_name", "message", "action", "factor"]
for field in text_fields:
if field in session_data and session_data[field] is None:
session_data[field] = ""
# Log mức độ debug chi tiết hơn
logger.debug(f"Saving session data: {session_data}")
result = await collection.insert_one(session_data)
logger.info(f"Session saved with ID: {result.inserted_id}, session_id: {session_data.get('session_id')}")
return str(result.inserted_id)
except Exception as e:
logger.error(f"Error saving session: {e}")
raise
async def get_unanswered_questions(limit: int = 10) -> List[Dict[str, Any]]:
"""
Lấy danh sách câu hỏi chưa trả lời (RAG đáp ứng với "I don't know")
"""
try:
logger.info(f"Đang truy vấn {limit} câu hỏi chưa được trả lời...")
# Lọc session có tin nhắn của RAG là "I don't know"
rag_filter = {
"factor": "RAG",
"message": {"$regex": "^I don't know"}
}
# Sử dụng to_list() để lấy danh sách từ cursor
sessions = await db.rag_sessions.find(rag_filter).sort("timestamp", -1).to_list(limit)
if not sessions:
logger.info("Không tìm thấy session nào với tin nhắn RAG 'I don't know'")
return []
logger.info(f"Tìm thấy {len(sessions)} session với RAG trả lời 'I don't know'")
result = []
for session in sessions:
session_id = session.get("session_id")
if not session_id:
logger.warning(f"Session không có session_id: {session.get('_id')}")
continue
# Tìm câu hỏi tương ứng của người dùng
user_question = db.user_questions.find_one({"session_id": session_id})
if not user_question:
logger.warning(f"Không tìm thấy câu hỏi người dùng cho session {session_id}")
continue
# Chuyển đổi dữ liệu trước khi đưa vào kết quả
session_data = convert_mongo_doc(session)
user_question_data = convert_mongo_doc(user_question)
result.append({
"session": session_data,
"user_question": user_question_data
})
logger.info(f"Tìm thấy {len(result)} câu hỏi chưa được trả lời với đầy đủ thông tin")
return result
except Exception as e:
logger.error(f"Lỗi khi lấy câu hỏi chưa trả lời: {e}")
import traceback
logger.error(traceback.format_exc())
return []
async def get_unanswered_question_by_session_id(session_id: str) -> Optional[Dict[str, Any]]:
"""
Lấy câu hỏi chưa trả lời theo session_id cụ thể
"""
try:
logger.info(f"Đang tìm câu hỏi chưa trả lời cho session {session_id}")
# Tìm session RAG với "I don't know" của session_id cụ thể
rag_session = await collection.find_one({
"factor": "RAG",
"session_id": session_id,
"message": {"$regex": "^I don't know", "$options": "i"}
})
if not rag_session:
logger.info(f"Không tìm thấy session RAG nào với 'I don't know' cho session_id {session_id}")
return None
logger.info(f"Tìm thấy session RAG: {rag_session.get('_id')}")
# Tìm câu hỏi tương ứng của người dùng
user_question = await collection.find_one({
"session_id": session_id,
"factor": "user"
})
if not user_question:
logger.warning(f"Không tìm thấy câu hỏi người dùng cho session {session_id}")
return None
logger.info(f"Tìm thấy câu hỏi người dùng: {user_question.get('_id')}")
# Chuyển đổi dữ liệu
session_data = convert_mongo_doc(rag_session)
user_question_data = convert_mongo_doc(user_question)
result = {
"session": session_data,
"user_question": user_question_data
}
logger.info(f"Đã tìm thấy đầy đủ thông tin cho câu hỏi chưa trả lời với session_id {session_id}")
return result
except Exception as e:
logger.error(f"Lỗi khi tìm câu hỏi chưa trả lời theo session_id: {e}")
import traceback
logger.error(traceback.format_exc())
return None
async def find_user_question(session_id: str) -> Optional[Dict[str, Any]]:
"""
Tìm câu hỏi của người dùng theo session_id
"""
try:
user_question = await collection.find_one({
"session_id": session_id,
"factor": "user"
})
if user_question:
logger.info(f"Tìm thấy câu hỏi người dùng cho session {session_id}")
return convert_mongo_doc(user_question)
else:
logger.warning(f"Không tìm thấy câu hỏi người dùng cho session {session_id}")
return None
except Exception as e:
logger.error(f"Lỗi khi tìm câu hỏi người dùng: {e}")
return None