#!/usr/bin/env python3
"""HTML report generator for visualizing exact kept spans on the original context.

Redesigned with a Modern/Clean aesthetic.

The script reads QA samples from a JSON Lines file and compression results
from a JSON file, then writes one self-contained HTML page in which every
sample's original context is shown with kept spans highlighted and dropped
text dimmed.
"""

import argparse
import html
import json
from difflib import SequenceMatcher
from pathlib import Path

# Stylesheet embedded in the generated page. Kept as a plain (non-f) string so
# the CSS braces do not need escaping inside the page template.
_REPORT_CSS = """
body { margin: 0; padding: 0 1.5rem 3rem; background: #f6f7f9; color: #1f2933;
       font-family: -apple-system, "Segoe UI", Roboto, Helvetica, Arial, sans-serif; }
.page-header { padding: 2rem 0 1rem; }
.page-header h1 { margin: 0; font-size: 1.5rem; }
.subtitle { margin: 0.25rem 0 0; color: #6b7280; }
.item { background: #fff; border: 1px solid #e5e7eb; border-radius: 10px;
        padding: 1rem 1.25rem; margin: 1rem 0; }
.item-header { display: flex; align-items: center; gap: 0.75rem; flex-wrap: wrap; }
.item-index { font-weight: 600; color: #6b7280; }
.stats { display: flex; gap: 0.5rem; flex-wrap: wrap; }
.stat { font-size: 0.8rem; padding: 0.15rem 0.6rem; border-radius: 999px;
        background: #eef0f3; }
.stat.success { background: #dcfce7; color: #166534; }
.stat.warning { background: #fef3c7; color: #92400e; }
.instruction { margin: 0.75rem 0; font-weight: 600; white-space: pre-wrap; }
.legend { font-size: 0.8rem; margin-bottom: 0.5rem; }
.context { margin: 0; white-space: pre-wrap; word-break: break-word;
           font-family: inherit; line-height: 1.5; }
.kept { background: #fde68a; color: #111827; border-radius: 3px; }
.dropped { color: #9ca3af; }
footer { margin-top: 2rem; color: #9ca3af; font-size: 0.8rem; text-align: center; }
"""


# --- Data Loading Utilities ---

