"""Append-only JSONL transaction journal for DataForge repairs.""" from __future__ import annotations import hashlib import json from datetime import UTC, datetime from pathlib import Path from typing import Any from dataforge.transactions.txn import RepairTransaction SCHEMA_VERSION = 1 class TransactionLogError(Exception): """Raised when a transaction journal cannot be written or replayed.""" def sha256_bytes(payload: bytes) -> str: """Return the SHA-256 digest for the given payload.""" return hashlib.sha256(payload).hexdigest() def sha256_file(path: Path) -> str: """Return the SHA-256 digest for the file at ``path``.""" return sha256_bytes(path.read_bytes()) def dataforge_root_for(source_path: Path) -> Path: """Return the hidden DataForge state directory for a source path.""" return source_path.resolve().parent / ".dataforge" def transactions_dir_for(source_path: Path) -> Path: """Return the transaction journal directory for a source path.""" return dataforge_root_for(source_path) / "transactions" def snapshots_dir_for(source_path: Path) -> Path: """Return the snapshot directory for a source path.""" return dataforge_root_for(source_path) / "snapshots" def cache_dir_for(source_path: Path) -> Path: """Return the cache directory for a source path.""" return dataforge_root_for(source_path) / "cache" def snapshot_path_for(source_path: Path, txn_id: str) -> Path: """Return the immutable snapshot path for a transaction.""" return snapshots_dir_for(source_path) / f"{txn_id}.bin" def transaction_log_path_for(source_path: Path, txn_id: str) -> Path: """Return the JSONL log path for a transaction.""" return transactions_dir_for(source_path) / f"{txn_id}.jsonl" def _utc_now() -> datetime: """Return the current UTC timestamp.""" return datetime.now(UTC) def _write_jsonl_line(path: Path, record: dict[str, Any], *, create: bool = False) -> None: """Append or create a JSONL record on disk. Args: path: The target JSONL log path. record: JSON-serializable record to write. create: When true, fail if the file already exists. Raises: TransactionLogError: If the record cannot be written. """ path.parent.mkdir(parents=True, exist_ok=True) mode = "x" if create else "a" try: with path.open(mode, encoding="utf-8", newline="\n") as handle: handle.write(json.dumps(record, sort_keys=True)) handle.write("\n") except OSError as exc: raise TransactionLogError(f"Could not write transaction log '{path}': {exc}") from exc def append_created_transaction(transaction: RepairTransaction) -> Path: """Write the immutable transaction creation event. Args: transaction: The transaction to serialize. Returns: The created JSONL log path. """ source_path = Path(transaction.source_path) log_path = transaction_log_path_for(source_path, transaction.txn_id) record = { "schema_version": SCHEMA_VERSION, "event_type": "created", "occurred_at": transaction.created_at.isoformat(), "transaction": transaction.model_dump(mode="json"), } _write_jsonl_line(log_path, record, create=True) return log_path def append_applied_event( log_path: Path, txn_id: str, post_sha256: str, *, applied_at: datetime | None = None, ) -> None: """Append an ``applied`` event to an existing transaction log.""" record = { "schema_version": SCHEMA_VERSION, "event_type": "applied", "occurred_at": (applied_at or _utc_now()).isoformat(), "txn_id": txn_id, "post_sha256": post_sha256, } _write_jsonl_line(log_path, record, create=False) def append_reverted_event( log_path: Path, txn_id: str, *, reverted_at: datetime | None = None, ) -> None: """Append a ``reverted`` event to an existing transaction log.""" record = { "schema_version": SCHEMA_VERSION, "event_type": "reverted", "occurred_at": (reverted_at or _utc_now()).isoformat(), "txn_id": txn_id, } _write_jsonl_line(log_path, record, create=False) def load_transaction(log_path: Path) -> RepairTransaction: """Replay a transaction log into the latest transaction state. Args: log_path: Path to the JSONL log file. Returns: The latest replayed transaction state. Raises: TransactionLogError: If the log is missing or malformed. """ if not log_path.exists(): raise TransactionLogError(f"Transaction log not found: {log_path}") transaction: RepairTransaction | None = None for raw_line in log_path.read_text(encoding="utf-8").splitlines(): if not raw_line.strip(): continue payload = json.loads(raw_line) if payload.get("schema_version") != SCHEMA_VERSION: raise TransactionLogError( f"Unsupported transaction log schema version in '{log_path}'." ) event_type = payload.get("event_type") if event_type == "created": transaction = RepairTransaction.model_validate(payload["transaction"]) continue if transaction is None: raise TransactionLogError( f"Transaction log '{log_path}' is missing the initial created event." ) if payload.get("txn_id") != transaction.txn_id: raise TransactionLogError( f"Transaction log '{log_path}' contains mismatched txn_id values." ) if event_type == "applied": transaction = transaction.model_copy( update={ "applied": True, "post_sha256": payload["post_sha256"], } ) elif event_type == "reverted": transaction = transaction.model_copy( update={ "reverted_at": datetime.fromisoformat(payload["occurred_at"]), } ) else: raise TransactionLogError( f"Unknown transaction log event type '{event_type}' in '{log_path}'." ) if transaction is None: raise TransactionLogError(f"Transaction log '{log_path}' contained no transaction data.") return transaction def find_transaction_log(txn_id: str, *, search_root: Path | None = None) -> Path: """Locate a transaction log by identifier under the working tree. Args: txn_id: Canonical transaction identifier. search_root: Optional root directory to search under. Returns: The unique matching JSONL log path. Raises: TransactionLogError: If no log or multiple logs are found. """ root = (search_root or Path.cwd()).resolve() direct_candidate = root / ".dataforge" / "transactions" / f"{txn_id}.jsonl" if direct_candidate.exists(): return direct_candidate matches: list[Path] = [] for candidate in root.rglob(f"{txn_id}.jsonl"): if candidate.parent.name == "transactions" and candidate.parent.parent.name == ".dataforge": matches.append(candidate) if not matches: raise TransactionLogError(f"Could not find transaction '{txn_id}' under '{root}'.") if len(matches) > 1: raise TransactionLogError(f"Found multiple transaction logs for '{txn_id}' under '{root}'.") return matches[0]