Spaces:
Runtime error
Runtime error
| """ | |
| Test script for JSON database authentication system | |
| Tests all CRUD operations and authentication flows | |
| """ | |
| import json | |
| import os | |
| import sys | |
| from pathlib import Path | |
| # Add project to path | |
| sys.path.insert(0, os.path.dirname(__file__)) | |
| from auth.json_database import ( | |
| JSONUsers, | |
| JSONBlacklistedTokens, | |
| JSONRefreshTokens, | |
| ensure_database_initialized, | |
| test_database_connection, | |
| get_database_info | |
| ) | |
def print_test(name, passed, message=""):
    """Print one check result, followed by an optional detail line.

    Args:
        name: Label of the check being reported.
        passed: Truthy selects the PASS label, falsy the FAIL label.
        message: Extra detail; printed on its own line only when non-empty.
    """
    label = "? FAIL"
    if passed:
        label = "? PASS"
    lines = [f"{label}: {name}"]
    if message:
        lines.append(f" {message}")
    print("\n".join(lines))
def test_database_connection_test():
    """Run the database connection probe and report its outcome.

    Returns:
        The first item of the pair returned by ``test_database_connection``
        (used here as the success flag).
    """
    print("\n=== Testing Database Connection ===")
    # The probe yields a two-item result: a success flag and a detail message.
    ok, detail = test_database_connection()
    print_test("Connection Test", ok, detail)
    return ok
def test_database_initialization():
    """Check that ``ensure_database_initialized`` completes without raising.

    Returns:
        bool: True when initialization (and reporting) raised nothing,
        False when an exception was caught and reported.
    """
    print("\n=== Testing Database Initialization ===")
    try:
        ensure_database_initialized()
        print_test("Database Init", True)
    except Exception as err:
        # Any failure is reported as a failed check rather than propagated.
        print_test("Database Init", False, str(err))
        return False
    return True
def test_user_creation():
    """Create a user, attempt a duplicate, and look the user up again.

    Returns:
        Truthy only when the first creation succeeded, the duplicate
        creation was refused, and the lookup returned the matching record.
    """
    print("\n=== Testing User Operations ===")
    username = "testuser"

    # First creation is expected to succeed.
    created = JSONUsers.create_user(username, "hashed_password_123", "user")
    print_test("Create User", created)

    # A second creation with the same username should be refused.
    duplicate = JSONUsers.create_user(username, "another_hash", "user")
    rejected = not duplicate
    print_test("Prevent Duplicate User", rejected)

    # The stored record must be retrievable by username.
    record = JSONUsers.find_by_username(username)
    located = record is not None and record['username'] == username
    print_test("Find User", located)

    return created and rejected and located
def test_user_list():
    """Create an ``admin`` user and verify the full user listing.

    Three checks are reported: the listing holds at least two users, it
    contains ``testuser``, and it contains ``admin``. ``testuser`` is not
    created here; the check relies on it already being stored (in this
    script ``test_user_creation`` runs earlier and creates it).

    Returns:
        bool: True only when all three reported checks passed. Previously
        only the user-count check fed the return value, so a missing
        user was printed as a failure but the test still counted as passed.
    """
    print("\n=== Testing User Listing ===")

    # Create a second user; the result is deliberately not checked because
    # the user may already exist from an earlier run.
    JSONUsers.create_user("admin", "admin_hash", "admin")

    users = JSONUsers.get_all_users()
    enough_users = len(users) >= 2
    found_testuser = any(u['username'] == 'testuser' for u in users)
    found_admin = any(u['username'] == 'admin' for u in users)

    print_test("List All Users", enough_users, f"Found {len(users)} users")
    print_test("Found Test User", found_testuser)
    print_test("Found Admin User", found_admin)

    return enough_users and found_testuser and found_admin
