"""
Model Evaluator for Mamba Swarm
Comprehensive evaluation system for model performance and quality
"""
import time
import json
import logging
import torch
import numpy as np
from typing import Dict, List, Any, Optional, Tuple, Callable, Union
from dataclasses import dataclass, field
from collections import defaultdict
import math
import re
from datetime import datetime
from pathlib import Path
import asyncio
import concurrent.futures
# Evaluation metrics
@dataclass
class EvaluationResult:
metric_name: str
score: float
details: Dict[str, Any] = field(default_factory=dict)
timestamp: float = field(default_factory=time.time)
@dataclass
class BenchmarkResult:
benchmark_name: str
overall_score: float
individual_metrics: List[EvaluationResult]
execution_time: float
model_info: Dict[str, Any]
timestamp: float = field(default_factory=time.time)
"""
Model Evaluator for Mamba Swarm
Comprehensive evaluation system for model performance and quality
"""
import time
import json
import logging
import torch
import numpy as np
from typing import Dict, List, Any, Optional, Tuple, Callable, Union
from dataclasses import dataclass, field
from collections import defaultdict
import math
import re
from datetime import datetime
from pathlib import Path
import asyncio
import concurrent.futures
# Evaluation metrics
@dataclass
class EvaluationResult:
metric_name: str
score: float
details: Dict[str, Any] = field(default_factory=dict)
timestamp: float = field(default_factory=time.time)
@dataclass
class BenchmarkResult:
benchmark_name: str
overall_score: float
individual_metrics: List[EvaluationResult]
execution_time: float
model_info: Dict[str, Any]
timestamp: float = field(default_factory=time.time)
class PerplexityCalculator:
    """Calculate perplexity for language models"""

    def __init__(self, model, tokenizer):
        self.model = model
        self.tokenizer = tokenizer
        self.device = next(model.parameters()).device

    def calculate_perplexity(self, text: str, max_length: int = 512) -> float:
        """Calculate perplexity for the given text"""
        # Tokenize text
        tokens = self.tokenizer.encode(text, return_tensors="pt", max_length=max_length, truncation=True)
        tokens = tokens.to(self.device)

        with torch.no_grad():
            # Get model outputs
            outputs = self.model(tokens)
            logits = outputs.logits if hasattr(outputs, 'logits') else outputs

            # Shift logits and labels for next-token prediction
            shift_logits = logits[..., :-1, :].contiguous()
            shift_labels = tokens[..., 1:].contiguous()

            # Cross-entropy loss over the sequence
            loss_fn = torch.nn.CrossEntropyLoss()
            loss = loss_fn(shift_logits.view(-1, shift_logits.size(-1)), shift_labels.view(-1))

            # Perplexity is the exponential of the mean cross-entropy
            perplexity = torch.exp(loss)

        return perplexity.item()

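
# Minimal usage sketch for PerplexityCalculator. Illustrative only: it assumes a
# Hugging Face-style causal LM and tokenizer ("gpt2" is just a stand-in, not the
# swarm's actual model).
#
#   from transformers import AutoModelForCausalLM, AutoTokenizer
#   model = AutoModelForCausalLM.from_pretrained("gpt2")
#   tokenizer = AutoTokenizer.from_pretrained("gpt2")
#   ppl = PerplexityCalculator(model, tokenizer).calculate_perplexity("The quick brown fox.")
#   print(f"Perplexity: {ppl:.2f}")
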
class BLEUScore:
    """BLEU score calculator for text generation"""

    def __init__(self, n_grams: int = 4):
        self.n_grams = n_grams

    def calculate_bleu(self, reference: str, candidate: str) -> float:
        """Calculate BLEU score between reference and candidate"""
        ref_tokens = self._tokenize(reference)
        cand_tokens = self._tokenize(candidate)

        if len(cand_tokens) == 0:
            return 0.0

        # Calculate n-gram precisions
        precisions = []
        for n in range(1, self.n_grams + 1):
            precision = self._calculate_n_gram_precision(ref_tokens, cand_tokens, n)
            precisions.append(precision)

        # Brevity penalty
        bp = self._brevity_penalty(len(ref_tokens), len(cand_tokens))

        # BLEU is the brevity penalty times the geometric mean of the precisions
        if 0 in precisions:
            return 0.0
        log_precisions = [math.log(p) for p in precisions]
        bleu = bp * math.exp(sum(log_precisions) / len(log_precisions))
        return bleu

    def _tokenize(self, text: str) -> List[str]:
        """Simple whitespace tokenization"""
        return text.lower().split()

    def _calculate_n_gram_precision(self, ref_tokens: List[str], cand_tokens: List[str], n: int) -> float:
        """Calculate clipped n-gram precision"""
        if len(cand_tokens) < n:
            return 0.0

        # Get n-gram counts
        ref_ngrams = self._get_ngrams(ref_tokens, n)
        cand_ngrams = self._get_ngrams(cand_tokens, n)

        if len(cand_ngrams) == 0:
            return 0.0

        # Count matches, clipped by the reference counts
        matches = 0
        for ngram in cand_ngrams:
            if ngram in ref_ngrams:
                matches += min(cand_ngrams[ngram], ref_ngrams[ngram])
        return matches / sum(cand_ngrams.values())

    def _get_ngrams(self, tokens: List[str], n: int) -> Dict[Tuple[str, ...], int]:
        """Get n-gram counts"""
        ngrams = defaultdict(int)
        for i in range(len(tokens) - n + 1):
            ngram = tuple(tokens[i:i + n])
            ngrams[ngram] += 1
        return ngrams

    def _brevity_penalty(self, ref_len: int, cand_len: int) -> float:
        """Calculate brevity penalty"""
        if cand_len > ref_len:
            return 1.0
        elif cand_len == 0:
            return 0.0
        else:
            return math.exp(1 - ref_len / cand_len)

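
# Worked example for the brevity penalty above: with ref_len=10 and cand_len=8 the
# candidate is shorter than the reference, so BP = exp(1 - 10/8) = exp(-0.25) ≈ 0.78,
# which scales down the geometric mean of the n-gram precisions. A candidate that is
# identical to the reference gets BP = 1.0 and all precisions equal to 1.0, so
# calculate_bleu(text, text) returns 1.0 for any text with at least n_grams tokens.
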
class ROUGEScore:
    """ROUGE-L score calculator"""

    def calculate_rouge_l(self, reference: str, candidate: str) -> float:
        """Calculate ROUGE-L (LCS-based F1) score"""
        ref_tokens = reference.lower().split()
        cand_tokens = candidate.lower().split()

        if not ref_tokens or not cand_tokens:
            return 0.0

        # Length of the longest common subsequence
        lcs_length = self._lcs_length(ref_tokens, cand_tokens)
        if lcs_length == 0:
            return 0.0

        # Precision and recall based on the LCS
        precision = lcs_length / len(cand_tokens)
        recall = lcs_length / len(ref_tokens)

        # F1 score
        if precision + recall == 0:
            return 0.0
        f1 = 2 * precision * recall / (precision + recall)
        return f1

    def _lcs_length(self, seq1: List[str], seq2: List[str]) -> int:
        """Calculate length of the longest common subsequence"""
        m, n = len(seq1), len(seq2)
        dp = [[0] * (n + 1) for _ in range(m + 1)]
        for i in range(1, m + 1):
            for j in range(1, n + 1):
                if seq1[i - 1] == seq2[j - 1]:
                    dp[i][j] = dp[i - 1][j - 1] + 1
                else:
                    dp[i][j] = max(dp[i - 1][j], dp[i][j - 1])
        return dp[m][n]

class CoherenceAnalyzer:
    """Analyze text coherence and quality"""

    def analyze_coherence(self, text: str) -> Dict[str, float]:
        """Analyze text coherence"""
        sentences = self._split_sentences(text)

        if len(sentences) < 2:
            return {"coherence_score": 1.0, "repetition_score": 1.0, "diversity_score": 0.5}

        # Calculate coherence metrics
        coherence_score = self._calculate_coherence(sentences)
        repetition_score = self._calculate_repetition(text)
        diversity_score = self._calculate_diversity(text)

        return {
            "coherence_score": coherence_score,
            "repetition_score": repetition_score,
            "diversity_score": diversity_score
        }

    def _split_sentences(self, text: str) -> List[str]:
        """Split text into sentences using simple punctuation rules"""
        sentences = re.split(r'[.!?]+', text)
        return [s.strip() for s in sentences if s.strip()]

    def _calculate_coherence(self, sentences: List[str]) -> float:
        """Calculate coherence as the average similarity of adjacent sentences"""
        if len(sentences) < 2:
            return 1.0
        similarities = []
        for i in range(len(sentences) - 1):
            sim = self._sentence_similarity(sentences[i], sentences[i + 1])
            similarities.append(sim)
        return sum(similarities) / len(similarities)

    def _sentence_similarity(self, sent1: str, sent2: str) -> float:
        """Calculate Jaccard similarity between the word sets of two sentences"""
        words1 = set(sent1.lower().split())
        words2 = set(sent2.lower().split())
        if not words1 or not words2:
            return 0.0
        intersection = words1.intersection(words2)
        union = words1.union(words2)
        return len(intersection) / len(union)

    def _calculate_repetition(self, text: str) -> float:
        """Calculate repetition score (higher is better; 1.0 means no repeated words)"""
        words = text.lower().split()
        if len(words) < 2:
            return 1.0
        unique_words = set(words)
        repetition_ratio = len(words) / len(unique_words)
        # Normalize to the 0-1 scale (1 is best, i.e. no repetition)
        return 1.0 / repetition_ratio

    def _calculate_diversity(self, text: str) -> float:
        """Calculate lexical diversity (type-token ratio)"""
        words = text.lower().split()
        if len(words) == 0:
            return 0.0
        unique_words = set(words)
        return len(unique_words) / len(words)

class LatencyBenchmark:
    """Benchmark model latency and throughput"""

    def __init__(self, model, tokenizer):
        self.model = model
        self.tokenizer = tokenizer
        self.device = next(model.parameters()).device

    def benchmark_inference_speed(self, prompts: List[str], max_length: int = 100, num_runs: int = 5) -> Dict[str, float]:
        """Benchmark inference speed"""
        latencies = []
        token_counts = []

        for _ in range(num_runs):
            for prompt in prompts:
                start_time = time.time()

                # Tokenize input
                inputs = self.tokenizer.encode(prompt, return_tensors="pt").to(self.device)

                # Generate
                with torch.no_grad():
                    outputs = self.model.generate(
                        inputs,
                        max_length=max_length,
                        do_sample=False,
                        pad_token_id=self.tokenizer.eos_token_id
                    )

                end_time = time.time()

                # Record latency and the number of newly generated tokens
                latency = end_time - start_time
                generated_tokens = outputs.shape[1] - inputs.shape[1]
                latencies.append(latency)
                token_counts.append(generated_tokens)

        # Aggregate statistics
        avg_latency = np.mean(latencies)
        p95_latency = np.percentile(latencies, 95)
        total_tokens = sum(token_counts)
        total_time = sum(latencies)
        throughput = total_tokens / total_time if total_time > 0 else 0

        return {
            "avg_latency_ms": avg_latency * 1000,
            "p95_latency_ms": p95_latency * 1000,
            "throughput_tokens_per_sec": throughput,
            "total_runs": len(latencies)
        }

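
# Usage sketch for LatencyBenchmark (illustrative; assumes the same kind of
# model/tokenizer pair used by PerplexityCalculator above, with a Hugging
# Face-style `generate` method):
#
#   bench = LatencyBenchmark(model, tokenizer)
#   stats = bench.benchmark_inference_speed(["Hello, world"], max_length=50, num_runs=3)
#   # stats contains avg_latency_ms, p95_latency_ms, throughput_tokens_per_sec, total_runs
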
class QualityEvaluator:
    """Comprehensive quality evaluation"""

    def __init__(self, model, tokenizer):
        self.model = model
        self.tokenizer = tokenizer
        self.perplexity_calc = PerplexityCalculator(model, tokenizer)
        self.bleu_calc = BLEUScore()
        self.rouge_calc = ROUGEScore()
        self.coherence_analyzer = CoherenceAnalyzer()
        self.latency_benchmark = LatencyBenchmark(model, tokenizer)

    def evaluate_generation_quality(self, prompts: List[str], references: Optional[List[str]] = None, max_length: int = 100) -> List[EvaluationResult]:
        """Evaluate generation quality"""
        results = []

        for i, prompt in enumerate(prompts):
            # Generate text
            generated_text = self._generate_text(prompt, max_length)

            # Calculate perplexity
            try:
                perplexity = self.perplexity_calc.calculate_perplexity(generated_text)
                results.append(EvaluationResult(
                    metric_name="perplexity",
                    score=perplexity,
                    details={"prompt_index": i, "generated_text": generated_text[:100]}
                ))
            except Exception as e:
                logging.warning(f"Failed to calculate perplexity: {e}")

            # Calculate coherence metrics
            coherence_metrics = self.coherence_analyzer.analyze_coherence(generated_text)
            for metric_name, score in coherence_metrics.items():
                results.append(EvaluationResult(
                    metric_name=metric_name,
                    score=score,
                    details={"prompt_index": i}
                ))

            # Calculate BLEU and ROUGE if references are provided
            if references and i < len(references):
                reference = references[i]

                bleu_score = self.bleu_calc.calculate_bleu(reference, generated_text)
                results.append(EvaluationResult(
                    metric_name="bleu_score",
                    score=bleu_score,
                    details={"prompt_index": i, "reference": reference[:100]}
                ))

                rouge_score = self.rouge_calc.calculate_rouge_l(reference, generated_text)
                results.append(EvaluationResult(
                    metric_name="rouge_l",
                    score=rouge_score,
                    details={"prompt_index": i}
                ))

        return results

    def _generate_text(self, prompt: str, max_length: int) -> str:
        """Generate text from a prompt"""
        inputs = self.tokenizer.encode(prompt, return_tensors="pt").to(next(self.model.parameters()).device)

        with torch.no_grad():
            outputs = self.model.generate(
                inputs,
                max_length=max_length,
                do_sample=True,
                temperature=0.7,
                pad_token_id=self.tokenizer.eos_token_id
            )

        generated_text = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
        # Strip the original prompt so only the continuation is evaluated
        generated_text = generated_text[len(prompt):].strip()
        return generated_text

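
# Usage sketch for QualityEvaluator (illustrative; the prompt/reference pair is
# made up, and the model/tokenizer are assumed to be any causal LM pair with a
# `generate` method):
#
#   evaluator = QualityEvaluator(model, tokenizer)
#   results = evaluator.evaluate_generation_quality(
#       prompts=["The future of AI is"],
#       references=["The future of AI is promising."],
#   )
#   for r in results:
#       print(r.metric_name, round(r.score, 3))
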
class MambaSwarmEvaluator:
    """Main evaluator for Mamba Swarm models"""

    def __init__(self, swarm_engine, config: Optional[Dict[str, Any]] = None):
        self.swarm_engine = swarm_engine
        self.config = config or {}
        self.logger = logging.getLogger(__name__)

        # Initialize evaluators
        self.quality_evaluator = None
        self._initialize_evaluators()

        # Benchmark datasets
        self.benchmark_prompts = [
            "The future of artificial intelligence is",
            "In a world where technology advances rapidly,",
            "The most important challenge facing humanity today is",
            "Scientific discoveries have always been driven by",
            "The relationship between humans and machines will"
        ]

    def _initialize_evaluators(self):
        """Initialize quality evaluators"""
        try:
            # Get model and tokenizer from the swarm engine
            model = self.swarm_engine.get_model()
            tokenizer = self.swarm_engine.get_tokenizer()
            if model and tokenizer:
                self.quality_evaluator = QualityEvaluator(model, tokenizer)
        except Exception as e:
            self.logger.warning(f"Failed to initialize evaluators: {e}")

    def run_comprehensive_evaluation(self) -> BenchmarkResult:
        """Run comprehensive evaluation of the Mamba Swarm"""
        start_time = time.time()
        all_results = []

        # Performance benchmarks
        performance_results = self._evaluate_performance()
        all_results.extend(performance_results)

        # Quality benchmarks
        if self.quality_evaluator:
            quality_results = self._evaluate_quality()
            all_results.extend(quality_results)

        # Scalability benchmarks
        scalability_results = self._evaluate_scalability()
        all_results.extend(scalability_results)

        # Resource utilization
        resource_results = self._evaluate_resource_utilization()
        all_results.extend(resource_results)

        # Calculate overall score
        overall_score = self._calculate_overall_score(all_results)
        execution_time = time.time() - start_time

        # Get model info
        model_info = self.swarm_engine.get_model_info()

        return BenchmarkResult(
            benchmark_name="comprehensive_evaluation",
            overall_score=overall_score,
            individual_metrics=all_results,
            execution_time=execution_time,
            model_info=model_info
        )

    def _evaluate_performance(self) -> List[EvaluationResult]:
        """Evaluate performance metrics"""
        results = []
        try:
            # Latency benchmark
            if self.quality_evaluator:
                latency_metrics = self.quality_evaluator.latency_benchmark.benchmark_inference_speed(
                    self.benchmark_prompts[:3]  # Use a subset for speed
                )
                for metric_name, score in latency_metrics.items():
                    results.append(EvaluationResult(
                        metric_name=f"performance_{metric_name}",
                        score=score,
                        details={"category": "performance"}
                    ))

            # Throughput test
            throughput = self._measure_throughput()
            results.append(EvaluationResult(
                metric_name="throughput_requests_per_sec",
                score=throughput,
                details={"category": "performance"}
            ))
        except Exception as e:
            self.logger.error(f"Performance evaluation failed: {e}")
        return results

    def _evaluate_quality(self) -> List[EvaluationResult]:
        """Evaluate generation quality"""
        results = []
        try:
            # Quality evaluation
            quality_results = self.quality_evaluator.evaluate_generation_quality(
                self.benchmark_prompts
            )
            # Add category to results
            for result in quality_results:
                result.details["category"] = "quality"
                results.append(result)
        except Exception as e:
            self.logger.error(f"Quality evaluation failed: {e}")
        return results
    def _evaluate_scalability(self) -> List[EvaluationResult]:
        """Evaluate scalability metrics under increasing concurrent load"""
        results = []
        try:
            # Test with different levels of concurrent load
            load_levels = [1, 5, 10]

            for load in load_levels:
                start_time = time.time()

                # Issue the requests concurrently via a thread pool
                with concurrent.futures.ThreadPoolExecutor(max_workers=load) as executor:
                    futures = [executor.submit(self._simulate_inference_request) for _ in range(load)]
                    success_count = sum(1 for f in concurrent.futures.as_completed(futures) if f.result())

                total_time = time.time() - start_time

                # Calculate metrics
                success_rate = success_count / load
                avg_response_time = total_time / load

                results.append(EvaluationResult(
                    metric_name=f"scalability_success_rate_load_{load}",
                    score=success_rate,
                    details={"category": "scalability", "load_level": load}
                ))
                results.append(EvaluationResult(
                    metric_name=f"scalability_avg_response_time_load_{load}",
                    score=avg_response_time,
                    details={"category": "scalability", "load_level": load}
                ))
        except Exception as e:
            self.logger.error(f"Scalability evaluation failed: {e}")
        return results
    def _evaluate_resource_utilization(self) -> List[EvaluationResult]:
        """Evaluate resource utilization"""
        results = []
        try:
            # Get memory stats
            memory_stats = self.swarm_engine.memory_manager.get_memory_stats()

            results.append(EvaluationResult(
                metric_name="memory_utilization_gb",
                score=memory_stats.used_memory,
                details={"category": "resources", "type": "memory"}
            ))
            results.append(EvaluationResult(
                metric_name="gpu_memory_utilization_gb",
                score=memory_stats.gpu_memory,
                details={"category": "resources", "type": "gpu_memory"}
            ))

            # Encoder utilization
            active_encoders = len(self.swarm_engine.get_active_encoders())
            total_encoders = 100  # As specified in requirements
            results.append(EvaluationResult(
                metric_name="encoder_utilization_ratio",
                score=active_encoders / total_encoders,
                details={"category": "resources", "active": active_encoders, "total": total_encoders}
            ))
        except Exception as e:
            self.logger.error(f"Resource evaluation failed: {e}")
        return results

    def _measure_throughput(self) -> float:
        """Measure system throughput"""
        try:
            num_requests = 10
            start_time = time.time()
            for _ in range(num_requests):
                self._simulate_inference_request()
            total_time = time.time() - start_time
            throughput = num_requests / total_time
            return throughput
        except Exception as e:
            self.logger.error(f"Throughput measurement failed: {e}")
            return 0.0

    def _simulate_inference_request(self) -> bool:
        """Simulate an inference request"""
        try:
            prompt = "This is a test prompt for evaluation."
            result = self.swarm_engine.generate(prompt, max_length=50)
            return result is not None
        except Exception as e:
            self.logger.error(f"Simulated request failed: {e}")
            return False

    def _calculate_overall_score(self, results: List[EvaluationResult]) -> float:
        """Calculate overall benchmark score"""
        if not results:
            return 0.0

        # Weight different categories
        weights = {
            "performance": 0.3,
            "quality": 0.4,
            "scalability": 0.2,
            "resources": 0.1
        }

        category_scores = defaultdict(list)
        for result in results:
            category = result.details.get("category", "other")
            # Normalize scores based on metric type
            normalized_score = self._normalize_score(result)
            category_scores[category].append(normalized_score)

        # Calculate weighted average
        total_score = 0.0
        total_weight = 0.0
        for category, scores in category_scores.items():
            if category in weights and scores:
                avg_score = sum(scores) / len(scores)
                weight = weights[category]
                total_score += avg_score * weight
                total_weight += weight

        return total_score / total_weight if total_weight > 0 else 0.0

    def _normalize_score(self, result: EvaluationResult) -> float:
        """Normalize score to 0-1 range"""
        metric_name = result.metric_name
        score = result.score

        # Define normalization rules for different metrics
        if "perplexity" in metric_name:
            # Lower is better, normalize to 0-1 where 1 is best
            return max(0.0, 1.0 - min(score / 100.0, 1.0))
        elif "latency" in metric_name or "response_time" in metric_name:
            # Lower is better; latencies are reported in ms, so 1000 ms (1 s) is the threshold
            return max(0.0, 1.0 - min(score / 1000.0, 1.0))
        elif "throughput" in metric_name:
            # Higher is better; 100 requests/sec is treated as the maximum
            return min(score / 100.0, 1.0)
        elif "success_rate" in metric_name or "utilization" in metric_name:
            # Already in 0-1 range
            return score
        else:
            # Default: assume higher is better and clamp to 0-1
            return min(max(score, 0.0), 1.0)

    def export_evaluation_report(self, result: BenchmarkResult, filename: Optional[str] = None) -> str:
        """Export evaluation report to file"""
        if not filename:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"mamba_swarm_evaluation_{timestamp}.json"

        # Convert to serializable format
        report = {
            "benchmark_name": result.benchmark_name,
            "overall_score": result.overall_score,
            "execution_time": result.execution_time,
            "timestamp": result.timestamp,
            "model_info": result.model_info,
            "metrics": [
                {
                    "name": metric.metric_name,
                    "score": metric.score,
                    "details": metric.details,
                    "timestamp": metric.timestamp
                }
                for metric in result.individual_metrics
            ]
        }

        with open(filename, 'w') as f:
            json.dump(report, f, indent=2, default=str)

        self.logger.info(f"Evaluation report saved to {filename}")
        return filename

# Example usage
if __name__ == "__main__":
    # This would be used with an actual SwarmEngine instance:
    # evaluator = MambaSwarmEvaluator(swarm_engine)
    # result = evaluator.run_comprehensive_evaluation()
    # report_file = evaluator.export_evaluation_report(result)

    # Demo of individual components
    print("Mamba Swarm Evaluator components initialized successfully")

    # Example BLEU calculation
    bleu_calc = BLEUScore()
    reference = "The quick brown fox jumps over the lazy dog"
    candidate = "The fast brown fox leaps over the sleepy dog"
    bleu_score = bleu_calc.calculate_bleu(reference, candidate)
    print(f"BLEU score: {bleu_score:.3f}")

    # Example coherence analysis
    coherence_analyzer = CoherenceAnalyzer()
    text = "This is a coherent text. It flows well from sentence to sentence. The ideas are connected logically."
    coherence_metrics = coherence_analyzer.analyze_coherence(text)
    print(f"Coherence metrics: {coherence_metrics}")