#!/usr/bin/env python """ Add new test users to JSON database (Development Only) This script creates test users from environment variables in .env file. Passwords must be configured in .env for security. Usage: python add_users.py Environment Variables Required: - TEST_USER1_USERNAME: First test user username - TEST_USER1_PASSWORD: First test user password - TEST_USER2_USERNAME: Second test user username - TEST_USER2_PASSWORD: Second test user password Example .env: TEST_USER1_USERNAME=oviya TEST_USER1_PASSWORD=your-secure-password TEST_USER2_USERNAME=testing TEST_USER2_PASSWORD=your-secure-password """ import json import os import sys import logging import bcrypt from datetime import datetime from typing import Dict, List, Tuple, Optional # Load environment variables from .env file from dotenv import load_dotenv load_dotenv() # Add project to path sys.path.insert(0, os.path.dirname(__file__)) # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) # Database paths USERS_FILE = os.path.join(os.path.dirname(__file__), 'data', 'users.json') def hash_password(password: str) -> str: """ Hash password using bcrypt Args: password: Plain text password to hash Returns: Hashed password string """ return bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8') def validate_username(username: str) -> bool: """ Validate username format Args: username: Username to validate Returns: True if valid, raises ValueError otherwise Raises: ValueError: If username doesn't meet requirements """ if not username: raise ValueError("Username cannot be empty") if len(username) < 3: raise ValueError("Username must be at least 3 characters long") if len(username) > 50: raise ValueError("Username must not exceed 50 characters") if not username.replace('_', '').isalnum(): raise ValueError("Username must contain only letters, numbers, and underscores") return True def validate_password(password: str) -> bool: """ Validate password requirements Args: password: Password to validate Returns: True if valid, raises ValueError otherwise Raises: ValueError: If password doesn't meet requirements """ if not password: raise ValueError("Password cannot be empty") if len(password) < 8: raise ValueError("Password must be at least 8 characters long") if len(password) > 128: raise ValueError("Password must not exceed 128 characters") return True def load_test_users_config() -> List[Dict[str, str]]: """ Load test users from environment variables Returns: List of user configuration dictionaries Raises: ValueError: If required environment variables are missing """ user1_username = os.getenv("TEST_USER1_USERNAME") user1_password = os.getenv("TEST_USER1_PASSWORD") user2_username = os.getenv("TEST_USER2_USERNAME") user2_password = os.getenv("TEST_USER2_PASSWORD") if not user1_password: raise ValueError("TEST_USER1_PASSWORD not set in .env file") if not user2_password: raise ValueError("TEST_USER2_PASSWORD not set in .env file") return [ { "username": user1_username or "oviya", "password": user1_password, }, { "username": user2_username or "testing", "password": user2_password, } ] def user_already_exists(users: Dict, username: str) -> bool: """ Check if user already exists in database Args: users: Dictionary of existing users username: Username to check Returns: True if user exists, False otherwise """ return username.lower() in users def create_user_record( username: str, password: str, next_id: int, role: str = 'user' ) -> Tuple[Dict, int]: """ Create a user record dictionary Args: username: Username for the account password: Plain text password to hash next_id: User ID number role: User role (default: 'user') Returns: Tuple of (user_record_dict, next_id_incremented) Raises: ValueError: If validation fails """ # Validate inputs validate_username(username) validate_password(password) # Create record user_record = { 'id': next_id, 'username': username.lower(), 'password_hash': hash_password(password), 'role': role, 'created_at': datetime.now().isoformat() } return user_record, next_id + 1 def load_users() -> Dict: """ Load users from JSON file Returns: Dictionary of users, empty dict if file doesn't exist Raises: json.JSONDecodeError: If JSON is invalid IOError: If file cannot be read """ try: with open(USERS_FILE, 'r', encoding='utf-8') as f: return json.load(f) except FileNotFoundError: logger.warning(f"Users file not found: {USERS_FILE}") return {} except json.JSONDecodeError as e: raise ValueError(f"Invalid JSON in {USERS_FILE}: {e}") def save_users(users: Dict) -> bool: """ Save users to JSON file Args: users: Dictionary of users to save Returns: True on success Raises: IOError: If file cannot be written """ try: os.makedirs(os.path.dirname(USERS_FILE), exist_ok=True) with open(USERS_FILE, 'w', encoding='utf-8') as f: json.dump(users, f, indent=2, ensure_ascii=False) return True except IOError as e: raise IOError(f"Failed to save users to {USERS_FILE}: {e}") def add_users() -> int: """ Add new test users to the database Returns: 0 on success, 1 on failure """ try: logger.info("Starting user creation process...") # Load test user configuration test_users_config = load_test_users_config() logger.info(f"Loaded configuration for {len(test_users_config)} test users") # Load existing users users = load_users() logger.info(f"Loaded existing users: {list(users.keys())}") # Get next ID next_id = max([user.get('id', 0) for user in users.values()]) + 1 if users else 1 logger.info(f"Next user ID will be: {next_id}") # Track created users for rollback capability created_users = [] # Create users from configuration for idx, user_config in enumerate(test_users_config, 1): username = user_config['username'] password = user_config['password'] try: # Check if user already exists if user_already_exists(users, username): logger.warning(f"User '{username}' already exists, skipping...") continue # Create user record user_record, next_id = create_user_record(username, password, next_id) # Add to users dictionary users[username.lower()] = user_record created_users.append(username) # Log success (without showing password) logger.info(f"? Created user {idx}: {username} (ID: {user_record['id']})") except ValueError as e: logger.error(f"? Failed to create user '{username}': {e}") # Continue with next user instead of failing completely continue except Exception as e: logger.error(f"? Unexpected error creating user '{username}': {e}") continue # Check if any users were created if not created_users: logger.warning("No new users were created (all may already exist)") return 0 # Save users to file try: save_users(users) logger.info(f"? Successfully saved {len(users)} total users to {USERS_FILE}") except IOError as e: logger.error(f"? Failed to save users: {e}") # Attempt to restore from backup or indicate rollback needed raise # Print summary logger.info("\n" + "="*60) logger.info("? User creation completed successfully!") logger.info("="*60) logger.info(f"Created users: {', '.join(created_users)}") logger.info(f"Total users in database: {len(users)}") logger.info(f"All users: {', '.join(sorted(users.keys()))}") return 0 except ValueError as e: logger.error(f"Configuration error: {e}") logger.error("Please check your .env file and ensure TEST_USER passwords are set") return 1 except IOError as e: logger.error(f"File I/O error: {e}") return 1 except Exception as e: logger.exception(f"Unexpected error: {e}") return 1 if __name__ == '__main__': try: exit_code = add_users() sys.exit(exit_code) except KeyboardInterrupt: logger.info("\nOperation cancelled by user") sys.exit(1) except Exception as e: logger.exception(f"Fatal error: {e}") sys.exit(1)