| """ |
| Execute and evaluate generated Gurobi code |
| """ |
|
|
| import argparse |
| import json |
| import os |
| import re |
| import subprocess |
| import sys |
| from collections import defaultdict |
| from pathlib import Path |
| from typing import Dict, List |
| from concurrent.futures import ProcessPoolExecutor, as_completed |
| from tqdm import tqdm |
|
|
| from .debug_utils import sanitize_code, save_debug_metadata, write_debug_suggestions |
|
|
| SCRIPT_DIR = Path(__file__).resolve().parent |
| PROJECT_ROOT = SCRIPT_DIR.parent.parent |
| DEFAULT_MEMORY_DIR = PROJECT_ROOT / "memory_storage" |
| DEFAULT_GUIDELINES = DEFAULT_MEMORY_DIR / "category_guidelines.jsonl" |
| DEFAULT_DEBUG_MEMORY = DEFAULT_MEMORY_DIR / "debug_memory.jsonl" |
|
|
|
|
| def extract_objective_value(output: str) -> float: |
| """ |
| Extract objective value from Gurobi output |
| |
| Args: |
| output: stdout from Gurobi code execution |
| |
| Returns: |
| Objective value as float, or None if not found |
| """ |
| if not output or output.strip() == "": |
| return None |
| |
| |
| patterns = [ |
| r'Optimal\s+[Oo]bjective[:\s]+([+-]?\d+\.?\d*(?:[eE][+-]?\d+)?)', |
| r'Obj:\s*([+-]?\d+\.?\d*(?:[eE][+-]?\d+)?)', |
| r'Best\s+objective\s+([+-]?\d+\.?\d*(?:[eE][+-]?\d+)?)', |
| r'Objective\s+value:\s*([+-]?\d+\.?\d*(?:[eE][+-]?\d+)?)', |
| r'OBJECTIVE_VALUE:\s*([+-]?\d+\.?\d*(?:[eE][+-]?\d+)?)', |
| ] |
|
|
| for pattern in patterns: |
| match = re.search(pattern, output, re.IGNORECASE) |
| if match: |
| try: |
| return float(match.group(1)) |
| except ValueError: |
| continue |
|
|
| |
| fallback_patterns = [ |
| r'Total\s+Cost[:\s]+([+-]?\d+\.?\d*(?:[eE][+-]?\d+)?)', |
| r'Total\s+Profit[:\s]+([+-]?\d+\.?\d*(?:[eE][+-]?\d+)?)', |
| r'Total\s+Net\s+Profit[:\s]+([+-]?\d+\.?\d*(?:[eE][+-]?\d+)?)', |
| r'Total\s+Revenue[:\s]+([+-]?\d+\.?\d*(?:[eE][+-]?\d+)?)', |
| ] |
|
|
| for pattern in fallback_patterns: |
| match = re.search(pattern, output, re.IGNORECASE) |
| if match: |
| try: |
| return float(match.group(1)) |
| except ValueError: |
| continue |
|
|
| return None |
|
|
|
|
| def enhance_code_with_objective_print(code: str) -> str: |
| """ |
| Add objective value printing to code if not already present |
| |
| This helps ensure we can extract the objective value even if |
| the generated code doesn't print it explicitly. |
| |
| Note: Always adds a fallback print to handle cases where existing |
| prints are conditional (e.g., inside if status == OPTIMAL blocks) |
| """ |
| |
| enhancement_code = """ |
| # Auto-added: Print objective value for evaluation (fallback) |
| try: |
| # Try common variable names for Gurobi model |
| if 'model' in dir(): |
| mdl = model |
| elif 'm' in dir(): |
| mdl = m |
| elif 'Model' in dir(): |
| mdl = Model |
| else: |
| mdl = None |
| |
| # Fallback: scan globals for a likely Gurobi model instance. |
| # This helps when the generated code uses a non-standard variable name. |
| if mdl is None: |
| try: |
| for _name, _val in list(globals().items()): |
| try: |
| if hasattr(_val, 'objVal') and hasattr(_val, 'optimize'): |
| mdl = _val |
| break |
| except Exception: |
| continue |
| except Exception: |
| pass |
| |
| if mdl is not None and hasattr(mdl, 'objVal'): |
| try: |
| obj_value = mdl.objVal |
| print(f"OBJECTIVE_VALUE: {obj_value}") |
| except: |
| # Model might not have been solved yet |
| pass |
| except: |
| pass |
| """ |
| |
| return code + "\n" + enhancement_code |
|
|
|
|
| def execute_code(code: str, problem_id: int, output_dir: str, timeout: int = 60) -> Dict: |
| """ |
| Execute Gurobi code and capture results |
| |
| Args: |
| code: Python code to execute |
| problem_id: Problem ID |
| output_dir: Directory to save code files |
| timeout: Execution timeout in seconds |
| |
| Returns: |
| Dictionary with execution results |
| """ |
| |
| code_dir = os.path.join(output_dir, 'code') |
| os.makedirs(code_dir, exist_ok=True) |
|
|
| sanitized_code, debug_meta = sanitize_code(code, problem_id) |
| code_enhanced = enhance_code_with_objective_print(sanitized_code) |
| |
| |
| code_file = os.path.join(code_dir, f'problem_{problem_id}.py') |
| with open(code_file, 'w', encoding='utf-8') as f: |
| f.write(code_enhanced) |
|
|
| |
| save_debug_metadata(debug_meta, output_dir) |
| |
| |
| try: |
| result = subprocess.run( |
| [sys.executable, f'problem_{problem_id}.py'], |
| capture_output=True, |
| text=True, |
| timeout=timeout, |
| cwd=code_dir |
| ) |
| |
| stdout = result.stdout |
| stderr = result.stderr |
| returncode = result.returncode |
| |
| if returncode == 0: |
| obj_value = extract_objective_value(stdout) |
| if obj_value is not None: |
| return { |
| 'status': 'success', |
| 'objective_value': obj_value, |
| 'stdout': stdout, |
| 'stderr': stderr |
| } |
| else: |
| return { |
| 'status': 'success_no_objective', |
| 'objective_value': None, |
| 'stdout': stdout, |
| 'stderr': stderr |
| } |
| else: |
| return { |
| 'status': 'execution_error', |
| 'objective_value': None, |
| 'stdout': stdout, |
| 'stderr': stderr, |
| 'returncode': returncode |
| } |
| |
| except subprocess.TimeoutExpired: |
| return { |
| 'status': 'timeout', |
| 'objective_value': None, |
| 'stdout': '', |
| 'stderr': f'Execution timeout after {timeout} seconds' |
| } |
| except Exception as e: |
| return { |
| 'status': 'error', |
| 'objective_value': None, |
| 'stdout': '', |
| 'stderr': str(e) |
| } |
|
|
|
|
| def check_correctness(pred_obj: float, gt_obj: float, tolerance: float = 0.05, |
| use_relative: bool = True) -> bool: |
| """ |
| Check if predicted objective matches ground truth |
| |
| Args: |
| pred_obj: Predicted objective value |
| gt_obj: Ground truth objective value |
| tolerance: Tolerance for comparison |
| use_relative: Use relative tolerance if True, absolute if False |
| |
| Returns: |
| True if values match within tolerance |
| """ |
| if pred_obj is None or gt_obj is None: |
| return False |
| |
| try: |
| pred_obj = float(pred_obj) |
| gt_obj = float(gt_obj) |
| |
| if gt_obj == 0: |
| return abs(pred_obj) <= tolerance |
| |
| if use_relative: |
| return abs((pred_obj - gt_obj) / gt_obj) <= tolerance |
| else: |
| return abs(pred_obj - gt_obj) <= tolerance |
| except (ValueError, TypeError): |
| return False |
|
|
|
|
| def evaluate_results(results: List[Dict], args) -> Dict: |
| """ |
| Evaluate execution results |
| |
| Args: |
| results: List of result dictionaries |
| args: Command line arguments |
| |
| Returns: |
| Evaluation report dictionary |
| """ |
| total = len(results) |
| correct = 0 |
| |
| status_counts = defaultdict(int) |
| correct_ids = [] |
| incorrect_details = [] |
| |
| for result in results: |
| status = result['execution_status'] |
| status_counts[status] += 1 |
| |
| if status == 'success' and result['is_correct']: |
| correct += 1 |
| correct_ids.append(result['id']) |
| elif status == 'success' and not result['is_correct']: |
| incorrect_details.append({ |
| 'id': result['id'], |
| 'predicted': result['predicted_objective'], |
| 'ground_truth': result['ground_truth'] |
| }) |
| |
| accuracy = correct / total if total > 0 else 0.0 |
| |
| report = { |
| 'total_problems': total, |
| 'correct': correct, |
| 'accuracy': accuracy, |
| 'status_counts': dict(status_counts), |
| 'correct_ids': correct_ids, |
| 'incorrect_details': incorrect_details[:10], |
| 'settings': { |
| 'tolerance': args.tolerance, |
| 'use_relative_tolerance': args.use_relative_tolerance, |
| 'timeout': args.timeout |
| } |
| } |
| |
| return report |
|
|
|
|
| def process_single_problem(gen_result, args): |
| """Process a single problem (for parallel execution)""" |
| problem_id = gen_result['id'] |
| code = gen_result['generated_code'] |
| gt_answer = gen_result.get('answer') |
| |
| if not code: |
| result = { |
| 'id': problem_id, |
| 'execution_status': 'no_code', |
| 'predicted_objective': None, |
| 'ground_truth': gt_answer, |
| 'is_correct': False |
| } |
| else: |
| exec_result = execute_code(code, problem_id, args.output_dir, args.timeout) |
| |
| pred_obj = exec_result['objective_value'] |
| is_correct = False |
| |
| if pred_obj is not None and gt_answer is not None: |
| try: |
| gt_obj = float(gt_answer) |
| is_correct = check_correctness( |
| pred_obj, gt_obj, |
| args.tolerance, |
| args.use_relative_tolerance |
| ) |
| except (ValueError, TypeError): |
| is_correct = False |
| |
| result = { |
| 'id': problem_id, |
| 'execution_status': exec_result['status'], |
| 'predicted_objective': pred_obj, |
| 'ground_truth': gt_answer, |
| 'is_correct': is_correct, |
| 'stdout': exec_result['stdout'][:500] if args.save_output else '', |
| 'stderr': exec_result['stderr'][:500] if args.save_output else '' |
| } |
| |
| return result |
|
|
|
|
| def main(args): |
| |
| if not os.path.exists(args.input_file): |
| raise FileNotFoundError(f"Input file not found: {args.input_file}") |
| |
| with open(args.input_file, 'r', encoding='utf-8') as f: |
| generated_results = [json.loads(line) for line in f if line.strip()] |
| |
| print(f"Loaded {len(generated_results)} generated results") |
| |
| |
| os.makedirs(args.output_dir, exist_ok=True) |
| id_to_problem = {record['id']: record for record in generated_results} |
|
|
| debug_store = None |
| memory_helper = None |
| memory_bank = None |
| if not args.disable_debug_memory: |
| try: |
| from .debug_memory import DebugMemoryStore |
| from .memory_bank import MemoryBank |
| from .memory_intelligence import MemoryIntelligence |
| except ModuleNotFoundError as exc: |
| print( |
| f"⚠️ Debug-memory dependencies missing ({exc}). " |
| "Continuing with --disable_debug_memory behavior." |
| ) |
| args.disable_debug_memory = True |
| else: |
| debug_store = DebugMemoryStore(args.debug_memory_path) |
| if args.category_guidelines_path: |
| try: |
| memory_helper = MemoryIntelligence(args.category_guidelines_path) |
| except Exception as exc: |
| print(f"Warning: failed to load category guidelines ({exc})") |
| if args.memory_dir: |
| try: |
| if args.embedding_model: |
| memory_bank = MemoryBank(args.memory_dir, embedding_model=args.embedding_model) |
| else: |
| memory_bank = MemoryBank(args.memory_dir) |
| except Exception as exc: |
| print(f"Warning: failed to load memory bank from {args.memory_dir} ({exc})") |
| |
| |
| evaluation_results = [] |
| |
| if args.num_workers > 1: |
| |
| print(f"Using {args.num_workers} workers for parallel execution") |
| with ProcessPoolExecutor(max_workers=args.num_workers) as executor: |
| |
| future_to_problem = { |
| executor.submit(process_single_problem, gen_result, args): gen_result |
| for gen_result in generated_results |
| } |
| |
| |
| with tqdm(total=len(generated_results), desc="Executing") as pbar: |
| for future in as_completed(future_to_problem): |
| try: |
| result = future.result() |
| evaluation_results.append(result) |
| status_symbol = '✓' if result['is_correct'] else '✗' |
| pbar.set_postfix_str(f"Problem {result['id']}: {status_symbol}") |
| pbar.update(1) |
| except Exception as e: |
| gen_result = future_to_problem[future] |
| print(f"\nError processing problem {gen_result['id']}: {e}") |
| evaluation_results.append({ |
| 'id': gen_result['id'], |
| 'execution_status': 'error', |
| 'predicted_objective': None, |
| 'ground_truth': gen_result.get('answer'), |
| 'is_correct': False, |
| 'stdout': '', |
| 'stderr': str(e) |
| }) |
| pbar.update(1) |
| |
| |
| evaluation_results.sort(key=lambda x: x['id']) |
| else: |
| |
| for gen_result in generated_results: |
| problem_id = gen_result['id'] |
| print(f"Processing problem {problem_id}...", end=' ') |
| |
| result = process_single_problem(gen_result, args) |
| evaluation_results.append(result) |
| |
| status_symbol = '✓' if result['is_correct'] else '✗' |
| print(f"{status_symbol} [{result['execution_status']}]") |
| |
| |
| if not args.disable_debug_memory: |
| for result in evaluation_results: |
| status = result['execution_status'] |
| if status in ('execution_error', 'success_no_objective', 'timeout', 'no_code'): |
| gen_result = id_to_problem.get(result['id'], {}) |
| description = gen_result.get('description', '') |
| error_message = result.get('stderr') or result.get('stdout') or '' |
| if not error_message: |
| if status == 'timeout': |
| error_message = 'Execution timeout' |
| elif status == 'no_code': |
| error_message = 'No code was generated for execution.' |
| elif status == 'success_no_objective': |
| error_message = 'Execution succeeded but no objective value was captured.' |
| write_debug_suggestions( |
| problem_id=result['id'], |
| description=description, |
| error_message=error_message, |
| memory_helper=memory_helper, |
| memory_bank=memory_bank, |
| output_dir=args.output_dir, |
| status=status, |
| debug_store=debug_store, |
| ) |
|
|
| |
| report = evaluate_results(evaluation_results, args) |
| |
| |
| results_file = os.path.join(args.output_dir, 'evaluation_results.jsonl') |
| with open(results_file, 'w', encoding='utf-8') as f: |
| for result in evaluation_results: |
| f.write(json.dumps(result, ensure_ascii=False) + '\n') |
| |
| |
| report_file = os.path.join(args.output_dir, 'evaluation_report.json') |
| with open(report_file, 'w', encoding='utf-8') as f: |
| json.dump(report, f, indent=2, ensure_ascii=False) |
| |
| |
| print(f"\n{'='*60}") |
| print("EVALUATION SUMMARY") |
| print(f"{'='*60}") |
| print(f"Total problems: {report['total_problems']}") |
| print(f"Correct: {report['correct']}") |
| print(f"Accuracy: {report['accuracy']:.2%}") |
| print(f"\nStatus breakdown:") |
| for status, count in sorted(report['status_counts'].items()): |
| print(f" {status:20s}: {count:3d} ({count/report['total_problems']:.1%})") |
| print(f"{'='*60}") |
| print(f"\nResults saved to:") |
| print(f" {results_file}") |
| print(f" {report_file}") |
|
|
|
|
| def parse_args(): |
| parser = argparse.ArgumentParser(description="Execute and evaluate generated Gurobi code") |
| |
| parser.add_argument("--input_file", type=str, required=True, |
| help="Path to generated results JSONL file") |
| parser.add_argument("--output_dir", type=str, required=True, |
| help="Directory to save execution results") |
| parser.add_argument("--timeout", type=int, default=60, |
| help="Timeout for code execution (seconds)") |
| parser.add_argument("--tolerance", type=float, default=0.05, |
| help="Tolerance for answer comparison") |
| parser.add_argument("--use_relative_tolerance", action="store_true", |
| help="Use relative tolerance (default: absolute)") |
| parser.add_argument("--save_output", action="store_true", |
| help="Save stdout/stderr in results") |
| parser.add_argument("--num_workers", type=int, default=100, |
| help="Number of parallel workers for execution") |
| parser.add_argument("--memory_dir", type=str, default=str(DEFAULT_MEMORY_DIR), |
| help="Path to episodic memory directory (used for debug suggestions)") |
| parser.add_argument("--embedding_model", type=str, default=None, |
| help="Optional embedding model name or local path for debug-memory retrieval") |
| parser.add_argument("--category_guidelines_path", type=str, |
| default=str(DEFAULT_GUIDELINES), |
| help="Path to category guideline JSONL file") |
| parser.add_argument("--debug_memory_path", type=str, |
| default=str(DEFAULT_DEBUG_MEMORY), |
| help="Path to persistent debug memory JSONL file") |
| parser.add_argument("--disable_debug_memory", action="store_true", |
| help="Disable memory-based debug suggestions") |
| |
| return parser.parse_args() |
|
|
|
|
| if __name__ == "__main__": |
| args = parse_args() |
| main(args) |
|
|