File size: 5,916 Bytes
876f891
 
 
 
 
 
 
a59435f
 
 
876f891
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a59435f
876f891
a59435f
 
 
876f891
 
a59435f
876f891
 
a59435f
876f891
 
 
 
a59435f
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
import os
import json
import threading
import tempfile
import zipfile
import time

import rft_flightrecorder as fr


def _append_with_retry(*, tries: int = 40, sleep_s: float = 0.01, **kwargs):
    """Call ``fr.append_event(**kwargs)``, retrying while the log looks contended.

    An attempt counts as "contended" when it returns no event and its message
    contains ``"busy"`` or ``"lock"`` (case-insensitive). Any other failure
    message stops the retries immediately.

    Args:
        tries: Maximum number of calls to ``fr.append_event``.
        sleep_s: Seconds to sleep after each contended attempt.
        **kwargs: Forwarded unchanged to ``fr.append_event``.

    Returns:
        ``(event, message)`` from the first attempt whose event is not
        ``None``; otherwise ``(None, last_message)`` where ``last_message`` is
        the most recent failure message (``""`` if there was none or no
        attempt was made).
    """
    failure_text = ""
    for _attempt in range(tries):
        event, message = fr.append_event(**kwargs)
        if event is not None:
            return event, message

        failure_text = message if message else ""
        lowered = failure_text.lower()
        # Only contention-style failures are worth retrying.
        if "busy" not in lowered and "lock" not in lowered:
            break
        time.sleep(sleep_s)
    return None, failure_text


def two_tab_spam_test(log_path: str, threads: int = 6, events_per_thread: int = 80):
    """Append to one session from several threads and check the log still verifies.

    Sequence of checks:
      1. Start a session and append ``threads * events_per_thread`` "note"
         events concurrently (each append goes through ``_append_with_retry``).
      2. Verify the session and check the event count.
      3. Finalise the session, then confirm a further append is refused with
         a message containing "end".
      4. Verify again and check that exactly one more event was recorded.

    Args:
        log_path: Path of the JSONL flight log to write to.
        threads: Number of concurrent writer threads.
        events_per_thread: Number of events each thread appends.

    Returns:
        The id of the session that was created.

    Raises:
        AssertionError: If any of the checks above fails.
    """
    sk_hex, _pk_hex = fr.gen_keys()
    sid, msg = fr.start_session(log_path, "brutal-test", "deterministic", "spam test", False, sk_hex)
    assert sid, f"Failed to start session: {msg}"

    errors = []
    errors_lock = threading.Lock()

    def worker(tid: int):
        """Append ``events_per_thread`` events, recording every failure in ``errors``."""
        for i in range(events_per_thread):
            payload = json.dumps({"thread": tid, "i": i}, ensure_ascii=False)
            try:
                ev, m = _append_with_retry(
                    log_path=log_path,
                    session_id=sid,
                    event_type="note",
                    payload_text=payload,
                    parent_event_hash="",
                    sign_event=False,
                    sk_hex=sk_hex,
                    model_id="brutal-test",
                    run_mode="deterministic",
                )
            except Exception as exc:
                # Thread.join() does not re-raise exceptions from the thread,
                # so record them here; otherwise the failure only shows up
                # later as an unexplained event-count mismatch.
                with errors_lock:
                    errors.append(f"thread {tid} event {i} raised {exc!r}")
                return
            if not ev:
                with errors_lock:
                    errors.append(m)

    ts = [threading.Thread(target=worker, args=(t,)) for t in range(threads)]
    for t in ts:
        t.start()
    for t in ts:
        t.join()

    assert not errors, f"Append errors occurred (first 5): {errors[:5]}"

    status, ok, report = fr.verify_session(log_path, sid, pk_hex="", require_signatures=False)
    assert ok, f"Session failed verification after spam.\n{status}\n{report}"

    all_events, _corrupt = fr.read_jsonl(log_path)
    evs = fr.events_for_session(all_events, sid)
    # The leading 1 presumably accounts for the event written by
    # start_session -- confirm against rft_flightrecorder.
    expected_before = 1 + (threads * events_per_thread)
    assert len(evs) == expected_before, f"Expected {expected_before} events before finalise, got {len(evs)}"

    anchor, m = fr.finalise_session(log_path, sid, False, sk_hex, "brutal-test", "deterministic")
    assert anchor, f"Finalise failed: {m}"

    # No retry wrapper here: the writer threads have finished, and this append
    # is expected to be refused because the session has been finalised.
    ev, m = fr.append_event(
        log_path=log_path,
        session_id=sid,
        event_type="note",
        payload_text='{"post_end": true}',
        parent_event_hash="",
        sign_event=False,
        sk_hex=sk_hex,
        model_id="brutal-test",
        run_mode="deterministic",
    )
    assert ev is None and "end" in (m or "").lower(), f"Expected refused append after end, got: {m}"

    status, ok, report = fr.verify_session(log_path, sid, pk_hex="", require_signatures=False)
    assert ok, f"Session failed verification after finalise.\n{status}\n{report}"

    all_events, _corrupt = fr.read_jsonl(log_path)
    evs = fr.events_for_session(all_events, sid)
    expected_after = expected_before + 1  # session_end
    assert len(evs) == expected_after, f"Expected {expected_after} events after finalise, got {len(evs)}"

    return sid


