# felix-framework / benchmarks / benchmark_enhanced_systems.py
# Author: jkbennitt
# Commit fb867c3: "Clean hf-space branch and prepare for HuggingFace Spaces deployment"
#!/usr/bin/env python3
"""
Benchmark Enhanced Felix Framework Systems
Comprehensive benchmarking of all five priority enhancement systems:
1. Intelligent Output Chunking & Streaming
2. Dynamic Agent Spawning
3. Prompt Optimization Pipeline
4. Memory and Persistence Layer
5. Benchmarking & Quality Metrics
This script provides performance metrics and validation results for our enhanced systems.
"""
import sys
import time
import json
import tempfile
import statistics
import os
from pathlib import Path
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, asdict
# Add src to Python path
sys.path.insert(0, str(Path(__file__).parent / "src"))
@dataclass
class BenchmarkResult:
    """Results from a single benchmark test."""
    system_name: str                     # Short system label, e.g. "Chunking"
    test_name: str                       # Identifier of the individual test case
    execution_time: float                # Wall-clock duration of the test, in seconds
    success: bool                        # True when the test's success criteria held
    metrics: Dict[str, Any]              # Free-form per-test measurements (JSON-serializable)
    error_message: Optional[str] = None  # Exception text when the test raised, else None
@dataclass
class SystemBenchmark:
    """Benchmark results for an entire system."""
    system_name: str               # Human-readable system name, e.g. "Chunking System"
    total_tests: int               # Number of tests executed for this system
    successful_tests: int          # Number of tests whose success flag was True
    total_time: float              # Total wall-clock time across all tests, in seconds
    average_time: float            # total_time / total_tests (0 when no tests ran)
    results: List[BenchmarkResult]  # Per-test detail records
def benchmark_chunking_system() -> SystemBenchmark:
    """Benchmark the intelligent output chunking system.

    Runs two tests against ProgressiveProcessor:
      1. basic_chunking — verifies content splits into multiple chunks and
         the first chunk is retrievable by index.
      2. large_content_performance — walks every chunk of a large document
         and checks all of them can be fetched by index.

    Returns:
        SystemBenchmark aggregating both test results and timing.
    """
    print("🧩 Benchmarking Chunking System...")
    # Imported lazily so a missing subsystem only breaks this benchmark,
    # not the whole script at import time.
    from src.pipeline.chunking import ProgressiveProcessor, ChunkedResult

    results = []
    start_time = time.time()

    # Test 1: Basic chunking functionality
    test_start = time.time()
    try:
        processor = ProgressiveProcessor(
            task_id="benchmark_task",
            agent_id="benchmark_agent",
            full_content="This is test content for chunking. " * 100,  # 600 words (6 words x 100)
            chunk_size=200
        )
        chunks_count = processor.total_chunks
        first_chunk = processor.get_chunk_by_index(0)
        # Content must have split into more than one chunk, and chunk 0 must exist.
        success = chunks_count > 1 and first_chunk is not None
        metrics = {
            "total_chunks": chunks_count,
            "content_length": len(processor.full_content),
            "chunk_size": processor.chunk_size,
            "first_chunk_length": len(first_chunk.content_chunk) if first_chunk else 0
        }
        results.append(BenchmarkResult(
            system_name="Chunking",
            test_name="basic_chunking",
            execution_time=time.time() - test_start,
            success=success,
            metrics=metrics
        ))
    except Exception as e:
        # Record the failure instead of aborting the benchmark run.
        results.append(BenchmarkResult(
            system_name="Chunking",
            test_name="basic_chunking",
            execution_time=time.time() - test_start,
            success=False,
            metrics={},
            error_message=str(e)
        ))

    # Test 2: Performance with large content
    test_start = time.time()
    try:
        large_content = "Large content for performance testing. " * 1000  # ~5000 words
        large_processor = ProgressiveProcessor(
            task_id="large_task",
            agent_id="benchmark_agent",
            full_content=large_content,
            chunk_size=500
        )
        # Process all chunks sequentially by index.
        all_chunks = []
        for i in range(large_processor.total_chunks):
            chunk = large_processor.get_chunk_by_index(i)
            if chunk:
                all_chunks.append(chunk)
        # Every declared chunk must have been retrievable.
        success = len(all_chunks) == large_processor.total_chunks
        metrics = {
            "content_length": len(large_content),
            "total_chunks": large_processor.total_chunks,
            "processed_chunks": len(all_chunks),
            "avg_chunk_size": statistics.mean([len(chunk.content_chunk) for chunk in all_chunks]) if all_chunks else 0
        }
        results.append(BenchmarkResult(
            system_name="Chunking",
            test_name="large_content_performance",
            execution_time=time.time() - test_start,
            success=success,
            metrics=metrics
        ))
    except Exception as e:
        results.append(BenchmarkResult(
            system_name="Chunking",
            test_name="large_content_performance",
            execution_time=time.time() - test_start,
            success=False,
            metrics={},
            error_message=str(e)
        ))

    total_time = time.time() - start_time
    successful_tests = sum(1 for r in results if r.success)
    return SystemBenchmark(
        system_name="Chunking System",
        total_tests=len(results),
        successful_tests=successful_tests,
        total_time=total_time,
        average_time=total_time / len(results) if results else 0,
        results=results
    )
