Spaces:
Running
Running
| import os | |
| import re | |
| import io | |
| import json | |
| import time | |
| import uuid | |
| import zipfile | |
| import sqlite3 | |
| import hashlib | |
| from dataclasses import dataclass | |
| from typing import List, Dict, Any, Optional, Tuple | |
| import gradio as gr | |
| BASE_DIR = os.environ.get("RFT_MEM_BASE", "var/rftmem") | |
| os.makedirs(BASE_DIR, exist_ok=True) | |
| def sha256_str(s: str) -> str: | |
| return hashlib.sha256(s.encode("utf-8")).hexdigest() | |
| def now_ms() -> int: | |
| return int(time.time() * 1000) | |
| def atomic_write(path: str, data: bytes) -> None: | |
| tmp = path + ".tmp" | |
| with open(tmp, "wb") as f: | |
| f.write(data) | |
| f.flush() | |
| os.fsync(f.fileno()) | |
| os.replace(tmp, path) | |
| def safe_fts_match(user_query: str) -> str: | |
| words = re.findall(r"[A-Za-z0-9_]+", (user_query or "").lower()) | |
| if not words: | |
| return "___NO_HITS___" | |
| seen = set() | |
| uniq = [] | |
| for w in words: | |
| if w not in seen: | |
| seen.add(w) | |
| uniq.append(w) | |
| return " OR ".join(uniq) | |
| class RetrievalHit: | |
| event_id: str | |
| seq: int | |
| role: str | |
| text: str | |
| ts_ms: int | |
| digest: str | |
| chain_hash: str | |
| score: float | |
| class RFTMemoryStore: | |
| """ | |
| Append-only ledger + SQLite FTS retrieval + hash-chained integrity. | |
| Produces per-turn receipts that can be verified against stored events. | |
| """ | |
| def __init__(self, base_dir: str): | |
| self.base_dir = base_dir | |
| self.db_path = os.path.join(base_dir, "index.sqlite") | |
| self._init_db() | |
| def _connect(self) -> sqlite3.Connection: | |
| return sqlite3.connect(self.db_path) | |
| def _init_db(self): | |
| os.makedirs(self.base_dir, exist_ok=True) | |
| con = self._connect() | |
| cur = con.cursor() | |
| cur.execute(""" | |
| CREATE TABLE IF NOT EXISTS events ( | |
| session_id TEXT, | |
| event_id TEXT PRIMARY KEY, | |
| seq INTEGER, | |
| ts_ms INTEGER, | |
| role TEXT, | |
| text TEXT, | |
| digest TEXT, | |
| prev_hash TEXT, | |
| chain_hash TEXT, | |
| collapse REAL | |
| ) | |
| """) | |
| cur.execute(""" | |
| CREATE TABLE IF NOT EXISTS receipts ( | |
| receipt_id TEXT PRIMARY KEY, | |
| session_id TEXT, | |
| ts_ms INTEGER, | |
| prompt_hash TEXT, | |
| response_hash TEXT, | |
| receipt_path TEXT | |
| ) | |
| """) | |
| # Ensure join-safe FTS5 (stored content) | |
| cur.execute("SELECT sql FROM sqlite_master WHERE type='table' AND name='events_fts'") | |
| row = cur.fetchone() | |
| needs_rebuild = False | |
| if row is None: | |
| needs_rebuild = True | |
| else: | |
| sql = (row[0] or "").lower() | |
| if "content=''" in sql or 'content=""' in sql: | |
| needs_rebuild = True | |
| if needs_rebuild: | |
| cur.execute("DROP TABLE IF EXISTS events_fts") | |
| cur.execute(""" | |
| CREATE VIRTUAL TABLE events_fts USING fts5( | |
| event_id, | |
| session_id, | |
| text | |
| ) | |
| """) | |
| con.commit() | |
| cur.execute("DELETE FROM events_fts") | |
| cur.execute(""" | |
| INSERT INTO events_fts(event_id, session_id, text) | |
| SELECT event_id, session_id, text FROM events | |
| """) | |
| con.commit() | |
| con.close() | |
| # Filesystem | |
| def session_dir(self, session_id: str) -> str: | |
| d = os.path.join(self.base_dir, "sessions", session_id) | |
| os.makedirs(d, exist_ok=True) | |
| return d | |
| def session_log_path(self, session_id: str) -> str: | |
| return os.path.join(self.session_dir(session_id), "events.jsonl") | |
| def receipts_dir(self, session_id: str) -> str: | |
| d = os.path.join(self.session_dir(session_id), "receipts") | |
| os.makedirs(d, exist_ok=True) | |
| return d | |
| def exports_dir(self, session_id: str) -> str: | |
| d = os.path.join(self.session_dir(session_id), "exports") | |
| os.makedirs(d, exist_ok=True) | |
| return d | |
| # Ledger ops | |
| def get_events(self, session_id: str, limit: int = 600) -> List[Dict[str, Any]]: | |
| con = self._connect() | |
| cur = con.cursor() | |
| cur.execute(""" | |
| SELECT event_id, seq, ts_ms, role, text, digest, prev_hash, chain_hash, collapse | |
| FROM events | |
| WHERE session_id=? | |
| ORDER BY seq ASC | |
| LIMIT ? | |
| """, (session_id, int(limit))) | |
| rows = cur.fetchall() | |
| con.close() | |
| out = [] | |
| for r in rows: | |
| out.append({ | |
| "event_id": r[0], | |
| "seq": int(r[1] or 0), | |
| "ts_ms": r[2], | |
| "role": r[3], | |
| "text": r[4], | |
| "digest": r[5], | |
| "prev_hash": r[6], | |
| "chain_hash": r[7], | |
| "collapse": float(r[8] or 0.0), | |
| }) | |
| return out | |
| def _get_last_seq_and_chain(self, session_id: str) -> Tuple[int, str]: | |
| con = self._connect() | |
| cur = con.cursor() | |
| cur.execute("SELECT COALESCE(MAX(seq), 0) FROM events WHERE session_id=?", (session_id,)) | |
| last_seq = int(cur.fetchone()[0] or 0) | |
| cur.execute(""" | |
| SELECT chain_hash FROM events | |
| WHERE session_id=? | |
| ORDER BY seq DESC | |
| LIMIT 1 | |
| """, (session_id,)) | |
| row = cur.fetchone() | |
| con.close() | |
| last_chain = row[0] if row and row[0] else ("0" * 64) | |
| return last_seq, last_chain | |
| def collapse_score(self, session_id: str, role: str, text: str) -> float: | |
| role_w = {"user": 1.0, "tool": 0.9, "assistant": 0.6}.get(role, 0.7) | |
| tokens = set(t.lower() for t in re.findall(r"[A-Za-z0-9_]+", text or "")) | |
| if not tokens: | |
| return 0.0 | |
| recent = self.get_events(session_id, limit=20) | |
| recent_tokens = set() | |
| for e in recent: | |
| recent_tokens |= set(t.lower() for t in re.findall(r"[A-Za-z0-9_]+", e["text"] or "")) | |
| unseen = len(tokens - recent_tokens) | |
| novelty = unseen / max(1, len(tokens)) | |
| length_factor = min(1.0, len(tokens) / 30.0) | |
| score = role_w * (0.65 * novelty + 0.35 * length_factor) | |
| return float(max(0.0, min(1.0, score))) | |
| def append_event(self, session_id: str, role: str, text: str) -> Dict[str, Any]: | |
| event_id = uuid.uuid4().hex | |
| ts = now_ms() | |
| last_seq, prev_chain = self._get_last_seq_and_chain(session_id) | |
| seq = last_seq + 1 | |
| payload = { | |
| "session_id": session_id, | |
| "event_id": event_id, | |
| "seq": seq, | |
| "ts_ms": ts, | |
| "role": role, | |
| "text": text | |
| } | |
| digest = sha256_str(json.dumps(payload, sort_keys=True, ensure_ascii=False)) | |
| chain_hash = sha256_str(prev_chain + digest) | |
| collapse = self.collapse_score(session_id, role, text) | |
| rec = {**payload, "digest": digest, "prev_hash": prev_chain, "chain_hash": chain_hash, "collapse": collapse} | |
| # JSONL source of truth | |
| log_path = self.session_log_path(session_id) | |
| line = (json.dumps(rec, ensure_ascii=False) + "\n").encode("utf-8") | |
| with open(log_path, "ab") as f: | |
| f.write(line) | |
| f.flush() | |
| os.fsync(f.fileno()) | |
| # Index | |
| con = self._connect() | |
| cur = con.cursor() | |
| cur.execute(""" | |
| INSERT INTO events(session_id,event_id,seq,ts_ms,role,text,digest,prev_hash,chain_hash,collapse) | |
| VALUES(?,?,?,?,?,?,?,?,?,?) | |
| """, (session_id, event_id, seq, ts, role, text, digest, prev_chain, chain_hash, collapse)) | |
| cur.execute("INSERT INTO events_fts(event_id, session_id, text) VALUES(?,?,?)", (event_id, session_id, text)) | |
| con.commit() | |
| con.close() | |
| return rec | |
| # Retrieval | |
| def search_lexical(self, session_id: str, query: str, k: int = 8) -> List[RetrievalHit]: | |
| match = safe_fts_match(query) | |
| if match == "___NO_HITS___": | |
| return [] | |
| con = self._connect() | |
| cur = con.cursor() | |
| cur.execute(""" | |
| SELECT e.event_id, e.seq, e.role, e.text, e.ts_ms, e.digest, e.chain_hash, | |
| bm25(events_fts) as rank | |
| FROM events_fts | |
| JOIN events e ON e.event_id = events_fts.event_id | |
| WHERE events_fts.text MATCH ? AND e.session_id=? | |
| ORDER BY rank ASC | |
| LIMIT ? | |
| """, (match, session_id, int(k))) | |
| rows = cur.fetchall() | |
| con.close() | |
| hits: List[RetrievalHit] = [] | |
| for (eid, seq, role, text, ts, digest, chain_hash, rank) in rows: | |
| r = float(rank if rank is not None else 0.0) | |
| score = 1.0 / (1.0 + max(0.0, r)) | |
| hits.append(RetrievalHit(eid, int(seq or 0), role, text, ts, digest, chain_hash, score)) | |
| # UX fallback (if someone searches a single token that FTS doesn't match as expected) | |
| if not hits: | |
| tokens = re.findall(r"[A-Za-z0-9_]+", (query or "").lower()) | |
| if tokens: | |
| needle = f"%{tokens[-1]}%" | |
| con = self._connect() | |
| cur = con.cursor() | |
| cur.execute(""" | |
| SELECT event_id, seq, role, text, ts_ms, digest, chain_hash | |
| FROM events | |
| WHERE session_id=? AND LOWER(text) LIKE ? | |
| ORDER BY seq DESC | |
| LIMIT ? | |
| """, (session_id, needle, int(k))) | |
| rows2 = cur.fetchall() | |
| con.close() | |
| for (eid, seq, role, text, ts, digest, chain_hash) in rows2: | |
| hits.append(RetrievalHit(eid, int(seq or 0), role, text, ts, digest, chain_hash, 0.001)) | |
| return hits | |
| # Receipts | |
| def write_receipt(self, session_id: str, user_text: str, retrieved: List[RetrievalHit], prompt: str, response: str) -> str: | |
| receipt_id = uuid.uuid4().hex | |
| ts = now_ms() | |
| receipt = { | |
| "receipt_id": receipt_id, | |
| "session_id": session_id, | |
| "ts_ms": ts, | |
| "query": user_text, | |
| "retrieval": [{ | |
| "event_id": h.event_id, | |
| "seq": h.seq, | |
| "role": h.role, | |
| "content": h.text, | |
| "score": h.score, | |
| "digest": h.digest, | |
| "chain_hash": h.chain_hash | |
| } for h in retrieved], | |
| "prompt_hash": sha256_str(prompt), | |
| "response_hash": sha256_str(response), | |
| "engine": {"name": "RFTSystems TrustStack", "version": "1.0", "method": "ledger + fts + receipts + guardrails"} | |
| } | |
| path = os.path.join(self.receipts_dir(session_id), f"{receipt_id}.json") | |
| atomic_write(path, json.dumps(receipt, indent=2, ensure_ascii=False).encode("utf-8")) | |
| con = self._connect() | |
| cur = con.cursor() | |
| cur.execute(""" | |
| INSERT INTO receipts(receipt_id, session_id, ts_ms, prompt_hash, response_hash, receipt_path) | |
| VALUES(?,?,?,?,?,?) | |
| """, (receipt_id, session_id, ts, receipt["prompt_hash"], receipt["response_hash"], path)) | |
| con.commit() | |
| con.close() | |
| return path | |
| def verify_receipt(self, receipt_json: Dict[str, Any]) -> Tuple[bool, str]: | |
| session_id = receipt_json.get("session_id") | |
| if not session_id: | |
| return False, "Missing session_id." | |
| con = self._connect() | |
| cur = con.cursor() | |
| for item in receipt_json.get("retrieval", []): | |
| eid = item.get("event_id") | |
| expected_digest = item.get("digest") | |
| expected_chain = item.get("chain_hash") | |
| cur.execute("SELECT digest, chain_hash FROM events WHERE event_id=? AND session_id=?", (eid, session_id)) | |
| row = cur.fetchone() | |
| if not row: | |
| con.close() | |
| return False, f"Event not found: {eid}" | |
| if row[0] != expected_digest: | |
| con.close() | |
| return False, f"Digest mismatch: {eid}" | |
| if row[1] != expected_chain: | |
| con.close() | |
| return False, f"Chain hash mismatch: {eid}" | |
| con.close() | |
| return True, "Receipt verified: all referenced events exist and hashes match." | |
| # Trace export (OTel-style JSON, but self-contained) | |
| def export_trace(self, session_id: str, receipt_path: str) -> str: | |
| export_id = uuid.uuid4().hex | |
| ts = now_ms() | |
| with open(receipt_path, "r", encoding="utf-8") as f: | |
| receipt = json.load(f) | |
| trace = { | |
| "trace_id": sha256_str(session_id + receipt.get("receipt_id", "") + str(ts)), | |
| "session_id": session_id, | |
| "ts_ms": ts, | |
| "spans": [ | |
| {"span": "user.turn", "attrs": {"query": receipt.get("query", ""), "receipt_id": receipt.get("receipt_id", "")}}, | |
| {"span": "memory.retrieve", "attrs": {"k": len(receipt.get("retrieval", [])), "retrieval": receipt.get("retrieval", [])}}, | |
| {"span": "assistant.respond", "attrs": {"prompt_hash": receipt.get("prompt_hash", ""), "response_hash": receipt.get("response_hash", "")}}, | |
| {"span": "receipt.write", "attrs": {"receipt_id": receipt.get("receipt_id", ""), "receipt_path": receipt_path}}, | |
| ] | |
| } | |
| out_path = os.path.join(self.exports_dir(session_id), f"trace_{export_id}.json") | |
| atomic_write(out_path, json.dumps(trace, indent=2, ensure_ascii=False).encode("utf-8")) | |
| return out_path | |
| # Audit pack export | |
| def export_audit_pack(self, session_id: str) -> str: | |
| export_id = uuid.uuid4().hex | |
| out_zip = os.path.join(self.exports_dir(session_id), f"audit_pack_{export_id}.zip") | |
| events = self.get_events(session_id, limit=5000) | |
| receipts_dir = self.receipts_dir(session_id) | |
| ledger_path = self.session_log_path(session_id) | |
| summary = { | |
| "session_id": session_id, | |
| "export_id": export_id, | |
| "ts_ms": now_ms(), | |
| "events_count": len(events), | |
| "last_chain_hash": (events[-1]["chain_hash"] if events else "0"*64), | |
| "integrity": "hash-chained ledger + receipt verification", | |
| } | |
| with zipfile.ZipFile(out_zip, "w", compression=zipfile.ZIP_DEFLATED) as z: | |
| # include JSONL ledger if present | |
| if os.path.exists(ledger_path): | |
| z.write(ledger_path, arcname="ledger/events.jsonl") | |
| # include receipts | |
| for fn in os.listdir(receipts_dir): | |
| if fn.endswith(".json"): | |
| z.write(os.path.join(receipts_dir, fn), arcname=f"receipts/{fn}") | |
| # include summary | |
| z.writestr("summary.json", json.dumps(summary, indent=2, ensure_ascii=False)) | |
| return out_zip | |
| store = RFTMemoryStore(BASE_DIR) | |
| INVESTOR_SCRIPT = [ | |
| "Store these exactly: Dog=Nova, City=Manchester, Drink=Pepsi Max.", | |
| "What is my dog's name?", | |
| "What city did I say?", | |
| "My drink is Coke Zero now. This overrides earlier.", | |
| "What is my favourite drink?", | |
| "Search for: Nova", | |
| ] | |
| EXAMPLE_PROMPTS = [ | |
| "Store this: Dog=Nova, City=Manchester, Drink=Pepsi Max.", | |
| "What is my dog's name?", | |
| "What city did I say?", | |
| "My drink is Coke Zero now. This overrides earlier.", | |
| "What is my favourite drink?", | |
| "Search for: Nova", | |
| "Search for: Manchester", | |
| ] | |
| def new_session_id() -> str: | |
| return uuid.uuid4().hex | |
| def events_to_messages(events: List[Dict[str, Any]]) -> List[Dict[str, str]]: | |
| return [{"role": e["role"], "content": e["text"]} for e in events if e["role"] in ("user", "assistant")] | |
| def format_ledger(events: List[Dict[str, Any]]) -> str: | |
| lines = [] | |
| for e in events[-250:]: | |
| t = time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime(e["ts_ms"] / 1000)) | |
| lines.append( | |
| f"{t} | seq={e['seq']} | {e['role']}\n" | |
| f"{e['text']}\n" | |
| f"event_id={e['event_id']} collapse={e['collapse']:.2f}\n" | |
| f"digest={e['digest']}\n" | |
| f"chain={e['chain_hash']}\n" | |
| f"{'-'*72}" | |
| ) | |
| return "\n".join(lines) | |
| def build_prompt(user_msg: str, hits: List[RetrievalHit]) -> str: | |
| memories = "\n".join([f"- ({h.role}) {h.text}" for h in hits]) if hits else "(none)" | |
| return ( | |
| "SYSTEM: Use retrieved memory slices if relevant. Prefer exact stored facts.\n" | |
| f"RETRIEVED MEMORIES:\n{memories}\n\n" | |
| f"USER:\n{user_msg}\n" | |
| ) | |
| def extract_fact_from_hits(hits: List[RetrievalHit], key: str) -> Optional[str]: | |
| key_l = key.lower() | |
| patterns = [ | |
| rf"\b{re.escape(key_l)}\b\s*=\s*([A-Za-z0-9 _\-']+)", | |
| rf"\b{re.escape(key_l)}\b\s*:\s*([A-Za-z0-9 _\-']+)", | |
| ] | |
| for h in hits: | |
| tl = (h.text or "").strip().lower() | |
| if key_l == "name": | |
| m = re.search(r"\bmy name is\b\s+([A-Za-z0-9 _\-']+)", tl) | |
| if m: | |
| return m.group(1).strip().title() | |
| for p in patterns: | |
| m = re.search(p, tl) | |
| if m: | |
| return m.group(1).strip().strip(",.") | |
| return None | |
| def answer_from_memory(user_msg: str, hits: List[RetrievalHit]) -> str: | |
| q = (user_msg or "").lower() | |
| if q.startswith("search for") or q.startswith("search:"): | |
| if not hits: | |
| return "No matching memory slices were retrieved for this search." | |
| return "Search hits:\n" + "\n".join([f"- {h.score:.4f} | {h.role} | {h.text}" for h in hits]) | |
| if "dog" in q and "name" in q: | |
| v = extract_fact_from_hits(hits, "dog") | |
| return f"Your dog’s name (from stored memory) is: {v}" if v else "I didn’t retrieve a stored dog name for this query." | |
| if "city" in q: | |
| v = extract_fact_from_hits(hits, "city") | |
| return f"Your city (from stored memory) is: {v}" if v else "I didn’t retrieve a stored city for this query." | |
| if "drink" in q: | |
| v = extract_fact_from_hits(hits, "drink") | |
| return f"Your drink (from stored memory) is: {v}" if v else "I didn’t retrieve a stored drink for this query." | |
| if not hits: | |
| return "No matching memory slices were retrieved for this query." | |
| return "Retrieved memory slices:\n" + "\n".join([f"- {h.score:.4f} | {h.role} | {h.text}" for h in hits]) | |
| def chat_turn(session_id: str, user_msg: str, retrieval_k: int): | |
| if not session_id: | |
| session_id = new_session_id() | |
| store.append_event(session_id, "user", user_msg) | |
| hits = store.search_lexical(session_id, user_msg, k=int(retrieval_k)) | |
| prompt = build_prompt(user_msg, hits) | |
| response = answer_from_memory(user_msg, hits) | |
| store.append_event(session_id, "assistant", response) | |
| receipt_path = store.write_receipt(session_id, user_msg, hits, prompt, response) | |
| events = store.get_events(session_id, limit=1200) | |
| ledger = format_ledger(events) | |
| retrieved_view = "\n".join([f"{h.score:.4f} | {h.role} | {h.text}" for h in hits]) if hits else "(none)" | |
| messages = events_to_messages(events) | |
| # Export trace for the latest receipt | |
| trace_path = store.export_trace(session_id, receipt_path) | |
| return session_id, messages, retrieved_view, ledger, receipt_path, receipt_path, trace_path, trace_path | |
| def run_investor_demo(session_id: str, retrieval_k: int): | |
| if not session_id: | |
| session_id = new_session_id() | |
| last = (session_id, [], "", "", "", None, "", None, "", None) | |
| for step in INVESTOR_SCRIPT: | |
| last = chat_turn(session_id, step, retrieval_k) | |
| session_id = last[0] | |
| return last | |
| def verify_receipt_upload(file_obj) -> str: | |
| if file_obj is None: | |
| return "Upload a receipt JSON file." | |
| with open(file_obj.name, "r", encoding="utf-8") as f: | |
| data = json.load(f) | |
| ok, msg = store.verify_receipt(data) | |
| return f"{'✅' if ok else '❌'} {msg}" | |
| def guardrail_tool_call(receipt_file, requested_action: str) -> str: | |
| if receipt_file is None: | |
| return "❌ Blocked: no receipt provided." | |
| with open(receipt_file.name, "r", encoding="utf-8") as f: | |
| receipt = json.load(f) | |
| ok, msg = store.verify_receipt(receipt) | |
| if not ok: | |
| return f"❌ Blocked: receipt failed verification. Reason: {msg}" | |
| # Allowed actions are deliberately boring in the demo. | |
| # The point is the gate, not the tool. | |
| return f"✅ Allowed: receipt verified. Executed action: {requested_action}" | |
| def export_audit_pack(session_id: str): | |
| if not session_id: | |
| session_id = new_session_id() | |
| path = store.export_audit_pack(session_id) | |
| return path, path | |
| def api_playground(session_id: str, action: str, payload_json: str, retrieval_k: int): | |
| if not session_id: | |
| session_id = new_session_id() | |
| payload = {} | |
| if payload_json and payload_json.strip(): | |
| payload = json.loads(payload_json) | |
| if action == "memory.write": | |
| role = payload.get("role", "user") | |
| text = payload.get("text", "") | |
| ev = store.append_event(session_id, role, text) | |
| return session_id, json.dumps({"ok": True, "event": ev}, indent=2, ensure_ascii=False) | |
| if action == "memory.search": | |
| q = payload.get("query", "") | |
| k = int(payload.get("k", retrieval_k)) | |
| hits = store.search_lexical(session_id, q, k=k) | |
| out = { | |
| "ok": True, | |
| "hits": [{ | |
| "event_id": h.event_id, "seq": h.seq, "role": h.role, "text": h.text, | |
| "score": h.score, "digest": h.digest, "chain_hash": h.chain_hash | |
| } for h in hits] | |
| } | |
| return session_id, json.dumps(out, indent=2, ensure_ascii=False) | |
| if action == "receipt.verify": | |
| receipt = payload.get("receipt", {}) | |
| ok, msg = store.verify_receipt(receipt) | |
| return session_id, json.dumps({"ok": ok, "message": msg}, indent=2, ensure_ascii=False) | |
| if action == "audit.export": | |
| path = store.export_audit_pack(session_id) | |
| return session_id, json.dumps({"ok": True, "audit_pack_path": path}, indent=2, ensure_ascii=False) | |
| return session_id, json.dumps({"ok": False, "error": "Unknown action"}, indent=2, ensure_ascii=False) | |
| def token_savings(n: int, B: int, M: int, W: int) -> List[List[Any]]: | |
| n = int(n) | |
| B = int(B) | |
| M = int(M) | |
| W = int(W) | |
| baseline = n * B + M * n * (n + 1) // 2 | |
| aifs = n * (B + W) | |
| red = 0.0 if baseline == 0 else (1.0 - (aifs / baseline)) | |
| return [["turns", n], | |
| ["baseline_total_tokens", baseline], | |
| ["budgeted_total_tokens", aifs], | |
| ["reduction_percent", round(100.0 * red, 2)]] | |
| def fill_example(selected: str) -> str: | |
| return selected or "" | |
| PITCH_MD = """ | |
| # RFTSystems TrustStack Console | |
| I don’t do “trust me”. I do receipts. | |
| This Space demonstrates an agent memory layer that is **durable, searchable, and provable**: | |
| - Append-only session ledger (JSONL) | |
| - SQLite FTS retrieval (fast lexical recall) | |
| - Hash-chain integrity (tamper-evident) | |
| - Per-turn “Memory Receipt” (what was retrieved + hashes) | |
| - Receipt verification (pass/fail) | |
| - Receipt-gated tool execution (guardrails) | |
| - Trace export (what influenced what) | |
| - Audit pack export (ZIP) | |
| If you build agents for real users, receipts are what turns “memory” into infrastructure. | |
| """ | |
| HOW_TO_MD = """ | |
| ## Quick demo | |
| 1) Click **Run Investor Demo** | |
| 2) Download the last receipt JSON | |
| 3) Upload it into **Verify Receipt** | |
| 4) Try the **Guardrails** tab: tool execution only passes with a verified receipt | |
| 5) Export an **Audit Pack** ZIP | |
| ## What this proves | |
| - Memory persistence is not the hard part | |
| - Retrieval is not the hard part | |
| - **Integrity + evidence** is the hard part | |
| """ | |
| with gr.Blocks(title="RFTSystems TrustStack Console") as demo: | |
| gr.Markdown(PITCH_MD) | |
| with gr.Row(): | |
| session_id = gr.Textbox(label="Session ID", value=new_session_id()) | |
| retrieval_k = gr.Slider(1, 20, value=8, step=1, label="Retrieval K") | |
| with gr.Tabs(): | |
| with gr.Tab("Investor Demo"): | |
| gr.Markdown("One click. Full story: storage → retrieval → override → receipt → verification → trace → audit.") | |
| run_demo_btn = gr.Button("Run Investor Demo", variant="primary") | |
| demo_chat = gr.Chatbot(label="Demo Conversation", height=320) | |
| demo_retrieved = gr.Textbox(label="Retrieved memory slices", lines=8) | |
| demo_ledger = gr.Textbox(label="Ledger (hash-chained)", lines=12) | |
| demo_receipt_path = gr.Textbox(label="Last receipt path (server)", lines=1) | |
| demo_receipt_file = gr.File(label="Download last receipt JSON") | |
| demo_trace_path = gr.Textbox(label="Last trace path (server)", lines=1) | |
| demo_trace_file = gr.File(label="Download last trace JSON") | |
| run_demo_btn.click( | |
| run_investor_demo, | |
| inputs=[session_id, retrieval_k], | |
| outputs=[session_id, demo_chat, demo_retrieved, demo_ledger, | |
| demo_receipt_path, demo_receipt_file, demo_trace_path, demo_trace_file], | |
| ) | |
| with gr.Tab("Chat"): | |
| chatbot = gr.Chatbot(label="Conversation", height=320) | |
| with gr.Row(): | |
| example_pick = gr.Dropdown(label="Example prompts", choices=EXAMPLE_PROMPTS, value=EXAMPLE_PROMPTS[0]) | |
| use_example = gr.Button("Use Example", variant="secondary") | |
| user_msg = gr.Textbox(label="Message") | |
| send = gr.Button("Send", variant="primary") | |
| retrieved_out = gr.Textbox(label="Retrieved memory slices", lines=8) | |
| ledger_out = gr.Textbox(label="Ledger (hash-chained)", lines=12) | |
| receipt_path = gr.Textbox(label="Last receipt path (server)", lines=1) | |
| receipt_file = gr.File(label="Download last receipt JSON") | |
| trace_path = gr.Textbox(label="Last trace path (server)", lines=1) | |
| trace_file = gr.File(label="Download last trace JSON") | |
| use_example.click(fill_example, inputs=[example_pick], outputs=[user_msg]) | |
| send.click( | |
| chat_turn, | |
| inputs=[session_id, user_msg, retrieval_k], | |
| outputs=[session_id, chatbot, retrieved_out, ledger_out, receipt_path, receipt_file, trace_path, trace_file], | |
| ) | |
| with gr.Tab("Verify Receipt"): | |
| receipt_upload = gr.File(label="Upload receipt JSON") | |
| verify_btn = gr.Button("Verify", variant="primary") | |
| verify_out = gr.Textbox(label="Verification result") | |
| verify_btn.click(verify_receipt_upload, inputs=[receipt_upload], outputs=[verify_out]) | |
| with gr.Tab("Guardrails"): | |
| gr.Markdown("Tool execution is blocked unless the receipt verifies.") | |
| receipt_for_tool = gr.File(label="Provide a receipt JSON") | |
| action = gr.Dropdown(label="Requested action", choices=[ | |
| "tool.send_email_simulated", | |
| "tool.export_customer_data_simulated", | |
| "tool.trigger_purchase_simulated", | |
| "tool.write_file_simulated", | |
| ], value="tool.write_file_simulated") | |
| run_tool_btn = gr.Button("Attempt tool call", variant="primary") | |
| tool_out = gr.Textbox(label="Result", lines=4) | |
| run_tool_btn.click(guardrail_tool_call, inputs=[receipt_for_tool, action], outputs=[tool_out]) | |
| with gr.Tab("Audit Pack"): | |
| gr.Markdown("One-click export: ledger + receipts + integrity summary.") | |
| export_btn = gr.Button("Export Audit Pack ZIP", variant="primary") | |
| audit_path = gr.Textbox(label="Audit pack path (server)", lines=1) | |
| audit_file = gr.File(label="Download audit pack ZIP") | |
| export_btn.click(export_audit_pack, inputs=[session_id], outputs=[audit_path, audit_file]) | |
| with gr.Tab("API Playground"): | |
| gr.Markdown("API-style calls for the demo. JSON in, JSON out.") | |
| action2 = gr.Dropdown(label="Action", choices=["memory.write", "memory.search", "receipt.verify", "audit.export"], value="memory.search") | |
| payload = gr.Textbox(label="JSON payload", lines=10, value=json.dumps({"query": "Nova", "k": 8}, indent=2)) | |
| call_btn = gr.Button("Call", variant="primary") | |
| api_out = gr.Textbox(label="Response", lines=14) | |
| call_btn.click(api_playground, inputs=[session_id, action2, payload, retrieval_k], outputs=[session_id, api_out]) | |
| with gr.Tab("Token Budget"): | |
| gr.Markdown("Why fixed retrieval budgets win as sessions grow.") | |
| n = gr.Slider(1, 500, value=50, step=1, label="Turns (n)") | |
| B = gr.Slider(0, 2000, value=500, step=50, label="Base framing tokens per call (B)") | |
| M = gr.Slider(0, 2000, value=650, step=50, label="Avg tokens added per turn to history (M)") | |
| W = gr.Slider(0, 8000, value=2000, step=100, label="Fixed retrieval budget tokens per call (W)") | |
| calc = gr.Button("Compute", variant="primary") | |
| tbl = gr.Dataframe(headers=["metric", "value"], datatype=["str", "str"], row_count=4, col_count=2) | |
| calc.click(lambda nn, bb, mm, ww: token_savings(nn, bb, mm, ww), inputs=[n, B, M, W], outputs=[tbl]) | |
| with gr.Tab("How to Use"): | |
| gr.Markdown(HOW_TO_MD) | |
| demo.launch() | |