""" PostgreSQL Task Manager for MCPMark ==================================== Manages PostgreSQL task discovery, execution, and verification. """ import os import subprocess import sys from dataclasses import dataclass from pathlib import Path from typing import Any, Dict, List, Optional from src.base.task_manager import BaseTask, BaseTaskManager from src.logger import get_logger logger = get_logger(__name__) @dataclass class PostgresTask(BaseTask): """PostgreSQL-specific task with database information.""" task_name: str = "" database_name: Optional[str] = None database_url: Optional[str] = None expected_queries: Optional[List[str]] = None expected_tables: Optional[List[str]] = None class PostgresTaskManager(BaseTaskManager): """Manages PostgreSQL tasks for MCPMark evaluation.""" def __init__(self, tasks_root: Path = None, task_suite: str = "standard"): """Initialize PostgreSQL task manager. Args: tasks_root: Path to tasks directory task_suite: Logical task suite (e.g., 'standard', 'easy') """ if tasks_root is None: tasks_root = Path(__file__).resolve().parents[3] / "tasks" super().__init__( tasks_root, mcp_service="postgres", task_class=PostgresTask, task_organization="file", # PostgreSQL uses file-based tasks task_suite=task_suite, ) def _create_task_from_files( self, category_id: str, task_files_info: Dict[str, Any] ) -> Optional[PostgresTask]: """Instantiate a `PostgresTask` from the dictionary returned by `_find_task_files`.""" import json # Check for meta.json meta_path = task_files_info["instruction_path"].parent / "meta.json" final_category_id = category_id task_id = task_files_info["task_id"] if meta_path.exists(): try: with open(meta_path, 'r') as f: meta_data = json.load(f) # Use values from meta.json if available final_category_id = meta_data.get("category_id", category_id) task_id = meta_data.get("task_id", task_id) except Exception as e: logger.warning(f"Failed to load meta.json from {meta_path}: {e}") return PostgresTask( task_instruction_path=task_files_info["instruction_path"], task_verification_path=task_files_info["verification_path"], service="postgres", category_id=final_category_id, task_id=task_id, task_name=task_files_info["task_id"], ) def _get_verification_command(self, task: PostgresTask) -> List[str]: """Get verification command with database info.""" cmd = [sys.executable, str(task.task_verification_path)] # Pass database name as argument if available if task.database_name: cmd.append(task.database_name) return cmd def run_verification(self, task: BaseTask) -> subprocess.CompletedProcess: """Run verification with PostgreSQL environment.""" env = os.environ.copy() # Pass database connection info to verification script if hasattr(task, "database_name") and task.database_name: env["POSTGRES_DATABASE"] = task.database_name if hasattr(task, "database_url") and task.database_url: env["DATABASE_URL"] = task.database_url return subprocess.run( self._get_verification_command(task), capture_output=True, text=True, timeout=300, env=env, ) def _format_task_instruction(self, base_instruction: str) -> str: """Add PostgreSQL-specific instructions.""" return ( base_instruction + "\n\nNote: Use PostgreSQL MCP tools to complete this task. The database connection is already configured." )