garibong committed on
Commit
f8e78b2
·
1 Parent(s): 12de350

Add Gradio app with MCP server support

Browse files
.gitignore ADDED
@@ -0,0 +1,4 @@
 
 
 
 
 
1
+ .idea/
2
+ __pycache__/
3
+ *.pyc
4
+ .DS_Store
README.md CHANGED
@@ -1,14 +1,62 @@
1
  ---
2
  title: Simple Security Scanner
3
- emoji: 🌖
4
- colorFrom: purple
5
- colorTo: blue
6
  sdk: gradio
7
- sdk_version: 6.0.1
8
  app_file: app.py
 
 
 
 
9
  pinned: false
10
- license: mit
11
- short_description: MCP server that scans Python code for security vulnerabiliti
12
  ---
13
 
14
- Check out the configuration reference at https://huggingface.co/docs/hub/spaces-config-reference
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  ---
2
  title: Simple Security Scanner
3
+ emoji: 🔒
4
+ colorFrom: red
5
+ colorTo: yellow
6
  sdk: gradio
7
+ sdk_version: "5.6.0"
8
  app_file: app.py
9
+ tags:
10
+ - mcp
11
+ - security
12
+ - building-mcp-track-developer-tools
13
  pinned: false
 
 
14
  ---
15
 
16
+ # 🔒 Simple Security Scanner MCP
17
+
18
+ An MCP server that scans Python code for security vulnerabilities and provides **beginner-friendly explanations**.
19
+
20
+ ## Features
21
+
22
+ - **Pattern-based Detection**: Hardcoded secrets, path traversal, insecure deserialization, and more
23
+ - **SQL Injection Detection**: Precise detection using AST analysis
24
+ - **Beginner-friendly Explanations**: Easy-to-understand descriptions of what's wrong, why it's dangerous, and how to fix it
25
+
26
+ ## MCP Tool
27
+
28
+ ### `scan_security`
29
+
30
+ Analyzes Python code for security vulnerabilities.
31
+
32
+ **Inputs:**
33
+ - `code` (string): Python source code to analyze
34
+ - `severity_threshold` (string): Minimum severity level to report (CRITICAL, HIGH, MEDIUM, LOW)
35
+
36
+ **Output:**
37
+ - Beginner-friendly explanation of found vulnerabilities
38
+
39
+ ## Usage
40
+
41
+ ### Web UI
42
+ Visit this Space and enter your code to scan.
43
+
44
+ ### MCP Client
45
+ Connect from MCP clients like Claude Desktop with the following configuration:
46
+ ```json
47
+ {
48
+ "mcpServers": {
49
+ "security-scanner": {
50
+ "url": "https://mcp-1st-birthday-simple-security-scanner.hf.space/gradio_api/mcp/sse"
51
+ }
52
+ }
53
+ }
54
+ ```
55
+
56
+ ## Track
57
+
58
+ `building-mcp-track-developer-tools`
59
+
60
+ ## License
61
+
62
+ MIT
app.py ADDED
@@ -0,0 +1,183 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Simple Security Scanner - Gradio App with MCP Server
3
+ A security vulnerability scanner that provides beginner-friendly explanations.
4
+ """
5
+
6
+ import gradio as gr
7
+ from src.scanner.pattern_detector import PatternDetector
8
+ from src.scanner.sql_injection import SQLInjectionDetector
9
+ from src.formatter import format_results_for_beginners
10
+
11
+ # 샘플 코드들
12
+ SAMPLE_CODES = {
13
+ "SQL Injection": '''import sqlite3
14
+
15
+ def get_user(username):
16
+ conn = sqlite3.connect('users.db')
17
+ cursor = conn.cursor()
18
+ query = f"SELECT * FROM users WHERE username = '{username}'"
19
+ cursor.execute(query)
20
+ return cursor.fetchone()
21
+ ''',
22
+ "Hardcoded Secret": '''import requests
23
+
24
+ API_KEY = "sk-1234567890abcdef"
25
+ DATABASE_PASSWORD = "admin123"
26
+
27
+ def connect():
28
+ return requests.get(f"https://api.example.com?key={API_KEY}")
29
+ ''',
30
+ "Path Traversal": '''import os
31
+
32
+ def read_file(filename):
33
+ base_path = "/var/www/uploads/"
34
+ file_path = base_path + filename
35
+ with open(file_path, 'r') as f:
36
+ return f.read()
37
+ ''',
38
+ "Insecure Deserialization": '''import pickle
39
+ import base64
40
+
41
+ def load_user_data(data):
42
+ decoded = base64.b64decode(data)
43
+ return pickle.loads(decoded)
44
+ '''
45
+ }
46
+
47
+
48
def scan_code(code: str, severity_threshold: str = "MEDIUM") -> str:
    """
    Scan Python code for security vulnerabilities.

    Runs the pattern-based detector and the SQL injection detector, keeps the
    findings at or above the requested severity, removes duplicates and
    formats the remainder. A detector that raises is logged and reported in
    the returned text instead of being silently ignored.

    Args:
        code: Python source code to analyze
        severity_threshold: Minimum severity level (CRITICAL, HIGH, MEDIUM, LOW), case-insensitive; unknown values mean MEDIUM

    Returns:
        Beginner-friendly explanation of found vulnerabilities
    """
    import logging

    if not code or not code.strip():
        return "⚠️ 코드를 입력해주세요."

    logger = logging.getLogger(__name__)
    all_findings = []
    failed_scanners = []

    # The detector names are looked up inside the lambdas, i.e. inside the
    # try below, so a detector that cannot even be constructed is recorded
    # as failed rather than aborting the whole scan.
    scanners = (
        ("PatternDetector", lambda: PatternDetector().scan(code)),
        ("SQLInjectionDetector", lambda: SQLInjectionDetector().scan(code)),
    )
    for scanner_name, run_scan in scanners:
        try:
            all_findings.extend(run_scan())
        except Exception:
            # Best-effort: keep the other scanner's results, but never
            # pretend the failed scanner found nothing.
            logger.exception("%s failed while scanning", scanner_name)
            failed_scanners.append(scanner_name)

    # Filter by severity
    severity_order = {"CRITICAL": 4, "HIGH": 3, "MEDIUM": 2, "LOW": 1}
    threshold_key = str(severity_threshold or "").strip().upper()
    threshold_value = severity_order.get(threshold_key, 2)

    filtered_findings = [
        finding for finding in all_findings
        if severity_order.get(finding.get("severity", "LOW"), 1) >= threshold_value
    ]

    # Remove duplicates (same line, rule and message), keeping first occurrence
    seen = set()
    unique_findings = []
    for finding in filtered_findings:
        key = (
            finding.get("line", 0),
            finding.get("rule_id", ""),
            finding.get("message", ""),
        )
        if key not in seen:
            seen.add(key)
            unique_findings.append(finding)

    warning = ""
    if failed_scanners:
        warning = (
            "⚠️ 일부 스캐너(" + ", ".join(failed_scanners)
            + ") 실행에 실패하여 결과가 불완전할 수 있습니다."
        )

    if not unique_findings:
        # An "all clear" message would be misleading if a scanner crashed.
        if warning:
            return warning
        return "✅ 선택한 심각도 수준에서 발견된 보안 취약점이 없습니다!"

    # Format results
    report = format_results_for_beginners(unique_findings)
    if warning:
        return f"{warning}\n\n{report}"
    return report
103
+
104
+
105
def load_sample(sample_name: str) -> str:
    """Return the sample snippet registered under ``sample_name``.

    Unknown names yield an empty string.
    """
    snippet = SAMPLE_CODES.get(sample_name, "")
    return snippet
108
+
109
+
110
+ # Gradio UI
111
+ with gr.Blocks(
112
+ title="🔒 Simple Security Scanner",
113
+ theme=gr.themes.Soft()
114
+ ) as demo:
115
+ gr.Markdown("""
116
+ # 🔒 Simple Security Scanner
117
+
118
+ **Python 코드의 보안 취약점을 분석하고 초보자도 이해할 수 있는 설명을 제공합니다.**
119
+
120
+ MCP (Model Context Protocol) 서버로도 사용 가능합니다.
121
+ """)
122
+
123
+ with gr.Row():
124
+ with gr.Column(scale=2):
125
+ code_input = gr.Code(
126
+ label="Python 코드 입력",
127
+ language="python",
128
+ lines=15,
129
+ placeholder="분석할 Python 코드를 입력하세요..."
130
+ )
131
+
132
+ with gr.Row():
133
+ severity_dropdown = gr.Dropdown(
134
+ choices=["LOW", "MEDIUM", "HIGH", "CRITICAL"],
135
+ value="MEDIUM",
136
+ label="최소 심각도",
137
+ scale=1
138
+ )
139
+ scan_btn = gr.Button("🔍 스캔 시작", variant="primary", scale=2)
140
+
141
+ with gr.Column(scale=1):
142
+ gr.Markdown("### 📝 샘플 코드")
143
+ for name in SAMPLE_CODES.keys():
144
+ sample_btn = gr.Button(name, size="sm")
145
+ sample_btn.click(
146
+ fn=lambda n=name: load_sample(n),
147
+ outputs=code_input
148
+ )
149
+
150
+ output = gr.Markdown(label="분석 결과")
151
+
152
+ scan_btn.click(
153
+ fn=scan_code,
154
+ inputs=[code_input, severity_dropdown],
155
+ outputs=output
156
+ )
157
+
158
+ gr.Markdown("""
159
+ ---
160
+ ### 🛠️ MCP 서버로 사용하기
161
+
162
+ 이 앱은 MCP 클라이언트(Claude Desktop 등)에서 도구로 사용할 수 있습니다.
163
+ ```json
164
+ {
165
+ "mcpServers": {
166
+ "security-scanner": {
167
+ "url": "https://huggingface.co/spaces/MCP-1st-Birthday/simple-security-scanner/gradio_api/mcp/sse"
168
+ }
169
+ }
170
+ }
171
+ ```
172
+ """)
173
+
174
if __name__ == "__main__":
    # Start the Gradio app with its MCP server enabled alongside the web UI.
    demo.launch(mcp_server=True)
demo/vulnerable_samples/.gitkeep ADDED
File without changes
demo/vulnerable_samples/hardcoded_secrets.py ADDED
@@ -0,0 +1,74 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Vulnerable Code Sample: Hardcoded Secrets
3
+
4
+ ⚠️ WARNING: This file contains intentionally insecure code for demonstration purposes.
5
+ NEVER use these patterns in production code!
6
+
7
+ This sample demonstrates various types of hardcoded credentials that should
8
+ never be stored directly in source code.
9
+ """
10
+
11
+
12
+ # 1. AWS Credentials - VULNERABLE
13
+ AWS_ACCESS_KEY_ID = "AKIAIOSFODNN7EXAMPLE"
14
+ AWS_SECRET_ACCESS_KEY = "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
15
+
16
+
17
+ # 2. GitHub Personal Access Token - VULNERABLE
18
+ GITHUB_TOKEN = "ghp_1234567890abcdefghijklmnopqrstuvwxyz"
19
+
20
+
21
+ # 3. Stripe API Key - VULNERABLE
22
+ def process_payment(amount):
23
+ """Process payment using hardcoded Stripe key."""
24
+ stripe_api_key = "sk_live_1234567890abcdefghijklmnop" # VULNERABLE!
25
+ # Payment processing logic...
26
+ return f"Processing ${amount} with key: {stripe_api_key[:10]}..."
27
+
28
+
29
+ # 4. OpenAI API Key - VULNERABLE
30
+ class AIService:
31
+ def __init__(self):
32
+ self.api_key = "sk-proj-abcdefghijklmnopqrstuvwxyz1234567890" # VULNERABLE!
33
+
34
+ def generate_text(self, prompt):
35
+ """Generate text using hardcoded API key."""
36
+ return f"Calling API with key: {self.api_key[:10]}..."
37
+
38
+
39
+ # 5. Database Connection String with Password - VULNERABLE
40
+ DATABASE_URL = "postgresql://admin:SuperSecret123@localhost:5432/mydb"
41
+
42
+
43
+ # 6. JWT Secret Key - VULNERABLE
44
+ JWT_SECRET = "my-super-secret-jwt-key-that-should-be-in-env"
45
+
46
+
47
+ # 7. Hardcoded Password - VULNERABLE
48
+ def authenticate_user(username):
49
+ """Check user credentials with hardcoded password."""
50
+ admin_password = "Admin123!@#" # VULNERABLE!
51
+ if username == "admin":
52
+ return admin_password
53
+ return None
54
+
55
+
56
+ # 8. Private Key - VULNERABLE
57
+ PRIVATE_KEY = """-----BEGIN PRIVATE KEY-----
58
+ MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQC7VJTUt9Us8cKj
59
+ MzEfYyjiWA4R4/M2bS1+fWIcPm15A4LH5V8NGlIRIDlT2H5M3V2dZdG4vZbJTvP2
60
+ -----END PRIVATE KEY-----"""
61
+
62
+
63
+ # 9. Korean Bank Account Info - VULNERABLE (한국 특화)
64
+ BANK_ACCOUNT = {
65
+ "bank": "국민은행",
66
+ "account_number": "123-456-789012",
67
+ "account_holder": "홍길동"
68
+ }
69
+
70
+
71
+ # Safe alternative (commented for comparison):
72
+ # import os
73
+ # AWS_ACCESS_KEY_ID = os.getenv('AWS_ACCESS_KEY_ID')
74
+ # AWS_SECRET_ACCESS_KEY = os.getenv('AWS_SECRET_ACCESS_KEY')
demo/vulnerable_samples/insecure_deserialization.py ADDED
@@ -0,0 +1,142 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Vulnerable Code Sample: Insecure Deserialization
3
+
4
+ ⚠️ WARNING: This file contains intentionally insecure code for demonstration purposes.
5
+ NEVER use these patterns in production code!
6
+
7
+ This sample demonstrates insecure deserialization vulnerabilities that allow
8
+ attackers to execute arbitrary code through crafted input data.
9
+ """
10
+
11
+ import pickle
12
+ import yaml
13
+
14
+
15
+ # 1. Unsafe pickle.loads() - VULNERABLE
16
+ def load_user_session(session_data):
17
+ """Vulnerable to code execution via pickle deserialization."""
18
+ # VULNERABLE: pickle can execute arbitrary code during deserialization
19
+ # An attacker can craft a pickle that runs os.system("rm -rf /")
20
+ try:
21
+ user_data = pickle.loads(session_data)
22
+ return user_data
23
+ except Exception as e:
24
+ return f"Error: {e}"
25
+
26
+ # Safe alternative: Use JSON for untrusted data
27
+ # import json
28
+ # user_data = json.loads(session_data.decode('utf-8'))
29
+
30
+
31
+ # 2. Unsafe eval() for JSON parsing - VULNERABLE
32
+ def parse_config(config_string):
33
+ """Vulnerable to code execution via eval()."""
34
+ # VULNERABLE: eval() executes arbitrary Python code
35
+ # Input like "__import__('os').system('rm -rf /')" will execute
36
+ try:
37
+ config = eval(config_string)
38
+ return config
39
+ except Exception as e:
40
+ return f"Error: {e}"
41
+
42
+ # Safe alternative:
43
+ # import json
44
+ # config = json.loads(config_string)
45
+
46
+
47
+ # 3. Unsafe exec() for dynamic code - VULNERABLE
48
+ def run_user_script(script_code):
49
+ """Vulnerable to code execution via exec()."""
50
+ # VULNERABLE: exec() runs arbitrary Python code
51
+ # User can run anything: "import os; os.system('cat /etc/passwd')"
52
+ result = {}
53
+ try:
54
+ exec(script_code, {}, result)
55
+ return result
56
+ except Exception as e:
57
+ return f"Error: {e}"
58
+
59
+ # Safe alternative: Use ast.literal_eval() for data only
60
+ # import ast
61
+ # data = ast.literal_eval(user_input) # Only evaluates literals
62
+
63
+
64
+ # 4. Unsafe YAML loading - VULNERABLE
65
+ def load_config_file(yaml_content):
66
+ """Vulnerable to code execution via YAML deserialization."""
67
+ # VULNERABLE: yaml.load() can execute Python code
68
+ # YAML can contain !!python/object/apply tags to execute code
69
+ try:
70
+ config = yaml.load(yaml_content, Loader=yaml.Loader)
71
+ return config
72
+ except Exception as e:
73
+ return f"Error: {e}"
74
+
75
+ # Safe alternative:
76
+ # config = yaml.safe_load(yaml_content) # Only parses basic YAML
77
+
78
+
79
+ # 5. Unsafe pickle in file operations - VULNERABLE
80
+ def save_and_load_data(data, filename="/tmp/data.pkl"):
81
+ """Vulnerable pickle usage in file operations."""
82
+ # VULNERABLE: Loading pickles from untrusted sources
83
+ # Save
84
+ with open(filename, 'wb') as f:
85
+ pickle.dump(data, f)
86
+
87
+ # Load - VULNERABLE if file is tampered with
88
+ with open(filename, 'rb') as f:
89
+ loaded_data = pickle.load(f)
90
+
91
+ return loaded_data
92
+
93
+
94
+ # 6. Dynamic code compilation - VULNERABLE
95
+ def compile_and_run(code_string):
96
+ """Vulnerable to code execution via compile()."""
97
+ # VULNERABLE: compile() + exec() allows arbitrary code execution
98
+ try:
99
+ compiled_code = compile(code_string, '<string>', 'exec')
100
+ exec(compiled_code)
101
+ return "Code executed"
102
+ except Exception as e:
103
+ return f"Error: {e}"
104
+
105
+
106
+ # Example malicious payloads:
107
+ """
108
+ # Malicious pickle payload (simplified concept):
109
+ malicious_pickle = b"cos\nsystem\n(S'cat /etc/passwd'\ntR."
110
+
111
+ # Malicious YAML payload:
112
+ malicious_yaml = '''
113
+ !!python/object/apply:os.system
114
+ args: ['cat /etc/passwd']
115
+ '''
116
+
117
+ # Malicious eval payload:
118
+ malicious_eval = "__import__('os').system('whoami')"
119
+ """
120
+
121
+
122
+ # Safe deserialization example:
123
def safe_deserialize(json_string):
    """Safe deserialization using JSON.

    Parses ``json_string`` and accepts the result only if it is a dictionary
    whose keys are all in the allowed set.

    Args:
        json_string: JSON text to parse.

    Returns:
        The parsed dictionary.

    Raises:
        ValueError: If the text is not valid JSON (chained to the original
            ``json.JSONDecodeError``), the top-level value is not a
            dictionary, or it contains keys outside the whitelist.
    """
    import json

    # Keep the try body to the one call that can raise JSONDecodeError.
    try:
        # JSON is safe - it only deserializes data, not code
        data = json.loads(json_string)
    except json.JSONDecodeError as e:
        raise ValueError(f"Invalid JSON: {e}") from e

    # Validate the structure
    if not isinstance(data, dict):
        raise ValueError("Expected dictionary")

    # Whitelist expected keys
    allowed_keys = {'username', 'email', 'age', 'preferences'}
    if set(data) - allowed_keys:
        raise ValueError("Unexpected keys in data")

    return data
demo/vulnerable_samples/path_traversal.py ADDED
@@ -0,0 +1,114 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Vulnerable Code Sample: Path Traversal
3
+
4
+ ⚠️ WARNING: This file contains intentionally insecure code for demonstration purposes.
5
+ NEVER use these patterns in production code!
6
+
7
+ This sample demonstrates path traversal vulnerabilities that allow attackers
8
+ to access files outside of the intended directory.
9
+ """
10
+
11
+ import os
12
+
13
+
14
+ # 1. Direct user input in file path - VULNERABLE
15
+ def read_user_file(filename):
16
+ """Vulnerable to path traversal - direct file path usage."""
17
+ # VULNERABLE: User can use ../../../etc/passwd
18
+ file_path = f"/var/www/uploads/{filename}"
19
+ try:
20
+ with open(file_path, 'r') as f:
21
+ return f.read()
22
+ except Exception as e:
23
+ return f"Error: {e}"
24
+
25
+ # Safe alternative:
26
+ # import os
27
+ # base_dir = "/var/www/uploads/"
28
+ # file_path = os.path.join(base_dir, filename)
29
+ # real_path = os.path.realpath(file_path)
30
+ # if not real_path.startswith(os.path.realpath(base_dir)):
31
+ # raise ValueError("Invalid file path")
32
+
33
+
34
+ # 2. Unsafe os.path.join usage - VULNERABLE
35
+ def download_file(user_dir, filename):
36
+ """Vulnerable to path traversal via os.path.join."""
37
+ # VULNERABLE: Absolute paths in filename can bypass base directory
38
+ base_path = "/home/users/"
39
+ file_path = os.path.join(base_path, user_dir, filename)
40
+ # If filename = "/etc/passwd", it returns "/etc/passwd"
41
+ try:
42
+ with open(file_path, 'r') as f:
43
+ return f.read()
44
+ except Exception as e:
45
+ return f"Error: {e}"
46
+
47
+
48
+ # 3. Directory listing vulnerability - VULNERABLE
49
+ def list_directory(subdir):
50
+ """Vulnerable to directory traversal in listing."""
51
+ # VULNERABLE: User can list any directory with ../
52
+ base_dir = "/var/www/public/"
53
+ target_dir = base_dir + subdir
54
+ try:
55
+ return os.listdir(target_dir)
56
+ except Exception as e:
57
+ return f"Error: {e}"
58
+
59
+ # Example exploit: list_directory("../../../etc/")
60
+
61
+
62
+ # 4. File write vulnerability - VULNERABLE
63
+ def save_uploaded_file(username, filename, content):
64
+ """Vulnerable to path traversal in file upload."""
65
+ # VULNERABLE: Can overwrite system files
66
+ upload_dir = f"/uploads/{username}/"
67
+ file_path = upload_dir + filename # User controls filename
68
+ try:
69
+ os.makedirs(os.path.dirname(file_path), exist_ok=True)
70
+ with open(file_path, 'w') as f:
71
+ f.write(content)
72
+ return f"Saved to {file_path}"
73
+ except Exception as e:
74
+ return f"Error: {e}"
75
+
76
+ # Example exploit: save_uploaded_file("user", "../../../etc/cron.d/backdoor", "* * * * * root /tmp/malware")
77
+
78
+
79
+ # 5. Template file inclusion - VULNERABLE
80
+ def render_template(template_name):
81
+ """Vulnerable to path traversal in template rendering."""
82
+ # VULNERABLE: Can read arbitrary files
83
+ templates_dir = "./templates/"
84
+ template_path = templates_dir + template_name + ".html"
85
+ try:
86
+ with open(template_path, 'r') as f:
87
+ return f.read()
88
+ except Exception as e:
89
+ return f"Error: {e}"
90
+
91
+ # Example exploit: render_template("../../etc/passwd")
92
+
93
+
94
+ # Safe implementation example:
95
def safe_read_file(filename):
    """Safe file reading with path validation.

    Resolves ``filename`` against the fixed upload directory and reads it
    only if the resolved path stays inside that directory.

    Args:
        filename: Path of the file, relative to the upload directory.

    Returns:
        The file's text content.

    Raises:
        ValueError: If the resolved path escapes the upload directory, or
            the name is absolute or contains a ``..`` path component.
        OSError: If the file cannot be opened.
    """
    from pathlib import Path

    # Define allowed base directory
    base_dir = Path("/var/www/uploads/").resolve()

    # Build the full path
    requested_path = (base_dir / filename).resolve()

    # Ensure the resolved path is within base_dir. relative_to() compares
    # whole path components, so a sibling such as "/var/www/uploads_evil"
    # is not mistaken for a child the way a string prefix test would be.
    try:
        requested_path.relative_to(base_dir)
    except ValueError:
        raise ValueError("Access denied: Path traversal detected") from None

    # Also reject absolute names and ".." components. Checking components
    # (not substrings) keeps legitimate names like "notes..txt" usable.
    candidate = Path(filename)
    if (
        candidate.is_absolute()
        or filename.startswith("/")
        or ".." in candidate.parts
    ):
        raise ValueError("Invalid filename")

    with open(requested_path, 'r') as f:
        return f.read()
