secureagentrag-api / utils /audit_verify.py
LeomordKaly's picture
deploy: phase 3 BYOK backend (Dockerfile.hf, FastAPI on 7860)
2a83c3b verified
"""Scheduled SHA-256 audit hash-chain verification.
The audit log is tamper-evident (each JSONL entry chains ``prev_hash`` β†’
``entry_hash``), but tamper-*evidence* only matters if someone actually looks.
``scripts/verify_audit_chain.py`` does it on demand; this module runs the same
``AuditLogger.verify_chain()`` periodically from the FastAPI lifespan so a
broken chain is surfaced (log + Prometheus metric) without a human in the loop.
Reads local JSONL only β€” no external services β€” so it is safe to run anywhere,
including the public BYOK Space.
"""
from __future__ import annotations
from typing import Any
from config.settings import settings
from utils.audit import audit_logger
from utils.logging import get_logger
from utils.metrics import record_audit_verification
logger = get_logger(__name__)
def run_audit_verification(audit: Any | None = None) -> dict[str, Any]:
"""Verify the audit hash chain once and emit a log line + metric.
Returns the ``verify_chain`` result dict (``valid`` / ``checked`` /
``broken_at`` / ``last_hash``). Never raises β€” a verification failure is
reported, not propagated, so the scheduler keeps running.
"""
al = audit or audit_logger
try:
result = al.verify_chain()
except Exception as exc:
logger.error("audit_verify_error", error=str(exc))
record_audit_verification("error", valid=False)
return {"valid": False, "checked": 0, "broken_at": [f"error:{exc}"], "last_hash": ""}
if result.get("valid"):
logger.info("audit_verify_ok", checked=result.get("checked", 0))
record_audit_verification("valid", valid=True)
else:
# broken_at is a list of "file:line:reason" strings β€” safe to log
# (no PII; the audit entries themselves are already PII-redacted).
logger.error(
"audit_verify_tamper_detected",
checked=result.get("checked", 0),
broken_at=result.get("broken_at", []),
)
record_audit_verification("broken", valid=False)
return result
def schedule_audit_verification(
*, interval_hours: int | None = None, audit: Any | None = None
) -> Any | None:
"""Start an APScheduler job that runs :func:`run_audit_verification`.
Called from the FastAPI ``lifespan``. Returns the ``AsyncIOScheduler``
instance, or ``None`` when disabled or when APScheduler is unavailable (in
which case a single startup sweep still runs so each boot verifies once).
"""
if not settings.audit_verify_enabled:
logger.debug("audit_verify_not_scheduled", reason="disabled")
return None
interval = interval_hours or settings.audit_verify_interval_hours
try:
from apscheduler.schedulers.asyncio import (
AsyncIOScheduler, # type: ignore[import-not-found]
)
except ImportError:
logger.warning("apscheduler_missing", action="single-shot audit verify instead")
run_audit_verification(audit)
return None
# Verify once at startup, then on the interval.
run_audit_verification(audit)
scheduler = AsyncIOScheduler()
scheduler.add_job(
run_audit_verification,
"interval",
hours=interval,
args=[audit],
id="audit-chain-verify",
replace_existing=True,
)
scheduler.start()
logger.info("audit_verify_scheduled", every_hours=interval)
return scheduler