| """Append-only JSONL transaction journal for DataForge repairs.""" |
|
|
| from __future__ import annotations |
|
|
| import hashlib |
| import json |
| from datetime import UTC, datetime |
| from pathlib import Path |
| from typing import Any |
|
|
| from dataforge.transactions.txn import RepairTransaction |
|
|
| SCHEMA_VERSION = 1 |
|
|
|
|
| class TransactionLogError(Exception): |
| """Raised when a transaction journal cannot be written or replayed.""" |
|
|
|
|
| def sha256_bytes(payload: bytes) -> str: |
| """Return the SHA-256 digest for the given payload.""" |
| return hashlib.sha256(payload).hexdigest() |
|
|
|
|
| def sha256_file(path: Path) -> str: |
| """Return the SHA-256 digest for the file at ``path``.""" |
| return sha256_bytes(path.read_bytes()) |
|
|
|
|
| def dataforge_root_for(source_path: Path) -> Path: |
| """Return the hidden DataForge state directory for a source path.""" |
| return source_path.resolve().parent / ".dataforge" |
|
|
|
|
| def transactions_dir_for(source_path: Path) -> Path: |
| """Return the transaction journal directory for a source path.""" |
| return dataforge_root_for(source_path) / "transactions" |
|
|
|
|
| def snapshots_dir_for(source_path: Path) -> Path: |
| """Return the snapshot directory for a source path.""" |
| return dataforge_root_for(source_path) / "snapshots" |
|
|
|
|
| def cache_dir_for(source_path: Path) -> Path: |
| """Return the cache directory for a source path.""" |
| return dataforge_root_for(source_path) / "cache" |
|
|
|
|
| def snapshot_path_for(source_path: Path, txn_id: str) -> Path: |
| """Return the immutable snapshot path for a transaction.""" |
| return snapshots_dir_for(source_path) / f"{txn_id}.bin" |
|
|
|
|
| def transaction_log_path_for(source_path: Path, txn_id: str) -> Path: |
| """Return the JSONL log path for a transaction.""" |
| return transactions_dir_for(source_path) / f"{txn_id}.jsonl" |
|
|
|
|
| def _utc_now() -> datetime: |
| """Return the current UTC timestamp.""" |
| return datetime.now(UTC) |
|
|
|
|
| def _write_jsonl_line(path: Path, record: dict[str, Any], *, create: bool = False) -> None: |
| """Append or create a JSONL record on disk. |
| |
| Args: |
| path: The target JSONL log path. |
| record: JSON-serializable record to write. |
| create: When true, fail if the file already exists. |
| |
| Raises: |
| TransactionLogError: If the record cannot be written. |
| """ |
| path.parent.mkdir(parents=True, exist_ok=True) |
| mode = "x" if create else "a" |
| try: |
| with path.open(mode, encoding="utf-8", newline="\n") as handle: |
| handle.write(json.dumps(record, sort_keys=True)) |
| handle.write("\n") |
| except OSError as exc: |
| raise TransactionLogError(f"Could not write transaction log '{path}': {exc}") from exc |
|
|
|
|
| def append_created_transaction(transaction: RepairTransaction) -> Path: |
| """Write the immutable transaction creation event. |
| |
| Args: |
| transaction: The transaction to serialize. |
| |
| Returns: |
| The created JSONL log path. |
| """ |
| source_path = Path(transaction.source_path) |
| log_path = transaction_log_path_for(source_path, transaction.txn_id) |
| record = { |
| "schema_version": SCHEMA_VERSION, |
| "event_type": "created", |
| "occurred_at": transaction.created_at.isoformat(), |
| "transaction": transaction.model_dump(mode="json"), |
| } |
| _write_jsonl_line(log_path, record, create=True) |
| return log_path |
|
|
|
|
| def append_applied_event( |
| log_path: Path, |
| txn_id: str, |
| post_sha256: str, |
| *, |
| applied_at: datetime | None = None, |
| ) -> None: |
| """Append an ``applied`` event to an existing transaction log.""" |
| record = { |
| "schema_version": SCHEMA_VERSION, |
| "event_type": "applied", |
| "occurred_at": (applied_at or _utc_now()).isoformat(), |
| "txn_id": txn_id, |
| "post_sha256": post_sha256, |
| } |
| _write_jsonl_line(log_path, record, create=False) |
|
|
|
|
| def append_reverted_event( |
| log_path: Path, |
| txn_id: str, |
| *, |
| reverted_at: datetime | None = None, |
| ) -> None: |
| """Append a ``reverted`` event to an existing transaction log.""" |
| record = { |
| "schema_version": SCHEMA_VERSION, |
| "event_type": "reverted", |
| "occurred_at": (reverted_at or _utc_now()).isoformat(), |
| "txn_id": txn_id, |
| } |
| _write_jsonl_line(log_path, record, create=False) |
|
|
|
|
| def load_transaction(log_path: Path) -> RepairTransaction: |
| """Replay a transaction log into the latest transaction state. |
| |
| Args: |
| log_path: Path to the JSONL log file. |
| |
| Returns: |
| The latest replayed transaction state. |
| |
| Raises: |
| TransactionLogError: If the log is missing or malformed. |
| """ |
| if not log_path.exists(): |
| raise TransactionLogError(f"Transaction log not found: {log_path}") |
|
|
| transaction: RepairTransaction | None = None |
| for raw_line in log_path.read_text(encoding="utf-8").splitlines(): |
| if not raw_line.strip(): |
| continue |
| payload = json.loads(raw_line) |
| if payload.get("schema_version") != SCHEMA_VERSION: |
| raise TransactionLogError( |
| f"Unsupported transaction log schema version in '{log_path}'." |
| ) |
|
|
| event_type = payload.get("event_type") |
| if event_type == "created": |
| transaction = RepairTransaction.model_validate(payload["transaction"]) |
| continue |
|
|
| if transaction is None: |
| raise TransactionLogError( |
| f"Transaction log '{log_path}' is missing the initial created event." |
| ) |
|
|
| if payload.get("txn_id") != transaction.txn_id: |
| raise TransactionLogError( |
| f"Transaction log '{log_path}' contains mismatched txn_id values." |
| ) |
|
|
| if event_type == "applied": |
| transaction = transaction.model_copy( |
| update={ |
| "applied": True, |
| "post_sha256": payload["post_sha256"], |
| } |
| ) |
| elif event_type == "reverted": |
| transaction = transaction.model_copy( |
| update={ |
| "reverted_at": datetime.fromisoformat(payload["occurred_at"]), |
| } |
| ) |
| else: |
| raise TransactionLogError( |
| f"Unknown transaction log event type '{event_type}' in '{log_path}'." |
| ) |
|
|
| if transaction is None: |
| raise TransactionLogError(f"Transaction log '{log_path}' contained no transaction data.") |
|
|
| return transaction |
|
|
|
|
| def find_transaction_log(txn_id: str, *, search_root: Path | None = None) -> Path: |
| """Locate a transaction log by identifier under the working tree. |
| |
| Args: |
| txn_id: Canonical transaction identifier. |
| search_root: Optional root directory to search under. |
| |
| Returns: |
| The unique matching JSONL log path. |
| |
| Raises: |
| TransactionLogError: If no log or multiple logs are found. |
| """ |
| root = (search_root or Path.cwd()).resolve() |
| direct_candidate = root / ".dataforge" / "transactions" / f"{txn_id}.jsonl" |
| if direct_candidate.exists(): |
| return direct_candidate |
|
|
| matches: list[Path] = [] |
| for candidate in root.rglob(f"{txn_id}.jsonl"): |
| if candidate.parent.name == "transactions" and candidate.parent.parent.name == ".dataforge": |
| matches.append(candidate) |
|
|
| if not matches: |
| raise TransactionLogError(f"Could not find transaction '{txn_id}' under '{root}'.") |
| if len(matches) > 1: |
| raise TransactionLogError(f"Found multiple transaction logs for '{txn_id}' under '{root}'.") |
| return matches[0] |
|
|