| """ | |
| SWE-bench Integration Service for Visualisable.ai | |
| Provides access to SWE-bench dataset and evaluation capabilities | |
| """ | |
| from typing import Dict, List, Optional, Any | |
| from dataclasses import dataclass, asdict | |
| import json | |
| import time | |
| import logging | |
| from datetime import datetime | |
| import traceback | |
| import numpy as np | |
| logger = logging.getLogger(__name__) | |
@dataclass
class SWEBenchTask:
    """Represents a SWE-bench task/issue"""
    instance_id: str
    repo: str
    problem_statement: str
    base_commit: str
    patch: Optional[str] = None
    test_patch: Optional[str] = None
    hints_text: Optional[str] = None
    created_at: Optional[str] = None
    version: Optional[str] = None
    FAIL_TO_PASS: Optional[List[str]] = None
    PASS_TO_PASS: Optional[List[str]] = None

    @property
    def difficulty(self) -> str:
        """Estimate difficulty from the size of the gold patch"""
        if not self.patch:
            return "unknown"
        patch_lines = len(self.patch.split('\n'))
        # Fixed line-count thresholds, tuned for a reasonable spread on
        # SWE-bench_Lite, where most tasks are complex
        if patch_lines < 30:
            return "easy"
        elif patch_lines < 100:
            return "medium"
        else:
            return "hard"

    @property
    def category(self) -> str:
        """Categorize the task from problem-statement keywords"""
        statement_lower = self.problem_statement.lower()
        if any(word in statement_lower for word in ['bug', 'fix', 'error', 'crash', 'fail']):
            return "bug-fix"
        elif any(word in statement_lower for word in ['add', 'feature', 'implement', 'support']):
            return "feature"
        elif any(word in statement_lower for word in ['refactor', 'clean', 'improve', 'optimize']):
            return "refactor"
        elif any(word in statement_lower for word in ['test', 'coverage', 'assert']):
            return "test"
        elif any(word in statement_lower for word in ['doc', 'comment', 'readme']):
            return "documentation"
        else:
            return "other"
@dataclass
class SWEBenchResult:
    """Results from evaluating a solution"""
    task_id: str
    generated_solution: str
    tokens: List[str]
    token_probabilities: List[float]
    attention_traces: List[Dict]
    confidence_scores: List[float]
    generation_time: float
    success: Optional[bool] = None
    tests_passed: Optional[int] = None
    tests_failed: Optional[int] = None
    error_message: Optional[str] = None
    hallucination_risk: Optional[float] = None

    def to_dict(self) -> Dict:
        """Convert to a dictionary for JSON serialization"""
        return asdict(self)

class SWEBenchService:
    """Service for managing SWE-bench tasks and evaluations"""

    def __init__(self):
        self.tasks: Dict[str, SWEBenchTask] = {}
        self.results: Dict[str, List[SWEBenchResult]] = {}
        self.dataset_loaded = False
        self.metrics_cache: Dict[str, Any] = {}
        # Mock data generation has been removed entirely: only real
        # SWE-bench tasks are used, to preserve PhD research integrity.

    async def load_dataset(self, dataset_name: str = "princeton-nlp/SWE-bench_Lite"):
        """Load a SWE-bench dataset from Hugging Face"""
        try:
            # Check that the datasets library and its dependencies are usable
            try:
                from datasets import load_dataset
                import pyarrow as pa
                # Some datasets releases require pyarrow.PyExtensionType,
                # which recent pyarrow versions have removed
                if not hasattr(pa, 'PyExtensionType'):
                    logger.error("pyarrow version incompatible with datasets library")
                    self.dataset_loaded = False
                    return
            except ImportError as ie:
                logger.error(f"Required libraries not properly installed: {ie}")
                self.dataset_loaded = False
                return

            logger.info(f"Loading SWE-bench dataset: {dataset_name}")
            try:
                dataset = load_dataset(dataset_name, split='test')
                # Convert each record into our task format
                for item in dataset:
                    task = SWEBenchTask(
                        instance_id=item['instance_id'],
                        repo=item['repo'],
                        problem_statement=item['problem_statement'],
                        base_commit=item['base_commit'],
                        patch=item.get('patch'),
                        test_patch=item.get('test_patch'),
                        hints_text=item.get('hints_text'),
                        created_at=item.get('created_at'),
                        version=item.get('version'),
                        FAIL_TO_PASS=item.get('FAIL_TO_PASS'),
                        PASS_TO_PASS=item.get('PASS_TO_PASS')
                    )
                    self.tasks[task.instance_id] = task
                self.dataset_loaded = True
                logger.info(f"Loaded {len(self.tasks)} SWE-bench tasks")
            except Exception as dataset_error:
                logger.error(f"Could not load dataset: {dataset_error}")
                # No mock fallback: the research requires the real dataset
                self.dataset_loaded = False
                return

            # Initialize the metrics cache
            self._update_metrics_cache()
        except Exception as e:
            logger.error(f"Failed to load SWE-bench dataset: {e}")
            self.dataset_loaded = False

    def get_tasks(
        self,
        category: Optional[str] = None,
        difficulty: Optional[str] = None,
        repo: Optional[str] = None,
        limit: int = 100,
        offset: int = 0
    ) -> List[Dict]:
        """Get a filtered, paginated list of tasks"""
        tasks = list(self.tasks.values())
        # Apply filters
        if category:
            tasks = [t for t in tasks if t.category == category]
        if difficulty:
            tasks = [t for t in tasks if t.difficulty == difficulty]
        if repo:
            tasks = [t for t in tasks if t.repo == repo]
        # Apply pagination
        tasks = tasks[offset:offset + limit]
        # Convert to dict format
        return [
            {
                'instance_id': t.instance_id,
                'repo': t.repo,
                'category': t.category,
                'difficulty': t.difficulty,
                'problem_statement': t.problem_statement,  # full problem statement
                'created_at': t.created_at,
                'has_patch': t.patch is not None,
                'has_tests': t.test_patch is not None,
                'test_count': len(t.FAIL_TO_PASS) if t.FAIL_TO_PASS else 0,
                # Derive GitHub URLs when the repo looks like a real GitHub repo
                'issue_url': f"https://github.com/{t.repo}/issues/{t.instance_id.split('-')[-1]}"
                             if '/' in t.repo and t.instance_id else None,
                'pr_url': f"https://github.com/{t.repo}/pull/{t.instance_id.split('-')[-1]}"
                          if '/' in t.repo and t.instance_id else None,
                # All tasks come from the real dataset (mock data was removed)
                '_is_real': True
            }
            for t in tasks
        ]

    def get_task_details(self, task_id: str) -> Optional[Dict]:
        """Get detailed information about a specific task"""
        task = self.tasks.get(task_id)
        if not task:
            return None
        return {
            'instance_id': task.instance_id,
            'repo': task.repo,
            'category': task.category,
            'difficulty': task.difficulty,
            'problem_statement': task.problem_statement,
            'base_commit': task.base_commit,
            'hints': task.hints_text,
            'created_at': task.created_at,
            'version': task.version,
            'patch_preview': task.patch[:1000] if task.patch else None,
            'test_preview': task.test_patch[:1000] if task.test_patch else None,
            'gold_patch': task.patch,  # include the full gold patch
            'fail_to_pass': task.FAIL_TO_PASS,
            'pass_to_pass': task.PASS_TO_PASS,
            'patch_size': len(task.patch.split('\n')) if task.patch else 0,
            'test_count': len(task.FAIL_TO_PASS) if task.FAIL_TO_PASS else 0
        }

    async def generate_solution(
        self,
        task_id: str,
        model_manager,
        enable_transparency: bool = True,
        temperature: float = 0.7,
        max_tokens: int = 500
    ) -> SWEBenchResult:
        """Generate a solution for a SWE-bench task"""
        task = self.tasks.get(task_id)
        if not task:
            raise ValueError(f"Task {task_id} not found")

        # Prepare prompt
        prompt = self._create_prompt(task)

        # Generate the solution, with traces when transparency is enabled
        start_time = time.time()
        try:
            if enable_transparency:
                # Generate with full trace extraction
                result = await model_manager.generate_with_traces(
                    prompt=prompt,
                    max_tokens=max_tokens,
                    temperature=temperature,
                    sampling_rate=0.1,
                    layer_stride=2  # sample every other layer for efficiency
                )
            else:
                # Generate without traces (baseline)
                result = await model_manager.generate_with_traces(
                    prompt=prompt,
                    max_tokens=max_tokens,
                    temperature=temperature,
                    sampling_rate=0,   # no trace sampling
                    layer_stride=999   # effectively skip all layers
                )
            generation_time = time.time() - start_time

            # Create the result object
            swe_result = SWEBenchResult(
                task_id=task_id,
                generated_solution=result.get('generated_text', ''),
                tokens=result.get('tokens', []),
                token_probabilities=result.get('probabilities', []),
                attention_traces=result.get('traces', []) if enable_transparency else [],
                confidence_scores=list(result.get('probabilities', [])),
                generation_time=generation_time,
                hallucination_risk=result.get('hallucination_risk', 0.0)
            )

            # Store the result
            self.results.setdefault(task_id, []).append(swe_result)
            return swe_result
        except Exception as e:
            logger.error(f"Failed to generate solution for {task_id}: {e}")
            logger.error(traceback.format_exc())
            raise

    def _create_prompt(self, task: SWEBenchTask) -> str:
        """Create a prompt for the model based on the task"""
        prompt_parts = []

        # Add repository context
        prompt_parts.append(f"# Repository: {task.repo}")
        prompt_parts.append(f"# Base commit: {task.base_commit[:8]}")
        prompt_parts.append("")

        # Add problem statement
        prompt_parts.append("# Issue Description:")
        prompt_parts.append(task.problem_statement[:2000])  # limit length
        prompt_parts.append("")

        # Add hints if available
        if task.hints_text:
            prompt_parts.append("# Developer Comments:")
            prompt_parts.append(task.hints_text[:500])
            prompt_parts.append("")

        # Add instruction
        prompt_parts.append("# Task: Write code to fix this issue")
        prompt_parts.append("# Solution:")
        prompt_parts.append("")
        return "\n".join(prompt_parts)

    async def evaluate_solution(
        self,
        task_id: str,
        solution: str,
        run_tests: bool = False
    ) -> Dict:
        """Evaluate a generated solution against the gold patch"""
        task = self.tasks.get(task_id)
        if not task:
            raise ValueError(f"Task {task_id} not found")

        evaluation = {
            'task_id': task_id,
            'has_gold_patch': task.patch is not None,
            'solution_length': len(solution.split('\n')),
            'gold_patch_length': len(task.patch.split('\n')) if task.patch else 0,
        }

        if task.patch:
            from difflib import SequenceMatcher

            # Character-level similarity between solution and gold patch
            similarity = SequenceMatcher(None, solution, task.patch).ratio()
            evaluation['similarity_score'] = similarity

            # Fraction of non-comment gold-patch lines present verbatim in the solution
            gold_lines = set(line.strip() for line in task.patch.split('\n')
                             if line.strip() and not line.strip().startswith(('#', '//', '"""')))
            solution_lines = set(line.strip() for line in solution.split('\n')
                                 if line.strip() and not line.strip().startswith(('#', '//', '"""')))
            if gold_lines:
                pattern_coverage = len(gold_lines.intersection(solution_lines)) / len(gold_lines)
                evaluation['pattern_coverage'] = pattern_coverage

        if run_tests and task.test_patch:
            # Placeholder for actual test execution; in production this would
            # apply the patch and run the tests in a container
            evaluation['test_execution'] = {
                'status': 'not_implemented',
                'message': 'Test execution requires Docker setup'
            }

        return evaluation
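
    # Illustration of the similarity metric above on toy inputs;
    # SequenceMatcher.ratio() compares at the character level:
    #
    #   >>> from difflib import SequenceMatcher
    #   >>> SequenceMatcher(None,
    #   ...                 "def f():\n    return 1",
    #   ...                 "def f():\n    return 2").ratio()
    #   0.9523809523809523   # 20 of 21 characters align in each string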

    def get_metrics(self) -> Dict:
        """Get aggregate metrics across all evaluations"""
        if not self.results:
            return {
                'total_tasks': len(self.tasks),
                'tasks_attempted': 0,
                'total_generations': 0,
                'avg_generation_time': 0,
                'avg_confidence': 0,
                'avg_hallucination_risk': 0,
                'categories': self._get_category_distribution(),
                'difficulties': self._get_difficulty_distribution()
            }

        # Flatten results across tasks
        all_results = []
        for task_results in self.results.values():
            all_results.extend(task_results)

        if all_results:
            avg_time = np.mean([r.generation_time for r in all_results])
            avg_confidence = np.mean([np.mean(r.confidence_scores) for r in all_results if r.confidence_scores])
            avg_hallucination = np.mean([r.hallucination_risk for r in all_results if r.hallucination_risk is not None])
        else:
            avg_time = avg_confidence = avg_hallucination = 0

        return {
            'total_tasks': len(self.tasks),
            'tasks_attempted': len(self.results),
            'total_generations': len(all_results),
            'avg_generation_time': float(avg_time),
            'avg_confidence': float(avg_confidence),
            'avg_hallucination_risk': float(avg_hallucination),
            'categories': self._get_category_distribution(),
            'difficulties': self._get_difficulty_distribution(),
            'with_transparency': sum(1 for r in all_results if r.attention_traces),
            'without_transparency': sum(1 for r in all_results if not r.attention_traces)
        }

    def _get_category_distribution(self) -> Dict[str, int]:
        """Get the distribution of task categories"""
        distribution = {}
        for task in self.tasks.values():
            category = task.category
            distribution[category] = distribution.get(category, 0) + 1
        return distribution

    def _get_difficulty_distribution(self) -> Dict[str, int]:
        """Get the distribution of task difficulties"""
        distribution = {}
        for task in self.tasks.values():
            difficulty = task.difficulty
            distribution[difficulty] = distribution.get(difficulty, 0) + 1
        return distribution

    def _update_metrics_cache(self):
        """Update cached dataset metrics"""
        self.metrics_cache = {
            'last_updated': datetime.now().isoformat(),
            'dataset_info': {
                'total_tasks': len(self.tasks),
                'repositories': len(set(t.repo for t in self.tasks.values())),
                'categories': self._get_category_distribution(),
                'difficulties': self._get_difficulty_distribution()
            }
        }

    def get_comparison_results(self, task_id: str) -> Optional[Dict]:
        """Compare with- vs. without-transparency runs for a task"""
        if task_id not in self.results:
            return None
        task_results = self.results[task_id]

        # Separate results by whether traces were collected
        with_transparency = [r for r in task_results if r.attention_traces]
        without_transparency = [r for r in task_results if not r.attention_traces]
        if not with_transparency or not without_transparency:
            return None

        # Take the fastest generation from each group
        best_with = min(with_transparency, key=lambda r: r.generation_time)
        best_without = min(without_transparency, key=lambda r: r.generation_time)

        return {
            'task_id': task_id,
            'with_transparency': {
                'generation_time': best_with.generation_time,
                'avg_confidence': np.mean(best_with.confidence_scores) if best_with.confidence_scores else 0,
                'hallucination_risk': best_with.hallucination_risk,
                'solution_length': len(best_with.generated_solution.split('\n'))
            },
            'without_transparency': {
                'generation_time': best_without.generation_time,
                'avg_confidence': np.mean(best_without.confidence_scores) if best_without.confidence_scores else 0,
                'hallucination_risk': best_without.hallucination_risk,
                'solution_length': len(best_without.generated_solution.split('\n'))
            },
            'improvement': {
                # Positive time_delta means transparency slowed generation down
                'time_delta': best_with.generation_time - best_without.generation_time,
                'confidence_delta': (np.mean(best_with.confidence_scores) if best_with.confidence_scores else 0) -
                                    (np.mean(best_without.confidence_scores) if best_without.confidence_scores else 0)
            }
        }

# Global service instance
swe_bench_service = SWEBenchService()
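
# Minimal usage sketch. Assumes network access to Hugging Face; it exercises
# only the dataset-loading and task-listing paths, since generation needs a
# model_manager exposing generate_with_traces(), which lives outside this module.
if __name__ == "__main__":
    import asyncio

    async def _demo():
        await swe_bench_service.load_dataset()
        if swe_bench_service.dataset_loaded:
            for t in swe_bench_service.get_tasks(category="bug-fix", limit=5):
                print(t['instance_id'], t['difficulty'], t['repo'])

    asyncio.run(_demo())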