Spaces:
Running
Running
rrubayet321
chore: add nosec B324 annotations to non-cryptographic MD5 usages and update temporary database path to use system temp directory
"""
Truncation vs Summarization vs Headroom: A Fair Benchmark

This benchmark compares four approaches to context compression:
1. Truncation - Keep first N items (industry standard)
2. Summarization - Use LLM to summarize (common alternative)
3. Headroom - Statistical compression with retrieval
4. Kompress - ML-based (ModernBERT) text compression, used as a baseline

FAIRNESS PRINCIPLES:
- Include scenarios where each approach could win
- Use realistic data patterns
- Measure both compression AND answer quality
- Report failures honestly

Metrics:
- Tokens saved (compression ratio)
- Answer accuracy (can LLM still answer correctly?)
- Cost (including summarization LLM calls)
- Latency
"""
| import hashlib | |
| import json | |
| import random | |
| import time | |
| from dataclasses import dataclass | |
| from typing import Literal | |
| # We'll use OpenAI for the actual LLM calls | |
| try: | |
| from openai import OpenAI | |
| OPENAI_AVAILABLE = True | |
| except ImportError: | |
| OPENAI_AVAILABLE = False | |
| # Headroom imports | |
| try: | |
| from headroom.config import SmartCrusherConfig | |
| from headroom.tokenizers import TiktokenCounter | |
| from headroom.transforms.smart_crusher import SmartCrusher | |
| HEADROOM_AVAILABLE = True | |
| except ImportError: | |
| HEADROOM_AVAILABLE = False | |
| # Kompress imports (ML baseline) | |
| try: | |
| from headroom.transforms.kompress_compressor import KompressCompressor, is_kompress_available | |
| KOMPRESS_AVAILABLE = is_kompress_available() | |
| except ImportError: | |
| KOMPRESS_AVAILABLE = False | |
@dataclass
class Question:
    """A question about the data with ground truth answer.

    Declared as a dataclass so it can be built with keyword arguments, which is
    how every data generator in this file constructs it.
    """

    # Question text sent to the LLM.
    text: str
    # Reference answer the LLM's reply is scored against.
    ground_truth: str
    # Where in the dataset the answer lives; used to group accuracy in the summary.
    answer_location: Literal["early", "middle", "late", "scattered", "semantic"]
    difficulty: Literal["easy", "medium", "hard"]
@dataclass
class Scenario:
    """A benchmark scenario with data and questions.

    Declared as a dataclass so it can be built with keyword arguments, which is
    how the benchmark runner constructs it.
    """

    name: str
    description: str
    # Raw records handed to each compression approach.
    data: list[dict]
    questions: list[Question]
    expected_winner: str  # Which approach should theoretically win
@dataclass
class ApproachResult:
    """Result of running one approach on one scenario.

    Declared as a dataclass so it can be built with keyword arguments; the
    fields end up in the instance ``__dict__``, which the runner serialises.
    """

    approach: str
    scenario: str
    tokens_original: int
    tokens_after: int
    # Fraction of tokens removed: 1 - tokens_after / tokens_original.
    compression_ratio: float
    compression_latency_ms: float
    llm_cost_usd: float  # Cost of summarization if applicable
    answers: list[dict]  # {question, expected, actual, correct, location}
    # Share of questions judged correct, in [0, 1].
    accuracy: float
    total_cost_usd: float  # Compression cost + query cost