def benchmark_knowledge_store() -> SystemBenchmark:
    """Benchmark the memory and persistence layer.

    Tests:
      1. basic_storage_retrieval — stores one knowledge entry and retrieves it
         back via a type/domain query.
      2. bulk_storage_performance — stores 100 entries, then queries all of
         them plus a tag/confidence-filtered subset.

    Returns:
        SystemBenchmark aggregating both test results and timing.
    """
    print("🧠 Benchmarking Knowledge Store...")
    # Imported lazily so a missing subsystem only breaks this benchmark.
    from src.memory.knowledge_store import KnowledgeStore, KnowledgeType, ConfidenceLevel, KnowledgeQuery

    results = []
    start_time = time.time()

    # Create temporary database path. delete=False keeps the file after the
    # handle closes; it is removed explicitly in the finally block below.
    with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as temp_file:
        temp_db_path = temp_file.name
    try:
        # Test 1: Basic storage and retrieval
        test_start = time.time()
        try:
            store = KnowledgeStore(storage_path=temp_db_path)
            # Store test knowledge
            knowledge_id = store.store_knowledge(
                knowledge_type=KnowledgeType.TASK_RESULT,
                content={"task": "benchmark_test", "result": "success", "score": 0.95},
                confidence_level=ConfidenceLevel.HIGH,
                source_agent="benchmark_agent",
                domain="testing",
                tags=["benchmark", "test", "performance"]
            )
            # Retrieve knowledge
            query = KnowledgeQuery(
                knowledge_types=[KnowledgeType.TASK_RESULT],
                domains=["testing"]
            )
            retrieved = store.retrieve_knowledge(query)
            # Exactly the one stored entry must come back, with a matching id.
            success = len(retrieved) == 1 and retrieved[0].knowledge_id == knowledge_id
            metrics = {
                "stored_entries": 1,
                "retrieved_entries": len(retrieved),
                "knowledge_id_match": retrieved[0].knowledge_id == knowledge_id if retrieved else False
            }
            results.append(BenchmarkResult(
                system_name="KnowledgeStore",
                test_name="basic_storage_retrieval",
                execution_time=time.time() - test_start,
                success=success,
                metrics=metrics
            ))
        except Exception as e:
            results.append(BenchmarkResult(
                system_name="KnowledgeStore",
                test_name="basic_storage_retrieval",
                execution_time=time.time() - test_start,
                success=False,
                metrics={},
                error_message=str(e)
            ))

        # Test 2: Performance with multiple entries
        test_start = time.time()
        try:
            # Re-opens the same database file created in test 1.
            store = KnowledgeStore(storage_path=temp_db_path)
            # Store multiple knowledge entries
            knowledge_ids = []
            for i in range(100):
                kid = store.store_knowledge(
                    knowledge_type=KnowledgeType.AGENT_INSIGHT,
                    content={"insight": f"test_insight_{i}", "value": i * 0.01},
                    confidence_level=ConfidenceLevel.MEDIUM,
                    source_agent=f"agent_{i % 5}",
                    domain="performance_testing",
                    tags=["bulk_test", f"batch_{i // 20}"]
                )
                knowledge_ids.append(kid)
            # Query all entries
            query = KnowledgeQuery(domains=["performance_testing"], limit=200)
            all_entries = store.retrieve_knowledge(query)
            # Query with filters - batch_2 should have entries 40-59 (20 entries)
            filtered_query = KnowledgeQuery(
                domains=["performance_testing"],
                tags=["batch_2"],
                min_confidence=ConfidenceLevel.MEDIUM
            )
            filtered_entries = store.retrieve_knowledge(filtered_query)
            success = len(all_entries) >= 100 and len(filtered_entries) >= 5
            metrics = {
                "stored_entries": len(knowledge_ids),
                "retrieved_all": len(all_entries),
                "retrieved_filtered": len(filtered_entries),
                "storage_success_rate": len([kid for kid in knowledge_ids if kid]) / len(knowledge_ids)
            }
            results.append(BenchmarkResult(
                system_name="KnowledgeStore",
                test_name="bulk_storage_performance",
                execution_time=time.time() - test_start,
                success=success,
                metrics=metrics
            ))
        except Exception as e:
            results.append(BenchmarkResult(
                system_name="KnowledgeStore",
                test_name="bulk_storage_performance",
                execution_time=time.time() - test_start,
                success=False,
                metrics={},
                error_message=str(e)
            ))
    finally:
        # Cleanup temporary database
        if os.path.exists(temp_db_path):
            os.unlink(temp_db_path)

    total_time = time.time() - start_time
    successful_tests = sum(1 for r in results if r.success)
    return SystemBenchmark(
        system_name="Knowledge Store",
        total_tests=len(results),
        successful_tests=successful_tests,
        total_time=total_time,
        average_time=total_time / len(results) if results else 0,
        results=results
    )
