mcpmark / src /mcp_services /postgres /postgres_login_helper.py
haochengsama's picture
Add files using upload-large-folder tool
a2ec7b6 verified
Raw
History Blame Contribute Delete
4.47 kB
"""
PostgreSQL Login Helper for MCPMark
====================================
Handles PostgreSQL authentication and connection validation.
"""
import json
import psycopg2
from pathlib import Path
from typing import Optional, Dict, Any
from src.base.login_helper import BaseLoginHelper
from src.logger import get_logger
logger = get_logger(__name__)
class PostgresLoginHelper(BaseLoginHelper):
"""Handles PostgreSQL authentication and connection validation."""
def __init__(
self,
host: str = "localhost",
port: int = 5432,
database: str = "postgres",
username: str = "postgres",
password: str = None,
state_path: Optional[Path] = None,
):
"""Initialize PostgreSQL login helper.
Args:
host: Database host
port: Database port
database: Database name
username: Database username
password: Database password
state_path: Path to save connection state
"""
super().__init__()
self.host = host
self.port = port
self.database = database
self.username = username
self.password = password
self.state_path = state_path or Path.home() / ".mcpbench" / "postgres_auth.json"
# Ensure state directory exists
self.state_path.parent.mkdir(parents=True, exist_ok=True)
def login(self, **kwargs) -> bool:
"""Test PostgreSQL connection and save state.
Returns:
bool: True if connection successful
"""
try:
# Test connection
conn = psycopg2.connect(
host=self.host,
port=self.port,
database=self.database,
user=self.username,
password=self.password,
connect_timeout=10,
)
# Execute test query
with conn.cursor() as cur:
cur.execute("SELECT version()")
version = cur.fetchone()[0]
logger.info(f"PostgreSQL connection successful: {version}")
# Check permissions
cur.execute(
"""
SELECT has_database_privilege(%s, 'CREATE')
""",
(self.database,),
)
can_create = cur.fetchone()[0]
if not can_create:
logger.warning("User does not have CREATE privilege on database")
conn.close()
# Save connection state
self._save_connection_state(
{
"host": self.host,
"port": self.port,
"database": self.database,
"username": self.username,
"version": version,
"can_create": can_create,
"authenticated_at": self._get_current_timestamp(),
}
)
return True
except psycopg2.Error as e:
logger.error(f"PostgreSQL connection failed: {e}")
return False
except Exception as e:
logger.error(f"Unexpected error during PostgreSQL login: {e}")
return False
def _save_connection_state(self, state: Dict[str, Any]):
"""Save connection state to file."""
try:
# Don't save password
safe_state = {k: v for k, v in state.items() if k != "password"}
with open(self.state_path, "w") as f:
json.dump(safe_state, f, indent=2)
# Set restrictive permissions
self.state_path.chmod(0o600)
logger.info(f"Connection state saved to: {self.state_path}")
except Exception as e:
logger.error(f"Failed to save connection state: {e}")
def _get_current_timestamp(self) -> str:
"""Get current timestamp in ISO format."""
from datetime import datetime, timezone
return datetime.now(timezone.utc).isoformat()
def is_connected(self) -> bool:
"""Check if we can connect to PostgreSQL."""
return self.login()
def get_connection_params(self) -> Dict[str, Any]:
"""Get connection parameters (without password)."""
return {
"host": self.host,
"port": self.port,
"database": self.database,
"user": self.username,
}