| # ============================================================================= | |
| # DATA GENERATORS - Realistic synthetic data | |
| # ============================================================================= | |
def generate_log_data(
    n_entries: int = 500, error_positions: "list[int] | None" = None
) -> tuple[list[dict], list[Question]]:
    """
    Generate realistic server logs.

    Every entry is a randomly chosen routine INFO/DEBUG template, except at the
    indices listed in ``error_positions``, where the error templates are
    injected in order (at most three, one per template). Placing the errors at
    chosen positions is what lets the scenario test each approach.

    Args:
        n_entries: Number of log entries to generate.
        error_positions: Indices that receive an error entry. Defaults to one
            near the start, one in the middle and one near the end.

    Returns:
        (logs, questions): the log entries and the questions about them.
    """
    # Local import: datetime is only needed by the data generators.
    from datetime import datetime, timedelta

    if error_positions is None:
        # Default: errors at beginning, middle, and end
        error_positions = [3, n_entries // 2, n_entries - 5]
    # A set keeps the per-entry membership test O(1).
    error_slots = set(error_positions)

    log_templates = [
        {"level": "INFO", "message": "Health check passed", "service": "api-gateway"},
        {"level": "INFO", "message": "Request processed successfully", "service": "api-gateway"},
        {"level": "INFO", "message": "Cache hit for user session", "service": "redis"},
        {"level": "INFO", "message": "Database query completed", "service": "postgres"},
        {"level": "INFO", "message": "Authentication successful", "service": "auth"},
        {
            "level": "DEBUG",
            "message": "Connection pool stats: active=5, idle=15",
            "service": "postgres",
        },
    ]
    error_templates = [
        {
            "level": "ERROR",
            "message": "Connection refused to payment-service:8080 - ECONNREFUSED",
            "service": "payment-processor",
            "error_code": "PAYMENT_SERVICE_DOWN",
            "trace_id": "abc123",
        },
        {
            "level": "ERROR",
            "message": "Timeout waiting for response from inventory-service after 30000ms",
            "service": "order-processor",
            "error_code": "INVENTORY_TIMEOUT",
            "trace_id": "def456",
        },
        {
            "level": "CRITICAL",
            "message": "Out of memory: Java heap space - killing process",
            "service": "recommendation-engine",
            "error_code": "OOM_KILLED",
            "trace_id": "ghi789",
        },
    ]

    logs = []
    # Entries are one minute apart starting at 10:00. Going through datetime
    # keeps the timestamp valid when the series runs past midnight; the old
    # "10 + i // 60" hour field exceeded 23 once i reached 840.
    first_entry_time = datetime(2024, 1, 15, 10, 0)
    error_idx = 0
    for i in range(n_entries):
        if i in error_slots and error_idx < len(error_templates):
            entry = error_templates[error_idx].copy()
            error_idx += 1
        else:
            entry = random.choice(log_templates).copy()
        entry["timestamp"] = (first_entry_time + timedelta(minutes=i)).strftime(
            "%Y-%m-%dT%H:%M:00Z"
        )
        # MD5 is used only to derive a stable, non-secret request id.
        entry["request_id"] = f"req-{hashlib.md5(str(i).encode()).hexdigest()[:8]}"  # nosec B324
        logs.append(entry)

    # Questions designed to test different approaches
    questions = [
        Question(
            text="What error code was returned by the payment service?",
            ground_truth="PAYMENT_SERVICE_DOWN",
            answer_location="early",  # Position 3
            difficulty="easy",
        ),
        Question(
            text="Which service experienced a timeout and what was the trace ID?",
            ground_truth="order-processor service had timeout with trace_id def456",
            answer_location="middle",
            difficulty="medium",
        ),
        Question(
            text="What critical error occurred and which service was affected?",
            ground_truth="Out of memory (OOM_KILLED) in recommendation-engine",
            answer_location="late",  # Near end
            difficulty="medium",
        ),
        Question(
            text="How many distinct error types are in the logs?",
            ground_truth="3",
            answer_location="scattered",
            difficulty="hard",
        ),
    ]
    return logs, questions
def generate_file_search_data(n_files: int = 1000) -> tuple[list[dict], list[Question]]:
    """
    Generate realistic code search results.

    Simulates searching a codebase - lots of files with similar metadata,
    specific files of interest scattered throughout.

    The files of interest sit at fixed indices (50, 250, 500, 750, 999). With
    ``n_files`` below 1000 the later ones are not generated, and the questions
    about them become unanswerable.

    Args:
        n_files: Number of search results to generate.

    Returns:
        (files, questions): the result records and the questions about them.
    """
    # Common directories and file patterns
    dirs = [
        "src/api",
        "src/services",
        "src/utils",
        "src/models",
        "src/controllers",
        "src/middleware",
        "tests/unit",
        "tests/integration",
        "lib/core",
        "lib/helpers",
        "config",
        "scripts",
    ]
    extensions = [".py", ".py", ".py", ".ts", ".js", ".json", ".yaml"]  # Weighted toward .py
    # One label per extension. The previous nested conditional reported
    # ".json" and ".yaml" files as "javascript".
    language_by_ext = {
        ".py": "python",
        ".ts": "typescript",
        ".js": "javascript",
        ".json": "json",
        ".yaml": "yaml",
    }

    # Files of interest (scattered at specific positions)
    special_files = {
        50: {
            "path": "src/auth/jwt_handler.py",
            "size": 2341,
            "description": "JWT token validation and refresh",
        },
        250: {
            "path": "src/services/payment_processor.py",
            "size": 5672,
            "description": "Stripe payment integration",
        },
        500: {
            "path": "src/middleware/rate_limiter.py",
            "size": 1823,
            "description": "Redis-based rate limiting",
        },
        750: {
            "path": "config/database.py",
            "size": 892,
            "description": "PostgreSQL connection settings",
        },
        999: {
            "path": "src/api/health_check.py",
            "size": 456,
            "description": "Kubernetes health endpoints",
        },
    }

    files = []
    for i in range(n_files):
        if i in special_files:
            f = special_files[i].copy()
            f["type"] = "file"
            f["language"] = "python"
            f["modified"] = "2024-01-15"
        else:
            dir_path = random.choice(dirs)
            ext = random.choice(extensions)
            f = {
                "type": "file",
                "path": f"{dir_path}/module_{i}{ext}",
                "size": random.randint(200, 5000),
                "language": language_by_ext[ext],
                "modified": f"2024-01-{random.randint(1, 15):02d}",
            }
        files.append(f)

    questions = [
        Question(
            text="Which file handles JWT token operations?",
            ground_truth="src/auth/jwt_handler.py",
            answer_location="early",  # Position 50
            difficulty="easy",
        ),
        Question(
            text="What file contains the Stripe payment integration and how large is it?",
            ground_truth="src/services/payment_processor.py, 5672 bytes",
            answer_location="middle",  # Position 250
            difficulty="medium",
        ),
        Question(
            text="Which file implements rate limiting and what technology does it use?",
            ground_truth="src/middleware/rate_limiter.py uses Redis",
            answer_location="middle",  # Position 500
            difficulty="medium",
        ),
        Question(
            text="What is the last Python file in the results and what does it do?",
            ground_truth="src/api/health_check.py - Kubernetes health endpoints",
            answer_location="late",  # Position 999
            difficulty="hard",
        ),
    ]
    return files, questions
def generate_metrics_data(n_points: int = 500) -> tuple[list[dict], list[Question]]:
    """
    Generate realistic time series metrics.

    Baseline values with anomalies (spikes) at specific positions.
    This is where Headroom should excel - detecting statistical outliers.

    Anomalies sit at fixed indices (50, 200, 450). With ``n_points`` below 451
    the later ones are not generated, and the questions about them become
    unanswerable.

    Args:
        n_points: Number of one-minute samples to generate.

    Returns:
        (metrics, questions): the samples and the questions about them.
    """
    # Local import: datetime is only needed by the data generators.
    from datetime import datetime, timedelta

    base_cpu = 45.0
    base_memory = 62.0
    base_requests = 1000

    # Anomaly positions
    anomalies = {
        50: {"cpu": 95.0, "memory": 88.0, "requests": 5000, "event": "traffic_spike"},
        200: {"cpu": 98.0, "memory": 95.0, "requests": 150, "event": "service_degradation"},
        450: {"cpu": 15.0, "memory": 30.0, "requests": 50, "event": "service_restart"},
    }

    metrics = []
    # Samples are one minute apart starting at 10:00 (index 50 -> 10:50,
    # index 450 -> 17:30, as the ground truths below expect). Going through
    # datetime keeps the timestamp valid when the series runs past midnight.
    first_sample_time = datetime(2024, 1, 15, 10, 0)
    for i in range(n_points):
        timestamp = (first_sample_time + timedelta(minutes=i)).strftime("%Y-%m-%dT%H:%M:00Z")
        anomaly = anomalies.get(i)
        if anomaly is not None:
            point = {
                "timestamp": timestamp,
                "cpu_percent": anomaly["cpu"],
                "memory_percent": anomaly["memory"],
                "requests_per_min": anomaly["requests"],
                # A traffic spike is load, not a fault, so it stays "ok".
                "status": "degraded" if anomaly["event"] != "traffic_spike" else "ok",
                "event": anomaly["event"],
            }
        else:
            point = {
                "timestamp": timestamp,
                "cpu_percent": round(base_cpu + random.uniform(-5, 5), 1),
                "memory_percent": round(base_memory + random.uniform(-3, 3), 1),
                "requests_per_min": base_requests + random.randint(-100, 100),
                "status": "ok",
            }
        metrics.append(point)

    questions = [
        Question(
            text="When did the traffic spike occur and what was the requests_per_min?",
            ground_truth="Around 10:50, requests_per_min was 5000",
            answer_location="early",
            difficulty="easy",
        ),
        Question(
            text="What event caused service degradation and what were the CPU/memory values?",
            ground_truth="service_degradation event, CPU 98%, memory 95%",
            answer_location="middle",
            difficulty="medium",
        ),
        Question(
            text="When did the service restart and how can you tell from the metrics?",
            ground_truth="Around 17:30, CPU dropped to 15%, memory to 30%, requests to 50",
            answer_location="late",
            difficulty="hard",
        ),
        Question(
            text="How many anomalous events occurred in total?",
            ground_truth="3",
            answer_location="scattered",
            difficulty="hard",
        ),
    ]
    return metrics, questions
| # ============================================================================= | |
| # COMPRESSION APPROACHES | |
| # ============================================================================= | |
def truncate_data(data: list[dict], max_items: int = 20) -> list[dict]:
    """Head truncation: return a new list holding the leading ``max_items`` records."""
    head = data[0:max_items]
    return head
def summarize_data(
    data: list[dict], client: "OpenAI", model: str = "gpt-4o-mini"
) -> tuple[str, float, float]:
    """
    Use LLM to summarize the data.

    Args:
        data: Records to summarize; serialised to indented JSON for the prompt.
        client: Client exposing ``chat.completions.create``.
        model: Model name passed to the completion call.

    Returns:
        (summary_text, cost_usd, latency_ms). The latency covers only the
        completion call.
    """
    data_str = json.dumps(data, indent=2)
    # Truncate if too long for summarization call
    if len(data_str) > 100000:
        data_str = data_str[:100000] + "\n... [truncated for summarization]"
    prompt = f"""Summarize this data concisely, preserving all important information including:
- Any errors, warnings, or anomalies
- Key identifiers (IDs, names, paths)
- Statistical outliers
- Important events
Data:
{data_str}
Provide a structured summary that retains all critical details."""
    # perf_counter is monotonic, so the interval is not skewed by clock changes.
    start = time.perf_counter()
    response = client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": prompt}],
        max_tokens=2000,
    )
    latency = (time.perf_counter() - start) * 1000
    summary = response.choices[0].message.content
    # Estimate cost (gpt-4o-mini pricing). The rates are USD per 1K tokens,
    # hence the division by 1000.
    input_tokens = response.usage.prompt_tokens
    output_tokens = response.usage.completion_tokens
    cost = (input_tokens * 0.00015 + output_tokens * 0.0006) / 1000
    return summary, cost, latency
