LeadPilot / backend /tests /test_audit.py
Ashraf Al-Kassem
feat: Mission 17 β€” Enterprise Audit Logging Framework
6d44285
raw
history blame
15.4 kB
"""
Mission 17 β€” Enterprise Audit Logging Tests
"""
import pytest
from httpx import AsyncClient
from sqlalchemy.ext.asyncio import AsyncSession
from sqlmodel import select, func
from uuid import uuid4
from app.models.models import AdminAuditLog, User
from app.services.audit_service import audit_event
# ── Helpers ──────────────────────────────────────────────────────────────────
async def _signup(client: AsyncClient, email: str, name: str = "Test User") -> dict:
r = await client.post("/api/v1/auth/signup", json={
"email": email, "password": "Test1234!", "full_name": name,
})
return r.json()
async def _login(client: AsyncClient, email: str, password: str = "Test1234!") -> dict:
r = await client.post(
"/api/v1/auth/login",
data={"username": email, "password": password},
headers={"content-type": "application/x-www-form-urlencoded"},
)
return r.json()
async def _auth_header(client: AsyncClient, email: str) -> dict:
data = await _login(client, email)
token = (data.get("data") or {}).get("access_token", "")
return {"Authorization": f"Bearer {token}"}
async def _count_audit(db: AsyncSession, **filters) -> int:
q = select(func.count(AdminAuditLog.id))
for k, v in filters.items():
q = q.where(getattr(AdminAuditLog, k) == v)
result = await db.execute(q)
return result.scalar_one()
# ── Service Unit Tests ───────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_audit_event_creates_row(db_session: AsyncSession):
"""audit_event() should insert a row into AdminAuditLog."""
before = await _count_audit(db_session)
await audit_event(
db_session,
action="test_action",
entity_type="test_entity",
entity_id="abc123",
outcome="success",
)
await db_session.commit()
after = await _count_audit(db_session)
assert after == before + 1
@pytest.mark.asyncio
async def test_audit_event_stores_fields(db_session: AsyncSession):
"""audit_event() should correctly populate all fields."""
uid = uuid4()
ws_id = uuid4()
ag_id = uuid4()
await audit_event(
db_session,
action="field_test",
entity_type="widget",
entity_id="w-42",
actor_user_id=uid,
actor_type="admin",
outcome="failure",
workspace_id=ws_id,
agency_id=ag_id,
error_code="E001",
error_message="Something broke",
metadata={"provider": "zoho"},
)
await db_session.commit()
result = await db_session.execute(
select(AdminAuditLog).where(AdminAuditLog.action == "field_test")
)
entry = result.scalars().first()
assert entry is not None
assert entry.actor_user_id == uid
assert entry.actor_type == "admin"
assert entry.outcome == "failure"
assert entry.workspace_id == ws_id
assert entry.agency_id == ag_id
assert entry.error_code == "E001"
assert entry.error_message == "Something broke"
assert entry.metadata_json == {"provider": "zoho"}
@pytest.mark.asyncio
async def test_audit_event_redacts_sensitive_metadata(db_session: AsyncSession):
"""Sensitive keys in metadata should be redacted."""
await audit_event(
db_session,
action="redact_test",
entity_type="secret",
entity_id="s-1",
metadata={"password": "hunter2", "provider": "zoho"},
)
await db_session.commit()
result = await db_session.execute(
select(AdminAuditLog).where(AdminAuditLog.action == "redact_test")
)
entry = result.scalars().first()
assert entry is not None
assert entry.metadata_json["password"] == "***REDACTED***"
assert entry.metadata_json["provider"] == "zoho"
@pytest.mark.asyncio
async def test_audit_event_swallows_exceptions(db_session: AsyncSession):
"""audit_event should not raise even if something goes wrong."""
# Pass an intentionally bad actor_user_id that would violate FK if enforced,
# but in SQLite mode the FK is soft. Instead, test with None action which
# would fail due to NOT NULL. Actually audit_event wraps in try/except.
# Best way: test that it doesn't raise on normal call.
try:
await audit_event(
db_session,
action="safe_test",
entity_type="x",
entity_id="y",
)
await db_session.commit()
except Exception:
pytest.fail("audit_event should not raise")
@pytest.mark.asyncio
async def test_audit_event_truncates_long_fields(db_session: AsyncSession):
"""Long error_message and user_agent should be truncated."""
long_msg = "x" * 5000
await audit_event(
db_session,
action="truncate_test",
entity_type="x",
entity_id="y",
error_message=long_msg,
)
await db_session.commit()
result = await db_session.execute(
select(AdminAuditLog).where(AdminAuditLog.action == "truncate_test")
)
entry = result.scalars().first()
assert entry is not None
assert len(entry.error_message) <= 2048
@pytest.mark.asyncio
async def test_audit_event_nullable_actor(db_session: AsyncSession):
"""actor_user_id=None should be allowed (e.g., failed login for unknown user)."""
await audit_event(
db_session,
action="null_actor_test",
entity_type="user",
entity_id="unknown@example.com",
actor_user_id=None,
outcome="failure",
)
await db_session.commit()
result = await db_session.execute(
select(AdminAuditLog).where(AdminAuditLog.action == "null_actor_test")
)
entry = result.scalars().first()
assert entry is not None
assert entry.actor_user_id is None
# ── Integration Tests (via HTTP) ─────────────────────────────────────────────
@pytest.mark.asyncio
async def test_signup_produces_audit_entry(async_client: AsyncClient, db_session: AsyncSession):
"""POST /auth/signup should produce a user_signup audit entry."""
before = await _count_audit(db_session, action="user_signup")
await _signup(async_client, "audit_signup@test.com")
after = await _count_audit(db_session, action="user_signup")
assert after == before + 1
@pytest.mark.asyncio
async def test_login_success_produces_audit(async_client: AsyncClient, db_session: AsyncSession):
"""Successful login should produce a user_login audit with outcome=success."""
await _signup(async_client, "audit_login_s@test.com")
before = await _count_audit(db_session, action="user_login", outcome="success")
await _login(async_client, "audit_login_s@test.com")
after = await _count_audit(db_session, action="user_login", outcome="success")
assert after == before + 1
@pytest.mark.asyncio
async def test_login_failure_produces_audit(async_client: AsyncClient, db_session: AsyncSession):
"""Failed login should produce a user_login audit with outcome=failure."""
await _signup(async_client, "audit_login_f@test.com")
before = await _count_audit(db_session, action="user_login", outcome="failure")
await _login(async_client, "audit_login_f@test.com", password="WrongPass!")
after = await _count_audit(db_session, action="user_login", outcome="failure")
assert after == before + 1
@pytest.mark.asyncio
async def test_workspace_create_produces_audit(async_client: AsyncClient, db_session: AsyncSession):
"""POST /workspaces should produce a workspace_create audit entry."""
await _signup(async_client, "audit_ws@test.com")
hdr = await _auth_header(async_client, "audit_ws@test.com")
before = await _count_audit(db_session, action="workspace_create")
await async_client.post("/api/v1/workspaces", json={"name": "Audit WS"}, headers=hdr)
after = await _count_audit(db_session, action="workspace_create")
assert after == before + 1
# ── Workspace Audit Endpoint Tests ───────────────────────────────────────────
@pytest.mark.asyncio
async def test_workspace_audit_endpoint(async_client: AsyncClient, db_session: AsyncSession):
"""GET /audit-logs should return workspace-scoped audit entries."""
await _signup(async_client, "audit_ep@test.com")
hdr = await _auth_header(async_client, "audit_ep@test.com")
# Create a workspace to get WS ID
r = await async_client.post("/api/v1/workspaces", json={"name": "Audit EP WS"}, headers=hdr)
ws_id = r.json()["data"]["id"]
ws_hdr = {**hdr, "X-Workspace-ID": ws_id}
r = await async_client.get("/api/v1/audit-logs", headers=ws_hdr)
assert r.status_code == 200
d = r.json()
assert d["success"] is True
assert "items" in d["data"]
assert "total" in d["data"]
@pytest.mark.asyncio
async def test_workspace_audit_scoped(async_client: AsyncClient, db_session: AsyncSession):
"""Audit entries from one workspace should not appear in another."""
await _signup(async_client, "audit_scope@test.com")
hdr = await _auth_header(async_client, "audit_scope@test.com")
# Create two workspaces
r1 = await async_client.post("/api/v1/workspaces", json={"name": "Scope WS1"}, headers=hdr)
ws1_id = r1.json()["data"]["id"]
r2 = await async_client.post("/api/v1/workspaces", json={"name": "Scope WS2"}, headers=hdr)
ws2_id = r2.json()["data"]["id"]
# Get audit for WS1
ws1_hdr = {**hdr, "X-Workspace-ID": ws1_id}
r = await async_client.get("/api/v1/audit-logs", headers=ws1_hdr)
ws1_entries = r.json()["data"]["items"]
ws1_ids = {e["workspace_id"] for e in ws1_entries if e["workspace_id"]}
# All entries should belong to ws1
assert ws2_id not in ws1_ids
@pytest.mark.asyncio
async def test_workspace_audit_filter_action(async_client: AsyncClient, db_session: AsyncSession):
"""GET /audit-logs?action=X should filter by action."""
await _signup(async_client, "audit_filter@test.com")
hdr = await _auth_header(async_client, "audit_filter@test.com")
r = await async_client.post("/api/v1/workspaces", json={"name": "Filter WS"}, headers=hdr)
ws_id = r.json()["data"]["id"]
ws_hdr = {**hdr, "X-Workspace-ID": ws_id}
r = await async_client.get("/api/v1/audit-logs?action=workspace_create", headers=ws_hdr)
assert r.status_code == 200
items = r.json()["data"]["items"]
for item in items:
assert item["action"] == "workspace_create"
@pytest.mark.asyncio
async def test_workspace_audit_filter_outcome(async_client: AsyncClient, db_session: AsyncSession):
"""GET /audit-logs?outcome=success should filter by outcome."""
await _signup(async_client, "audit_outcome@test.com")
hdr = await _auth_header(async_client, "audit_outcome@test.com")
r = await async_client.post("/api/v1/workspaces", json={"name": "Outcome WS"}, headers=hdr)
ws_id = r.json()["data"]["id"]
ws_hdr = {**hdr, "X-Workspace-ID": ws_id}
r = await async_client.get("/api/v1/audit-logs?outcome=success", headers=ws_hdr)
assert r.status_code == 200
items = r.json()["data"]["items"]
for item in items:
assert item["outcome"] == "success"
# ── Admin Audit Endpoint Tests ───────────────────────────────────────────────
@pytest.mark.asyncio
async def test_admin_audit_endpoint(async_client: AsyncClient, db_session: AsyncSession):
"""GET /admin/audit-log should return all audit entries for superadmin."""
# Create superadmin
from app.core.security import get_password_hash
admin = User(
email="audit_admin@leadpilot.io",
hashed_password=get_password_hash("Admin1234!"),
full_name="Audit Admin",
is_active=True,
is_superuser=True,
)
db_session.add(admin)
await db_session.flush()
# Login as admin
r = await async_client.post(
"/api/v1/auth/login",
data={"username": "audit_admin@leadpilot.io", "password": "Admin1234!"},
headers={"content-type": "application/x-www-form-urlencoded"},
)
token = r.json()["data"]["access_token"]
admin_hdr = {"Authorization": f"Bearer {token}"}
r = await async_client.get("/api/v1/admin/audit-log", headers=admin_hdr)
assert r.status_code == 200
d = r.json()
assert d["success"] is True
assert "items" in d["data"]
assert "total" in d["data"]
# Verify new fields are present in response
if d["data"]["items"]:
item = d["data"]["items"][0]
assert "actor_type" in item
assert "outcome" in item
assert "ip_address" in item
assert "request_path" in item
@pytest.mark.asyncio
async def test_admin_audit_filter_actor_type(async_client: AsyncClient, db_session: AsyncSession):
"""GET /admin/audit-log?actor_type=user should filter properly."""
from app.core.security import get_password_hash
admin = User(
email="audit_admin2@leadpilot.io",
hashed_password=get_password_hash("Admin1234!"),
full_name="Audit Admin 2",
is_active=True,
is_superuser=True,
)
db_session.add(admin)
await db_session.flush()
r = await async_client.post(
"/api/v1/auth/login",
data={"username": "audit_admin2@leadpilot.io", "password": "Admin1234!"},
headers={"content-type": "application/x-www-form-urlencoded"},
)
token = r.json()["data"]["access_token"]
admin_hdr = {"Authorization": f"Bearer {token}"}
r = await async_client.get("/api/v1/admin/audit-log?actor_type=user", headers=admin_hdr)
assert r.status_code == 200
items = r.json()["data"]["items"]
for item in items:
assert item["actor_type"] == "user" or item["actor_type"] is None
@pytest.mark.asyncio
async def test_admin_audit_detail_endpoint(async_client: AsyncClient, db_session: AsyncSession):
"""GET /admin/audit-log/{log_id} should return a single entry."""
from app.core.security import get_password_hash
admin = User(
email="audit_admin3@leadpilot.io",
hashed_password=get_password_hash("Admin1234!"),
full_name="Audit Admin 3",
is_active=True,
is_superuser=True,
)
db_session.add(admin)
await db_session.flush()
r = await async_client.post(
"/api/v1/auth/login",
data={"username": "audit_admin3@leadpilot.io", "password": "Admin1234!"},
headers={"content-type": "application/x-www-form-urlencoded"},
)
token = r.json()["data"]["access_token"]
admin_hdr = {"Authorization": f"Bearer {token}"}
# Get first log entry
r = await async_client.get("/api/v1/admin/audit-log?limit=1", headers=admin_hdr)
items = r.json()["data"]["items"]
if items:
log_id = items[0]["id"]
r2 = await async_client.get(f"/api/v1/admin/audit-log/{log_id}", headers=admin_hdr)
assert r2.status_code == 200
d = r2.json()
assert d["success"] is True
assert d["data"]["id"] == log_id