def _excerpt_content(content: Any, *, max_chars: int) -> str:
    """Build a short, single-line excerpt of a message ``content`` value.

    Args:
        content: A plain string, a list of content blocks, or any other value.
        max_chars: Maximum number of characters kept from each excerpted piece.

    Returns:
        * For a string: the text with newlines replaced by spaces, cut to
          ``max_chars`` and suffixed with ``"..."`` when it was cut.
        * For a list: the first four blocks joined with ``" | "``. Dict blocks
          render as ``"[<type>] <content excerpt>"``; other blocks render as
          their truncated ``str()``.
        * For anything else: the truncated ``str()`` of the value.
    """

    def clip(text: str) -> str:
        flat = text.replace("\n", " ")
        return flat[:max_chars] + ("..." if len(flat) > max_chars else "")

    if isinstance(content, str):
        return clip(content)
    if isinstance(content, list):
        parts = []
        for block in content[:4]:
            if isinstance(block, dict):
                btype = str(block.get("type", "unknown"))
                bcontent = block.get("content", "")
                # Nested non-string content (e.g. a list of sub-blocks) used to
                # be interpolated at full length; stringify it so it is bounded
                # by max_chars like every other branch.
                if not isinstance(bcontent, str):
                    bcontent = str(bcontent)
                parts.append(f"[{btype}] {clip(bcontent)}")
            else:
                parts.append(str(block)[:max_chars])
        return " | ".join(parts)
    return str(content)[:max_chars]


def _message_preview(msg: dict[str, Any], *, max_chars: int) -> dict[str, str]:
    """Return the ``role`` and a content excerpt for one message dict."""
    return {
        "role": str(msg.get("role")),
        "content_excerpt": _excerpt_content(msg.get("content"), max_chars=max_chars),
    }


def _stable_hash(value: str) -> str:
    """Return the first 12 hex digits of the SHA-256 of ``value`` (UTF-8)."""
    return hashlib.sha256(value.encode("utf-8")).hexdigest()[:12]


def _redact_text(value: str, *, prefix: str) -> str:
    """Replace ``value`` with ``"<prefix>-<stable hash>"``."""
    return f"{prefix}-{_stable_hash(value)}"


def _redact_path(value: str) -> str:
    """Replace a path with ``"path-<stable hash>"`` plus its original suffix."""
    suffix = Path(value).suffix
    return f"path-{_stable_hash(value)}{suffix}"


def _git_output(args: list[str], cwd: Path) -> str | None:
    """Run ``git <args>`` in ``cwd`` and return its stripped stdout.

    This is best-effort: ``None`` is returned when git cannot be started
    (missing executable, bad ``cwd``), exits non-zero, or produces output that
    cannot be decoded. Unrelated programming errors are no longer swallowed.
    """
    try:
        completed = subprocess.run(
            ["git", *args],
            cwd=cwd,
            check=True,
            capture_output=True,
            text=True,
        )
    except (OSError, ValueError, subprocess.SubprocessError):
        return None
    return completed.stdout.strip()


def _runtime_metadata(repo_root: Path) -> dict[str, Any]:
    """Describe the code revision and interpreter that produced a report.

    ``git_sha`` is ``None`` when git could not be queried. ``git_dirty`` is
    ``True``/``False`` when ``git status --porcelain`` succeeded and ``None``
    when it did not, so an unavailable git is not reported as a clean tree.
    """
    git_sha = _git_output(["rev-parse", "HEAD"], repo_root)
    status = _git_output(["status", "--porcelain"], repo_root)
    return {
        "git_sha": git_sha,
        "git_dirty": None if status is None else bool(status),
        "python_version": sys.version,
        "platform": platform.platform(),
        "implementation": platform.python_implementation(),
    }


def _corpus_fingerprint(
    *,
    root: Path,
    session_files: list[Path],
    max_sessions: int | None,
    recent_turns_per_session: int | None,
    cache_ttl_minutes: int,
) -> dict[str, Any]:
    """Summarise the replay corpus selection as a reproducible fingerprint.

    The digest covers the resolved root, the resolved session file paths (in
    the order given) and the selection parameters. It hashes paths, not file
    contents.
    """
    resolved_root = str(root.resolve())
    payload = {
        "root": resolved_root,
        "session_files": [str(p.resolve()) for p in session_files],
        "max_sessions": max_sessions,
        "recent_turns_per_session": recent_turns_per_session,
        "cache_ttl_minutes": cache_ttl_minutes,
    }
    digest = hashlib.sha256(
        json.dumps(payload, sort_keys=True).encode("utf-8")
    ).hexdigest()
    return {
        "root": resolved_root,
        "session_file_count": len(session_files),
        "session_files_sha256": digest,
        "max_sessions": max_sessions,
        "recent_turns_per_session": recent_turns_per_session,
        "cache_ttl_minutes": cache_ttl_minutes,
    }