def benchmark_quality_metrics() -> SystemBenchmark:
    """Benchmark the quality metrics calculation system.

    Tests:
      1. basic_quality_assessment — scores one technical text and checks the
         component scores are populated and within range.
      2. batch_processing_performance — scores five texts in one batch call
         and validates every overall score lies in [0, 1].

    Returns:
        SystemBenchmark aggregating both test results and timing.
    """
    print("📊 Benchmarking Quality Metrics...")
    # Imported lazily so a missing subsystem only breaks this benchmark.
    from src.comparison.quality_metrics import QualityMetricsCalculator, DomainType

    results = []
    start_time = time.time()

    # Test 1: Basic quality assessment
    test_start = time.time()
    try:
        calculator = QualityMetricsCalculator()
        test_text = """
        This comprehensive analysis examines artificial intelligence development trends.
        First, we establish the foundational concepts. Furthermore, advanced machine learning
        techniques demonstrate significant improvements in accuracy and efficiency. Research
        indicates that 95% of systems show enhanced performance. Therefore, these methodologies
        provide substantial value for implementation.
        """
        quality_score = calculator.calculate_quality_score(test_text, DomainType.TECHNICAL)
        # Overall score must be normalized; component scores and word count
        # must be non-trivial for real text.
        success = (
            0 <= quality_score.overall_score <= 1 and
            quality_score.coherence_score > 0 and
            quality_score.accuracy_score > 0 and
            quality_score.word_count > 0
        )
        metrics = {
            "overall_score": quality_score.overall_score,
            "coherence_score": quality_score.coherence_score,
            "accuracy_score": quality_score.accuracy_score,
            "completeness_score": quality_score.completeness_score,
            "clarity_score": quality_score.clarity_score,
            "word_count": quality_score.word_count,
            "sentence_count": quality_score.sentence_count,
            "has_bleu_score": quality_score.bleu_score is not None
        }
        results.append(BenchmarkResult(
            system_name="QualityMetrics",
            test_name="basic_quality_assessment",
            execution_time=time.time() - test_start,
            success=success,
            metrics=metrics
        ))
    except Exception as e:
        results.append(BenchmarkResult(
            system_name="QualityMetrics",
            test_name="basic_quality_assessment",
            execution_time=time.time() - test_start,
            success=False,
            metrics={},
            error_message=str(e)
        ))

    # Test 2: Performance with multiple texts
    test_start = time.time()
    try:
        calculator = QualityMetricsCalculator()
        # A deliberate mix of high- and low-quality texts so scores spread out.
        test_texts = [
            "High quality technical documentation with research backing.",
            "Some random text without much structure or clarity here.",
            "This demonstrates excellent coherence. Furthermore, the analysis provides substantial evidence. Research indicates clear patterns.",
            "Poor quality text with no structure clarity issues many problems",
            "Comprehensive examination reveals innovative approaches. Studies show 92% effectiveness rates."
        ]
        batch_scores = calculator.batch_calculate_scores(test_texts, DomainType.GENERAL)
        # One score per input, all within [0, 1].
        success = len(batch_scores) == len(test_texts) and all(0 <= score.overall_score <= 1 for score in batch_scores)
        metrics = {
            "texts_processed": len(batch_scores),
            "average_overall_score": statistics.mean([score.overall_score for score in batch_scores]),
            "score_range": max([score.overall_score for score in batch_scores]) - min([score.overall_score for score in batch_scores]),
            "all_valid_scores": all(0 <= score.overall_score <= 1 for score in batch_scores)
        }
        results.append(BenchmarkResult(
            system_name="QualityMetrics",
            test_name="batch_processing_performance",
            execution_time=time.time() - test_start,
            success=success,
            metrics=metrics
        ))
    except Exception as e:
        results.append(BenchmarkResult(
            system_name="QualityMetrics",
            test_name="batch_processing_performance",
            execution_time=time.time() - test_start,
            success=False,
            metrics={},
            error_message=str(e)
        ))

    total_time = time.time() - start_time
    successful_tests = sum(1 for r in results if r.success)
    return SystemBenchmark(
        system_name="Quality Metrics",
        total_tests=len(results),
        successful_tests=successful_tests,
        total_time=total_time,
        average_time=total_time / len(results) if results else 0,
        results=results
    )
