#!/usr/bin/env python3
"""
Security Scanner MCP Server

An MCP server that scans Python code for security vulnerabilities
and provides beginner-friendly explanations.
"""

import argparse
import asyncio
import importlib
import json
import sys
import tempfile
from pathlib import Path
from typing import Any, Dict, List, NamedTuple, Optional

# MCP imports
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent

# Local imports
from utils import (
    load_config,
    setup_logging,
    validate_severity_threshold,
    filter_by_severity,
)

# Module-level logger; assigned when SecurityScannerServer is instantiated.
logger = None


class _ScannerSpec(NamedTuple):
    """Describe one optional scanner backend and the labels used in its logs."""

    config_key: str  # key under config["scanners"]
    module: str  # dotted module path, imported lazily at scan time
    function: str  # entry-point name inside that module
    needs_source: bool  # True: fn(path, code); False: fn(path)
    running_label: str  # used in "Running <label>..."
    found_label: str  # used in "<label> found N issues"
    missing_label: str  # used in "<label> module not available ..."
    error_label: str  # used in "<label> error: ..."


# Scanners are run in this order; each is skipped unless enabled in config.
_SCANNERS = (
    _ScannerSpec(
        config_key="pattern_detector",
        module="scanner.pattern_detector",
        function="scan_patterns",
        needs_source=True,
        running_label="pattern detector",
        found_label="Pattern detector",
        missing_label="Pattern detector",
        error_label="Pattern detector",
    ),
    _ScannerSpec(
        config_key="sql_injection",
        module="scanner.sql_injection",
        function="scan_sql_injection",
        needs_source=True,
        running_label="SQL injection detector",
        found_label="SQL injection detector",
        missing_label="SQL injection detector",
        error_label="SQL injection detector",
    ),
    _ScannerSpec(
        config_key="bandit",
        module="scanner.bandit_wrapper",
        function="scan_with_bandit",
        needs_source=False,
        running_label="bandit scanner",
        found_label="Bandit",
        missing_label="Bandit wrapper",
        error_label="Bandit scanner",
    ),
    _ScannerSpec(
        config_key="semgrep",
        module="scanner.semgrep_wrapper",
        function="scan_with_semgrep",
        needs_source=False,
        running_label="semgrep scanner",
        found_label="Semgrep",
        missing_label="Semgrep wrapper",
        error_label="Semgrep scanner",
    ),
)


