Spaces:
Sleeping
Sleeping
| from flask import jsonify, request | |
| from models.department import Department | |
| from models.user import User | |
| import csv | |
| import io | |
| import bcrypt | |
| import random | |
| import string | |
| import logging | |
| # Configure logging | |
| logger = logging.getLogger(__name__) | |
def generate_random_password(length=12):
    """Generate a random alphanumeric password.

    Characters are drawn with the ``secrets`` module (a cryptographically
    secure source) instead of ``random``, because callers in this file hash
    the result and store it as a user's login password.

    Args:
        length: Number of characters to generate. Defaults to 12.

    Returns:
        A string of ``length`` ASCII letters and digits; an empty string
        when ``length`` is 0 or negative.
    """
    # Local import keeps this block self-contained (no new file-level import).
    import secrets

    characters = string.ascii_letters + string.digits
    return ''.join(secrets.choice(characters) for _ in range(length))
def create_department():
    """Create a new department together with its first admin user.

    Reads a JSON object from the request body. Required keys are ``name``,
    ``address``, ``website``, ``admin_email``, ``admin_name`` and
    ``admin_password``; ``admin_position`` is optional and defaults to
    ``'Administrator'``.

    Returns:
        A ``(response, status)`` tuple:
        * 201 with the department and admin user on success,
        * 400 when the body is not a JSON object, a required field is
          missing, or the department name / admin email is already taken,
        * 500 when saving fails or an unexpected exception is raised.
    """
    data = request.get_json()
    # A body such as ``null`` or a JSON list would crash the field checks below.
    if not isinstance(data, dict):
        return jsonify({'message': 'Request body must be a JSON object'}), 400

    # Check if required fields are present
    required_fields = ['name', 'address', 'website', 'admin_email', 'admin_name', 'admin_password']
    for field in required_fields:
        if field not in data:
            return jsonify({'message': f'Missing required field: {field}'}), 400

    # Check if department with this name already exists
    if Department.find_by_name(data['name']):
        return jsonify({'message': 'Department with this name already exists'}), 400

    # Check if user with this email already exists
    if User.find_by_email(data['admin_email']):
        return jsonify({'message': 'User with this email already exists'}), 400

    try:
        # Log the request for debugging, but never write the plaintext
        # password to the log.
        loggable_data = {key: value for key, value in data.items() if key != 'admin_password'}
        logger.info(f"Creating new department with data: {loggable_data}")

        # Create department
        department = Department(
            name=data['name'],
            address=data['address'],
            website=data['website']
        )
        save_result = department.save()
        logger.info(f"Department saved with _id: {department._id}, save result: {save_result}")
        # Other handlers in this file treat a falsy save() result as failure.
        if not save_result:
            return jsonify({'message': 'Failed to save department'}), 500

        # Create admin user
        hashed_password = User.hash_password(data['admin_password'])
        admin_user = User(
            email=data['admin_email'],
            name=data['admin_name'],
            password=hashed_password,
            permissions='Admin',
            position=data.get('admin_position', 'Administrator'),
            department_id=department._id
        )
        user_save_result = admin_user.save()
        logger.info(f"Admin user saved with _id: {admin_user._id}, save result: {user_save_result}")
        if not user_save_result:
            # Do not leave behind a department that has no admin.
            department.delete()
            return jsonify({'message': 'Failed to save admin user'}), 500

        # Add admin to department members and log the result
        member_add_result = department.add_member(admin_user._id)
        logger.info(f"Adding admin user to department members, result: {member_add_result}")

        # Re-read the department purely to log what was persisted.
        updated_department = Department.find_by_id(department._id)
        logger.info(f"Updated department members: {updated_department.members if updated_department else 'Department not found'}")

        # Return success response with detailed information
        return jsonify({
            'message': 'Department and admin user created successfully',
            'department': {
                **department.to_dict(),
                'members_debug': [str(member) for member in department.members]
            },
            'admin_user': admin_user.to_dict(),
            'success': True
        }), 201
    except Exception as e:
        # logger.exception records the message together with the traceback.
        logger.exception(f"Error creating department: {str(e)}")
        return jsonify({'message': f'Error creating department: {str(e)}'}), 500
