"""
Script to benchmark the performance of different providers for a given model.

Usage: python model_provider_benchmark.py [--model "model_name"] [--output results.json] [--questions 5]
"""

import argparse
import json
import logging
import os
import time
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple

import requests
from dotenv import load_dotenv
from huggingface_hub import model_info

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger("provider_benchmark")

# Default models to benchmark when no --model is specified
DEFAULT_MODELS = [
    "Qwen/Qwen2.5-72B-Instruct",
    "meta-llama/Llama-3.3-70B-Instruct",
    "deepseek-ai/DeepSeek-R1-Distill-Llama-70B",
    "Qwen/QwQ-32B",
    "mistralai/Mistral-Small-24B-Instruct-2501",
]

# Default questions sent to each provider during the benchmark
DEFAULT_QUESTIONS = [
    "What are the key benefits of using distributed systems?",
    "Explain the concept of quantum computing in simple terms.",
    "What are the ethical considerations in artificial intelligence?",
    "Compare and contrast supervised and unsupervised learning.",
    "How does blockchain technology ensure security and transparency?",
]


def get_model_providers(model_name: str) -> List[str]:
    """
    Gets all available providers for a given model.

    Args:
        model_name: Name of the model on the Hub

    Returns:
        List of available providers
    """
    try:
        info = model_info(model_name, expand="inferenceProviderMapping")
        mapping = getattr(info, "inference_provider_mapping", None)
        if mapping:
            return list(mapping.keys())
        logger.warning(f"No providers available for {model_name}")
        return []
    except Exception as e:
        logger.error(f"Error while retrieving providers for {model_name}: {e}")
        return []


def query_model(
    model: str,
    provider: str,
    prompt: str,
    token: str
) -> Tuple[str, float]:
    """
    Sends a request to a model via the Inference API.

    Args:
        model: Model name
        provider: Provider name
        prompt: Question to ask
        token: HF token for authentication

    Returns:
        Tuple containing the response and execution time
    """
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json"
    }

    # Generation parameters; the provider name is forwarded alongside them
    payload = {
        "inputs": prompt,
        "parameters": {
            "max_new_tokens": 100,
            "temperature": 0.7,
            "top_p": 0.9,
            "do_sample": True,
            "provider": provider
        }
    }

    # Serverless Inference API endpoint for the model
    api_url = f"https://api-inference.huggingface.co/models/{model}"

    # Small pause between requests to avoid hitting rate limits
    # (kept outside the timed section so it does not skew the measurements)
    time.sleep(0.5)

    start_time = time.time()
    try:
        response = requests.post(api_url, headers=headers, json=payload)

        # Surface API errors without raising
        if response.status_code != 200:
            try:
                error_data = response.json()
                error_msg = error_data.get("error", str(error_data))
            except ValueError:
                error_msg = response.text
            logger.error(f"Error for {model} ({provider}): {error_msg}")
            return f"ERROR: {error_msg}", 0

        result = response.json()

        # Extract the generated text from the possible response shapes
        if isinstance(result, list) and len(result) > 0:
            if "generated_text" in result[0]:
                answer = result[0]["generated_text"]
            else:
                answer = str(result)
        elif isinstance(result, dict):
            if "generated_text" in result:
                answer = result["generated_text"]
            else:
                answer = str(result)
        else:
            answer = str(result)

    except requests.exceptions.RequestException as e:
        error_msg = str(e)
        logger.error(f"Error for {model} ({provider}): {error_msg}")
        return f"ERROR: {error_msg}", 0
    except Exception as e:
        error_msg = str(e)
        logger.error(f"Error for {model} ({provider}): {error_msg}")
        return f"ERROR: {error_msg}", 0

    end_time = time.time()
    execution_time = end_time - start_time

    return answer, execution_time

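
# Alternative provider-routing sketch (not used by run_benchmark): recent versions of
# huggingface_hub expose InferenceClient, which accepts the provider directly instead of
# passing it inside the request payload. This is a minimal illustration, assuming
# huggingface_hub >= 0.28 and a chat-capable model; treat it as a sketch, not the
# benchmark's reference implementation.
def query_model_via_client(
    model: str,
    provider: str,
    prompt: str,
    token: str
) -> Tuple[str, float]:
    """
    Sends a single chat request through huggingface_hub's InferenceClient.

    Returns:
        Tuple containing the response text and execution time
    """
    from huggingface_hub import InferenceClient  # local import: only needed for this sketch

    client = InferenceClient(provider=provider, api_key=token)
    start_time = time.time()
    completion = client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": prompt}],
        max_tokens=100,
        temperature=0.7,
        top_p=0.9,
    )
    return completion.choices[0].message.content, time.time() - start_time
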
def run_benchmark(
    model: str,
    questions: List[str] = DEFAULT_QUESTIONS,
    output_file: Optional[str] = None
) -> Optional[List[Dict[str, Any]]]:
    """
    Runs a benchmark for all model/provider combinations.

    Args:
        model: Name of the model to test
        questions: List of questions to ask
        output_file: Path to the output JSON file (optional)

    Returns:
        List of ranked providers or None in case of error
    """
    # Load environment variables from a .env file if present
    load_dotenv()

    # HF token used to authenticate against the Inference API
    hf_token = os.environ.get("HF_TOKEN")
    if not hf_token:
        logger.error("HF_TOKEN is not defined in the environment")
        return None

    # Retrieve the providers that serve this model
    providers = get_model_providers(model)
    if not providers:
        logger.warning(f"No providers for {model}")
        return None

    logger.info(f"Testing {model} with providers: {', '.join(providers)}")

    # Raw per-provider results
    results = {
        "providers": {}
    }

    # Test each provider with the full question set
    for provider in providers:
        logger.info(f"Provider: {provider}")
        provider_results = {
            "questions": [],
            "total_time": 0,
            "average_time": 0,
            "success_rate": 0
        }

        successful_queries = 0
        total_time = 0

        # Ask every question and record the timing and outcome
        for question in questions:
            answer, execution_time = query_model(
                model=model,
                provider=provider,
                prompt=question,
                token=hf_token
            )

            # A response starting with "ERROR:" marks a failed query
            is_error = answer.startswith("ERROR:")
            if not is_error:
                successful_queries += 1
                total_time += execution_time

            # Keep a truncated copy of the answer for the report
            provider_results["questions"].append({
                "question": question,
                "time": execution_time,
                "success": not is_error,
                "answer": answer[:100] + "..." if len(answer) > 100 else answer
            })

        # Aggregate statistics for this provider
        provider_results["total_time"] = total_time
        provider_results["average_time"] = total_time / successful_queries if successful_queries > 0 else 0
        provider_results["success_rate"] = successful_queries / len(questions)

        results["providers"][provider] = provider_results

    # Abort if no provider answered anything successfully
    if not any(data["success_rate"] > 0 for data in results["providers"].values()):
        logger.warning(f"No successful providers for {model}")
        return None

    # Rank providers by total time, pushing fully failed providers to the end
    sorted_providers = sorted(
        results["providers"].items(),
        key=lambda x: x[1]["total_time"] if x[1]["success_rate"] > 0 else float('inf')
    )

    return [
        {
            "provider": provider,
            "total_time": data["total_time"],
            "success_rate": data["success_rate"],
            "average_time": data["average_time"]
        }
        for provider, data in sorted_providers
    ]

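# Shape of the list returned by run_benchmark(), consumed by the display and ranking
# helpers below (provider names and numbers here are illustrative placeholders):
#     [
#         {"provider": "some-provider", "total_time": 12.34, "success_rate": 1.0, "average_time": 2.47},
#         ...
#     ]
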
def display_results(model: str, results: List[Dict[str, Any]]) -> None:
    """
    Displays benchmark results in a readable format.

    Args:
        model: Model name
        results: List of ranked providers
    """
    print(f"\n===== Benchmark Results for {model} =====")
    print(f"Number of providers tested: {len(results)}")

    print("\nProvider Rankings (fastest to slowest):")
    print("-" * 80)
    print(f"{'Rank':<6} {'Provider':<20} {'Success Rate':<15} {'Total Time (s)':<20} {'Avg Time (s)':<15}")
    print("-" * 80)

    for i, provider_data in enumerate(results, 1):
        success = f"{provider_data['success_rate'] * 100:.1f}%"
        total = f"{provider_data['total_time']:.2f}"
        average = f"{provider_data['average_time']:.2f}"
        print(f"{i:<6} {provider_data['provider']:<20} {success:<15} {total:<20} {average:<15}")

def calculate_model_rankings(all_results: Dict[str, Any]) -> List[Dict[str, Any]]:
    """
    Calculates model rankings based on their performance.

    Args:
        all_results: Complete benchmark results

    Returns:
        List of models ranked by performance
    """
    model_rankings = []

    for model_name, results in all_results["models"].items():
        if results is None:
            continue

        # Pick the fastest provider with a success rate of at least 80%
        best_provider = None
        best_time = float('inf')
        best_success_rate = 0
        best_average_time = 0

        for provider_data in results:
            if provider_data["success_rate"] >= 0.8 and provider_data["total_time"] < best_time:
                best_time = provider_data["total_time"]
                best_success_rate = provider_data["success_rate"]
                best_average_time = provider_data["average_time"]
                best_provider = provider_data["provider"]

        if best_provider:
            model_rankings.append({
                "model": model_name,
                "best_provider": best_provider,
                "total_time": best_time,
                "success_rate": best_success_rate,
                "average_time": best_average_time
            })

    # Sort models by the total time of their best provider
    return sorted(model_rankings, key=lambda x: x["total_time"])

def display_final_rankings(model_rankings: List[Dict[str, Any]]) -> None:
    """
    Displays the final model rankings.

    Args:
        model_rankings: List of ranked models
    """
    print("\n" + "=" * 80)
    print("FINAL MODEL RANKINGS (fastest to slowest)")
    print("=" * 80)
    print(f"{'Rank':<6} {'Model':<40} {'Provider':<20} {'Total Time (s)':<15} {'Success Rate':<15}")
    print("-" * 80)

    for i, model_data in enumerate(model_rankings, 1):
        total = f"{model_data['total_time']:.2f}"
        success = f"{model_data['success_rate'] * 100:.1f}%"
        print(f"{i:<6} {model_data['model']:<40} {model_data['best_provider']:<20} {total:<15} {success:<15}")

def display_final_summary(all_results: Dict[str, Any]) -> None:
    """
    Displays a final summary with ranked providers for each model.

    Args:
        all_results: Complete benchmark results
    """
    print("\n" + "=" * 100)
    print("FINAL SUMMARY OF PROVIDERS BY MODEL")
    print("=" * 100)

    for model_name, results in all_results["models"].items():
        if results is None:
            print(f"\n{model_name}:")
            print("  No successful providers found")
            continue

        print(f"\n{model_name}:")
        print("  Successful providers:")
        for provider_data in results:
            if provider_data["success_rate"] > 0:
                print(f"    - {provider_data['provider']} (Success rate: {provider_data['success_rate']*100:.1f}%, Avg time: {provider_data['average_time']:.2f}s)")

        failed_providers = [p for p in results if p["success_rate"] == 0]
        if failed_providers:
            print("  Failed providers:")
            for provider_data in failed_providers:
                print(f"    - {provider_data['provider']}")

def main():
    """
    Main entry point for the script.
    """
    parser = argparse.ArgumentParser(description="Tests the performance of model providers.")
    parser.add_argument("--model", type=str, help="Name of the model to test (if not specified, all default models will be tested)")
    parser.add_argument("--output", type=str, default="benchmark_results.json", help="Path to the output JSON file")
    parser.add_argument("--questions", type=int, default=5, help="Number of questions to ask (default: 5)")

    args = parser.parse_args()

    # Cap the number of questions at the size of the default question set
    num_questions = min(args.questions, len(DEFAULT_QUESTIONS))
    questions = DEFAULT_QUESTIONS[:num_questions]

    # Benchmark a single model if --model is given, otherwise all default models
    models_to_test = [args.model] if args.model else DEFAULT_MODELS

    # Aggregated results for every model tested in this run
    all_results = {
        "timestamp": datetime.now().isoformat(),
        "models": {}
    }

    # Run the benchmark for each model
    for model in models_to_test:
        logger.info(f"\nModel: {model}")
        results = run_benchmark(
            model=model,
            questions=questions,
            output_file=None
        )
        all_results["models"][model] = results

    # Save the raw results as JSON
    with open(args.output, "w") as f:
        json.dump(all_results, f, indent=2)
    logger.info(f"\nResults saved to {args.output}")

    # Print the human-readable summary
    display_final_summary(all_results)


if __name__ == "__main__":
    main()
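
# Example invocations (HF_TOKEN must be set in the environment or in a .env file);
# the model and output file names below are illustrative argument values only:
#     python model_provider_benchmark.py
#     python model_provider_benchmark.py --model "Qwen/Qwen2.5-72B-Instruct" --questions 3 --output qwen_results.json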