garibong's picture
Add Gradio app with MCP server support
f8e78b2
#!/usr/bin/env python3
"""
Security Scanner MCP Server
An MCP server that scans Python code for security vulnerabilities
and provides beginner-friendly explanations.
"""
import argparse
import asyncio
import json
import sys
import tempfile
from pathlib import Path
from typing import Any, Dict, List
# MCP imports
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
# Local imports
from utils import (
load_config,
setup_logging,
validate_severity_threshold,
filter_by_severity,
)
# Initialize logger (will be configured in main)
logger = None
class SecurityScannerServer:
    """MCP server exposing a single ``scan_security`` tool.

    The tool writes the submitted Python source to a temporary file, runs
    every scanner enabled under ``config["scanners"]``, filters the findings
    with ``filter_by_severity`` and returns the formatted result as JSON text.
    """

    def __init__(self, debug: bool = False):
        """
        Initialize the Security Scanner MCP server.

        Sets the module-level ``logger``, loads the configuration, creates the
        underlying MCP ``Server`` and registers the tool handlers.

        Args:
            debug: Enable debug logging
        """
        # The logger is module-level on purpose: main() reads it as well.
        global logger
        logger = setup_logging(debug=debug)
        self.config = load_config()
        self.server = Server(self.config["server"]["name"])
        self.debug = debug
        logger.info(
            f"Initializing {self.config['server']['name']} "
            f"v{self.config['server']['version']}"
        )
        # Register handlers
        self._register_handlers()

    def _register_handlers(self):
        """Register the MCP ``list_tools`` and ``call_tool`` handlers."""

        @self.server.list_tools()
        async def list_tools() -> List[Tool]:
            """List available tools."""
            return [
                Tool(
                    name="scan_security",
                    description=self.config["tools"]["scan_security"]["description"],
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "code": {
                                "type": "string",
                                # Korean: "Python source code to analyze"
                                "description": "분석할 Python 소스 코드",
                            },
                            "severity_threshold": {
                                "type": "string",
                                "enum": ["CRITICAL", "HIGH", "MEDIUM", "LOW"],
                                # Korean: "Minimum severity level to report"
                                "description": "보고할 최소 심각도 수준",
                                "default": self.config["severity"]["default_threshold"],
                            },
                        },
                        "required": ["code"],
                    },
                )
            ]

        @self.server.call_tool()
        async def call_tool(name: str, arguments: Any) -> List[TextContent]:
            """
            Handle tool calls.

            Any exception raised by the scan is logged and reported back as a
            JSON payload with an ``error`` key and an all-zero summary, rather
            than being propagated.

            Args:
                name: Tool name
                arguments: Tool arguments

            Returns:
                List of text content with results

            Raises:
                ValueError: If ``name`` is not ``scan_security``
            """
            if name != "scan_security":
                raise ValueError(f"Unknown tool: {name}")
            logger.info(f"Tool called: {name}")
            logger.debug(f"Arguments: {arguments}")
            try:
                result = await self._scan_security(arguments)
                return [
                    TextContent(
                        type="text",
                        text=json.dumps(result, indent=2, ensure_ascii=False),
                    )
                ]
            except Exception as e:
                logger.error(f"Error during security scan: {e}", exc_info=True)
                error_result = {
                    "error": str(e),
                    "summary": {
                        "total_issues": 0,
                        "critical": 0,
                        "high": 0,
                        "medium": 0,
                        "low": 0,
                    },
                    "vulnerabilities": [],
                }
                return [
                    TextContent(
                        type="text",
                        text=json.dumps(error_result, indent=2, ensure_ascii=False),
                    )
                ]

    async def _scan_security(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """
        Perform security scan on provided code.

        Args:
            arguments: Dictionary containing 'code' and optional
                'severity_threshold'. ``None`` is treated as an empty dict.

        Returns:
            Dictionary with scan results

        Raises:
            ValueError: If the code is missing, empty, not a string or larger
                than the configured maximum, or if the severity threshold is
                invalid.
        """
        # Tool calls may arrive without an arguments object at all.
        arguments = arguments or {}

        # Extract and validate arguments
        raw_code = arguments.get("code")
        if raw_code is None:
            raw_code = ""
        if not isinstance(raw_code, str):
            raise ValueError("Code parameter must be a string")
        # NOTE(review): stripping leading blank lines may shift any line
        # numbers the scanners report relative to the caller's text — confirm.
        code = raw_code.strip()
        if not code:
            raise ValueError("Code parameter is required and cannot be empty")

        # An explicit null threshold falls back to the configured default.
        raw_threshold = arguments.get("severity_threshold")
        if raw_threshold is None:
            raw_threshold = self.config["severity"]["default_threshold"]
        severity_threshold = (
            raw_threshold.upper() if isinstance(raw_threshold, str) else raw_threshold
        )
        if not isinstance(severity_threshold, str) or not validate_severity_threshold(
            severity_threshold
        ):
            raise ValueError(
                f"Invalid severity threshold: {severity_threshold}. "
                f"Must be one of: CRITICAL, HIGH, MEDIUM, LOW"
            )

        logger.info(f"Starting security scan (threshold: {severity_threshold})")
        logger.debug(f"Code length: {len(code)} characters")

        # Check file size limit
        max_size_mb = self.config["performance"]["max_file_size_mb"]
        code_size_mb = len(code.encode("utf-8")) / (1024 * 1024)
        if code_size_mb > max_size_mb:
            raise ValueError(
                f"Code size ({code_size_mb:.2f}MB) exceeds maximum "
                f"allowed size ({max_size_mb}MB)"
            )

        # The temporary file is created inside the try block so that it is
        # removed even when writing the code to it fails.
        tmp_file_path = None
        try:
            with tempfile.NamedTemporaryFile(
                mode="w",
                suffix=".py",
                delete=False,
                encoding="utf-8",
            ) as tmp_file:
                tmp_file_path = tmp_file.name
                tmp_file.write(code)

            # NOTE(review): the scanners are called synchronously from this
            # coroutine; confirm that is acceptable for the event loop.
            all_vulnerabilities = self._collect_vulnerabilities(tmp_file_path, code)

            # Filter by severity threshold
            filtered_vulnerabilities = filter_by_severity(
                all_vulnerabilities,
                severity_threshold,
            )
            logger.info(
                f"Total issues found: {len(all_vulnerabilities)}, "
                f"after filtering: {len(filtered_vulnerabilities)}"
            )

            # Format results, falling back to the built-in formatter
            try:
                from formatter import format_results

                formatted_results = format_results(
                    filtered_vulnerabilities,
                    severity_threshold,
                )
            except ImportError:
                logger.warning("Formatter module not available, using basic format")
                formatted_results = self._basic_format_results(
                    filtered_vulnerabilities,
                    severity_threshold,
                )
            return formatted_results
        finally:
            # Clean up temporary file (best effort)
            if tmp_file_path is not None:
                try:
                    Path(tmp_file_path).unlink()
                except OSError as e:
                    logger.warning(f"Failed to delete temporary file: {e}")

    def _collect_vulnerabilities(
        self,
        tmp_file_path: str,
        code: str
    ) -> List[Dict[str, Any]]:
        """
        Run every enabled scanner and gather their findings.

        A scanner whose module cannot be imported is skipped with a warning,
        and a scanner that raises is logged and skipped, so one failing
        scanner never aborts the whole scan.

        Args:
            tmp_file_path: Path of the temporary file holding the code
            code: The source code being scanned

        Returns:
            Combined list of findings from all scanners that ran
        """

        # Scanner modules are imported lazily so that a missing module only
        # disables that one scanner.
        def run_pattern_detector():
            from scanner.pattern_detector import scan_patterns

            return scan_patterns(tmp_file_path, code)

        def run_sql_injection():
            from scanner.sql_injection import scan_sql_injection

            return scan_sql_injection(tmp_file_path, code)

        def run_bandit():
            from scanner.bandit_wrapper import scan_with_bandit

            return scan_with_bandit(tmp_file_path)

        def run_semgrep():
            from scanner.semgrep_wrapper import scan_with_semgrep

            return scan_with_semgrep(tmp_file_path)

        # (config key, "Running ..." label, "... found" label,
        #  "... module not available" label, "... error" label, runner)
        scanners = (
            ("pattern_detector", "pattern detector", "Pattern detector",
             "Pattern detector", "Pattern detector", run_pattern_detector),
            ("sql_injection", "SQL injection detector", "SQL injection detector",
             "SQL injection detector", "SQL injection detector", run_sql_injection),
            ("bandit", "bandit scanner", "Bandit",
             "Bandit wrapper", "Bandit scanner", run_bandit),
            ("semgrep", "semgrep scanner", "Semgrep",
             "Semgrep wrapper", "Semgrep scanner", run_semgrep),
        )

        findings = []
        for key, activity, found_label, module_label, error_label, runner in scanners:
            if not self.config["scanners"][key]["enabled"]:
                continue
            logger.info(f"Running {activity}...")
            try:
                results = runner()
                findings.extend(results)
                logger.info(f"{found_label} found {len(results)} issues")
            except ImportError:
                logger.warning(
                    f"{module_label} module not available (not yet implemented)"
                )
            except Exception as e:
                logger.error(f"{error_label} error: {e}", exc_info=True)
        return findings

    def _basic_format_results(
        self,
        vulnerabilities: List[Dict[str, Any]],
        threshold: str
    ) -> Dict[str, Any]:
        """
        Basic result formatting when formatter module is not available.

        Severities are matched case-insensitively. A missing or ``None``
        severity counts as LOW; any other unrecognised value is included in
        ``total_issues`` but in none of the per-severity counters.

        Args:
            vulnerabilities: List of vulnerabilities
            threshold: Severity threshold used

        Returns:
            Formatted results dictionary
        """
        # Count by severity
        severity_counts = {"critical": 0, "high": 0, "medium": 0, "low": 0}
        for vuln in vulnerabilities:
            raw_severity = vuln.get("severity", "LOW")
            if raw_severity is None:
                raw_severity = "LOW"
            # str() keeps a non-string severity from crashing the fallback.
            severity = str(raw_severity).lower()
            if severity in severity_counts:
                severity_counts[severity] += 1
        return {
            "summary": {
                "total_issues": len(vulnerabilities),
                "critical": severity_counts["critical"],
                "high": severity_counts["high"],
                "medium": severity_counts["medium"],
                "low": severity_counts["low"],
                "severity_threshold": threshold,
            },
            "vulnerabilities": vulnerabilities,
        }

    async def run(self):
        """Run the MCP server over stdio until the streams close."""
        logger.info("Starting MCP server...")
        async with stdio_server() as (read_stream, write_stream):
            logger.info("Server is ready and listening on stdio")
            await self.server.run(
                read_stream,
                write_stream,
                self.server.create_initialization_options()
            )
def main():
    """Parse the command line, start the MCP server and map failures to exit codes.

    Exits with status 0 when interrupted from the keyboard and status 1 on
    any other exception; returns normally when the server finishes cleanly.
    """
    cli = argparse.ArgumentParser(description="Security Scanner MCP Server")
    cli.add_argument("--debug", action="store_true", help="Enable debug logging")
    options = cli.parse_args()

    # Build the server first; constructing it also configures the logger.
    scanner_server = SecurityScannerServer(debug=options.debug)

    exit_code = None
    try:
        asyncio.run(scanner_server.run())
    except KeyboardInterrupt:
        if logger:
            logger.info("Server stopped by user")
        exit_code = 0
    except Exception as exc:
        if logger:
            logger.error(f"Server error: {exc}", exc_info=True)
        exit_code = 1

    # Only terminate explicitly when the run ended abnormally.
    if exit_code is not None:
        sys.exit(exit_code)
# Start the server only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()