def get_department(department_id):
    """Look up one department by ID and return it as JSON.

    Returns:
        ``(response, 200)`` with the department's ``to_dict()`` output under
        the ``department`` key, or ``(response, 404)`` when
        ``Department.find_by_id`` yields nothing.
    """
    found = Department.find_by_id(department_id)
    if found:
        payload = {'department': found.to_dict()}
        return jsonify(payload), 200
    return jsonify({'message': 'Department not found'}), 404
def update_department(department_id):
    """Update a department's ``name``, ``address`` and/or ``website``.

    Only the keys present in the JSON request body are changed; other
    attributes are left as they are.

    Returns:
        A ``(response, status)`` tuple:
        * 200 with the updated department on success,
        * 400 when the request body is not a JSON object,
        * 404 when the department does not exist,
        * 500 when ``department.save()`` reports failure.
    """
    department = Department.find_by_id(department_id)
    if not department:
        return jsonify({'message': 'Department not found'}), 404

    data = request.get_json()
    # A body such as ``null`` would make the ``in`` checks below raise.
    if not isinstance(data, dict):
        return jsonify({'message': 'Request body must be a JSON object'}), 400

    # Copy over only the fields the caller supplied.
    for field in ('name', 'address', 'website'):
        if field in data:
            setattr(department, field, data[field])

    # Save changes
    if department.save():
        return jsonify({'message': 'Department updated successfully', 'department': department.to_dict()}), 200
    return jsonify({'message': 'Failed to update department'}), 500
def delete_department(department_id):
    """Delete a department along with every user that belongs to it.

    Returns:
        ``(response, 200)`` when the department's ``delete()`` succeeds,
        ``(response, 404)`` when the department is not found, and
        ``(response, 500)`` when ``delete()`` reports failure.
    """
    target = Department.find_by_id(department_id)
    if not target:
        return jsonify({'message': 'Department not found'}), 404

    # Users go first; the outcome of each individual delete is not checked.
    for member in User.find_by_department(department_id):
        member.delete()

    # Then the department itself.
    if not target.delete():
        return jsonify({'message': 'Failed to delete department'}), 500
    return jsonify({'message': 'Department and all its users deleted successfully'}), 200
def get_all_departments():
    """Return every department as JSON.

    Returns:
        ``(response, 200)`` where the body holds a ``departments`` list made
        of each department's ``to_dict()`` output.
    """
    serialized = []
    for dept in Department.get_all():
        serialized.append(dept.to_dict())
    return jsonify({'departments': serialized}), 200
