Spaces:
Sleeping
Sleeping
| import os | |
| import datetime | |
| from pymongo import MongoClient | |
| from bson.objectid import ObjectId | |
| import logging | |
| from dotenv import load_dotenv | |
| # Cấu hình logging | |
| logger = logging.getLogger(__name__) | |
| # Tải biến môi trường | |
| load_dotenv() | |
| # Kết nối MongoDB | |
| MONGO_URI = os.getenv("MONGO_URI", "mongodb://localhost:27017/") | |
| DATABASE_NAME = os.getenv("MONGO_DB_NAME", "nutribot_db") | |
| # Singleton pattern cho kết nối MongoDB | |
| _mongo_client = None | |
| _db = None | |
| def get_db(): | |
| """Trả về instance của MongoDB database (singleton pattern)""" | |
| global _mongo_client, _db | |
| if _mongo_client is None: | |
| try: | |
| _mongo_client = MongoClient(MONGO_URI) | |
| _db = _mongo_client[DATABASE_NAME] | |
| logger.info(f"Đã kết nối đến database: {DATABASE_NAME}") | |
| except Exception as e: | |
| logger.error(f"Lỗi kết nối MongoDB: {e}") | |
| raise | |
| return _db | |
| def ensure_indexes(): | |
| """Tạo các index cần thiết cho feedback collection""" | |
| try: | |
| db = get_db() | |
| feedback_collection = db.feedback | |
| # Index cho user_id | |
| feedback_collection.create_index("user_id") | |
| # Index cho status | |
| feedback_collection.create_index("status") | |
| # Index cho category | |
| feedback_collection.create_index("category") | |
| # Index cho created_at (để sắp xếp) | |
| feedback_collection.create_index([("created_at", -1)]) | |
| logger.info("Đã tạo indexes cho feedback collection") | |
| except Exception as e: | |
| logger.error(f"Lỗi tạo indexes: {e}") | |
| class Feedback: | |
| def __init__(self, user_id=None, rating=5, category='', title='', content='', | |
| status='pending', admin_response='', created_at=None, updated_at=None, | |
| feedback_id=None): | |
| self.feedback_id = feedback_id | |
| self.user_id = user_id | |
| self.rating = rating | |
| self.category = category | |
| self.title = title | |
| self.content = content | |
| self.status = status # pending, reviewed, resolved | |
| self.admin_response = admin_response | |
| self.created_at = created_at or datetime.datetime.now() | |
| self.updated_at = updated_at or datetime.datetime.now() | |
| def to_dict(self): | |
| """Chuyển đổi thông tin feedback thành dictionary""" | |
| feedback_dict = { | |
| "user_id": self.user_id, | |
| "rating": self.rating, | |
| "category": self.category, | |
| "title": self.title, | |
| "content": self.content, | |
| "status": self.status, | |
| "admin_response": self.admin_response, | |
| "created_at": self.created_at, | |
| "updated_at": self.updated_at | |
| } | |
| if self.feedback_id: | |
| feedback_dict["_id"] = self.feedback_id | |
| return feedback_dict | |
| def from_dict(cls, feedback_dict): | |
| """Tạo đối tượng Feedback từ dictionary""" | |
| if not feedback_dict: | |
| return None | |
| return cls( | |
| feedback_id=feedback_dict.get("_id"), | |
| user_id=feedback_dict.get("user_id"), | |
| rating=feedback_dict.get("rating", 5), | |
| category=feedback_dict.get("category", ""), | |
| title=feedback_dict.get("title", ""), | |
| content=feedback_dict.get("content", ""), | |
| status=feedback_dict.get("status", "pending"), | |
| admin_response=feedback_dict.get("admin_response", ""), | |
| created_at=feedback_dict.get("created_at"), | |
| updated_at=feedback_dict.get("updated_at") | |
| ) | |
| def save(self): | |
| """Lưu thông tin feedback vào database""" | |
| try: | |
| db = get_db() | |
| feedback_collection = db.feedback | |
| self.updated_at = datetime.datetime.now() | |
| if not self.feedback_id: | |
| insert_result = feedback_collection.insert_one(self.to_dict()) | |
| self.feedback_id = insert_result.inserted_id | |
| logger.info(f"Đã tạo feedback mới với ID: {self.feedback_id}") | |
| return self.feedback_id | |
| else: | |
| feedback_collection.update_one( | |
| {"_id": self.feedback_id}, | |
| {"$set": self.to_dict()} | |
| ) | |
| logger.info(f"Đã cập nhật feedback: {self.feedback_id}") | |
| return self.feedback_id | |
| except Exception as e: | |
| logger.error(f"Lỗi khi lưu feedback: {e}") | |
| raise | |
| def find_by_id(cls, feedback_id): | |
| """Tìm feedback theo ID""" | |
| try: | |
| db = get_db() | |
| feedback_collection = db.feedback | |
| if isinstance(feedback_id, str): | |
| feedback_id = ObjectId(feedback_id) | |
| feedback_dict = feedback_collection.find_one({"_id": feedback_id}) | |
| if feedback_dict: | |
| return cls.from_dict(feedback_dict) | |
| return None | |
| except Exception as e: | |
| logger.error(f"Lỗi khi tìm feedback: {e}") | |
| return None | |
| def find_by_user(cls, user_id, limit=10, skip=0): | |
| """Tìm feedback theo user_id""" | |
| try: | |
| db = get_db() | |
| feedback_collection = db.feedback | |
| if isinstance(user_id, str): | |
| user_id = ObjectId(user_id) | |
| feedbacks_cursor = feedback_collection.find({"user_id": user_id})\ | |
| .sort("created_at", -1)\ | |
| .skip(skip)\ | |
| .limit(limit) | |
| result = [] | |
| for feedback_dict in feedbacks_cursor: | |
| feedback_obj = cls.from_dict(feedback_dict) | |
| if feedback_obj: | |
| result.append(feedback_obj) | |
| return result | |
| except Exception as e: | |
| logger.error(f"Lỗi tìm feedback theo user: {e}") | |
| return [] | |
| def get_all_for_admin(cls, limit=20, skip=0, status_filter=None): | |
| """Lấy tất cả feedback cho admin với join user info""" | |
| try: | |
| db = get_db() | |
| feedback_collection = db.feedback | |
| # Tạo query filter | |
| query_filter = {} | |
| if status_filter: | |
| query_filter["status"] = status_filter | |
| # Aggregate để join với user collection | |
| pipeline = [ | |
| {"$match": query_filter}, | |
| { | |
| "$lookup": { | |
| "from": "users", | |
| "localField": "user_id", | |
| "foreignField": "_id", | |
| "as": "user_info" | |
| } | |
| }, | |
| {"$sort": {"created_at": -1}}, | |
| {"$skip": skip}, | |
| {"$limit": limit} | |
| ] | |
| result = [] | |
| for feedback_doc in feedback_collection.aggregate(pipeline): | |
| feedback = cls.from_dict(feedback_doc) | |
| # Thêm thông tin user nếu có | |
| if feedback and feedback_doc.get("user_info"): | |
| user_info = feedback_doc["user_info"][0] | |
| feedback.user_name = user_info.get("name", "Ẩn danh") | |
| feedback.user_email = user_info.get("email", "") | |
| else: | |
| feedback.user_name = "Ẩn danh" | |
| feedback.user_email = "" | |
| result.append(feedback) | |
| return result | |
| except Exception as e: | |
| logger.error(f"Lỗi lấy feedback cho admin: {e}") | |
| return [] | |
| def update_admin_response(self, response, status): | |
| """Cập nhật phản hồi từ admin""" | |
| try: | |
| self.admin_response = response | |
| self.status = status | |
| self.save() | |
| return True | |
| except Exception as e: | |
| logger.error(f"Lỗi cập nhật admin response: {e}") | |
| return False | |
| def get_stats(): | |
| """Lấy thống kê về feedback""" | |
| try: | |
| db = get_db() | |
| feedback_collection = db.feedback | |
| # Tổng số feedback | |
| total_feedback = feedback_collection.count_documents({}) | |
| # Feedback pending | |
| pending_feedback = feedback_collection.count_documents({"status": "pending"}) | |
| # Feedback trong tháng này | |
| now = datetime.datetime.now() | |
| month_start = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0) | |
| this_month_feedback = feedback_collection.count_documents({ | |
| "created_at": {"$gte": month_start} | |
| }) | |
| # Đánh giá trung bình | |
| pipeline = [ | |
| {"$group": {"_id": None, "avg_rating": {"$avg": "$rating"}}} | |
| ] | |
| rating_result = list(feedback_collection.aggregate(pipeline)) | |
| average_rating = rating_result[0]["avg_rating"] if rating_result else 0 | |
| return { | |
| "total_feedback": total_feedback, | |
| "pending_feedback": pending_feedback, | |
| "this_month_feedback": this_month_feedback, | |
| "average_rating": average_rating | |
| } | |
| except Exception as e: | |
| logger.error(f"Lỗi lấy thống kê feedback: {e}") | |
| return { | |
| "total_feedback": 0, | |
| "pending_feedback": 0, | |
| "this_month_feedback": 0, | |
| "average_rating": 0 | |
| } | |
| def create_feedback(user_id, rating, category, title, content): | |
| """Tạo feedback mới""" | |
| try: | |
| feedback = Feedback( | |
| user_id=ObjectId(user_id) if isinstance(user_id, str) else user_id, | |
| rating=rating, | |
| category=category, | |
| title=title, | |
| content=content | |
| ) | |
| feedback_id = feedback.save() | |
| return True, {"feedback_id": str(feedback_id)} | |
| except Exception as e: | |
| logger.error(f"Lỗi tạo feedback: {e}") | |
| return False, str(e) |