demo/vulnerable_samples/sql_injection.py ADDED
@@ -0,0 +1,96 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Vulnerable Code Sample: SQL Injection
3
+
4
+ ⚠️ WARNING: This file contains intentionally insecure code for demonstration purposes.
5
+ NEVER use these patterns in production code!
6
+
7
+ This sample demonstrates various SQL injection vulnerabilities that can occur
8
+ when user input is directly incorporated into SQL queries.
9
+ """
10
+
11
+
12
+ # 1. SQL Injection via f-string - VULNERABLE
13
+ def get_user_by_id_fstring(user_id):
14
+ """Vulnerable to SQL injection via f-string."""
15
+ # VULNERABLE: User input directly in f-string
16
+ query = f"SELECT * FROM users WHERE id = {user_id}"
17
+ # execute_query(query)
18
+ return query
19
+
20
+ # Safe alternative:
21
+ # query = "SELECT * FROM users WHERE id = %s"
22
+ # execute_query(query, (user_id,))
23
+
24
+
25
+ # 2. SQL Injection via string concatenation - VULNERABLE
26
+ def search_products(keyword):
27
+ """Vulnerable to SQL injection via + operator."""
28
+ # VULNERABLE: String concatenation with user input
29
+ query = "SELECT * FROM products WHERE name LIKE '%" + keyword + "%'"
30
+ # execute_query(query)
31
+ return query
32
+
33
+ # Safe alternative:
34
+ # query = "SELECT * FROM products WHERE name LIKE %s"
35
+ # execute_query(query, (f"%{keyword}%",))
36
+
37
+
38
+ # 3. SQL Injection via % formatting - VULNERABLE
39
+ def delete_user(username):
40
+ """Vulnerable to SQL injection via % formatting."""
41
+ # VULNERABLE: % formatting with quotes
42
+ query = "DELETE FROM users WHERE username='%s'" % username
43
+ # execute_query(query)
44
+ return query
45
+
46
+ # Safe alternative:
47
+ # query = "DELETE FROM users WHERE username=%s"
48
+ # execute_query(query, (username,))
49
+
50
+
51
+ # 4. SQL Injection via .format() - VULNERABLE
52
+ def update_email(user_id, new_email):
53
+ """Vulnerable to SQL injection via .format() method."""
54
+ # VULNERABLE: .format() with user input
55
+ query = "UPDATE users SET email='{}' WHERE id={}".format(new_email, user_id)
56
+ # execute_query(query)
57
+ return query
58
+
59
+ # Safe alternative:
60
+ # query = "UPDATE users SET email=%s WHERE id=%s"
61
+ # execute_query(query, (new_email, user_id))
62
+
63
+
64
+ # 5. SQL Injection via ORM raw query - VULNERABLE
65
+ def get_orders_by_status(status):
66
+ """Vulnerable to SQL injection in ORM raw query."""
67
+ # VULNERABLE: Django ORM raw() with string formatting
68
+ query = f"SELECT * FROM orders WHERE status = '{status}' ORDER BY created_at"
69
+ # Order.objects.raw(query)
70
+ return query
71
+
72
+ # Safe alternative:
73
+ # query = "SELECT * FROM orders WHERE status = %s ORDER BY created_at"
74
+ # Order.objects.raw(query, [status])
75
+
76
+
77
+ # 6. Complex SQL injection - VULNERABLE
78
+ def advanced_search(table_name, column, value):
79
+ """Vulnerable to SQL injection with dynamic table/column names."""
80
+ # VULNERABLE: Dynamic table and column names
81
+ query = f"SELECT * FROM {table_name} WHERE {column} = '{value}'"
82
+ # execute_query(query)
83
+ return query
84
+
85
+ # Safe alternative: Use whitelist for table/column names
86
+ # ALLOWED_TABLES = {'users', 'products', 'orders'}
87
+ # ALLOWED_COLUMNS = {'id', 'name', 'email', 'status'}
88
+ # if table_name in ALLOWED_TABLES and column in ALLOWED_COLUMNS:
89
+ # query = f"SELECT * FROM {table_name} WHERE {column} = %s"
90
+ # execute_query(query, (value,))
91
+
92
+
93
+ # Example of exploitation:
94
+ # get_user_by_id_fstring("1 OR 1=1") # Returns all users
95
+ # delete_user("admin'; DROP TABLE users--") # Deletes the users table!
96
+ # search_products("'; DELETE FROM products--") # Deletes all products!
mcp_config.json ADDED
@@ -0,0 +1,99 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ {
2
+ "server": {
3
+ "name": "security-scanner-mcp",
4
+ "version": "0.1.0",
5
+ "description": "Python 코드 보안 취약점을 스캔하고 초보자 친화적인 설명을 제공하는 MCP 서버"
6
+ },
7
+ "tools": {
8
+ "scan_security": {
9
+ "enabled": true,
10
+ "description": "코드의 보안 취약점을 스캔하고 초보자 친화적인 설명 제공",
11
+ "timeout_seconds": 30
12
+ }
13
+ },
14
+ "scanners": {
15
+ "pattern_detector": {
16
+ "enabled": true,
17
+ "patterns": {
18
+ "aws_keys": true,
19
+ "api_keys": true,
20
+ "github_tokens": true,
21
+ "jwt_tokens": true,
22
+ "passwords": true,
23
+ "korean_pii": true,
24
+ "database_credentials": true,
25
+ "private_keys": true
26
+ }
27
+ },
28
+ "bandit": {
29
+ "enabled": true,
30
+ "confidence_level": "MEDIUM",
31
+ "skip_tests": []
32
+ },
33
+ "semgrep": {
34
+ "enabled": true,
35
+ "config": "auto",
36
+ "timeout": 30,
37
+ "max_memory_mb": 2000
38
+ },
39
+ "sql_injection": {
40
+ "enabled": true,
41
+ "check_fstring": true,
42
+ "check_concat": true,
43
+ "check_format": true,
44
+ "check_percent": true
45
+ }
46
+ },
47
+ "custom_rules": {
48
+ "enabled": true,
49
+ "directories": [
50
+ "rules"
51
+ ],
52
+ "files": [
53
+ "rules/skt_guidelines.yaml"
54
+ ]
55
+ },
56
+ "severity": {
57
+ "thresholds": {
58
+ "CRITICAL": 0,
59
+ "HIGH": 1,
60
+ "MEDIUM": 2,
61
+ "LOW": 3
62
+ },
63
+ "default_threshold": "MEDIUM"
64
+ },
65
+ "formatter": {
66
+ "explanation_templates": {
67
+ "hardcoded_api_key": {
68
+ "what": "API 키가 소스 코드에 직접 하드코딩되어 있습니다",
69
+ "why": "코드에 접근할 수 있는 누구나 이 자격증명을 훔쳐서 악용할 수 있습니다. Git 히스토리에 영구히 남아 삭제하기 어렵습니다.",
70
+ "how_to_fix": "환경 변수나 별도의 설정 파일을 사용하세요",
71
+ "references": [
72
+ "https://owasp.org/www-community/vulnerabilities/Use_of_hard-coded_password"
73
+ ]
74
+ },
75
+ "sql_injection": {
76
+ "what": "SQL 쿼리가 사용자 입력과 문자열 연결로 만들어지고 있습니다",
77
+ "why": "공격자가 악의적인 SQL 코드를 삽입하여 데이터베이스의 모든 데이터를 탈취하거나 삭제할 수 있습니다.",
78
+ "how_to_fix": "파라미터화된 쿼리 또는 ORM을 사용하세요",
79
+ "references": [
80
+ "https://owasp.org/www-community/attacks/SQL_Injection"
81
+ ]
82
+ }
83
+ },
84
+ "include_code_examples": true,
85
+ "include_references": true,
86
+ "json_indent": 2
87
+ },
88
+ "performance": {
89
+ "max_file_size_mb": 10,
90
+ "enable_caching": true,
91
+ "cache_ttl_seconds": 3600
92
+ },
93
+ "logging": {
94
+ "level": "INFO",
95
+ "file": "logs/mcp_server.log",
96
+ "console": true,
97
+ "json_format": false
98
+ }
99
+ }
requirements.txt ADDED
@@ -0,0 +1 @@
 
 
1
+ gradio[mcp]>=5.6.0
src/.gitkeep ADDED
File without changes
src/formatter.py ADDED
@@ -0,0 +1,322 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Result formatter for security scan results.
3
+
4
+ Transforms raw vulnerability data into beginner-friendly explanations.
5
+ """
6
+
7
+ from datetime import datetime
8
+ from typing import Dict, List, Any
9
+ import sys
10
+ from pathlib import Path
11
+
12
+ # Import utilities
13
+ sys.path.insert(0, str(Path(__file__).parent))
14
+ from utils import get_severity_order
15
+
16
+ # Explanation templates for common vulnerabilities
17
+ EXPLANATION_TEMPLATES = {
18
+ "hardcoded_api_key": {
19
+ "what": "API 키가 소스 코드에 직접 하드코딩되어 있습니다",
20
+ "why": "소스 코드는 버전 관리 시스템(Git)에 저장되고, 여러 개발자가 접근할 수 있습니다. "
21
+ "코드에 접근할 수 있는 누구나 이 API 키를 복사하여 악용할 수 있으며, "
22
+ "심지어 공개 저장소에 실수로 올릴 경우 전 세계에 노출됩니다. "
23
+ "공격자는 이 키로 무단 API 호출을 하여 비용을 발생시키거나 데이터를 탈취할 수 있습니다.",
24
+ "how_to_fix": "API 키는 환경 변수나 별도의 설정 파일(예: .env)에 저장하고, "
25
+ "이 파일은 .gitignore에 추가하여 버전 관리에서 제외하세요.",
26
+ "example": "# 나쁜 예\napi_key = 'sk-1234567890abcdef'\n\n"
27
+ "# 좋은 예\nimport os\napi_key = os.getenv('API_KEY')\n\n"
28
+ "# .env 파일에\n# API_KEY=sk-1234567890abcdef",
29
+ "references": [
30
+ "https://owasp.org/www-community/vulnerabilities/Use_of_hard-coded_password",
31
+ "https://12factor.net/config"
32
+ ]
33
+ },
34
+ "sql_injection": {
35
+ "what": "사용자 입력을 SQL 쿼리에 직접 삽입하여 SQL 인젝션 취약점이 발생합니다",
36
+ "why": "공격자가 악의적인 SQL 코드를 입력하면, 데이터베이스의 모든 데이터를 조회하거나 삭제할 수 있습니다. "
37
+ "예를 들어 'admin' OR '1'='1' 같은 입력으로 인증을 우회하거나, "
38
+ "; DROP TABLE users-- 같은 입력으로 전체 테이블을 삭제할 수 있습니다.",
39
+ "how_to_fix": "파라미터화된 쿼리(Prepared Statement)를 사용하여 사용자 입력을 SQL 코드와 분리하세요. "
40
+ "ORM(SQLAlchemy, Django ORM 등)을 사용하면 자동으로 안전하게 처리됩니다.",
41
+ "example": "# 나쁜 예\nquery = f\"SELECT * FROM users WHERE id={user_id}\"\n\n"
42
+ "# 좋은 예\nquery = \"SELECT * FROM users WHERE id=%s\"\ncursor.execute(query, (user_id,))\n\n"
43
+ "# ORM 사용\nuser = User.objects.filter(id=user_id).first()",
44
+ "references": [
45
+ "https://owasp.org/www-community/attacks/SQL_Injection",
46
+ "https://cheatsheetseries.owasp.org/cheatsheets/SQL_Injection_Prevention_Cheat_Sheet.html"
47
+ ]
48
+ },
49
+ "password": {
50
+ "what": "비밀번호가 소스 코드에 평문으로 저장되어 있습니다",
51
+ "why": "코드에 접근할 수 있는 누구나 이 비밀번호를 볼 수 있으며, "
52
+ "Git 히스토리에 영구히 남아 나중에 삭제해도 복구할 수 있습니다. "
53
+ "같은 비밀번호를 다른 서비스에서도 재사용했다면 피해가 더 커질 수 있습니다.",
54
+ "how_to_fix": "비밀번호는 환경 변수에 저장하고, 가능하면 비밀번호 관리 서비스(AWS Secrets Manager, HashiCorp Vault 등)를 사용하세요.",
55
+ "example": "# 나쁜 예\npassword = 'MyPassword123'\n\n"
56
+ "# 좋은 예\nimport os\npassword = os.getenv('DB_PASSWORD')",
57
+ "references": [
58
+ "https://owasp.org/www-community/vulnerabilities/Use_of_hard-coded_password"
59
+ ]
60
+ },
61
+ "pickle_usage": {
62
+ "what": "pickle.loads()를 사용하여 신뢰할 수 없는 데이터를 역직렬화하고 있습니다",
63
+ "why": "pickle은 Python 객체를 복원할 때 임의의 코드를 실행할 수 있습니다. "
64
+ "공격자가 악의적으로 조작한 pickle 데이터를 제공하면, "
65
+ "서버에서 임의의 명령을 실행하거나 시스템을 완전히 장악할 수 있습니다.",
66
+ "how_to_fix": "신뢰할 수 없는 데이터는 pickle 대신 JSON, YAML(safe_load), 또는 Protocol Buffers 같은 "
67
+ "안전한 직렬화 형식을 사용하세요.",
68
+ "example": "# 나쁜 예\nimport pickle\ndata = pickle.loads(untrusted_input)\n\n"
69
+ "# 좋은 예\nimport json\ndata = json.loads(untrusted_input)",
70
+ "references": [
71
+ "https://docs.python.org/3/library/pickle.html#module-pickle",
72
+ "https://owasp.org/www-community/vulnerabilities/Deserialization_of_untrusted_data"
73
+ ]
74
+ },
75
+ "exec_usage": {
76
+ "what": "exec() 또는 eval()을 사용하여 동적으로 코드를 실행하고 있습니다",
77
+ "why": "사용자 입력이나 외부 데이터를 exec()/eval()로 실행하면, "
78
+ "공격자가 임의의 Python 코드를 실행할 수 있습니다. "
79
+ "이는 서버의 모든 파일에 접근하거나, 다른 시스템을 공격하거나, "
80
+ "악성코드를 설치하는 등 치명적인 결과를 초래할 수 있습니다.",
81
+ "how_to_fix": "exec()와 eval()은 가능한 한 사용하지 마세요. "
82
+ "필요하다면 ast.literal_eval()로 안전하게 평가하거나, "
83
+ "화이트리스트 기반의 명령어 매핑을 사용하세요.",
84
+ "example": "# 나쁜 예\nexec(user_input)\n\n"
85
+ "# 좋은 예 (리터럴만)\nimport ast\nvalue = ast.literal_eval(user_input)\n\n"
86
+ "# 또는 화이트리스트\nallowed_commands = {'start': start_func, 'stop': stop_func}\ncommand = allowed_commands.get(user_input)",
87
+ "references": [
88
+ "https://docs.python.org/3/library/functions.html#eval",
89
+ "https://nedbatchelder.com/blog/201206/eval_really_is_dangerous.html"
90
+ ]
91
+ },
92
+ "shell_injection": {
93
+ "what": "subprocess에서 shell=True를 사용하여 명령어를 실행하고 있습니다",
94
+ "why": "shell=True는 명령어를 셸을 통해 실행하므로, 사용자 입력에 세미콜론(;)이나 파이프(|) 같은 "
95
+ "셸 메타문자가 포함되면 추가 명령어를 실행할 수 있습니다. "
96
+ "예를 들어 '; rm -rf /' 같은 입력으로 시스템 전체를 삭제할 수 있습니다.",
97
+ "how_to_fix": "shell=True를 제거하고 명령어를 리스트로 전달하세요. "
98
+ "또는 shlex.quote()로 입력을 이스케이프하세요.",
99
+ "example": "# 나쁜 예\nimport subprocess\nsubprocess.call(f'ls {user_dir}', shell=True)\n\n"
100
+ "# 좋은 예\nsubprocess.call(['ls', user_dir])\n\n"
101
+ "# 또는 shlex 사용\nimport shlex\nsafe_dir = shlex.quote(user_dir)\nsubprocess.call(f'ls {safe_dir}', shell=True)",
102
+ "references": [
103
+ "https://docs.python.org/3/library/subprocess.html#security-considerations",
104
+ "https://owasp.org/www-community/attacks/Command_Injection"
105
+ ]
106
+ }
107
+ }
108
+
109
+
110
def get_explanation(vulnerability: Dict[str, Any]) -> Dict[str, Any]:
    """
    Generate beginner-friendly explanation for a vulnerability.

    A template is chosen by exact ``pattern_type`` match first, then by
    keywords found in the vulnerability ``id``. When nothing matches, a
    generic explanation is built from the vulnerability's own
    ``description`` and ``recommendation`` fields.

    Args:
        vulnerability: Vulnerability dictionary from scanner

    Returns:
        Explanation dictionary with what, why, how_to_fix, example, references.
        The result is an independent copy; mutating it does not affect the
        shared templates.
    """
    import copy

    # `or ""` also covers keys that are present but explicitly set to None.
    vuln_id = str(vulnerability.get("id") or "")
    vuln_type = str(vulnerability.get("pattern_type") or "")
    vuln_id_lower = vuln_id.lower()

    # Try to find a matching template
    template = None

    # Check by pattern type first
    if vuln_type in EXPLANATION_TEMPLATES:
        template = EXPLANATION_TEMPLATES[vuln_type]
    # Check by ID pattern
    elif "sql-injection" in vuln_id:
        template = EXPLANATION_TEMPLATES["sql_injection"]
    elif "api" in vuln_id_lower or "api_key" in vuln_type:
        template = EXPLANATION_TEMPLATES["hardcoded_api_key"]
    elif "password" in vuln_id_lower or "password" in vuln_type:
        template = EXPLANATION_TEMPLATES["password"]
    elif "pickle" in vuln_id_lower or "B301" in vuln_id:
        template = EXPLANATION_TEMPLATES["pickle_usage"]
    elif "exec" in vuln_id_lower or "eval" in vuln_id_lower or "B102" in vuln_id:
        template = EXPLANATION_TEMPLATES["exec_usage"]
    elif "shell" in vuln_id_lower or "B602" in vuln_id:
        template = EXPLANATION_TEMPLATES["shell_injection"]

    # Use template or create generic explanation
    if template:
        # Deep copy: a shallow copy would share the nested "references" list
        # with the module-level template.
        explanation = copy.deepcopy(template)
    else:
        # Generic explanation
        description = vulnerability.get("description", "보안 취약점이 발견되었습니다")
        explanation = {
            "what": description,
            "why": "이 패턴은 보안 취약점을 일으킬 수 있으며, 공격자가 악용할 경우 시스템에 피해를 줄 수 있습니다.",
            "how_to_fix": vulnerability.get("recommendation", "보안 모범 사례를 따르고, 신뢰할 수 없는 입력을 검증하세요."),
            "example": "# 안전한 코드 작성을 위해 보안 가이드를 참고하세요",
            "references": [
                "https://owasp.org/www-project-top-ten/",
                "https://cheatsheetseries.owasp.org/"
            ]
        }

    return explanation
