Spaces:
Sleeping
Sleeping
| """ | |
| Authentication Audit Logs Integration Tests | |
| Tests that all auth endpoints properly create audit logs: | |
| - Registration | |
| - Login (success and failure) | |
| - Logout | |
| - Password change | |
| - Password reset request | |
| - Password reset completion | |
| - Profile updates | |
| """ | |
| import pytest | |
| import asyncio | |
| from datetime import datetime, timezone | |
| from sqlalchemy.orm import Session | |
| from fastapi.testclient import TestClient | |
| from app.main import app | |
| from app.api.deps import get_db | |
| from app.models.user import User | |
| from app.models.audit_log import AuditLog | |
| from app.services.password_reset_service import PasswordResetService | |
| # Test client | |
| client = TestClient(app) | |
| class TestAuthAuditLogs: | |
| """Test suite for authentication audit logging""" | |
| def db_session(self): | |
| """Get database session""" | |
| db = next(get_db()) | |
| yield db | |
| db.close() | |
| def test_user_data(self): | |
| """Test user data""" | |
| return { | |
| 'email': f'audit.test.{datetime.now().timestamp()}@example.com', | |
| 'password': 'TestPass123!', | |
| 'first_name': 'Audit', | |
| 'last_name': 'Test', | |
| 'phone': '+1234567890' | |
| } | |
| def test_registration_audit_log(self, db_session: Session, test_user_data: dict): | |
| """Test that user registration creates audit log""" | |
| # Register user | |
| response = client.post('/api/v1/auth/register', json=test_user_data) | |
| assert response.status_code == 201 | |
| user_id = response.json()['user']['id'] | |
| # Check audit log | |
| audit_log = db_session.query(AuditLog).filter( | |
| AuditLog.action == 'register', | |
| AuditLog.entity_type == 'user', | |
| AuditLog.entity_id == user_id | |
| ).first() | |
| assert audit_log is not None | |
| assert audit_log.user_email == test_user_data['email'] | |
| assert audit_log.description.startswith('New user registered:') | |
| assert audit_log.ip_address is not None | |
| def test_login_success_audit_log(self, db_session: Session, test_user_data: dict): | |
| """Test that successful login creates audit log""" | |
| # Register user first | |
| client.post('/api/v1/auth/register', json=test_user_data) | |
| # Login | |
| response = client.post('/api/v1/auth/login', json={ | |
| 'email': test_user_data['email'], | |
| 'password': test_user_data['password'] | |
| }) | |
| assert response.status_code == 200 | |
| # Check audit log | |
| audit_log = db_session.query(AuditLog).filter( | |
| AuditLog.action == 'login', | |
| AuditLog.entity_type == 'auth' | |
| ).order_by(AuditLog.created_at.desc()).first() | |
| assert audit_log is not None | |
| assert audit_log.description.startswith('Successful login') | |
| assert test_user_data['email'] in audit_log.description | |
| def test_login_failure_audit_log(self, db_session: Session, test_user_data: dict): | |
| """Test that failed login creates audit log""" | |
| # Try to login with wrong password | |
| response = client.post('/api/v1/auth/login', json={ | |
| 'email': test_user_data['email'], | |
| 'password': 'WrongPassword123!' | |
| }) | |
| assert response.status_code == 401 | |
| # Check audit log | |
| audit_log = db_session.query(AuditLog).filter( | |
| AuditLog.action == 'login_failed', | |
| AuditLog.entity_type == 'auth' | |
| ).order_by(AuditLog.created_at.desc()).first() | |
| assert audit_log is not None | |
| assert 'Failed login' in audit_log.description | |
| assert test_user_data['email'] in audit_log.description | |
| def test_logout_audit_log(self, db_session: Session, test_user_data: dict): | |
| """Test that logout creates audit log""" | |
| # Register and login | |
| client.post('/api/v1/auth/register', json=test_user_data) | |
| login_response = client.post('/api/v1/auth/login', json={ | |
| 'email': test_user_data['email'], | |
| 'password': test_user_data['password'] | |
| }) | |
| access_token = login_response.json()['access_token'] | |
| # Logout | |
| response = client.post('/api/v1/auth/logout', headers={ | |
| 'Authorization': f'Bearer {access_token}' | |
| }) | |
| assert response.status_code == 200 | |
| # Check audit log | |
| audit_log = db_session.query(AuditLog).filter( | |
| AuditLog.action == 'logout', | |
| AuditLog.entity_type == 'auth' | |
| ).order_by(AuditLog.created_at.desc()).first() | |
| assert audit_log is not None | |
| assert 'logout' in audit_log.description.lower() | |
| def test_password_change_audit_log(self, db_session: Session, test_user_data: dict): | |
| """Test that password change creates audit log""" | |
| # Register and login | |
| client.post('/api/v1/auth/register', json=test_user_data) | |
| login_response = client.post('/api/v1/auth/login', json={ | |
| 'email': test_user_data['email'], | |
| 'password': test_user_data['password'] | |
| }) | |
| access_token = login_response.json()['access_token'] | |
| user_id = login_response.json()['user']['id'] | |
| # Change password | |
| response = client.post('/api/v1/auth/change-password', | |
| json={ | |
| 'current_password': test_user_data['password'], | |
| 'new_password': 'NewTestPass456!' | |
| }, | |
| headers={'Authorization': f'Bearer {access_token}'} | |
| ) | |
| assert response.status_code == 200 | |
| # Check audit log | |
| audit_log = db_session.query(AuditLog).filter( | |
| AuditLog.action == 'password_change', | |
| AuditLog.entity_type == 'user', | |
| AuditLog.entity_id == user_id | |
| ).first() | |
| assert audit_log is not None | |
| assert 'password' in audit_log.description.lower() | |
| def test_password_reset_request_audit_log(self, db_session: Session, test_user_data: dict): | |
| """Test that password reset request creates audit log""" | |
| # Register user first | |
| client.post('/api/v1/auth/register', json=test_user_data) | |
| # Request password reset | |
| response = client.post('/api/v1/auth/forgot-password', json={ | |
| 'email': test_user_data['email'] | |
| }) | |
| assert response.status_code == 200 | |
| # Check audit log | |
| audit_log = db_session.query(AuditLog).filter( | |
| AuditLog.action == 'password_reset_request', | |
| AuditLog.entity_type == 'auth' | |
| ).order_by(AuditLog.created_at.desc()).first() | |
| assert audit_log is not None | |
| assert 'password reset requested' in audit_log.description.lower() | |
| assert test_user_data['email'] in audit_log.additional_metadata.get('email', '') | |
| def test_profile_update_audit_log(self, db_session: Session, test_user_data: dict): | |
| """Test that profile update creates audit log""" | |
| # Register and login | |
| client.post('/api/v1/auth/register', json=test_user_data) | |
| login_response = client.post('/api/v1/auth/login', json={ | |
| 'email': test_user_data['email'], | |
| 'password': test_user_data['password'] | |
| }) | |
| access_token = login_response.json()['access_token'] | |
| user_id = login_response.json()['user']['id'] | |
| # Update profile | |
| response = client.put('/api/v1/auth/me', | |
| json={ | |
| 'first_name': 'Updated', | |
| 'last_name': 'Name', | |
| 'phone': '+9876543210' | |
| }, | |
| headers={'Authorization': f'Bearer {access_token}'} | |
| ) | |
| assert response.status_code == 200 | |
| # Check audit log | |
| audit_log = db_session.query(AuditLog).filter( | |
| AuditLog.action == 'update', | |
| AuditLog.entity_type == 'user', | |
| AuditLog.entity_id == user_id | |
| ).first() | |
| assert audit_log is not None | |
| assert 'profile' in audit_log.description.lower() | |
| assert audit_log.changes is not None | |
| assert 'old' in audit_log.changes | |
| assert 'new' in audit_log.changes | |
| def test_audit_log_contains_ip_and_user_agent(self, db_session: Session, test_user_data: dict): | |
| """Test that audit logs capture IP address and user agent""" | |
| # Register with custom headers | |
| response = client.post('/api/v1/auth/register', | |
| json=test_user_data, | |
| headers={ | |
| 'User-Agent': 'TestClient/1.0', | |
| 'X-Forwarded-For': '192.168.1.100' | |
| } | |
| ) | |
| assert response.status_code == 201 | |
| user_id = response.json()['user']['id'] | |
| # Check audit log | |
| audit_log = db_session.query(AuditLog).filter( | |
| AuditLog.action == 'register', | |
| AuditLog.entity_id == user_id | |
| ).first() | |
| assert audit_log is not None | |
| assert audit_log.user_agent is not None | |
| assert 'TestClient' in audit_log.user_agent | |
| # IP address might be from X-Forwarded-For or client.host | |
| assert audit_log.ip_address is not None | |
| if __name__ == '__main__': | |
| pytest.main([__file__, '-v']) | |