def _first_diff_index(
    prior: list[dict[str, Any]], current: list[dict[str, Any]]
) -> int:
    """Return the first index at which ``current`` stops matching ``prior``.

    When the lists agree over their whole common length, the divergence point
    is the length of the shorter list.
    """
    for idx, (before, after) in enumerate(zip(prior, current)):
        if before != after:
            return idx
    return min(len(prior), len(current))


def _tail_previews(
    messages: list[dict[str, Any]], *, max_chars: int, include_content: bool
) -> list[dict[str, str]]:
    """Preview the last four messages, redacting content unless opted in."""
    tail = messages[-4:]
    if include_content:
        return [_message_preview(m, max_chars=max_chars) for m in tail]
    return [
        {"role": str(m.get("role")), "content_excerpt": "[redacted]"} for m in tail
    ]


def _collect_real_processed_events(
    *,
    root: Path,
    recent_turns_per_session: int | None,
    max_events_per_mode: int,
    ttl_minutes: int,
    max_chars: int,
    include_content: bool,
) -> dict[str, Any]:
    """Replay session transcripts through both proxy modes and log rewrites.

    For each of ``PROXY_MODE_TOKEN`` and ``PROXY_MODE_CACHE``, every session
    file selected under ``root`` is replayed turn by turn. Whenever
    ``_rewrite_scope`` reports that the forwarded messages were rewritten, one
    event is recorded describing whether the prior request was cache-eligible
    and whether its forwarded prefix was preserved.

    Args:
        root: Directory passed to ``select_session_files``.
        recent_turns_per_session: Passed to ``trim_replay_to_recent_turns``.
        max_events_per_mode: Cap on recorded events per proxy mode. Values
            ``<= 0`` record nothing.
        ttl_minutes: Cache TTL used for the eligibility check.
        max_chars: Per-message excerpt length for the tail previews.
        include_content: When false, session/project/request identifiers are
            replaced by stable hashes and message excerpts by ``"[redacted]"``.

    Returns:
        ``{"events": [...]}`` with one dict per recorded rewrite.
    """
    events: list[dict[str, Any]] = []
    if max_events_per_mode <= 0:
        # The cap was only checked after appending, so a zero cap used to
        # record one event per mode.
        return {"events": events}

    ttl = timedelta(minutes=ttl_minutes)
    session_files = select_session_files(root)
    for mode in (PROXY_MODE_TOKEN, PROXY_MODE_CACHE):
        proxy = real_bench._make_proxy(mode)
        collected = 0
        for session_file in session_files:
            replay = load_session_replay(session_file)
            if replay is None:
                continue
            replay = trim_replay_to_recent_turns(replay, recent_turns_per_session)
            prefix_tracker = PrefixCacheTracker("anthropic")
            comp_cache = CompressionCache() if mode == PROXY_MODE_TOKEN else None
            conversation: list[dict[str, Any]] = []
            previous_original_context: list[dict[str, Any]] | None = None
            previous_forwarded_context: list[dict[str, Any]] | None = None
            # (turn, forwarded messages) of the preceding turn; None before
            # the first turn of a session.
            pending: tuple[Any, list[dict[str, Any]]] | None = None
            for turn in replay.turns:
                tokenizer = get_tokenizer(turn.model)
                prior_context_message_count = len(conversation)
                conversation.extend(turn.input_messages)
                forwarded = _apply_mode_to_messages(
                    proxy,
                    mode,
                    conversation,
                    model=turn.model,
                    prefix_tracker=prefix_tracker,
                    comp_cache=comp_cache,
                    previous_original_messages=previous_original_context,
                    previous_forwarded_messages=previous_forwarded_context,
                )
                rewrite, retro = _rewrite_scope(
                    conversation,
                    forwarded,
                    stable_prefix_message_count=prior_context_message_count,
                )
                if rewrite:
                    if pending is None:
                        # First turn: there is no earlier request to compare.
                        prior_forwarded: list[dict[str, Any]] = []
                        prior_ts = None
                    else:
                        prior_turn, prior_forwarded = pending
                        prior_ts = prior_turn.timestamp
                    eligible = bool(
                        prior_ts is not None
                        and _cache_gap_within_ttl(turn.timestamp, prior_ts, ttl=ttl)
                        and prior_forwarded
                    )
                    prefix_preserved = None
                    first_diff_index = None
                    if eligible:
                        prefix_preserved = (
                            len(forwarded) >= len(prior_forwarded)
                            and forwarded[: len(prior_forwarded)] == prior_forwarded
                        )
                        if not prefix_preserved:
                            first_diff_index = _first_diff_index(
                                prior_forwarded, forwarded
                            )
                    events.append(
                        {
                            "mode": mode,
                            "session_id": replay.session_id
                            if include_content
                            else _redact_text(replay.session_id, prefix="session"),
                            "project": replay.decoded_project_path
                            if include_content
                            else _redact_path(replay.decoded_project_path),
                            "request_id": turn.request_id
                            if include_content
                            else _redact_text(turn.request_id, prefix="request"),
                            "timestamp": turn.timestamp.isoformat(),
                            "cache_eligible": eligible,
                            "prefix_preserved": prefix_preserved,
                            "retroactive_rewrite": retro,
                            "first_diff_index": first_diff_index,
                            "original_tail": _tail_previews(
                                conversation,
                                max_chars=max_chars,
                                include_content=include_content,
                            ),
                            "forwarded_tail": _tail_previews(
                                forwarded,
                                max_chars=max_chars,
                                include_content=include_content,
                            ),
                        }
                    )
                    collected += 1
                    if collected >= max_events_per_mode:
                        break
                real_bench._update_prefix_tracker(
                    prefix_tracker,
                    cache_read_tokens=0,
                    cache_write_tokens=0,
                    messages=forwarded,
                    message_token_counts=[
                        tokenizer.count_message(msg) for msg in forwarded
                    ],
                    original_messages=conversation,
                )
                pending = (turn, forwarded)
                conversation.append(turn.assistant_message)
                previous_original_context = copy.deepcopy(conversation)
                previous_forwarded_context = copy.deepcopy(forwarded) + [
                    copy.deepcopy(turn.assistant_message)
                ]
            # The inner break only leaves the turn loop; stop scanning further
            # sessions for this mode once the cap is reached.
            if collected >= max_events_per_mode:
                break
    return {"events": events}
