api / backend /swe_bench_service.py
gary-boon
Fix pyarrow compatibility issue with datasets library
1680fda
raw
history blame
18.6 kB
"""
SWE-bench Integration Service for Visualisable.ai
Provides access to SWE-bench dataset and evaluation capabilities
"""
from typing import Dict, List, Optional, Any
from dataclasses import dataclass, asdict
import json
import time
import logging
from datetime import datetime
import traceback
import numpy as np
logger = logging.getLogger(__name__)
@dataclass
class SWEBenchTask:
"""Represents a SWE-bench task/issue"""
instance_id: str
repo: str
problem_statement: str
base_commit: str
patch: Optional[str] = None
test_patch: Optional[str] = None
hints_text: Optional[str] = None
created_at: Optional[str] = None
version: Optional[str] = None
FAIL_TO_PASS: Optional[List[str]] = None
PASS_TO_PASS: Optional[List[str]] = None
@property
def difficulty(self) -> str:
"""Estimate difficulty based on patch size and test count"""
if not self.patch:
return "unknown"
patch_lines = len(self.patch.split('\n'))
test_count = len(self.FAIL_TO_PASS) if self.FAIL_TO_PASS else 0
# Adjusted thresholds for better distribution in SWE-bench_Lite
# Most tasks are complex, so we use percentile-based distribution
if patch_lines < 30:
return "easy"
elif patch_lines < 100:
return "medium"
else:
return "hard"
@property
def category(self) -> str:
"""Categorize based on problem statement keywords"""
statement_lower = self.problem_statement.lower()
if any(word in statement_lower for word in ['bug', 'fix', 'error', 'crash', 'fail']):
return "bug-fix"
elif any(word in statement_lower for word in ['add', 'feature', 'implement', 'support']):
return "feature"
elif any(word in statement_lower for word in ['refactor', 'clean', 'improve', 'optimize']):
return "refactor"
elif any(word in statement_lower for word in ['test', 'coverage', 'assert']):
return "test"
elif any(word in statement_lower for word in ['doc', 'comment', 'readme']):
return "documentation"
else:
return "other"
@dataclass
class SWEBenchResult:
"""Results from evaluating a solution"""
task_id: str
generated_solution: str
tokens: List[str]
token_probabilities: List[float]
attention_traces: List[Dict]
confidence_scores: List[float]
generation_time: float
success: Optional[bool] = None
tests_passed: Optional[int] = None
tests_failed: Optional[int] = None
error_message: Optional[str] = None
hallucination_risk: Optional[float] = None
def to_dict(self) -> Dict:
"""Convert to dictionary for JSON serialization"""
return asdict(self)
class SWEBenchService:
"""Service for managing SWE-bench tasks and evaluations"""
def __init__(self):
self.tasks: Dict[str, SWEBenchTask] = {}
self.results: Dict[str, List[SWEBenchResult]] = {}
self.dataset_loaded = False
self.metrics_cache: Dict[str, Any] = {}
# Removed _load_mock_tasks - real data only for research
# Mock data generation has been completely removed to ensure
# only real SWE-bench tasks are used for PhD research integrity
async def load_dataset(self, dataset_name: str = "princeton-nlp/SWE-bench_Lite"):
"""Load SWE-bench dataset from Hugging Face"""
try:
# Check if datasets library is available with proper dependencies
try:
from datasets import load_dataset
import pyarrow as pa
# Verify pyarrow has the required attribute
if not hasattr(pa, 'PyExtensionType'):
logger.error("pyarrow version incompatible with datasets library")
self.dataset_loaded = False
return
except ImportError as ie:
logger.error(f"Required libraries not properly installed: {ie}")
self.dataset_loaded = False
return
logger.info(f"Loading SWE-bench dataset: {dataset_name}")
# Load the dataset with error handling
try:
dataset = load_dataset(dataset_name, split='test')
# Convert to our task format
for item in dataset:
task = SWEBenchTask(
instance_id=item['instance_id'],
repo=item['repo'],
problem_statement=item['problem_statement'],
base_commit=item['base_commit'],
patch=item.get('patch'),
test_patch=item.get('test_patch'),
hints_text=item.get('hints_text'),
created_at=item.get('created_at'),
version=item.get('version'),
FAIL_TO_PASS=item.get('FAIL_TO_PASS'),
PASS_TO_PASS=item.get('PASS_TO_PASS')
)
self.tasks[task.instance_id] = task
self.dataset_loaded = True
logger.info(f"Loaded {len(self.tasks)} SWE-bench tasks")
except Exception as dataset_error:
logger.error(f"Could not load dataset: {dataset_error}")
# No mock data - research requires real dataset
self.dataset_loaded = False
return
# Initialize metrics cache
self._update_metrics_cache()
except Exception as e:
logger.error(f"Failed to load SWE-bench dataset: {e}")
self.dataset_loaded = False
def get_tasks(
self,
category: Optional[str] = None,
difficulty: Optional[str] = None,
repo: Optional[str] = None,
limit: int = 100,
offset: int = 0
) -> List[Dict]:
"""Get filtered list of tasks"""
tasks = list(self.tasks.values())
# Apply filters
if category:
tasks = [t for t in tasks if t.category == category]
if difficulty:
tasks = [t for t in tasks if t.difficulty == difficulty]
if repo:
tasks = [t for t in tasks if t.repo == repo]
# Apply pagination
tasks = tasks[offset:offset + limit]
# Convert to dict format
return [
{
'instance_id': t.instance_id,
'repo': t.repo,
'category': t.category,
'difficulty': t.difficulty,
'problem_statement': t.problem_statement, # Return full problem statement
'created_at': t.created_at,
'has_patch': t.patch is not None,
'has_tests': t.test_patch is not None,
'test_count': len(t.FAIL_TO_PASS) if t.FAIL_TO_PASS else 0,
# Add GitHub URLs if this looks like a real GitHub repo
'issue_url': f"https://github.com/{t.repo}/issues/{t.instance_id.split('-')[-1]}"
if '/' in t.repo and t.instance_id else None,
'pr_url': f"https://github.com/{t.repo}/pull/{t.instance_id.split('-')[-1]}"
if '/' in t.repo and t.instance_id else None,
# Mark if data source is real
'_is_real': hasattr(t, 'pr_url') if hasattr(t, 'pr_url') else False
}
for t in tasks
]
def get_task_details(self, task_id: str) -> Optional[Dict]:
"""Get detailed information about a specific task"""
task = self.tasks.get(task_id)
if not task:
return None
return {
'instance_id': task.instance_id,
'repo': task.repo,
'category': task.category,
'difficulty': task.difficulty,
'problem_statement': task.problem_statement,
'base_commit': task.base_commit,
'hints': task.hints_text,
'created_at': task.created_at,
'version': task.version,
'patch_preview': task.patch[:1000] if task.patch else None,
'test_preview': task.test_patch[:1000] if task.test_patch else None,
'gold_patch': task.patch, # Include full gold patch
'fail_to_pass': task.FAIL_TO_PASS,
'pass_to_pass': task.PASS_TO_PASS,
'patch_size': len(task.patch.split('\n')) if task.patch else 0,
'test_count': len(task.FAIL_TO_PASS) if task.FAIL_TO_PASS else 0
}
async def generate_solution(
self,
task_id: str,
model_manager,
enable_transparency: bool = True,
temperature: float = 0.7,
max_tokens: int = 500
) -> SWEBenchResult:
"""Generate a solution for a SWE-bench task"""
task = self.tasks.get(task_id)
if not task:
raise ValueError(f"Task {task_id} not found")
# Prepare prompt
prompt = self._create_prompt(task)
# Generate solution with traces
start_time = time.time()
try:
if enable_transparency:
# Generate with full trace extraction
result = await model_manager.generate_with_traces(
prompt=prompt,
max_tokens=max_tokens,
temperature=temperature,
sampling_rate=0.1,
layer_stride=2 # Sample every other layer for efficiency
)
else:
# Generate without traces (baseline)
result = await model_manager.generate_with_traces(
prompt=prompt,
max_tokens=max_tokens,
temperature=temperature,
sampling_rate=0, # No trace sampling
layer_stride=999 # Skip all layers
)
generation_time = time.time() - start_time
# Create result object
swe_result = SWEBenchResult(
task_id=task_id,
generated_solution=result.get('generated_text', ''),
tokens=result.get('tokens', []),
token_probabilities=result.get('probabilities', []),
attention_traces=result.get('traces', []) if enable_transparency else [],
confidence_scores=[p for p in result.get('probabilities', [])],
generation_time=generation_time,
hallucination_risk=result.get('hallucination_risk', 0.0)
)
# Store result
if task_id not in self.results:
self.results[task_id] = []
self.results[task_id].append(swe_result)
return swe_result
except Exception as e:
logger.error(f"Failed to generate solution for {task_id}: {e}")
logger.error(traceback.format_exc())
raise
def _create_prompt(self, task: SWEBenchTask) -> str:
"""Create a prompt for the model based on the task"""
prompt_parts = []
# Add repository context
prompt_parts.append(f"# Repository: {task.repo}")
prompt_parts.append(f"# Base commit: {task.base_commit[:8]}")
prompt_parts.append("")
# Add problem statement
prompt_parts.append("# Issue Description:")
prompt_parts.append(task.problem_statement[:2000]) # Limit length
prompt_parts.append("")
# Add hints if available
if task.hints_text:
prompt_parts.append("# Developer Comments:")
prompt_parts.append(task.hints_text[:500])
prompt_parts.append("")
# Add instruction
prompt_parts.append("# Task: Write code to fix this issue")
prompt_parts.append("# Solution:")
prompt_parts.append("")
return "\n".join(prompt_parts)
async def evaluate_solution(
self,
task_id: str,
solution: str,
run_tests: bool = False
) -> Dict:
"""Evaluate a generated solution against the gold patch"""
task = self.tasks.get(task_id)
if not task:
raise ValueError(f"Task {task_id} not found")
evaluation = {
'task_id': task_id,
'has_gold_patch': task.patch is not None,
'solution_length': len(solution.split('\n')),
'gold_patch_length': len(task.patch.split('\n')) if task.patch else 0,
}
if task.patch:
# Calculate similarity metrics
from difflib import SequenceMatcher
# Basic similarity score
similarity = SequenceMatcher(None, solution, task.patch).ratio()
evaluation['similarity_score'] = similarity
# Check if key patterns from gold patch are present
gold_lines = set(line.strip() for line in task.patch.split('\n')
if line.strip() and not line.startswith(('#', '//', '"""')))
solution_lines = set(line.strip() for line in solution.split('\n')
if line.strip() and not line.startswith(('#', '//', '"""')))
if gold_lines:
pattern_coverage = len(gold_lines.intersection(solution_lines)) / len(gold_lines)
evaluation['pattern_coverage'] = pattern_coverage
if run_tests and task.test_patch:
# Placeholder for actual test execution
# In production, this would apply the patch and run tests in a container
evaluation['test_execution'] = {
'status': 'not_implemented',
'message': 'Test execution requires Docker setup'
}
return evaluation
def get_metrics(self) -> Dict:
"""Get aggregate metrics across all evaluations"""
if not self.results:
return {
'total_tasks': len(self.tasks),
'tasks_attempted': 0,
'total_generations': 0,
'avg_generation_time': 0,
'avg_confidence': 0,
'avg_hallucination_risk': 0,
'categories': self._get_category_distribution(),
'difficulties': self._get_difficulty_distribution()
}
# Calculate metrics
all_results = []
for task_results in self.results.values():
all_results.extend(task_results)
if all_results:
avg_time = np.mean([r.generation_time for r in all_results])
avg_confidence = np.mean([np.mean(r.confidence_scores) for r in all_results if r.confidence_scores])
avg_hallucination = np.mean([r.hallucination_risk for r in all_results if r.hallucination_risk is not None])
else:
avg_time = avg_confidence = avg_hallucination = 0
return {
'total_tasks': len(self.tasks),
'tasks_attempted': len(self.results),
'total_generations': len(all_results),
'avg_generation_time': float(avg_time),
'avg_confidence': float(avg_confidence),
'avg_hallucination_risk': float(avg_hallucination),
'categories': self._get_category_distribution(),
'difficulties': self._get_difficulty_distribution(),
'with_transparency': sum(1 for r in all_results if r.attention_traces),
'without_transparency': sum(1 for r in all_results if not r.attention_traces)
}
def _get_category_distribution(self) -> Dict[str, int]:
"""Get distribution of task categories"""
distribution = {}
for task in self.tasks.values():
category = task.category
distribution[category] = distribution.get(category, 0) + 1
return distribution
def _get_difficulty_distribution(self) -> Dict[str, int]:
"""Get distribution of task difficulties"""
distribution = {}
for task in self.tasks.values():
difficulty = task.difficulty
distribution[difficulty] = distribution.get(difficulty, 0) + 1
return distribution
def _update_metrics_cache(self):
"""Update cached metrics"""
self.metrics_cache = {
'last_updated': datetime.now().isoformat(),
'dataset_info': {
'total_tasks': len(self.tasks),
'repositories': len(set(t.repo for t in self.tasks.values())),
'categories': self._get_category_distribution(),
'difficulties': self._get_difficulty_distribution()
}
}
def get_comparison_results(self, task_id: str) -> Optional[Dict]:
"""Get comparison between with/without transparency for a task"""
if task_id not in self.results:
return None
task_results = self.results[task_id]
# Separate results by transparency
with_transparency = [r for r in task_results if r.attention_traces]
without_transparency = [r for r in task_results if not r.attention_traces]
if not with_transparency or not without_transparency:
return None
# Get best results from each group
best_with = min(with_transparency, key=lambda r: r.generation_time)
best_without = min(without_transparency, key=lambda r: r.generation_time)
return {
'task_id': task_id,
'with_transparency': {
'generation_time': best_with.generation_time,
'avg_confidence': np.mean(best_with.confidence_scores) if best_with.confidence_scores else 0,
'hallucination_risk': best_with.hallucination_risk,
'solution_length': len(best_with.generated_solution.split('\n'))
},
'without_transparency': {
'generation_time': best_without.generation_time,
'avg_confidence': np.mean(best_without.confidence_scores) if best_without.confidence_scores else 0,
'hallucination_risk': best_without.hallucination_risk,
'solution_length': len(best_without.generated_solution.split('\n'))
},
'improvement': {
'time_delta': best_with.generation_time - best_without.generation_time,
'confidence_delta': (np.mean(best_with.confidence_scores) if best_with.confidence_scores else 0) -
(np.mean(best_without.confidence_scores) if best_without.confidence_scores else 0)
}
}
# Global service instance
swe_bench_service = SWEBenchService()