""" SWE-bench Integration Service for Visualisable.ai Provides access to SWE-bench dataset and evaluation capabilities """ from typing import Dict, List, Optional, Any from dataclasses import dataclass, asdict import json import time import logging from datetime import datetime import traceback import numpy as np logger = logging.getLogger(__name__) @dataclass class SWEBenchTask: """Represents a SWE-bench task/issue""" instance_id: str repo: str problem_statement: str base_commit: str patch: Optional[str] = None test_patch: Optional[str] = None hints_text: Optional[str] = None created_at: Optional[str] = None version: Optional[str] = None FAIL_TO_PASS: Optional[List[str]] = None PASS_TO_PASS: Optional[List[str]] = None @property def difficulty(self) -> str: """Estimate difficulty based on patch size and test count""" if not self.patch: return "unknown" patch_lines = len(self.patch.split('\n')) test_count = len(self.FAIL_TO_PASS) if self.FAIL_TO_PASS else 0 # Adjusted thresholds for better distribution in SWE-bench_Lite # Most tasks are complex, so we use percentile-based distribution if patch_lines < 30: return "easy" elif patch_lines < 100: return "medium" else: return "hard" @property def category(self) -> str: """Categorize based on problem statement keywords""" statement_lower = self.problem_statement.lower() if any(word in statement_lower for word in ['bug', 'fix', 'error', 'crash', 'fail']): return "bug-fix" elif any(word in statement_lower for word in ['add', 'feature', 'implement', 'support']): return "feature" elif any(word in statement_lower for word in ['refactor', 'clean', 'improve', 'optimize']): return "refactor" elif any(word in statement_lower for word in ['test', 'coverage', 'assert']): return "test" elif any(word in statement_lower for word in ['doc', 'comment', 'readme']): return "documentation" else: return "other" @dataclass class SWEBenchResult: """Results from evaluating a solution""" task_id: str generated_solution: str tokens: List[str] token_probabilities: List[float] attention_traces: List[Dict] confidence_scores: List[float] generation_time: float success: Optional[bool] = None tests_passed: Optional[int] = None tests_failed: Optional[int] = None error_message: Optional[str] = None hallucination_risk: Optional[float] = None def to_dict(self) -> Dict: """Convert to dictionary for JSON serialization""" return asdict(self) class SWEBenchService: """Service for managing SWE-bench tasks and evaluations""" def __init__(self): self.tasks: Dict[str, SWEBenchTask] = {} self.results: Dict[str, List[SWEBenchResult]] = {} self.dataset_loaded = False self.metrics_cache: Dict[str, Any] = {} def _load_mock_tasks(self): """Load mock tasks when dataset isn't available""" repos = [ "astropy/astropy", "django/django", "matplotlib/matplotlib", "pandas-dev/pandas", "pytest-dev/pytest", "scikit-learn/scikit-learn" ] statements = [ """Modeling's `separability_matrix` does not compute separability correctly for nested CompoundModels Consider the following model: ```python from astropy.modeling import models as m from astropy.modeling.separable import separable_matrix cm = m.Linear1D(10) & m.Linear1D(5) ``` It's separability matrix as you might expect is a diagonal: ```python >>> separability_matrix(cm) array([[ True, False], [False, True]]) ```""", """Please support header rows in RestructuredText output ### Description It would be great if the RestructuredText output could have header rows for tables, similar to what MySQL does for pipe formatting. ### Expected behavior According to the documentation for MyST parsers, the docutils RST table expects the first row to be treated as a header row. ### Actual behavior The RST output treats the first row as a regular data row and doesn't mark it as a header.""", """Issue when parsing empty lists/arrays in configuration When attempting to parse empty lists or arrays from configuration files, the parser incorrectly raises a ValueError instead of returning an empty list. ```python >>> config.parse_list("[]") ValueError: invalid literal for int() with base 10: '[]' ``` Expected behavior: Should return an empty list []""" ] for i in range(300): # Create 300 mock tasks for better testing repo = repos[i % len(repos)] repo_name = repo.split('/')[1] issue_number = 11000 + i task = SWEBenchTask( instance_id=f"{repo_name}__{repo_name}-{issue_number}", repo=repo, problem_statement=statements[i % len(statements)], base_commit=f"commit_{i:04d}", patch="# Mock patch\n+ line added\n- line removed", FAIL_TO_PASS=["test_1", "test_2"] if i % 2 == 0 else ["test_a"], PASS_TO_PASS=["test_pass_1", "test_pass_2"] ) self.tasks[task.instance_id] = task logger.info(f"Loaded {len(self.tasks)} mock SWE-bench tasks") async def load_dataset(self, dataset_name: str = "princeton-nlp/SWE-bench_Lite"): """Load SWE-bench dataset from Hugging Face""" try: from datasets import load_dataset logger.info(f"Loading SWE-bench dataset: {dataset_name}") # Load the dataset with error handling try: dataset = load_dataset(dataset_name, split='test') # Convert to our task format for item in dataset: task = SWEBenchTask( instance_id=item['instance_id'], repo=item['repo'], problem_statement=item['problem_statement'], base_commit=item['base_commit'], patch=item.get('patch'), test_patch=item.get('test_patch'), hints_text=item.get('hints_text'), created_at=item.get('created_at'), version=item.get('version'), FAIL_TO_PASS=item.get('FAIL_TO_PASS'), PASS_TO_PASS=item.get('PASS_TO_PASS') ) self.tasks[task.instance_id] = task self.dataset_loaded = True logger.info(f"Loaded {len(self.tasks)} SWE-bench tasks") except Exception as dataset_error: logger.warning(f"Could not load full dataset, using mock data: {dataset_error}") self._load_mock_tasks() self.dataset_loaded = True # Initialize metrics cache self._update_metrics_cache() except ImportError: logger.error("datasets library not installed. Using mock data instead") self._load_mock_tasks() self.dataset_loaded = True except Exception as e: logger.error(f"Failed to load SWE-bench dataset, using mock: {e}") self._load_mock_tasks() self.dataset_loaded = True def get_tasks( self, category: Optional[str] = None, difficulty: Optional[str] = None, repo: Optional[str] = None, limit: int = 100, offset: int = 0 ) -> List[Dict]: """Get filtered list of tasks""" tasks = list(self.tasks.values()) # Apply filters if category: tasks = [t for t in tasks if t.category == category] if difficulty: tasks = [t for t in tasks if t.difficulty == difficulty] if repo: tasks = [t for t in tasks if t.repo == repo] # Apply pagination tasks = tasks[offset:offset + limit] # Convert to dict format return [ { 'instance_id': t.instance_id, 'repo': t.repo, 'category': t.category, 'difficulty': t.difficulty, 'problem_statement': t.problem_statement, # Return full problem statement 'created_at': t.created_at, 'has_patch': t.patch is not None, 'has_tests': t.test_patch is not None, 'test_count': len(t.FAIL_TO_PASS) if t.FAIL_TO_PASS else 0, # Add GitHub URLs if this looks like a real GitHub repo 'issue_url': f"https://github.com/{t.repo}/issues/{t.instance_id.split('-')[-1]}" if '/' in t.repo and t.instance_id else None, 'pr_url': f"https://github.com/{t.repo}/pull/{t.instance_id.split('-')[-1]}" if '/' in t.repo and t.instance_id else None, # Mark mock data '_is_mock': not hasattr(t, 'issue_url') # Real tasks would have issue_url attribute } for t in tasks ] def get_task_details(self, task_id: str) -> Optional[Dict]: """Get detailed information about a specific task""" task = self.tasks.get(task_id) if not task: return None return { 'instance_id': task.instance_id, 'repo': task.repo, 'category': task.category, 'difficulty': task.difficulty, 'problem_statement': task.problem_statement, 'base_commit': task.base_commit, 'hints': task.hints_text, 'created_at': task.created_at, 'version': task.version, 'patch_preview': task.patch[:1000] if task.patch else None, 'test_preview': task.test_patch[:1000] if task.test_patch else None, 'gold_patch': task.patch, # Include full gold patch 'fail_to_pass': task.FAIL_TO_PASS, 'pass_to_pass': task.PASS_TO_PASS, 'patch_size': len(task.patch.split('\n')) if task.patch else 0, 'test_count': len(task.FAIL_TO_PASS) if task.FAIL_TO_PASS else 0 } async def generate_solution( self, task_id: str, model_manager, enable_transparency: bool = True, temperature: float = 0.7, max_tokens: int = 500 ) -> SWEBenchResult: """Generate a solution for a SWE-bench task""" task = self.tasks.get(task_id) if not task: raise ValueError(f"Task {task_id} not found") # Prepare prompt prompt = self._create_prompt(task) # Generate solution with traces start_time = time.time() try: if enable_transparency: # Generate with full trace extraction result = await model_manager.generate_with_traces( prompt=prompt, max_tokens=max_tokens, temperature=temperature, sampling_rate=0.1, layer_stride=2 # Sample every other layer for efficiency ) else: # Generate without traces (baseline) result = await model_manager.generate_with_traces( prompt=prompt, max_tokens=max_tokens, temperature=temperature, sampling_rate=0, # No trace sampling layer_stride=999 # Skip all layers ) generation_time = time.time() - start_time # Create result object swe_result = SWEBenchResult( task_id=task_id, generated_solution=result.get('generated_text', ''), tokens=result.get('tokens', []), token_probabilities=result.get('probabilities', []), attention_traces=result.get('traces', []) if enable_transparency else [], confidence_scores=[p for p in result.get('probabilities', [])], generation_time=generation_time, hallucination_risk=result.get('hallucination_risk', 0.0) ) # Store result if task_id not in self.results: self.results[task_id] = [] self.results[task_id].append(swe_result) return swe_result except Exception as e: logger.error(f"Failed to generate solution for {task_id}: {e}") logger.error(traceback.format_exc()) raise def _create_prompt(self, task: SWEBenchTask) -> str: """Create a prompt for the model based on the task""" prompt_parts = [] # Add repository context prompt_parts.append(f"# Repository: {task.repo}") prompt_parts.append(f"# Base commit: {task.base_commit[:8]}") prompt_parts.append("") # Add problem statement prompt_parts.append("# Issue Description:") prompt_parts.append(task.problem_statement[:2000]) # Limit length prompt_parts.append("") # Add hints if available if task.hints_text: prompt_parts.append("# Developer Comments:") prompt_parts.append(task.hints_text[:500]) prompt_parts.append("") # Add instruction prompt_parts.append("# Task: Write code to fix this issue") prompt_parts.append("# Solution:") prompt_parts.append("") return "\n".join(prompt_parts) async def evaluate_solution( self, task_id: str, solution: str, run_tests: bool = False ) -> Dict: """Evaluate a generated solution against the gold patch""" task = self.tasks.get(task_id) if not task: raise ValueError(f"Task {task_id} not found") evaluation = { 'task_id': task_id, 'has_gold_patch': task.patch is not None, 'solution_length': len(solution.split('\n')), 'gold_patch_length': len(task.patch.split('\n')) if task.patch else 0, } if task.patch: # Calculate similarity metrics from difflib import SequenceMatcher # Basic similarity score similarity = SequenceMatcher(None, solution, task.patch).ratio() evaluation['similarity_score'] = similarity # Check if key patterns from gold patch are present gold_lines = set(line.strip() for line in task.patch.split('\n') if line.strip() and not line.startswith(('#', '//', '"""'))) solution_lines = set(line.strip() for line in solution.split('\n') if line.strip() and not line.startswith(('#', '//', '"""'))) if gold_lines: pattern_coverage = len(gold_lines.intersection(solution_lines)) / len(gold_lines) evaluation['pattern_coverage'] = pattern_coverage if run_tests and task.test_patch: # Placeholder for actual test execution # In production, this would apply the patch and run tests in a container evaluation['test_execution'] = { 'status': 'not_implemented', 'message': 'Test execution requires Docker setup' } return evaluation def get_metrics(self) -> Dict: """Get aggregate metrics across all evaluations""" if not self.results: return { 'total_tasks': len(self.tasks), 'tasks_attempted': 0, 'total_generations': 0, 'avg_generation_time': 0, 'avg_confidence': 0, 'avg_hallucination_risk': 0, 'categories': self._get_category_distribution(), 'difficulties': self._get_difficulty_distribution() } # Calculate metrics all_results = [] for task_results in self.results.values(): all_results.extend(task_results) if all_results: avg_time = np.mean([r.generation_time for r in all_results]) avg_confidence = np.mean([np.mean(r.confidence_scores) for r in all_results if r.confidence_scores]) avg_hallucination = np.mean([r.hallucination_risk for r in all_results if r.hallucination_risk is not None]) else: avg_time = avg_confidence = avg_hallucination = 0 return { 'total_tasks': len(self.tasks), 'tasks_attempted': len(self.results), 'total_generations': len(all_results), 'avg_generation_time': float(avg_time), 'avg_confidence': float(avg_confidence), 'avg_hallucination_risk': float(avg_hallucination), 'categories': self._get_category_distribution(), 'difficulties': self._get_difficulty_distribution(), 'with_transparency': sum(1 for r in all_results if r.attention_traces), 'without_transparency': sum(1 for r in all_results if not r.attention_traces) } def _get_category_distribution(self) -> Dict[str, int]: """Get distribution of task categories""" distribution = {} for task in self.tasks.values(): category = task.category distribution[category] = distribution.get(category, 0) + 1 return distribution def _get_difficulty_distribution(self) -> Dict[str, int]: """Get distribution of task difficulties""" distribution = {} for task in self.tasks.values(): difficulty = task.difficulty distribution[difficulty] = distribution.get(difficulty, 0) + 1 return distribution def _update_metrics_cache(self): """Update cached metrics""" self.metrics_cache = { 'last_updated': datetime.now().isoformat(), 'dataset_info': { 'total_tasks': len(self.tasks), 'repositories': len(set(t.repo for t in self.tasks.values())), 'categories': self._get_category_distribution(), 'difficulties': self._get_difficulty_distribution() } } def get_comparison_results(self, task_id: str) -> Optional[Dict]: """Get comparison between with/without transparency for a task""" if task_id not in self.results: return None task_results = self.results[task_id] # Separate results by transparency with_transparency = [r for r in task_results if r.attention_traces] without_transparency = [r for r in task_results if not r.attention_traces] if not with_transparency or not without_transparency: return None # Get best results from each group best_with = min(with_transparency, key=lambda r: r.generation_time) best_without = min(without_transparency, key=lambda r: r.generation_time) return { 'task_id': task_id, 'with_transparency': { 'generation_time': best_with.generation_time, 'avg_confidence': np.mean(best_with.confidence_scores) if best_with.confidence_scores else 0, 'hallucination_risk': best_with.hallucination_risk, 'solution_length': len(best_with.generated_solution.split('\n')) }, 'without_transparency': { 'generation_time': best_without.generation_time, 'avg_confidence': np.mean(best_without.confidence_scores) if best_without.confidence_scores else 0, 'hallucination_risk': best_without.hallucination_risk, 'solution_length': len(best_without.generated_solution.split('\n')) }, 'improvement': { 'time_delta': best_with.generation_time - best_without.generation_time, 'confidence_delta': (np.mean(best_with.confidence_scores) if best_with.confidence_scores else 0) - (np.mean(best_without.confidence_scores) if best_without.confidence_scores else 0) } } # Global service instance swe_bench_service = SWEBenchService()