Spaces:
Running
Running
| #!/usr/bin/env python | |
| """ | |
| Add new test users to JSON database (Development Only) | |
| This script creates test users from environment variables in .env file. | |
| Passwords must be configured in .env for security. | |
| Usage: | |
| python add_users.py | |
| Environment Variables Required: | |
| - TEST_USER1_USERNAME: First test user username | |
| - TEST_USER1_PASSWORD: First test user password | |
| - TEST_USER2_USERNAME: Second test user username | |
| - TEST_USER2_PASSWORD: Second test user password | |
| Example .env: | |
| TEST_USER1_USERNAME=oviya | |
| TEST_USER1_PASSWORD=your-secure-password | |
| TEST_USER2_USERNAME=testing | |
| TEST_USER2_PASSWORD=your-secure-password | |
| """ | |
| import json | |
| import os | |
| import sys | |
| import logging | |
| import bcrypt | |
| from datetime import datetime | |
| from typing import Dict, List, Tuple, Optional | |
| # Load environment variables from .env file | |
| from dotenv import load_dotenv | |
| load_dotenv() | |
| # Add project to path | |
| sys.path.insert(0, os.path.dirname(__file__)) | |
| # Configure logging | |
| logging.basicConfig( | |
| level=logging.INFO, | |
| format='%(asctime)s - %(levelname)s - %(message)s' | |
| ) | |
| logger = logging.getLogger(__name__) | |
| # Database paths | |
| USERS_FILE = os.path.join(os.path.dirname(__file__), 'data', 'users.json') | |
| def hash_password(password: str) -> str: | |
| """ | |
| Hash password using bcrypt | |
| Args: | |
| password: Plain text password to hash | |
| Returns: | |
| Hashed password string | |
| """ | |
| return bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8') | |
| def validate_username(username: str) -> bool: | |
| """ | |
| Validate username format | |
| Args: | |
| username: Username to validate | |
| Returns: | |
| True if valid, raises ValueError otherwise | |
| Raises: | |
| ValueError: If username doesn't meet requirements | |
| """ | |
| if not username: | |
| raise ValueError("Username cannot be empty") | |
| if len(username) < 3: | |
| raise ValueError("Username must be at least 3 characters long") | |
| if len(username) > 50: | |
| raise ValueError("Username must not exceed 50 characters") | |
| if not username.replace('_', '').isalnum(): | |
| raise ValueError("Username must contain only letters, numbers, and underscores") | |
| return True | |
| def validate_password(password: str) -> bool: | |
| """ | |
| Validate password requirements | |
| Args: | |
| password: Password to validate | |
| Returns: | |
| True if valid, raises ValueError otherwise | |
| Raises: | |
| ValueError: If password doesn't meet requirements | |
| """ | |
| if not password: | |
| raise ValueError("Password cannot be empty") | |
| if len(password) < 8: | |
| raise ValueError("Password must be at least 8 characters long") | |
| if len(password) > 128: | |
| raise ValueError("Password must not exceed 128 characters") | |
| return True | |
| def load_test_users_config() -> List[Dict[str, str]]: | |
| """ | |
| Load test users from environment variables | |
| Returns: | |
| List of user configuration dictionaries | |
| Raises: | |
| ValueError: If required environment variables are missing | |
| """ | |
| user1_username = os.getenv("TEST_USER1_USERNAME") | |
| user1_password = os.getenv("TEST_USER1_PASSWORD") | |
| user2_username = os.getenv("TEST_USER2_USERNAME") | |
| user2_password = os.getenv("TEST_USER2_PASSWORD") | |
| if not user1_password: | |
| raise ValueError("TEST_USER1_PASSWORD not set in .env file") | |
| if not user2_password: | |
| raise ValueError("TEST_USER2_PASSWORD not set in .env file") | |
| return [ | |
| { | |
| "username": user1_username or "oviya", | |
| "password": user1_password, | |
| }, | |
| { | |
| "username": user2_username or "testing", | |
| "password": user2_password, | |
| } | |
| ] | |
| def user_already_exists(users: Dict, username: str) -> bool: | |
| """ | |
| Check if user already exists in database | |
| Args: | |
| users: Dictionary of existing users | |
| username: Username to check | |
| Returns: | |
| True if user exists, False otherwise | |
| """ | |
| return username.lower() in users | |
| def create_user_record( | |
| username: str, | |
| password: str, | |
| next_id: int, | |
| role: str = 'user' | |
| ) -> Tuple[Dict, int]: | |
| """ | |
| Create a user record dictionary | |
| Args: | |
| username: Username for the account | |
| password: Plain text password to hash | |
| next_id: User ID number | |
| role: User role (default: 'user') | |
| Returns: | |
| Tuple of (user_record_dict, next_id_incremented) | |
| Raises: | |
| ValueError: If validation fails | |
| """ | |
| # Validate inputs | |
| validate_username(username) | |
| validate_password(password) | |
| # Create record | |
| user_record = { | |
| 'id': next_id, | |
| 'username': username.lower(), | |
| 'password_hash': hash_password(password), | |
| 'role': role, | |
| 'created_at': datetime.now().isoformat() | |
| } | |
| return user_record, next_id + 1 | |
| def load_users() -> Dict: | |
| """ | |
| Load users from JSON file | |
| Returns: | |
| Dictionary of users, empty dict if file doesn't exist | |
| Raises: | |
| json.JSONDecodeError: If JSON is invalid | |
| IOError: If file cannot be read | |
| """ | |
| try: | |
| with open(USERS_FILE, 'r', encoding='utf-8') as f: | |
| return json.load(f) | |
| except FileNotFoundError: | |
| logger.warning(f"Users file not found: {USERS_FILE}") | |
| return {} | |
| except json.JSONDecodeError as e: | |
| raise ValueError(f"Invalid JSON in {USERS_FILE}: {e}") | |
| def save_users(users: Dict) -> bool: | |
| """ | |
| Save users to JSON file | |
| Args: | |
| users: Dictionary of users to save | |
| Returns: | |
| True on success | |
| Raises: | |
| IOError: If file cannot be written | |
| """ | |
| try: | |
| os.makedirs(os.path.dirname(USERS_FILE), exist_ok=True) | |
| with open(USERS_FILE, 'w', encoding='utf-8') as f: | |
| json.dump(users, f, indent=2, ensure_ascii=False) | |
| return True | |
| except IOError as e: | |
| raise IOError(f"Failed to save users to {USERS_FILE}: {e}") | |
| def add_users() -> int: | |
| """ | |
| Add new test users to the database | |
| Returns: | |
| 0 on success, 1 on failure | |
| """ | |
| try: | |
| logger.info("Starting user creation process...") | |
| # Load test user configuration | |
| test_users_config = load_test_users_config() | |
| logger.info(f"Loaded configuration for {len(test_users_config)} test users") | |
| # Load existing users | |
| users = load_users() | |
| logger.info(f"Loaded existing users: {list(users.keys())}") | |
| # Get next ID | |
| next_id = max([user.get('id', 0) for user in users.values()]) + 1 if users else 1 | |
| logger.info(f"Next user ID will be: {next_id}") | |
| # Track created users for rollback capability | |
| created_users = [] | |
| # Create users from configuration | |
| for idx, user_config in enumerate(test_users_config, 1): | |
| username = user_config['username'] | |
| password = user_config['password'] | |
| try: | |
| # Check if user already exists | |
| if user_already_exists(users, username): | |
| logger.warning(f"User '{username}' already exists, skipping...") | |
| continue | |
| # Create user record | |
| user_record, next_id = create_user_record(username, password, next_id) | |
| # Add to users dictionary | |
| users[username.lower()] = user_record | |
| created_users.append(username) | |
| # Log success (without showing password) | |
| logger.info(f"? Created user {idx}: {username} (ID: {user_record['id']})") | |
| except ValueError as e: | |
| logger.error(f"? Failed to create user '{username}': {e}") | |
| # Continue with next user instead of failing completely | |
| continue | |
| except Exception as e: | |
| logger.error(f"? Unexpected error creating user '{username}': {e}") | |
| continue | |
| # Check if any users were created | |
| if not created_users: | |
| logger.warning("No new users were created (all may already exist)") | |
| return 0 | |
| # Save users to file | |
| try: | |
| save_users(users) | |
| logger.info(f"? Successfully saved {len(users)} total users to {USERS_FILE}") | |
| except IOError as e: | |
| logger.error(f"? Failed to save users: {e}") | |
| # Attempt to restore from backup or indicate rollback needed | |
| raise | |
| # Print summary | |
| logger.info("\n" + "="*60) | |
| logger.info("? User creation completed successfully!") | |
| logger.info("="*60) | |
| logger.info(f"Created users: {', '.join(created_users)}") | |
| logger.info(f"Total users in database: {len(users)}") | |
| logger.info(f"All users: {', '.join(sorted(users.keys()))}") | |
| return 0 | |
| except ValueError as e: | |
| logger.error(f"Configuration error: {e}") | |
| logger.error("Please check your .env file and ensure TEST_USER passwords are set") | |
| return 1 | |
| except IOError as e: | |
| logger.error(f"File I/O error: {e}") | |
| return 1 | |
| except Exception as e: | |
| logger.exception(f"Unexpected error: {e}") | |
| return 1 | |
| if __name__ == '__main__': | |
| try: | |
| exit_code = add_users() | |
| sys.exit(exit_code) | |
| except KeyboardInterrupt: | |
| logger.info("\nOperation cancelled by user") | |
| sys.exit(1) | |
| except Exception as e: | |
| logger.exception(f"Fatal error: {e}") | |
| sys.exit(1) | |