|
|
|
|
|
""" |
|
|
Security Scanner MCP Server |
|
|
|
|
|
An MCP server that scans Python code for security vulnerabilities |
|
|
and provides beginner-friendly explanations. |
|
|
""" |
|
|
|
|
|
import argparse |
|
|
import asyncio |
|
|
import json |
|
|
import sys |
|
|
import tempfile |
|
|
from pathlib import Path |
|
|
from typing import Any, Dict, List |
|
|
|
|
|
|
|
|
from mcp.server import Server |
|
|
from mcp.server.stdio import stdio_server |
|
|
from mcp.types import Tool, TextContent |
|
|
|
|
|
|
|
|
from utils import ( |
|
|
load_config, |
|
|
setup_logging, |
|
|
validate_severity_threshold, |
|
|
filter_by_severity, |
|
|
) |
|
|
|
|
|
|
|
|
logger = None |
|
|
|
|
|
|
|
|
class SecurityScannerServer: |
|
|
"""MCP server for security scanning.""" |
|
|
|
|
|
def __init__(self, debug: bool = False): |
|
|
""" |
|
|
Initialize the Security Scanner MCP server. |
|
|
|
|
|
Args: |
|
|
debug: Enable debug logging |
|
|
""" |
|
|
global logger |
|
|
logger = setup_logging(debug=debug) |
|
|
|
|
|
self.config = load_config() |
|
|
self.server = Server(self.config["server"]["name"]) |
|
|
self.debug = debug |
|
|
|
|
|
logger.info( |
|
|
f"Initializing {self.config['server']['name']} " |
|
|
f"v{self.config['server']['version']}" |
|
|
) |
|
|
|
|
|
|
|
|
self._register_handlers() |
|
|
|
|
|
def _register_handlers(self): |
|
|
"""Register MCP tool handlers.""" |
|
|
|
|
|
@self.server.list_tools() |
|
|
async def list_tools() -> List[Tool]: |
|
|
"""List available tools.""" |
|
|
return [ |
|
|
Tool( |
|
|
name="scan_security", |
|
|
description=self.config["tools"]["scan_security"]["description"], |
|
|
inputSchema={ |
|
|
"type": "object", |
|
|
"properties": { |
|
|
"code": { |
|
|
"type": "string", |
|
|
"description": "๋ถ์ํ Python ์์ค ์ฝ๋", |
|
|
}, |
|
|
"severity_threshold": { |
|
|
"type": "string", |
|
|
"enum": ["CRITICAL", "HIGH", "MEDIUM", "LOW"], |
|
|
"description": "๋ณด๊ณ ํ ์ต์ ์ฌ๊ฐ๋ ์์ค", |
|
|
"default": self.config["severity"]["default_threshold"], |
|
|
}, |
|
|
}, |
|
|
"required": ["code"], |
|
|
}, |
|
|
) |
|
|
] |
|
|
|
|
|
@self.server.call_tool() |
|
|
async def call_tool(name: str, arguments: Any) -> List[TextContent]: |
|
|
""" |
|
|
Handle tool calls. |
|
|
|
|
|
Args: |
|
|
name: Tool name |
|
|
arguments: Tool arguments |
|
|
|
|
|
Returns: |
|
|
List of text content with results |
|
|
""" |
|
|
if name != "scan_security": |
|
|
raise ValueError(f"Unknown tool: {name}") |
|
|
|
|
|
logger.info(f"Tool called: {name}") |
|
|
logger.debug(f"Arguments: {arguments}") |
|
|
|
|
|
try: |
|
|
result = await self._scan_security(arguments) |
|
|
return [TextContent(type="text", text=json.dumps(result, indent=2, ensure_ascii=False))] |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Error during security scan: {e}", exc_info=True) |
|
|
error_result = { |
|
|
"error": str(e), |
|
|
"summary": { |
|
|
"total_issues": 0, |
|
|
"critical": 0, |
|
|
"high": 0, |
|
|
"medium": 0, |
|
|
"low": 0, |
|
|
}, |
|
|
"vulnerabilities": [], |
|
|
} |
|
|
return [TextContent(type="text", text=json.dumps(error_result, indent=2, ensure_ascii=False))] |
|
|
|
|
|
async def _scan_security(self, arguments: Dict[str, Any]) -> Dict[str, Any]: |
|
|
""" |
|
|
Perform security scan on provided code. |
|
|
|
|
|
Args: |
|
|
arguments: Dictionary containing 'code' and optional 'severity_threshold' |
|
|
|
|
|
Returns: |
|
|
Dictionary with scan results |
|
|
""" |
|
|
|
|
|
code = arguments.get("code", "").strip() |
|
|
if not code: |
|
|
raise ValueError("Code parameter is required and cannot be empty") |
|
|
|
|
|
severity_threshold = arguments.get( |
|
|
"severity_threshold", |
|
|
self.config["severity"]["default_threshold"] |
|
|
).upper() |
|
|
|
|
|
if not validate_severity_threshold(severity_threshold): |
|
|
raise ValueError( |
|
|
f"Invalid severity threshold: {severity_threshold}. " |
|
|
f"Must be one of: CRITICAL, HIGH, MEDIUM, LOW" |
|
|
) |
|
|
|
|
|
logger.info(f"Starting security scan (threshold: {severity_threshold})") |
|
|
logger.debug(f"Code length: {len(code)} characters") |
|
|
|
|
|
|
|
|
max_size_mb = self.config["performance"]["max_file_size_mb"] |
|
|
code_size_mb = len(code.encode("utf-8")) / (1024 * 1024) |
|
|
if code_size_mb > max_size_mb: |
|
|
raise ValueError( |
|
|
f"Code size ({code_size_mb:.2f}MB) exceeds maximum " |
|
|
f"allowed size ({max_size_mb}MB)" |
|
|
) |
|
|
|
|
|
|
|
|
with tempfile.NamedTemporaryFile( |
|
|
mode="w", |
|
|
suffix=".py", |
|
|
delete=False, |
|
|
encoding="utf-8" |
|
|
) as tmp_file: |
|
|
tmp_file.write(code) |
|
|
tmp_file_path = tmp_file.name |
|
|
|
|
|
try: |
|
|
|
|
|
all_vulnerabilities = [] |
|
|
|
|
|
|
|
|
if self.config["scanners"]["pattern_detector"]["enabled"]: |
|
|
logger.info("Running pattern detector...") |
|
|
try: |
|
|
from scanner.pattern_detector import scan_patterns |
|
|
pattern_results = scan_patterns(tmp_file_path, code) |
|
|
all_vulnerabilities.extend(pattern_results) |
|
|
logger.info(f"Pattern detector found {len(pattern_results)} issues") |
|
|
except ImportError: |
|
|
logger.warning("Pattern detector module not available (not yet implemented)") |
|
|
except Exception as e: |
|
|
logger.error(f"Pattern detector error: {e}") |
|
|
|
|
|
|
|
|
if self.config["scanners"]["sql_injection"]["enabled"]: |
|
|
logger.info("Running SQL injection detector...") |
|
|
try: |
|
|
from scanner.sql_injection import scan_sql_injection |
|
|
sql_results = scan_sql_injection(tmp_file_path, code) |
|
|
all_vulnerabilities.extend(sql_results) |
|
|
logger.info(f"SQL injection detector found {len(sql_results)} issues") |
|
|
except ImportError: |
|
|
logger.warning("SQL injection detector module not available (not yet implemented)") |
|
|
except Exception as e: |
|
|
logger.error(f"SQL injection detector error: {e}") |
|
|
|
|
|
|
|
|
if self.config["scanners"]["bandit"]["enabled"]: |
|
|
logger.info("Running bandit scanner...") |
|
|
try: |
|
|
from scanner.bandit_wrapper import scan_with_bandit |
|
|
bandit_results = scan_with_bandit(tmp_file_path) |
|
|
all_vulnerabilities.extend(bandit_results) |
|
|
logger.info(f"Bandit found {len(bandit_results)} issues") |
|
|
except ImportError: |
|
|
logger.warning("Bandit wrapper module not available (not yet implemented)") |
|
|
except Exception as e: |
|
|
logger.error(f"Bandit scanner error: {e}") |
|
|
|
|
|
|
|
|
if self.config["scanners"]["semgrep"]["enabled"]: |
|
|
logger.info("Running semgrep scanner...") |
|
|
try: |
|
|
from scanner.semgrep_wrapper import scan_with_semgrep |
|
|
semgrep_results = scan_with_semgrep(tmp_file_path) |
|
|
all_vulnerabilities.extend(semgrep_results) |
|
|
logger.info(f"Semgrep found {len(semgrep_results)} issues") |
|
|
except ImportError: |
|
|
logger.warning("Semgrep wrapper module not available (not yet implemented)") |
|
|
except Exception as e: |
|
|
logger.error(f"Semgrep scanner error: {e}") |
|
|
|
|
|
|
|
|
filtered_vulnerabilities = filter_by_severity( |
|
|
all_vulnerabilities, |
|
|
severity_threshold |
|
|
) |
|
|
|
|
|
logger.info( |
|
|
f"Total issues found: {len(all_vulnerabilities)}, " |
|
|
f"after filtering: {len(filtered_vulnerabilities)}" |
|
|
) |
|
|
|
|
|
|
|
|
try: |
|
|
from formatter import format_results |
|
|
formatted_results = format_results( |
|
|
filtered_vulnerabilities, |
|
|
severity_threshold |
|
|
) |
|
|
except ImportError: |
|
|
logger.warning("Formatter module not available, using basic format") |
|
|
formatted_results = self._basic_format_results( |
|
|
filtered_vulnerabilities, |
|
|
severity_threshold |
|
|
) |
|
|
|
|
|
return formatted_results |
|
|
|
|
|
finally: |
|
|
|
|
|
try: |
|
|
Path(tmp_file_path).unlink() |
|
|
except Exception as e: |
|
|
logger.warning(f"Failed to delete temporary file: {e}") |
|
|
|
|
|
def _basic_format_results( |
|
|
self, |
|
|
vulnerabilities: List[Dict[str, Any]], |
|
|
threshold: str |
|
|
) -> Dict[str, Any]: |
|
|
""" |
|
|
Basic result formatting when formatter module is not available. |
|
|
|
|
|
Args: |
|
|
vulnerabilities: List of vulnerabilities |
|
|
threshold: Severity threshold used |
|
|
|
|
|
Returns: |
|
|
Formatted results dictionary |
|
|
""" |
|
|
|
|
|
severity_counts = {"critical": 0, "high": 0, "medium": 0, "low": 0} |
|
|
|
|
|
for vuln in vulnerabilities: |
|
|
severity = vuln.get("severity", "LOW").lower() |
|
|
if severity in severity_counts: |
|
|
severity_counts[severity] += 1 |
|
|
|
|
|
return { |
|
|
"summary": { |
|
|
"total_issues": len(vulnerabilities), |
|
|
"critical": severity_counts["critical"], |
|
|
"high": severity_counts["high"], |
|
|
"medium": severity_counts["medium"], |
|
|
"low": severity_counts["low"], |
|
|
"severity_threshold": threshold, |
|
|
}, |
|
|
"vulnerabilities": vulnerabilities, |
|
|
} |
|
|
|
|
|
async def run(self): |
|
|
"""Run the MCP server.""" |
|
|
logger.info("Starting MCP server...") |
|
|
|
|
|
async with stdio_server() as (read_stream, write_stream): |
|
|
logger.info("Server is ready and listening on stdio") |
|
|
await self.server.run( |
|
|
read_stream, |
|
|
write_stream, |
|
|
self.server.create_initialization_options() |
|
|
) |
|
|
|
|
|
|
|
|
def main(): |
|
|
"""Main entry point for the MCP server.""" |
|
|
parser = argparse.ArgumentParser( |
|
|
description="Security Scanner MCP Server" |
|
|
) |
|
|
parser.add_argument( |
|
|
"--debug", |
|
|
action="store_true", |
|
|
help="Enable debug logging" |
|
|
) |
|
|
|
|
|
args = parser.parse_args() |
|
|
|
|
|
|
|
|
server = SecurityScannerServer(debug=args.debug) |
|
|
|
|
|
try: |
|
|
asyncio.run(server.run()) |
|
|
except KeyboardInterrupt: |
|
|
if logger: |
|
|
logger.info("Server stopped by user") |
|
|
sys.exit(0) |
|
|
except Exception as e: |
|
|
if logger: |
|
|
logger.error(f"Server error: {e}", exc_info=True) |
|
|
sys.exit(1) |
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
main() |