File size: 11,139 Bytes
f8e78b2 73f6601 f8e78b2 73f6601 f8e78b2 73f6601 f8e78b2 73f6601 f8e78b2 73f6601 f8e78b2 73f6601 f8e78b2 73f6601 f8e78b2 73f6601 f8e78b2 73f6601 f8e78b2 73f6601 f8e78b2 73f6601 f8e78b2 73f6601 f8e78b2 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 |
"""
SQL Injection vulnerability detector using AST analysis.
Detects unsafe SQL query construction patterns in Python code.
"""
import ast
import re
from typing import Dict, List, Any, Optional
# SQL keywords that indicate a query
SQL_KEYWORDS = [
    "SELECT", "INSERT", "UPDATE", "DELETE", "DROP", "CREATE", "ALTER",
    "FROM", "WHERE", "JOIN", "UNION", "ORDER BY", "GROUP BY",
    "EXEC", "EXECUTE", "TRUNCATE", "GRANT", "REVOKE"
]

# Database methods that execute SQL
EXECUTE_METHODS = ["execute", "executemany", "raw", "extra"]


def _compile_keyword_pattern(keywords):
    """Build a case-insensitive regex that matches any keyword as a whole word.

    Multi-word keywords such as "ORDER BY" tolerate any run of whitespace
    between their words.
    """
    alternatives = [
        r"\s+".join(re.escape(word) for word in keyword.split())
        for keyword in keywords
        if keyword.strip()
    ]
    if not alternatives:
        # An empty alternation would match everywhere; use a never-matching pattern.
        return re.compile(r"(?!)")
    return re.compile(r"\b(?:" + "|".join(alternatives) + r")\b", re.IGNORECASE)


# Compiled once at import time from SQL_KEYWORDS. Whole-word matching keeps
# ordinary words like "selection" or "updated" from being treated as SQL.
_SQL_KEYWORD_PATTERN = _compile_keyword_pattern(SQL_KEYWORDS)


