# enflow-api/models/user.py
# dhruv575
# greedy
# bacb197
import bcrypt
from bson import ObjectId
from datetime import datetime
from db import get_users_collection
import logging
logger = logging.getLogger(__name__)


class User:
    """A user record stored in the collection returned by ``get_users_collection()``.

    Instances hold the user's profile fields, an optional bcrypt password
    hash, and lists of log / incident identifiers. ``from_dict`` converts
    string identifiers to ``ObjectId``; ``to_dict`` converts them back to
    strings.
    """

    def __init__(self, email, name, password=None, permissions="User", position="Officer",
                 department_id=None, logs=None, incidents=None, _id=None,
                 created_at=None, updated_at=None):
        """Initialise a user.

        Args:
            email: The user's email address.
            name: Display name.
            password: Password value written as-is on insert; ``save()``
                refers to it as the password hash.
            permissions: ``"Admin"`` or ``"User"``.
            position: Position title; defaults to ``"Officer"``.
            department_id: Identifier of the user's department, if any.
            logs: Identifiers of the user's logs; a falsy value becomes ``[]``.
            incidents: Identifiers of the user's incidents; a falsy value
                becomes ``[]``.
            _id: Database identifier; ``None`` for a user not yet inserted.
            created_at: Creation timestamp; defaults to the current time.
            updated_at: Last-update timestamp; defaults to the current time.
        """
        now = datetime.now()
        self.email = email
        self.name = name
        self.password = password
        self.permissions = permissions  # "Admin" or "User"
        self.position = position
        self.department_id = department_id
        self.logs = logs or []
        self.incidents = incidents or []
        self._id = _id
        self.created_at = created_at or now
        self.updated_at = updated_at or now

    @staticmethod
    def hash_password(password):
        """Return a salted bcrypt hash of ``password`` as a UTF-8 string."""
        hashed = bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt())
        return hashed.decode('utf-8')

    @staticmethod
    def verify_password(stored_password, provided_password):
        """Check ``provided_password`` against a stored bcrypt hash.

        Returns:
            ``True`` when the password matches. ``False`` when it does not,
            when either value is missing, or when the stored value is not a
            hash bcrypt can parse.
        """
        # save() allows inserting a user without a password, so the stored
        # value can legitimately be None; treat that as "no match".
        if not stored_password or provided_password is None:
            return False
        try:
            return bcrypt.checkpw(
                provided_password.encode('utf-8'),
                stored_password.encode('utf-8'),
            )
        except ValueError:
            # bcrypt raises ValueError for a malformed stored hash.
            return False

    def to_dict(self):
        """Return the user as a dictionary, without the password.

        Identifiers (``_id``, ``department_id``, ``logs``, ``incidents``) are
        converted to strings. ``_id`` is included only when it is set.
        """
        user_dict = {
            "email": self.email,
            "name": self.name,
            "permissions": self.permissions,
            "position": self.position,
            "department_id": str(self.department_id) if self.department_id else None,
            "logs": [str(log_id) for log_id in self.logs],
            "incidents": [str(incident_id) for incident_id in self.incidents],
            "created_at": self.created_at,
            "updated_at": self.updated_at,
        }
        if self._id:
            user_dict["_id"] = str(self._id)
        return user_dict

    @classmethod
    def from_dict(cls, user_dict):
        """Create an instance from a dictionary of constructor fields.

        String identifiers in ``_id``, ``department_id``, ``logs`` and
        ``incidents`` are converted to ``ObjectId``. The input dictionary is
        not modified.

        Raises:
            TypeError: If ``user_dict`` contains a key that ``__init__`` does
                not accept.
        """
        # Work on a shallow copy so the caller's dictionary is left untouched.
        data = dict(user_dict)
        logger.debug("from_dict called with: %s, email: %s", data.get("_id"), data.get("email"))

        if isinstance(data.get("_id"), str) and data["_id"]:
            data["_id"] = ObjectId(data["_id"])

        department_id = data.get("department_id")
        if department_id:
            logger.debug("Processing department_id: %s, type: %s", department_id, type(department_id))
            # Best effort: a department_id that cannot be converted is logged
            # and passed through unchanged rather than aborting the load.
            try:
                if isinstance(department_id, str):
                    department_id = ObjectId(department_id)
                    data["department_id"] = department_id
                    logger.debug("Converted string department_id to ObjectId: %s", department_id)
                if not isinstance(department_id, ObjectId):
                    logger.warning(
                        "department_id is not an ObjectId after conversion: %s, type: %s",
                        department_id, type(department_id),
                    )
            except Exception as e:
                logger.error("Error converting department_id: %s", e)

        for field in ("logs", "incidents"):
            if data.get(field):
                data[field] = [
                    ObjectId(item) if isinstance(item, str) else item
                    for item in data[field]
                ]

        return cls(**data)

    def save(self):
        """Insert the user, or update it when ``_id`` is already set.

        The password is written only on insert; updates never touch it.

        Returns:
            For an update, ``True`` when a document with this ``_id`` was
            matched. For an insert, whether the write was acknowledged.
        """
        users_collection = get_users_collection()
        # NOTE(review): to_dict() stringifies department_id, logs and
        # incidents, so they are persisted as strings rather than ObjectIds.
        # Left as-is because other code may depend on the stored format.
        user_dict = self.to_dict()  # to_dict() never includes the password
        now = datetime.now()

        if self._id:
            user_dict.pop("_id", None)  # _id must not appear inside $set
            user_dict["updated_at"] = now
            result = users_collection.update_one(
                {"_id": ObjectId(self._id)},
                {"$set": user_dict},
            )
            self.updated_at = now
            # matched_count, not modified_count: a save that changes nothing
            # is still a successful save.
            return result.matched_count > 0

        if self.password:
            user_dict["password"] = self.password
        else:
            # Allowed to proceed, but recorded so the missing hash is noticed.
            logger.warning(f"Attempting to insert new user {self.email} without a password hash.")
        user_dict["created_at"] = now
        user_dict["updated_at"] = now

        result = users_collection.insert_one(user_dict)
        self._id = result.inserted_id
        self.created_at = now
        self.updated_at = now
        success = result.acknowledged

        # Register the new user with its department.
        if success and self.department_id:
            # NOTE(review): Department is not imported anywhere in this file;
            # unless it is injected elsewhere this line raises NameError.
            # Confirm the module path and import it (a function-local import
            # would avoid a circular import).
            department = Department.find_by_id(self.department_id)
            if department:
                department.add_member(self._id)
        return success

    @classmethod
    def find_by_id(cls, user_id):
        """Return the user with the given id, or ``None`` if there is none."""
        user_data = get_users_collection().find_one({"_id": ObjectId(user_id)})
        return cls.from_dict(user_data) if user_data else None

    @classmethod
    def find_by_email(cls, email):
        """Return the user with the given email, or ``None`` if there is none."""
        user_data = get_users_collection().find_one({"email": email})
        return cls.from_dict(user_data) if user_data else None

    @classmethod
    def find_by_department(cls, department_id):
        """Return the users that belong to a department.

        Args:
            department_id: An ``ObjectId`` or its string form.

        Returns:
            A list of users. The list is empty when a string id cannot be
            converted to ``ObjectId`` or when the query fails.
        """
        logger.debug("Finding users by department: %s, type: %s", department_id, type(department_id))

        if isinstance(department_id, str):
            try:
                department_id = ObjectId(department_id)
            except Exception as e:
                logger.error("Error converting department_id to ObjectId: %s", e)
                return []

        if isinstance(department_id, ObjectId):
            # save() stores department_id in its string form, while the
            # original lookup also queried by ObjectId; match either form in
            # a single query instead of scanning every user.
            query = {"department_id": {"$in": [department_id, str(department_id)]}}
        else:
            logger.warning(
                "department_id is not an ObjectId: %s, type: %s",
                department_id, type(department_id),
            )
            query = {"department_id": department_id}

        try:
            return [cls.from_dict(doc) for doc in get_users_collection().find(query)]
        except Exception as e:
            logger.error("Error finding users by department: %s", e)
            return []

    @classmethod
    def get_all(cls):
        """Return every user in the collection."""
        return [cls.from_dict(user_data) for user_data in get_users_collection().find()]

    def delete(self):
        """Delete the user from the database.

        Returns:
            ``True`` when a document was deleted; ``False`` when the user has
            no ``_id`` or nothing was deleted.
        """
        if not self._id:
            return False
        result = get_users_collection().delete_one({"_id": ObjectId(self._id)})
        return result.deleted_count > 0

    def _append_unique_id(self, id_list, raw_id):
        """Append ``raw_id`` to ``id_list`` as an ObjectId unless already present.

        Returns ``True`` when the id was already present, otherwise the
        result of ``save()``.
        """
        object_id = ObjectId(raw_id)
        # Compare string forms so that "abc..." and ObjectId("abc...") are
        # recognised as the same id regardless of how each was stored.
        if any(str(existing) == str(object_id) for existing in id_list):
            return True
        id_list.append(object_id)
        return self.save()

    def add_log(self, log_id):
        """Add a log id to the user's logs and save; no-op if already present."""
        return self._append_unique_id(self.logs, log_id)

    def add_incident(self, incident_id):
        """Add an incident id to the user's incidents and save; no-op if already present."""
        return self._append_unique_id(self.incidents, incident_id)