def benchmark_prompt_optimization() -> SystemBenchmark:
    """Benchmark the prompt optimization system.

    Tests:
      1. basic_optimization — optimizes a short base prompt and checks the
         result is longer and carries an id.
      2. performance_tracking — creates 10 prompts, records simulated
         performance for each, and verifies the records can be read back.

    Returns:
        SystemBenchmark aggregating both test results and timing.

    NOTE(fix): the previous version created a temporary ``.db`` file (and
    deleted it in a ``finally`` block) that was never passed to
    ``PromptOptimizer()`` — both constructor calls take no arguments — so the
    tempfile scaffolding was dead code and has been removed.
    """
    print("🎯 Benchmarking Prompt Optimization...")
    # Imported lazily so a missing subsystem only breaks this benchmark.
    from src.agents.prompt_optimization import PromptOptimizer

    results = []
    start_time = time.time()

    # Test 1: Basic prompt optimization
    test_start = time.time()
    try:
        optimizer = PromptOptimizer()
        base_prompt = "Analyze the following content and provide insights"
        context = {"domain": "technical", "task_type": "analysis"}
        optimized_prompt = optimizer.optimize_prompt(base_prompt, context)
        # Optimization should enrich the prompt (longer text) and assign an id.
        success = (
            optimized_prompt is not None and
            len(optimized_prompt.prompt_text) > len(base_prompt) and
            optimized_prompt.prompt_id is not None
        )
        metrics = {
            "base_prompt_length": len(base_prompt),
            "optimized_prompt_length": len(optimized_prompt.prompt_text),
            "improvement_ratio": len(optimized_prompt.prompt_text) / len(base_prompt),
            "has_prompt_id": optimized_prompt.prompt_id is not None,
            "has_context": len(optimized_prompt.context) > 0
        }
        results.append(BenchmarkResult(
            system_name="PromptOptimizer",
            test_name="basic_optimization",
            execution_time=time.time() - test_start,
            success=success,
            metrics=metrics
        ))
    except Exception as e:
        # Record the failure instead of aborting the benchmark run.
        results.append(BenchmarkResult(
            system_name="PromptOptimizer",
            test_name="basic_optimization",
            execution_time=time.time() - test_start,
            success=False,
            metrics={},
            error_message=str(e)
        ))

    # Test 2: Performance tracking
    test_start = time.time()
    try:
        optimizer = PromptOptimizer()
        # Create and track multiple prompts, recording simulated performance
        # that improves slightly on each iteration.
        prompt_performance_data = []
        for i in range(10):
            prompt_text = f"Test prompt {i} for analysis task"
            optimized = optimizer.optimize_prompt(prompt_text, {"iteration": i})
            # Record performance
            optimizer.record_prompt_performance(
                prompt_id=optimized.prompt_id,
                success_rate=0.7 + (i * 0.02),  # Simulated improvement
                quality_metrics={"coherence": 0.8 + (i * 0.01)},
                context={"test_iteration": i}
            )
            performance = optimizer.get_prompt_performance(optimized.prompt_id)
            prompt_performance_data.append(len(performance))
        # Every prompt must have at least one performance record on readback.
        success = (
            len(prompt_performance_data) == 10 and
            all(count > 0 for count in prompt_performance_data)
        )
        metrics = {
            "prompts_created": len(prompt_performance_data),
            "average_performance_records": statistics.mean(prompt_performance_data),
            "all_recorded_performance": all(count > 0 for count in prompt_performance_data)
        }
        results.append(BenchmarkResult(
            system_name="PromptOptimizer",
            test_name="performance_tracking",
            execution_time=time.time() - test_start,
            success=success,
            metrics=metrics
        ))
    except Exception as e:
        results.append(BenchmarkResult(
            system_name="PromptOptimizer",
            test_name="performance_tracking",
            execution_time=time.time() - test_start,
            success=False,
            metrics={},
            error_message=str(e)
        ))

    total_time = time.time() - start_time
    successful_tests = sum(1 for r in results if r.success)
    return SystemBenchmark(
        system_name="Prompt Optimization",
        total_tests=len(results),
        successful_tests=successful_tests,
        total_time=total_time,
        average_time=total_time / len(results) if results else 0,
        results=results
    )