def kompress_compress(data: list[dict]) -> tuple[str, dict]:
    """
    Compress the data with Kompress (ModernBERT), the ML-based baseline.

    Returns (compressed_text, metadata), where metadata carries the compression
    latency in milliseconds plus the token counts and ratio reported by the
    compressor.

    Raises:
        RuntimeError: if Kompress is not installed.
    """
    if not KOMPRESS_AVAILABLE:
        raise RuntimeError("Kompress not available. Install with: pip install headroom-ai[ml]")

    engine = KompressCompressor()
    # Kompress operates on plain text, so the records are serialised first.
    serialized = json.dumps(data, indent=2)

    began = time.time()
    outcome = engine.compress(serialized)
    elapsed_ms = (time.time() - began) * 1000

    metadata = dict(
        latency_ms=elapsed_ms,
        original_tokens=outcome.original_tokens,
        compressed_tokens=outcome.compressed_tokens,
        compression_ratio=outcome.compression_ratio,
    )
    return outcome.compressed, metadata
def headroom_compress(
    data: list[dict], query_context: str = "", max_items: int = 20
) -> tuple[list[dict], dict]:
    """
    Use Headroom's SmartCrusher for statistical compression.

    Args:
        data: Records to compress; wrapped as ``{"results": data}`` before crushing.
        query_context: Query text forwarded to the crusher.
        max_items: Item budget passed to the crusher, also used as the size of
            the truncation fallback.

    Returns:
        (compressed_data, metadata). ``metadata["used_fallback"]`` is True when
        the crusher output could not be interpreted and the first ``max_items``
        records were returned instead. In that case the result is plain
        truncation, not Headroom compression.

    Raises:
        RuntimeError: if Headroom is not installed.
    """
    if not HEADROOM_AVAILABLE:
        raise RuntimeError("Headroom not available")
    config = SmartCrusherConfig(
        enabled=True,
        min_items_to_analyze=5,
        variance_threshold=2.0,
        max_items_after_crush=max_items,
        preserve_change_points=True,
    )
    crusher = SmartCrusher(config)

    # Wrap data in tool output format
    tool_content = json.dumps({"results": data})
    # perf_counter is monotonic, so the interval is not skewed by clock changes.
    start = time.perf_counter()
    crush_result = crusher.crush(tool_content, query=query_context)
    latency = (time.perf_counter() - start) * 1000

    # Parse result - crush returns a CrushResult with .compressed attribute
    result_str = (
        crush_result.compressed if hasattr(crush_result, "compressed") else str(crush_result)
    )
    try:
        parsed = json.loads(result_str)
    except json.JSONDecodeError:
        parsed = None

    used_fallback = False
    if isinstance(parsed, dict) and "results" in parsed:
        compressed_data = parsed["results"]
    elif isinstance(parsed, list):
        compressed_data = parsed
    else:
        # Unusable crusher output: fall back to head truncation and flag it so
        # the caller can tell this apart from a real Headroom result.
        compressed_data = data[:max_items]
        used_fallback = True

    metadata = {
        "latency_ms": latency,
        "items_before": len(data),
        "items_after": len(compressed_data) if isinstance(compressed_data, list) else "N/A",
        "used_fallback": used_fallback,
    }
    return compressed_data, metadata
