Spaces:
Sleeping
Sleeping
File size: 11,103 Bytes
5854e13 ea2d2fc 5854e13 5714ef6 5854e13 5714ef6 5854e13 5714ef6 5854e13 bacb197 5854e13 bacb197 ecbc4c7 bacb197 5854e13 bacb197 5854e13 bacb197 5854e13 bacb197 5854e13 5714ef6 ea2d2fc 5714ef6 ea2d2fc 5714ef6 ea2d2fc 5714ef6 ea2d2fc 5714ef6 ea2d2fc 5714ef6 ea2d2fc 5714ef6 ea2d2fc 5714ef6 5854e13 7bac7c5 5854e13 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 | import bcrypt
from bson import ObjectId
from datetime import datetime
from db import get_users_collection
import logging
class User:
def __init__(self, email, name, password=None, permissions="User", position="Officer",
department_id=None, logs=None, incidents=None, _id=None, created_at=None, updated_at=None):
self.email = email
self.name = name
self.password = password
self.permissions = permissions # "Admin" or "User"
self.position = position
self.department_id = department_id
self.logs = logs or []
self.incidents = incidents or []
self._id = _id
self.created_at = created_at or datetime.now()
self.updated_at = updated_at or datetime.now()
@staticmethod
def hash_password(password):
"""Hash a password for storing."""
return bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8')
@staticmethod
def verify_password(stored_password, provided_password):
"""Verify a stored password against one provided by user"""
return bcrypt.checkpw(provided_password.encode('utf-8'), stored_password.encode('utf-8'))
def to_dict(self):
"""Convert instance to dictionary (excluding password)"""
user_dict = {
"email": self.email,
"name": self.name,
"permissions": self.permissions,
"position": self.position,
"department_id": str(self.department_id) if self.department_id else None,
"logs": [str(log_id) for log_id in self.logs],
"incidents": [str(incident_id) for incident_id in self.incidents],
"created_at": self.created_at,
"updated_at": self.updated_at
}
if self._id:
user_dict["_id"] = str(self._id)
return user_dict
@classmethod
def from_dict(cls, user_dict):
"""Create instance from dictionary"""
logger = logging.getLogger(__name__)
# Log the input dictionary for debugging
logger.info(f"from_dict called with: {user_dict.get('_id')}, email: {user_dict.get('email')}")
# Handle _id
if "_id" in user_dict and user_dict["_id"]:
user_dict["_id"] = ObjectId(user_dict["_id"]) if isinstance(user_dict["_id"], str) else user_dict["_id"]
# Handle department_id
if "department_id" in user_dict and user_dict["department_id"]:
# Log the department_id before conversion
logger.info(f"Processing department_id: {user_dict['department_id']}, type: {type(user_dict['department_id'])}")
try:
# Handle department_id whether it's a string or an ObjectId
if isinstance(user_dict["department_id"], str):
user_dict["department_id"] = ObjectId(user_dict["department_id"])
logger.info(f"Converted string department_id to ObjectId: {user_dict['department_id']}")
# No conversion needed if already an ObjectId
if not isinstance(user_dict["department_id"], ObjectId):
logger.warning(f"department_id is not an ObjectId after conversion: {user_dict['department_id']}, type: {type(user_dict['department_id'])}")
except Exception as e:
logger.error(f"Error converting department_id: {e}")
# Convert string IDs to ObjectIds for logs and incidents
if "logs" in user_dict and user_dict["logs"]:
user_dict["logs"] = [ObjectId(log_id) if isinstance(log_id, str) else log_id for log_id in user_dict["logs"]]
if "incidents" in user_dict and user_dict["incidents"]:
user_dict["incidents"] = [ObjectId(incident_id) if isinstance(incident_id, str) else incident_id for incident_id in user_dict["incidents"]]
return cls(**user_dict)
def save(self):
"""Save user to database"""
users_collection = get_users_collection()
user_dict = self.to_dict()
# Remove password from the dictionary for safety before saving
# Password should only be updated via set_password method
if 'password' in user_dict:
del user_dict['password']
if self._id:
# Update existing user
user_dict["updated_at"] = datetime.now()
# --- Remove _id before updating ---
if "_id" in user_dict:
del user_dict["_id"]
# --- End Remove _id ---
result = users_collection.update_one(
{"_id": ObjectId(self._id)},
{"$set": user_dict}
)
success = result.modified_count > 0
else:
# Insert new user - Requires password which was deleted above!
# We need the hashed password for insertion.
# Let's re-add the hashed password if it exists on the object
if self.password:
user_dict['password'] = self.password
else:
# This should ideally not happen if creating a new user
# Maybe throw an error or log a warning?
logger.warning(f"Attempting to insert new user {self.email} without a password hash.")
# Handle appropriately - perhaps prevent save or set a placeholder?
# For now, let it proceed but log it.
user_dict["created_at"] = datetime.now()
user_dict["updated_at"] = datetime.now()
result = users_collection.insert_one(user_dict)
self._id = result.inserted_id
success = result.acknowledged
# Add user to department if insert was successful
if success and self.department_id:
department = Department.find_by_id(self.department_id)
if department:
department.add_member(self._id)
return success
@classmethod
def find_by_id(cls, user_id):
"""Find user by ID"""
users_collection = get_users_collection()
user_data = users_collection.find_one({"_id": ObjectId(user_id)})
if user_data:
return cls.from_dict(user_data)
return None
@classmethod
def find_by_email(cls, email):
"""Find user by email"""
users_collection = get_users_collection()
user_data = users_collection.find_one({"email": email})
if user_data:
return cls.from_dict(user_data)
return None
@classmethod
def find_by_department(cls, department_id):
"""Find users by department"""
logger = logging.getLogger(__name__)
logger.info(f"Finding users by department: {department_id}, type: {type(department_id)}")
# Ensure department_id is an ObjectId
if isinstance(department_id, str):
try:
department_id = ObjectId(department_id)
logger.info(f"Converted department_id to ObjectId: {department_id}")
except Exception as e:
logger.error(f"Error converting department_id to ObjectId: {e}")
return []
# Check if department_id is still not an ObjectId
if not isinstance(department_id, ObjectId):
logger.warning(f"department_id is not an ObjectId: {department_id}, type: {type(department_id)}")
# Get all users to help with debugging
all_users = list(get_users_collection().find())
logger.info(f"Total users in database: {len(all_users)}")
# Check each user's department_id for debugging
for user in all_users:
user_dept_id = user.get('department_id')
logger.info(f"User {user.get('email')} has department_id: {user_dept_id}, type: {type(user_dept_id)}")
# Log string comparison for debugging
if isinstance(user_dept_id, ObjectId) and isinstance(department_id, ObjectId):
logger.info(f"Comparing ObjectIds: {user_dept_id} == {department_id} result: {user_dept_id == department_id}")
elif isinstance(user_dept_id, str) and isinstance(department_id, str):
logger.info(f"Comparing strings: {user_dept_id} == {department_id} result: {user_dept_id == department_id}")
elif user_dept_id is not None and department_id is not None:
logger.info(f"Comparing mixed types: {str(user_dept_id)} == {str(department_id)} result: {str(user_dept_id) == str(department_id)}")
# Try both approaches for finding users
try:
# Method 1: Direct query using department_id as ObjectId
logger.info(f"Querying database for users with department_id: {department_id}")
users = list(get_users_collection().find({"department_id": department_id}))
logger.info(f"Query 1 found {len(users)} users with department_id as ObjectId")
# If that doesn't work, try string comparison as a fallback
if not users and isinstance(department_id, ObjectId):
dept_id_str = str(department_id)
logger.info(f"Trying fallback query with string department_id: {dept_id_str}")
# Filter manually for string comparison
string_matches = [
User.from_dict(user) for user in all_users
if user.get('department_id') is not None and str(user.get('department_id')) == dept_id_str
]
logger.info(f"String comparison found {len(string_matches)} matching users")
if string_matches:
logger.info("Using string comparison results instead")
return string_matches
return [User.from_dict(user) for user in users]
except Exception as e:
logger.error(f"Error finding users by department: {e}")
return []
@classmethod
def get_all(cls):
"""Get all users"""
users_collection = get_users_collection()
users_data = users_collection.find()
return [cls.from_dict(user_data) for user_data in users_data]
def delete(self):
"""Delete user from database"""
if not self._id:
return False
users_collection = get_users_collection()
result = users_collection.delete_one({"_id": ObjectId(self._id)})
return result.deleted_count > 0
def add_log(self, log_id):
"""Add a log to user's logs"""
if log_id not in self.logs:
self.logs.append(ObjectId(log_id))
return self.save()
return True
def add_incident(self, incident_id):
"""Add an incident to user's incidents"""
if incident_id not in self.incidents:
self.incidents.append(ObjectId(incident_id))
return self.save()
return True |