File size: 21,656 Bytes
5143557
 
 
 
eed1cab
5143557
 
eed1cab
5143557
 
 
 
eed1cab
 
5143557
 
eed1cab
 
791c076
 
eed1cab
5143557
 
 
 
 
 
eed1cab
 
 
 
 
791c076
eed1cab
 
 
 
 
 
 
 
 
 
 
 
791c076
eed1cab
 
 
 
 
 
 
5143557
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
eed1cab
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5143557
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
eed1cab
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
791c076
eed1cab
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
791c076
eed1cab
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
791c076
eed1cab
 
 
 
 
 
 
 
 
5143557
 
 
 
 
 
 
 
 
 
 
eed1cab
5143557
 
 
 
 
 
 
 
 
 
 
eed1cab
 
 
 
 
 
5143557
 
 
 
 
 
 
 
 
 
eed1cab
 
 
 
 
 
5143557
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
eed1cab
 
5143557
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
eed1cab
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
791c076
eed1cab
 
 
 
 
 
 
 
 
791c076
eed1cab
 
 
 
 
 
 
 
791c076
eed1cab
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
791c076
eed1cab
 
 
 
 
 
 
 
 
791c076
eed1cab
 
 
 
 
791c076
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
eed1cab
 
 
 
 
791c076
eed1cab
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
"""Append-only JSONL transaction journal for DataForge repairs."""

from __future__ import annotations

import enum
import hashlib
import json
import re
from datetime import UTC, datetime
from pathlib import Path
from typing import Any

from pydantic import BaseModel, Field

from dataforge.transactions.txn import RepairTransaction

LEGACY_SCHEMA_VERSION = 1
SCHEMA_VERSION = 2
LEGACY_SCHEMA_NAME = "transaction_journal_v1"
SCHEMA_NAME = "transaction_journal_v2"
_SHA256_RE = re.compile(r"^[0-9a-f]{64}$")


class TransactionLogError(Exception):
    """Raised when a transaction journal cannot be written or replayed."""


class TransactionAuditVerdict(enum.Enum):
    """Possible outcomes for transaction log audit verification."""

    VERIFIED = "verified"
    LEGACY_UNVERIFIED = "legacy_unverified"
    UNREVERTIBLE = "unrevertible"
    TAMPERED = "tampered"
    MISSING = "missing"
    MALFORMED = "malformed"


class TransactionAuditReport(BaseModel):
    """Machine-readable result of transaction hash-chain verification."""

    verdict: TransactionAuditVerdict
    log_path: str | None = None
    txn_id: str | None = None
    schema_version: int | None = None
    schema_name: str | None = None
    event_count: int = Field(ge=0)
    head_sha256: str | None = Field(default=None, pattern=r"^[0-9a-f]{64}$")
    errors: tuple[str, ...] = Field(default_factory=tuple)

    model_config = {"frozen": True}


def sha256_bytes(payload: bytes) -> str:
    """Return the SHA-256 digest for the given payload."""
    return hashlib.sha256(payload).hexdigest()


def sha256_file(path: Path) -> str:
    """Return the SHA-256 digest for the file at ``path``."""
    return sha256_bytes(path.read_bytes())


def dataforge_root_for(source_path: Path) -> Path:
    """Return the hidden DataForge state directory for a source path."""
    return source_path.resolve().parent / ".dataforge"


def transactions_dir_for(source_path: Path) -> Path:
    """Return the transaction journal directory for a source path."""
    return dataforge_root_for(source_path) / "transactions"


def snapshots_dir_for(source_path: Path) -> Path:
    """Return the snapshot directory for a source path."""
    return dataforge_root_for(source_path) / "snapshots"


def cache_dir_for(source_path: Path) -> Path:
    """Return the cache directory for a source path."""
    return dataforge_root_for(source_path) / "cache"


def snapshot_path_for(source_path: Path, txn_id: str) -> Path:
    """Return the immutable snapshot path for a transaction."""
    return snapshots_dir_for(source_path) / f"{txn_id}.bin"


def transaction_log_path_for(source_path: Path, txn_id: str) -> Path:
    """Return the JSONL log path for a transaction."""
    return transactions_dir_for(source_path) / f"{txn_id}.jsonl"


def _utc_now() -> datetime:
    """Return the current UTC timestamp."""
    return datetime.now(UTC)