| # ============================================================================= | |
| # EVALUATION | |
| # ============================================================================= | |
def count_tokens(text: str) -> int:
    """Return the token count of ``text``.

    Uses Headroom's TiktokenCounter when Headroom is importable; otherwise
    approximates at four characters per token.
    """
    if not HEADROOM_AVAILABLE:
        # Rough estimate: 4 chars per token
        return len(text) // 4
    return TiktokenCounter().count_text(text)
def evaluate_answer(question: "Question", actual_answer: str) -> bool:
    """
    Check if the answer is correct.

    Uses fuzzy matching: the answer is accepted when at least 60% of the key
    terms of ``question.ground_truth`` appear in it (case-insensitive).

    Key terms are the ground-truth words that are longer than three characters
    or contain a digit, minus a few stopwords, with surrounding punctuation
    removed. Digit-bearing terms are kept whatever their length; otherwise a
    ground truth such as "3" would yield no key terms and every non-empty
    answer would be scored correct.

    Args:
        question: Object exposing the reference answer as ``ground_truth``.
        actual_answer: The LLM's reply; empty or None counts as incorrect.

    Returns:
        True when the answer matches the ground truth closely enough.
    """
    # Local import: this is the only place in the module that needs regexes.
    import re

    if not actual_answer:
        return False
    actual_lower = actual_answer.lower()
    truth_lower = question.ground_truth.lower()

    edge_punctuation = "()[]{}.;:!?\"'*`%"
    stopwords = {"the", "and", "was", "with", "from"}

    # Extract key terms from ground truth
    key_terms = []
    for raw_term in truth_lower.replace(",", " ").replace("-", " ").split():
        # "(oom_killed)" -> "oom_killed", "98%" -> "98"
        term = raw_term.strip(edge_punctuation)
        if not term or term in stopwords:
            continue
        if len(term) > 3 or any(ch.isdigit() for ch in term):
            key_terms.append(term)

    if not key_terms:
        # Nothing distinctive to match on (e.g. a ground truth of "yes"):
        # require the whole ground truth to appear in the answer.
        return truth_lower.strip() in actual_lower

    def _present(term: str) -> bool:
        if len(term) <= 3:
            # Short terms are numeric here; forbid adjacent digits so "3" does
            # not match inside "13" or "2024-01-15T10:03".
            pattern = r"(?<!\d)" + re.escape(term) + r"(?!\d)"
            return re.search(pattern, actual_lower) is not None
        return term in actual_lower

    # Check if most key terms appear in answer
    matches = sum(1 for term in key_terms if _present(term))
    return matches >= len(key_terms) * 0.6  # 60% threshold
