RFTSystems committed on
Commit
175d86d
·
verified ·
1 Parent(s): b7e16d0

Update rft_flightrecorder.py

Browse files
Files changed (1) hide show
  1. rft_flightrecorder.py +117 -36
rft_flightrecorder.py CHANGED
@@ -19,7 +19,15 @@ EVENT_SPEC = "rft-flight-event-v0"
19
  ROOT_SPEC = "rft-flight-session-root-v0"
20
  DEFAULT_LOG_PATH = "flightlog.jsonl"
21
 
22
- LOCK_TIMEOUT_S = 10 # keep small; we only lock for quick file IO
 
 
 
 
 
 
 
 
23
 
24
 
25
  # ============================================================
@@ -54,7 +62,6 @@ def short(h: str, n: int = 12) -> str:
54
  # ============================================================
55
 
56
  def _lock_for(path: str) -> FileLock:
57
- # one lock file per jsonl
58
  return FileLock(path + ".lock", timeout=LOCK_TIMEOUT_S)
59
 
60
 
@@ -175,14 +182,17 @@ def events_for_session(all_events: List[Dict[str, Any]], session_id: str) -> Lis
175
  e for e in all_events
176
  if isinstance(e, dict)
177
  and e.get("session_id") == sid
178
- and "event_hash_sha256" in e
179
  and e.get("spec") == EVENT_SPEC
 
180
  ]
181
 
182
 
183
  def last_event_for_session(all_events: List[Dict[str, Any]], session_id: str) -> Optional[Dict[str, Any]]:
184
  evs = events_for_session(all_events, session_id)
185
- return evs[-1] if evs else None
 
 
 
186
 
187
 
188
  def next_seq(all_events: List[Dict[str, Any]], session_id: str) -> int:
