| from flask import jsonify, request | |
| from models.user import User | |
| from models.department import Department | |
| from utils.auth import generate_token | |
| import logging | |
# Module-level logger named after this module. No handler or level is set in
# the code shown here.
logger = logging.getLogger(__name__)
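def create_user():
    """Create a new user and add them to their department's member list.

    Reads a JSON object from the request body. ``email``, ``name``,
    ``password``, ``position``, ``permissions`` and ``department_id`` are all
    required. A ``permissions`` value other than ``'Admin'`` or ``'User'`` is
    replaced by ``'User'``.

    Returns:
        A ``(response, status)`` tuple: 201 with the created user; 400 for a
        body that is not a JSON object, a missing field, a non-string email
        or password, or a duplicate email; 404 when the department does not
        exist; 500 when saving fails or an exception is raised.

    NOTE(review): the body can ask for ``permissions='Admin'`` and nothing in
    this function checks who is calling. Presumably the route layer limits
    this endpoint to administrators -- confirm.
    """
    data = request.get_json()
    # A body such as `null`, a list or a bare string parses as valid JSON but
    # would make the membership tests below raise or do substring matching.
    if not isinstance(data, dict):
        return jsonify({'message': 'Request body must be a JSON object'}), 400

    required_fields = ['email', 'name', 'password', 'position', 'permissions', 'department_id']
    for field in required_fields:
        if field not in data:
            return jsonify({'message': f'Missing required field: {field}'}), 400

    # The email goes straight into a model lookup and the password into the
    # hash helper. How the model uses the lookup value is not visible here,
    # so only plain strings are let through.
    for field in ('email', 'password'):
        if not isinstance(data[field], str):
            return jsonify({'message': f'Field must be a string: {field}'}), 400

    if User.find_by_email(data['email']):
        return jsonify({'message': 'User with this email already exists'}), 400

    # Unknown roles fall back to the least-privileged one.
    permissions = data['permissions'] if data['permissions'] in ('Admin', 'User') else 'User'

    department = Department.find_by_id(data['department_id'])
    if not department:
        return jsonify({'message': 'Department not found'}), 404

    try:
        hashed_password = User.hash_password(data['password'])
        user = User(
            email=data['email'],
            name=data['name'],
            password=hashed_password,
            permissions=permissions,
            position=data['position'],
            department_id=department._id
        )
        if not user.save():
            return jsonify({'message': 'Failed to save user'}), 500

        # If this call raises, the user is already saved but is reported as a
        # failure and is missing from the department's member list.
        department.add_member(user._id)
        # NOTE(review): confirm that to_dict() leaves out the password hash.
        return jsonify({
            'message': 'User created successfully',
            'user': user.to_dict()
        }), 201
    except Exception:
        # Full details and traceback go to the log only; the client gets a
        # generic message so internal error text is not disclosed.
        logger.exception("Error creating user")
        return jsonify({'message': 'Error creating user'}), 500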
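def create_users_bulk():
    """Create several users from a JSON array and update their departments.

    Every array entry is handled independently with the same rules as
    single-user creation: all six fields are required, the email must be
    unused, the department must exist, and a ``permissions`` value other than
    ``'Admin'`` or ``'User'`` is replaced by ``'User'``. A failing entry is
    recorded in ``errors`` and does not stop the remaining entries.

    Returns:
        A ``(response, status)`` tuple listing created users and per-entry
        errors: 201 if at least one user was created, otherwise 400.

    NOTE(review): nothing in this function checks who is calling, although
    entries can ask for ``permissions='Admin'`` -- presumably enforced by the
    route layer; confirm. The array length is also unbounded and every entry
    costs one password hash, so a size cap may be worth adding.
    """
    data = request.get_json()
    if not isinstance(data, list):
        return jsonify({'message': 'Request body must be an array of users'}), 400

    required_fields = ['email', 'name', 'password', 'position', 'permissions', 'department_id']
    created_users = []
    errors = []

    for i, user_data in enumerate(data):
        # A non-object entry would raise (or do substring matching) in the
        # field checks below and abort the whole batch.
        if not isinstance(user_data, dict):
            errors.append(f"User {i+1}: Entry must be a JSON object")
            continue

        missing_fields = [field for field in required_fields if field not in user_data]
        if missing_fields:
            errors.append(f"User {i+1}: Missing required fields: {', '.join(missing_fields)}")
            continue

        # The email goes straight into a model lookup and the password into
        # the hash helper, so only plain strings are let through.
        if not isinstance(user_data['email'], str) or not isinstance(user_data['password'], str):
            errors.append(f"User {i+1}: email and password must be strings")
            continue

        if User.find_by_email(user_data['email']):
            errors.append(f"User {i+1}: User with email {user_data['email']} already exists")
            continue

        department = Department.find_by_id(user_data['department_id'])
        if not department:
            errors.append(f"User {i+1}: Department not found")
            continue

        # Same fallback as single-user creation: an unrecognised role was
        # previously stored as submitted on this path.
        permissions = user_data['permissions'] if user_data['permissions'] in ('Admin', 'User') else 'User'

        try:
            hashed_password = User.hash_password(user_data['password'])
            user = User(
                email=user_data['email'],
                name=user_data['name'],
                password=hashed_password,
                permissions=permissions,
                position=user_data['position'],
                department_id=department._id
            )
            if user.save():
                department.add_member(user._id)
                # NOTE(review): confirm that to_dict() leaves out the password hash.
                created_users.append(user.to_dict())
            else:
                errors.append(f"User {i+1}: Failed to save user {user_data['email']}")
        except Exception:
            # %r keeps a crafted email from breaking the log line; exception
            # text stays in the log and is not returned to the client.
            logger.exception("Error creating user %r", user_data['email'])
            errors.append(f"User {i+1}: Error creating user")

    return jsonify({
        'message': f'Created {len(created_users)} users with {len(errors)} errors',
        'users': created_users,
        'errors': errors
    }), 201 if created_users else 400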
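def get_all_users(current_user=None):
    """Return every user as JSON.

    Args:
        current_user: Optional caller object; when given, its ``_id``,
            ``email`` and ``permissions`` are written to the log.

    Returns:
        A ``(response, 200)`` tuple with a ``users`` list.

    NOTE(review): the listing is returned even when ``current_user`` is None,
    so any access restriction has to come from the route layer -- confirm.
    Also confirm that to_dict() leaves out the password hash.
    """
    logger.info("get_all_users called")
    if current_user:
        # Lazy %-args; %r escapes control characters in the email, which is
        # user-supplied at account creation, so it cannot forge log lines.
        logger.info(
            "Called by user: %s, %r, Permissions: %s",
            current_user._id, current_user.email, current_user.permissions,
        )

    users = User.get_all()
    logger.info("Found %d users in the database", len(users))
    return jsonify({'users': [user.to_dict() for user in users]}), 200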
def get_user(user_id):
    """Look up one user by ID and return it as JSON.

    Returns:
        ``(response, 200)`` with the user, or ``(response, 404)`` when no
        user has that ID.

    NOTE(review): no check here ties ``user_id`` to the caller; presumably
    the route layer decides who may read which record -- confirm.
    """
    found = User.find_by_id(user_id)
    if found:
        return jsonify({'user': found.to_dict()}), 200
    return jsonify({'message': 'User not found'}), 404
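def update_user(user_id):
    """Update a user's name, position, permissions and/or department.

    Only keys present in the JSON body are applied. A ``permissions`` value
    other than ``'Admin'`` or ``'User'`` is ignored. A changed
    ``department_id`` moves the user between the two departments' member
    lists.

    Returns:
        A ``(response, status)`` tuple: 200 with the updated user; 400 when
        the body is not a JSON object; 404 when the user or the new
        department is not found; 500 when saving fails.

    NOTE(review): ``permissions`` can be raised to ``'Admin'`` here and
    nothing in this function checks who is calling or whose record is being
    edited. Presumably the route layer enforces that -- confirm.
    """
    user = User.find_by_id(user_id)
    if not user:
        return jsonify({'message': 'User not found'}), 404

    data = request.get_json()
    # `null`, a list or a string is valid JSON but would make the key tests
    # below raise or match substrings.
    if not isinstance(data, dict):
        return jsonify({'message': 'Request body must be a JSON object'}), 400

    if 'name' in data:
        user.name = data['name']
    if 'position' in data:
        user.position = data['position']
    if 'permissions' in data and data['permissions'] in ['Admin', 'User']:
        user.permissions = data['permissions']

    # Compared as strings so an ID object and its text form count as equal.
    if 'department_id' in data and str(data['department_id']) != str(user.department_id):
        old_department = Department.find_by_id(user.department_id)
        new_department = Department.find_by_id(data['department_id'])
        if not new_department:
            return jsonify({'message': 'New department not found'}), 404

        # The member lists are changed before user.save() below; if that save
        # fails, the departments already reflect the move.
        if old_department:
            old_department.remove_member(user._id)
        user.department_id = new_department._id
        new_department.add_member(user._id)

    if not user.save():
        return jsonify({'message': 'Failed to update user'}), 500
    return jsonify({
        'message': 'User updated successfully',
        'user': user.to_dict()
    }), 200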
def delete_user(user_id):
    """Delete a user after removing them from their department's member list.

    Returns:
        ``(response, 200)`` on success, ``(response, 404)`` when the user
        does not exist, ``(response, 500)`` when the delete call fails.

    NOTE(review): authorization is not checked here; presumably the route
    layer restricts who may delete accounts -- confirm.
    """
    target = User.find_by_id(user_id)
    if not target:
        return jsonify({'message': 'User not found'}), 404

    # Membership is cleared first; a failed delete below therefore leaves the
    # user record in place but no longer listed in the department.
    owning_department = Department.find_by_id(target.department_id)
    if owning_department:
        owning_department.remove_member(target._id)

    if not target.delete():
        return jsonify({'message': 'Failed to delete user'}), 500
    return jsonify({'message': 'User deleted successfully'}), 200