import json
import math
import queue
import re
import threading
import time
from typing import Any, Callable, Dict, Generator, List, Optional, Tuple

import ollama
from rich.console import Console
from rich.panel import Panel
from rich.progress import BarColumn, Progress, SpinnerColumn, TextColumn, TimeElapsedColumn
from rich.style import Style
from rich.table import Table


class BenchmarkSuite:
    """Benchmark an Ollama model, scoring its answers with a second "judge" model.

    Each qualitative test sends a prompt to ``model_name``, streams the answer,
    then asks ``judge_model`` to rate it for accuracy, completeness and
    correctness. Progress can be mirrored to a UI through two optional hooks:
    a text ``progress_callback`` and a per-interaction ``interaction_callback``.
    """

    # Rich markup tags removed from messages before they reach the UI callback.
    _RICH_TAG_RE = re.compile(r"\[/?(?:cyan|green|yellow|red)\]")

    # Matches either a valid JSON escape (group 1 set) or a lone backslash.
    # ``\frac`` is deliberately excluded from the valid escapes so LaTeX
    # fractions are not read as a form feed followed by "rac".
    _JSON_BACKSLASH_RE = re.compile(r'\\(?!frac)(u[0-9a-fA-F]{4}|["\\/bfnrt])|\\')

    # Criteria the judge is asked to rate, in the order used for fallbacks.
    _CRITERIA = ("accuracy", "completeness", "correctness")

    _LETTER_GRADES = {
        'A+': 10.0, 'A': 9.5, 'A-': 9.0,
        'B+': 8.5, 'B': 8.0, 'B-': 7.5,
        'C+': 7.0, 'C': 6.5, 'C-': 6.0,
        'D+': 5.5, 'D': 5.0, 'D-': 4.5,
        'F': 0.0,
    }

    def __init__(self, model_name: str, judge_model: str):
        """Store the model under test and the judge model names."""
        self.model_name = model_name
        self.judge_model = judge_model
        self.console = Console()
        self.progress_callback: Optional[Callable[[str], None]] = None

    def set_progress_callback(self, callback: Callable[[str], None]):
        """Register a callable that receives plain-text progress messages."""
        self.progress_callback = callback

    def log_progress(self, message: str, style: str = ""):
        """Print ``message`` to the console and forward a markup-free copy to the UI.

        Only the cyan/green/yellow/red rich tags are stripped; any other markup
        is passed through unchanged.
        """
        self.console.print(message, style=style)
        if self.progress_callback:
            self.progress_callback(self._RICH_TAG_RE.sub("", message))

    def run_all_tests(self, num_iterations: int, interaction_callback=None) -> Generator[Tuple[str, Dict[str, Any]], None, None]:
        """Run every test in turn, yielding ``(test_name, result)`` as each finishes."""
        tests = [
            ("Logical Reasoning", self.test_logical_reasoning),
            ("Code Generation", self.test_code_generation),
            ("Mathematical Problem Solving", self.test_math_solving),
            ("Context Understanding", self.test_context_understanding),
            ("Performance Metrics", self.test_performance),
        ]
        for test_name, test_func in tests:
            yield test_name, test_func(num_iterations, interaction_callback)

    def evaluate_response(self, prompt: str, expected_elements: List[str], interaction_callback=None) -> Dict[str, Any]:
        """Ask the model ``prompt``, have the judge rate the answer, and return the result.

        Args:
            prompt: Text sent to the model under test.
            expected_elements: Elements the judge is told the answer should cover.
            interaction_callback: Optional callable invoked with keyword arguments
                ``prompt``, ``model_response``, ``judge_response``, ``model_name``
                and ``judge_model_name`` as the two responses stream in.

        Returns:
            A dict with ``score`` (0-10 average of the judged criteria),
            ``response_time``, ``evaluation``, ``response``, ``prompt`` and
            ``judge_response_raw``. When the model itself cannot be reached the
            dict carries an ``error`` key, a score of 0 and ``None`` for the
            evaluation and response. This method does not raise for model or
            judge failures; they are reported through the returned dict.
        """
        start_time = time.time()
        try:
            # Specific handlers must come before the generic one, otherwise
            # the timeout branch can never run.
            try:
                response_stream = self._ollama_generate_with_timeout(
                    model=self.model_name, prompt=prompt, stream=True, timeout=60
                )
            except TimeoutError as e:
                return self._model_failure(
                    prompt, f"Model {self.model_name} timed out: {str(e)}", interaction_callback
                )
            except Exception as e:
                return self._model_failure(
                    prompt, f"Failed to connect to model {self.model_name}: {str(e)}", interaction_callback
                )

            def show_model(text: str) -> None:
                # Judge column stays empty until the judge starts answering.
                self._notify(interaction_callback, prompt, text, "")

            response_text = self._consume_stream(
                response_stream,
                label="Model",
                error_prefix="Error during streaming: ",
                idle_timeout=60,
                max_idle_chunks=30,
                max_chunks=5000,
                max_chars=10000,
                max_total_seconds=120,
                on_update=show_model,
            )
            print()  # Newline after the "\r"-rewritten streaming line.
            show_model(response_text)  # Make sure the UI has the complete answer.

            # Response time covers the model phase only, not the judging.
            response_time = time.time() - start_time

            judge_prompt = self._build_judge_prompt(expected_elements, response_text)
            try:
                judge_stream = self._ollama_generate_with_timeout(
                    model=self.judge_model, prompt=judge_prompt, stream=True, timeout=60
                )
            except TimeoutError as e:
                return self._judge_failure(
                    prompt, response_text, start_time,
                    f"Judge model {self.judge_model} timed out: {str(e)}", interaction_callback
                )
            except Exception as e:
                return self._judge_failure(
                    prompt, response_text, start_time,
                    f"Failed to connect to judge model {self.judge_model}: {str(e)}", interaction_callback
                )

            def show_judge(text: str) -> None:
                self._notify(interaction_callback, prompt, response_text, text)

            judge_response_text = self._consume_stream(
                judge_stream,
                label="Judge",
                error_prefix="Error during judge streaming: ",
                idle_timeout=300,
                max_idle_chunks=20,
                max_chunks=10000,
                max_chars=50000,
                max_total_seconds=600,
                on_update=show_judge,
            )
            print()  # Newline after the judge streaming line.
            show_judge(judge_response_text)

            # NOTE(review): the original text between the JSON repair step and
            # the final return dict was unreadable in the reviewed source; the
            # parsing fallback and score averaging below are a reconstruction
            # built on the class's own helpers - confirm against history.
            evaluation = self._parse_judge_response(judge_response_text)
            return {
                "score": self._overall_score(evaluation),
                "response_time": response_time,
                "evaluation": evaluation,
                "response": response_text,
                "prompt": prompt,
                "judge_response_raw": judge_response_text,
            }
        except Exception as e:
            # Last-resort boundary so one bad interaction cannot abort a run.
            return self._model_failure(prompt, str(e), interaction_callback)

    def _notify(self, interaction_callback, prompt: str, model_response: str, judge_response: str) -> None:
        """Invoke ``interaction_callback`` (if any) with the standard keyword set."""
        if interaction_callback:
            interaction_callback(
                prompt=prompt,
                model_response=model_response,
                judge_response=judge_response,
                model_name=self.model_name,
                judge_model_name=self.judge_model,
            )

    def _model_failure(self, prompt: str, error_msg: str, interaction_callback) -> Dict[str, Any]:
        """Report a failure of the model under test and build the error result."""
        self._notify(interaction_callback, prompt, f"[ERROR] {error_msg}", "N/A")
        return {
            "score": 0,
            "response_time": 0,
            "error": error_msg,
            "evaluation": None,
            "response": None,
            "prompt": prompt,
            "judge_response_raw": None,
        }

    def _judge_failure(self, prompt: str, response_text: str, start_time: float,
                       error_msg: str, interaction_callback) -> Dict[str, Any]:
        """Report a failure of the judge model and build a zero-score result."""
        judge_response_text = f"[ERROR] {error_msg}"
        self._notify(interaction_callback, prompt, response_text, judge_response_text)
        evaluation = {
            criterion: {"score": 0, "reason": error_msg} for criterion in self._CRITERIA
        }
        return {
            "score": 0,
            "response_time": time.time() - start_time,
            "evaluation": evaluation,
            "response": response_text,
            "prompt": prompt,
            "judge_response_raw": judge_response_text,
        }

    def _build_judge_prompt(self, expected_elements: List[str], response_text: str) -> str:
        """Return the instruction sent to the judge model for ``response_text``."""
        return f"""
        Evaluate the following response based on accuracy, completeness, and correctness.
        The response should contain or address these elements: {', '.join(expected_elements)}

        Response to evaluate:
        {response_text}

        Rate each criteria from 0-10 and provide a brief explanation.
        Return your evaluation in JSON format:
        {{
            "accuracy": {{"score": "X", "reason": "..."}},
            "completeness": {{"score": "X", "reason": "..."}},
            "correctness": {{"score": "X", "reason": "..."}}
        }}
        """

    def _consume_stream(self, stream, *, label: str, error_prefix: str, idle_timeout: float,
                        max_idle_chunks: int, max_chunks: int, max_chars: int,
                        max_total_seconds: float,
                        on_update: Optional[Callable[[str], None]] = None) -> str:
        """Collect the ``response`` text of a streamed generation, with safety limits.

        The limits are checked only when a chunk arrives, so they bound a
        stream that keeps producing chunks; they cannot interrupt an iterator
        that blocks without yielding. When a limit trips, a bracketed marker
        is appended to the text and reading stops.

        Args:
            stream: Iterable of chunks supporting ``in`` and ``[]`` for the
                ``response`` and ``done`` keys.
            label: "Model" or "Judge"; used in console output and markers.
            error_prefix: Prefix of the text returned if iteration raises.
            idle_timeout: Seconds allowed since the last chunk that added text.
            max_idle_chunks: Consecutive text-less chunks tolerated.
            max_chunks: Maximum number of text-bearing chunks.
            max_chars: Maximum length of the accumulated text.
            max_total_seconds: Maximum seconds since reading started.
            on_update: Called with the text so far every 5 text-bearing chunks,
                and with the error text if iteration raises.

        Returns:
            The accumulated text, or ``error_prefix`` plus the exception
            message when iteration failed.
        """
        text = ""
        chunk_count = 0
        idle_chunks = 0
        started = time.time()
        last_progress = started
        try:
            for chunk in stream:
                now = time.time()
                if now - started > max_total_seconds:
                    text += f" [Timeout: {label} response took too long (absolute timeout)]"
                    break
                if now - last_progress > idle_timeout:
                    text += f" [Timeout: {label} response took too long]"
                    break
                if not chunk:
                    break

                piece = chunk['response'] if 'response' in chunk else None
                if piece:
                    text += piece
                    chunk_count += 1
                    idle_chunks = 0
                    last_progress = time.time()
                    # Throttle UI and terminal updates; doing this only when
                    # text was added avoids repeated updates on empty chunks.
                    if on_update and chunk_count % 5 == 0:
                        on_update(text)
                    if chunk_count % 10 == 0:
                        print(f"\r{label} response (chunk {chunk_count}): {text[-100:]}", end="", flush=True)
                else:
                    idle_chunks += 1

                # Checked after appending so text on the final chunk is kept.
                if 'done' in chunk and chunk['done']:
                    break
                if idle_chunks > max_idle_chunks:
                    text += " [Stuck: No progress in response]"
                    break
                if chunk_count > max_chunks:
                    text += " [Error: Too many response chunks]"
                    break
                if len(text) > max_chars:
                    text += " [Truncated: Response too long]"
                    break
                if time.time() - started > max_total_seconds:
                    text += " [Timeout: Maximum time exceeded]"
                    break
        except Exception as e:
            # The partial text is replaced, not extended, by the error message.
            text = error_prefix + str(e)
            if on_update:
                on_update(text)
        return text

    @staticmethod
    def _extract_json_text(judge_response_text: str) -> str:
        """Return the body of the first fenced code block, or the whole text."""
        for fence in ("```json", "```"):
            start = judge_response_text.find(fence)
            if start == -1:
                continue
            start += len(fence)
            end = judge_response_text.find("```", start)
            if end != -1:
                return judge_response_text[start:end].strip()
            break  # Opening fence without a closing one: use the full text.
        return judge_response_text

    def _escape_stray_backslashes(self, text: str) -> str:
        """Double every backslash that does not start a valid JSON escape."""
        return self._JSON_BACKSLASH_RE.sub(
            lambda m: m.group(0) if m.group(1) else "\\\\", text
        )

    def _parse_judge_response(self, judge_response_text: str) -> Dict[str, Any]:
        """Turn the judge's raw answer into an evaluation dict.

        Tries, in order: the fenced (or whole) text as JSON, the outermost
        ``{...}`` span as JSON, each also with stray backslashes escaped
        (judges often emit LaTeX such as ``\\(`` inside strings), and finally
        pattern-based score extraction from the free text.
        """
        json_text = self._extract_json_text(judge_response_text)
        candidates = [json_text]
        first, last = json_text.find("{"), json_text.rfind("}")
        if 0 <= first < last:
            candidates.append(json_text[first:last + 1])

        for candidate in candidates:
            for attempt in (candidate, self._escape_stray_backslashes(candidate)):
                try:
                    parsed = json.loads(attempt)
                except json.JSONDecodeError:
                    continue
                if isinstance(parsed, dict):
                    return parsed
        return self._extract_scores_from_text(judge_response_text)

    def _overall_score(self, evaluation: Dict[str, Any]) -> float:
        """Average the normalized scores of the criteria present in ``evaluation``."""
        scores = []
        for criterion in self._CRITERIA:
            entry = evaluation.get(criterion)
            if isinstance(entry, dict):
                scores.append(self._normalize_score(entry.get("score")))
            elif entry is not None:
                # Judge returned a bare value instead of {"score": ..., "reason": ...}.
                scores.append(self._normalize_score(entry))
        return sum(scores) / len(scores) if len(scores) > 0 else 0

    def _ollama_generate_with_timeout(self, model: str, prompt: str, stream: bool = True, timeout: int = 60):
        """Call ``ollama.generate`` on a worker thread, waiting at most ``timeout`` seconds.

        Args:
            model: Name of the Ollama model to query.
            prompt: Prompt text.
            stream: Passed through to ``ollama.generate``.
            timeout: Seconds to wait for ``ollama.generate`` to return.

        Returns:
            Whatever ``ollama.generate`` returned.

        Raises:
            TimeoutError: The call did not return in time, or the worker ended
                without producing a result.
            Exception: Any exception raised by ``ollama.generate`` is re-raised.

        NOTE(review): with ``stream=True`` the client may hand back a lazy
        iterator, in which case this bounds only the creation of the iterator
        and not the generation itself - confirm against the ollama client.
        """
        outcome: "queue.Queue[Tuple[bool, Any]]" = queue.Queue(maxsize=1)

        def generate_wrapper():
            try:
                result = ollama.generate(
                    model=model,
                    prompt=prompt,
                    options={"temperature": 0.7},
                    stream=stream,
                )
                outcome.put((True, result))
            except Exception as e:
                outcome.put((False, e))

        # Daemon thread: an abandoned (timed-out) call must not block exit.
        worker = threading.Thread(target=generate_wrapper, daemon=True)
        worker.start()
        worker.join(timeout)

        timeout_message = f"Request to model {model} timed out after {timeout} seconds"
        if worker.is_alive():
            raise TimeoutError(timeout_message)
        try:
            succeeded, value = outcome.get_nowait()
        except queue.Empty:
            raise TimeoutError(timeout_message) from None
        if not succeeded:
            raise value
        return value

    def _extract_scores_from_text(self, text: str) -> dict:
        """Extract per-criterion scores from a free-text judge response.

        First looks for patterns such as "accuracy: 10" (case-insensitive, up
        to 20 non-digit characters between the name and the number). If none
        of the criteria yields a non-zero score, the first three standalone
        integers in the text are assigned to the criteria in order, keeping
        only those within 0-10.

        Returns:
            A dict mapping each criterion to ``{"score": int, "reason": str}``;
            criteria that could not be found keep a score of 0.
        """
        scores = {
            criterion: {"score": 0, "reason": "Score extracted from text evaluation"}
            for criterion in self._CRITERIA
        }

        for criterion in self._CRITERIA:
            match = re.search(criterion + r"[^\d]{0,20}(\d+)", text, re.IGNORECASE)
            if match:
                scores[criterion]["score"] = int(match.group(1))

        if all(entry["score"] == 0 for entry in scores.values()):
            numbers = re.findall(r'\b\d+\b', text)
            # Positional pairing: the i-th number belongs to the i-th criterion.
            for criterion, number in zip(self._CRITERIA, numbers):
                value = int(number)
                if 0 <= value <= 10:
                    scores[criterion]["score"] = value

        return scores

    def _normalize_score(self, score) -> float:
        """Convert a judge-supplied score to a float on the 0-10 scale.

        Accepts numbers, numeric strings, fractions such as "4/5" (scaled to
        0-10) and letter grades ("A+" .. "F"). Values above 10 are assumed to
        be on a 0-100 scale and divided by 10. The result is clamped to
        0-10; anything unparseable or non-finite yields 0.0.
        """
        try:
            if isinstance(score, str):
                text = score.strip()
                grade = self._LETTER_GRADES.get(text.upper())
                if grade is not None:
                    return grade
                if "/" in text:
                    # Unpacking raises ValueError unless there are exactly two parts.
                    numerator, denominator = (float(part.strip()) for part in text.split("/"))
                    value = (numerator / denominator) * 10 if denominator != 0 else 0.0
                else:
                    value = float(text)
            else:
                value = float(score)
        except (ValueError, TypeError, OverflowError):
            return 0.0

        if not math.isfinite(value):
            return 0.0
        if value > 10:
            value /= 10.0
        return max(0.0, min(10.0, value))

    def _run_prompt_suite(self, prompts: List[Tuple[str, List[str]]], num_iterations: int,
                          interaction_callback=None) -> Dict[str, Any]:
        """Evaluate ``num_iterations`` prompts (cycling through ``prompts``) and aggregate.

        Returns:
            A dict with the mean ``score``, ``average_response_time``, the
            number of ``iterations`` run and the ``individual_results``. With
            zero iterations the averages are reported as 0 instead of raising.
        """
        results = []
        for index in range(num_iterations):
            prompt, expected = prompts[index % len(prompts)]
            results.append(self.evaluate_response(prompt, expected, interaction_callback))

        count = len(results)
        return {
            "score": sum(r["score"] for r in results) / count if count else 0,
            "average_response_time": sum(r["response_time"] for r in results) / count if count else 0,
            "iterations": count,
            "individual_results": results,
        }

    def test_logical_reasoning(self, num_iterations: int, interaction_callback=None) -> Dict[str, Any]:
        """Run the logic-puzzle prompts and return the aggregated result."""
        prompts = [
            ("""
            Three people - Alice, Bob, and Charlie - are standing in a line.
            We know that:
            1. Alice is not first in line
            2. Charlie is not last in line
            3. Bob is not second in line
            What is the correct order of people in the line? Explain your reasoning step by step.
            """, ["logical steps", "final answer", "explanation"]),
            ("""
            In a bag, there are red, blue, and green marbles. If you pick two marbles at random:
            - The probability of getting two red marbles is 1/6
            - The probability of getting two blue marbles is 1/15
            How many marbles of each color are in the bag? Show your work.
            """, ["equation setup", "calculation", "final answer"]),
            ("""
            You have 8 coins that look identical, but one is slightly heavier than the others.
            Using a balance scale only twice, how can you identify the heavier coin?
            Provide a detailed strategy.
            """, ["strategy", "steps", "explanation"]),
        ]
        return self._run_prompt_suite(prompts, num_iterations, interaction_callback)

    def test_code_generation(self, num_iterations: int, interaction_callback=None) -> Dict[str, Any]:
        """Run the code-writing prompts and return the aggregated result."""
        prompts = [
            ("""
            Write a Python function that implements a binary search algorithm.
            The function should:
            1. Take a sorted list and target value as input
            2. Return the index if found, or -1 if not found
            3. Include type hints
            4. Include docstring with examples
            5. Include error handling
            """, ["function signature", "implementation", "type hints", "docstring", "error handling"]),
            ("""
            Create a class representing a Queue data structure using two stacks.
            Implement the following methods:
            1. enqueue(item)
            2. dequeue()
            3. peek()
            4. is_empty()
            Include proper error handling and type hints.
            """, ["class definition", "method implementations", "error handling", "type hints"]),
            ("""
            Write a function that finds all prime numbers up to a given number using
            the Sieve of Eratosthenes algorithm. The function should:
            1. Take a positive integer n as input
            2. Return a list of all prime numbers up to n
            3. Include time complexity analysis in comments
            4. Include memory optimization techniques
            """, ["function implementation", "algorithm correctness", "optimization", "complexity analysis"]),
        ]
        return self._run_prompt_suite(prompts, num_iterations, interaction_callback)

    def test_math_solving(self, num_iterations: int, interaction_callback=None) -> Dict[str, Any]:
        """Run the mathematics prompts and return the aggregated result."""
        prompts = [
            ("""
            Solve the following calculus problem:
            Find the volume of the solid obtained by rotating the region bounded by
            y = x², y = 2x, and the y-axis about the x-axis.
            Show all steps and explain your reasoning.
            """, ["setup", "integration", "calculation", "final answer"]),
            ("""
            Prove that the sum of two odd numbers is even.
            Provide a formal mathematical proof using algebraic notation.
            """, ["definition", "algebraic representation", "logical steps", "conclusion"]),
            ("""
            Solve the following probability problem:
            A box contains 3 red balls, 4 blue balls, and 5 green balls.
            Two balls are drawn without replacement.
            What is the probability that both balls are the same color?
            Show detailed calculations.
            """, ["probability theory", "calculations", "final answer"]),
        ]
        return self._run_prompt_suite(prompts, num_iterations, interaction_callback)

    def test_context_understanding(self, num_iterations: int, interaction_callback=None) -> Dict[str, Any]:
        """Run the reading/analysis prompts and return the aggregated result."""
        prompts = [
            ("""
            Read the following passage and answer the questions:

            The Antikythera mechanism is an ancient Greek hand-powered orrery, described as the first
            analog computer, used to predict astronomical positions and eclipses for calendar and
            astrological purposes decades in advance. It was recovered in 1901 from the Antikythera
            wreck, a shipwreck off the Greek island of Antikythera. The instrument has been dated
            to about 100 BCE.

            Questions:
            1. What was the main purpose of the Antikythera mechanism?
            2. When and where was it discovered?
            3. Why is it considered significant in the history of technology?

            Provide detailed answers with supporting evidence from the text.
            """, ["accurate answers", "text evidence", "comprehension"]),
            ("""
            Analyze the following code snippet and explain its implications:

            ```python
            def process_data(items: List[Dict[str, Any]]) -> Generator[Dict[str, Any], None, None]:
                seen = set()
                for item in items:
                    if item['id'] not in seen:
                        seen.add(item['id'])
                        yield item
            ```

            Explain:
            1. What does this code do?
            2. What are potential performance implications?
            3. What are possible use cases?
            4. Are there any potential improvements?
            """, ["functionality", "performance analysis", "use cases", "improvements"]),
            ("""
            Consider this business scenario:

            A startup is experiencing rapid growth but facing scalability issues with their
            current monolithic architecture. They need to decide between:
            1. Gradually refactoring to microservices
            2. Complete rewrite with modern architecture
            3. Optimizing current monolith

            Provide a recommendation with justification.
            """, ["analysis", "trade-offs", "recommendation", "justification"]),
        ]
        return self._run_prompt_suite(prompts, num_iterations, interaction_callback)

    def test_performance(self, num_iterations: int, interaction_callback=None) -> Dict[str, Any]:
        """Measure raw generation speed of the model under test (no judging).

        Sends a fixed prompt ``num_iterations`` times and times each full
        streamed response. "Tokens" are approximated by whitespace-separated
        words. Errors from ``ollama.generate`` propagate to the caller.
        ``interaction_callback`` is accepted for signature parity with the
        other tests and is not used.

        Returns:
            A dict with ``score`` (``min(10, 10 / average_response_time)``, so
            any average of one second or less scores 10),
            ``average_response_time``, ``tokens_per_second``, ``total_tokens``,
            ``total_time`` and ``iterations``. With zero iterations the score
            and rates are 0 instead of raising.
        """
        start_time = time.time()
        total_tokens = 0
        response_times = []
        prompt = "Generate a detailed technical explanation of how quantum computers work."

        for _ in range(num_iterations):
            iteration_start = time.time()
            response_stream = ollama.generate(
                model=self.model_name,
                prompt=prompt,
                options={"temperature": 0.7},
                stream=True,
            )
            # The stream must be drained fully before the timing is taken.
            response_text = "".join(
                chunk['response'] or "" for chunk in response_stream if 'response' in chunk
            )
            response_times.append(time.time() - iteration_start)
            total_tokens += len(response_text.split())

        total_time = time.time() - start_time
        avg_response_time = sum(response_times) / len(response_times) if response_times else 0
        tokens_per_second = total_tokens / total_time if total_time > 0 else 0

        return {
            "score": min(10, 10 * (1 / avg_response_time)) if avg_response_time > 0 else 0,
            "average_response_time": avg_response_time,
            "tokens_per_second": tokens_per_second,
            "total_tokens": total_tokens,
            "total_time": total_time,
            "iterations": num_iterations,
        }