@@ -309,17 +319,17 @@ def finalise_session(
309
  return None, "Missing session_id."
310
 
311
  try:
312
- # Lock while reading events + appending session_end (append_event is locked too,
313
- # but we want a consistent anchor over a stable snapshot of the file)
314
  with _lock_for(log_path):
315
  all_events, _corrupt = read_jsonl(log_path)
316
  evs = events_for_session(all_events, sid)
317
  if not evs:
318
  return None, "No events found for this session."
319
 
320
- # Ensure correct order
321
  evs.sort(key=lambda x: int(x.get("seq", 0)))
322
 
 
323
  first_hash = evs[0]["event_hash_sha256"]
324
  last_hash = evs[-1]["event_hash_sha256"]
325
  count = len(evs)
@@ -349,15 +359,13 @@ def finalise_session(
349
  except Exception:
350
  return None, "Failed to sign anchor (invalid private key?)."
351
 
352
- # Append session_end inside the same lock to prevent new events slipping in
353
  payload_text = json.dumps({"anchor": anchor}, ensure_ascii=False)
354
-
355
- # Build the session_end event manually so it stays inside our lock
356
- # (we avoid calling append_event here to prevent nested lock complexity)
357
  meta = {
358
  "model_id": (model_id or "unknown").strip(),
359
  "run_mode": (run_mode or "unknown").strip(),
360
  }
 
361
  seq = int(evs[-1].get("seq", 0)) + 1
362
  prev_hash = evs[-1]["event_hash_sha256"]
363
  parent = prev_hash
@@ -376,8 +384,6 @@ def finalise_session(
376
  event["event_hash_sha256"] = event_hash
377
 
378
  if sign_anchor:
379
- if not sk_hex or not sk_hex.strip():
380
- return None, "Anchor signing enabled but private key is missing."
381
  try:
382
  sk = load_sk(sk_hex)
383
  sig = sk.sign(bytes.fromhex(event_hash))
@@ -387,7 +393,7 @@ def finalise_session(
387
 
388
  append_jsonl(log_path, event)
389
 
390
- return anchor, f"OK. Session finalised. Root hash: {root_hash} (last event hash: {event_hash})"
391
 
392
  except Timeout:
393
  return None, "Busy: log is locked (try again)."
@@ -399,6 +405,26 @@ def finalise_session(
399
  # Verification
400
  # ============================================================
401
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
402
  def verify_session_from_events(
403
  evs: List[Dict[str, Any]],
404
  session_id: str,
@@ -424,6 +450,10 @@ def verify_session_from_events(
424
  except Exception:
425
  return "Invalid public key.", False, ""
426
 
 
 
 
 
427
  expected_prev = "0" * 64
428
  expected_seq = 1
429
 
@@ -439,6 +469,11 @@ def verify_session_from_events(
439
  report.append(f"[FAIL] Bad spec at seq {e.get('seq')}.")
440
  continue
441
 
 
 
 
 
 
442
  if int(e.get("seq", -1)) != expected_seq:
443
  ok = False
444
  report.append(f"[FAIL] Seq mismatch: got {e.get('seq')} expected {expected_seq}.")
@@ -480,28 +515,20 @@ def verify_session_from_events(
480
 
481
  expected_prev = e["event_hash_sha256"]
482
 
483
- end_events = [e for e in evs if e.get("event_type") == "session_end"]
484
- if end_events:
485
- se = end_events[-1]
 
 
486
  anchor = (se.get("payload") or {}).get("anchor")
487
- if isinstance(anchor, dict):
488
- first_hash = evs[0]["event_hash_sha256"]
489
- last_hash = evs[-1]["event_hash_sha256"]
490
- count = len(evs)
491
 
492
- root_core = {
493
- "spec": ROOT_SPEC,
494
- "session_id": sid,
495
- "first_event_hash_sha256": first_hash,
496
- "last_event_hash_sha256": last_hash,
497
- "event_count": count,
498
- }
499
- root_hash = sha256_hex(canon(root_core))
500
- if anchor.get("root_hash_sha256") != root_hash:
501
  ok = False
502
- report.append("[FAIL] Session anchor root hash does not match current event chain.")
503
  else:
504
- report.append("[OK] Session anchor matches current event chain.")
505
  else:
506
  report.append("[WARN] session_end found but anchor payload is missing/invalid.")
507
 
@@ -518,8 +545,8 @@ def verify_session(log_path: str, session_id: str, pk_hex: str, require_signatur
518
  all_events, _corrupt = _read_jsonl_locked(log_path)
519
  except Timeout:
520
  return "FAIL", False, "Busy: log is locked (try again)."
 
521
  evs = events_for_session(all_events, session_id)
522
- evs.sort(key=lambda x: int(x.get("seq", 0)))
523
  return verify_session_from_events(evs, session_id, pk_hex=pk_hex, require_signatures=require_signatures)
524
 
525
 
@@ -650,7 +677,6 @@ def export_session_bundle(log_path: str, session_id: str) -> Tuple[Optional[str]
650
  return None, "No events found."
651
 
652
  evs.sort(key=lambda x: int(x.get("seq", 0)))
653
-
654
  status, ok, report = verify_session_from_events(evs, sid, pk_hex="", require_signatures=False)
655
 
656
  zip_name = f"rft_flight_bundle_{sid}.zip"
@@ -681,19 +707,48 @@ def export_session_bundle(log_path: str, session_id: str) -> Tuple[Optional[str]
681
  # Import bundle (third-party verification)
682
  # ============================================================
683
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
684
  def _read_jsonl_from_zip(z: zipfile.ZipFile, member: str) -> List[Dict[str, Any]]:
685
  out: List[Dict[str, Any]] = []
686
  raw_text = z.read(member).decode("utf-8", errors="replace")
 
687
  for raw in raw_text.splitlines():
688
  raw = raw.strip()
689
  if not raw:
690
  continue
 
 
 
 
 
691
  try:
692
  obj = json.loads(raw)
693
  if isinstance(obj, dict):
694
  out.append(obj)
695
  except Exception:
696
  continue
 
 
 
 
697
  return out
698
 
699
 
@@ -707,9 +762,17 @@ def import_bundle_verify(
707
  if not bundle_path or not os.path.exists(bundle_path):
708
  return "Missing bundle file.", False, "", None
709
 
 
 
 
 
 
 
 
710
  try:
711
  with zipfile.ZipFile(bundle_path, "r") as z:
712
  members = z.namelist()
 
713
  events_member = None
714
  for m in members:
715
  if m.endswith("_events.jsonl"):
@@ -723,11 +786,22 @@ def import_bundle_verify(
723
  if not events_member:
724
  return "No .jsonl events file found in bundle.", False, "", None
725
 
 
 
 
 
726
  evs = _read_jsonl_from_zip(z, events_member)
727
 
728
  except Exception:
729
  return "Failed to read bundle (invalid zip?).", False, "", None
730
 
 
 
 
 
 
 
 
731
  sid = ""
732
  for e in evs:
733
  if e.get("spec") == EVENT_SPEC and e.get("session_id"):
@@ -744,11 +818,18 @@ def import_bundle_verify(
744
  )
745
 
746
  if store_into_log and ok:
747
- # Store under a single lock to avoid interleaving with live writers.
748
  try:
749
  with _lock_for(log_path):
 
 
 
750
  for e in evs:
751
- append_jsonl(log_path, e)
 
 
 
 
752
  except Timeout:
753
  return "FAIL", False, "Busy: log is locked (try again).", None
754
 
 
19
  ROOT_SPEC = "rft-flight-session-root-v0"
20
  DEFAULT_LOG_PATH = "flightlog.jsonl"
21
 
22
# ---------- Locking ----------
LOCK_TIMEOUT_S = 10  # keep short so the UI never hangs on a stuck lock

# ---------- Public abuse limits (Import Bundle) ----------
MAX_BUNDLE_BYTES = 15 * 1024 * 1024       # 15 MB zip upload cap
MAX_ZIP_MEMBER_BYTES = 25 * 1024 * 1024   # 25 MB decompressed member cap
MAX_EVENTS_PER_BUNDLE = 50_000            # hard ceiling on events accepted per bundle
MAX_EVENT_LINE_BYTES = 200_000            # 200 KB per JSONL line (prevents huge single lines)
MAX_ZIP_RATIO = 200.0                     # zip-bomb heuristic (file_size / compress_size)
31
 
32
 
33
  # ============================================================
 
62
  # ============================================================
63
 
64
def _lock_for(path: str) -> FileLock:
    """Return the FileLock guarding *path* (one sidecar ``.lock`` file per jsonl)."""
    lock_path = path + ".lock"
    return FileLock(lock_path, timeout=LOCK_TIMEOUT_S)
66
 
67
 
 
182
  e for e in all_events
183
  if isinstance(e, dict)
184
  and e.get("session_id") == sid
 
185
  and e.get("spec") == EVENT_SPEC
186
+ and "event_hash_sha256" in e
187
  ]
188
 
189
 
190
  def last_event_for_session(all_events: List[Dict[str, Any]], session_id: str) -> Optional[Dict[str, Any]]:
191
  evs = events_for_session(all_events, session_id)
192
+ if not evs:
193
+ return None
194
+ evs.sort(key=lambda x: int(x.get("seq", 0)))
195
+ return evs[-1]
196
 
197
 
198
  def next_seq(all_events: List[Dict[str, Any]], session_id: str) -> int:
 
319
  return None, "Missing session_id."
320
 
321
  try:
322
+ # Lock the entire finalise operation so nothing can append between:
323
+ # anchor computation -> session_end append.
324
  with _lock_for(log_path):
325
  all_events, _corrupt = read_jsonl(log_path)
326
  evs = events_for_session(all_events, sid)
327
  if not evs:
328
  return None, "No events found for this session."
329
 
 
330
  evs.sort(key=lambda x: int(x.get("seq", 0)))
331
 
332
+ # Anchor must NOT include the session_end event itself (avoids circularity).
333
  first_hash = evs[0]["event_hash_sha256"]
334
  last_hash = evs[-1]["event_hash_sha256"]
335
  count = len(evs)
 
359
  except Exception:
360
  return None, "Failed to sign anchor (invalid private key?)."
361
 
362
+ # Append session_end event (manually, inside the same lock)
363
  payload_text = json.dumps({"anchor": anchor}, ensure_ascii=False)
 
 
 
364
  meta = {
365
  "model_id": (model_id or "unknown").strip(),
366
  "run_mode": (run_mode or "unknown").strip(),
367
  }
368
+
369
  seq = int(evs[-1].get("seq", 0)) + 1
370
  prev_hash = evs[-1]["event_hash_sha256"]
371
  parent = prev_hash
 
384
  event["event_hash_sha256"] = event_hash
385
 
386
  if sign_anchor:
 
 
387
  try:
388
  sk = load_sk(sk_hex)
389
  sig = sk.sign(bytes.fromhex(event_hash))
 
393
 
394
  append_jsonl(log_path, event)
395
 
396
+ return anchor, f"OK. Session finalised. Root hash: {root_hash} (session_end hash: {event_hash})"
397
 
398
  except Timeout:
399
  return None, "Busy: log is locked (try again)."
 
405
  # Verification
406
  # ============================================================
407
 
408
def _anchor_expected_from_events(evs_sorted: List[Dict[str, Any]], session_end_index: int) -> str:
    """
    Compute the expected anchor root hash over the event chain *before* the
    session_end event at ``session_end_index``.

    The session_end payload contains the anchor itself, so the anchored root
    must exclude that event to avoid circularity.

    Args:
        evs_sorted: Session events sorted ascending by seq.
        session_end_index: Index of the session_end event within evs_sorted.

    Returns:
        Hex SHA-256 root hash of the canonicalised root-core structure.

    Raises:
        ValueError: If no events precede the session_end event (previously
            this surfaced as an opaque IndexError on ``chain[0]``).
    """
    chain = evs_sorted[:session_end_index]  # exclude the session_end itself
    if not chain:
        raise ValueError("session_end has no preceding events to anchor")

    root_core = {
        "spec": ROOT_SPEC,
        "session_id": chain[0]["session_id"],
        "first_event_hash_sha256": chain[0]["event_hash_sha256"],
        "last_event_hash_sha256": chain[-1]["event_hash_sha256"],
        "event_count": len(chain),
    }
    return sha256_hex(canon(root_core))
426
+
427
+
428
  def verify_session_from_events(
429
  evs: List[Dict[str, Any]],
430
  session_id: str,
 
450
  except Exception:
451
  return "Invalid public key.", False, ""
452
 
453
+ # Ensure order by seq
454
+ evs = [e for e in evs if isinstance(e, dict)]
455
+ evs.sort(key=lambda x: int(x.get("seq", 0)))
456
+
457
  expected_prev = "0" * 64
458
  expected_seq = 1
459
 
 
469
  report.append(f"[FAIL] Bad spec at seq {e.get('seq')}.")
470
  continue
471
 
472
+ if e.get("session_id") != sid:
473
+ ok = False
474
+ report.append(f"[FAIL] session_id mismatch at seq {e.get('seq')}.")
475
+ continue
476
+
477
  if int(e.get("seq", -1)) != expected_seq:
478
  ok = False
479
  report.append(f"[FAIL] Seq mismatch: got {e.get('seq')} expected {expected_seq}.")
 
515
 
516
  expected_prev = e["event_hash_sha256"]
517
 
518
+ # Anchor check: validate the LAST session_end event (if present)
519
+ end_idxs = [i for i, e in enumerate(evs) if e.get("event_type") == "session_end"]
520
+ if end_idxs:
521
+ se_idx = end_idxs[-1]
522
+ se = evs[se_idx]
523
  anchor = (se.get("payload") or {}).get("anchor")
 
 
 
 
524
 
525
+ if isinstance(anchor, dict) and se_idx > 0:
526
+ expected_root = _anchor_expected_from_events(evs, se_idx)
527
+ if anchor.get("root_hash_sha256") != expected_root:
 
 
 
 
 
 
528
  ok = False
529
+ report.append("[FAIL] Session anchor root hash does not match the pre-session_end event chain.")
530
  else:
531
+ report.append("[OK] Session anchor matches the pre-session_end event chain.")
532
  else:
533
  report.append("[WARN] session_end found but anchor payload is missing/invalid.")
534
 
 
545
  all_events, _corrupt = _read_jsonl_locked(log_path)
546
  except Timeout:
547
  return "FAIL", False, "Busy: log is locked (try again)."
548
+
549
  evs = events_for_session(all_events, session_id)
 
550
  return verify_session_from_events(evs, session_id, pk_hex=pk_hex, require_signatures=require_signatures)
551
 
552
 
 
677
  return None, "No events found."
678
 
679
  evs.sort(key=lambda x: int(x.get("seq", 0)))
 
680
  status, ok, report = verify_session_from_events(evs, sid, pk_hex="", require_signatures=False)
681
 
682
  zip_name = f"rft_flight_bundle_{sid}.zip"
 
707
  # Import bundle (third-party verification)
708
  # ============================================================
709
 
710
def _zip_member_safe(z: zipfile.ZipFile, member: str) -> Tuple[bool, str]:
    """
    Sanity-check a zip member before it is read.

    Rejects members whose declared decompressed size exceeds
    MAX_ZIP_MEMBER_BYTES, or whose compression ratio exceeds MAX_ZIP_RATIO
    (zip-bomb heuristic).

    Returns:
        (True, "OK") when the member looks safe, else (False, reason).
    """
    try:
        info = z.getinfo(member)
    except Exception:
        return False, "Missing events member in zip."

    # Declared decompressed size cap.
    # NOTE(review): info.file_size comes from the zip central directory and
    # can be forged; a streamed read with a hard byte cap would be stronger
    # — confirm against the threat model.
    if info.file_size > MAX_ZIP_MEMBER_BYTES:
        return False, "Events file too large (decompressed)."

    # Zip-bomb heuristic: absurd file_size / compress_size ratio.
    if info.compress_size and info.compress_size > 0:
        if float(info.file_size) / float(info.compress_size) > MAX_ZIP_RATIO:
            return False, "Suspicious zip compression ratio (possible zip bomb)."

    return True, "OK"
727
+
728
+
729
def _read_jsonl_from_zip(z: zipfile.ZipFile, member: str) -> List[Dict[str, Any]]:
    """
    Parse a JSONL member of *z* into a list of dict events.

    Skips blank lines, lines longer than MAX_EVENT_LINE_BYTES, malformed
    JSON, and non-dict values. Reading stops once the collected count
    *exceeds* MAX_EVENTS_PER_BUNDLE — deliberately keeping one extra event
    so a caller comparing ``len(events) > MAX_EVENTS_PER_BUNDLE`` can still
    detect and reject an oversize bundle.
    """
    events: List[Dict[str, Any]] = []
    text = z.read(member).decode("utf-8", errors="replace")

    for line in text.splitlines():
        line = line.strip()
        if not line:
            continue

        # Single-line byte cap (prevents huge single lines).
        if len(line.encode("utf-8", errors="ignore")) > MAX_EVENT_LINE_BYTES:
            continue

        try:
            parsed = json.loads(line)
        except Exception:
            continue
        if isinstance(parsed, dict):
            events.append(parsed)

        if len(events) > MAX_EVENTS_PER_BUNDLE:
            break

    return events
753
 
754
 
 
762
  if not bundle_path or not os.path.exists(bundle_path):
763
  return "Missing bundle file.", False, "", None
764
 
765
+ # zip upload cap
766
+ try:
767
+ if os.path.getsize(bundle_path) > MAX_BUNDLE_BYTES:
768
+ return "FAIL", False, "Bundle too large.", None
769
+ except Exception:
770
+ return "FAIL", False, "Unable to read bundle size.", None
771
+
772
  try:
773
  with zipfile.ZipFile(bundle_path, "r") as z:
774
  members = z.namelist()
775
+
776
  events_member = None
777
  for m in members:
778
  if m.endswith("_events.jsonl"):
 
786
  if not events_member:
787
  return "No .jsonl events file found in bundle.", False, "", None
788
 
789
+ safe, why = _zip_member_safe(z, events_member)
790
+ if not safe:
791
+ return "FAIL", False, why, None
792
+
793
  evs = _read_jsonl_from_zip(z, events_member)
794
 
795
  except Exception:
796
  return "Failed to read bundle (invalid zip?).", False, "", None
797
 
798
+ if not evs:
799
+ return "Bundle contains no usable events.", False, "", None
800
+
801
+ if len(evs) > MAX_EVENTS_PER_BUNDLE:
802
+ return "FAIL", False, "Too many events in bundle.", None
803
+
804
+ # Determine session_id
805
  sid = ""
806
  for e in evs:
807
  if e.get("spec") == EVENT_SPEC and e.get("session_id"):
 
818
  )
819
 
820
  if store_into_log and ok:
821
+ # Store under a lock + dedupe by event_hash_sha256 to avoid log pollution
822
  try:
823
  with _lock_for(log_path):
824
+ existing, _ = read_jsonl(log_path)
825
+ seen = {e.get("event_hash_sha256") for e in existing if isinstance(e, dict)}
826
+
827
  for e in evs:
828
+ h = e.get("event_hash_sha256")
829
+ if h and h not in seen:
830
+ append_jsonl(log_path, e)
831
+ seen.add(h)
832
+
833
  except Timeout:
834
  return "FAIL", False, "Busy: log is locked (try again).", None
835