def query_llm(
    client: "OpenAI", context: str, question: str, model: str = "gpt-4o-mini"
) -> tuple[str, float]:
    """
    Put one question to the LLM, grounded in the supplied context.

    Returns (answer, cost_usd); the cost is estimated from the token usage the
    response reports.
    """
    prompt = f"""Based on the following data, answer the question.
Data:
{context}
Question: {question}
Answer concisely with specific details from the data."""
    messages = [{"role": "user", "content": prompt}]
    completion = client.chat.completions.create(
        model=model,
        messages=messages,
        max_tokens=500,
    )
    reply = completion.choices[0].message.content
    # Estimate cost
    usage = completion.usage
    prompt_part = usage.prompt_tokens * 0.00015
    completion_part = usage.completion_tokens * 0.0006
    return reply, (prompt_part + completion_part) / 1000
| # ============================================================================= | |
| # BENCHMARK RUNNER | |
| # ============================================================================= | |
@dataclass
class BenchmarkConfig:
    """Configuration for the benchmark run.

    Declared as a dataclass so individual settings can be overridden with
    keyword arguments while the rest keep the defaults below.
    """

    model: str = "gpt-4o-mini"  # Model for queries (and summarization)
    max_truncate_items: int = 20
    max_headroom_items: int = 20
    run_summarization: bool = True  # Can disable to save cost
    run_kompress: bool = True  # Run Kompress (ML baseline)
