""" Test script for JSON database authentication system Tests all CRUD operations and authentication flows """ import json import os import sys from pathlib import Path # Add project to path sys.path.insert(0, os.path.dirname(__file__)) from auth.json_database import ( JSONUsers, JSONBlacklistedTokens, JSONRefreshTokens, ensure_database_initialized, test_database_connection, get_database_info ) def print_test(name, passed, message=""): """Print formatted test result""" status = "? PASS" if passed else "? FAIL" print(f"{status}: {name}") if message: print(f" {message}") def test_database_connection_test(): """Test database connection""" print("\n=== Testing Database Connection ===") success, message = test_database_connection() print_test("Connection Test", success, message) return success def test_database_initialization(): """Test database initialization""" print("\n=== Testing Database Initialization ===") try: ensure_database_initialized() print_test("Database Init", True) return True except Exception as e: print_test("Database Init", False, str(e)) return False def test_user_creation(): """Test user creation""" print("\n=== Testing User Operations ===") # Create user result = JSONUsers.create_user("testuser", "hashed_password_123", "user") print_test("Create User", result) # Create duplicate user (should fail) result2 = JSONUsers.create_user("testuser", "another_hash", "user") print_test("Prevent Duplicate User", not result2) # Find user user = JSONUsers.find_by_username("testuser") found = user is not None and user['username'] == 'testuser' print_test("Find User", found) return result and not result2 and found def test_user_list(): """Test listing all users""" print("\n=== Testing User Listing ===") # Create another user JSONUsers.create_user("admin", "admin_hash", "admin") # Get all users users = JSONUsers.get_all_users() found_testuser = any(u['username'] == 'testuser' for u in users) found_admin = any(u['username'] == 'admin' for u in users) print_test("List All Users", len(users) >= 2, f"Found {len(users)} users") print_test("Found Test User", found_testuser) print_test("Found Admin User", found_admin) return len(users) >= 2 def test_promote_to_admin(): """Test promoting user to admin""" print("\n=== Testing Admin Promotion ===") # Promote testuser to admin result = JSONUsers.promote_to_admin("testuser") print_test("Promote User", result) # Verify role changed user = JSONUsers.find_by_username("testuser") is_admin = user is not None and user['role'] == 'admin' print_test("Verify Admin Role", is_admin) return result and is_admin def test_token_operations(): """Test token blacklist operations""" print("\n=== Testing Token Operations ===") # Create refresh token token1 = "jwt_refresh_token_12345" result = JSONRefreshTokens.create_token("testuser", token1) print_test("Create Refresh Token", result) # Find token username = JSONRefreshTokens.find_by_token(token1) found = username == "testuser" print_test("Find Refresh Token", found) # Blacklist token access_token = "jwt_access_token_abcde" result = JSONBlacklistedTokens.add_to_blacklist(access_token) print_test("Blacklist Token", result) # Check if blacklisted is_blacklisted = JSONBlacklistedTokens.is_blacklisted(access_token) print_test("Verify Token Blacklisted", is_blacklisted) # Check non-existent token not_blacklisted = not JSONBlacklistedTokens.is_blacklisted("fake_token") print_test("Non-blacklisted Token Not Found", not_blacklisted) return result and found and is_blacklisted and not_blacklisted def test_token_deletion(): """Test deleting user tokens""" print("\n=== Testing Token Deletion ===") # Create multiple tokens for user JSONRefreshTokens.create_token("testuser", "token_1") JSONRefreshTokens.create_token("testuser", "token_2") JSONRefreshTokens.create_token("testuser", "token_3") # Delete all user tokens result = JSONRefreshTokens.delete_user_tokens("testuser") print_test("Delete User Tokens", result) # Verify tokens deleted remaining = JSONRefreshTokens.find_by_token("token_1") is_deleted = remaining is None print_test("Verify Tokens Deleted", is_deleted) return result and is_deleted def test_database_info(): """Test database diagnostics""" print("\n=== Testing Database Diagnostics ===") info = get_database_info() has_type = 'database_type' in info and info['database_type'] == 'JSON' print_test("Database Type Info", has_type) has_location = 'storage_location' in info print_test("Storage Location Info", has_location) has_status = 'connection_status' in info and info['connection_status'] == 'ok' print_test("Connection Status Info", has_status) has_files = 'files' in info print_test("Files Info", has_files) return has_type and has_location and has_status and has_files def test_user_count(): """Test user count""" print("\n=== Testing User Count ===") count = JSONUsers.user_count() print_test("User Count", count > 0, f"Found {count} users") return count > 0 def test_json_file_format(): """Test that JSON files are properly formatted""" print("\n=== Testing JSON File Format ===") from auth.json_database import USERS_FILE, BLACKLISTED_TOKENS_FILE, REFRESH_TOKENS_FILE all_valid = True # Test users.json try: with open(USERS_FILE, 'r') as f: users_data = json.load(f) print_test("users.json Valid JSON", True) except Exception as e: print_test("users.json Valid JSON", False, str(e)) all_valid = False # Test blacklisted_tokens.json try: with open(BLACKLISTED_TOKENS_FILE, 'r') as f: tokens_data = json.load(f) print_test("blacklisted_tokens.json Valid JSON", True) except Exception as e: print_test("blacklisted_tokens.json Valid JSON", False, str(e)) all_valid = False # Test refresh_tokens.json try: with open(REFRESH_TOKENS_FILE, 'r') as f: refresh_data = json.load(f) print_test("refresh_tokens.json Valid JSON", True) except Exception as e: print_test("refresh_tokens.json Valid JSON", False, str(e)) all_valid = False return all_valid def cleanup_test_data(): """Clean up test data""" print("\n=== Cleanup ===") from auth.json_database import USERS_FILE, BLACKLISTED_TOKENS_FILE, REFRESH_TOKENS_FILE # Clear users.json but keep other files with open(USERS_FILE, 'w') as f: json.dump({}, f) print("Cleaned up test data") def run_all_tests(): """Run all tests""" print("\n" + "="*50) print("JSON Database Authentication System - Test Suite") print("="*50) tests = [ ("Database Connection", test_database_connection_test), ("Database Initialization", test_database_initialization), ("User Creation", test_user_creation), ("User Listing", test_user_list), ("Promote to Admin", test_promote_to_admin), ("Token Operations", test_token_operations), ("Token Deletion", test_token_deletion), ("Database Info", test_database_info), ("User Count", test_user_count), ("JSON File Format", test_json_file_format), ] results = [] for name, test_func in tests: try: result = test_func() results.append((name, result)) except Exception as e: print(f"\n? EXCEPTION in {name}: {str(e)}") results.append((name, False)) # Print summary print("\n" + "="*50) print("Test Summary") print("="*50) passed = sum(1 for _, result in results if result) total = len(results) for name, result in results: status = "?" if result else "?" print(f"{status} {name}") print(f"\nTotal: {passed}/{total} tests passed") if passed == total: print("\n? All tests passed!") return 0 else: print(f"\n? {total - passed} tests failed") return 1 if __name__ == '__main__': try: exit_code = run_all_tests() sys.exit(exit_code) except Exception as e: print(f"\n? Fatal error: {str(e)}") sys.exit(1)