161
+
162
+
163
def remove_duplicates(vulnerabilities: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """
    Remove duplicate vulnerabilities based on line number and issue type.

    Two findings are duplicates when they share the same line number, the
    same full vulnerability ID and the same file path. The first occurrence
    is kept and the input order is preserved.

    Args:
        vulnerabilities: List of vulnerability dictionaries

    Returns:
        Deduplicated list
    """
    seen = set()
    unique = []

    for vuln in vulnerabilities:
        # Use the full ID as the issue type. Scanner IDs look like
        # "pattern-api_key" or "bandit-B301"; keeping only the part before
        # the first "-" would reduce the key to the scanner name and drop
        # distinct issues that happen to sit on the same line.
        key = (
            vuln.get("line_number"),
            vuln.get("id") or "",
            vuln.get("file_path", ""),
        )
        if key in seen:
            continue
        seen.add(key)
        unique.append(vuln)

    return unique
189
+
190
+
191
def sort_vulnerabilities(vulnerabilities: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """
    Order findings by severity rank first and line number second.

    Severity ranks come from get_severity_order(); a severity that is not in
    that mapping is ranked 99. A missing severity is treated as "LOW" and a
    missing line number as 0.

    Args:
        vulnerabilities: List of vulnerability dictionaries

    Returns:
        New sorted list (the input list is not modified)
    """
    ranks = get_severity_order()
    return sorted(
        vulnerabilities,
        key=lambda item: (
            ranks.get(item.get("severity", "LOW").upper(), 99),
            item.get("line_number", 0),
        ),
    )
210
+
211
+
212
def calculate_summary(vulnerabilities: List[Dict[str, Any]]) -> Dict[str, Any]:
    """
    Calculate summary statistics for vulnerabilities.

    Args:
        vulnerabilities: List of vulnerability dictionaries

    Returns:
        Summary dictionary with ``total_issues``, one counter per severity
        (``critical``, ``high``, ``medium``, ``low``) and ``scan_timestamp``
        (UTC, ISO 8601 with a trailing "Z"). Findings whose severity is
        missing or None count as low; unrecognised severities are only
        included in ``total_issues``.
    """
    # Function-scope import: datetime.utcnow() is deprecated, and the
    # replacement needs ``timezone``.
    from datetime import datetime, timezone

    # Count against an explicit severity set. Testing membership in the
    # summary dict itself would also accept keys such as "total_issues".
    counts = dict.fromkeys(("critical", "high", "medium", "low"), 0)
    for vuln in vulnerabilities:
        severity = str(vuln.get("severity") or "LOW").lower()
        if severity in counts:
            counts[severity] += 1

    # Drop tzinfo before formatting so the string keeps its "...Z" shape
    # instead of becoming "...+00:00Z".
    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)

    return {
        "total_issues": len(vulnerabilities),
        **counts,
        "scan_timestamp": now_utc.isoformat() + "Z",
    }
237
+
238
+
239
def format_results(
    vulnerabilities: List[Dict[str, Any]],
    severity_threshold: str = "MEDIUM"
) -> Dict[str, Any]:
    """
    Turn raw scanner findings into the final report structure.

    The findings are deduplicated, sorted, and each one receives an
    ``explanation`` entry unless it already carries one. The threshold is
    only recorded in the summary; no filtering happens here.

    Args:
        vulnerabilities: List of vulnerability dictionaries from scanners
        severity_threshold: Minimum severity threshold used

    Returns:
        Dictionary with ``summary`` and ``vulnerabilities`` keys
    """
    ordered = sort_vulnerabilities(remove_duplicates(vulnerabilities))

    # Work on copies so the scanner output is left untouched.
    enriched = [
        finding.copy()
        if "explanation" in finding
        else {**finding, "explanation": get_explanation(finding)}
        for finding in ordered
    ]

    report_summary = calculate_summary(enriched)
    report_summary["severity_threshold"] = severity_threshold

    return {"summary": report_summary, "vulnerabilities": enriched}
281
+
282
+
283
def _truncate(text: Any, limit: int) -> str:
    """Return text cut to at most ``limit`` characters, adding "..." only when cut."""
    text = "" if text is None else str(text)
    return text if len(text) <= limit else text[:limit] + "..."


def format_for_display(results: Dict[str, Any]) -> str:
    """
    Format results for console display.

    Code snippets are limited to 60 characters and the "why"/"how to fix"
    texts to 100 characters. An ellipsis is appended only when text was
    actually shortened, so short values are shown verbatim.

    Args:
        results: Formatted results dictionary

    Returns:
        Human-readable string
    """
    summary = results.get("summary", {})
    vulns = results.get("vulnerabilities", [])

    output = [
        "\n" + "=" * 70,
        "보안 스캔 결과",
        "=" * 70,
        f"\n총 발견된 이슈: {summary.get('total_issues', 0)}",
        f" - CRITICAL: {summary.get('critical', 0)}",
        f" - HIGH: {summary.get('high', 0)}",
        f" - MEDIUM: {summary.get('medium', 0)}",
        f" - LOW: {summary.get('low', 0)}",
        f"\n스캔 시각: {summary.get('scan_timestamp', 'N/A')}",
        "\n" + "-" * 70,
    ]

    for i, vuln in enumerate(vulns, 1):
        output.append(f"\n[{i}] {vuln.get('title', 'Unknown Issue')}")
        output.append(f"심각도: {vuln.get('severity', 'UNKNOWN')}")
        output.append(f"위치: 라인 {vuln.get('line_number', 'N/A')}")
        output.append(f"코드: {_truncate(vuln.get('code_snippet', ''), 60)}")

        explanation = vuln.get("explanation", {})
        if explanation:
            output.append(f"\n문제: {explanation.get('what', '')}")
            output.append(f"위험성: {_truncate(explanation.get('why', ''), 100)}")
            output.append(f"해결방법: {_truncate(explanation.get('how_to_fix', ''), 100)}")

        output.append("\n" + "-" * 70)

    return "\n".join(output)
src/scanner/.gitkeep ADDED
File without changes
src/scanner/__init__.py ADDED
@@ -0,0 +1,11 @@
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Security scanner modules package.
3
+
4
+ This package contains various security scanning modules:
5
+ - pattern_detector: Regex-based pattern detection for hardcoded secrets
6
+ - sql_injection: SQL injection vulnerability detection
7
+ - bandit_wrapper: Integration with Bandit security scanner
8
+ - semgrep_wrapper: Integration with Semgrep static analysis tool
9
+ """
10
+
11
+ __version__ = "0.1.0"
src/scanner/bandit_wrapper.py ADDED
@@ -0,0 +1,244 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Bandit security scanner wrapper.
3
+
4
+ Integrates the Bandit tool for Python-specific security analysis.
5
+ """
6
+
7
+ import json
8
+ import subprocess
9
+ import sys
10
+ from pathlib import Path
11
+ from typing import Dict, List, Any, Optional
12
+
13
+ # Import config loader
14
+ sys.path.insert(0, str(Path(__file__).parent.parent))
15
+ from utils import load_config
16
+
17
+ # Severity mapping from Bandit to our standard
18
+ SEVERITY_MAPPING = {
19
+ "HIGH": "CRITICAL",
20
+ "MEDIUM": "HIGH",
21
+ "LOW": "MEDIUM",
22
+ }
23
+
24
+
25
def is_bandit_available() -> bool:
    """
    Probe for a working bandit executable by running ``bandit --version``.

    Returns:
        True if the command ran and exited with status 0; False if the
        executable was not found, the probe timed out (5 seconds), or it
        exited with a non-zero status
    """
    probe = ["bandit", "--version"]
    try:
        completed = subprocess.run(probe, capture_output=True, text=True, timeout=5)
    except (FileNotFoundError, subprocess.TimeoutExpired):
        return False
    return completed.returncode == 0
42
+
43
+
44
def map_severity(bandit_severity: str) -> str:
    """
    Translate a Bandit severity into this project's severity scale.

    The lookup is case-insensitive and goes through SEVERITY_MAPPING; any
    value not present there maps to "MEDIUM".

    Args:
        bandit_severity: Bandit's severity (HIGH, MEDIUM, LOW)

    Returns:
        Standard severity level (CRITICAL, HIGH, MEDIUM, LOW)
    """
    normalized = bandit_severity.upper()
    return SEVERITY_MAPPING.get(normalized, "MEDIUM")
55
+
56
+
57
def run_bandit(file_path: str, config: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """
    Run bandit on a file and return JSON results.

    Args:
        file_path: Path to Python file to scan
        config: Optional configuration dictionary. When None, the
            ``scanners.bandit`` section of load_config() is used. Recognised
            keys: ``confidence_level`` (HIGH/MEDIUM/LOW) and ``skip_tests``
            (list of Bandit test IDs).

    Returns:
        Bandit results as dictionary (``{"results": []}`` when bandit
        printed nothing)

    Raises:
        RuntimeError: If bandit is not available, times out (30s), or
            prints output that is not valid JSON
        subprocess.CalledProcessError: If bandit exits with a code above 1
    """
    if not is_bandit_available():
        raise RuntimeError(
            "Bandit is not installed. Please install it with: pip install bandit"
        )

    if config is None:
        config = load_config().get("scanners", {}).get("bandit", {})

    # Build bandit command
    cmd = [
        "bandit",
        "-f", "json",  # JSON output format
        "-r",  # Recursive (even for single file, bandit expects this)
        file_path
    ]

    # Minimum confidence is selected with -i / -ii / -iii. The -l family
    # (-l / -ll / -lll) filters by *severity* instead, and would silently
    # drop low-severity findings.
    confidence_flags = {"LOW": "-i", "MEDIUM": "-ii", "HIGH": "-iii"}
    confidence_level = str(config.get("confidence_level") or "").upper()
    if confidence_level in confidence_flags:
        cmd.append(confidence_flags[confidence_level])

    # Add skip tests if specified
    skip_tests = config.get("skip_tests", [])
    if skip_tests:
        cmd.extend(["-s", ",".join(skip_tests)])

    try:
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=30,  # 30 second timeout
            check=False  # Exit code is inspected manually below
        )
    except subprocess.TimeoutExpired as e:
        raise RuntimeError("Bandit execution timed out (30s limit)") from e

    # Bandit returns exit code 1 if issues found, which is expected.
    # Only fail on actual errors (exit code > 1).
    if result.returncode > 1:
        raise subprocess.CalledProcessError(
            result.returncode,
            cmd,
            result.stdout,
            result.stderr
        )

    if not result.stdout:
        return {"results": []}

    try:
        return json.loads(result.stdout)
    except json.JSONDecodeError as e:
        raise RuntimeError(f"Failed to parse bandit output: {e}") from e
129
+
130
+
131
def parse_bandit_results(bandit_output: Dict[str, Any], file_path: str) -> List[Dict[str, Any]]:
    """
    Convert Bandit's JSON report into the project's vulnerability records.

    Each entry of ``bandit_output["results"]`` becomes one dictionary. Missing
    fields fall back to defaults ("UNKNOWN" test ID, "MEDIUM" severity and
    confidence, line 0, empty snippet).

    Args:
        bandit_output: Raw bandit JSON output
        file_path: Path to the scanned file

    Returns:
        List of vulnerability dictionaries in standard format
    """
    findings = []

    for issue in bandit_output.get("results", []):
        test_id = issue.get("test_id", "UNKNOWN")
        raw_severity = issue.get("issue_severity", "MEDIUM")
        text = issue.get("issue_text", "Security issue detected")

        findings.append({
            "id": f"bandit-{test_id}",
            "severity": map_severity(raw_severity),  # project severity scale
            "title": f"Bandit: {text}",
            "description": text,
            "line_number": issue.get("line_number", 0),
            "code_snippet": issue.get("code", "").strip(),
            "file_path": file_path,
            "scanner": "bandit",
            # Original Bandit values are kept alongside the mapped ones.
            "bandit_test_id": test_id,
            "bandit_test_name": issue.get("test_name", "unknown"),
            "bandit_severity": raw_severity,
            "bandit_confidence": issue.get("issue_confidence", "MEDIUM"),
        })

    return findings
178
+
179
+
180
def scan_with_bandit(file_path: str, config: Optional[Dict[str, Any]] = None) -> List[Dict[str, Any]]:
    """
    Scan one file with bandit and return standardised findings.

    When a ``confidence_level`` is configured, findings whose Bandit
    confidence ranks below it are dropped. This function is best-effort: any
    exception is reported as a warning on stderr and an empty list is
    returned, so a bandit failure never aborts the overall scan.

    Args:
        file_path: Path to Python file to scan
        config: Optional configuration dictionary; when None the
            ``scanners.bandit`` section of load_config() is used

    Returns:
        List of vulnerability dictionaries (empty on failure)
    """
    ranking = {"HIGH": 2, "MEDIUM": 1, "LOW": 0}

    try:
        findings = parse_bandit_results(run_bandit(file_path, config), file_path)

        settings = config
        if settings is None:
            settings = load_config().get("scanners", {}).get("bandit", {})

        wanted = settings.get("confidence_level", "").upper()
        if not wanted:
            return findings

        # Unknown confidence names rank 0, i.e. they filter nothing out.
        floor = ranking.get(wanted, 0)
        return [
            finding for finding in findings
            if ranking.get(finding["bandit_confidence"], 0) >= floor
        ]

    except Exception as e:
        # Log the error but don't fail the entire scan
        print(f"Warning: Bandit scan failed: {e}", file=sys.stderr)
        return []
221
+
222
+
223
def get_bandit_version() -> str:
    """
    Get the installed bandit version.

    Returns:
        The first line of ``bandit --version`` output that mentions
        "bandit"; "unknown version" if bandit ran but no such line was
        found or it exited with a non-zero status; "not installed" if the
        executable could not be started or timed out (5 seconds).
        A string is returned on every path.
    """
    try:
        result = subprocess.run(
            ["bandit", "--version"],
            capture_output=True,
            text=True,
            timeout=5
        )
    except (OSError, subprocess.TimeoutExpired):
        # OSError covers FileNotFoundError as well as e.g. PermissionError
        # for a bandit file that exists but cannot be executed.
        return "not installed"

    if result.returncode == 0:
        for line in result.stdout.splitlines():
            if "bandit" in line.lower():
                return line.strip()

    return "unknown version"
src/scanner/pattern_detector.py ADDED
@@ -0,0 +1,231 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Pattern-based security vulnerability detector using regular expressions.
3
+
4
+ Detects hardcoded secrets, credentials, and sensitive information patterns.
5
+ """
6
+
7
+ import re
8
+ from typing import Dict, List, Any
9
+
10
+ # Security patterns with regex, severity, and descriptions
11
+ SECURITY_PATTERNS = {
12
+ "aws_access_key": {
13
+ "regex": r"(?:AWS_ACCESS_KEY_ID|aws_access_key_id)\s*[:=]\s*['\"]?(AKIA[0-9A-Z]{16})['\"]?",
14
+ "severity": "CRITICAL",
15
+ "title": "하드코딩된 AWS Access Key 탐지",
16
+ "description": "AWS Access Key가 코드에 하드코딩되어 있습니다.",
17
+ },
18
+ "aws_secret_key": {
19
+ "regex": r"(?:AWS_SECRET_ACCESS_KEY|aws_secret_access_key)\s*[:=]\s*['\"]?([A-Za-z0-9/+=]{40})['\"]?",
20
+ "severity": "CRITICAL",
21
+ "title": "하드코딩된 AWS Secret Key 탐지",
22
+ "description": "AWS Secret Access Key가 코드에 하드코딩되어 있습니다.",
23
+ },
24
+ "api_key": {
25
+ "regex": r"(?:api[_-]?key|apikey|api[_-]?secret)\s*[:=]\s*['\"]([a-zA-Z0-9_\-]{20,})['\"]",
26
+ "severity": "HIGH",
27
+ "title": "하드코딩된 API 키 탐지",
28
+ "description": "API 키가 코드에 직접 하드코딩되어 있습니다.",
29
+ },
30
+ "github_token": {
31
+ "regex": r"\b(gh[ps]_[a-zA-Z0-9]{36,})\b",
32
+ "severity": "HIGH",
33
+ "title": "GitHub Personal Access Token 탐지",
34
+ "description": "GitHub 개인 액세스 토큰이 코드에 노출되어 있습니다.",
35
+ },
36
+ "jwt_token": {
37
+ "regex": r"\b(eyJ[a-zA-Z0-9_-]*\.eyJ[a-zA-Z0-9_-]*\.[a-zA-Z0-9_-]+)\b",
38
+ "severity": "HIGH",
39
+ "title": "JWT 토큰 하드코딩 탐지",
40
+ "description": "JWT 토큰이 코드에 하드코딩되어 있습니다.",
41
+ },
42
+ "password": {
43
+ "regex": r"(?:password|passwd|pwd)\s*[:=]\s*['\"]([^'\"]{4,})['\"]",
44
+ "severity": "MEDIUM",
45
+ "title": "하드코딩된 비밀번호 탐지",
46
+ "description": "비밀번호가 코드에 직접 작성되어 있습니다.",
47
+ },
48
+ "korean_ssn": {
49
+ "regex": r"\b(\d{6}[-]\d{7})\b",
50
+ "severity": "MEDIUM",
51
+ "title": "주민등록번호 패턴 탐지",
52
+ "description": "주민등록번호 형식의 데이터가 코드에 포함되어 있습니다.",
53
+ },
54
+ "credit_card": {
55
+ "regex": r"\b(\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4})\b",
56
+ "severity": "MEDIUM",
57
+ "title": "신용카드 번호 패턴 탐지",
58
+ "description": "신용카드 번호 형식의 데이터가 발견되었습니다.",
59
+ },
60
+ "phone_number": {
61
+ "regex": r"\b(0\d{1,2}[-\s]?\d{3,4}[-\s]?\d{4})\b",
62
+ "severity": "LOW",
63
+ "title": "전화번호 패턴 탐지",
64
+ "description": "전화번호가 코드에 포함되어 있습니다.",
65
+ },
66
+ "database_url": {
67
+ "regex": r"(?:postgresql|mysql|mongodb|redis)://([^:]+):([^@]+)@",
68
+ "severity": "CRITICAL",
69
+ "title": "데이터베이스 연결 문자열에 자격증명 포함",
70
+ "description": "데이터베이스 연결 문자열에 사용자명과 비밀번호가 포함되어 있습니다.",
71
+ },
72
+ "private_key": {
73
+ "regex": r"-----BEGIN (?:RSA |EC |DSA )?PRIVATE KEY-----",
74
+ "severity": "CRITICAL",
75
+ "title": "개인 키 하드코딩 탐지",
76
+ "description": "암호화 개인 키가 코드에 직접 포함되어 있습니다.",
77
+ },
78
+ }
79
+
80
+
81
def is_false_positive(line: str, pattern_type: str) -> bool:
    """
    Check if a detected pattern is likely a false positive.

    A line is treated as a false positive when it is a comment, when it
    contains a test/placeholder indicator (case-insensitive), or when a
    pattern-specific heuristic applies (year-like number for credit cards,
    date/time wording for phone numbers, empty value for passwords).

    Args:
        line: The line of code containing the match
        pattern_type: Type of pattern detected

    Returns:
        True if likely a false positive, False otherwise
    """
    # Skip commented lines
    if line.strip().startswith("#"):
        return True

    # Indicators must be lower-case: they are compared against the
    # lower-cased line, so an upper-case entry could never match.
    test_indicators = (
        "example",
        "test",
        "dummy",
        "fake",
        "sample",
        "mock",
        "placeholder",
        "todo",
        "fixme",
        "xxx",
        "000",
    )

    line_lower = line.lower()
    if any(indicator in line_lower for indicator in test_indicators):
        return True

    # Pattern-specific false positive checks
    if pattern_type == "credit_card":
        # Common false positive: date ranges, version numbers
        return re.search(r"(19|20)\d{2}", line) is not None

    if pattern_type == "phone_number":
        # Skip if looks like a date or other numeric pattern
        return "date" in line_lower or "time" in line_lower

    if pattern_type == "password":
        # Skip if nothing (or only empty quotes) follows the assignment
        return re.search(r'password\s*[:=]\s*["\']?\s*["\']?$', line) is not None

    return False
133
+
134
+
135
def scan_patterns(file_path: str, code: str) -> List[Dict[str, Any]]:
    """
    Scan code for security vulnerability patterns.

    Every line is matched (case-insensitively) against each regex in
    SECURITY_PATTERNS. One finding is emitted per regex match unless
    is_false_positive() rejects the line for that pattern.

    NOTE: ``code_snippet`` holds the stripped source line, shortened to 100
    characters. The matched secret itself is NOT masked.

    Args:
        file_path: Path to the file being scanned (for reference)
        code: Source code to scan

    Returns:
        List of vulnerability dictionaries
    """
    vulnerabilities = []

    for line_num, line in enumerate(code.split("\n"), start=1):
        # The snippet depends only on the line, so build it once.
        code_snippet = line.strip()
        if len(code_snippet) > 100:
            code_snippet = code_snippet[:97] + "..."

        for pattern_name, pattern_info in SECURITY_PATTERNS.items():
            match_count = sum(
                1 for _ in re.finditer(pattern_info["regex"], line, re.IGNORECASE)
            )
            # The false-positive verdict is per (line, pattern), not per
            # match, so one check covers all matches.
            if not match_count or is_false_positive(line, pattern_name):
                continue

            for _ in range(match_count):
                vulnerabilities.append({
                    "id": f"pattern-{pattern_name}",
                    "severity": pattern_info["severity"],
                    "title": pattern_info["title"],
                    "description": pattern_info["description"],
                    "line_number": line_num,
                    "code_snippet": code_snippet,
                    "pattern_type": pattern_name,
                    "file_path": file_path,
                    "scanner": "pattern_detector",
                })

    return vulnerabilities
185
+
186
+
187
def get_pattern_info(pattern_type: str) -> Dict[str, str]:
    """
    Look up the SECURITY_PATTERNS entry for a pattern type.

    Args:
        pattern_type: Type of security pattern

    Returns:
        The pattern's entry from SECURITY_PATTERNS, or a generic MEDIUM
        severity placeholder (severity, title, description) when the type is
        not registered
    """
    fallback = {
        "severity": "MEDIUM",
        "title": "보안 패턴 탐지",
        "description": "알 수 없는 보안 패턴이 발견되었습니다.",
    }
    try:
        return SECURITY_PATTERNS[pattern_type]
    except KeyError:
        return fallback
205
+
206
+
207
def list_available_patterns() -> List[str]:
    """
    Return the names of all registered security patterns.

    Returns:
        New list with the keys of SECURITY_PATTERNS, in definition order
    """
    return [pattern_name for pattern_name in SECURITY_PATTERNS]
215
+
216
+
217
def get_patterns_by_severity(severity: str) -> List[str]:
    """
    Select the pattern names registered with a given severity.

    The comparison is case-insensitive on the ``severity`` argument.

    Args:
        severity: Severity level (CRITICAL, HIGH, MEDIUM, LOW)

    Returns:
        List of pattern names with matching severity
    """
    selected = []
    for pattern_name, details in SECURITY_PATTERNS.items():
        if details["severity"] == severity.upper():
            selected.append(pattern_name)
    return selected
src/scanner/semgrep_wrapper.py ADDED
@@ -0,0 +1,363 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Semgrep integration wrapper for multi-language security scanning.
3
+
4
+ Runs semgrep as a subprocess and parses the JSON output.
5
+ """
6
+
7
+ import subprocess
8
+ import json
9
+ import tempfile
10
+ import os
11
+ import sys
12
+ from pathlib import Path
13
+ from typing import Dict, List, Any, Optional
14
+
15
+ # Add parent directory to path to import utils
16
+ sys.path.insert(0, str(Path(__file__).parent.parent))
17
+
18
+ # Severity mapping from semgrep to standard format
19
+ SEVERITY_MAPPING = {
20
+ "ERROR": "CRITICAL",
21
+ "WARNING": "HIGH",
22
+ "INFO": "MEDIUM"
23
+ }
24
+
25
+ # Default Python security rule categories
26
+ DEFAULT_PYTHON_RULES = [
27
+ "python.lang.security",
28
+ "python.flask.security",
29
+ "python.django.security",
30
+ "python.requests.security"
31
+ ]
32
+
33
+
34
def check_semgrep_installed() -> bool:
    """
    Probe for a working semgrep executable by running ``semgrep --version``.

    Returns:
        True if the command ran and exited with status 0; False if the
        executable was not found, the probe timed out (5 seconds), or it
        exited with a non-zero status
    """
    probe = ["semgrep", "--version"]
    try:
        completed = subprocess.run(probe, capture_output=True, text=True, timeout=5)
    except (subprocess.TimeoutExpired, FileNotFoundError):
        return False
    return completed.returncode == 0
51
+
52
+
53
def find_custom_rules() -> List[str]:
    """
    Collect YAML rule files from the ``rules`` directory.

    The directory is resolved three levels above this module file. Only its
    direct children are considered; ``*.yaml`` matches are listed before
    ``*.yml`` matches.

    Returns:
        List of paths to custom rule files (empty if the directory is absent)
    """
    rules_dir = Path(__file__).parent.parent.parent / "rules"

    if not (rules_dir.exists() and rules_dir.is_dir()):
        return []

    return [
        str(rule_path)
        for extension in ("*.yaml", "*.yml")
        for rule_path in rules_dir.glob(extension)
    ]
69
+
70
+
71
def load_config_rules(config: Optional[Dict[str, Any]] = None) -> List[str]:
    """
    Read the ``custom_rules`` entry from a configuration dictionary.

    Args:
        config: Configuration dictionary with optional 'custom_rules' key

    Returns:
        The configured list of rule file paths; an empty list when there is
        no configuration, no such key, or the value is not a list
    """
    configured = config.get("custom_rules", []) if config else []
    return configured if isinstance(configured, list) else []
89
+
90
+
91
def run_semgrep(
    file_path: str,
    config: Optional[Dict[str, Any]] = None,
    timeout: int = 30
) -> Dict[str, Any]:
    """
    Run semgrep on a file and return JSON results.

    Rule selection: rule files from find_custom_rules() and from the
    configuration's ``custom_rules`` are used when they exist on disk. If
    none of them exist, semgrep's ``auto`` configuration is used instead.

    Args:
        file_path: Path to the file to scan
        config: Optional configuration dictionary
        timeout: Timeout in seconds (default 30)

    Returns:
        Dictionary with semgrep results, or an error dictionary with
        ``error`` and ``message`` keys. This function does not raise for
        scanner failures.
    """
    # Check if semgrep is installed
    if not check_semgrep_installed():
        return {
            "error": "semgrep_not_installed",
            "message": "Semgrep is not installed. Install it with: pip install semgrep"
        }

    # Keep only rule files that exist, so a list of stale paths falls back
    # to "auto" instead of running semgrep with no --config at all.
    candidate_rules = find_custom_rules() + load_config_rules(config)
    existing_rules = [rule for rule in candidate_rules if os.path.exists(rule)]

    cmd = ["semgrep", "--json", "--quiet"]
    if existing_rules:
        for rule_file in existing_rules:
            cmd.extend(["--config", rule_file])
    else:
        # Use auto configuration (community rules)
        cmd.extend(["--config", "auto"])

    # Add target file
    cmd.append(file_path)

    try:
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=timeout
        )
    except subprocess.TimeoutExpired:
        return {
            "error": "timeout",
            "message": f"Semgrep scan timed out after {timeout} seconds"
        }
    except FileNotFoundError:
        return {
            "error": "semgrep_not_found",
            "message": "Semgrep executable not found in PATH"
        }
    except Exception as e:
        # Boundary handler: callers expect an error dictionary, not a raise.
        return {
            "error": "unexpected_error",
            "message": f"Unexpected error running semgrep: {str(e)}"
        }

    if not result.stdout:
        # Empty output only means "no findings" when semgrep succeeded.
        # A failing run must not be reported as a clean scan.
        if result.returncode not in (0, 1):
            return {
                "error": "semgrep_failed",
                "message": (
                    f"Semgrep exited with code {result.returncode}: "
                    f"{(result.stderr or '').strip()}"
                )
            }
        return {"results": []}

    try:
        return json.loads(result.stdout)
    except json.JSONDecodeError as e:
        return {
            "error": "json_parse_error",
            "message": f"Failed to parse semgrep output: {str(e)}",
            "raw_output": result.stdout
        }
