# NOTE: The lines originally here were page residue from the Hugging Face Spaces
# file viewer, not part of the module: the status banner ("Running on CPU Upgrade"),
# "File size: 18,615 Bytes", the blame-gutter commit hashes (4444ae2, c0d95bf,
# 9dbec03, ae9e159, 1680fda, 1d23728, 22c69fa) and the 1-471 line-number gutter.
"""
SWE-bench Integration Service for Visualisable.ai
Provides access to SWE-bench dataset and evaluation capabilities
"""
from typing import Dict, List, Optional, Any
from dataclasses import dataclass, asdict
import json
import time
import logging
from datetime import datetime
import traceback
import numpy as np
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
@dataclass
class SWEBenchTask:
    """Represents a SWE-bench task/issue.

    The four leading fields are required; the remaining fields default to
    ``None`` when the source record does not provide them.
    """

    instance_id: str
    repo: str
    problem_statement: str
    base_commit: str
    patch: Optional[str] = None
    test_patch: Optional[str] = None
    hints_text: Optional[str] = None
    created_at: Optional[str] = None
    version: Optional[str] = None
    FAIL_TO_PASS: Optional[List[str]] = None
    PASS_TO_PASS: Optional[List[str]] = None

    @property
    def difficulty(self) -> str:
        """Estimate difficulty from the size of the gold patch.

        Returns:
            ``"unknown"`` when there is no (or an empty) patch, otherwise
            ``"easy"`` (< 30 lines), ``"medium"`` (< 100 lines) or ``"hard"``.
            Lines are counted by splitting the patch on ``"\\n"``.
        """
        if not self.patch:
            return "unknown"

        patch_lines = len(self.patch.split('\n'))

        # Thresholds adjusted for a better distribution in SWE-bench_Lite.
        # Only the patch size is used; the number of failing tests does not
        # influence the bucket.
        if patch_lines < 30:
            return "easy"
        if patch_lines < 100:
            return "medium"
        return "hard"

    @property
    def category(self) -> str:
        """Categorize the task from keywords in the problem statement.

        Matching is case-insensitive and substring based (so ``"fix"`` also
        matches ``"fixes"`` and ``"prefix"``). Rules are tried in order and
        the first rule with any matching keyword wins.

        Returns:
            One of ``"bug-fix"``, ``"feature"``, ``"refactor"``, ``"test"``,
            ``"documentation"`` or ``"other"``.
        """
        # Order matters: earlier rules take precedence over later ones.
        rules = (
            ("bug-fix", ('bug', 'fix', 'error', 'crash', 'fail')),
            ("feature", ('add', 'feature', 'implement', 'support')),
            ("refactor", ('refactor', 'clean', 'improve', 'optimize')),
            ("test", ('test', 'coverage', 'assert')),
            ("documentation", ('doc', 'comment', 'readme')),
        )

        # Tolerate a missing statement instead of raising AttributeError.
        statement_lower = (self.problem_statement or "").lower()

        for label, keywords in rules:
            if any(word in statement_lower for word in keywords):
                return label
        return "other"
@dataclass
class SWEBenchResult:
    """Results from evaluating a solution.

    Holds the generated text together with the per-token data and timing
    recorded for one generation, plus optional evaluation outcome fields
    that default to ``None``.
    """

    task_id: str
    generated_solution: str
    tokens: List[str]
    token_probabilities: List[float]
    attention_traces: List[Dict]
    confidence_scores: List[float]
    generation_time: float
    success: Optional[bool] = None
    tests_passed: Optional[int] = None
    tests_failed: Optional[int] = None
    error_message: Optional[str] = None
    hallucination_risk: Optional[float] = None

    def to_dict(self) -> Dict:
        """Convert to a dictionary for JSON serialization.

        Returns:
            A new dict produced by ``dataclasses.asdict``, keyed by field name.
        """
        return asdict(self)
