"""Stress and tamper-detection checks for the ``rft_flightrecorder`` module.

Run as a script. Two scenarios are exercised against a throwaway log file:

* ``two_tab_spam_test`` -- several threads append to one session concurrently,
  then the session is verified, finalised and verified again.
* ``tamper_zip_test`` -- a finalised session is exported, one event inside the
  exported bundle is modified, and the modified bundle must fail verification.
"""
import json
import os
import tempfile
import threading
import time
import zipfile

import rft_flightrecorder as fr


def _append_with_retry(*, tries: int = 40, sleep_s: float = 0.01, **kwargs):
    """Call ``fr.append_event`` until it succeeds or fails non-transiently.

    A failed attempt is retried only when its message contains "busy" or
    "lock" (case-insensitive); any other failure ends the loop at once.

    Args:
        tries: Maximum number of attempts.
        sleep_s: Seconds to sleep after each retryable failure.
        **kwargs: Forwarded unchanged to ``fr.append_event``.

    Returns:
        The ``(event, message)`` pair of the first successful attempt, or
        ``(None, last_message)`` when every attempt failed.
    """
    last = ""
    for _ in range(tries):
        ev, msg = fr.append_event(**kwargs)
        if ev is not None:
            return ev, msg
        last = msg or ""
        lowered = last.lower()
        if "busy" not in lowered and "lock" not in lowered:
            # Not a contention message: retrying will not help.
            break
        time.sleep(sleep_s)
    return None, last


def two_tab_spam_test(log_path: str, threads: int = 6, events_per_thread: int = 80):
    """Append to one session from many threads and check the log stays valid.

    Args:
        log_path: Path of the JSONL log file to write to.
        threads: Number of concurrent writer threads.
        events_per_thread: Number of "note" events each thread appends.

    Returns:
        The id of the session that was created and finalised.

    Raises:
        AssertionError: If any append fails, a verification fails, an event
            count is off, or an append is accepted after finalisation.
    """
    sk_hex, _pk_hex = fr.gen_keys()
    sid, msg = fr.start_session(
        log_path, "brutal-test", "deterministic", "spam test", False, sk_hex
    )
    assert sid, f"Failed to start session: {msg}"

    errors = []
    errors_lock = threading.Lock()

    def worker(tid: int):
        for i in range(events_per_thread):
            payload = json.dumps({"thread": tid, "i": i}, ensure_ascii=False)
            try:
                ev, m = _append_with_retry(
                    log_path=log_path,
                    session_id=sid,
                    event_type="note",
                    payload_text=payload,
                    parent_event_hash="",
                    sign_event=False,
                    sk_hex=sk_hex,
                    model_id="brutal-test",
                    run_mode="deterministic",
                )
            except Exception as exc:
                # Thread boundary: an uncaught exception would end this thread
                # without leaving a trace in ``errors``; record it so the
                # failure is reported with its real cause.
                with errors_lock:
                    errors.append(f"thread {tid} event {i}: {exc!r}")
                return
            if not ev:
                with errors_lock:
                    errors.append(m)

    ts = [threading.Thread(target=worker, args=(t,)) for t in range(threads)]
    for t in ts:
        t.start()
    for t in ts:
        t.join()

    assert not errors, f"Append errors occurred (first 5): {errors[:5]}"

    status, ok, report = fr.verify_session(
        log_path, sid, pk_hex="", require_signatures=False
    )
    assert ok, f"Session failed verification after spam.\n{status}\n{report}"

    all_events, _corrupt = fr.read_jsonl(log_path)
    evs = fr.events_for_session(all_events, sid)
    # One extra event on top of the appended notes -- presumably the
    # session-start record written by ``fr.start_session``.
    expected_before = 1 + (threads * events_per_thread)
    assert len(evs) == expected_before, (
        f"Expected {expected_before} events before finalise, got {len(evs)}"
    )

    anchor, m = fr.finalise_session(
        log_path, sid, False, sk_hex, "brutal-test", "deterministic"
    )
    assert anchor, f"Finalise failed: {m}"

    # A finalised session must refuse further appends.
    ev, m = fr.append_event(
        log_path=log_path,
        session_id=sid,
        event_type="note",
        payload_text='{"post_end": true}',
        parent_event_hash="",
        sign_event=False,
        sk_hex=sk_hex,
        model_id="brutal-test",
        run_mode="deterministic",
    )
    assert ev is None and "end" in (m or "").lower(), (
        f"Expected refused append after end, got: {m}"
    )

    status, ok, report = fr.verify_session(
        log_path, sid, pk_hex="", require_signatures=False
    )
    assert ok, f"Session failed verification after finalise.\n{status}\n{report}"

    all_events, _corrupt = fr.read_jsonl(log_path)
    evs = fr.events_for_session(all_events, sid)
    expected_after = expected_before + 1  # session_end
    assert len(evs) == expected_after, (
        f"Expected {expected_after} events after finalise, got {len(evs)}"
    )

    return sid