def read_jsonl(path: Path):
    """Return the JSON value of every non-blank line in the file at *path*."""
    rows = []
    with path.open("r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if line:
                rows.append(json.loads(line))
    return rows


def read_json(path: Path):
    """Return the parsed content of the JSON file at *path*."""
    with path.open("r", encoding="utf-8") as f:
        return json.load(f)


# --- Interval Logic ---

def merge_intervals(intervals):
    """Merge overlapping or touching ``(start, end)`` intervals.

    Returns a list of ``(start, end)`` tuples sorted by start. Two intervals
    are merged when the second starts at or before the end of the first.
    """
    if not intervals:
        return []
    ordered = sorted(intervals)
    merged = [list(ordered[0])]
    for s, e in ordered[1:]:
        if s <= merged[-1][1]:
            merged[-1][1] = max(merged[-1][1], e)
        else:
            merged.append([s, e])
    return [(s, e) for s, e in merged]


def normalize_kept_char_spans(kept_char_spans, original_len: int):
    """Validate and merge kept character spans on the original context.

    Returns ``None`` when *kept_char_spans* is ``None`` (no exact spans
    available); otherwise a merged, sorted list of ``(start, end)`` tuples
    clamped to ``[0, original_len]``. Empty or inverted spans are dropped.

    Raises:
        ValueError: if the container or one of its entries is not a
            two-element list/tuple.
    """
    if kept_char_spans is None:
        return None
    if not isinstance(kept_char_spans, (list, tuple)):
        raise ValueError(f"kept_char_spans must be a list, got {type(kept_char_spans)}")
    spans = []
    for pair in kept_char_spans:
        if not isinstance(pair, (list, tuple)) or len(pair) != 2:
            raise ValueError(f"Invalid span entry: {pair!r}")
        s, e = int(pair[0]), int(pair[1])
        # Clamp to bounds to prevent crashes. Clamping is monotonic, so a span
        # that was empty or inverted before clamping still is afterwards.
        s = max(0, min(s, original_len))
        e = max(0, min(e, original_len))
        if s < e:
            spans.append((s, e))
    return merge_intervals(spans)


def compute_kept_intervals(original: str, compressed: str, min_match_chars: int = 8):
    """Approximate kept spans via difflib matching (fallback only).

    Every matching block between *original* and *compressed* that is at least
    *min_match_chars* long contributes its position in *original*.
    """
    if not original or not compressed:
        return []
    sm = SequenceMatcher(a=original, b=compressed, autojunk=False)
    intervals = [
        (a0, a0 + size)
        for a0, _b0, size in sm.get_matching_blocks()
        if size >= min_match_chars
    ]
    return merge_intervals(intervals)


# --- HTML Rendering Logic ---

def render_highlight_html(original: str, kept_intervals):
    """Render the original text as HTML with kept spans highlighted.

    Kept text is wrapped in ``<span class="kept">`` and everything else in
    ``<span class="dropped">``; all text is HTML-escaped. Intervals are
    processed in sorted order and clipped to the text and to each other, so
    unsorted, overlapping or out-of-range input never duplicates characters.
    """
    parts = []
    cur = 0
    total = len(original)
    for s, e in sorted(kept_intervals):
        # Never step back before text that was already emitted.
        s = max(cur, min(s, total))
        e = min(e, total)
        if e <= s:
            continue
        if cur < s:
            parts.append(f'<span class="dropped">{html.escape(original[cur:s])}</span>')
        parts.append(f'<span class="kept">{html.escape(original[s:e])}</span>')
        cur = e
    if cur < total:
        parts.append(f'<span class="dropped">{html.escape(original[cur:])}</span>')
    return "".join(parts)


def _as_text(value):
    """Return *value* as a string, mapping ``None`` to the empty string."""
    return "" if value is None else str(value)


def _as_float(value):
    """Return *value* as a float, or ``None`` when missing or not numeric."""
    if value is None:
        return None
    try:
        return float(value)
    except (TypeError, ValueError):
        return None


def build_report_item(idx: int, qa_row: dict, res_row: dict, min_match_chars: int,
                      allow_approx: bool = False):
    """Build the HTML fragment for one sample.

    Args:
        idx: Sample number shown in the item header.
        qa_row: QA record; ``input`` and ``instruction`` are read.
        res_row: Result record; ``compressed_context``, ``compression_ratio``,
            ``kept_char_spans``, ``original_len`` and ``compressed_len`` are read.
        min_match_chars: Minimum block length for the difflib fallback.
        allow_approx: Permit the difflib fallback when ``kept_char_spans`` is
            missing. A truthy ``_allow_approx`` key in *res_row* is honoured
            as well, for callers that still pass the flag that way.

    Raises:
        RuntimeError: if ``kept_char_spans`` is missing and the fallback is
            not allowed.
    """
    original = _as_text(qa_row.get("input", ""))
    inst = _as_text(qa_row.get("instruction", ""))
    comp = _as_text(res_row.get("compressed_context", ""))

    kept_intervals = normalize_kept_char_spans(
        res_row.get("kept_char_spans"), original_len=len(original)
    )
    if kept_intervals is None:
        if not (allow_approx or res_row.get("_allow_approx", False)):
            raise RuntimeError(
                f"Sample #{idx}: Missing `kept_char_spans`. "
                "Pass --allow_approx to use diff-based fallback."
            )
        kept_intervals = compute_kept_intervals(original, comp, min_match_chars=min_match_chars)

    highlighted_original = render_highlight_html(original, kept_intervals)

    # Statistics
    kept_chars = sum(e - s for s, e in kept_intervals)
    total_chars = len(original)
    kept_pct = (kept_chars / total_chars * 100.0) if total_chars else 0.0
    kept_spans_n = len(kept_intervals)

    stats_html = []

    ratio = _as_float(res_row.get("compression_ratio"))
    if ratio is not None:
        # Badge colour based on ratio.
        if ratio < 0.3:
            ratio_color_class = "success"
        elif ratio > 0.8:
            ratio_color_class = "warning"
        else:
            ratio_color_class = "neutral"
        stats_html.append(
            f'<div class="stat {ratio_color_class}">Compression {ratio:.2f}x</div>'
        )

    # Only show token counts the result actually reports; substituting the
    # character count or zero would display a misleading figure.
    original_len = res_row.get("original_len")
    compressed_len = res_row.get("compressed_len")
    if original_len is not None and compressed_len is not None:
        stats_html.append(
            f'<div class="stat">Tokens {html.escape(str(original_len))}'
            f' → {html.escape(str(compressed_len))}</div>'
        )

    stats_html.append(
        f'<div class="stat">Kept {kept_pct:.1f}% ({kept_spans_n} spans)</div>'
    )
    stats_block = "".join(stats_html)
    inst_html = html.escape(inst)

    return f"""<section class="item">
<header class="item-header">
<span class="item-index">#{idx}</span>
<div class="stats">{stats_block}</div>
</header>
<div class="instruction">{inst_html}</div>
<div class="legend"><span class="kept">Kept</span> <span class="dropped">Dropped</span></div>
<pre class="context">{highlighted_original}</pre>
</section>
"""


def build_full_html(items_html: str, title: str):
    """Wrap the per-sample fragments in a complete, self-contained HTML page.

    *title* is HTML-escaped and used for both the document title and the page
    heading; *items_html* is inserted verbatim.
    """
    safe_title = html.escape(title)
    return f"""<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>{safe_title}</title>
<style>{_REPORT_CSS}</style>
</head>
<body>
<header class="page-header">
<h1>{safe_title}</h1>
<p class="subtitle">Visualizing compressed context • Kept spans highlighted</p>
</header>
<main>
{items_html}
</main>
<footer>Generated by Beaver Visualizer</footer>
</body>
</html>
"""


# --- Main Driver ---

def main():
    """Parse the command line, build the report and write it to disk."""
    ap = argparse.ArgumentParser()
    ap.add_argument("--qa_jsonl", type=str, default="./QA.jsonl")
    ap.add_argument("--result_json", type=str, default="./QA_Result.json")
    ap.add_argument("--out_html", type=str, default="./QA_Report.html")
    ap.add_argument("--min_match_chars", type=int, default=8)
    ap.add_argument("--allow_approx", action="store_true",
                    help="Allow difflib matching if exact spans missing")
    args = ap.parse_args()

    qa_rows = read_jsonl(Path(args.qa_jsonl))
    res_rows = read_json(Path(args.result_json))
    if not isinstance(res_rows, list):
        raise ValueError(
            f"{args.result_json}: expected a JSON list of result rows, "
            f"got {type(res_rows).__name__}"
        )

    # Index results by their explicit "idx", falling back to list position.
    res_by_idx = {int(r.get("idx", i)): r for i, r in enumerate(res_rows)}

    items = []
    for i, qa in enumerate(qa_rows):
        r = res_by_idx.get(i)
        if r is None:
            continue
        items.append(
            build_report_item(
                i, qa, r,
                min_match_chars=args.min_match_chars,
                allow_approx=args.allow_approx,
            )
        )

    title = f"Compression Report - {Path(args.qa_jsonl).name}"
    html_text = build_full_html("\n".join(items), title)
    Path(args.out_html).write_text(html_text, encoding="utf-8")
    print(f"[OK] wrote: {args.out_html}")


if __name__ == "__main__":
    main()