def tamper_zip_test(log_path: str):
    """Export a finalised session, alter one event, and expect verification to fail.

    A session with 25 "note" events is created and finalised, then exported
    with ``fr.export_session_bundle``. The ``*_events.jsonl`` member of the
    bundle is extracted, its fifth line is modified, and the result is zipped
    again and passed to ``fr.import_bundle_verify``.

    Args:
        log_path: Path of the JSONL flight log to write to.

    Returns:
        ``True`` when the tampered bundle was rejected.

    Raises:
        AssertionError: If any setup step fails, or if the tampered bundle
            verifies successfully.
    """
    sk_hex, _pk_hex = fr.gen_keys()
    sid, msg = fr.start_session(log_path, "brutal-test", "deterministic", "tamper test", False, sk_hex)
    assert sid, f"Failed to start session: {msg}"

    for i in range(25):
        ev, m = _append_with_retry(
            log_path=log_path,
            session_id=sid,
            event_type="note",
            payload_text=json.dumps({"i": i}),
            parent_event_hash="",
            sign_event=False,
            sk_hex=sk_hex,
            model_id="brutal-test",
            run_mode="deterministic",
        )
        assert ev, f"Append failed during tamper test: {m}"

    anchor, m = fr.finalise_session(log_path, sid, False, sk_hex, "brutal-test", "deterministic")
    assert anchor, f"Finalise failed during tamper test: {m}"

    # NOTE(review): the exported zip at zip_name is never removed by this
    # function -- confirm whether export_session_bundle writes somewhere that
    # gets cleaned up.
    zip_name, msg = fr.export_session_bundle(log_path, sid)
    assert zip_name and os.path.exists(zip_name), f"Export failed: {msg}"

    with tempfile.TemporaryDirectory() as td:
        with zipfile.ZipFile(zip_name, "r") as z:
            z.extractall(td)

        events_file = None
        for fn in os.listdir(td):
            if fn.endswith("_events.jsonl"):
                events_file = os.path.join(td, fn)
                break
        assert events_file, "No events jsonl found inside exported zip"

        with open(events_file, "r", encoding="utf-8") as f:
            lines = f.read().splitlines()
        assert len(lines) > 5, "Not enough events in bundle to tamper"

        untouched = json.loads(lines[4])
        obj = json.loads(lines[4])
        payload = obj.get("payload")
        if isinstance(payload, dict):
            payload["tampered"] = True
        elif isinstance(payload, str):
            # A string payload cannot take a new key; alter its text instead
            # so the record is still changed.
            obj["payload"] = payload + " tampered"
        else:
            # Missing, null, or any other type: substitute a marker payload.
            obj["payload"] = {"tampered": True}
        # Guard against a silent no-op: the test is meaningless unless the
        # record really differs from what was exported.
        assert obj != untouched, "Tamper step did not modify the event record"
        lines[4] = json.dumps(obj, ensure_ascii=False)

        with open(events_file, "w", encoding="utf-8") as f:
            f.write("\n".join(lines) + "\n")

        # NOTE(review): only the events file is put back into the zip; any
        # other members of the exported bundle are dropped. If
        # import_bundle_verify rejects bundles with missing members, this test
        # could pass without the tampering itself being detected -- confirm.
        tampered_zip = os.path.join(td, "tampered.zip")
        with zipfile.ZipFile(tampered_zip, "w", compression=zipfile.ZIP_DEFLATED) as z:
            z.write(events_file, arcname=os.path.basename(events_file))

        status, ok, report, _ = fr.import_bundle_verify(
            bundle_path=tampered_zip,
            pk_hex="",
            require_signatures=False,
            store_into_log=False,
            log_path=log_path,
        )
        assert not ok, f"Tampered bundle incorrectly verified PASS.\n{status}\n{report}"

    return True


def main():
    """Run both stress tests against a throwaway log file and report the results.

    The log lives in a temporary directory that is removed once both tests
    have finished. A ``[PASS]`` line is printed after each test and
    ``[ALL PASS]`` at the end; a failing test raises before its line prints.
    """
    with tempfile.TemporaryDirectory() as scratch_dir:
        flight_log = os.path.join(scratch_dir, "flightlog.jsonl")

        spam_session = two_tab_spam_test(flight_log, threads=8, events_per_thread=60)
        print(f"[PASS] two_tab_spam_test: session_id={spam_session}")

        tamper_result = tamper_zip_test(flight_log)
        print(f"[PASS] tamper_zip_test: {tamper_result}")

    print("[ALL PASS]")


# Run the stress tests only when this file is executed as a script, not when
# it is imported.
if __name__ == "__main__":
    main()