File size: 5,916 Bytes
876f891 a59435f 876f891 a59435f 876f891 a59435f 876f891 a59435f 876f891 a59435f 876f891 a59435f |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 |
import os
import json
import threading
import tempfile
import zipfile
import time
import rft_flightrecorder as fr
def _append_with_retry(*, tries: int = 40, sleep_s: float = 0.01, **kwargs):
    """Call ``fr.append_event`` repeatedly while it reports contention.

    Args:
        tries: Maximum number of ``fr.append_event`` calls to make.
        sleep_s: Seconds to sleep after an attempt that reported contention.
        **kwargs: Forwarded unchanged to ``fr.append_event``.

    Returns:
        ``(event, message)`` from the first attempt whose event is not
        ``None``; otherwise ``(None, last_message)`` where ``last_message``
        is the message of the final attempt (``""`` if there was none).
    """
    last_message = ""
    for _attempt in range(tries):
        event, message = fr.append_event(**kwargs)
        if event is not None:
            return event, message

        last_message = message or ""
        lowered = last_message.lower()
        contended = "busy" in lowered or "lock" in lowered
        if not contended:
            # Any other failure is treated as permanent: stop retrying.
            break
        time.sleep(sleep_s)
    return None, last_message
def two_tab_spam_test(log_path: str, threads: int = 6, events_per_thread: int = 80):
    """Hammer one session with concurrent appends, then verify and finalise it.

    Starts a session in ``log_path`` and lets ``threads`` worker threads each
    append ``events_per_thread`` unsigned ``note`` events. It then asserts that:

    * no append failed and no worker thread raised,
    * the session verifies and holds ``1 + threads * events_per_thread``
      events (the extra one is presumably the session-start event),
    * finalising succeeds and adds exactly one more event,
    * a later append is refused with a message containing "end",
    * the session still verifies after being finalised.

    Args:
        log_path: Path of the JSONL log file to write to.
        threads: Number of concurrent writer threads.
        events_per_thread: Number of events each thread appends.

    Returns:
        The id of the session that was created.

    Raises:
        AssertionError: If any of the checks above fails.
    """
    sk_hex, _pk_hex = fr.gen_keys()
    sid, msg = fr.start_session(log_path, "brutal-test", "deterministic", "spam test", False, sk_hex)
    assert sid, f"Failed to start session: {msg}"

    errors = []
    errors_lock = threading.Lock()

    def worker(tid: int):
        for i in range(events_per_thread):
            payload = json.dumps({"thread": tid, "i": i}, ensure_ascii=False)
            try:
                ev, m = _append_with_retry(
                    log_path=log_path,
                    session_id=sid,
                    event_type="note",
                    payload_text=payload,
                    parent_event_hash="",
                    sign_event=False,
                    sk_hex=sk_hex,
                    model_id="brutal-test",
                    run_mode="deterministic",
                )
            except Exception as exc:
                # An exception raised in a thread never reaches the main
                # thread on its own; record it so the assertion below reports
                # the real cause instead of a later event-count mismatch.
                with errors_lock:
                    errors.append(f"thread {tid}, event {i}: raised {exc!r}")
                return
            if not ev:
                with errors_lock:
                    errors.append(m)

    ts = [threading.Thread(target=worker, args=(t,)) for t in range(threads)]
    for t in ts:
        t.start()
    for t in ts:
        t.join()

    assert not errors, f"Append errors occurred (first 5): {errors[:5]}"

    status, ok, report = fr.verify_session(log_path, sid, pk_hex="", require_signatures=False)
    assert ok, f"Session failed verification after spam.\n{status}\n{report}"

    all_events, _corrupt = fr.read_jsonl(log_path)
    evs = fr.events_for_session(all_events, sid)
    expected_before = 1 + (threads * events_per_thread)
    assert len(evs) == expected_before, f"Expected {expected_before} events before finalise, got {len(evs)}"

    anchor, m = fr.finalise_session(log_path, sid, False, sk_hex, "brutal-test", "deterministic")
    assert anchor, f"Finalise failed: {m}"

    # A finalised session must refuse further appends.
    ev, m = fr.append_event(
        log_path=log_path,
        session_id=sid,
        event_type="note",
        payload_text='{"post_end": true}',
        parent_event_hash="",
        sign_event=False,
        sk_hex=sk_hex,
        model_id="brutal-test",
        run_mode="deterministic",
    )
    assert ev is None and "end" in (m or "").lower(), f"Expected refused append after end, got: {m}"

    status, ok, report = fr.verify_session(log_path, sid, pk_hex="", require_signatures=False)
    assert ok, f"Session failed verification after finalise.\n{status}\n{report}"

    all_events, _corrupt = fr.read_jsonl(log_path)
    evs = fr.events_for_session(all_events, sid)
    expected_after = expected_before + 1  # session_end
    assert len(evs) == expected_after, f"Expected {expected_after} events after finalise, got {len(evs)}"

    return sid