def _ask_all_questions(
    client: "OpenAI", context: str, questions: "list[Question]", model: str
) -> tuple[list[dict], float, float]:
    """Ask every question against ``context``; return (answers, accuracy, query_cost_usd)."""
    answers = []
    query_cost = 0.0
    for q in questions:
        answer, cost = query_llm(client, context, q.text, model)
        answers.append(
            {
                "question": q.text,
                "expected": q.ground_truth,
                "actual": answer,
                "correct": evaluate_answer(q, answer),
                "location": q.answer_location,
            }
        )
        query_cost += cost
    # A scenario without questions scores 0.0 instead of dividing by zero.
    accuracy = sum(1 for a in answers if a["correct"]) / len(answers) if answers else 0.0
    return answers, accuracy, query_cost


def _record_result(
    results: list,
    *,
    approach: str,
    scenario_name: str,
    tokens_original: int,
    tokens_after: int,
    latency_ms: float,
    compression_cost_usd: float,
    answers: list[dict],
    accuracy: float,
    query_cost_usd: float,
) -> "ApproachResult":
    """Append one ApproachResult to ``results`` and print its token and accuracy lines."""
    # Guard against an empty dataset that tokenises to zero tokens.
    ratio = 1 - (tokens_after / tokens_original) if tokens_original else 0.0
    result = ApproachResult(
        approach=approach,
        scenario=scenario_name,
        tokens_original=tokens_original,
        tokens_after=tokens_after,
        compression_ratio=ratio,
        compression_latency_ms=latency_ms,
        llm_cost_usd=compression_cost_usd,
        answers=answers,
        accuracy=accuracy,
        total_cost_usd=compression_cost_usd + query_cost_usd,
    )
    results.append(result)
    print(f" Tokens: {tokens_original} → {tokens_after} ({ratio:.1%} reduction)")
    print(f" Accuracy: {accuracy:.1%}")
    return result


def run_scenario_benchmark(
    scenario: Scenario, client: "OpenAI", config: BenchmarkConfig
) -> list[ApproachResult]:
    """Run all approaches on a single scenario.

    Truncation always runs. Summarization and Kompress run when enabled in
    ``config``, and Kompress and Headroom only when their packages are
    importable. A failure inside summarization, Kompress or Headroom is printed
    and that approach is left out of the results; a failure during truncation
    propagates to the caller.

    Args:
        scenario: Data and questions to benchmark.
        client: Client used for summarization and for answering questions.
        config: Benchmark settings (model, truncation size, enabled approaches).

    Returns:
        One ApproachResult per approach that completed.
    """
    results = []
    original_json = json.dumps(scenario.data, indent=2)
    original_tokens = count_tokens(original_json)
    print(f"\n{'=' * 60}")
    print(f"Scenario: {scenario.name}")
    print(f"Data size: {len(scenario.data)} items, {original_tokens} tokens")
    print(f"Expected winner: {scenario.expected_winner}")
    print(f"{'=' * 60}")

    # --- TRUNCATION ---
    print("\n[1/4] Running Truncation...")
    start = time.time()
    truncated = truncate_data(scenario.data, config.max_truncate_items)
    trunc_latency = (time.time() - start) * 1000
    trunc_json = json.dumps(truncated, indent=2)
    answers, accuracy, query_cost = _ask_all_questions(
        client, trunc_json, scenario.questions, config.model
    )
    _record_result(
        results,
        approach="truncation",
        scenario_name=scenario.name,
        tokens_original=original_tokens,
        tokens_after=count_tokens(trunc_json),
        latency_ms=trunc_latency,
        compression_cost_usd=0.0,  # No LLM for compression
        answers=answers,
        accuracy=accuracy,
        query_cost_usd=query_cost,
    )

    # --- SUMMARIZATION ---
    if config.run_summarization:
        print("\n[2/4] Running Summarization...")
        try:
            summary, summ_cost, summ_latency = summarize_data(scenario.data, client, config.model)
            summ_tokens = count_tokens(summary)
            answers, accuracy, query_cost = _ask_all_questions(
                client, summary, scenario.questions, config.model
            )
            _record_result(
                results,
                approach="summarization",
                scenario_name=scenario.name,
                tokens_original=original_tokens,
                tokens_after=summ_tokens,
                latency_ms=summ_latency,
                compression_cost_usd=summ_cost,
                answers=answers,
                accuracy=accuracy,
                query_cost_usd=query_cost,
            )
            print(f" Summarization cost: ${summ_cost:.4f}")
        except Exception as e:
            # Best effort: report the failure and continue with the other approaches.
            print(f" Summarization failed: {e}")

    # --- KOMPRESS (ML baseline) ---
    if config.run_kompress:
        print("\n[3/4] Running Kompress (ModernBERT ML baseline)...")
        if KOMPRESS_AVAILABLE:
            try:
                ll_compressed, ll_metadata = kompress_compress(scenario.data)
                ll_tokens = count_tokens(ll_compressed)
                answers, accuracy, query_cost = _ask_all_questions(
                    client, ll_compressed, scenario.questions, config.model
                )
                _record_result(
                    results,
                    approach="kompress",
                    scenario_name=scenario.name,
                    tokens_original=original_tokens,
                    tokens_after=ll_tokens,
                    latency_ms=ll_metadata["latency_ms"],
                    compression_cost_usd=0.0,  # Model runs locally
                    answers=answers,
                    accuracy=accuracy,
                    query_cost_usd=query_cost,
                )
                print(f" Compression latency: {ll_metadata['latency_ms']:.1f}ms")
            except Exception as e:
                print(f" Kompress failed: {e}")
        else:
            print(" Kompress not available. Install with: pip install headroom-ai[ml]")

    # --- HEADROOM ---
    print("\n[4/4] Running Headroom...")
    if HEADROOM_AVAILABLE:
        try:
            # Use first question as query context (realistic usage)
            query_context = scenario.questions[0].text if scenario.questions else ""
            compressed, metadata = headroom_compress(scenario.data, query_context)
            hr_json = (
                json.dumps(compressed, indent=2)
                if isinstance(compressed, list)
                else str(compressed)
            )
            hr_tokens = count_tokens(hr_json)
            answers, accuracy, query_cost = _ask_all_questions(
                client, hr_json, scenario.questions, config.model
            )
            _record_result(
                results,
                approach="headroom",
                scenario_name=scenario.name,
                tokens_original=original_tokens,
                tokens_after=hr_tokens,
                latency_ms=metadata["latency_ms"],
                compression_cost_usd=0.0,  # No LLM for compression
                answers=answers,
                accuracy=accuracy,
                query_cost_usd=query_cost,
            )
            print(f" Compression latency: {metadata['latency_ms']:.1f}ms")
        except Exception as e:
            print(f" Headroom failed: {e}")
            import traceback

            traceback.print_exc()
    else:
        print(" Headroom not available")
    return results