def test_promote_to_admin():
    """Promote ``testuser`` and confirm the stored role is ``admin``.

    Returns:
        Truthy only when the promotion call reported success and the
        re-read user record carries the ``admin`` role.
    """
    print("\n=== Testing Admin Promotion ===")
    target = "testuser"

    promoted = JSONUsers.promote_to_admin(target)
    print_test("Promote User", promoted)

    # Re-read the record so the check reflects what was actually stored.
    record = JSONUsers.find_by_username(target)
    has_admin_role = record is not None and record['role'] == 'admin'
    print_test("Verify Admin Role", has_admin_role)

    return promoted and has_admin_role
def test_token_operations():
    """Exercise refresh-token storage and the access-token blacklist.

    Five checks are reported: creating a refresh token, finding it again,
    blacklisting an access token, confirming it is blacklisted, and
    confirming an unknown token is not blacklisted.

    Returns:
        Truthy only when all five checks passed. The refresh-token creation
        result is kept in its own variable so it is part of the verdict;
        previously it was overwritten by the blacklist result before the
        return statement.
    """
    print("\n=== Testing Token Operations ===")

    # Create a refresh token for the test user.
    refresh_token = "jwt_refresh_token_12345"
    created = JSONRefreshTokens.create_token("testuser", refresh_token)
    print_test("Create Refresh Token", created)

    # Looking the token up should yield the username it was created for.
    owner = JSONRefreshTokens.find_by_token(refresh_token)
    found = owner == "testuser"
    print_test("Find Refresh Token", found)

    # Blacklist an access token.
    access_token = "jwt_access_token_abcde"
    blacklisted = JSONBlacklistedTokens.add_to_blacklist(access_token)
    print_test("Blacklist Token", blacklisted)

    # The token just added must be reported as blacklisted.
    is_blacklisted = JSONBlacklistedTokens.is_blacklisted(access_token)
    print_test("Verify Token Blacklisted", is_blacklisted)

    # A token that was never added must not be reported as blacklisted.
    not_blacklisted = not JSONBlacklistedTokens.is_blacklisted("fake_token")
    print_test("Non-blacklisted Token Not Found", not_blacklisted)

    return created and found and blacklisted and is_blacklisted and not_blacklisted
def test_token_deletion():
    """Create three refresh tokens for ``testuser`` and delete them in bulk.

    Only ``token_1`` is looked up afterwards to confirm the deletion.

    Returns:
        Truthy only when the bulk delete reported success and ``token_1``
        can no longer be found.
    """
    print("\n=== Testing Token Deletion ===")
    owner = "testuser"

    # Seed several tokens so the bulk delete has more than one to remove.
    for token in ("token_1", "token_2", "token_3"):
        JSONRefreshTokens.create_token(owner, token)

    deleted = JSONRefreshTokens.delete_user_tokens(owner)
    print_test("Delete User Tokens", deleted)

    # A lookup returning None means the token is gone.
    is_deleted = JSONRefreshTokens.find_by_token("token_1") is None
    print_test("Verify Tokens Deleted", is_deleted)

    return deleted and is_deleted
def test_database_info():
    """Validate the diagnostics mapping returned by ``get_database_info``.

    Checks that ``database_type`` equals ``'JSON'``, ``storage_location``
    is present, ``connection_status`` equals ``'ok'``, and ``files`` is
    present.

    Returns:
        bool: True only when all four checks passed.
    """
    print("\n=== Testing Database Diagnostics ===")
    info = get_database_info()

    # (label, outcome) pairs, in the order they are reported.
    checks = [
        ("Database Type Info",
         'database_type' in info and info['database_type'] == 'JSON'),
        ("Storage Location Info", 'storage_location' in info),
        ("Connection Status Info",
         'connection_status' in info and info['connection_status'] == 'ok'),
        ("Files Info", 'files' in info),
    ]
    for label, outcome in checks:
        print_test(label, outcome)

    return all(outcome for _, outcome in checks)
def test_user_count():
    """Check that ``JSONUsers.user_count`` reports at least one user.

    Returns:
        bool: True when the reported count is greater than zero.
    """
    print("\n=== Testing User Count ===")
    count = JSONUsers.user_count()
    has_users = count > 0
    print_test("User Count", has_users, f"Found {count} users")
    return has_users