class SQLInjectionVisitor(ast.NodeVisitor):
    """AST visitor to detect SQL injection vulnerabilities.

    Findings are appended to ``self.vulnerabilities`` as dictionaries with the
    keys ``id``, ``severity``, ``title``, ``description``, ``line_number``,
    ``code_snippet``, ``vulnerable_pattern``, ``recommendation`` and
    ``scanner``.
    """

    def __init__(self, code_lines: List[str]):
        """
        Initialize the visitor.

        Args:
            code_lines: Source code split by lines
        """
        self.vulnerabilities: List[Dict[str, Any]] = []
        self.code_lines = code_lines

    def get_line_content(self, line_number: int) -> str:
        """Get the stripped content of a 1-based line, or "" if out of range."""
        if 0 < line_number <= len(self.code_lines):
            return self.code_lines[line_number - 1].strip()
        return ""

    def contains_sql_keywords(self, text: str) -> bool:
        """Check if text contains an SQL keyword as a whole word (any case)."""
        return _SQL_KEYWORD_PATTERN.search(text) is not None

    @staticmethod
    def _is_literal(node: ast.AST) -> bool:
        """Return True for constant nodes, including the pre-3.8 ``Str`` node.

        The legacy node is recognised by class name rather than through
        ``ast.Str``: that attribute is deprecated and no longer exists on
        recent Python versions, so touching it would raise AttributeError.
        """
        return isinstance(node, ast.Constant) or type(node).__name__ == "Str"

    def extract_string_content(self, node: ast.AST) -> Optional[str]:
        """Extract string content from various node types.

        Returns:
            The literal text for string constants; for f-strings the literal
            parts joined together with every interpolation replaced by "{}";
            None for any other node.
        """
        if isinstance(node, ast.Constant):
            return node.value if isinstance(node.value, str) else None
        if type(node).__name__ == "Str":  # Python < 3.8
            return node.s
        if isinstance(node, ast.JoinedStr):
            # f-string - combine literal parts
            parts = []
            for value in node.values:
                if isinstance(value, ast.FormattedValue):
                    parts.append("{}")
                else:
                    literal = self.extract_string_content(value)
                    if literal is not None:
                        parts.append(literal)
            return "".join(parts)
        return None

    @staticmethod
    def _flatten_concat(node: ast.AST) -> List[ast.AST]:
        """Return the operands of a (possibly nested) ``+`` chain, left to right."""
        operands = []
        stack = [node]
        while stack:
            current = stack.pop()
            if isinstance(current, ast.BinOp) and isinstance(current.op, ast.Add):
                # Push right first so the left operand is popped first.
                stack.append(current.right)
                stack.append(current.left)
            else:
                operands.append(current)
        return operands

    def _is_static_string(self, node: ast.AST) -> bool:
        """Return True if the expression is built only from string literals."""
        if isinstance(node, ast.JoinedStr):
            return not any(isinstance(v, ast.FormattedValue) for v in node.values)
        if isinstance(node, ast.BinOp) and isinstance(node.op, ast.Add):
            return all(self._is_static_string(op) for op in self._flatten_concat(node))
        return self.extract_string_content(node) is not None

    def _report(self, node: ast.AST, vuln_id: str, title: str, description: str,
                pattern: str,
                recommendation: str = "Use parameterized queries") -> None:
        """Append one CRITICAL finding located at ``node``'s first line."""
        line_number = node.lineno
        self.vulnerabilities.append({
            "id": vuln_id,
            "severity": "CRITICAL",
            "title": title,
            "description": description,
            "line_number": line_number,
            "code_snippet": self.get_line_content(line_number),
            "vulnerable_pattern": pattern,
            "recommendation": recommendation,
            "scanner": "sql_injection",
        })

    def visit_JoinedStr(self, node: ast.JoinedStr) -> None:
        """Detect f-strings with SQL keywords (potential SQL injection)."""
        # Only f-strings that actually interpolate a value are of interest.
        has_variables = any(isinstance(v, ast.FormattedValue) for v in node.values)
        if has_variables:
            string_content = self.extract_string_content(node)
            if string_content and self.contains_sql_keywords(string_content):
                self._report(
                    node,
                    "sql-injection-fstring",
                    "SQL Injection: SQL query built with f-string",
                    "Variables are directly interpolated into SQL query using f-string.",
                    "f-string interpolation",
                    recommendation="Use parameterized queries: cursor.execute('SELECT * FROM users WHERE id=%s', (user_id,))",
                )
        self.generic_visit(node)

    def visit_BinOp(self, node: ast.BinOp) -> None:
        """Detect SQL built with ``+`` concatenation or ``%`` formatting."""
        if isinstance(node.op, ast.Add):
            # The whole chain is handled (and its operands visited) in one go,
            # so nested BinOps of the same chain are not reported again.
            self._check_concatenation(node)
            return
        if isinstance(node.op, ast.Mod):
            self._check_percent_format(node)
        self.generic_visit(node)

    def _check_concatenation(self, node: ast.BinOp) -> None:
        """Report a ``+`` chain once if it mixes SQL text with dynamic operands."""
        operands = self._flatten_concat(node)
        has_dynamic = any(not self._is_static_string(op) for op in operands)
        if has_dynamic:
            texts = (self.extract_string_content(op) for op in operands)
            if any(text and self.contains_sql_keywords(text) for text in texts):
                self._report(
                    node,
                    "sql-injection-concat",
                    "SQL Injection: SQL query built with string concatenation",
                    "SQL query is dynamically constructed using the + operator.",
                    "string concatenation",
                )
        # Operands may themselves contain f-strings, calls or % formatting.
        for operand in operands:
            self.visit(operand)

    def _check_percent_format(self, node: ast.BinOp) -> None:
        """Report ``"<sql> %s" % value``.

        The ``%`` operator interpolates in Python before the database driver
        sees the query, so it is unsafe whether or not the placeholder is
        quoted. Real parameterized queries pass the values as a separate
        argument to ``execute()``.
        """
        template = self.extract_string_content(node.left)
        if not template or not self.contains_sql_keywords(template):
            return
        # "%%" is an escaped literal percent sign, not a placeholder.
        if "%" in template.replace("%%", ""):
            self._report(
                node,
                "sql-injection-percent",
                "SQL Injection: SQL query built with % formatting",
                "Variables are directly inserted into SQL query using the % operator.",
                "percent formatting",
            )

    def visit_Call(self, node: ast.Call) -> None:
        """Detect execute() calls and .format() on SQL strings."""
        func = node.func

        # Check if this is a .format() call on a string literal with SQL
        if (isinstance(func, ast.Attribute) and func.attr == "format"
                and self._is_literal(func.value)):
            string_content = self.extract_string_content(func.value)
            if string_content and self.contains_sql_keywords(string_content):
                self._report(
                    node,
                    "sql-injection-format",
                    "SQL Injection: SQL query built with .format()",
                    "SQL query is dynamically constructed using the .format() method.",
                    "string.format()",
                )

        # Check for execute() calls with dynamic strings
        func_name = None
        if isinstance(func, ast.Name):
            func_name = func.id
        elif isinstance(func, ast.Attribute):
            func_name = func.attr

        if func_name in EXECUTE_METHODS and node.args:
            first_arg = node.args[0]
            is_dynamic = False
            if isinstance(first_arg, (ast.JoinedStr, ast.BinOp, ast.Call)):
                # Expressions made only of literals cannot carry user input.
                is_dynamic = not self._is_static_string(first_arg)
            elif self._is_literal(first_arg):
                content = self.extract_string_content(first_arg)
                # str.format-style placeholders left in the query text
                if content and ("{}" in content or "{0" in content):
                    is_dynamic = True
            if is_dynamic:
                self._report(
                    node,
                    f"sql-injection-{func_name}",
                    f"SQL Injection: Dynamic SQL query in {func_name}()",
                    f"A dynamically constructed SQL query is passed to the {func_name}() method.",
                    f"dynamic SQL in {func_name}()",
                )
        self.generic_visit(node)
