Spaces:
Sleeping
Sleeping
| import bcrypt | |
| from bson import ObjectId | |
| from datetime import datetime | |
| from db import get_users_collection | |
| import logging | |
class User:
    """A user account stored in the collection returned by ``get_users_collection()``.

    Instances hold the password *hash* (never the plain text) in ``password``.
    ``department_id``, ``logs`` and ``incidents`` are kept as ``ObjectId``
    values in memory when built through :meth:`from_dict`; :meth:`to_dict`
    renders them as strings.
    """

    def __init__(self, email, name, password=None, permissions="User", position="Officer",
                 department_id=None, logs=None, incidents=None, _id=None,
                 created_at=None, updated_at=None):
        """Build a user object in memory; nothing is written to the database.

        Args:
            email: Login/contact e-mail address.
            name: Display name.
            password: Already-hashed password (see :meth:`hash_password`) or None.
            permissions: "Admin" or "User".
            position: Free-form position label.
            department_id: Id of the owning department, or None.
            logs: Ids of logs attached to the user; a falsy value becomes ``[]``.
            incidents: Ids of incidents attached to the user; a falsy value
                becomes ``[]``.
            _id: Database id, or None for a user that has not been saved yet.
            created_at: Creation timestamp; defaults to the current time.
            updated_at: Last-update timestamp; defaults to the current time.
        """
        # Single clock reading so both defaults are identical for a new user.
        now = datetime.now()
        self.email = email
        self.name = name
        self.password = password
        self.permissions = permissions  # "Admin" or "User"
        self.position = position
        self.department_id = department_id
        self.logs = logs or []
        self.incidents = incidents or []
        self._id = _id
        self.created_at = created_at or now
        self.updated_at = updated_at or now

    @staticmethod
    def hash_password(password):
        """Return the bcrypt hash of ``password`` as a UTF-8 string, ready for storing."""
        hashed = bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt())
        return hashed.decode('utf-8')

    @staticmethod
    def verify_password(stored_password, provided_password):
        """Return True if ``provided_password`` matches the bcrypt hash ``stored_password``."""
        return bcrypt.checkpw(provided_password.encode('utf-8'),
                              stored_password.encode('utf-8'))

    def to_dict(self):
        """Return the user as a plain dictionary, without the password.

        Ids (``_id``, ``department_id``, ``logs``, ``incidents``) are rendered
        as strings. ``_id`` is only included when the user has one.
        """
        user_dict = {
            "email": self.email,
            "name": self.name,
            "permissions": self.permissions,
            "position": self.position,
            "department_id": str(self.department_id) if self.department_id else None,
            "logs": [str(log_id) for log_id in self.logs],
            "incidents": [str(incident_id) for incident_id in self.incidents],
            "created_at": self.created_at,
            "updated_at": self.updated_at,
        }
        if self._id:
            user_dict["_id"] = str(self._id)
        return user_dict

    @classmethod
    def from_dict(cls, user_dict):
        """Create an instance from a dictionary (e.g. a database document).

        String ids in ``_id``, ``department_id``, ``logs`` and ``incidents``
        are converted to ``ObjectId``. The input dictionary is not modified.
        Every key of ``user_dict`` must be a constructor argument.

        Raises:
            TypeError: if ``user_dict`` contains a key ``__init__`` does not accept.
            Exception: whatever ``ObjectId(...)`` raises for a malformed
                ``_id``, log id or incident id. A malformed ``department_id``
                is only logged and left unchanged.
        """
        logger = logging.getLogger(__name__)
        logger.info(f"from_dict called with: {user_dict.get('_id')}, email: {user_dict.get('email')}")

        # Work on a shallow copy so the caller's dictionary is left untouched.
        data = dict(user_dict)

        if data.get("_id") and isinstance(data["_id"], str):
            data["_id"] = ObjectId(data["_id"])

        dept_id = data.get("department_id")
        if dept_id:
            logger.info(f"Processing department_id: {dept_id}, type: {type(dept_id)}")
            try:
                if isinstance(dept_id, str):
                    dept_id = ObjectId(dept_id)
                    data["department_id"] = dept_id
                    logger.info(f"Converted string department_id to ObjectId: {dept_id}")
                if not isinstance(dept_id, ObjectId):
                    logger.warning(f"department_id is not an ObjectId after conversion: {dept_id}, type: {type(dept_id)}")
            except Exception as e:
                # Deliberately best-effort: keep the original value and carry on.
                logger.error(f"Error converting department_id: {e}")

        for key in ("logs", "incidents"):
            if data.get(key):
                data[key] = [
                    ObjectId(item) if isinstance(item, str) else item
                    for item in data[key]
                ]

        return cls(**data)

    def save(self):
        """Insert the user (no ``_id`` yet) or update the existing document.

        The password hash is written only on insert; updates never touch it.
        After a successful insert the user is also added to its department,
        if it has one.

        Returns:
            On update, True if the database reports a modified document.
            On insert, whether the write was acknowledged.
        """
        logger = logging.getLogger(__name__)
        users_collection = get_users_collection()

        # NOTE(review): to_dict() renders department_id/logs/incidents as
        # strings, so that is the form that gets persisted here.
        user_dict = self.to_dict()
        # to_dict() omits the password; pop defensively in case that changes.
        user_dict.pop("password", None)

        if self._id:
            user_dict["updated_at"] = datetime.now()
            # _id identifies the document in the filter and is not part of $set.
            user_dict.pop("_id", None)
            result = users_collection.update_one(
                {"_id": ObjectId(self._id)},
                {"$set": user_dict},
            )
            return result.modified_count > 0

        if self.password:
            user_dict["password"] = self.password
        else:
            # Still inserted, but flagged: a new user normally has a hash.
            logger.warning(f"Attempting to insert new user {self.email} without a password hash.")

        now = datetime.now()
        user_dict["created_at"] = now
        user_dict["updated_at"] = now
        result = users_collection.insert_one(user_dict)
        self._id = result.inserted_id
        success = result.acknowledged

        if success and self.department_id:
            # NOTE(review): Department is not imported in the visible part of
            # this file - confirm it is in scope where this module is used.
            department = Department.find_by_id(self.department_id)
            if department:
                department.add_member(self._id)
        return success

    @classmethod
    def find_by_id(cls, user_id):
        """Return the user whose ``_id`` equals ``user_id``, or None if not found."""
        user_data = get_users_collection().find_one({"_id": ObjectId(user_id)})
        return cls.from_dict(user_data) if user_data else None

    @classmethod
    def find_by_email(cls, email):
        """Return the user with the given e-mail address, or None if not found."""
        user_data = get_users_collection().find_one({"email": email})
        return cls.from_dict(user_data) if user_data else None

    @classmethod
    def find_by_department(cls, department_id):
        """Return all users belonging to ``department_id``.

        ``department_id`` may be an ``ObjectId`` or its string form. Stored
        values are matched in either form with a single query. An id string
        that cannot be converted, or any error while querying, yields ``[]``.
        """
        logger = logging.getLogger(__name__)
        logger.info(f"Finding users by department: {department_id}, type: {type(department_id)}")

        if isinstance(department_id, str):
            try:
                department_id = ObjectId(department_id)
            except Exception as e:
                logger.error(f"Error converting department_id to ObjectId: {e}")
                return []
            logger.info(f"Converted department_id to ObjectId: {department_id}")

        if isinstance(department_id, ObjectId):
            # Match both storage forms in one round trip.
            query = {"department_id": {"$in": [department_id, str(department_id)]}}
        else:
            logger.warning(f"department_id is not an ObjectId: {department_id}, type: {type(department_id)}")
            query = {"department_id": department_id}

        try:
            logger.info(f"Querying database for users with department_id: {department_id}")
            return [cls.from_dict(doc) for doc in get_users_collection().find(query)]
        except Exception as e:
            # Best-effort lookup: callers get an empty list rather than an error.
            logger.error(f"Error finding users by department: {e}")
            return []

    @classmethod
    def get_all(cls):
        """Return every user in the collection as a list of instances."""
        return [cls.from_dict(user_data) for user_data in get_users_collection().find()]

    def delete(self):
        """Delete this user's document; return True if one was removed.

        Returns False without touching the database when the user has no ``_id``.
        """
        if not self._id:
            return False
        result = get_users_collection().delete_one({"_id": ObjectId(self._id)})
        return result.deleted_count > 0

    def _add_reference(self, id_list, raw_id):
        """Append ``raw_id`` (as ObjectId) to ``id_list`` unless already present, then save."""
        oid = ObjectId(raw_id)
        # Compare by string form so a str id and its ObjectId count as equal.
        if any(str(existing) == str(oid) for existing in id_list):
            return True
        id_list.append(oid)
        return self.save()

    def add_log(self, log_id):
        """Attach a log id to the user and save.

        Returns True if the id was already present, otherwise the result of
        :meth:`save`.
        """
        return self._add_reference(self.logs, log_id)

    def add_incident(self, incident_id):
        """Attach an incident id to the user and save.

        Returns True if the id was already present, otherwise the result of
        :meth:`save`.
        """
        return self._add_reference(self.incidents, incident_id)