Spaces:
Sleeping
Sleeping
| """ | |
| Audit Log Middleware | |
| Automatically logs every mutating request (POST/PUT/PATCH/DELETE) | |
| to sensitive paths. Read (GET) requests to admin/audit paths are | |
| also captured for compliance tracing. | |
| """ | |
| import logging | |
| import json | |
| from django.conf import settings | |
| from django.utils.deprecation import MiddlewareMixin | |
| logger = logging.getLogger('audit') | |
| SENSITIVE_PATHS = getattr(settings, 'AUDIT_LOG_SENSITIVE_PATHS', [ | |
| '/api/admin/', | |
| '/api/auth/', | |
| '/api/caregivers/', | |
| '/api/bookings/', | |
| '/admin/', | |
| ]) | |
| LOG_METHODS = {'POST', 'PUT', 'PATCH', 'DELETE'} | |
| # Also log GETs on these extra paths | |
| LOG_GET_PATHS = ['/admin/', '/api/audit/'] | |
| def _get_client_ip(request): | |
| x_forwarded = request.META.get('HTTP_X_FORWARDED_FOR') | |
| if x_forwarded: | |
| return x_forwarded.split(',')[0].strip() | |
| return request.META.get('REMOTE_ADDR') | |
| def _is_sensitive(path): | |
| return any(path.startswith(p) for p in SENSITIVE_PATHS) | |
| def _is_log_get(path): | |
| return any(path.startswith(p) for p in LOG_GET_PATHS) | |
| class AuditLogMiddleware(MiddlewareMixin): | |
| """ | |
| Captures mutating requests on sensitive endpoints after the response | |
| is generated. Only imports AuditLog lazily to avoid AppRegistryNotReady | |
| errors during startup. | |
| """ | |
| def process_response(self, request, response): | |
| method = request.method | |
| path = request.path | |
| should_log = (method in LOG_METHODS and _is_sensitive(path)) or \ | |
| (method == 'GET' and _is_log_get(path)) | |
| if not should_log: | |
| return response | |
| try: | |
| from audit.models import AuditLog, AuditAction | |
| user = getattr(request, 'user', None) | |
| actor = user if (user and user.is_authenticated) else None | |
| # Derive action from method | |
| action_map = { | |
| 'POST': AuditAction.CREATE, | |
| 'PUT': AuditAction.UPDATE, | |
| 'PATCH': AuditAction.UPDATE, | |
| 'DELETE': AuditAction.DELETE, | |
| 'GET': AuditAction.VIEW, | |
| } | |
| action = action_map.get(method, AuditAction.VIEW) | |
| # Specialise for known paths | |
| if '/caregivers/' in path: | |
| if method == 'POST': | |
| action = AuditAction.CAREGIVER_ADD | |
| elif method in ('PUT', 'PATCH'): | |
| action = AuditAction.CAREGIVER_UPDATE | |
| elif method == 'DELETE': | |
| action = AuditAction.CAREGIVER_DELETE | |
| elif '/bookings/' in path: | |
| if method == 'POST': | |
| action = AuditAction.BOOKING_CREATE | |
| elif method in ('PUT', 'PATCH'): | |
| action = AuditAction.BOOKING_UPDATE | |
| elif '/auth/login' in path: | |
| action = AuditAction.LOGIN if response.status_code < 400 else AuditAction.LOGIN_FAILED | |
| elif '/auth/logout' in path: | |
| action = AuditAction.LOGOUT | |
| elif '/auth/register' in path: | |
| action = AuditAction.REGISTER | |
| # Build description | |
| description = f'{method} {path} → {response.status_code}' | |
| AuditLog.objects.create( | |
| actor=actor, | |
| action=action, | |
| description=description, | |
| request_path=path, | |
| request_method=method, | |
| response_status=response.status_code, | |
| ip_address=_get_client_ip(request), | |
| user_agent=request.META.get('HTTP_USER_AGENT', '')[:500], | |
| ) | |
| except Exception as exc: # noqa: BLE001 — never break the response | |
| logger.warning('AuditLogMiddleware error: %s', exc) | |
| return response | |