# Source: h4-polytopic-attention / python/rag/cost_benchmark.py
# Uploaded with huggingface_hub (user: grapheneaffiliates, revision 66b61b0, verified).
"""
Cost comparison: H4 CPU-only RAG vs standard approaches.
Measures the three things that matter:
1. Answer quality (character-level overlap / retrieval accuracy)
2. Latency (ms per query)
3. Cost (hardware + energy)
Setup A — H4 Geometric RAG (CPU only):
Retrieval: E8 lattice memory, O(1) + 240 neighbors
Generation: H4 attention with ChamberTree, ternary weights
Cost: $0 ongoing (runs on existing hardware)
Setup B — Brute-force CPU baseline:
Retrieval: cosine similarity over all chunks (O(n))
Generation: softmax transformer, same model size
Cost: $0 ongoing (same hardware, different algorithm)
The comparison isolates the algorithmic advantage:
same hardware, same model size, same data, different attention mechanism.
"""
import time
import math
import os
import sys
import numpy as np
import torch
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
from rag.pipeline import H4RAGPipeline
from rag.encoder import H4DocumentEncoder
from rag.demo import build_vocab_from_docs, create_sample_docs
def brute_force_retrieve(encoder: H4DocumentEncoder, query_text: str, k: int = 5):
    """Brute-force retrieval: squared-L2 distance against ALL chunks.

    This is the O(n) baseline that the E8 lattice retrieval is benchmarked
    against (same embeddings, linear scan instead of lattice lookup).

    Args:
        encoder: Indexed document encoder; provides ``chunks`` plus the
            private tokenization/embedding helpers used here.
        query_text: Natural-language query to embed and match.
        k: Number of nearest chunks to return.

    Returns:
        List of ``(chunk, distance)`` pairs for the ``k`` closest chunks,
        sorted by ascending squared Euclidean distance.
    """
    query_tokens = encoder._text_to_tokens(query_text)
    # Embed the query as a single-chunk "document" (chunk 0 of 1).
    query_emb = encoder._embed_chunk(query_tokens, 0, 1)
    # Precompute per-document chunk counts in one pass. The original
    # recomputed this sum inside the loop, which made the supposedly
    # O(n) baseline scan O(n^2) in the number of chunks.
    doc_counts = {}
    for c in encoder.chunks:
        doc_counts[c.doc_id] = doc_counts.get(c.doc_id, 0) + 1
    # Compute distance to every chunk (O(n))
    distances = []
    for i, chunk in enumerate(encoder.chunks):
        chunk_emb = encoder._embed_chunk(
            chunk.token_ids, chunk.chunk_idx, doc_counts[chunk.doc_id]
        )
        dist = np.sum((query_emb - chunk_emb) ** 2)
        distances.append((dist, i))
    distances.sort()
    return [(encoder.chunks[idx], dist) for dist, idx in distances[:k]]
def benchmark_retrieval(encoder: H4DocumentEncoder, questions: list, k: int = 5, n_runs: int = 3):
    """Benchmark E8 lattice retrieval vs brute-force.

    Times both retrieval paths over ``questions`` (wall-clock averaged over
    ``n_runs`` passes), then measures how much of the brute-force top-k the
    lattice retrieval recovers (recall).

    Args:
        encoder: Indexed document encoder exposing ``retrieve``.
        questions: Query strings to benchmark with.
        k: Top-k size used for both retrieval methods.
        n_runs: Timing repetitions to average over.

    Returns:
        Dict with per-query latencies in ms (``lattice_ms``, ``brute_ms``),
        the ``speedup`` factor (brute / lattice), and top-k ``recall`` of the
        lattice results against the brute-force ground truth.
    """
    n_questions = len(questions)
    if n_questions == 0:
        # Nothing to measure; the original divided by len(questions) and
        # would raise ZeroDivisionError here.
        return {'lattice_ms': 0.0, 'brute_ms': 0.0, 'speedup': 0.0, 'recall': 0.0}
    # E8 lattice retrieval
    t0 = time.perf_counter()
    for _ in range(n_runs):
        for q in questions:
            encoder.retrieve(q, k=k)
    t_lattice = (time.perf_counter() - t0) / n_runs * 1000
    # Brute-force retrieval
    t0 = time.perf_counter()
    for _ in range(n_runs):
        for q in questions:
            brute_force_retrieve(encoder, q, k=k)
    t_brute = (time.perf_counter() - t0) / n_runs * 1000
    # Check retrieval overlap (lattice recall vs brute-force ground truth).
    overlap_total = 0
    count_total = 0
    for q in questions:
        lattice_results = encoder.retrieve(q, k=k)
        brute_results = brute_force_retrieve(encoder, q, k=k)
        # Key each chunk by the (doc_id, chunk_idx) tuple. The original
        # concatenated doc_id + str(chunk_idx), which can collide
        # ("doc1" + "1" == "doc" + "11") and inflate/deflate recall.
        lattice_ids = set((c.doc_id, c.chunk_idx) for c, _ in lattice_results)
        brute_ids = set((c.doc_id, c.chunk_idx) for c, _ in brute_results)
        overlap_total += len(lattice_ids & brute_ids)
        count_total += len(brute_ids)
    recall = overlap_total / count_total if count_total > 0 else 0
    return {
        'lattice_ms': t_lattice / n_questions,
        'brute_ms': t_brute / n_questions,
        'speedup': (t_brute / t_lattice) if t_lattice > 0 else 0,
        'recall': recall,
    }