def _canonical_event_bytes(record: dict[str, Any]) -> bytes:
    """Serialize an audit event into the canonical hash material."""
    unsigned = {key: value for key, value in record.items() if key != "event_sha256"}
    return json.dumps(
        unsigned,
        sort_keys=True,
        separators=(",", ":"),
        ensure_ascii=False,
    ).encode("utf-8")


def _event_sha256(record: dict[str, Any]) -> str:
    """Return the canonical SHA-256 hash for an event record."""
    return sha256_bytes(_canonical_event_bytes(record))


def _sign_event(record: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of ``record`` with its canonical event hash attached."""
    signed = dict(record)
    signed["event_sha256"] = _event_sha256(signed)
    return signed


def _write_jsonl_line(path: Path, record: dict[str, Any], *, create: bool = False) -> None:
    """Append or create a JSONL record on disk.

    Args:
        path: The target JSONL log path.
        record: JSON-serializable record to write.
        create: When true, fail if the file already exists.

    Raises:
        TransactionLogError: If the record cannot be written.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    mode = "x" if create else "a"
    try:
        with path.open(mode, encoding="utf-8", newline="\n") as handle:
            handle.write(json.dumps(record, sort_keys=True))
            handle.write("\n")
    except OSError as exc:
        raise TransactionLogError(f"Could not write transaction log '{path}': {exc}") from exc


def _read_records(log_path: Path) -> list[dict[str, Any]]:
    """Read non-empty JSONL records from a transaction log."""
    records: list[dict[str, Any]] = []
    for line_number, raw_line in enumerate(log_path.read_text(encoding="utf-8").splitlines(), 1):
        if not raw_line.strip():
            continue
        try:
            payload = json.loads(raw_line)
        except json.JSONDecodeError as exc:
            raise TransactionLogError(
                f"Malformed JSON at {log_path}:{line_number}: {exc.msg}"
            ) from exc
        if not isinstance(payload, dict):
            raise TransactionLogError(f"Malformed transaction event at {log_path}:{line_number}.")
        records.append(payload)
    return records


def _log_schema_version(log_path: Path) -> int | None:
    """Return the first event schema version for an existing log."""
    if not log_path.exists():
        return None
    records = _read_records(log_path)
    if not records:
        return None
    raw_version = records[0].get("schema_version")
    return raw_version if isinstance(raw_version, int) else None


def _next_event_metadata(log_path: Path) -> tuple[int, str | None]:
    """Return the next v2 event index and previous hash for ``log_path``."""
    records = _read_records(log_path)
    if not records:
        raise TransactionLogError(f"Transaction log '{log_path}' contained no events.")
    previous = records[-1].get("event_sha256")
    if not isinstance(previous, str) or not _SHA256_RE.fullmatch(previous):
        raise TransactionLogError(
            f"Transaction log '{log_path}' is missing a valid previous event hash."
        )
    return len(records), previous


def _v1_created_record(transaction: RepairTransaction) -> dict[str, Any]:
    """Build a legacy v1 transaction creation event."""
    return {
        "schema_version": LEGACY_SCHEMA_VERSION,
        "event_type": "created",
        "occurred_at": transaction.created_at.isoformat(),
        "transaction": transaction.model_dump(mode="json"),
    }


def _v2_created_record(transaction: RepairTransaction) -> dict[str, Any]:
    """Build a hash-chained v2 transaction creation event."""
    return _sign_event(
        {
            "schema_version": SCHEMA_VERSION,
            "schema_name": SCHEMA_NAME,
            "event_index": 0,
            "event_type": "created",
            "occurred_at": transaction.created_at.isoformat(),
            "previous_event_sha256": None,
            "transaction": transaction.model_dump(mode="json"),
        }
    )


def _v1_applied_record(txn_id: str, post_sha256: str, applied_at: datetime) -> dict[str, Any]:
    """Build a legacy v1 applied event."""
    return {
        "schema_version": LEGACY_SCHEMA_VERSION,
        "event_type": "applied",
        "occurred_at": applied_at.isoformat(),
        "txn_id": txn_id,
        "post_sha256": post_sha256,
    }


def _v2_applied_record(
    log_path: Path,
    txn_id: str,
    post_sha256: str,
    applied_at: datetime,
) -> dict[str, Any]:
    """Build a hash-chained v2 applied event."""
    event_index, previous_hash = _next_event_metadata(log_path)
    return _sign_event(
        {
            "schema_version": SCHEMA_VERSION,
            "schema_name": SCHEMA_NAME,
            "event_index": event_index,
            "event_type": "applied",
            "occurred_at": applied_at.isoformat(),
            "previous_event_sha256": previous_hash,
            "txn_id": txn_id,
            "post_sha256": post_sha256,
        }
    )


def _v1_reverted_record(txn_id: str, reverted_at: datetime) -> dict[str, Any]:
    """Build a legacy v1 reverted event."""
    return {
        "schema_version": LEGACY_SCHEMA_VERSION,
        "event_type": "reverted",
        "occurred_at": reverted_at.isoformat(),
        "txn_id": txn_id,
    }


def _v2_reverted_record(log_path: Path, txn_id: str, reverted_at: datetime) -> dict[str, Any]:
    """Build a hash-chained v2 reverted event."""
    event_index, previous_hash = _next_event_metadata(log_path)
    return _sign_event(
        {
            "schema_version": SCHEMA_VERSION,
            "schema_name": SCHEMA_NAME,
            "event_index": event_index,
            "event_type": "reverted",
            "occurred_at": reverted_at.isoformat(),
            "previous_event_sha256": previous_hash,
            "txn_id": txn_id,
        }
    )


def append_created_transaction(transaction: RepairTransaction) -> Path:
    """Write the immutable transaction creation event.

    Args:
        transaction: The transaction to serialize.

    Returns:
        The created JSONL log path.
    """
    source_path = Path(transaction.source_path)
    log_path = transaction_log_path_for(source_path, transaction.txn_id)
    _write_jsonl_line(log_path, _v2_created_record(transaction), create=True)
    return log_path


def append_applied_event(
    log_path: Path,
    txn_id: str,
    post_sha256: str,
    *,
    applied_at: datetime | None = None,
) -> None:
    """Append an ``applied`` event to an existing transaction log."""
    occurred_at = applied_at or _utc_now()
    record = (
        _v1_applied_record(txn_id, post_sha256, occurred_at)
        if _log_schema_version(log_path) == LEGACY_SCHEMA_VERSION
        else _v2_applied_record(log_path, txn_id, post_sha256, occurred_at)
    )
    _write_jsonl_line(log_path, record, create=False)


def append_reverted_event(
    log_path: Path,
    txn_id: str,
    *,
    reverted_at: datetime | None = None,
) -> None:
    """Append a ``reverted`` event to an existing transaction log."""
    occurred_at = reverted_at or _utc_now()
    record = (
        _v1_reverted_record(txn_id, occurred_at)
        if _log_schema_version(log_path) == LEGACY_SCHEMA_VERSION
        else _v2_reverted_record(log_path, txn_id, occurred_at)
    )
    _write_jsonl_line(log_path, record, create=False)


def load_transaction(log_path: Path) -> RepairTransaction:
    """Replay a transaction log into the latest transaction state.

    Args:
        log_path: Path to the JSONL log file.

    Returns:
        The latest replayed transaction state.

    Raises:
        TransactionLogError: If the log is missing or malformed.
    """
    if not log_path.exists():
        raise TransactionLogError(f"Transaction log not found: {log_path}")

    transaction: RepairTransaction | None = None
    for payload in _read_records(log_path):
        if payload.get("schema_version") not in {LEGACY_SCHEMA_VERSION, SCHEMA_VERSION}:
            raise TransactionLogError(
                f"Unsupported transaction log schema version in '{log_path}'."
            )

        event_type = payload.get("event_type")
        if event_type == "created":
            transaction = RepairTransaction.model_validate(payload["transaction"])
            continue

        if transaction is None:
            raise TransactionLogError(
                f"Transaction log '{log_path}' is missing the initial created event."
            )

        if payload.get("txn_id") != transaction.txn_id:
            raise TransactionLogError(
                f"Transaction log '{log_path}' contains mismatched txn_id values."
            )

        if event_type == "applied":
            transaction = transaction.model_copy(
                update={
                    "applied": True,
                    "post_sha256": payload["post_sha256"],
                }
            )
        elif event_type == "reverted":
            transaction = transaction.model_copy(
                update={
                    "reverted_at": datetime.fromisoformat(payload["occurred_at"]),
                }
            )
        else:
            raise TransactionLogError(
                f"Unknown transaction log event type '{event_type}' in '{log_path}'."
            )

    if transaction is None:
        raise TransactionLogError(f"Transaction log '{log_path}' contained no transaction data.")

    return transaction


def find_transaction_log(txn_id: str, *, search_root: Path | None = None) -> Path:
    """Locate a transaction log by identifier under the working tree.

    Args:
        txn_id: Canonical transaction identifier.
        search_root: Optional root directory to search under.

    Returns:
        The unique matching JSONL log path.

    Raises:
        TransactionLogError: If no log or multiple logs are found.
    """
    root = (search_root or Path.cwd()).resolve()
    direct_candidate = root / ".dataforge" / "transactions" / f"{txn_id}.jsonl"
    if direct_candidate.exists():
        return direct_candidate

    matches: list[Path] = []
    for candidate in root.rglob(f"{txn_id}.jsonl"):
        if candidate.parent.name == "transactions" and candidate.parent.parent.name == ".dataforge":
            matches.append(candidate)

    if not matches:
        raise TransactionLogError(f"Could not find transaction '{txn_id}' under '{root}'.")
    if len(matches) > 1:
        raise TransactionLogError(f"Found multiple transaction logs for '{txn_id}' under '{root}'.")
    return matches[0]


def verify_transaction_log(
    txn_id: str | None = None,
    *,
    log_path: Path | None = None,
    search_root: Path | None = None,
) -> TransactionAuditReport:
    """Verify a transaction log's local hash chain.

    Legacy v1 logs remain replayable but cannot be cryptographically verified,
    so they return ``legacy_unverified`` instead of ``verified``.
    """
    try:
        resolved_log_path = log_path.resolve() if log_path is not None else None
        if resolved_log_path is None:
            if txn_id is None:
                return TransactionAuditReport(
                    verdict=TransactionAuditVerdict.MISSING,
                    txn_id=txn_id,
                    event_count=0,
                    errors=("txn_id or log_path is required.",),
                )
            resolved_log_path = find_transaction_log(txn_id, search_root=search_root)
    except TransactionLogError as exc:
        return TransactionAuditReport(
            verdict=TransactionAuditVerdict.MISSING,
            txn_id=txn_id,
            event_count=0,
            errors=(str(exc),),
        )

    if not resolved_log_path.exists():
        return TransactionAuditReport(
            verdict=TransactionAuditVerdict.MISSING,
            log_path=str(resolved_log_path),
            txn_id=txn_id,
            event_count=0,
            errors=(f"Transaction log not found: {resolved_log_path}",),
        )

    try:
        records = _read_records(resolved_log_path)
    except TransactionLogError as exc:
        return TransactionAuditReport(
            verdict=TransactionAuditVerdict.MALFORMED,
            log_path=str(resolved_log_path),
            txn_id=txn_id,
            event_count=0,
            errors=(str(exc),),
        )

    if not records:
        return TransactionAuditReport(
            verdict=TransactionAuditVerdict.MALFORMED,
            log_path=str(resolved_log_path),
            txn_id=txn_id,
            event_count=0,
            errors=("Transaction log contained no events.",),
        )

    versions = {record.get("schema_version") for record in records}
    if versions == {LEGACY_SCHEMA_VERSION}:
        try:
            transaction = load_transaction(resolved_log_path)
        except TransactionLogError as exc:
            return TransactionAuditReport(
                verdict=TransactionAuditVerdict.MALFORMED,
                log_path=str(resolved_log_path),
                schema_version=LEGACY_SCHEMA_VERSION,
                schema_name=LEGACY_SCHEMA_NAME,
                event_count=len(records),
                errors=(str(exc),),
            )
        if txn_id is not None and transaction.txn_id != txn_id:
            return TransactionAuditReport(
                verdict=TransactionAuditVerdict.TAMPERED,
                log_path=str(resolved_log_path),
                txn_id=transaction.txn_id,
                schema_version=LEGACY_SCHEMA_VERSION,
                schema_name=LEGACY_SCHEMA_NAME,
                event_count=len(records),
                errors=(f"Expected txn_id '{txn_id}', found '{transaction.txn_id}'.",),
            )
        return TransactionAuditReport(
            verdict=TransactionAuditVerdict.LEGACY_UNVERIFIED,
            log_path=str(resolved_log_path),
            txn_id=transaction.txn_id,
            schema_version=LEGACY_SCHEMA_VERSION,
            schema_name=LEGACY_SCHEMA_NAME,
            event_count=len(records),
            errors=("Legacy v1 logs do not contain event hashes.",),
        )

    if versions != {SCHEMA_VERSION}:
        return TransactionAuditReport(
            verdict=TransactionAuditVerdict.MALFORMED,
            log_path=str(resolved_log_path),
            txn_id=txn_id,
            event_count=len(records),
            errors=(f"Mixed or unsupported schema versions: {sorted(map(str, versions))}.",),
        )

    errors: list[str] = []
    previous_hash: str | None = None
    resolved_txn_id: str | None = None
    head_sha256: str | None = None
    for expected_index, record in enumerate(records):
        if record.get("event_index") != expected_index:
            errors.append(f"Event {expected_index} has event_index {record.get('event_index')!r}.")
        if record.get("previous_event_sha256") != previous_hash:
            errors.append(f"Event {expected_index} previous hash does not match.")

        recorded_hash = record.get("event_sha256")
        if not isinstance(recorded_hash, str) or not _SHA256_RE.fullmatch(recorded_hash):
            errors.append(f"Event {expected_index} is missing a valid event hash.")
        else:
            calculated_hash = _event_sha256(record)
            if calculated_hash != recorded_hash:
                errors.append(f"Event {expected_index} hash does not match its payload.")
            previous_hash = recorded_hash
            head_sha256 = recorded_hash

        event_type = record.get("event_type")
        if event_type == "created":
            raw_transaction = record.get("transaction")
            if not isinstance(raw_transaction, dict):
                errors.append("Created event is missing a transaction payload.")
            else:
                current_txn_id = raw_transaction.get("txn_id")
                if not isinstance(current_txn_id, str):
                    errors.append("Created transaction payload is missing txn_id.")
                elif resolved_txn_id is None:
                    resolved_txn_id = current_txn_id
                elif resolved_txn_id != current_txn_id:
                    errors.append("Created transaction payload changed txn_id.")
        elif event_type in {"applied", "reverted"}:
            current_txn_id = record.get("txn_id")
            if current_txn_id != resolved_txn_id:
                errors.append(
                    f"Event {expected_index} txn_id {current_txn_id!r} does not match created event."
                )
        else:
            errors.append(f"Event {expected_index} has unknown event_type {event_type!r}.")

    if txn_id is not None and resolved_txn_id is not None and resolved_txn_id != txn_id:
        errors.append(f"Expected txn_id '{txn_id}', found '{resolved_txn_id}'.")

    try:
        transaction = load_transaction(resolved_log_path)
    except TransactionLogError as exc:
        errors.append(str(exc))

    if errors:
        return TransactionAuditReport(
            verdict=TransactionAuditVerdict.TAMPERED,
            log_path=str(resolved_log_path),
            txn_id=resolved_txn_id or txn_id,
            schema_version=SCHEMA_VERSION,
            schema_name=SCHEMA_NAME,
            event_count=len(records),
            head_sha256=head_sha256,
            errors=tuple(errors),
        )

    if transaction.applied and transaction.reverted_at is None:
        source_path = Path(transaction.source_path)
        snapshot_path = Path(transaction.source_snapshot_path)
        revert_errors: list[str] = []
        if not source_path.exists():
            revert_errors.append(f"Source file not found: {source_path}")
        elif (
            transaction.post_sha256 is not None
            and sha256_file(source_path) != transaction.post_sha256
        ):
            revert_errors.append("Source file no longer matches the recorded post-state hash.")
        if not snapshot_path.exists():
            revert_errors.append(f"Source snapshot not found: {snapshot_path}")
        if revert_errors:
            return TransactionAuditReport(
                verdict=TransactionAuditVerdict.UNREVERTIBLE,
                log_path=str(resolved_log_path),
                txn_id=resolved_txn_id,
                schema_version=SCHEMA_VERSION,
                schema_name=SCHEMA_NAME,
                event_count=len(records),
                head_sha256=head_sha256,
                errors=tuple(revert_errors),
            )

    return TransactionAuditReport(
        verdict=TransactionAuditVerdict.VERIFIED,
        log_path=str(resolved_log_path),
        txn_id=resolved_txn_id,
        schema_version=SCHEMA_VERSION,
        schema_name=SCHEMA_NAME,
        event_count=len(records),
        head_sha256=head_sha256,
    )