172
+
173
+
174
def parse_semgrep_results(
    semgrep_output: Dict[str, Any],
    file_path: str
) -> List[Dict[str, Any]]:
    """
    Convert semgrep's JSON report into the project's vulnerability records.

    An output carrying an ``error`` key yields an empty list; nothing is
    logged here. Severities are mapped through SEVERITY_MAPPING, with
    "MEDIUM" for unknown values.

    Args:
        semgrep_output: Raw semgrep JSON output
        file_path: Path to the scanned file

    Returns:
        List of standardized vulnerability dictionaries
    """
    if "error" in semgrep_output:
        return []

    parsed = []

    for finding in semgrep_output.get("results", []):
        extra = finding.get("extra", {})

        rule_id = finding.get("check_id", "unknown")
        # Prefer the fields under "extra"; fall back to top-level ones.
        text = extra.get("message", finding.get("message", ""))
        raw_severity = extra.get("severity", "INFO").upper()
        source_lines = extra.get("lines", "") or finding.get("lines", "")

        parsed.append({
            "id": f"semgrep-{rule_id}",
            "severity": SEVERITY_MAPPING.get(raw_severity, "MEDIUM"),
            "title": f"Semgrep: {text[:80]}",
            "description": text,
            "line_number": finding.get("start", {}).get("line", 0),
            "code_snippet": source_lines.strip() if source_lines else "",
            "file_path": file_path,
            "scanner": "semgrep",
            "semgrep_rule_id": rule_id,
            "semgrep_message": text,
            "semgrep_metadata": extra.get("metadata", {})
        })

    return parsed