class SWEBenchService:
    """Service for managing SWE-bench tasks and evaluations.

    Keeps loaded tasks in ``self.tasks`` (keyed by instance id) and every
    generated result in ``self.results`` (task id -> list of results).
    """

    def __init__(self):
        self.tasks: Dict[str, "SWEBenchTask"] = {}
        self.results: Dict[str, List["SWEBenchResult"]] = {}
        self.dataset_loaded = False
        self.metrics_cache: Dict[str, Any] = {}
        # Removed _load_mock_tasks - real data only for research.
        # Mock data generation has been completely removed to ensure
        # only real SWE-bench tasks are used for PhD research integrity.

    # ------------------------------------------------------------------
    # Small internal helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _parse_test_list(value: Any) -> Optional[List[str]]:
        """Normalise a FAIL_TO_PASS / PASS_TO_PASS value to a list of names.

        A string is decoded as JSON so that counting its entries counts
        tests rather than characters. A string that is not valid JSON is
        kept as a single entry.

        Returns:
            ``None`` for ``None``, ``[]`` for a blank string, otherwise a list.
        """
        if value is None:
            return None
        if isinstance(value, str):
            text = value.strip()
            if not text:
                return []
            try:
                decoded = json.loads(text)
            except ValueError:  # json.JSONDecodeError subclasses ValueError
                return [text]
            if isinstance(decoded, list):
                return [str(entry) for entry in decoded]
            return [str(decoded)]
        return list(value)

    @staticmethod
    def _safe_mean(values) -> float:
        """Return the mean of ``values`` as a float, or 0.0 when empty.

        ``np.mean`` of an empty sequence yields ``nan`` and a RuntimeWarning,
        which is why the empty case is handled here.
        """
        values = list(values)
        return float(np.mean(values)) if values else 0.0

    @staticmethod
    def _significant_lines(text: str) -> set:
        """Return the stripped, non-blank, non-comment lines of ``text``.

        The comment test runs on the stripped line so indented comments
        (``#``, ``//``, ``\"\"\"``) are skipped as well.
        """
        stripped_lines = (line.strip() for line in text.split('\n'))
        return {
            line for line in stripped_lines
            if line and not line.startswith(('#', '//', '"""'))
        }

    @staticmethod
    def _summarize_task(t) -> Dict:
        """Build the list-view dictionary for one task."""
        # Only build GitHub URLs if this looks like a real GitHub repo.
        looks_like_github = '/' in t.repo and bool(t.instance_id)
        number = t.instance_id.split('-')[-1] if looks_like_github else None
        return {
            'instance_id': t.instance_id,
            'repo': t.repo,
            'category': t.category,
            'difficulty': t.difficulty,
            'problem_statement': t.problem_statement,  # full problem statement
            'created_at': t.created_at,
            'has_patch': t.patch is not None,
            'has_tests': t.test_patch is not None,
            'test_count': len(t.FAIL_TO_PASS) if t.FAIL_TO_PASS else 0,
            'issue_url': f"https://github.com/{t.repo}/issues/{number}"
            if looks_like_github else None,
            'pr_url': f"https://github.com/{t.repo}/pull/{number}"
            if looks_like_github else None,
            # NOTE(review): the task dataclass in this file declares no
            # ``pr_url`` field, so this flag is False for those tasks.
            '_is_real': hasattr(t, 'pr_url'),
        }

    # ------------------------------------------------------------------
    # Dataset loading
    # ------------------------------------------------------------------
    async def load_dataset(self, dataset_name: str = "princeton-nlp/SWE-bench_Lite"):
        """Load the ``test`` split of a SWE-bench dataset from Hugging Face.

        On success the tasks are merged into ``self.tasks``,
        ``self.dataset_loaded`` is set to ``True`` and the metrics cache is
        refreshed. Every failure is logged and leaves ``dataset_loaded``
        ``False``; nothing is raised to the caller.

        Args:
            dataset_name: Hugging Face dataset identifier to load.
        """
        try:
            # Check that the datasets library and its dependency are available.
            try:
                from datasets import load_dataset
                import pyarrow as pa
            except ImportError as ie:
                logger.error("Required libraries not properly installed: %s", ie)
                self.dataset_loaded = False
                return

            # NOTE(review): this gate rejects any pyarrow build that lacks
            # ``PyExtensionType`` -- confirm it is still wanted for the
            # installed datasets/pyarrow versions.
            if not hasattr(pa, 'PyExtensionType'):
                logger.error("pyarrow version incompatible with datasets library")
                self.dataset_loaded = False
                return

            logger.info("Loading SWE-bench dataset: %s", dataset_name)

            # Stage tasks locally so a failure part-way through the dataset
            # does not leave self.tasks partially updated.
            staged: Dict[str, "SWEBenchTask"] = {}
            try:
                dataset = load_dataset(dataset_name, split='test')
                for item in dataset:
                    task = SWEBenchTask(
                        instance_id=item['instance_id'],
                        repo=item['repo'],
                        problem_statement=item['problem_statement'],
                        base_commit=item['base_commit'],
                        patch=item.get('patch'),
                        test_patch=item.get('test_patch'),
                        hints_text=item.get('hints_text'),
                        created_at=item.get('created_at'),
                        version=item.get('version'),
                        FAIL_TO_PASS=self._parse_test_list(item.get('FAIL_TO_PASS')),
                        PASS_TO_PASS=self._parse_test_list(item.get('PASS_TO_PASS')),
                    )
                    staged[task.instance_id] = task
            except Exception as dataset_error:
                logger.error("Could not load dataset: %s", dataset_error)
                # No mock data - research requires real dataset.
                self.dataset_loaded = False
                return

            self.tasks.update(staged)
            self.dataset_loaded = True
            logger.info("Loaded %d SWE-bench tasks", len(self.tasks))

            # Initialize metrics cache.
            self._update_metrics_cache()
        except Exception as e:
            logger.error("Failed to load SWE-bench dataset: %s", e)
            self.dataset_loaded = False

    # ------------------------------------------------------------------
    # Task queries
    # ------------------------------------------------------------------
    def get_tasks(
        self,
        category: Optional[str] = None,
        difficulty: Optional[str] = None,
        repo: Optional[str] = None,
        limit: int = 100,
        offset: int = 0
    ) -> List[Dict]:
        """Get a filtered, paginated list of task summaries.

        Args:
            category: Keep only tasks whose ``category`` equals this value.
            difficulty: Keep only tasks whose ``difficulty`` equals this value.
            repo: Keep only tasks from this repository.
            limit: Maximum number of tasks to return (negative is treated as 0).
            offset: Number of matching tasks to skip (negative is treated as 0).

        Returns:
            One summary dictionary per selected task.
        """
        selected = [
            t for t in self.tasks.values()
            if (not category or t.category == category)
            and (not difficulty or t.difficulty == difficulty)
            and (not repo or t.repo == repo)
        ]

        # Clamp so negative values cannot turn into from-the-end slices.
        start = max(offset, 0)
        stop = start + max(limit, 0)

        return [self._summarize_task(t) for t in selected[start:stop]]

    def get_task_details(self, task_id: str) -> Optional[Dict]:
        """Get detailed information about a specific task.

        Returns:
            A dictionary of task fields, or ``None`` if ``task_id`` is unknown.
        """
        task = self.tasks.get(task_id)
        if not task:
            return None

        return {
            'instance_id': task.instance_id,
            'repo': task.repo,
            'category': task.category,
            'difficulty': task.difficulty,
            'problem_statement': task.problem_statement,
            'base_commit': task.base_commit,
            'hints': task.hints_text,
            'created_at': task.created_at,
            'version': task.version,
            'patch_preview': task.patch[:1000] if task.patch else None,
            'test_preview': task.test_patch[:1000] if task.test_patch else None,
            'gold_patch': task.patch,  # full gold patch
            'fail_to_pass': task.FAIL_TO_PASS,
            'pass_to_pass': task.PASS_TO_PASS,
            'patch_size': len(task.patch.split('\n')) if task.patch else 0,
            'test_count': len(task.FAIL_TO_PASS) if task.FAIL_TO_PASS else 0
        }

    # ------------------------------------------------------------------
    # Generation and evaluation
    # ------------------------------------------------------------------
    async def generate_solution(
        self,
        task_id: str,
        model_manager,
        enable_transparency: bool = True,
        temperature: float = 0.7,
        max_tokens: int = 500
    ) -> "SWEBenchResult":
        """Generate a solution for a SWE-bench task and record the result.

        Args:
            task_id: Instance id of a loaded task.
            model_manager: Object providing an awaitable
                ``generate_with_traces(prompt=..., max_tokens=...,
                temperature=..., sampling_rate=..., layer_stride=...)`` that
                returns a dict-like result.
            enable_transparency: When true, request trace sampling and keep
                the returned traces; otherwise run the baseline settings and
                store no traces.
            temperature: Sampling temperature passed to the model manager.
            max_tokens: Generation length limit passed to the model manager.

        Returns:
            The result object, which is also appended to ``self.results``.

        Raises:
            ValueError: If ``task_id`` is not loaded.
            Exception: Any error from the model manager is logged and re-raised.
        """
        task = self.tasks.get(task_id)
        if not task:
            raise ValueError(f"Task {task_id} not found")

        prompt = self._create_prompt(task)

        if enable_transparency:
            # Sample every other layer for efficiency.
            sampling_rate, layer_stride = 0.1, 2
        else:
            # Baseline: no trace sampling, stride large enough to skip layers.
            sampling_rate, layer_stride = 0, 999

        # perf_counter is monotonic, so the duration cannot be skewed by
        # wall-clock adjustments.
        start_time = time.perf_counter()
        try:
            result = await model_manager.generate_with_traces(
                prompt=prompt,
                max_tokens=max_tokens,
                temperature=temperature,
                sampling_rate=sampling_rate,
                layer_stride=layer_stride
            )
            generation_time = time.perf_counter() - start_time

            probabilities = result.get('probabilities', [])
            swe_result = SWEBenchResult(
                task_id=task_id,
                generated_solution=result.get('generated_text', ''),
                tokens=result.get('tokens', []),
                token_probabilities=probabilities,
                attention_traces=result.get('traces', []) if enable_transparency else [],
                # Separate list so the two fields do not alias each other.
                confidence_scores=list(probabilities),
                generation_time=generation_time,
                hallucination_risk=result.get('hallucination_risk', 0.0)
            )

            self.results.setdefault(task_id, []).append(swe_result)
            return swe_result
        except Exception as e:
            # logger.exception records the message together with the traceback.
            logger.exception("Failed to generate solution for %s: %s", task_id, e)
            raise

    def _create_prompt(self, task: "SWEBenchTask") -> str:
        """Create the model prompt for a task.

        The problem statement is truncated to 2000 characters and the hints
        to 500; the base commit is shortened to its first 8 characters.
        """
        prompt_parts = [
            f"# Repository: {task.repo}",
            f"# Base commit: {task.base_commit[:8]}",
            "",
            "# Issue Description:",
            task.problem_statement[:2000],  # limit length
            "",
        ]

        # Add hints if available.
        if task.hints_text:
            prompt_parts += ["# Developer Comments:", task.hints_text[:500], ""]

        prompt_parts += [
            "# Task: Write code to fix this issue",
            "# Solution:",
            "",
        ]
        return "\n".join(prompt_parts)

    async def evaluate_solution(
        self,
        task_id: str,
        solution: str,
        run_tests: bool = False
    ) -> Dict:
        """Evaluate a generated solution against the gold patch.

        Args:
            task_id: Instance id of a loaded task.
            solution: Generated solution text.
            run_tests: When true and the task has a test patch, adds a
                ``test_execution`` entry; execution itself is not implemented.

        Returns:
            A dict with length information and, when a gold patch exists,
            ``similarity_score`` (``difflib.SequenceMatcher`` ratio) and
            ``pattern_coverage`` (share of significant gold lines that also
            appear in the solution).

        Raises:
            ValueError: If ``task_id`` is not loaded.
        """
        task = self.tasks.get(task_id)
        if not task:
            raise ValueError(f"Task {task_id} not found")

        evaluation = {
            'task_id': task_id,
            'has_gold_patch': task.patch is not None,
            'solution_length': len(solution.split('\n')),
            'gold_patch_length': len(task.patch.split('\n')) if task.patch else 0,
        }

        if task.patch:
            from difflib import SequenceMatcher

            # Basic similarity score.
            evaluation['similarity_score'] = SequenceMatcher(
                None, solution, task.patch
            ).ratio()

            # Check whether key lines from the gold patch are present.
            gold_lines = self._significant_lines(task.patch)
            if gold_lines:
                solution_lines = self._significant_lines(solution)
                evaluation['pattern_coverage'] = (
                    len(gold_lines & solution_lines) / len(gold_lines)
                )

        if run_tests and task.test_patch:
            # Placeholder for actual test execution.
            # In production, this would apply the patch and run tests in a container.
            evaluation['test_execution'] = {
                'status': 'not_implemented',
                'message': 'Test execution requires Docker setup'
            }

        return evaluation

    # ------------------------------------------------------------------
    # Metrics
    # ------------------------------------------------------------------
    def get_metrics(self) -> Dict:
        """Get aggregate metrics across all stored generation results.

        Averages are 0.0 when there is nothing to average. A result counts
        as "with transparency" when its ``attention_traces`` is non-empty.
        """
        all_results = [
            result
            for task_results in self.results.values()
            for result in task_results
        ]

        # Average of per-result averages, skipping results without scores.
        per_result_confidence = [
            self._safe_mean(r.confidence_scores)
            for r in all_results if r.confidence_scores
        ]
        hallucination_risks = [
            r.hallucination_risk
            for r in all_results if r.hallucination_risk is not None
        ]
        traced = sum(1 for r in all_results if r.attention_traces)

        return {
            'total_tasks': len(self.tasks),
            'tasks_attempted': len(self.results),
            'total_generations': len(all_results),
            'avg_generation_time': self._safe_mean(r.generation_time for r in all_results),
            'avg_confidence': self._safe_mean(per_result_confidence),
            'avg_hallucination_risk': self._safe_mean(hallucination_risks),
            'categories': self._get_category_distribution(),
            'difficulties': self._get_difficulty_distribution(),
            'with_transparency': traced,
            'without_transparency': len(all_results) - traced
        }

    def _get_category_distribution(self) -> Dict[str, int]:
        """Count loaded tasks per category."""
        distribution: Dict[str, int] = {}
        for task in self.tasks.values():
            category = task.category
            distribution[category] = distribution.get(category, 0) + 1
        return distribution

    def _get_difficulty_distribution(self) -> Dict[str, int]:
        """Count loaded tasks per difficulty bucket."""
        distribution: Dict[str, int] = {}
        for task in self.tasks.values():
            difficulty = task.difficulty
            distribution[difficulty] = distribution.get(difficulty, 0) + 1
        return distribution

    def _update_metrics_cache(self):
        """Rebuild ``self.metrics_cache`` from the currently loaded tasks."""
        self.metrics_cache = {
            'last_updated': datetime.now().isoformat(),
            'dataset_info': {
                'total_tasks': len(self.tasks),
                'repositories': len({t.repo for t in self.tasks.values()}),
                'categories': self._get_category_distribution(),
                'difficulties': self._get_difficulty_distribution()
            }
        }

    def get_comparison_results(self, task_id: str) -> Optional[Dict]:
        """Compare results with and without transparency for one task.

        Within each group the result with the lowest ``generation_time`` is
        used for the comparison.

        Returns:
            The comparison dict, or ``None`` when the task has no results or
            either group is empty.
        """
        task_results = self.results.get(task_id)
        if task_results is None:
            return None

        # Separate results by whether traces were recorded.
        with_transparency = [r for r in task_results if r.attention_traces]
        without_transparency = [r for r in task_results if not r.attention_traces]
        if not with_transparency or not without_transparency:
            return None

        best_with = min(with_transparency, key=lambda r: r.generation_time)
        best_without = min(without_transparency, key=lambda r: r.generation_time)

        confidence_with = (
            self._safe_mean(best_with.confidence_scores)
            if best_with.confidence_scores else 0
        )
        confidence_without = (
            self._safe_mean(best_without.confidence_scores)
            if best_without.confidence_scores else 0
        )

        return {
            'task_id': task_id,
            'with_transparency': {
                'generation_time': best_with.generation_time,
                'avg_confidence': confidence_with,
                'hallucination_risk': best_with.hallucination_risk,
                'solution_length': len(best_with.generated_solution.split('\n'))
            },
            'without_transparency': {
                'generation_time': best_without.generation_time,
                'avg_confidence': confidence_without,
                'hallucination_risk': best_without.hallucination_risk,
                'solution_length': len(best_without.generated_solution.split('\n'))
            },
            'improvement': {
                'time_delta': best_with.generation_time - best_without.generation_time,
                'confidence_delta': confidence_with - confidence_without
            }
        }
# Global service instance: a single module-level SWEBenchService created at import time.
swe_bench_service = SWEBenchService()