def scan_sql_injection(file_path: str, code: str) -> List[Dict[str, Any]]:
    """
    Run the AST-based SQL injection visitor over a piece of Python source.

    Scanning is best-effort: source that cannot be parsed yields an empty
    result, and any other failure is reported on stderr and also yields an
    empty result.

    Args:
        file_path: Path recorded under "file_path" in every finding
        code: Source code to analyze

    Returns:
        List of vulnerability dictionaries
    """
    try:
        # Parse first so that invalid source never reaches the visitor.
        syntax_tree = ast.parse(code)
        detector = SQLInjectionVisitor(code.split("\n"))
        detector.visit(syntax_tree)
        findings = detector.vulnerabilities
        # Tag each finding with the file it came from.
        for finding in findings:
            finding["file_path"] = file_path
        return findings
    except SyntaxError:
        # Unparseable code cannot be analyzed; report nothing instead of failing.
        return []
    except Exception as e:
        # Unexpected errors are surfaced as a warning but never propagate.
        import sys
        print(f"Warning: SQL injection scan error: {e}", file=sys.stderr)
        return []
def check_sql_pattern_regex(code: str) -> List[Dict[str, Any]]:
    """
    Line-by-line regex fallback for spotting SQL built with f-strings.

    A line is reported when it contains an f-string opener followed by one of
    a few SQL keywords and a ``{...}`` interpolation (matched case-insensitively).

    Args:
        code: Source code to scan

    Returns:
        List of vulnerability dictionaries, one per matching line, in line order
    """
    # Compiled once up front; every line is tested against the same pattern.
    fstring_matcher = re.compile(
        r'f["\'].*(?:SELECT|INSERT|UPDATE|DELETE|FROM|WHERE).*\{.*\}.*["\']',
        re.IGNORECASE,
    )
    return [
        {
            "id": "sql-injection-regex",
            "severity": "HIGH",
            "title": "Potential SQL Injection: f-string usage detected",
            "description": "Suspected f-string variable interpolation in SQL query.",
            "line_number": position,
            "code_snippet": raw_line.strip(),
            "vulnerable_pattern": "f-string with SQL keywords",
            "recommendation": "Use parameterized queries instead",
            "scanner": "sql_injection",
        }
        for position, raw_line in enumerate(code.split("\n"), start=1)
        if fstring_matcher.search(raw_line)
    ]
|