Spaces:
Sleeping
Sleeping
| import os | |
| import datetime | |
| from pymongo import MongoClient | |
| from bson.objectid import ObjectId | |
| import bcrypt | |
| import logging | |
| from dotenv import load_dotenv | |
| # Cấu hình logging | |
| logger = logging.getLogger(__name__) | |
| # Tải biến môi trường | |
| load_dotenv() | |
| # Kết nối MongoDB | |
| MONGO_URI = os.getenv("MONGO_URI", "mongodb://localhost:27017/") | |
| DATABASE_NAME = os.getenv("MONGO_DB_NAME", "nutribot_db") | |
| # Singleton pattern cho kết nối MongoDB | |
| _mongo_client = None | |
| _db = None | |
| def get_db(): | |
| """Trả về instance của MongoDB database (singleton pattern)""" | |
| global _mongo_client, _db | |
| if _mongo_client is None: | |
| try: | |
| _mongo_client = MongoClient(MONGO_URI) | |
| _db = _mongo_client[DATABASE_NAME] | |
| logger.info(f"Đã kết nối đến database: {DATABASE_NAME}") | |
| except Exception as e: | |
| logger.error(f"Lỗi kết nối MongoDB: {e}") | |
| raise | |
| return _db | |
| class User: | |
| def __init__(self, name, email, password, gender=None, role="user", permissions=None, | |
| user_id=None, created_at=None, updated_at=None, last_login=None): | |
| self.user_id = user_id | |
| self.name = name | |
| self.email = email | |
| self.password = password # Mật khẩu đã mã hóa | |
| self.gender = gender | |
| self.role = role # "user" hoặc "admin" | |
| self.permissions = permissions or self._get_default_permissions(role) | |
| self.created_at = created_at or datetime.datetime.now() | |
| self.updated_at = updated_at or datetime.datetime.now() | |
| self.last_login = last_login | |
| def _get_default_permissions(self, role): | |
| """Lấy quyền mặc định theo role""" | |
| if role == "admin": | |
| return { | |
| "users": {"read": True, "write": True, "delete": True}, | |
| "documents": {"read": True, "write": True, "delete": True}, | |
| "conversations": {"read": True, "write": True, "delete": True}, | |
| "system": {"read": True, "write": True, "delete": False}, | |
| "analytics": {"read": True, "write": False, "delete": False} | |
| } | |
| else: # role == "user" | |
| return { | |
| "conversations": {"read": True, "write": True, "delete": True}, | |
| "profile": {"read": True, "write": True, "delete": False} | |
| } | |
| def hash_password(password): | |
| """Mã hóa mật khẩu sử dụng bcrypt""" | |
| salt = bcrypt.gensalt() | |
| hashed_pw = bcrypt.hashpw(password.encode('utf-8'), salt) | |
| return hashed_pw | |
| def check_password(hashed_password, password): | |
| """Kiểm tra mật khẩu""" | |
| return bcrypt.checkpw(password.encode('utf-8'), hashed_password) | |
| def is_admin(self): | |
| """Kiểm tra xem user có phải admin không""" | |
| return self.role == "admin" | |
| def has_permission(self, resource, action): | |
| """Kiểm tra quyền truy cập""" | |
| if not self.permissions: | |
| return False | |
| resource_perms = self.permissions.get(resource, {}) | |
| return resource_perms.get(action, False) | |
| def to_dict(self): | |
| """Chuyển đổi thông tin user thành dictionary""" | |
| user_dict = { | |
| "name": self.name, | |
| "email": self.email, | |
| "password": self.password, | |
| "gender": self.gender, | |
| "role": self.role, | |
| "permissions": self.permissions, | |
| "created_at": self.created_at, | |
| "updated_at": self.updated_at, | |
| "last_login": self.last_login | |
| } | |
| if self.user_id: | |
| user_dict["_id"] = self.user_id | |
| return user_dict | |
| def from_dict(cls, user_dict): | |
| """Tạo đối tượng User từ dictionary""" | |
| if not user_dict: | |
| return None | |
| return cls( | |
| user_id=user_dict.get("_id"), | |
| name=user_dict.get("name"), | |
| email=user_dict.get("email"), | |
| password=user_dict.get("password"), | |
| gender=user_dict.get("gender"), | |
| role=user_dict.get("role", "user"), # Mặc định là user | |
| permissions=user_dict.get("permissions"), | |
| created_at=user_dict.get("created_at"), | |
| updated_at=user_dict.get("updated_at"), | |
| last_login=user_dict.get("last_login") | |
| ) | |
| def find_by_email(cls, email): | |
| """Tìm người dùng theo email""" | |
| try: | |
| db = get_db() | |
| users_collection = db.users | |
| user_data = users_collection.find_one({"email": email}) | |
| return cls.from_dict(user_data) | |
| except Exception as e: | |
| logger.error(f"Lỗi tìm người dùng theo email: {e}") | |
| return None | |
| def find_by_id(cls, user_id): | |
| """Tìm người dùng theo ID""" | |
| try: | |
| db = get_db() | |
| users_collection = db.users | |
| user_data = users_collection.find_one({"_id": ObjectId(user_id)}) | |
| return cls.from_dict(user_data) | |
| except Exception as e: | |
| logger.error(f"Lỗi tìm người dùng theo ID: {e}") | |
| return None | |
| def get_all_admins(cls, page=1, per_page=10): | |
| """Lấy danh sách tất cả admin với phân trang""" | |
| try: | |
| db = get_db() | |
| users_collection = db.users | |
| skip = (page - 1) * per_page | |
| admins_data = users_collection.find( | |
| {"role": "admin"} | |
| ).sort("created_at", -1).skip(skip).limit(per_page) | |
| total = users_collection.count_documents({"role": "admin"}) | |
| return [cls.from_dict(admin) for admin in admins_data], total | |
| except Exception as e: | |
| logger.error(f"Lỗi lấy danh sách admin: {e}") | |
| return [], 0 | |
| def save(self): | |
| """Lưu thông tin người dùng vào database""" | |
| try: | |
| db = get_db() | |
| users_collection = db.users | |
| if not self.user_id: | |
| # Đây là người dùng mới | |
| insert_result = users_collection.insert_one(self.to_dict()) | |
| self.user_id = insert_result.inserted_id | |
| logger.info(f"Đã tạo người dùng mới với ID: {self.user_id}") | |
| return self.user_id | |
| else: | |
| # Cập nhật người dùng đã tồn tại | |
| self.updated_at = datetime.datetime.now() | |
| users_collection.update_one( | |
| {"_id": self.user_id}, | |
| {"$set": self.to_dict()} | |
| ) | |
| logger.info(f"Đã cập nhật thông tin người dùng: {self.user_id}") | |
| return self.user_id | |
| except Exception as e: | |
| logger.error(f"Lỗi khi lưu thông tin người dùng: {e}") | |
| raise | |
| def update_last_login(self): | |
| """Cập nhật thời gian đăng nhập cuối""" | |
| try: | |
| self.last_login = datetime.datetime.now() | |
| self.save() | |
| except Exception as e: | |
| logger.error(f"Lỗi cập nhật last_login: {e}") | |
| def delete(self): | |
| """Xóa người dùng từ database""" | |
| try: | |
| if self.user_id: | |
| db = get_db() | |
| users_collection = db.users | |
| users_collection.delete_one({"_id": self.user_id}) | |
| logger.info(f"Đã xóa người dùng: {self.user_id}") | |
| return True | |
| return False | |
| except Exception as e: | |
| logger.error(f"Lỗi khi xóa người dùng: {e}") | |
| return False | |
| def register(name, email, password, gender=None, role="user"): | |
| """Đăng ký người dùng mới""" | |
| try: | |
| # Kiểm tra email đã tồn tại chưa | |
| existing_user = User.find_by_email(email) | |
| if existing_user: | |
| return False, "Email đã được sử dụng" | |
| # Mã hóa mật khẩu | |
| hashed_password = User.hash_password(password) | |
| # Tạo người dùng mới | |
| new_user = User( | |
| name=name, | |
| email=email, | |
| password=hashed_password, | |
| gender=gender, | |
| role=role | |
| ) | |
| # Lưu vào database | |
| user_id = new_user.save() | |
| return True, {"user_id": str(user_id)} | |
| except Exception as e: | |
| logger.error(f"Lỗi đăng ký người dùng: {e}") | |
| return False, f"Lỗi đăng ký: {str(e)}" | |
| def login(email, password): | |
| """Đăng nhập người dùng""" | |
| try: | |
| if not email or not password: | |
| return False, "Vui lòng nhập đầy đủ thông tin đăng nhập" | |
| user = User.find_by_email(email) | |
| if not user: | |
| return False, "Email không tồn tại" | |
| if not User.check_password(user.password, password): | |
| return False, "Mật khẩu không chính xác" | |
| # Cập nhật thời gian đăng nhập | |
| user.update_last_login() | |
| return True, user | |
| except Exception as e: | |
| logger.error(f"Lỗi đăng nhập: {e}") | |
| return False, f"Lỗi đăng nhập: {str(e)}" | |
| def create_admin(name, email, password, gender=None): | |
| """Tạo admin mới""" | |
| return User.register(name, email, password, gender, role="admin") | |
| def create_default_admin(): | |
| """Tạo admin mặc định nếu chưa có""" | |
| try: | |
| db = get_db() | |
| users_collection = db.users | |
| # Kiểm tra xem đã có admin chưa | |
| existing_admin = users_collection.find_one({"role": "admin"}) | |
| if not existing_admin: | |
| # Tạo admin mặc định | |
| default_email = "admin@nutribot.com" | |
| default_password = "NutribotAdmin2024!" | |
| success, result = User.create_admin( | |
| name="Administrator", | |
| email=default_email, | |
| password=default_password | |
| ) | |
| if success: | |
| logger.info(f"Đã tạo admin mặc định: {default_email}") | |
| logger.info(f"Mật khẩu mặc định: {default_password}") | |
| return True, { | |
| "email": default_email, | |
| "password": default_password, | |
| "user_id": result["user_id"] | |
| } | |
| else: | |
| logger.error(f"Lỗi tạo admin mặc định: {result}") | |
| return False, result | |
| else: | |
| logger.info("Admin đã tồn tại") | |
| return True, {"message": "Admin đã tồn tại"} | |
| except Exception as e: | |
| logger.error(f"Lỗi tạo admin mặc định: {e}") | |
| return False, str(e) | |
| def get_stats(): | |
| """Lấy thống kê về users""" | |
| try: | |
| db = get_db() | |
| users_collection = db.users | |
| # Tổng số users | |
| total_users = users_collection.count_documents({}) | |
| # Số admin | |
| total_admins = users_collection.count_documents({"role": "admin"}) | |
| # Users đăng ký hôm nay | |
| today_start = datetime.datetime.now().replace(hour=0, minute=0, second=0, microsecond=0) | |
| new_users_today = users_collection.count_documents({ | |
| "created_at": {"$gte": today_start} | |
| }) | |
| # Users có hoạt động (có last_login) | |
| active_users = users_collection.count_documents({ | |
| "last_login": {"$exists": True} | |
| }) | |
| return { | |
| "total_users": total_users, | |
| "total_admins": total_admins, | |
| "active_users": active_users, | |
| "new_users_today": new_users_today | |
| } | |
| except Exception as e: | |
| logger.error(f"Lỗi lấy thống kê users: {e}") | |
| return { | |
| "total_users": 0, | |
| "total_admins": 0, | |
| "active_users": 0, | |
| "new_users_today": 0 | |
| } |