def benchmark_integration_scenario() -> SystemBenchmark:
    """Benchmark integration of multiple enhanced systems.

    Tests:
      1. chunking_quality_metrics — chunks a blog post and scores each chunk
         plus the full document with the quality calculator.
      2. knowledge_quality_integration — stores quality-annotated knowledge
         entries and queries them back by confidence and keyword.

    Returns:
        SystemBenchmark aggregating both test results and timing.
    """
    print("🔗 Benchmarking Integration Scenarios...")
    results = []
    start_time = time.time()

    # Test 1: Chunking + Quality Metrics integration
    test_start = time.time()
    try:
        # Imported lazily so a missing subsystem only fails this test.
        from src.pipeline.chunking import ProgressiveProcessor
        from src.comparison.quality_metrics import QualityMetricsCalculator, DomainType
        # Create content for chunking
        blog_content = """
        # AI Ethics in Healthcare: A Comprehensive Analysis
        The integration of artificial intelligence into healthcare systems presents unprecedented
        opportunities and challenges. This analysis examines key ethical considerations, regulatory
        frameworks, and implementation strategies for responsible AI deployment in medical contexts.
        ## Current State and Challenges
        Healthcare AI systems currently face several critical ethical challenges. First, ensuring
        patient privacy while enabling valuable medical research requires sophisticated data handling.
        Furthermore, algorithmic bias in diagnostic tools can perpetuate healthcare disparities.
        ## Regulatory and Implementation Frameworks
        Effective governance structures must balance innovation with patient safety. Research indicates
        that 87% of healthcare institutions require comprehensive ethical review processes for AI systems.
        ## Future Directions
        Therefore, successful AI integration demands collaborative approaches between technologists,
        clinicians, ethicists, and policymakers to ensure beneficial outcomes for all patients.
        """
        # Chunk the content
        processor = ProgressiveProcessor(
            task_id="integration_test",
            agent_id="integration_agent",
            full_content=blog_content,
            chunk_size=400
        )
        # Calculate quality metrics for each chunk
        quality_calculator = QualityMetricsCalculator()
        chunk_quality_scores = []
        for i in range(processor.total_chunks):
            chunk = processor.get_chunk_by_index(i)
            if chunk and chunk.content_chunk:
                quality_score = quality_calculator.calculate_quality_score(
                    chunk.content_chunk,
                    DomainType.TECHNICAL
                )
                chunk_quality_scores.append(quality_score.overall_score)
        # Calculate overall quality metrics for the full document.
        overall_quality = quality_calculator.calculate_quality_score(blog_content, DomainType.TECHNICAL)
        # Every chunk must have been scored, all scores normalized, and the
        # full-document score above a quality floor.
        success = (
            processor.total_chunks > 1 and
            len(chunk_quality_scores) == processor.total_chunks and
            overall_quality.overall_score > 0.5 and
            all(0 <= score <= 1 for score in chunk_quality_scores)
        )
        metrics = {
            "total_chunks": processor.total_chunks,
            "chunks_with_quality": len(chunk_quality_scores),
            "average_chunk_quality": statistics.mean(chunk_quality_scores) if chunk_quality_scores else 0,
            "overall_quality_score": overall_quality.overall_score,
            "quality_consistency": statistics.stdev(chunk_quality_scores) if len(chunk_quality_scores) > 1 else 0
        }
        results.append(BenchmarkResult(
            system_name="Integration",
            test_name="chunking_quality_metrics",
            execution_time=time.time() - test_start,
            success=success,
            metrics=metrics
        ))
    except Exception as e:
        results.append(BenchmarkResult(
            system_name="Integration",
            test_name="chunking_quality_metrics",
            execution_time=time.time() - test_start,
            success=False,
            metrics={},
            error_message=str(e)
        ))

    # Test 2: Knowledge Store + Quality Metrics integration
    test_start = time.time()
    # Temporary db path; delete=False keeps the file after the handle closes,
    # and the finally block below removes it.
    with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as temp_file:
        temp_db_path = temp_file.name
    try:
        from src.memory.knowledge_store import KnowledgeStore, KnowledgeType, ConfidenceLevel
        from src.comparison.quality_metrics import QualityMetricsCalculator, DomainType
        store = KnowledgeStore(storage_path=temp_db_path)
        calculator = QualityMetricsCalculator()
        # Store knowledge with quality assessments
        test_content = [
            "High quality technical analysis with comprehensive research backing and statistical evidence. This demonstrates sophisticated understanding of complex systems with detailed methodological approach and rigorous validation procedures.",
            "Poor quality text with unclear structure and limited substance or insight.",
            "Excellent coherence demonstrated through logical flow. Furthermore, evidence supports conclusions with comprehensive analysis and detailed reasoning throughout the investigation."
        ]
        quality_assessments = []
        stored_ids = []
        for i, content in enumerate(test_content):
            # Calculate quality
            quality_score = calculator.calculate_quality_score(content, DomainType.TECHNICAL)
            quality_assessments.append(quality_score.overall_score)
            # Store knowledge with quality metadata; confidence level is
            # derived from the computed quality score.
            knowledge_id = store.store_knowledge(
                knowledge_type=KnowledgeType.TASK_RESULT,
                content={
                    "text": content,
                    "quality_assessment": {
                        "overall_score": quality_score.overall_score,
                        "coherence": quality_score.coherence_score,
                        "accuracy": quality_score.accuracy_score
                    }
                },
                confidence_level=ConfidenceLevel.HIGH if quality_score.overall_score > 0.7 else ConfidenceLevel.MEDIUM,
                source_agent="integration_test",
                domain="quality_testing",
                tags=["integration", "quality", f"batch_{i}"]
            )
            stored_ids.append(knowledge_id)
        # Query high-quality knowledge back out of the store.
        from src.memory.knowledge_store import KnowledgeQuery
        high_quality_query = KnowledgeQuery(
            domains=["quality_testing"],
            min_confidence=ConfidenceLevel.MEDIUM,
            content_keywords=["quality"]
        )
        high_quality_entries = store.retrieve_knowledge(high_quality_query)
        success = (
            len(stored_ids) == len(test_content) and
            len(quality_assessments) == len(test_content) and
            len(high_quality_entries) > 0 and
            all(entry.content.get("quality_assessment") for entry in high_quality_entries)
        )
        metrics = {
            "content_pieces": len(test_content),
            "stored_entries": len(stored_ids),
            "quality_assessments": len(quality_assessments),
            "high_quality_retrieved": len(high_quality_entries),
            "average_quality": statistics.mean(quality_assessments),
            "quality_range": max(quality_assessments) - min(quality_assessments)
        }
        results.append(BenchmarkResult(
            system_name="Integration",
            test_name="knowledge_quality_integration",
            execution_time=time.time() - test_start,
            success=success,
            metrics=metrics
        ))
    except Exception as e:
        results.append(BenchmarkResult(
            system_name="Integration",
            test_name="knowledge_quality_integration",
            execution_time=time.time() - test_start,
            success=False,
            metrics={},
            error_message=str(e)
        ))
    finally:
        # Cleanup temporary database
        if os.path.exists(temp_db_path):
            os.unlink(temp_db_path)

    total_time = time.time() - start_time
    successful_tests = sum(1 for r in results if r.success)
    return SystemBenchmark(
        system_name="Integration Scenarios",
        total_tests=len(results),
        successful_tests=successful_tests,
        total_time=total_time,
        average_time=total_time / len(results) if results else 0,
        results=results
    )