def benchmark_generation(pipeline: H4RAGPipeline, questions: list, max_tokens: int = 64):
    """Benchmark H4 generation latency.

    Runs every question through the full RAG pipeline and collects the
    per-query timing/throughput figures reported by ``pipeline.answer``.

    Args:
        pipeline: Ready-to-query RAG pipeline (documents already indexed).
        questions: Query strings to answer.
        max_tokens: Generation budget per answer.

    Returns:
        One dict per question with the truncated answer text and the
        retrieval/generation/total latencies plus throughput numbers.
    """
    def _measure(question):
        # Single end-to-end QA pass; k and temperature mirror the demo defaults.
        outcome = pipeline.answer(question, k=3, max_tokens=max_tokens, temperature=0.7)
        return {
            'question': question,
            'answer': outcome.answer[:100],
            'retrieval_ms': outcome.retrieval_time_ms,
            'generation_ms': outcome.generation_time_ms,
            'total_ms': outcome.total_time_ms,
            'tokens_per_second': outcome.tokens_per_second,
            'context_length': outcome.context_length,
        }

    return [_measure(q) for q in questions]
def main():
    """Run the full cost benchmark and print the comparison report.

    Steps: materialize sample docs, build a vocab, construct an
    H4RAGPipeline, index the docs, benchmark retrieval and end-to-end QA,
    then print a cost-comparison table against GPU/API alternatives.
    """
    # Setup: write the bundled sample documents to disk and derive the vocab.
    sample_dir = os.path.join(os.path.dirname(__file__), '..', '..', 'sample_docs')
    doc_dir = create_sample_docs(sample_dir)
    vocab_size, stoi, itos = build_vocab_from_docs(doc_dir)
    # Test questions
    questions = [
        "What is the golden ratio?",
        "How many vertices does the 600-cell have?",
        "What is the kissing number of the E8 lattice?",
        "How is the golden ratio related to Fibonacci numbers?",
        "What is a polytope?",
        "What did Viazovska prove?",
        "What is the H4 symmetry group?",
        "How is E8 connected to H4?",
    ]
    print("=" * 70)
    print(" H4 GEOMETRIC RAG — COST BENCHMARK")
    print("=" * 70)
    # Create pipeline (small model: 128-dim, 2 layers, ternary BitLinear weights).
    pipeline = H4RAGPipeline(
        vocab_size=vocab_size,
        stoi=stoi,
        itos=itos,
        d_model=128,
        n_heads=8,
        n_layers=2,
        use_bitlinear=True,
        max_context=512,
    )
    # Index documents (timed so the report includes the one-off indexing cost).
    t0 = time.perf_counter()
    n_docs = pipeline.index_directory(doc_dir)
    t_index = (time.perf_counter() - t0) * 1000
    stats = pipeline.stats()
    print(f"\nIndexed {n_docs} documents ({stats['n_chunks']} chunks) in {t_index:.1f}ms")
    print(f"Model: {stats['model_params']['trainable']:,} params "
          f"({'ternary' if pipeline.model.use_bitlinear else 'float'})")
    # Retrieval benchmark
    print(f"\n--- Retrieval Benchmark ({len(questions)} questions) ---")
    ret_results = benchmark_retrieval(pipeline.encoder, questions)
    print(f" E8 lattice: {ret_results['lattice_ms']:.2f} ms/query")
    print(f" Brute-force: {ret_results['brute_ms']:.2f} ms/query")
    print(f" Speedup: {ret_results['speedup']:.1f}x")
    print(f" Recall: {ret_results['recall']:.1%}")
    # Generation benchmark
    print(f"\n--- End-to-End QA Benchmark ({len(questions)} questions) ---")
    gen_results = benchmark_generation(pipeline, questions)
    avg_retrieval = np.mean([r['retrieval_ms'] for r in gen_results])
    avg_generation = np.mean([r['generation_ms'] for r in gen_results])
    avg_total = np.mean([r['total_ms'] for r in gen_results])
    avg_tps = np.mean([r['tokens_per_second'] for r in gen_results])
    avg_context = np.mean([r['context_length'] for r in gen_results])
    print(f" Avg retrieval: {avg_retrieval:.1f} ms")
    print(f" Avg generation: {avg_generation:.1f} ms")
    print(f" Avg total: {avg_total:.1f} ms")
    print(f" Avg throughput: {avg_tps:.0f} tokens/s")
    print(f" Avg context: {avg_context:.0f} tokens")
    # Sample answers
    print(f"\n--- Sample Q&A ---")
    for r in gen_results[:3]:
        print(f" Q: {r['question']}")
        print(f" A: {r['answer'][:80]}...")
        print(f" ({r['total_ms']:.0f}ms, {r['tokens_per_second']:.0f} tok/s)")
        print()
    # Cost comparison table
    print("=" * 70)
    print(" COST COMPARISON")
    print("=" * 70)
    print()
    cost_per_query_h4 = 0.0  # electricity negligible
    # GPU estimate: $1/hr for a T4, ~100 queries/s
    cost_per_query_gpu = 1.0 / 3600 / 100  # ~$0.000003
    # NOTE(review): cost_per_query_h4 and cost_per_query_gpu are computed but
    # never interpolated below — the table prints hard-coded '~$0' and
    # '~$0.000003' literals instead. Confirm whether they should be wired in.
    # API estimate: GPT-4o-mini at $0.15/1M input + $0.60/1M output
    avg_input_tokens = 500
    avg_output_tokens = 64
    cost_per_query_api = (avg_input_tokens * 0.15 + avg_output_tokens * 0.60) / 1_000_000
    print(f" {'Metric':<25} {'H4 CPU-Only':>15} {'GPU RAG':>15} {'API RAG':>15}")
    print(f" {'-'*25} {'-'*15} {'-'*15} {'-'*15}")
    print(f" {'Latency (ms/query)':<25} {avg_total:>13.0f}ms {'~10ms':>15} {'~200ms':>15}")
    print(f" {'Hardware cost':<25} {'$0':>15} {'$1K-15K':>15} {'$0':>15}")
    print(f" {'Cost per query':<25} {'~$0':>15} {'~$0.000003':>15} {f'~${cost_per_query_api:.6f}':>15}")
    print(f" {'Cost per 1K queries':<25} {'~$0':>15} {'~$0.003':>15} {f'~${cost_per_query_api*1000:.3f}':>15}")
    print(f" {'Annual (10K/day)':<25} {'~$0':>15} {'~$11':>15} {f'~${cost_per_query_api*10000*365:.0f}':>15}")
    print(f" {'GPU required':<25} {'No':>15} {'Yes':>15} {'No':>15}")
    print(f" {'API key required':<25} {'No':>15} {'No':>15} {'Yes':>15}")
    print(f" {'Data stays local':<25} {'Yes':>15} {'Yes':>15} {'No':>15}")
    print()
    print(" Note: H4 model is untrained (random weights) in this benchmark.")
    print(" Answer quality requires training on QA data (see train_qa.py).")
    print(" Latency and cost numbers are real and representative.")
    print("=" * 70)
# Script entry point: run the benchmark when executed directly.
if __name__ == '__main__':
    main()