Zok213
Refactor app.py to transition from Flask to FastAPI, enhancing the API with WebSocket support for real-time notifications, improved session handling, and background processing for unanswered questions. Update Dockerfile for compatibility with FastAPI and streamline application setup. Revise README.md to reflect new features and API endpoints.
a52d88d | import os | |
| import motor.motor_asyncio | |
| from dotenv import load_dotenv | |
| import logging | |
| from bson import ObjectId | |
| import json | |
| from pymongo import MongoClient | |
| import datetime | |
| from typing import List, Dict, Any, Optional, Union | |
| # Configure logging | |
| logging.basicConfig( | |
| level=logging.INFO, | |
| format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' | |
| ) | |
| logger = logging.getLogger(__name__) | |
| # Load environment variables | |
| load_dotenv() | |
| # MongoDB connection parameters | |
| MONGODB_URI = os.getenv("MONGODB_URI") | |
| MONGODB_DB = os.getenv("MONGODB_DB") | |
| MONGODB_COLLECTION = os.getenv("MONGODB_COLLECTION") | |
| # Create MongoDB client | |
| try: | |
| client = motor.motor_asyncio.AsyncIOMotorClient(MONGODB_URI) | |
| db = client[MONGODB_DB] | |
| collection = db[MONGODB_COLLECTION] | |
| logger.info("MongoDB connection established successfully") | |
| except Exception as e: | |
| logger.error(f"Failed to connect to MongoDB: {e}") | |
| raise | |
| def convert_mongo_doc(doc): | |
| """Chuyển đổi document từ MongoDB thành định dạng JSON hợp lệ""" | |
| if isinstance(doc, dict): | |
| return {k: convert_mongo_doc(v) for k, v in doc.items()} | |
| elif isinstance(doc, list): | |
| return [convert_mongo_doc(item) for item in doc] | |
| elif isinstance(doc, ObjectId): | |
| return str(doc) | |
| elif isinstance(doc, datetime.datetime): | |
| return doc.isoformat() | |
| else: | |
| return doc | |
| async def save_session(session_data): | |
| """ | |
| Save a new session chat to the database | |
| Args: | |
| session_data (dict): Session data to be saved | |
| Returns: | |
| str: ID of the inserted document | |
| """ | |
| try: | |
| # Đảm bảo các trường cần thiết tồn tại | |
| required_fields = ["session_id", "user_id", "action", "factor"] | |
| for field in required_fields: | |
| if field not in session_data: | |
| raise ValueError(f"Missing required field: {field}") | |
| # Chuyển đổi user_id thành số nếu là chuỗi | |
| if isinstance(session_data.get("user_id"), str) and session_data["user_id"].isdigit(): | |
| session_data["user_id"] = int(session_data["user_id"]) | |
| # Đảm bảo các trường văn bản không phải None | |
| text_fields = ["username", "first_name", "last_name", "message", "action", "factor"] | |
| for field in text_fields: | |
| if field in session_data and session_data[field] is None: | |
| session_data[field] = "" | |
| # Log mức độ debug chi tiết hơn | |
| logger.debug(f"Saving session data: {session_data}") | |
| result = await collection.insert_one(session_data) | |
| logger.info(f"Session saved with ID: {result.inserted_id}, session_id: {session_data.get('session_id')}") | |
| return str(result.inserted_id) | |
| except Exception as e: | |
| logger.error(f"Error saving session: {e}") | |
| raise | |
| async def get_unanswered_questions(limit: int = 10) -> List[Dict[str, Any]]: | |
| """ | |
| Lấy danh sách câu hỏi chưa trả lời (RAG đáp ứng với "I don't know") | |
| """ | |
| try: | |
| logger.info(f"Đang truy vấn {limit} câu hỏi chưa được trả lời...") | |
| # Lọc session có tin nhắn của RAG là "I don't know" | |
| rag_filter = { | |
| "factor": "RAG", | |
| "message": {"$regex": "^I don't know"} | |
| } | |
| # Sử dụng to_list() để lấy danh sách từ cursor | |
| sessions = await db.rag_sessions.find(rag_filter).sort("timestamp", -1).to_list(limit) | |
| if not sessions: | |
| logger.info("Không tìm thấy session nào với tin nhắn RAG 'I don't know'") | |
| return [] | |
| logger.info(f"Tìm thấy {len(sessions)} session với RAG trả lời 'I don't know'") | |
| result = [] | |
| for session in sessions: | |
| session_id = session.get("session_id") | |
| if not session_id: | |
| logger.warning(f"Session không có session_id: {session.get('_id')}") | |
| continue | |
| # Tìm câu hỏi tương ứng của người dùng | |
| user_question = db.user_questions.find_one({"session_id": session_id}) | |
| if not user_question: | |
| logger.warning(f"Không tìm thấy câu hỏi người dùng cho session {session_id}") | |
| continue | |
| # Chuyển đổi dữ liệu trước khi đưa vào kết quả | |
| session_data = convert_mongo_doc(session) | |
| user_question_data = convert_mongo_doc(user_question) | |
| result.append({ | |
| "session": session_data, | |
| "user_question": user_question_data | |
| }) | |
| logger.info(f"Tìm thấy {len(result)} câu hỏi chưa được trả lời với đầy đủ thông tin") | |
| return result | |
| except Exception as e: | |
| logger.error(f"Lỗi khi lấy câu hỏi chưa trả lời: {e}") | |
| import traceback | |
| logger.error(traceback.format_exc()) | |
| return [] | |
| async def get_unanswered_question_by_session_id(session_id: str) -> Optional[Dict[str, Any]]: | |
| """ | |
| Lấy câu hỏi chưa trả lời theo session_id cụ thể | |
| """ | |
| try: | |
| logger.info(f"Đang tìm câu hỏi chưa trả lời cho session {session_id}") | |
| # Tìm session RAG với "I don't know" của session_id cụ thể | |
| rag_session = await collection.find_one({ | |
| "factor": "RAG", | |
| "session_id": session_id, | |
| "message": {"$regex": "^I don't know", "$options": "i"} | |
| }) | |
| if not rag_session: | |
| logger.info(f"Không tìm thấy session RAG nào với 'I don't know' cho session_id {session_id}") | |
| return None | |
| logger.info(f"Tìm thấy session RAG: {rag_session.get('_id')}") | |
| # Tìm câu hỏi tương ứng của người dùng | |
| user_question = await collection.find_one({ | |
| "session_id": session_id, | |
| "factor": "user" | |
| }) | |
| if not user_question: | |
| logger.warning(f"Không tìm thấy câu hỏi người dùng cho session {session_id}") | |
| return None | |
| logger.info(f"Tìm thấy câu hỏi người dùng: {user_question.get('_id')}") | |
| # Chuyển đổi dữ liệu | |
| session_data = convert_mongo_doc(rag_session) | |
| user_question_data = convert_mongo_doc(user_question) | |
| result = { | |
| "session": session_data, | |
| "user_question": user_question_data | |
| } | |
| logger.info(f"Đã tìm thấy đầy đủ thông tin cho câu hỏi chưa trả lời với session_id {session_id}") | |
| return result | |
| except Exception as e: | |
| logger.error(f"Lỗi khi tìm câu hỏi chưa trả lời theo session_id: {e}") | |
| import traceback | |
| logger.error(traceback.format_exc()) | |
| return None | |
| async def find_user_question(session_id: str) -> Optional[Dict[str, Any]]: | |
| """ | |
| Tìm câu hỏi của người dùng theo session_id | |
| """ | |
| try: | |
| user_question = await collection.find_one({ | |
| "session_id": session_id, | |
| "factor": "user" | |
| }) | |
| if user_question: | |
| logger.info(f"Tìm thấy câu hỏi người dùng cho session {session_id}") | |
| return convert_mongo_doc(user_question) | |
| else: | |
| logger.warning(f"Không tìm thấy câu hỏi người dùng cho session {session_id}") | |
| return None | |
| except Exception as e: | |
| logger.error(f"Lỗi khi tìm câu hỏi người dùng: {e}") | |
| return None |