"real_processed" out_dir.mkdir(parents=True, exist_ok=True) json_path = out_dir / "real_processed_rewrite_report.json" md_path = out_dir / "real_processed_rewrite_report.md" html_path = out_dir / "real_processed_rewrite_report.html" json_path.write_text(json.dumps(payload, indent=2), encoding="utf-8") md = [ "# Real Processed Rewrite Report", "", "Local-only report from real Claude transcript replays. Do not commit.", "", ] for mode in (PROXY_MODE_TOKEN, PROXY_MODE_CACHE): mode_events = [e for e in payload["events"] if e["mode"] == mode] md.extend([f"## `{mode}`", ""]) if not mode_events: md.extend(["No rewrite events captured.", ""]) continue for i, e in enumerate(mode_events, start=1): md.extend( [ f"### Event {i}", "", f"- session: `{e['session_id']}`", f"- request: `{e['request_id']}`", f"- cache eligible: `{e['cache_eligible']}`", f"- prefix preserved: `{e['prefix_preserved']}`", f"- retroactive rewrite: `{e['retroactive_rewrite']}`", f"- first diff index: `{e['first_diff_index']}`", "", "**Original Tail**", "", ] ) for msg in e["original_tail"]: md.append(f"- `{msg['role']}`: {msg['content_excerpt']}") md.extend(["", "**Forwarded Tail**", ""]) for msg in e["forwarded_tail"]: md.append(f"- `{msg['role']}`: {msg['content_excerpt']}") md.extend(["", ""]) md_path.write_text("\n".join(md), encoding="utf-8") sections = [] for mode in (PROXY_MODE_TOKEN, PROXY_MODE_CACHE): mode_events = [e for e in payload["events"] if e["mode"] == mode] cards = [] for i, e in enumerate(mode_events, start=1): orig = "".join( f"
{html.escape(str(m['role']))}: "
f"{html.escape(str(m['content_excerpt']))}{html.escape(str(m['role']))}: "
f"{html.escape(str(m['content_excerpt']))}session: {html.escape(e['session_id'])}
"
f"request: {html.escape(e['request_id'])}
"
f"cache eligible: {e['cache_eligible']}
"
f"prefix preserved: {e['prefix_preserved']}
"
f"retroactive rewrite: {e['retroactive_rewrite']}
"
f"first diff index: {e['first_diff_index']}
No rewrite events captured.
") + "Local-only report from real Claude transcript replays. Do not commit.
{html.escape(mode)}root: {html.escape(str(args.root))}
"
f"recent turns per session: {html.escape(str(args.recent_turns_per_session))}
"
f"workers: {args.workers}
"
f"cache TTL minutes: {args.cache_ttl_minutes}
"
f"include transcript content: {args.include_content}
git sha: {html.escape(str(metadata['git_sha']))}
"
f"git dirty: {metadata['git_dirty']}
"
f"python: {html.escape(str(metadata['implementation']))}
"
f"platform: {html.escape(str(metadata['platform']))}
"
f"corpus session file count: {corpus['session_file_count']}
"
f"corpus fingerprint: {html.escape(str(corpus['session_files_sha256']))}
projects: {dataset['projects']}
"
f"sessions: {dataset['sessions']}
"
f"requests: {dataset['requests']}
"
f"observed total cost: {html.escape(format_currency(observed['total_cost_usd']))}
"
f"winner by total cost: {html.escape(winners['total_cost'])}
| Mode | Total Cost | Cache Busts | Busting Rewrites | " "Stable Replay Rewrites | Rewrites | " "Retroactive Rewrites | TTL Expiry | Forwarded Tokens |
|---|
Interpretation: cache_bust_turns and "
"busting_rewrite_turns are the hard-failure metrics. "
"stable_replay_rewrite_turns is acceptable stable replay. "
"retroactive_rewrite_turns is descriptive only. "
"ttl_expiry_turns is workload timing context.