class SecurityScannerServer:
    """MCP server for security scanning."""

    def __init__(self, debug: bool = False):
        """
        Initialize the Security Scanner MCP server.

        Loads the configuration, configures the module-level logger and
        registers the MCP tool handlers.

        Args:
            debug: Enable debug logging
        """
        global logger
        logger = setup_logging(debug=debug)

        self.config = load_config()
        self.server = Server(self.config["server"]["name"])
        self.debug = debug

        logger.info(
            f"Initializing {self.config['server']['name']} "
            f"v{self.config['server']['version']}"
        )

        # Register handlers
        self._register_handlers()

    def _register_handlers(self):
        """Register MCP tool handlers (tool listing and tool invocation)."""

        @self.server.list_tools()
        async def list_tools() -> List[Tool]:
            """List available tools."""
            return [
                Tool(
                    name="scan_security",
                    description=self.config["tools"]["scan_security"]["description"],
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "code": {
                                "type": "string",
                                "description": "분석할 Python 소스 코드",
                            },
                            "severity_threshold": {
                                "type": "string",
                                "enum": ["CRITICAL", "HIGH", "MEDIUM", "LOW"],
                                "description": "보고할 최소 심각도 수준",
                                "default": self.config["severity"]["default_threshold"],
                            },
                        },
                        "required": ["code"],
                    },
                )
            ]

        @self.server.call_tool()
        async def call_tool(name: str, arguments: Any) -> List[TextContent]:
            """
            Handle tool calls.

            Any exception raised while scanning is logged and converted into
            a JSON error payload with an empty summary, so the caller always
            receives a well-formed result.

            Args:
                name: Tool name
                arguments: Tool arguments

            Returns:
                List of text content with results

            Raises:
                ValueError: If ``name`` is not ``scan_security``.
            """
            if name != "scan_security":
                raise ValueError(f"Unknown tool: {name}")

            logger.info(f"Tool called: {name}")
            logger.debug(f"Arguments: {arguments}")

            try:
                result = await self._scan_security(arguments)
                return [
                    TextContent(
                        type="text",
                        text=json.dumps(result, indent=2, ensure_ascii=False),
                    )
                ]
            except Exception as e:
                logger.error(f"Error during security scan: {e}", exc_info=True)
                error_result = {
                    "error": str(e),
                    "summary": {
                        "total_issues": 0,
                        "critical": 0,
                        "high": 0,
                        "medium": 0,
                        "low": 0,
                    },
                    "vulnerabilities": [],
                }
                return [
                    TextContent(
                        type="text",
                        text=json.dumps(error_result, indent=2, ensure_ascii=False),
                    )
                ]

    async def _scan_security(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """
        Perform security scan on provided code.

        The code is written to a temporary ``.py`` file, passed to every
        enabled scanner, filtered by severity and formatted. The temporary
        file is removed afterwards, including when an error occurs.

        Args:
            arguments: Dictionary containing 'code' and optional
                'severity_threshold'. ``None`` is treated as an empty
                dictionary.

        Returns:
            Dictionary with scan results

        Raises:
            ValueError: If the arguments are malformed, the code is empty or
                too large, or the severity threshold is not recognised.
        """
        # Extract and validate arguments
        if arguments is None:
            arguments = {}
        if not isinstance(arguments, dict):
            raise ValueError("Tool arguments must be an object")

        raw_code = arguments.get("code", "")
        if raw_code is None:
            raw_code = ""
        if not isinstance(raw_code, str):
            raise ValueError("Code parameter must be a string")

        stripped_code = raw_code.strip()
        if not stripped_code:
            raise ValueError("Code parameter is required and cannot be empty")

        # Surrounding whitespace is trimmed, but the number of leading line
        # breaks is kept so that line numbers reported by the scanners match
        # the line numbers of the code the caller submitted.
        leading_whitespace = raw_code[: len(raw_code) - len(raw_code.lstrip())]
        code = "\n" * leading_whitespace.count("\n") + stripped_code

        severity_threshold = arguments.get("severity_threshold")
        if severity_threshold is None:
            # Missing or explicit null: fall back to the configured default.
            severity_threshold = self.config["severity"]["default_threshold"]
        if isinstance(severity_threshold, str):
            severity_threshold = severity_threshold.upper()

        if not isinstance(severity_threshold, str) or not validate_severity_threshold(
            severity_threshold
        ):
            raise ValueError(
                f"Invalid severity threshold: {severity_threshold}. "
                f"Must be one of: CRITICAL, HIGH, MEDIUM, LOW"
            )

        logger.info(f"Starting security scan (threshold: {severity_threshold})")
        logger.debug(f"Code length: {len(code)} characters")

        # Check file size limit
        max_size_mb = self.config["performance"]["max_file_size_mb"]
        code_size_mb = len(code.encode("utf-8")) / (1024 * 1024)
        if code_size_mb > max_size_mb:
            raise ValueError(
                f"Code size ({code_size_mb:.2f}MB) exceeds maximum "
                f"allowed size ({max_size_mb}MB)"
            )

        tmp_file_path: Optional[str] = None
        try:
            # Create temporary file for scanning. This happens inside the
            # try block, and the path is recorded before writing, so that a
            # failed write cannot leave the file behind.
            with tempfile.NamedTemporaryFile(
                mode="w", suffix=".py", delete=False, encoding="utf-8"
            ) as tmp_file:
                tmp_file_path = tmp_file.name
                tmp_file.write(code)

            # Collect results from all enabled scanners
            all_vulnerabilities = self._run_scanners(tmp_file_path, code)

            # Filter by severity threshold
            filtered_vulnerabilities = filter_by_severity(
                all_vulnerabilities, severity_threshold
            )

            logger.info(
                f"Total issues found: {len(all_vulnerabilities)}, "
                f"after filtering: {len(filtered_vulnerabilities)}"
            )

            # Format results
            try:
                from formatter import format_results

                formatted_results = format_results(
                    filtered_vulnerabilities, severity_threshold
                )
            except ImportError:
                logger.warning("Formatter module not available, using basic format")
                formatted_results = self._basic_format_results(
                    filtered_vulnerabilities, severity_threshold
                )

            return formatted_results

        finally:
            # Clean up temporary file
            if tmp_file_path is not None:
                try:
                    Path(tmp_file_path).unlink()
                except Exception as e:
                    logger.warning(f"Failed to delete temporary file: {e}")

    def _run_scanners(self, source_path: str, code: str) -> List[Dict[str, Any]]:
        """
        Run every enabled scanner and collect their findings.

        Scanner modules are imported lazily. A scanner that cannot be
        imported is skipped with a warning; a scanner that raises any other
        exception is logged and skipped, so one failing backend does not
        abort the whole scan.

        Args:
            source_path: Path of the temporary file holding the code
            code: The source code being scanned

        Returns:
            Combined list of findings from all scanners that ran
        """
        # NOTE(review): scanners are called synchronously from a coroutine;
        # if they are slow they will block the event loop while they run.
        findings: List[Dict[str, Any]] = []

        for spec in _SCANNERS:
            if not self.config["scanners"][spec.config_key]["enabled"]:
                continue

            logger.info(f"Running {spec.running_label}...")
            try:
                module = importlib.import_module(spec.module)
                try:
                    scan = getattr(module, spec.function)
                except AttributeError as exc:
                    # Same outcome as a failed ``from module import name``.
                    raise ImportError(
                        f"cannot import name {spec.function!r} from {spec.module!r}"
                    ) from exc

                if spec.needs_source:
                    results = scan(source_path, code)
                else:
                    results = scan(source_path)

                findings.extend(results)
                logger.info(f"{spec.found_label} found {len(results)} issues")
            except ImportError:
                logger.warning(
                    f"{spec.missing_label} module not available (not yet implemented)"
                )
            except Exception as e:
                logger.error(f"{spec.error_label} error: {e}", exc_info=True)

        return findings

    def _basic_format_results(
        self, vulnerabilities: List[Dict[str, Any]], threshold: str
    ) -> Dict[str, Any]:
        """
        Basic result formatting when formatter module is not available.

        Findings whose severity is missing or empty are counted as "low";
        findings with an unrecognised severity are included in
        ``total_issues`` but in none of the per-severity counters.

        Args:
            vulnerabilities: List of vulnerabilities
            threshold: Severity threshold used

        Returns:
            Formatted results dictionary
        """
        # Count by severity
        severity_counts = {"critical": 0, "high": 0, "medium": 0, "low": 0}
        for vuln in vulnerabilities:
            # ``or "LOW"`` also covers an explicit None/empty severity, and
            # str() guards against non-string values.
            severity = str(vuln.get("severity") or "LOW").lower()
            if severity in severity_counts:
                severity_counts[severity] += 1

        return {
            "summary": {
                "total_issues": len(vulnerabilities),
                "critical": severity_counts["critical"],
                "high": severity_counts["high"],
                "medium": severity_counts["medium"],
                "low": severity_counts["low"],
                "severity_threshold": threshold,
            },
            "vulnerabilities": vulnerabilities,
        }

    async def run(self):
        """Run the MCP server over stdio until the streams are closed."""
        logger.info("Starting MCP server...")

        async with stdio_server() as (read_stream, write_stream):
            logger.info("Server is ready and listening on stdio")
            await self.server.run(
                read_stream,
                write_stream,
                self.server.create_initialization_options(),
            )


def main():
    """
    Main entry point for the MCP server.

    Parses the command line, starts the server and exits with status 0 on
    Ctrl-C or status 1 on any other error.
    """
    parser = argparse.ArgumentParser(description="Security Scanner MCP Server")
    parser.add_argument(
        "--debug",
        action="store_true",
        help="Enable debug logging",
    )

    args = parser.parse_args()

    # Create and run server
    server = SecurityScannerServer(debug=args.debug)

    try:
        asyncio.run(server.run())
    except KeyboardInterrupt:
        if logger:
            logger.info("Server stopped by user")
        sys.exit(0)
    except Exception as e:
        if logger:
            logger.error(f"Server error: {e}", exc_info=True)
        sys.exit(1)


if __name__ == "__main__":
    main()