def tamper_zip_test(log_path: str):
    """Check that a modified exported bundle does not pass verification.

    Creates and finalises a session of 25 unsigned ``note`` events, exports
    it with ``fr.export_session_bundle``, alters the fifth line of the
    bundle's ``*_events.jsonl`` file, re-zips the bundle and asserts that
    ``fr.import_bundle_verify`` reports it as not ok.

    Args:
        log_path: Path of the JSONL log file to write to.

    Returns:
        ``True`` when the tampered bundle was rejected.

    Raises:
        AssertionError: If any setup step fails or the tampered bundle
            verifies successfully.
    """
    sk_hex, _pk_hex = fr.gen_keys()
    sid, msg = fr.start_session(log_path, "brutal-test", "deterministic", "tamper test", False, sk_hex)
    assert sid, f"Failed to start session: {msg}"

    for i in range(25):
        ev, m = _append_with_retry(
            log_path=log_path,
            session_id=sid,
            event_type="note",
            payload_text=json.dumps({"i": i}),
            parent_event_hash="",
            sign_event=False,
            sk_hex=sk_hex,
            model_id="brutal-test",
            run_mode="deterministic",
        )
        assert ev, f"Append failed during tamper test: {m}"

    anchor, m = fr.finalise_session(log_path, sid, False, sk_hex, "brutal-test", "deterministic")
    assert anchor, f"Finalise failed during tamper test: {m}"

    zip_name, msg = fr.export_session_bundle(log_path, sid)
    assert zip_name and os.path.exists(zip_name), f"Export failed: {msg}"

    with tempfile.TemporaryDirectory() as td:
        with zipfile.ZipFile(zip_name, "r") as z:
            z.extractall(td)

        # Snapshot every extracted file before tampered.zip is created in the
        # same directory, so the rebuilt bundle has the original member set.
        bundle_files = []
        for root, _dirs, files in os.walk(td):
            for fn in files:
                bundle_files.append(os.path.join(root, fn))

        events_file = None
        for fn in os.listdir(td):
            if fn.endswith("_events.jsonl"):
                events_file = os.path.join(td, fn)
                break
        assert events_file, "No events jsonl found inside exported zip"

        with open(events_file, "r", encoding="utf-8") as f:
            lines = f.read().splitlines()
        assert len(lines) > 5, "Not enough events in bundle to tamper"

        obj = json.loads(lines[4])
        obj.setdefault("payload", {})
        if isinstance(obj["payload"], dict):
            obj["payload"]["tampered"] = True
        else:
            # A non-dict payload used to be left untouched, which made the
            # "tampering" a no-op; replace it so the event always changes.
            obj["payload"] = {"tampered": True, "original": obj["payload"]}
        lines[4] = json.dumps(obj, ensure_ascii=False)

        with open(events_file, "w", encoding="utf-8") as f:
            f.write("\n".join(lines) + "\n")

        tampered_zip = os.path.join(td, "tampered.zip")
        with zipfile.ZipFile(tampered_zip, "w", compression=zipfile.ZIP_DEFLATED) as z:
            # Re-pack every member, not just the events file, so the only
            # difference from the exported bundle is the altered event.
            for path in bundle_files:
                z.write(path, arcname=os.path.relpath(path, td))

        status, ok, report, _ = fr.import_bundle_verify(
            bundle_path=tampered_zip,
            pk_hex="",
            require_signatures=False,
            store_into_log=False,
            log_path=log_path,
        )
        assert not ok, f"Tampered bundle incorrectly verified PASS.\n{status}\n{report}"

    return True
def main():
    """Run both tests against a log file in a temporary directory.

    Prints one ``[PASS]`` line per test and a final ``[ALL PASS]`` line; an
    ``AssertionError`` from either test propagates to the caller.
    """
    with tempfile.TemporaryDirectory() as workdir:
        log_path = os.path.join(workdir, "flightlog.jsonl")

        session_id = two_tab_spam_test(log_path, threads=8, events_per_thread=60)
        print(f"[PASS] two_tab_spam_test: session_id={session_id}")

        tamper_result = tamper_zip_test(log_path)
        print(f"[PASS] tamper_zip_test: {tamper_result}")

        print("[ALL PASS]")


# Run the tests only when this file is executed as a script.
if __name__ == "__main__":
    main()
|