""" SWE-bench Integration Service for Visualisable.ai Provides access to SWE-bench dataset and evaluation capabilities """ from typing import Dict, List, Optional, Any from dataclasses import dataclass, asdict import json import time import logging from datetime import datetime import traceback import numpy as np logger = logging.getLogger(__name__) @dataclass class SWEBenchTask: """Represents a SWE-bench task/issue""" instance_id: str repo: str problem_statement: str base_commit: str patch: Optional[str] = None test_patch: Optional[str] = None hints_text: Optional[str] = None created_at: Optional[str] = None version: Optional[str] = None FAIL_TO_PASS: Optional[List[str]] = None PASS_TO_PASS: Optional[List[str]] = None @property def difficulty(self) -> str: """Estimate difficulty based on patch size and test count""" if not self.patch: return "unknown" patch_lines = len(self.patch.split('\n')) test_count = len(self.FAIL_TO_PASS) if self.FAIL_TO_PASS else 0 # Adjusted thresholds for better distribution in SWE-bench_Lite # Most tasks are complex, so we use percentile-based distribution if patch_lines < 30: return "easy" elif patch_lines < 100: return "medium" else: return "hard" @property def category(self) -> str: """Categorize based on problem statement keywords""" statement_lower = self.problem_statement.lower() if any(word in statement_lower for word in ['bug', 'fix', 'error', 'crash', 'fail']): return "bug-fix" elif any(word in statement_lower for word in ['add', 'feature', 'implement', 'support']): return "feature" elif any(word in statement_lower for word in ['refactor', 'clean', 'improve', 'optimize']): return "refactor" elif any(word in statement_lower for word in ['test', 'coverage', 'assert']): return "test" elif any(word in statement_lower for word in ['doc', 'comment', 'readme']): return "documentation" else: return "other" @dataclass class SWEBenchResult: """Results from evaluating a solution""" task_id: str generated_solution: str tokens: List[str] token_probabilities: List[float] attention_traces: List[Dict] confidence_scores: List[float] generation_time: float success: Optional[bool] = None tests_passed: Optional[int] = None tests_failed: Optional[int] = None error_message: Optional[str] = None hallucination_risk: Optional[float] = None def to_dict(self) -> Dict: """Convert to dictionary for JSON serialization""" return asdict(self) class SWEBenchService: """Service for managing SWE-bench tasks and evaluations""" def __init__(self): self.tasks: Dict[str, SWEBenchTask] = {} self.results: Dict[str, List[SWEBenchResult]] = {} self.dataset_loaded = False self.metrics_cache: Dict[str, Any] = {} # Removed _load_mock_tasks - real data only for research # Mock data generation has been completely removed to ensure # only real SWE-bench tasks are used for PhD research integrity async def load_dataset(self, dataset_name: str = "princeton-nlp/SWE-bench_Lite"): """Load SWE-bench dataset from Hugging Face""" try: # Check if datasets library is available with proper dependencies try: from datasets import load_dataset import pyarrow as pa # Verify pyarrow has the required attribute if not hasattr(pa, 'PyExtensionType'): logger.error("pyarrow version incompatible with datasets library") self.dataset_loaded = False return except ImportError as ie: logger.error(f"Required libraries not properly installed: {ie}") self.dataset_loaded = False return logger.info(f"Loading SWE-bench dataset: {dataset_name}") # Load the dataset with error handling try: dataset = load_dataset(dataset_name, split='test') # Convert to our task format for item in dataset: task = SWEBenchTask( instance_id=item['instance_id'], repo=item['repo'], problem_statement=item['problem_statement'], base_commit=item['base_commit'], patch=item.get('patch'), test_patch=item.get('test_patch'), hints_text=item.get('hints_text'), created_at=item.get('created_at'), version=item.get('version'), FAIL_TO_PASS=item.get('FAIL_TO_PASS'), PASS_TO_PASS=item.get('PASS_TO_PASS') ) self.tasks[task.instance_id] = task self.dataset_loaded = True logger.info(f"Loaded {len(self.tasks)} SWE-bench tasks") except Exception as dataset_error: logger.error(f"Could not load dataset: {dataset_error}") # No mock data - research requires real dataset self.dataset_loaded = False return # Initialize metrics cache self._update_metrics_cache() except Exception as e: logger.error(f"Failed to load SWE-bench dataset: {e}") self.dataset_loaded = False def get_tasks( self, category: Optional[str] = None, difficulty: Optional[str] = None, repo: Optional[str] = None, limit: int = 100, offset: int = 0 ) -> List[Dict]: """Get filtered list of tasks""" tasks = list(self.tasks.values()) # Apply filters if category: tasks = [t for t in tasks if t.category == category] if difficulty: tasks = [t for t in tasks if t.difficulty == difficulty] if repo: tasks = [t for t in tasks if t.repo == repo] # Apply pagination tasks = tasks[offset:offset + limit] # Convert to dict format return [ { 'instance_id': t.instance_id, 'repo': t.repo, 'category': t.category, 'difficulty': t.difficulty, 'problem_statement': t.problem_statement, # Return full problem statement 'created_at': t.created_at, 'has_patch': t.patch is not None, 'has_tests': t.test_patch is not None, 'test_count': len(t.FAIL_TO_PASS) if t.FAIL_TO_PASS else 0, # Add GitHub URLs if this looks like a real GitHub repo 'issue_url': f"https://github.com/{t.repo}/issues/{t.instance_id.split('-')[-1]}" if '/' in t.repo and t.instance_id else None, 'pr_url': f"https://github.com/{t.repo}/pull/{t.instance_id.split('-')[-1]}" if '/' in t.repo and t.instance_id else None, # Mark if data source is real '_is_real': hasattr(t, 'pr_url') if hasattr(t, 'pr_url') else False } for t in tasks ] def get_task_details(self, task_id: str) -> Optional[Dict]: """Get detailed information about a specific task""" task = self.tasks.get(task_id) if not task: return None return { 'instance_id': task.instance_id, 'repo': task.repo, 'category': task.category, 'difficulty': task.difficulty, 'problem_statement': task.problem_statement, 'base_commit': task.base_commit, 'hints': task.hints_text, 'created_at': task.created_at, 'version': task.version, 'patch_preview': task.patch[:1000] if task.patch else None, 'test_preview': task.test_patch[:1000] if task.test_patch else None, 'gold_patch': task.patch, # Include full gold patch 'fail_to_pass': task.FAIL_TO_PASS, 'pass_to_pass': task.PASS_TO_PASS, 'patch_size': len(task.patch.split('\n')) if task.patch else 0, 'test_count': len(task.FAIL_TO_PASS) if task.FAIL_TO_PASS else 0 } async def generate_solution( self, task_id: str, model_manager, enable_transparency: bool = True, temperature: float = 0.7, max_tokens: int = 500 ) -> SWEBenchResult: """Generate a solution for a SWE-bench task""" task = self.tasks.get(task_id) if not task: raise ValueError(f"Task {task_id} not found") # Prepare prompt prompt = self._create_prompt(task) # Generate solution with traces start_time = time.time() try: if enable_transparency: # Generate with full trace extraction result = await model_manager.generate_with_traces( prompt=prompt, max_tokens=max_tokens, temperature=temperature, sampling_rate=0.1, layer_stride=2 # Sample every other layer for efficiency ) else: # Generate without traces (baseline) result = await model_manager.generate_with_traces( prompt=prompt, max_tokens=max_tokens, temperature=temperature, sampling_rate=0, # No trace sampling layer_stride=999 # Skip all layers ) generation_time = time.time() - start_time # Create result object swe_result = SWEBenchResult( task_id=task_id, generated_solution=result.get('generated_text', ''), tokens=result.get('tokens', []), token_probabilities=result.get('probabilities', []), attention_traces=result.get('traces', []) if enable_transparency else [], confidence_scores=[p for p in result.get('probabilities', [])], generation_time=generation_time, hallucination_risk=result.get('hallucination_risk', 0.0) ) # Store result if task_id not in self.results: self.results[task_id] = [] self.results[task_id].append(swe_result) return swe_result except Exception as e: logger.error(f"Failed to generate solution for {task_id}: {e}") logger.error(traceback.format_exc()) raise def _create_prompt(self, task: SWEBenchTask) -> str: """Create a prompt for the model based on the task""" prompt_parts = [] # Add repository context prompt_parts.append(f"# Repository: {task.repo}") prompt_parts.append(f"# Base commit: {task.base_commit[:8]}") prompt_parts.append("") # Add problem statement prompt_parts.append("# Issue Description:") prompt_parts.append(task.problem_statement[:2000]) # Limit length prompt_parts.append("") # Add hints if available if task.hints_text: prompt_parts.append("# Developer Comments:") prompt_parts.append(task.hints_text[:500]) prompt_parts.append("") # Add instruction prompt_parts.append("# Task: Write code to fix this issue") prompt_parts.append("# Solution:") prompt_parts.append("") return "\n".join(prompt_parts) async def evaluate_solution( self, task_id: str, solution: str, run_tests: bool = False ) -> Dict: """Evaluate a generated solution against the gold patch""" task = self.tasks.get(task_id) if not task: raise ValueError(f"Task {task_id} not found") evaluation = { 'task_id': task_id, 'has_gold_patch': task.patch is not None, 'solution_length': len(solution.split('\n')), 'gold_patch_length': len(task.patch.split('\n')) if task.patch else 0, } if task.patch: # Calculate similarity metrics from difflib import SequenceMatcher # Basic similarity score similarity = SequenceMatcher(None, solution, task.patch).ratio() evaluation['similarity_score'] = similarity # Check if key patterns from gold patch are present gold_lines = set(line.strip() for line in task.patch.split('\n') if line.strip() and not line.startswith(('#', '//', '"""'))) solution_lines = set(line.strip() for line in solution.split('\n') if line.strip() and not line.startswith(('#', '//', '"""'))) if gold_lines: pattern_coverage = len(gold_lines.intersection(solution_lines)) / len(gold_lines) evaluation['pattern_coverage'] = pattern_coverage if run_tests and task.test_patch: # Placeholder for actual test execution # In production, this would apply the patch and run tests in a container evaluation['test_execution'] = { 'status': 'not_implemented', 'message': 'Test execution requires Docker setup' } return evaluation def get_metrics(self) -> Dict: """Get aggregate metrics across all evaluations""" if not self.results: return { 'total_tasks': len(self.tasks), 'tasks_attempted': 0, 'total_generations': 0, 'avg_generation_time': 0, 'avg_confidence': 0, 'avg_hallucination_risk': 0, 'categories': self._get_category_distribution(), 'difficulties': self._get_difficulty_distribution() } # Calculate metrics all_results = [] for task_results in self.results.values(): all_results.extend(task_results) if all_results: avg_time = np.mean([r.generation_time for r in all_results]) avg_confidence = np.mean([np.mean(r.confidence_scores) for r in all_results if r.confidence_scores]) avg_hallucination = np.mean([r.hallucination_risk for r in all_results if r.hallucination_risk is not None]) else: avg_time = avg_confidence = avg_hallucination = 0 return { 'total_tasks': len(self.tasks), 'tasks_attempted': len(self.results), 'total_generations': len(all_results), 'avg_generation_time': float(avg_time), 'avg_confidence': float(avg_confidence), 'avg_hallucination_risk': float(avg_hallucination), 'categories': self._get_category_distribution(), 'difficulties': self._get_difficulty_distribution(), 'with_transparency': sum(1 for r in all_results if r.attention_traces), 'without_transparency': sum(1 for r in all_results if not r.attention_traces) } def _get_category_distribution(self) -> Dict[str, int]: """Get distribution of task categories""" distribution = {} for task in self.tasks.values(): category = task.category distribution[category] = distribution.get(category, 0) + 1 return distribution def _get_difficulty_distribution(self) -> Dict[str, int]: """Get distribution of task difficulties""" distribution = {} for task in self.tasks.values(): difficulty = task.difficulty distribution[difficulty] = distribution.get(difficulty, 0) + 1 return distribution def _update_metrics_cache(self): """Update cached metrics""" self.metrics_cache = { 'last_updated': datetime.now().isoformat(), 'dataset_info': { 'total_tasks': len(self.tasks), 'repositories': len(set(t.repo for t in self.tasks.values())), 'categories': self._get_category_distribution(), 'difficulties': self._get_difficulty_distribution() } } def get_comparison_results(self, task_id: str) -> Optional[Dict]: """Get comparison between with/without transparency for a task""" if task_id not in self.results: return None task_results = self.results[task_id] # Separate results by transparency with_transparency = [r for r in task_results if r.attention_traces] without_transparency = [r for r in task_results if not r.attention_traces] if not with_transparency or not without_transparency: return None # Get best results from each group best_with = min(with_transparency, key=lambda r: r.generation_time) best_without = min(without_transparency, key=lambda r: r.generation_time) return { 'task_id': task_id, 'with_transparency': { 'generation_time': best_with.generation_time, 'avg_confidence': np.mean(best_with.confidence_scores) if best_with.confidence_scores else 0, 'hallucination_risk': best_with.hallucination_risk, 'solution_length': len(best_with.generated_solution.split('\n')) }, 'without_transparency': { 'generation_time': best_without.generation_time, 'avg_confidence': np.mean(best_without.confidence_scores) if best_without.confidence_scores else 0, 'hallucination_risk': best_without.hallucination_risk, 'solution_length': len(best_without.generated_solution.split('\n')) }, 'improvement': { 'time_delta': best_with.generation_time - best_without.generation_time, 'confidence_delta': (np.mean(best_with.confidence_scores) if best_with.confidence_scores else 0) - (np.mean(best_without.confidence_scores) if best_without.confidence_scores else 0) } } # Global service instance swe_bench_service = SWEBenchService()