def add_members_csv(department_id):
    """Bulk-create department members from an uploaded CSV file.

    The upload is taken from the ``file`` part of the request. The CSV must
    have a header row containing ``email``, ``name``, ``position`` and
    ``permissions``. Each valid row becomes a new user with a generated
    password; rows with missing fields, an already-registered email, or a
    failed save are reported in ``errors`` and skipped. A ``permissions``
    value other than ``'Admin'`` or ``'User'`` is replaced by ``'User'``.

    Returns:
        A ``(response, status)`` tuple:
        * 200 with ``new_users`` (each including its ``raw_password``) and
          ``errors``,
        * 400 when no file was sent, the filename is empty, the file is not
          a ``.csv``, or its content is not valid UTF-8,
        * 404 when the department does not exist,
        * 500 on any other unexpected error.
    """
    department = Department.find_by_id(department_id)
    if not department:
        return jsonify({'message': 'Department not found'}), 404

    if 'file' not in request.files:
        return jsonify({'message': 'No file part'}), 400

    file = request.files['file']
    if file.filename == '':
        return jsonify({'message': 'No selected file'}), 400

    # Compare the extension case-insensitively so "MEMBERS.CSV" is accepted.
    if not (file and file.filename.lower().endswith('.csv')):
        return jsonify({'message': 'File must be a CSV'}), 400

    required_fields = ['email', 'name', 'position', 'permissions']
    valid_permissions = ('Admin', 'User')

    try:
        # 'utf-8-sig' drops a leading byte-order mark (written by Excel and
        # similar tools); otherwise the first header would be '\ufeffemail'
        # and every row would be rejected as missing 'email'.
        csv_data = file.read().decode('utf-8-sig')
        csv_reader = csv.DictReader(io.StringIO(csv_data))

        new_users = []
        errors = []

        for row_index, row in enumerate(csv_reader, start=1):
            # Check if all required fields are present and non-empty
            missing_fields = [field for field in required_fields if not row.get(field)]
            if missing_fields:
                errors.append(f"Row {row_index}: Missing required fields: {', '.join(missing_fields)}")
                continue

            # Check if user already exists
            if User.find_by_email(row['email']):
                errors.append(f"Row {row_index}: User with email {row['email']} already exists")
                continue

            # Default to User if the permissions value is invalid
            permissions = row['permissions']
            if permissions not in valid_permissions:
                permissions = 'User'

            # Generate random password
            password = generate_random_password()
            hashed_password = User.hash_password(password)

            # Create new user
            user = User(
                email=row['email'],
                name=row['name'],
                password=hashed_password,
                permissions=permissions,
                position=row['position'],
                department_id=department._id
            )

            if not user.save():
                errors.append(f"Row {row_index}: Failed to save user {row['email']}")
                continue

            # Add user to department
            department.add_member(user._id)

            # Store for response (including the raw password, which is not
            # recoverable from the stored hash afterwards)
            user_dict = user.to_dict()
            user_dict['raw_password'] = password
            new_users.append(user_dict)

        # Return results
        return jsonify({
            'message': f"Processed {len(new_users)} new users with {len(errors)} errors",
            'new_users': new_users,
            'errors': errors
        }), 200
    except UnicodeDecodeError as e:
        # A file that is not UTF-8 is a client error, not a server fault.
        logger.error(f"Error processing CSV: {str(e)}")
        return jsonify({'message': 'CSV file must be UTF-8 encoded'}), 400
    except Exception as e:
        logger.exception(f"Error processing CSV: {str(e)}")
        return jsonify({'message': f'Error processing CSV: {str(e)}'}), 500
def add_member(department_id, current_user=None):
    """Add a single new user to a department.

    Reads a JSON object with ``email``, ``name``, ``position`` and
    ``permissions``. ``password`` is optional: when it is absent, empty or
    ``null`` a random password is generated. A ``permissions`` value other
    than ``'Admin'`` or ``'User'`` is replaced by ``'User'``.

    Args:
        department_id: ID of the department the user is added to.
        current_user: Accepted for the caller's benefit; not used here.

    Returns:
        A ``(response, status)`` tuple:
        * 201 with the new user (including ``raw_password``) on success,
        * 400 when the body is not a JSON object, a required field is
          missing, or the email is already registered,
        * 404 when the department does not exist,
        * 500 when saving fails or an unexpected exception is raised.
    """
    department = Department.find_by_id(department_id)
    if not department:
        return jsonify({'message': 'Department not found'}), 404

    data = request.get_json()
    # A body such as ``null`` would make the field checks below raise.
    if not isinstance(data, dict):
        return jsonify({'message': 'Request body must be a JSON object'}), 400

    # Check if required fields are present
    required_fields = ['email', 'name', 'position', 'permissions']
    for field in required_fields:
        if field not in data:
            return jsonify({'message': f'Missing required field: {field}'}), 400

    # Check if user already exists
    if User.find_by_email(data['email']):
        return jsonify({'message': 'User with this email already exists'}), 400

    # Default to User if the permissions value is invalid
    permissions = data['permissions']
    if permissions not in ('Admin', 'User'):
        permissions = 'User'

    try:
        # ``or`` (rather than a .get default) also covers an explicit empty
        # or null password, which must not be hashed and stored as-is.
        password = data.get('password') or generate_random_password()
        hashed_password = User.hash_password(password)

        # Create new user
        user = User(
            email=data['email'],
            name=data['name'],
            password=hashed_password,
            permissions=permissions,
            position=data['position'],
            department_id=department._id
        )

        if not user.save():
            return jsonify({'message': 'Failed to save user'}), 500

        # Add user to department
        department.add_member(user._id)

        # Return the raw password in the response
        user_dict = user.to_dict()
        user_dict['raw_password'] = password
        return jsonify({
            'message': 'User added successfully',
            'user': user_dict
        }), 201
    except Exception as e:
        logger.exception(f"Error adding member: {str(e)}")
        return jsonify({'message': f'Error adding member: {str(e)}'}), 500