239
+
240
+
241
def scan_with_semgrep(
    file_path: str,
    code: str,
    config: Optional[Dict[str, Any]] = None
) -> List[Dict[str, Any]]:
    """
    Scan code using semgrep.

    The code is written to a temporary UTF-8 file (semgrep works on files),
    scanned, and the temporary file is removed afterwards. A scanner error
    is reported as a warning on stderr and results in an empty list.

    Args:
        file_path: Original file path (for reference); its suffix is reused
            for the temporary file, defaulting to ".py"
        code: Source code to scan
        config: Optional configuration dictionary (``semgrep_timeout`` sets
            the timeout in seconds, default 30)

    Returns:
        List of vulnerability dictionaries
    """
    # Explicit UTF-8: the platform default encoding can fail on non-ASCII
    # source text.
    temp_file = tempfile.NamedTemporaryFile(
        mode='w',
        suffix=Path(file_path).suffix or '.py',
        delete=False,
        encoding='utf-8'
    )
    temp_path = temp_file.name

    try:
        # The write sits inside the try so a failed write still triggers
        # cleanup of the temporary file.
        with temp_file:
            temp_file.write(code)

        timeout = config.get("semgrep_timeout", 30) if config else 30
        semgrep_output = run_semgrep(temp_path, config, timeout)

        if "error" in semgrep_output:
            # Surface the failure, mirroring the bandit wrapper's warning.
            detail = semgrep_output.get("message", semgrep_output["error"])
            print(f"Warning: Semgrep scan failed: {detail}", file=sys.stderr)

        return parse_semgrep_results(semgrep_output, file_path)

    finally:
        # Clean up temporary file
        try:
            os.unlink(temp_path)
        except OSError:
            pass  # Best-effort cleanup; a leftover temp file is harmless
