| """ |
| PostgreSQL Task Manager for MCPMark |
| ==================================== |
| |
| Manages PostgreSQL task discovery, execution, and verification. |
| """ |
|
|
| import os |
| import subprocess |
| import sys |
| from dataclasses import dataclass |
| from pathlib import Path |
| from typing import Any, Dict, List, Optional |
|
|
| from src.base.task_manager import BaseTask, BaseTaskManager |
| from src.logger import get_logger |
|
|
| logger = get_logger(__name__) |
|
|
|
|
| @dataclass |
| class PostgresTask(BaseTask): |
| """PostgreSQL-specific task with database information.""" |
|
|
| task_name: str = "" |
| database_name: Optional[str] = None |
| database_url: Optional[str] = None |
| expected_queries: Optional[List[str]] = None |
| expected_tables: Optional[List[str]] = None |
|
|
|
|
| class PostgresTaskManager(BaseTaskManager): |
| """Manages PostgreSQL tasks for MCPMark evaluation.""" |
|
|
| def __init__(self, tasks_root: Path = None, task_suite: str = "standard"): |
| """Initialize PostgreSQL task manager. |
| |
| Args: |
| tasks_root: Path to tasks directory |
| task_suite: Logical task suite (e.g., 'standard', 'easy') |
| """ |
| if tasks_root is None: |
| tasks_root = Path(__file__).resolve().parents[3] / "tasks" |
|
|
| super().__init__( |
| tasks_root, |
| mcp_service="postgres", |
| task_class=PostgresTask, |
| task_organization="file", |
| task_suite=task_suite, |
| ) |
|
|
| def _create_task_from_files( |
| self, category_id: str, task_files_info: Dict[str, Any] |
| ) -> Optional[PostgresTask]: |
| """Instantiate a `PostgresTask` from the dictionary returned by `_find_task_files`.""" |
| import json |
| |
| |
| meta_path = task_files_info["instruction_path"].parent / "meta.json" |
| final_category_id = category_id |
| task_id = task_files_info["task_id"] |
| |
| if meta_path.exists(): |
| try: |
| with open(meta_path, 'r') as f: |
| meta_data = json.load(f) |
| |
| final_category_id = meta_data.get("category_id", category_id) |
| task_id = meta_data.get("task_id", task_id) |
| except Exception as e: |
| logger.warning(f"Failed to load meta.json from {meta_path}: {e}") |
|
|
| return PostgresTask( |
| task_instruction_path=task_files_info["instruction_path"], |
| task_verification_path=task_files_info["verification_path"], |
| service="postgres", |
| category_id=final_category_id, |
| task_id=task_id, |
| task_name=task_files_info["task_id"], |
| ) |
|
|
| def _get_verification_command(self, task: PostgresTask) -> List[str]: |
| """Get verification command with database info.""" |
| cmd = [sys.executable, str(task.task_verification_path)] |
|
|
| |
| if task.database_name: |
| cmd.append(task.database_name) |
|
|
| return cmd |
|
|
| def run_verification(self, task: BaseTask) -> subprocess.CompletedProcess: |
| """Run verification with PostgreSQL environment.""" |
| env = os.environ.copy() |
|
|
| |
| if hasattr(task, "database_name") and task.database_name: |
| env["POSTGRES_DATABASE"] = task.database_name |
|
|
| if hasattr(task, "database_url") and task.database_url: |
| env["DATABASE_URL"] = task.database_url |
|
|
| return subprocess.run( |
| self._get_verification_command(task), |
| capture_output=True, |
| text=True, |
| timeout=300, |
| env=env, |
| ) |
|
|
| def _format_task_instruction(self, base_instruction: str) -> str: |
| """Add PostgreSQL-specific instructions.""" |
| return ( |
| base_instruction |
| + "\n\nNote: Use PostgreSQL MCP tools to complete this task. The database connection is already configured." |
| ) |
|
|