def get_department_members(department_id, current_user=None):
    """List the users that belong to a department, logging each step.

    Args:
        department_id: ID of the department to inspect.
        current_user: Optional caller; only used for a log line.

    Returns:
        ``(response, 200)`` with a ``members`` list built from the users
        returned by ``User.find_by_department``, or ``(response, 404)``
        when the department is not found.
    """
    # Record who is asking and for what.
    logger.info(f"get_department_members called with department_id: {department_id}")
    if current_user:
        logger.info(f"Current user: {current_user._id}, {current_user.email}, Department: {current_user.department_id}")

    # Resolve the department, bailing out early if it is unknown.
    department = Department.find_by_id(department_id)
    if not department:
        logger.error(f"Department with ID {department_id} not found")
        return jsonify({'message': 'Department not found'}), 404

    # Record what the department document itself says about membership.
    logger.info(f"Found department: {department.name}, ID: {department._id}")
    logger.info(f"Department members list (raw): {department.members}")
    logger.info(f"Department has {len(department.members)} members in its members array")

    # The response is driven by the users' department_id, not the array above.
    logger.info(f"Calling User.find_by_department with department_id: {department_id}")
    users = User.find_by_department(department_id)

    logger.info(f"User.find_by_department returned {len(users)} users")
    for user in users:
        logger.info(f"Found user: {user._id}, {user.email}, Department: {user.department_id}")

    serialized_members = []
    for user in users:
        serialized_members.append(user.to_dict())
    return jsonify({'members': serialized_members}), 200
def remove_member(department_id, user_id, current_user=None):
    """Remove a user from a department and delete the user.

    Args:
        department_id: ID of the department the user should belong to.
        user_id: ID of the user to remove.
        current_user: Accepted for the caller's benefit; not used here.

    Returns:
        ``(response, 200)`` on success, ``(response, 404)`` when the
        department or user is missing, ``(response, 400)`` when the user's
        ``department_id`` does not match, and ``(response, 500)`` when
        either the membership removal or the user deletion reports failure.
    """
    dept = Department.find_by_id(department_id)
    if not dept:
        return jsonify({'message': 'Department not found'}), 404

    member = User.find_by_id(user_id)
    if not member:
        return jsonify({'message': 'User not found'}), 404

    # IDs are compared as strings so differing ID types still match.
    belongs_here = str(member.department_id) == str(dept._id)
    if not belongs_here:
        return jsonify({'message': 'User does not belong to this department'}), 400

    # ``and`` short-circuits: the user is only deleted once the department
    # has dropped it from its members.
    removed = dept.remove_member(user_id) and member.delete()
    if not removed:
        return jsonify({'message': 'Failed to remove user'}), 500
    return jsonify({'message': 'User removed successfully'}), 200
def update_member_permissions(department_id, user_id, current_user=None):
    """Change a department member's permissions to ``'Admin'`` or ``'User'``.

    Args:
        department_id: ID of the department the user should belong to.
        user_id: ID of the user to update.
        current_user: Accepted for the caller's benefit; not used here.

    Returns:
        A ``(response, status)`` tuple:
        * 200 with the updated user on success,
        * 400 when the user is not in this department, the body is not a
          JSON object, ``permissions`` is missing, or its value is invalid,
        * 404 when the department or user does not exist,
        * 500 when ``user.save()`` reports failure.
    """
    department = Department.find_by_id(department_id)
    if not department:
        return jsonify({'message': 'Department not found'}), 404

    user = User.find_by_id(user_id)
    if not user:
        return jsonify({'message': 'User not found'}), 404

    # Check if user belongs to the department (IDs compared as strings)
    if str(user.department_id) != str(department._id):
        return jsonify({'message': 'User does not belong to this department'}), 400

    data = request.get_json()
    # A body such as ``null`` or a bare string would make the checks below
    # raise instead of producing a 400.
    if not isinstance(data, dict):
        return jsonify({'message': 'Request body must be a JSON object'}), 400

    # Check if permissions are provided
    if 'permissions' not in data:
        return jsonify({'message': 'Permissions not provided'}), 400

    # Check if permissions are valid
    if data['permissions'] not in ('Admin', 'User'):
        return jsonify({'message': 'Invalid permissions. Must be "Admin" or "User"'}), 400

    # Update permissions
    user.permissions = data['permissions']
    if user.save():
        return jsonify({'message': 'User permissions updated successfully', 'user': user.to_dict()}), 200
    return jsonify({'message': 'Failed to update user permissions'}), 500