284
+
285
+
286
def scan_with_custom_rules(
    file_path: str,
    code: str,
    rule_files: List[str]
) -> List[Dict[str, Any]]:
    """
    Run a semgrep scan with the given rule files passed as ``custom_rules``.

    This is a thin wrapper around scan_with_semgrep().

    Args:
        file_path: Original file path (for reference)
        code: Source code to scan
        rule_files: List of paths to rule files

    Returns:
        List of vulnerability dictionaries
    """
    return scan_with_semgrep(file_path, code, {"custom_rules": rule_files})
304
+
305
+
306
# Manual smoke test (also run when this module is executed directly)
def test_semgrep_wrapper():
    """Scan a small vulnerable sample with semgrep and print what was found."""
    print("Testing Semgrep Wrapper...")
    print("-" * 50)

    # Nothing to exercise without the semgrep executable.
    if not check_semgrep_installed():
        print("❌ Semgrep is not installed")
        print("Install it with: pip install semgrep")
        return

    print("✓ Semgrep is installed")

    # Sample input containing deliberate security issues
    test_code = '''
import pickle
import subprocess

# Insecure deserialization
def load_data(data):
    return pickle.loads(data)  # Vulnerable to code execution

# Command injection
def run_command(user_input):
    subprocess.call("ls " + user_input, shell=True)  # Shell injection

# Hardcoded secret
api_key = "sk-1234567890abcdef"
'''

    print("\nScanning test code...")
    findings = scan_with_semgrep("test.py", test_code)

    print(f"\n✓ Found {len(findings)} issue(s)")

    if findings:
        print("\nDetected vulnerabilities:")
        for index, finding in enumerate(findings, 1):
            print(f"\n[{index}] {finding['title']}")
            print(f" Severity: {finding['severity']}")
            print(f" Line: {finding['line_number']}")
            print(f" Rule: {finding['semgrep_rule_id']}")

    # Report which custom rule files would be picked up
    rule_files = find_custom_rules()
    if not rule_files:
        print("\n✓ No custom rule files found in rules/")
    else:
        print(f"\n✓ Found {len(rule_files)} custom rule file(s):")
        for rule_file in rule_files:
            print(f" - {rule_file}")

    print("\n✅ Semgrep wrapper test: SUCCESS")


if __name__ == "__main__":
    test_semgrep_wrapper()