def generate_benchmark_report(benchmarks: List[SystemBenchmark]) -> Dict[str, Any]:
"""Generate comprehensive benchmark report."""
total_tests = sum(b.total_tests for b in benchmarks)
total_successful = sum(b.successful_tests for b in benchmarks)
total_time = sum(b.total_time for b in benchmarks)
report = {
"benchmark_timestamp": time.time(),
"summary": {
"total_systems": len(benchmarks),
"total_tests": total_tests,
"successful_tests": total_successful,
"success_rate": total_successful / total_tests if total_tests > 0 else 0,
"total_execution_time": total_time,
"average_time_per_test": total_time / total_tests if total_tests > 0 else 0
},
"system_results": []
}
for benchmark in benchmarks:
system_result = {
"system_name": benchmark.system_name,
"tests": benchmark.total_tests,
"successful": benchmark.successful_tests,
"success_rate": benchmark.successful_tests / benchmark.total_tests if benchmark.total_tests > 0 else 0,
"total_time": benchmark.total_time,
"average_time": benchmark.average_time,
"detailed_results": []
}
for result in benchmark.results:
system_result["detailed_results"].append({
"test_name": result.test_name,
"success": result.success,
"execution_time": result.execution_time,
"metrics": result.metrics,
"error": result.error_message
})
report["system_results"].append(system_result)
return report
def main():
    """Run comprehensive benchmarks of all enhanced systems.

    Executes every system benchmark, prints a human-readable summary, saves
    the full machine-readable report to benchmark_results.json, and returns
    the report dict.
    """
    print("🚀 Felix Framework Enhanced Systems Benchmark")
    print("=" * 60)

    # Fixed benchmark suite, executed in order.
    suite = (
        benchmark_chunking_system,
        benchmark_knowledge_store,
        benchmark_quality_metrics,
        benchmark_prompt_optimization,
        benchmark_integration_scenario,
    )
    benchmarks = [run_benchmark() for run_benchmark in suite]

    print("\n" + "=" * 60)
    print("📊 BENCHMARK RESULTS SUMMARY")
    print("=" * 60)

    # Generate and display the aggregated report.
    report = generate_benchmark_report(benchmarks)
    summary = report['summary']
    print(f"Total Systems Tested: {summary['total_systems']}")
    print(f"Total Tests: {summary['total_tests']}")
    print(f"Successful Tests: {summary['successful_tests']}")
    print(f"Overall Success Rate: {summary['success_rate']:.1%}")
    print(f"Total Execution Time: {summary['total_execution_time']:.3f}s")
    print(f"Average Time per Test: {summary['average_time_per_test']:.3f}s")

    print("\n" + "-" * 60)
    print("DETAILED SYSTEM RESULTS")
    print("-" * 60)
    for sys_res in report["system_results"]:
        print(f"\n🔧 {sys_res['system_name']}")
        print(f" Tests: {sys_res['successful']}/{sys_res['tests']} "
              f"({sys_res['success_rate']:.1%} success)")
        print(f" Time: {sys_res['total_time']:.3f}s "
              f"(avg: {sys_res['average_time']:.3f}s)")
        for entry in sys_res["detailed_results"]:
            marker = "✅" if entry["success"] else "❌"
            print(f" {marker} {entry['test_name']}: {entry['execution_time']:.3f}s")
            if not entry["success"] and entry["error"]:
                print(f" Error: {entry['error']}")

    # Save detailed report for machine consumption.
    report_path = Path("benchmark_results.json")
    with open(report_path, 'w') as f:
        json.dump(report, f, indent=2)
    print(f"\n💾 Detailed results saved to: {report_path}")

    print("\n" + "=" * 60)
    print("✨ BENCHMARK COMPLETE")
    print("=" * 60)

    # Overall verdict thresholds: >=80% excellent, >=60% good, else warn.
    rate = summary['success_rate']
    if rate >= 0.8:
        print("🎉 EXCELLENT: All enhanced systems performing well!")
    elif rate >= 0.6:
        print("✅ GOOD: Most enhanced systems working correctly")
    else:
        print("⚠️ WARNING: Some systems need attention")
    return report


if __name__ == "__main__":
    main()