swiftops-backend / tests /integration /test_auth_audit_logs.py
kamau1's picture
fix: correct get_db imports and remove duplicate definition
44ca2cd
"""
Authentication Audit Logs Integration Tests
Tests that all auth endpoints properly create audit logs:
- Registration
- Login (success and failure)
- Logout
- Password change
- Password reset request
- Password reset completion
- Profile updates
"""
import pytest
import asyncio
from datetime import datetime, timezone
from sqlalchemy.orm import Session
from fastapi.testclient import TestClient
from app.main import app
from app.api.deps import get_db
from app.models.user import User
from app.models.audit_log import AuditLog
from app.services.password_reset_service import PasswordResetService
# Test client
client = TestClient(app)
class TestAuthAuditLogs:
"""Test suite for authentication audit logging"""
@pytest.fixture
def db_session(self):
"""Get database session"""
db = next(get_db())
yield db
db.close()
@pytest.fixture
def test_user_data(self):
"""Test user data"""
return {
'email': f'audit.test.{datetime.now().timestamp()}@example.com',
'password': 'TestPass123!',
'first_name': 'Audit',
'last_name': 'Test',
'phone': '+1234567890'
}
def test_registration_audit_log(self, db_session: Session, test_user_data: dict):
"""Test that user registration creates audit log"""
# Register user
response = client.post('/api/v1/auth/register', json=test_user_data)
assert response.status_code == 201
user_id = response.json()['user']['id']
# Check audit log
audit_log = db_session.query(AuditLog).filter(
AuditLog.action == 'register',
AuditLog.entity_type == 'user',
AuditLog.entity_id == user_id
).first()
assert audit_log is not None
assert audit_log.user_email == test_user_data['email']
assert audit_log.description.startswith('New user registered:')
assert audit_log.ip_address is not None
def test_login_success_audit_log(self, db_session: Session, test_user_data: dict):
"""Test that successful login creates audit log"""
# Register user first
client.post('/api/v1/auth/register', json=test_user_data)
# Login
response = client.post('/api/v1/auth/login', json={
'email': test_user_data['email'],
'password': test_user_data['password']
})
assert response.status_code == 200
# Check audit log
audit_log = db_session.query(AuditLog).filter(
AuditLog.action == 'login',
AuditLog.entity_type == 'auth'
).order_by(AuditLog.created_at.desc()).first()
assert audit_log is not None
assert audit_log.description.startswith('Successful login')
assert test_user_data['email'] in audit_log.description
def test_login_failure_audit_log(self, db_session: Session, test_user_data: dict):
"""Test that failed login creates audit log"""
# Try to login with wrong password
response = client.post('/api/v1/auth/login', json={
'email': test_user_data['email'],
'password': 'WrongPassword123!'
})
assert response.status_code == 401
# Check audit log
audit_log = db_session.query(AuditLog).filter(
AuditLog.action == 'login_failed',
AuditLog.entity_type == 'auth'
).order_by(AuditLog.created_at.desc()).first()
assert audit_log is not None
assert 'Failed login' in audit_log.description
assert test_user_data['email'] in audit_log.description
def test_logout_audit_log(self, db_session: Session, test_user_data: dict):
"""Test that logout creates audit log"""
# Register and login
client.post('/api/v1/auth/register', json=test_user_data)
login_response = client.post('/api/v1/auth/login', json={
'email': test_user_data['email'],
'password': test_user_data['password']
})
access_token = login_response.json()['access_token']
# Logout
response = client.post('/api/v1/auth/logout', headers={
'Authorization': f'Bearer {access_token}'
})
assert response.status_code == 200
# Check audit log
audit_log = db_session.query(AuditLog).filter(
AuditLog.action == 'logout',
AuditLog.entity_type == 'auth'
).order_by(AuditLog.created_at.desc()).first()
assert audit_log is not None
assert 'logout' in audit_log.description.lower()
def test_password_change_audit_log(self, db_session: Session, test_user_data: dict):
"""Test that password change creates audit log"""
# Register and login
client.post('/api/v1/auth/register', json=test_user_data)
login_response = client.post('/api/v1/auth/login', json={
'email': test_user_data['email'],
'password': test_user_data['password']
})
access_token = login_response.json()['access_token']
user_id = login_response.json()['user']['id']
# Change password
response = client.post('/api/v1/auth/change-password',
json={
'current_password': test_user_data['password'],
'new_password': 'NewTestPass456!'
},
headers={'Authorization': f'Bearer {access_token}'}
)
assert response.status_code == 200
# Check audit log
audit_log = db_session.query(AuditLog).filter(
AuditLog.action == 'password_change',
AuditLog.entity_type == 'user',
AuditLog.entity_id == user_id
).first()
assert audit_log is not None
assert 'password' in audit_log.description.lower()
def test_password_reset_request_audit_log(self, db_session: Session, test_user_data: dict):
"""Test that password reset request creates audit log"""
# Register user first
client.post('/api/v1/auth/register', json=test_user_data)
# Request password reset
response = client.post('/api/v1/auth/forgot-password', json={
'email': test_user_data['email']
})
assert response.status_code == 200
# Check audit log
audit_log = db_session.query(AuditLog).filter(
AuditLog.action == 'password_reset_request',
AuditLog.entity_type == 'auth'
).order_by(AuditLog.created_at.desc()).first()
assert audit_log is not None
assert 'password reset requested' in audit_log.description.lower()
assert test_user_data['email'] in audit_log.additional_metadata.get('email', '')
def test_profile_update_audit_log(self, db_session: Session, test_user_data: dict):
"""Test that profile update creates audit log"""
# Register and login
client.post('/api/v1/auth/register', json=test_user_data)
login_response = client.post('/api/v1/auth/login', json={
'email': test_user_data['email'],
'password': test_user_data['password']
})
access_token = login_response.json()['access_token']
user_id = login_response.json()['user']['id']
# Update profile
response = client.put('/api/v1/auth/me',
json={
'first_name': 'Updated',
'last_name': 'Name',
'phone': '+9876543210'
},
headers={'Authorization': f'Bearer {access_token}'}
)
assert response.status_code == 200
# Check audit log
audit_log = db_session.query(AuditLog).filter(
AuditLog.action == 'update',
AuditLog.entity_type == 'user',
AuditLog.entity_id == user_id
).first()
assert audit_log is not None
assert 'profile' in audit_log.description.lower()
assert audit_log.changes is not None
assert 'old' in audit_log.changes
assert 'new' in audit_log.changes
def test_audit_log_contains_ip_and_user_agent(self, db_session: Session, test_user_data: dict):
"""Test that audit logs capture IP address and user agent"""
# Register with custom headers
response = client.post('/api/v1/auth/register',
json=test_user_data,
headers={
'User-Agent': 'TestClient/1.0',
'X-Forwarded-For': '192.168.1.100'
}
)
assert response.status_code == 201
user_id = response.json()['user']['id']
# Check audit log
audit_log = db_session.query(AuditLog).filter(
AuditLog.action == 'register',
AuditLog.entity_id == user_id
).first()
assert audit_log is not None
assert audit_log.user_agent is not None
assert 'TestClient' in audit_log.user_agent
# IP address might be from X-Forwarded-For or client.host
assert audit_log.ip_address is not None
if __name__ == '__main__':
pytest.main([__file__, '-v'])