"""
Utility functions for the Security Scanner MCP server.
"""

import json
import logging
from pathlib import Path
from typing import Any, Dict

# Project root directory (two levels above this module file).
PROJECT_ROOT = Path(__file__).parent.parent

# Severity names accepted by validate_severity_threshold().
_VALID_THRESHOLDS = frozenset({"CRITICAL", "HIGH", "MEDIUM", "LOW"})

# Ordering used when the configuration has no "severity" -> "thresholds" map.
# Lower numbers are more severe.
_DEFAULT_SEVERITY_ORDER = {"CRITICAL": 0, "HIGH": 1, "MEDIUM": 2, "LOW": 3}


def load_config() -> Dict[str, Any]:
    """
    Load configuration from mcp_config.json in the project root.

    Returns:
        The parsed JSON content of the configuration file

    Raises:
        FileNotFoundError: If mcp_config.json does not exist
        json.JSONDecodeError: If the file is not valid JSON
    """
    config_path = PROJECT_ROOT / "mcp_config.json"

    # Open directly instead of checking exists() first: that avoids a race
    # between the check and the open while keeping the same error message.
    try:
        config_file = open(config_path, "r", encoding="utf-8")
    except FileNotFoundError:
        raise FileNotFoundError(
            f"Configuration file not found: {config_path}"
        ) from None

    with config_file:
        return json.load(config_file)


class _JsonFormatter(logging.Formatter):
    """Formatter that emits each record as one valid JSON object per line."""

    def format(self, record: logging.LogRecord) -> str:
        """Serialize time, level and message (plus traceback, if any)."""
        payload = {
            "time": self.formatTime(record, self.datefmt),
            "level": record.levelname,
            "message": record.getMessage(),
        }
        if record.exc_info:
            payload["exception"] = self.formatException(record.exc_info)
        # Compact separators keep the output identical to the previous
        # hand-written template for plain messages, while json.dumps
        # escapes quotes, backslashes and newlines correctly.
        return json.dumps(payload, separators=(",", ":"), ensure_ascii=False)


def setup_logging(debug: bool = False) -> logging.Logger:
    """
    Set up logging based on the "logging" section of the configuration.

    Recognised keys in that section: "level" (default "INFO"), "file"
    (default "logs/mcp_server.log", relative to the project root),
    "console" (default True) and "json_format" (default False).

    Args:
        debug: If True, set logging level to DEBUG regardless of config

    Returns:
        Configured logger instance named "security-scanner-mcp"

    Raises:
        FileNotFoundError: If the configuration file does not exist
    """
    config = load_config()
    log_config = config.get("logging", {})

    # Determine log level. The configured name is matched case-insensitively;
    # anything that does not resolve to a numeric logging level falls back
    # to INFO instead of raising AttributeError.
    configured_level = log_config.get("level", "INFO")
    level_is_valid = True
    if debug:
        log_level = logging.DEBUG
    else:
        log_level = getattr(logging, str(configured_level).upper(), None)
        if not isinstance(log_level, int):
            level_is_valid = False
            log_level = logging.INFO

    # Create logs directory if it doesn't exist
    log_file = log_config.get("file", "logs/mcp_server.log")
    log_path = PROJECT_ROOT / log_file
    log_path.parent.mkdir(parents=True, exist_ok=True)

    # File handler is always present; console handler is optional.
    handlers = [logging.FileHandler(log_path, encoding="utf-8")]
    if log_config.get("console", True):
        handlers.append(logging.StreamHandler())

    # Set up formatter
    if log_config.get("json_format", False):
        formatter = _JsonFormatter()
    else:
        formatter = logging.Formatter(
            "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
        )

    for handler in handlers:
        handler.setLevel(log_level)
        handler.setFormatter(formatter)

    # Create and configure logger
    logger = logging.getLogger("security-scanner-mcp")
    logger.setLevel(log_level)

    # Remove AND close handlers left over from a previous call; merely
    # dropping them would leak the open log-file descriptor.
    for stale_handler in list(logger.handlers):
        logger.removeHandler(stale_handler)
        stale_handler.close()

    for handler in handlers:
        logger.addHandler(handler)

    if not level_is_valid:
        logger.warning(
            "Unknown log level %r in configuration; falling back to INFO",
            configured_level,
        )

    return logger


def validate_severity_threshold(threshold: str) -> bool:
    """
    Validate severity threshold value (case-insensitive).

    Args:
        threshold: Severity threshold string

    Returns:
        True if the value is one of CRITICAL, HIGH, MEDIUM or LOW;
        False otherwise, including for non-string input such as None
    """
    if not isinstance(threshold, str):
        return False
    return threshold.upper() in _VALID_THRESHOLDS


def get_severity_order() -> Dict[str, int]:
    """
    Get severity level ordering from configuration.

    Returns:
        The "severity" -> "thresholds" mapping from the configuration, or
        a fresh copy of the default ordering (CRITICAL=0 ... LOW=3) when
        the configuration does not define one

    Raises:
        FileNotFoundError: If the configuration file does not exist
    """
    config = load_config()
    # Hand out a copy so callers cannot mutate the module-level default.
    return config.get("severity", {}).get(
        "thresholds", dict(_DEFAULT_SEVERITY_ORDER)
    )


def filter_by_severity(
    vulnerabilities: list,
    threshold: str
) -> list:
    """
    Filter vulnerabilities by severity threshold.

    A vulnerability is kept when its severity rank is less than or equal
    to the threshold's rank (lower rank means more severe).

    Args:
        vulnerabilities: List of vulnerability dictionaries
        threshold: Minimum severity threshold; an unknown name is ranked 2

    Returns:
        Filtered list of vulnerabilities, in the original order

    Raises:
        FileNotFoundError: If the configuration file does not exist
    """
    severity_order = get_severity_order()
    threshold_value = severity_order.get(threshold.upper(), 2)

    def severity_rank(vuln: Dict[str, Any]) -> int:
        raw_severity = vuln.get("severity", "LOW")
        # An explicit None is treated like a missing key, and any other
        # non-string value is stringified, rather than crashing on .upper().
        if raw_severity is None:
            severity_name = "LOW"
        else:
            severity_name = str(raw_severity).upper()
        # Severities missing from the ordering are ranked 3.
        return severity_order.get(severity_name, 3)

    return [
        vuln for vuln in vulnerabilities
        if severity_rank(vuln) <= threshold_value
    ]