src/scanner/sql_injection.py ADDED
@@ -0,0 +1,273 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ SQL Injection vulnerability detector using AST analysis.
3
+
4
+ Detects unsafe SQL query construction patterns in Python code.
5
+ """
6
+
7
+ import ast
8
+ import re
9
+ from typing import Dict, List, Any, Optional
10
+
11
# SQL keywords that indicate a query
SQL_KEYWORDS = [
    "SELECT", "INSERT", "UPDATE", "DELETE", "DROP", "CREATE", "ALTER",
    "FROM", "WHERE", "JOIN", "UNION", "ORDER BY", "GROUP BY",
    "EXEC", "EXECUTE", "TRUNCATE", "GRANT", "REVOKE"
]

# Database methods that execute SQL
EXECUTE_METHODS = ["execute", "executemany", "raw", "extra"]

# Case-insensitive, whole-word matcher built from SQL_KEYWORDS. Word
# boundaries keep ordinary words such as "updated" or "selected" from being
# mistaken for SQL; multi-word keywords tolerate any run of whitespace.
_SQL_KEYWORD_RE = re.compile(
    r"\b(?:"
    + "|".join(
        r"\s+".join(re.escape(word) for word in keyword.split())
        for keyword in SQL_KEYWORDS
    )
    + r")\b",
    re.IGNORECASE,
)


class SQLInjectionVisitor(ast.NodeVisitor):
    """AST visitor that records SQL queries built by string manipulation.

    Findings are appended to ``self.vulnerabilities`` as dictionaries with the
    keys ``id``, ``severity``, ``title``, ``description``, ``line_number``,
    ``code_snippet``, ``vulnerable_pattern``, ``recommendation`` and
    ``scanner``. Detection is heuristic: a string counts as SQL when it
    contains one of ``SQL_KEYWORDS`` as a whole word.
    """

    def __init__(self, code_lines: List[str]):
        """
        Initialize the visitor.

        Args:
            code_lines: Source code split by lines (used for snippets)
        """
        self.vulnerabilities: List[Dict[str, Any]] = []
        self.code_lines = code_lines

    def get_line_content(self, line_number: int) -> str:
        """Return the stripped text of a 1-based line, or "" if out of range."""
        if 0 < line_number <= len(self.code_lines):
            return self.code_lines[line_number - 1].strip()
        return ""

    def contains_sql_keywords(self, text: str) -> bool:
        """Return True if *text* contains an SQL keyword as a whole word."""
        return _SQL_KEYWORD_RE.search(text) is not None

    def extract_string_content(self, node: ast.AST) -> Optional[str]:
        """Return the literal text of a string or f-string node, else None.

        Interpolated f-string fields are represented by ``{}``.
        """
        # ast.Constant covers every string literal on Python 3.8+. The legacy
        # ast.Str alias is deliberately not referenced: it no longer exists on
        # Python 3.14 and touching it would abort the whole scan.
        if isinstance(node, ast.Constant) and isinstance(node.value, str):
            return node.value
        if isinstance(node, ast.JoinedStr):
            parts = []
            for value in node.values:
                if isinstance(value, ast.Constant):
                    parts.append(str(value.value))
                elif isinstance(value, ast.FormattedValue):
                    parts.append("{}")
            return "".join(parts)
        return None

    def _is_constant_string_expr(self, node: ast.AST) -> bool:
        """Return True for a string literal or a ``+`` chain of string literals."""
        if isinstance(node, ast.Constant):
            return isinstance(node.value, str)
        if isinstance(node, ast.BinOp) and isinstance(node.op, ast.Add):
            return (
                self._is_constant_string_expr(node.left)
                and self._is_constant_string_expr(node.right)
            )
        return False

    def _is_literal_operand(self, node: ast.AST) -> bool:
        """Return True for a constant or a tuple made only of constants."""
        if isinstance(node, ast.Constant):
            return True
        if isinstance(node, ast.Tuple):
            return all(isinstance(elt, ast.Constant) for elt in node.elts)
        return False

    def _report(
        self,
        node: ast.AST,
        vuln_id: str,
        title: str,
        description: str,
        pattern: str,
        recommendation: str,
    ) -> None:
        """Append one CRITICAL finding located at *node*'s first line."""
        line_number = node.lineno
        self.vulnerabilities.append({
            "id": vuln_id,
            "severity": "CRITICAL",
            "title": title,
            "description": description,
            "line_number": line_number,
            "code_snippet": self.get_line_content(line_number),
            "vulnerable_pattern": pattern,
            "recommendation": recommendation,
            "scanner": "sql_injection",
        })

    def visit_JoinedStr(self, node: ast.JoinedStr):
        """Detect f-strings that interpolate values into SQL text."""
        string_content = self.extract_string_content(node)

        if string_content and self.contains_sql_keywords(string_content):
            # Only f-strings that actually interpolate something are risky.
            if any(isinstance(v, ast.FormattedValue) for v in node.values):
                self._report(
                    node,
                    "sql-injection-fstring",
                    "SQL Injection: f-string으로 SQL 쿼리 생성",
                    "f-string을 사용하여 SQL 쿼리에 변수를 직접 삽입하고 있습니다.",
                    "f-string interpolation",
                    "파라미터화된 쿼리를 사용하세요: cursor.execute('SELECT * FROM users WHERE id=%s', (user_id,))",
                )

        self.generic_visit(node)

    def visit_BinOp(self, node: ast.BinOp):
        """Detect SQL built with ``+`` concatenation or ``%`` formatting."""
        if isinstance(node.op, ast.Add):
            left_str = self.extract_string_content(node.left)
            right_str = self.extract_string_content(node.right)

            combined = (left_str or "") + (right_str or "")
            # Joining only literals cannot carry untrusted data, so skip it.
            if (
                self.contains_sql_keywords(combined)
                and not self._is_constant_string_expr(node)
            ):
                self._report(
                    node,
                    "sql-injection-concat",
                    "SQL Injection: 문자열 연결로 SQL 쿼리 생성",
                    "+ 연산자로 SQL 쿼리를 동적으로 생성하고 있습니다.",
                    "string concatenation",
                    "파라미터화된 쿼리를 사용하세요",
                )

        elif isinstance(node.op, ast.Mod):
            left_str = self.extract_string_content(node.left)
            # "sql" % value is Python-side interpolation whether or not the
            # placeholder is quoted; it is NOT a parameterized query. Only a
            # right-hand side made purely of literals is harmless.
            if (
                left_str
                and "%" in left_str
                and self.contains_sql_keywords(left_str)
                and not self._is_literal_operand(node.right)
            ):
                self._report(
                    node,
                    "sql-injection-percent",
                    "SQL Injection: % 포맷팅으로 SQL 쿼리 생성",
                    "% 연산자로 SQL 쿼리에 변수를 직접 삽입하고 있습니다.",
                    "percent formatting",
                    "파라미터화된 쿼리를 사용하세요",
                )

        self.generic_visit(node)

    def visit_Call(self, node: ast.Call):
        """Detect ``.format()`` on SQL literals and dynamic SQL passed to execute-like calls."""
        # "<sql literal>".format(...)
        if isinstance(node.func, ast.Attribute) and node.func.attr == "format":
            string_content = None
            if isinstance(node.func.value, ast.Constant):
                string_content = self.extract_string_content(node.func.value)
            if string_content and self.contains_sql_keywords(string_content):
                self._report(
                    node,
                    "sql-injection-format",
                    "SQL Injection: .format()으로 SQL 쿼리 생성",
                    ".format() 메서드로 SQL 쿼리를 동적으로 생성하고 있습니다.",
                    "string.format()",
                    "파라미터화된 쿼리를 사용하세요",
                )

        # execute()/executemany()/raw()/extra() called with a dynamic string
        func_name = None
        if isinstance(node.func, ast.Name):
            func_name = node.func.id
        elif isinstance(node.func, ast.Attribute):
            func_name = node.func.attr

        if func_name in EXECUTE_METHODS and node.args:
            first_arg = node.args[0]

            # An f-string, operator expression or call result is built at run
            # time, unless it is just literals glued together with "+".
            is_dynamic = (
                isinstance(first_arg, (ast.JoinedStr, ast.BinOp, ast.Call))
                and not self._is_constant_string_expr(first_arg)
            )

            # A plain literal that still carries str.format placeholders was
            # most likely meant to be formatted before reaching the database.
            if isinstance(first_arg, ast.Constant):
                content = self.extract_string_content(first_arg)
                if content and ("{}" in content or "{0" in content):
                    is_dynamic = True

            if is_dynamic:
                self._report(
                    node,
                    f"sql-injection-{func_name}",
                    f"SQL Injection: {func_name}()에 동적 SQL 쿼리 사용",
                    f"{func_name}() 메서드에 동적으로 생성된 SQL 쿼리를 전달하고 있습니다.",
                    f"dynamic SQL in {func_name}()",
                    "파라미터화된 쿼리를 사용하세요",
                )

        self.generic_visit(node)
201
+
202
+
203
def scan_sql_injection(file_path: str, code: str) -> List[Dict[str, Any]]:
    """
    Run the AST-based SQL injection visitor over a piece of Python source.

    Args:
        file_path: Path recorded on every finding as ``file_path``
        code: Source code to analyze

    Returns:
        The visitor's findings, or an empty list when the code cannot be
        parsed or the analysis fails unexpectedly
    """
    try:
        syntax_tree = ast.parse(code)

        visitor = SQLInjectionVisitor(code.split("\n"))
        visitor.visit(syntax_tree)

        # Tag each finding with the file it came from.
        for finding in visitor.vulnerabilities:
            finding["file_path"] = file_path

        return visitor.vulnerabilities
    except SyntaxError:
        # Unparseable code cannot be analyzed; report nothing instead of failing.
        return []
    except Exception as e:
        # Unexpected analysis errors are reported on stderr, never raised.
        import sys
        print(f"Warning: SQL injection scan error: {e}", file=sys.stderr)
        return []
241
+
242
+
243
def check_sql_pattern_regex(code: str) -> List[Dict[str, Any]]:
    """
    Line-by-line regex fallback for spotting f-strings that look like SQL.

    Args:
        code: Source code to scan

    Returns:
        One HIGH-severity finding per line that matches the pattern
    """
    # An f-string opener, then an SQL keyword, then a {...} field, then a quote.
    suspicious_fstring = re.compile(
        r'f["\'].*(?:SELECT|INSERT|UPDATE|DELETE|FROM|WHERE).*\{.*\}.*["\']',
        re.IGNORECASE,
    )

    return [
        {
            "id": "sql-injection-regex",
            "severity": "HIGH",
            "title": "SQL Injection 가능성: f-string 사용 탐지",
            "description": "SQL 쿼리에 f-string 변수 삽입이 의심됩니다.",
            "line_number": position,
            "code_snippet": text.strip(),
            "vulnerable_pattern": "f-string with SQL keywords",
            "recommendation": "파라미터화된 쿼리 사용 권장",
            "scanner": "sql_injection",
        }
        for position, text in enumerate(code.split("\n"), start=1)
        if suspicious_fstring.search(text)
    ]
src/server.py ADDED
@@ -0,0 +1,340 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ #!/usr/bin/env python3
2
+ """
3
+ Security Scanner MCP Server
4
+
5
+ An MCP server that scans Python code for security vulnerabilities
6
+ and provides beginner-friendly explanations.
7
+ """
8
+
9
+ import argparse
10
+ import asyncio
11
+ import json
12
+ import sys
13
+ import tempfile
14
+ from pathlib import Path
15
+ from typing import Any, Dict, List
16
+
17
+ # MCP imports
18
+ from mcp.server import Server
19
+ from mcp.server.stdio import stdio_server
20
+ from mcp.types import Tool, TextContent
21
+
22
+ # Local imports
23
+ from utils import (
24
+ load_config,
25
+ setup_logging,
26
+ validate_severity_threshold,
27
+ filter_by_severity,
28
+ )
29
+
30
# Initialize logger (configured when SecurityScannerServer is constructed)
logger = None


