File size: 7,497 Bytes
66b1c50
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
"""Append-only JSONL transaction journal for DataForge repairs."""

from __future__ import annotations

import hashlib
import json
from datetime import UTC, datetime
from pathlib import Path
from typing import Any

from dataforge.transactions.txn import RepairTransaction

# Version number written into every journal record under ``schema_version``.
# ``load_transaction`` rejects any record whose value differs from this one.
SCHEMA_VERSION = 1


class TransactionLogError(Exception):
    """Raised when a transaction journal cannot be written or replayed.

    Within this module it signals a failed journal write, a missing or
    malformed log during replay, and a failed or ambiguous log lookup.
    """


def sha256_bytes(payload: bytes) -> str:
    """Compute the SHA-256 digest of ``payload``.

    Args:
        payload: Raw bytes to hash.

    Returns:
        The digest as a hexadecimal string.
    """
    hasher = hashlib.sha256()
    hasher.update(payload)
    return hasher.hexdigest()


def sha256_file(path: Path) -> str:
    """Return the SHA-256 digest for the file at ``path``.

    The file is hashed incrementally in fixed-size chunks, so memory use stays
    bounded regardless of how large the file is.

    Args:
        path: File whose contents are hashed.

    Returns:
        The digest as a hexadecimal string.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    digest = hashlib.sha256()
    with path.open("rb") as handle:
        # 1 MiB per read keeps the number of reads low without holding the
        # whole file in memory.
        while chunk := handle.read(1024 * 1024):
            digest.update(chunk)
    return digest.hexdigest()


def dataforge_root_for(source_path: Path) -> Path:
    """Locate the hidden ``.dataforge`` state directory beside a source path.

    Args:
        source_path: Path to the source; it is resolved before use.

    Returns:
        The ``.dataforge`` directory inside the resolved path's parent.
    """
    absolute_source = source_path.resolve()
    containing_dir = absolute_source.parent
    return containing_dir.joinpath(".dataforge")


def transactions_dir_for(source_path: Path) -> Path:
    """Locate the directory that holds transaction journals for a source path.

    Args:
        source_path: Path to the source whose DataForge state is wanted.

    Returns:
        The ``transactions`` directory under the source's DataForge root.
    """
    state_root = dataforge_root_for(source_path)
    return state_root.joinpath("transactions")


def snapshots_dir_for(source_path: Path) -> Path:
    """Locate the directory that holds snapshots for a source path.

    Args:
        source_path: Path to the source whose DataForge state is wanted.

    Returns:
        The ``snapshots`` directory under the source's DataForge root.
    """
    state_root = dataforge_root_for(source_path)
    return state_root.joinpath("snapshots")


def cache_dir_for(source_path: Path) -> Path:
    """Locate the cache directory for a source path.

    Args:
        source_path: Path to the source whose DataForge state is wanted.

    Returns:
        The ``cache`` directory under the source's DataForge root.
    """
    state_root = dataforge_root_for(source_path)
    return state_root.joinpath("cache")


def snapshot_path_for(source_path: Path, txn_id: str) -> Path:
    """Build the snapshot file path that belongs to one transaction.

    Args:
        source_path: Path to the source the transaction operates on.
        txn_id: Transaction identifier; used as the file stem.

    Returns:
        ``<txn_id>.bin`` inside the source's snapshot directory.
    """
    snapshot_name = f"{txn_id}.bin"
    return snapshots_dir_for(source_path).joinpath(snapshot_name)


def transaction_log_path_for(source_path: Path, txn_id: str) -> Path:
    """Build the JSONL journal path that belongs to one transaction.

    Args:
        source_path: Path to the source the transaction operates on.
        txn_id: Transaction identifier; used as the file stem.

    Returns:
        ``<txn_id>.jsonl`` inside the source's transaction directory.
    """
    journal_name = f"{txn_id}.jsonl"
    return transactions_dir_for(source_path).joinpath(journal_name)


def _utc_now() -> datetime:
    """Return the current UTC timestamp."""
    return datetime.now(UTC)


def _write_jsonl_line(path: Path, record: dict[str, Any], *, create: bool = False) -> None:
    """Append or create a JSONL record on disk.

    The record is serialized with sorted keys and written as one line
    terminated by ``\\n``. Missing parent directories are created.

    Args:
        path: The target JSONL log path.
        record: JSON-serializable record to write.
        create: When true, fail if the file already exists.

    Raises:
        TransactionLogError: If the record cannot be serialized, the parent
            directory cannot be created, or the file cannot be written
            (including when ``create`` is true and the file already exists).
    """
    # Serialize before touching the filesystem. Otherwise an unserializable
    # record would leave an empty file behind, and with ``create=True`` that
    # file would make every retry fail with "file exists".
    try:
        line = json.dumps(record, sort_keys=True) + "\n"
    except (TypeError, ValueError) as exc:
        raise TransactionLogError(
            f"Could not serialize transaction log record for '{path}': {exc}"
        ) from exc

    mode = "x" if create else "a"
    try:
        path.parent.mkdir(parents=True, exist_ok=True)
        with path.open(mode, encoding="utf-8", newline="\n") as handle:
            # A single write keeps the record and its terminator together.
            handle.write(line)
    except OSError as exc:
        raise TransactionLogError(f"Could not write transaction log '{path}': {exc}") from exc


def append_created_transaction(transaction: RepairTransaction) -> Path:
    """Start a transaction journal by writing its ``created`` event.

    The journal path is derived from the transaction's ``source_path`` and
    ``txn_id``. The record is written with ``create=True``, so the write fails
    if a journal for this transaction already exists.

    Args:
        transaction: The transaction to serialize.

    Returns:
        The created JSONL log path.

    Raises:
        TransactionLogError: If the journal cannot be written.
    """
    destination = transaction_log_path_for(
        Path(transaction.source_path),
        transaction.txn_id,
    )
    serialized_transaction = transaction.model_dump(mode="json")
    created_on = transaction.created_at.isoformat()
    creation_event = {
        "event_type": "created",
        "occurred_at": created_on,
        "schema_version": SCHEMA_VERSION,
        "transaction": serialized_transaction,
    }
    _write_jsonl_line(destination, creation_event, create=True)
    return destination


def append_applied_event(
    log_path: Path,
    txn_id: str,
    post_sha256: str,
    *,
    applied_at: datetime | None = None,
) -> None:
    """Record an ``applied`` event in a transaction journal.

    Args:
        log_path: JSONL journal to append to.
        txn_id: Identifier of the transaction the event belongs to.
        post_sha256: Digest stored in the event under ``post_sha256``.
        applied_at: Event time; the current UTC time is used when omitted.

    Raises:
        TransactionLogError: If the record cannot be written.
    """
    moment = applied_at if applied_at else _utc_now()
    applied_event = {
        "event_type": "applied",
        "occurred_at": moment.isoformat(),
        "post_sha256": post_sha256,
        "schema_version": SCHEMA_VERSION,
        "txn_id": txn_id,
    }
    _write_jsonl_line(log_path, applied_event, create=False)


def append_reverted_event(
    log_path: Path,
    txn_id: str,
    *,
    reverted_at: datetime | None = None,
) -> None:
    """Record a ``reverted`` event in a transaction journal.

    Args:
        log_path: JSONL journal to append to.
        txn_id: Identifier of the transaction the event belongs to.
        reverted_at: Event time; the current UTC time is used when omitted.

    Raises:
        TransactionLogError: If the record cannot be written.
    """
    moment = reverted_at if reverted_at else _utc_now()
    reverted_event = {
        "event_type": "reverted",
        "occurred_at": moment.isoformat(),
        "schema_version": SCHEMA_VERSION,
        "txn_id": txn_id,
    }
    _write_jsonl_line(log_path, reverted_event, create=False)


def load_transaction(log_path: Path) -> RepairTransaction:
    """Replay a transaction log into the latest transaction state.

    Events are applied in file order: ``created`` establishes the transaction,
    ``applied`` marks it applied and records ``post_sha256``, and ``reverted``
    stores the event's ``occurred_at`` as ``reverted_at``. Blank lines are
    skipped.

    Args:
        log_path: Path to the JSONL log file.

    Returns:
        The latest replayed transaction state.

    Raises:
        TransactionLogError: If the log is missing, unreadable, or malformed
            (invalid JSON, a record that is not a JSON object, an unsupported
            schema version, missing or invalid event fields, a mismatched
            ``txn_id``, an unknown event type, or no ``created`` event).
    """
    # Read directly instead of checking ``exists()`` first: the file could
    # disappear between the check and the read, and a directory or undecodable
    # file would otherwise surface as a raw OSError/UnicodeDecodeError.
    try:
        log_text = log_path.read_text(encoding="utf-8")
    except FileNotFoundError as exc:
        raise TransactionLogError(f"Transaction log not found: {log_path}") from exc
    except (OSError, UnicodeDecodeError) as exc:
        raise TransactionLogError(
            f"Could not read transaction log '{log_path}': {exc}"
        ) from exc

    transaction: RepairTransaction | None = None
    for line_number, raw_line in enumerate(log_text.splitlines(), start=1):
        if not raw_line.strip():
            continue

        try:
            payload = json.loads(raw_line)
        except json.JSONDecodeError as exc:
            raise TransactionLogError(
                f"Transaction log '{log_path}' has invalid JSON on line {line_number}: {exc}"
            ) from exc
        if not isinstance(payload, dict):
            raise TransactionLogError(
                f"Transaction log '{log_path}' line {line_number} is not a JSON object."
            )

        if payload.get("schema_version") != SCHEMA_VERSION:
            raise TransactionLogError(
                f"Unsupported transaction log schema version in '{log_path}'."
            )

        event_type = payload.get("event_type")
        if event_type == "created":
            # NOTE(review): assumes model_validate reports validation failures
            # as ValueError subclasses (as pydantic v2 does) — confirm.
            try:
                transaction = RepairTransaction.model_validate(payload["transaction"])
            except (KeyError, ValueError) as exc:
                raise TransactionLogError(
                    f"Transaction log '{log_path}' has a malformed created event "
                    f"on line {line_number}: {exc}"
                ) from exc
            continue

        if transaction is None:
            raise TransactionLogError(
                f"Transaction log '{log_path}' is missing the initial created event."
            )

        if payload.get("txn_id") != transaction.txn_id:
            raise TransactionLogError(
                f"Transaction log '{log_path}' contains mismatched txn_id values."
            )

        if event_type == "applied":
            if "post_sha256" not in payload:
                raise TransactionLogError(
                    f"Transaction log '{log_path}' has an applied event without "
                    f"post_sha256 on line {line_number}."
                )
            transaction = transaction.model_copy(
                update={
                    "applied": True,
                    "post_sha256": payload["post_sha256"],
                }
            )
        elif event_type == "reverted":
            # KeyError: field absent; TypeError: not a string; ValueError: not
            # an ISO-8601 timestamp.
            try:
                reverted_at = datetime.fromisoformat(payload["occurred_at"])
            except (KeyError, TypeError, ValueError) as exc:
                raise TransactionLogError(
                    f"Transaction log '{log_path}' has a reverted event with a missing "
                    f"or invalid occurred_at on line {line_number}: {exc}"
                ) from exc
            transaction = transaction.model_copy(update={"reverted_at": reverted_at})
        else:
            raise TransactionLogError(
                f"Unknown transaction log event type '{event_type}' in '{log_path}'."
            )

    if transaction is None:
        raise TransactionLogError(f"Transaction log '{log_path}' contained no transaction data.")

    return transaction


def find_transaction_log(txn_id: str, *, search_root: Path | None = None) -> Path:
    """Locate a transaction log by identifier under the working tree.

    The log directly under ``<root>/.dataforge/transactions`` is preferred.
    Otherwise the tree below the root is searched recursively for a file named
    ``<txn_id>.jsonl`` inside a ``.dataforge/transactions`` directory.

    Args:
        txn_id: Canonical transaction identifier. It is matched literally;
            glob metacharacters in it are not treated as wildcards.
        search_root: Optional root directory to search under. Defaults to the
            current working directory.

    Returns:
        The unique matching JSONL log path.

    Raises:
        TransactionLogError: If no log or multiple logs are found.
    """
    # Imported locally so this block does not depend on the module import list.
    from glob import escape as escape_glob

    root = (search_root or Path.cwd()).resolve()
    log_name = f"{txn_id}.jsonl"
    direct_candidate = root / ".dataforge" / "transactions" / log_name
    # ``is_file`` rather than ``exists``: a directory with the log's name is
    # not a usable journal.
    if direct_candidate.is_file():
        return direct_candidate

    # Escape the name before globbing. Unescaped, an identifier such as ``*``
    # or ``txn-[ab]`` would match other transactions' logs.
    matches = [
        candidate
        for candidate in root.rglob(escape_glob(log_name))
        if candidate.is_file()
        and candidate.parent.name == "transactions"
        and candidate.parent.parent.name == ".dataforge"
    ]

    if not matches:
        raise TransactionLogError(f"Could not find transaction '{txn_id}' under '{root}'.")
    if len(matches) > 1:
        raise TransactionLogError(f"Found multiple transaction logs for '{txn_id}' under '{root}'.")
    return matches[0]