def run_full_benchmark(client: "OpenAI", config: "BenchmarkConfig | None" = None) -> dict:
    """Run the complete benchmark suite.

    Builds three synthetic scenarios (server logs, code search results and time
    series metrics), runs every approach on each, and prints a summary.

    Args:
        client: Client used for summarization and for answering questions.
        config: Benchmark settings; a default BenchmarkConfig is used when omitted.

    Returns:
        A dict with "results" (one attribute dict per approach result),
        "summary" (the printed report text) and "scenarios" (scenario names).
    """
    if config is None:
        config = BenchmarkConfig()
    print("\n" + "=" * 70)
    # The ML baseline run by this script is Kompress, so the banner names it.
    print("TRUNCATION vs SUMMARIZATION vs KOMPRESS vs HEADROOM BENCHMARK")
    print("=" * 70)

    # Generate scenarios
    scenarios = []

    # Scenario 1: Logs (Headroom should win - needs anomaly detection)
    logs, log_questions = generate_log_data(500, error_positions=[3, 250, 495])
    scenarios.append(
        Scenario(
            name="Server Logs (500 entries)",
            description="Find errors buried in routine logs",
            data=logs,
            questions=log_questions,
            expected_winner="headroom",
        )
    )

    # Scenario 2: File Search (Mixed - depends on file position)
    files, file_questions = generate_file_search_data(1000)
    scenarios.append(
        Scenario(
            name="Code Search (1000 files)",
            description="Find specific files in search results",
            data=files,
            questions=file_questions,
            expected_winner="mixed",
        )
    )

    # Scenario 3: Metrics (Headroom should win - statistical outliers)
    metrics, metric_questions = generate_metrics_data(500)
    scenarios.append(
        Scenario(
            name="Time Series Metrics (500 points)",
            description="Find anomalies in metrics data",
            data=metrics,
            questions=metric_questions,
            expected_winner="headroom",
        )
    )

    all_results = []
    for scenario in scenarios:
        all_results.extend(run_scenario_benchmark(scenario, client, config))

    # Generate summary
    print("\n" + "=" * 70)
    print("BENCHMARK SUMMARY")
    print("=" * 70)
    summary = generate_summary(all_results, scenarios)
    print(summary)
    return {
        "results": [vars(r) for r in all_results],
        "summary": summary,
        "scenarios": [s.name for s in scenarios],
    }