def tamper_zip_test(log_path: str):
    """Export a finalised session, alter one event, and expect verification to fail.

    The tampered bundle keeps every member of the exported bundle and differs
    only in the fifth line of the events file, so a failed verification can be
    attributed to the tamper rather than to missing bundle members.

    Args:
        log_path: Path of the JSONL log file to write to.

    Returns:
        ``True`` when the tampered bundle was rejected.

    Raises:
        AssertionError: If setup fails or the tampered bundle verifies OK.
    """
    sk_hex, _pk_hex = fr.gen_keys()
    sid, msg = fr.start_session(
        log_path, "brutal-test", "deterministic", "tamper test", False, sk_hex
    )
    assert sid, f"Failed to start session: {msg}"

    for i in range(25):
        ev, m = _append_with_retry(
            log_path=log_path,
            session_id=sid,
            event_type="note",
            payload_text=json.dumps({"i": i}),
            parent_event_hash="",
            sign_event=False,
            sk_hex=sk_hex,
            model_id="brutal-test",
            run_mode="deterministic",
        )
        assert ev, f"Append failed during tamper test: {m}"

    anchor, m = fr.finalise_session(
        log_path, sid, False, sk_hex, "brutal-test", "deterministic"
    )
    assert anchor, f"Finalise failed during tamper test: {m}"

    zip_name, msg = fr.export_session_bundle(log_path, sid)
    assert zip_name and os.path.exists(zip_name), f"Export failed: {msg}"

    try:
        with tempfile.TemporaryDirectory() as td:
            with zipfile.ZipFile(zip_name, "r") as z:
                z.extractall(td)

            events_file = None
            for fn in os.listdir(td):
                if fn.endswith("_events.jsonl"):
                    events_file = os.path.join(td, fn)
                    break
            assert events_file, "No events jsonl found inside exported zip"

            with open(events_file, "r", encoding="utf-8") as f:
                lines = f.read().splitlines()
            assert len(lines) > 5, "Not enough events in bundle to tamper"

            obj = json.loads(lines[4])
            obj.setdefault("payload", {})
            if isinstance(obj["payload"], dict):
                obj["payload"]["tampered"] = True
            else:
                # A non-dict payload (e.g. a plain string) would otherwise be
                # left untouched, making the "tamper" a silent no-op.
                obj["payload"] = {"tampered": True, "original": obj["payload"]}
            lines[4] = json.dumps(obj, ensure_ascii=False)

            with open(events_file, "w", encoding="utf-8") as f:
                f.write("\n".join(lines) + "\n")

            # Snapshot the extracted members before tampered.zip is created in
            # the same directory, so the archive never tries to contain itself.
            members = sorted(
                (os.path.join(root, name), os.path.relpath(os.path.join(root, name), td))
                for root, _dirs, files in os.walk(td)
                for name in files
            )

            tampered_zip = os.path.join(td, "tampered.zip")
            with zipfile.ZipFile(
                tampered_zip, "w", compression=zipfile.ZIP_DEFLATED
            ) as z:
                for full_path, arcname in members:
                    z.write(full_path, arcname=arcname)

            status, ok, report, _ = fr.import_bundle_verify(
                bundle_path=tampered_zip,
                pk_hex="",
                require_signatures=False,
                store_into_log=False,
                log_path=log_path,
            )
            assert not ok, (
                f"Tampered bundle incorrectly verified PASS.\n{status}\n{report}"
            )
    finally:
        # Best-effort cleanup of the exported bundle so repeated runs do not
        # accumulate artifacts; a failure to delete must not mask the result.
        try:
            os.remove(zip_name)
        except OSError:
            pass

    return True


def main():
    """Run both scenarios against a log file in a temporary directory."""
    with tempfile.TemporaryDirectory() as td:
        log_path = os.path.join(td, "flightlog.jsonl")

        sid1 = two_tab_spam_test(log_path, threads=8, events_per_thread=60)
        print(f"[PASS] two_tab_spam_test: session_id={sid1}")

        ok = tamper_zip_test(log_path)
        print(f"[PASS] tamper_zip_test: {ok}")

    print("[ALL PASS]")


if __name__ == "__main__":
    main()