class SecurityScannerServer:
    """MCP server that exposes a single ``scan_security`` tool over stdio.

    The tool runs every scanner enabled in the loaded configuration against
    the submitted code, filters the findings by severity and returns the
    result as JSON text.
    """

    def __init__(self, debug: bool = False):
        """
        Initialize the Security Scanner MCP server.

        Configures the module-level ``logger``, loads the configuration and
        registers the MCP handlers.

        Args:
            debug: Enable debug logging
        """
        global logger
        logger = setup_logging(debug=debug)

        self.config = load_config()
        self.server = Server(self.config["server"]["name"])
        self.debug = debug

        logger.info(
            f"Initializing {self.config['server']['name']} "
            f"v{self.config['server']['version']}"
        )

        # Register handlers
        self._register_handlers()

    def _register_handlers(self):
        """Register the ``list_tools`` and ``call_tool`` MCP handlers."""

        @self.server.list_tools()
        async def list_tools() -> List[Tool]:
            """List available tools."""
            return [
                Tool(
                    name="scan_security",
                    description=self.config["tools"]["scan_security"]["description"],
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "code": {
                                "type": "string",
                                "description": "분석할 Python 소스 코드",
                            },
                            "severity_threshold": {
                                "type": "string",
                                "enum": ["CRITICAL", "HIGH", "MEDIUM", "LOW"],
                                "description": "보고할 최소 심각도 수준",
                                "default": self.config["severity"]["default_threshold"],
                            },
                        },
                        "required": ["code"],
                    },
                )
            ]

        @self.server.call_tool()
        async def call_tool(name: str, arguments: Any) -> List[TextContent]:
            """
            Handle tool calls.

            Args:
                name: Tool name
                arguments: Tool arguments

            Returns:
                List of text content with results. Scan failures are returned
                as a JSON payload with an ``error`` field, not raised.

            Raises:
                ValueError: If *name* is not ``scan_security``
            """
            if name != "scan_security":
                raise ValueError(f"Unknown tool: {name}")

            logger.info(f"Tool called: {name}")
            logger.debug(f"Arguments: {arguments}")

            try:
                result = await self._scan_security(arguments)
                return [TextContent(type="text", text=json.dumps(result, indent=2, ensure_ascii=False))]

            except Exception as e:
                logger.error(f"Error during security scan: {e}", exc_info=True)
                error_result = {
                    "error": str(e),
                    "summary": {
                        "total_issues": 0,
                        "critical": 0,
                        "high": 0,
                        "medium": 0,
                        "low": 0,
                    },
                    "vulnerabilities": [],
                }
                return [TextContent(type="text", text=json.dumps(error_result, indent=2, ensure_ascii=False))]

    async def _scan_security(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """
        Perform security scan on provided code.

        Args:
            arguments: Dictionary containing 'code' and optional 'severity_threshold'

        Returns:
            Dictionary with scan results

        Raises:
            ValueError: If the code is missing, empty, not a string or too
                large, or if the severity threshold is invalid
        """
        # Clients may omit the arguments object entirely.
        arguments = arguments or {}

        # Reject missing or non-string code with a clear validation error
        # rather than an AttributeError from .strip().
        code = arguments.get("code", "")
        if not isinstance(code, str) or not code.strip():
            raise ValueError("Code parameter is required and cannot be empty")
        code = code.strip()

        # An explicit null threshold falls back to the configured default.
        severity_threshold = arguments.get("severity_threshold")
        if severity_threshold is None:
            severity_threshold = self.config["severity"]["default_threshold"]
        if isinstance(severity_threshold, str):
            severity_threshold = severity_threshold.upper()

        if not isinstance(severity_threshold, str) or not validate_severity_threshold(severity_threshold):
            raise ValueError(
                f"Invalid severity threshold: {severity_threshold}. "
                f"Must be one of: CRITICAL, HIGH, MEDIUM, LOW"
            )

        logger.info(f"Starting security scan (threshold: {severity_threshold})")
        logger.debug(f"Code length: {len(code)} characters")

        # Check file size limit
        max_size_mb = self.config["performance"]["max_file_size_mb"]
        code_size_mb = len(code.encode("utf-8")) / (1024 * 1024)
        if code_size_mb > max_size_mb:
            raise ValueError(
                f"Code size ({code_size_mb:.2f}MB) exceeds maximum "
                f"allowed size ({max_size_mb}MB)"
            )

        tmp_file_path = None
        try:
            # The file-based scanners need the code on disk. The path is
            # recorded before writing so a failed write is still cleaned up.
            with tempfile.NamedTemporaryFile(
                mode="w",
                suffix=".py",
                delete=False,
                encoding="utf-8"
            ) as tmp_file:
                tmp_file_path = tmp_file.name
                tmp_file.write(code)

            all_vulnerabilities = self._run_scanners(tmp_file_path, code)

            # Filter by severity threshold
            filtered_vulnerabilities = filter_by_severity(
                all_vulnerabilities,
                severity_threshold
            )

            logger.info(
                f"Total issues found: {len(all_vulnerabilities)}, "
                f"after filtering: {len(filtered_vulnerabilities)}"
            )

            # Format results, falling back to the built-in formatter
            try:
                from formatter import format_results
                formatted_results = format_results(
                    filtered_vulnerabilities,
                    severity_threshold
                )
            except ImportError:
                logger.warning("Formatter module not available, using basic format")
                formatted_results = self._basic_format_results(
                    filtered_vulnerabilities,
                    severity_threshold
                )

            return formatted_results

        finally:
            # Clean up temporary file
            if tmp_file_path is not None:
                try:
                    Path(tmp_file_path).unlink()
                except OSError as e:
                    logger.warning(f"Failed to delete temporary file: {e}")

    def _run_scanners(self, tmp_file_path: str, code: str) -> List[Dict[str, Any]]:
        """Run every enabled scanner and return the combined findings.

        A scanner that is missing or fails is logged and skipped so the
        remaining scanners still contribute results.
        """
        all_vulnerabilities: List[Dict[str, Any]] = []
        scanners = self.config["scanners"]

        # Run pattern detector
        if scanners["pattern_detector"]["enabled"]:
            logger.info("Running pattern detector...")
            try:
                from scanner.pattern_detector import scan_patterns
                pattern_results = scan_patterns(tmp_file_path, code)
                all_vulnerabilities.extend(pattern_results)
                logger.info(f"Pattern detector found {len(pattern_results)} issues")
            except ImportError:
                logger.warning("Pattern detector module not available (not yet implemented)")
            except Exception as e:
                logger.error(f"Pattern detector error: {e}")

        # Run SQL injection detector
        if scanners["sql_injection"]["enabled"]:
            logger.info("Running SQL injection detector...")
            try:
                from scanner.sql_injection import scan_sql_injection
                sql_results = scan_sql_injection(tmp_file_path, code)
                all_vulnerabilities.extend(sql_results)
                logger.info(f"SQL injection detector found {len(sql_results)} issues")
            except ImportError:
                logger.warning("SQL injection detector module not available (not yet implemented)")
            except Exception as e:
                logger.error(f"SQL injection detector error: {e}")

        # Run bandit
        if scanners["bandit"]["enabled"]:
            logger.info("Running bandit scanner...")
            try:
                from scanner.bandit_wrapper import scan_with_bandit
                bandit_results = scan_with_bandit(tmp_file_path)
                all_vulnerabilities.extend(bandit_results)
                logger.info(f"Bandit found {len(bandit_results)} issues")
            except ImportError:
                logger.warning("Bandit wrapper module not available (not yet implemented)")
            except Exception as e:
                logger.error(f"Bandit scanner error: {e}")

        # Run semgrep
        if scanners["semgrep"]["enabled"]:
            logger.info("Running semgrep scanner...")
            try:
                from scanner.semgrep_wrapper import scan_with_semgrep
                # The other callers of the wrapper pass the source text as the
                # second argument; do the same here.
                semgrep_results = scan_with_semgrep(tmp_file_path, code)
                all_vulnerabilities.extend(semgrep_results)
                logger.info(f"Semgrep found {len(semgrep_results)} issues")
            except ImportError:
                logger.warning("Semgrep wrapper module not available (not yet implemented)")
            except Exception as e:
                logger.error(f"Semgrep scanner error: {e}")

        return all_vulnerabilities

    def _basic_format_results(
        self,
        vulnerabilities: List[Dict[str, Any]],
        threshold: str
    ) -> Dict[str, Any]:
        """
        Basic result formatting when formatter module is not available.

        Args:
            vulnerabilities: List of vulnerabilities
            threshold: Severity threshold used

        Returns:
            Formatted results dictionary with a ``summary`` (total, one count
            per known severity, the threshold) and the ``vulnerabilities``
        """
        severity_counts = {"critical": 0, "high": 0, "medium": 0, "low": 0}

        for vuln in vulnerabilities:
            # A missing or null severity counts as LOW; unknown labels are
            # included in the total but in none of the per-level counts.
            severity = str(vuln.get("severity") or "LOW").lower()
            if severity in severity_counts:
                severity_counts[severity] += 1

        return {
            "summary": {
                "total_issues": len(vulnerabilities),
                "critical": severity_counts["critical"],
                "high": severity_counts["high"],
                "medium": severity_counts["medium"],
                "low": severity_counts["low"],
                "severity_threshold": threshold,
            },
            "vulnerabilities": vulnerabilities,
        }

    async def run(self):
        """Run the MCP server on stdio until the streams close."""
        logger.info("Starting MCP server...")

        async with stdio_server() as (read_stream, write_stream):
            logger.info("Server is ready and listening on stdio")
            await self.server.run(
                read_stream,
                write_stream,
                self.server.create_initialization_options()
            )
309
+
310
+
311
def main():
    """Parse the command line, build the server and run it until it stops.

    Exits with status 0 on Ctrl-C and status 1 on any other error raised
    while the server is running.
    """
    cli = argparse.ArgumentParser(description="Security Scanner MCP Server")
    cli.add_argument("--debug", action="store_true", help="Enable debug logging")
    options = cli.parse_args()

    # Constructing the server also configures the module-level logger.
    scanner_server = SecurityScannerServer(debug=options.debug)

    try:
        asyncio.run(scanner_server.run())
    except KeyboardInterrupt:
        if logger is not None:
            logger.info("Server stopped by user")
        sys.exit(0)
    except Exception as e:
        if logger is not None:
            logger.error(f"Server error: {e}", exc_info=True)
        sys.exit(1)


if __name__ == "__main__":
    main()
src/utils.py ADDED
@@ -0,0 +1,149 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Utility functions for the Security Scanner MCP server.
3
+ """
4
+
5
+ import json
6
+ import logging
7
+ from pathlib import Path
8
+ from typing import Any, Dict
9
+
10
# Project root directory
PROJECT_ROOT = Path(__file__).parent.parent


def load_config() -> Dict[str, Any]:
    """
    Read and parse ``mcp_config.json`` from the project root.

    Returns:
        Dictionary containing configuration settings

    Raises:
        FileNotFoundError: If the configuration file does not exist
    """
    config_path = PROJECT_ROOT / "mcp_config.json"

    if config_path.exists():
        return json.loads(config_path.read_text(encoding="utf-8"))

    raise FileNotFoundError(f"Configuration file not found: {config_path}")
30
+
31
+
32
def setup_logging(debug: bool = False) -> logging.Logger:
    """
    Set up logging based on configuration.

    Reads the ``logging`` section of the configuration and (re)configures the
    ``security-scanner-mcp`` logger with a file handler and, unless disabled,
    a console handler. Handlers from a previous call are closed and replaced.

    Args:
        debug: If True, set logging level to DEBUG

    Returns:
        Configured logger instance
    """
    config = load_config()
    log_config = config.get("logging", {})

    # Determine log level. The configured name is matched case-insensitively
    # and anything that is not a real level falls back to INFO; a lowercase
    # name such as "info" would otherwise resolve to the logging.info function.
    if debug:
        log_level = logging.DEBUG
    else:
        level_name = str(log_config.get("level", "INFO")).upper()
        log_level = getattr(logging, level_name, None)
        if not isinstance(log_level, int):
            log_level = logging.INFO

    # Create logs directory if it doesn't exist
    log_file = log_config.get("file", "logs/mcp_server.log")
    log_path = PROJECT_ROOT / log_file
    log_path.parent.mkdir(parents=True, exist_ok=True)

    handlers = []

    # File handler
    file_handler = logging.FileHandler(log_path, encoding="utf-8")
    file_handler.setLevel(log_level)
    handlers.append(file_handler)

    # Console handler (StreamHandler writes to stderr by default)
    if log_config.get("console", True):
        console_handler = logging.StreamHandler()
        console_handler.setLevel(log_level)
        handlers.append(console_handler)

    # Set up formatter
    if log_config.get("json_format", False):
        # NOTE(review): the message is substituted verbatim, so a message
        # containing a double quote yields a line that is not valid JSON.
        formatter = logging.Formatter(
            '{"time":"%(asctime)s","level":"%(levelname)s","message":"%(message)s"}'
        )
    else:
        formatter = logging.Formatter(
            "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
        )

    for handler in handlers:
        handler.setFormatter(formatter)

    # Create and configure logger
    logger = logging.getLogger("security-scanner-mcp")
    logger.setLevel(log_level)

    # Detach AND close handlers from an earlier call; just dropping them
    # would leave the old log file handle open.
    for stale_handler in list(logger.handlers):
        logger.removeHandler(stale_handler)
        stale_handler.close()

    for handler in handlers:
        logger.addHandler(handler)

    return logger
92
+
93
+
94
def validate_severity_threshold(threshold: str) -> bool:
    """
    Check whether a string names one of the supported severity levels.

    The comparison is case-insensitive.

    Args:
        threshold: Severity threshold string

    Returns:
        True if valid, False otherwise
    """
    normalized = threshold.upper()
    for level in ("CRITICAL", "HIGH", "MEDIUM", "LOW"):
        if normalized == level:
            return True
    return False
106
+
107
+
108
def get_severity_order() -> Dict[str, int]:
    """
    Look up the severity ranking, preferring the one from the configuration.

    Returns:
        Dictionary mapping severity levels to numeric order; the built-in
        ranking (CRITICAL=0 ... LOW=3) is used when the configuration has no
        ``severity.thresholds`` entry
    """
    severity_section = load_config().get("severity", {})

    builtin_order = {
        "CRITICAL": 0,
        "HIGH": 1,
        "MEDIUM": 2,
        "LOW": 3
    }
    return severity_section.get("thresholds", builtin_order)
122
+
123
+
124
def filter_by_severity(
    vulnerabilities: list,
    threshold: str
) -> list:
    """
    Keep only the vulnerabilities at or above a severity threshold.

    Lower rank numbers are more severe. An unknown threshold is treated as
    rank 2 and a missing or unknown vulnerability severity as rank 3.

    Args:
        vulnerabilities: List of vulnerability dictionaries
        threshold: Minimum severity threshold

    Returns:
        Filtered list of vulnerabilities, in their original order
    """
    ranks = get_severity_order()
    cutoff = ranks.get(threshold.upper(), 2)

    return [
        entry
        for entry in vulnerabilities
        if ranks.get(entry.get("severity", "LOW").upper(), 3) <= cutoff
    ]
+ return filtered