RFTSystems commited on
Commit
a59435f
·
verified ·
1 Parent(s): e17a541

Create brutal_test.py

Browse files
Files changed (1) hide show
  1. brutal_test.py +125 -0
brutal_test.py ADDED
@@ -0,0 +1,125 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os, json, tempfile, shutil, zipfile
2
+ import rft_flightrecorder as fr
3
+
4
+ def read_lines(p):
5
+ with open(p, "r", encoding="utf-8") as f:
6
+ return [ln.rstrip("\n") for ln in f if ln.strip()]
7
+
8
+ def write_lines(p, lines):
9
+ with open(p, "w", encoding="utf-8") as f:
10
+ for ln in lines:
11
+ f.write(ln + "\n")
12
+
13
+ def assert_true(cond, msg):
14
+ if not cond:
15
+ raise AssertionError(msg)
16
+
17
+ def assert_false(cond, msg):
18
+ if cond:
19
+ raise AssertionError(msg)
20
+
21
+ def main():
22
+ td = tempfile.mkdtemp(prefix="rft_brutal_")
23
+ log = os.path.join(td, "flightlog.jsonl")
24
+ print("TEMP:", td)
25
+
26
+ sk, pk = fr.gen_keys()
27
+
28
+ # --- PASS flow ---
29
+ sid, msg = fr.start_session(log, "audit-demo", "deterministic", "brutal test", True, sk)
30
+ print("start:", msg)
31
+ assert_true(sid, "No session_id returned")
32
+
33
+ # append signed events
34
+ for i in range(1, 6):
35
+ ev, m = fr.append_event(
36
+ log_path=log,
37
+ session_id=sid,
38
+ event_type="note",
39
+ payload_text=json.dumps({"i": i, "msg": "ok"}, ensure_ascii=False),
40
+ parent_event_hash="",
41
+ sign_event=True,
42
+ sk_hex=sk,
43
+ model_id="audit-demo",
44
+ run_mode="deterministic",
45
+ )
46
+ assert_true(ev is not None, f"append failed at {i}: {m}")
47
+
48
+ status, ok, report = fr.verify_session(log, sid, pk, require_signatures=True)
49
+ print("verify signed:", status)
50
+ assert_true(ok, "Signed session should verify PASS")
51
+
52
+ anchor, msg = fr.finalise_session(log, sid, True, sk, "audit-demo", "deterministic")
53
+ print("finalise:", msg)
54
+ assert_true(anchor is not None, "Finalise failed")
55
+
56
+ status, ok, report = fr.verify_session(log, sid, pk, require_signatures=True)
57
+ print("verify after finalise:", status)
58
+ assert_true(ok, "Session should still verify after finalise")
59
+
60
+ bundle, msg = fr.export_session_bundle(log, sid)
61
+ print("export:", msg)
62
+ assert_true(bundle and os.path.exists(bundle), "Bundle not created")
63
+
64
+ # import verify
65
+ status, ok, report, stored = fr.import_bundle_verify(bundle, pk_hex=pk, require_signatures=False, store_into_log=False, log_path=log)
66
+ print("import verify:", status)
67
+ assert_true(ok, "Imported bundle should verify PASS")
68
+
69
+ # --- FAIL: tamper payload ---
70
+ lines = read_lines(log)
71
+ assert_true(len(lines) > 3, "Not enough log lines")
72
+
73
+ tampered = lines[:]
74
+ obj = json.loads(tampered[2])
75
+ obj["payload"]["msg"] = "hacked"
76
+ tampered[2] = json.dumps(obj, ensure_ascii=False)
77
+ tamper_log = os.path.join(td, "tamper_payload.jsonl")
78
+ write_lines(tamper_log, tampered)
79
+
80
+ status, ok, report = fr.verify_session(tamper_log, sid, pk, require_signatures=True)
81
+ print("tamper payload verify:", status)
82
+ assert_false(ok, "Tampered payload must FAIL")
83
+
84
+ # --- FAIL: reorder lines ---
85
+ reordered = lines[:]
86
+ if len(reordered) >= 4:
87
+ reordered[2], reordered[3] = reordered[3], reordered[2]
88
+ reorder_log = os.path.join(td, "tamper_reorder.jsonl")
89
+ write_lines(reorder_log, reordered)
90
+
91
+ status, ok, report = fr.verify_session(reorder_log, sid, pk, require_signatures=True)
92
+ print("reorder verify:", status)
93
+ assert_false(ok, "Reordered events must FAIL")
94
+
95
+ # --- FAIL: delete a line ---
96
+ deleted = lines[:]
97
+ deleted.pop(2)
98
+ del_log = os.path.join(td, "tamper_delete.jsonl")
99
+ write_lines(del_log, deleted)
100
+
101
+ status, ok, report = fr.verify_session(del_log, sid, pk, require_signatures=True)
102
+ print("delete verify:", status)
103
+ assert_false(ok, "Deleted event must FAIL")
104
+
105
+ # --- FAIL: wrong public key ---
106
+ sk2, pk2 = fr.gen_keys()
107
+ status, ok, report = fr.verify_session(log, sid, pk2, require_signatures=True)
108
+ print("wrong pk verify:", status)
109
+ assert_false(ok, "Wrong public key must FAIL")
110
+
111
+ # --- FAIL: require signatures but unsigned event exists ---
112
+ # create new session with mixed signed/unsigned
113
+ sid2, _ = fr.start_session(log, "audit-demo", "deterministic", "mixed sigs", False, sk)
114
+ ev, _ = fr.append_event(log, sid2, "note", '{"x":1}', "", False, sk, "audit-demo", "deterministic") # unsigned
115
+ status, ok, report = fr.verify_session(log, sid2, pk, require_signatures=True)
116
+ print("require sigs w/ unsigned:", status)
117
+ assert_false(ok, "Require signatures must fail if any event unsigned")
118
+
119
+ print("\nALL BRUTAL TESTS PASSED (meaning PASS/FAIL behaved correctly).")
120
+ shutil.rmtree(td, ignore_errors=True)
121
+ # leave bundle in cwd for inspection
122
+ print("Bundle left in cwd:", bundle)
123
+
124
+ if __name__ == "__main__":
125
+ main()