def test_json_file_format():
    """Verify that each database file can be opened and parsed as JSON.

    The three files are checked with one shared loop. The labels printed
    are fixed strings; the paths actually opened come from the constants
    exported by ``auth.json_database``.

    Returns:
        bool: True when all three files parsed, False if any could not be
        read or decoded. A failure in one file does not stop the others
        from being checked.
    """
    print("\n=== Testing JSON File Format ===")
    from auth.json_database import USERS_FILE, BLACKLISTED_TOKENS_FILE, REFRESH_TOKENS_FILE

    targets = (
        ("users.json", USERS_FILE),
        ("blacklisted_tokens.json", BLACKLISTED_TOKENS_FILE),
        ("refresh_tokens.json", REFRESH_TOKENS_FILE),
    )

    all_valid = True
    for label, path in targets:
        check_name = f"{label} Valid JSON"
        try:
            with open(path, 'r') as f:
                # Only parseability matters; the parsed content is discarded.
                json.load(f)
        except (OSError, ValueError) as e:
            # OSError covers a missing or unreadable file; ValueError covers
            # json.JSONDecodeError and text-decoding errors.
            print_test(check_name, False, str(e))
            all_valid = False
        else:
            print_test(check_name, True)
    return all_valid
def cleanup_test_data():
    """Overwrite the users file with an empty JSON object.

    Only the users file is rewritten; the blacklist and refresh-token file
    constants are imported but their files are left untouched. This
    overwrites the file's entire content, not just entries created by the
    tests. Nothing in this script calls this function.
    """
    print("\n=== Cleanup ===")
    from auth.json_database import USERS_FILE, BLACKLISTED_TOKENS_FILE, REFRESH_TOKENS_FILE

    # Replace the file content with "{}".
    with open(USERS_FILE, 'w') as handle:
        handle.write(json.dumps({}))
    print("Cleaned up test data")
def run_all_tests():
    """Run every test function in a fixed order and print a summary.

    Each test runs inside its own try/except, so a test that raises is
    recorded as failed instead of aborting the run. The order matters:
    later tests use the ``testuser`` account created by the user-creation
    test.

    Returns:
        int: 0 when every test returned a truthy result, 1 otherwise
        (usable directly as a process exit code).
    """
    banner = "=" * 50
    print("\n" + banner)
    print("JSON Database Authentication System - Test Suite")
    print(banner)

    tests = [
        ("Database Connection", test_database_connection_test),
        ("Database Initialization", test_database_initialization),
        ("User Creation", test_user_creation),
        ("User Listing", test_user_list),
        ("Promote to Admin", test_promote_to_admin),
        ("Token Operations", test_token_operations),
        ("Token Deletion", test_token_deletion),
        ("Database Info", test_database_info),
        ("User Count", test_user_count),
        ("JSON File Format", test_json_file_format),
    ]

    results = []
    for name, test_func in tests:
        try:
            result = test_func()
            results.append((name, result))
        except Exception as e:
            # An unexpected exception counts as a failure of that test only.
            print(f"\n? EXCEPTION in {name}: {e}")
            results.append((name, False))

    # Print summary
    print("\n" + banner)
    print("Test Summary")
    print(banner)

    passed = sum(1 for _, result in results if result)
    total = len(results)
    for name, result in results:
        # Distinct markers: the previous code printed the same "?" for both
        # outcomes, so passed and failed tests could not be told apart.
        status = "PASS" if result else "FAIL"
        print(f"{status} {name}")

    print(f"\nTotal: {passed}/{total} tests passed")
    if passed == total:
        print("\n? All tests passed!")
        return 0
    print(f"\n? {total - passed} tests failed")
    return 1
if __name__ == '__main__':
    # Script entry point: run the suite and use its result as the exit code.
    try:
        status = run_all_tests()
    except Exception as err:
        # Anything escaping the runner itself is fatal; exit non-zero.
        print(f"\n? Fatal error: {err}")
        sys.exit(1)
    sys.exit(status)