def generate_summary(results: "list[ApproachResult]", scenarios: "list[Scenario]") -> str:
    """Generate a human-readable summary of results.

    The report has three parts: a table per scenario with its winner (highest
    accuracy, ties broken by compression ratio), overall averages per approach,
    and accuracy grouped by where in the data the answer was located.

    Approaches are reported under the names stored on the results
    ("truncation", "summarization", "kompress", "headroom"). Any other approach
    name or answer location found in ``results`` is appended after the known
    ones rather than dropped.

    Args:
        results: Results from all scenarios.
        scenarios: The scenarios that were run, in report order.

    Returns:
        The report as one newline-joined string.
    """
    lines = []

    # Known names first for a stable report order; unknown ones are appended.
    approaches = ["truncation", "summarization", "kompress", "headroom"]
    locations = ["early", "middle", "late", "scattered"]
    by_approach = {}
    for r in results:
        by_approach.setdefault(r.approach, []).append(r)
        if r.approach not in approaches:
            approaches.append(r.approach)
        for a in r.answers:
            if a["location"] not in locations:
                locations.append(a["location"])

    # Per-scenario breakdown
    for scenario in scenarios:
        lines.append(f"\n### {scenario.name}")
        lines.append(f"Expected winner: {scenario.expected_winner}")
        lines.append("")
        lines.append("| Approach | Compression | Accuracy | Cost |")
        lines.append("|----------|-------------|----------|------|")
        scenario_results = [r for r in results if r.scenario == scenario.name]
        for r in scenario_results:
            lines.append(
                f"| {r.approach} | {r.compression_ratio:.1%} | {r.accuracy:.1%} | ${r.total_cost_usd:.4f} |"
            )
        # Determine actual winner
        if scenario_results:
            best = max(scenario_results, key=lambda r: (r.accuracy, r.compression_ratio))
            lines.append(f"\n**Actual winner: {best.approach}** (accuracy: {best.accuracy:.1%})")
        else:
            # max() raises on an empty sequence; report the gap instead.
            lines.append("\n**Actual winner: n/a** (no results)")

    # Overall stats
    lines.append("\n### Overall Statistics")
    for approach in approaches:
        approach_results = by_approach.get(approach, [])
        if approach_results:
            avg_compression = sum(r.compression_ratio for r in approach_results) / len(
                approach_results
            )
            avg_accuracy = sum(r.accuracy for r in approach_results) / len(approach_results)
            total_cost = sum(r.total_cost_usd for r in approach_results)
            lines.append(f"\n**{approach.title()}**")
            lines.append(f"- Avg compression: {avg_compression:.1%}")
            lines.append(f"- Avg accuracy: {avg_accuracy:.1%}")
            lines.append(f"- Total cost: ${total_cost:.4f}")

    # Per-question-type analysis
    lines.append("\n### Accuracy by Answer Location")
    lines.append("(Where in the data is the answer?)")
    lines.append("")
    for location in locations:
        lines.append(f"\n**{location.title()} position:**")
        for approach in approaches:
            location_answers = [
                a
                for r in by_approach.get(approach, [])
                for a in r.answers
                if a["location"] == location
            ]
            if location_answers:
                correct = sum(1 for a in location_answers if a["correct"])
                total = len(location_answers)
                lines.append(f" - {approach}: {correct}/{total} ({correct / total:.1%})")
    return "\n".join(lines)
| # ============================================================================= | |
| # MAIN | |
| # ============================================================================= | |
# Script entry point: check prerequisites, run the full benchmark, save the results.
if __name__ == "__main__":
    import os

    # ``exit()`` is a helper injected by the ``site`` module for interactive
    # use; SystemExit is always available.
    if not OPENAI_AVAILABLE:
        print("OpenAI not available. Install with: pip install openai")
        raise SystemExit(1)
    api_key = os.environ.get("OPENAI_API_KEY")
    if not api_key:
        print("Set OPENAI_API_KEY environment variable")
        raise SystemExit(1)

    client = OpenAI(api_key=api_key)
    config = BenchmarkConfig(
        model="gpt-4o-mini",
        max_truncate_items=20,
        max_headroom_items=20,
        run_summarization=True,
    )
    results = run_full_benchmark(client, config)

    # Save results. The encoding is explicit so the file does not depend on the
    # platform default; default=str stringifies anything json cannot encode.
    with open("benchmark_results.json", "w", encoding="utf-8") as f:
        json.dump(results, f, indent=2, default=str)
    print("\nResults saved to benchmark_results.json")