def debug_department(department_id, current_user=None):
    """Debug endpoint comparing a department's members array with its users.

    Reads the raw department document and the users collection and reports
    two views side by side: the IDs stored in the department's ``members``
    array (and whether each resolves to a user), and the users whose
    ``department_id`` points at this department (and whether each appears
    in ``members``).

    Args:
        department_id: Department ID, either a string or an ID object that
            is used as-is.
        current_user: Optional caller; only used for a log line.

    Returns:
        ``(response, 200)`` with the comparison, ``(response, 400)`` when a
        string ID is not a valid ObjectId, or ``(response, 404)`` when no
        department document has that ID.
    """
    logger.info(f"debug_department called with department_id: {department_id}")
    if current_user:
        logger.info(f"Called by user: {current_user._id}, {current_user.email}, Permissions: {current_user.permissions}")

    # Find department directly from MongoDB
    from db import get_departments_collection, get_users_collection
    from bson import ObjectId
    from bson.errors import InvalidId

    try:
        dept_id_obj = ObjectId(department_id) if isinstance(department_id, str) else department_id
    except InvalidId:
        # A malformed ID is a client error; without this it escapes as an
        # unhandled exception.
        logger.error(f"Invalid department ID: {department_id}")
        return jsonify({'message': 'Invalid department ID'}), 400
    logger.info(f"Looking up department with ObjectId: {dept_id_obj}")

    # Get raw department data
    departments_collection = get_departments_collection()
    department_data = departments_collection.find_one({"_id": dept_id_obj})
    if not department_data:
        logger.error(f"Department with ID {department_id} not found in database")
        return jsonify({'message': 'Department not found'}), 404

    users_collection = get_users_collection()

    # Count on the server instead of loading every user document just to
    # take len() of the list.
    total_users = users_collection.count_documents({})
    logger.info(f"Total users in database: {total_users}")

    # ``or []`` also covers a document whose members field is stored as null.
    member_ids = department_data.get('members') or []

    # Fetch all referenced users in one query rather than one per member.
    users_by_id = {
        user_data['_id']: user_data
        for user_data in users_collection.find({"_id": {"$in": member_ids}})
    }

    # Collect user info for all department members
    member_info = []
    for member_id in member_ids:
        user_data = users_by_id.get(member_id)
        if user_data:
            member_info.append({
                "user_id": str(member_id),
                "found_in_db": True,
                "email": user_data.get('email'),
                "name": user_data.get('name'),
                "department_id": str(user_data.get('department_id')) if user_data.get('department_id') else None
            })
        else:
            member_info.append({
                "user_id": str(member_id),
                "found_in_db": False
            })

    # Find all users with this department_id
    users_with_dept = list(users_collection.find({"department_id": dept_id_obj}))
    logger.info(f"Found {len(users_with_dept)} users with department_id matching {dept_id_obj}")

    # String forms are compared so differing ID types still match; the set
    # is built once instead of rescanning the members list per user.
    member_id_strings = {str(member_id) for member_id in member_ids}
    users_info = [
        {
            "user_id": str(user_data.get('_id')),
            "email": user_data.get('email'),
            "name": user_data.get('name'),
            "in_members_array": str(user_data.get('_id')) in member_id_strings
        }
        for user_data in users_with_dept
    ]

    # Build response with detailed info
    response = {
        "department": {
            "_id": str(department_data.get('_id')),
            "name": department_data.get('name'),
            "raw_members": [str(member_id) for member_id in member_ids],
            "member_count": len(member_ids),
            "members_detail": member_info
        },
        "users_with_department_id": {
            "count": len(users_with_dept),
            "users": users_info
        }
    }
    return jsonify(response), 200