Agent_Flight_Recorder / brutal_test.py
RFTSystems's picture
Update brutal_test.py
876f891 verified
import os
import json
import threading
import tempfile
import zipfile
import time
import rft_flightrecorder as fr
def _append_with_retry(*, tries: int = 40, sleep_s: float = 0.01, **kwargs):
last = ""
for _ in range(tries):
ev, msg = fr.append_event(**kwargs)
if ev is not None:
return ev, msg
last = msg or ""
if "busy" in last.lower() or "lock" in last.lower():
time.sleep(sleep_s)
continue
break
return None, last
def two_tab_spam_test(log_path: str, threads: int = 6, events_per_thread: int = 80):
sk_hex, _pk_hex = fr.gen_keys()
sid, msg = fr.start_session(log_path, "brutal-test", "deterministic", "spam test", False, sk_hex)
assert sid, f"Failed to start session: {msg}"
errors = []
errors_lock = threading.Lock()
def worker(tid: int):
for i in range(events_per_thread):
payload = json.dumps({"thread": tid, "i": i}, ensure_ascii=False)
ev, m = _append_with_retry(
log_path=log_path,
session_id=sid,
event_type="note",
payload_text=payload,
parent_event_hash="",
sign_event=False,
sk_hex=sk_hex,
model_id="brutal-test",
run_mode="deterministic",
)
if not ev:
with errors_lock:
errors.append(m)
ts = [threading.Thread(target=worker, args=(t,)) for t in range(threads)]
for t in ts:
t.start()
for t in ts:
t.join()
assert not errors, f"Append errors occurred (first 5): {errors[:5]}"
status, ok, report = fr.verify_session(log_path, sid, pk_hex="", require_signatures=False)
assert ok, f"Session failed verification after spam.\n{status}\n{report}"
all_events, _corrupt = fr.read_jsonl(log_path)
evs = fr.events_for_session(all_events, sid)
expected_before = 1 + (threads * events_per_thread)
assert len(evs) == expected_before, f"Expected {expected_before} events before finalise, got {len(evs)}"
anchor, m = fr.finalise_session(log_path, sid, False, sk_hex, "brutal-test", "deterministic")
assert anchor, f"Finalise failed: {m}"
ev, m = fr.append_event(
log_path=log_path,
session_id=sid,
event_type="note",
payload_text='{"post_end": true}',
parent_event_hash="",
sign_event=False,
sk_hex=sk_hex,
model_id="brutal-test",
run_mode="deterministic",
)
assert ev is None and "end" in (m or "").lower(), f"Expected refused append after end, got: {m}"
status, ok, report = fr.verify_session(log_path, sid, pk_hex="", require_signatures=False)
assert ok, f"Session failed verification after finalise.\n{status}\n{report}"
all_events, _corrupt = fr.read_jsonl(log_path)
evs = fr.events_for_session(all_events, sid)
expected_after = expected_before + 1 # session_end
assert len(evs) == expected_after, f"Expected {expected_after} events after finalise, got {len(evs)}"
return sid
def tamper_zip_test(log_path: str):
sk_hex, _pk_hex = fr.gen_keys()
sid, msg = fr.start_session(log_path, "brutal-test", "deterministic", "tamper test", False, sk_hex)
assert sid, f"Failed to start session: {msg}"
for i in range(25):
ev, m = _append_with_retry(
log_path=log_path,
session_id=sid,
event_type="note",
payload_text=json.dumps({"i": i}),
parent_event_hash="",
sign_event=False,
sk_hex=sk_hex,
model_id="brutal-test",
run_mode="deterministic",
)
assert ev, f"Append failed during tamper test: {m}"
anchor, m = fr.finalise_session(log_path, sid, False, sk_hex, "brutal-test", "deterministic")
assert anchor, f"Finalise failed during tamper test: {m}"
zip_name, msg = fr.export_session_bundle(log_path, sid)
assert zip_name and os.path.exists(zip_name), f"Export failed: {msg}"
with tempfile.TemporaryDirectory() as td:
with zipfile.ZipFile(zip_name, "r") as z:
z.extractall(td)
events_file = None
for fn in os.listdir(td):
if fn.endswith("_events.jsonl"):
events_file = os.path.join(td, fn)
break
assert events_file, "No events jsonl found inside exported zip"
with open(events_file, "r", encoding="utf-8") as f:
lines = f.read().splitlines()
assert len(lines) > 5, "Not enough events in bundle to tamper"
obj = json.loads(lines[4])
obj.setdefault("payload", {})
if isinstance(obj["payload"], dict):
obj["payload"]["tampered"] = True
lines[4] = json.dumps(obj, ensure_ascii=False)
with open(events_file, "w", encoding="utf-8") as f:
f.write("\n".join(lines) + "\n")
tampered_zip = os.path.join(td, "tampered.zip")
with zipfile.ZipFile(tampered_zip, "w", compression=zipfile.ZIP_DEFLATED) as z:
z.write(events_file, arcname=os.path.basename(events_file))
status, ok, report, _ = fr.import_bundle_verify(
bundle_path=tampered_zip,
pk_hex="",
require_signatures=False,
store_into_log=False,
log_path=log_path,
)
assert not ok, f"Tampered bundle incorrectly verified PASS.\n{status}\n{report}"
return True
def main():
with tempfile.TemporaryDirectory() as td:
log_path = os.path.join(td, "flightlog.jsonl")
sid1 = two_tab_spam_test(log_path, threads=8, events_per_thread=60)
print(f"[PASS] two_tab_spam_test: session_id={sid1}")
ok = tamper_zip_test(log_path)
print(f"[PASS] tamper_zip_test: {ok}")
print("[ALL PASS]")
if __name__ == "__main__":
main()