|
|
""" |
|
|
SQL Injection vulnerability detector using AST analysis. |
|
|
|
|
|
Detects unsafe SQL query construction patterns in Python code. |
|
|
""" |
|
|
|
|
|
import ast |
|
|
import re |
|
|
from typing import Dict, List, Any, Optional |
|
|
|
|
|
|
|
|
SQL_KEYWORDS = [ |
|
|
"SELECT", "INSERT", "UPDATE", "DELETE", "DROP", "CREATE", "ALTER", |
|
|
"FROM", "WHERE", "JOIN", "UNION", "ORDER BY", "GROUP BY", |
|
|
"EXEC", "EXECUTE", "TRUNCATE", "GRANT", "REVOKE" |
|
|
] |
|
|
|
|
|
|
|
|
EXECUTE_METHODS = ["execute", "executemany", "raw", "extra"] |
|
|
|
|
|
|
|
|
class SQLInjectionVisitor(ast.NodeVisitor): |
|
|
"""AST visitor to detect SQL injection vulnerabilities.""" |
|
|
|
|
|
def __init__(self, code_lines: List[str]): |
|
|
""" |
|
|
Initialize the visitor. |
|
|
|
|
|
Args: |
|
|
code_lines: Source code split by lines |
|
|
""" |
|
|
self.vulnerabilities = [] |
|
|
self.code_lines = code_lines |
|
|
|
|
|
def get_line_content(self, line_number: int) -> str: |
|
|
"""Get the content of a specific line.""" |
|
|
if 0 < line_number <= len(self.code_lines): |
|
|
return self.code_lines[line_number - 1].strip() |
|
|
return "" |
|
|
|
|
|
def contains_sql_keywords(self, text: str) -> bool: |
|
|
"""Check if text contains SQL keywords.""" |
|
|
text_upper = text.upper() |
|
|
return any(keyword in text_upper for keyword in SQL_KEYWORDS) |
|
|
|
|
|
def extract_string_content(self, node: ast.AST) -> Optional[str]: |
|
|
"""Extract string content from various node types.""" |
|
|
if isinstance(node, ast.Constant) and isinstance(node.value, str): |
|
|
return node.value |
|
|
elif isinstance(node, ast.Str): |
|
|
return node.s |
|
|
elif isinstance(node, ast.JoinedStr): |
|
|
|
|
|
parts = [] |
|
|
for value in node.values: |
|
|
if isinstance(value, ast.Constant): |
|
|
parts.append(str(value.value)) |
|
|
elif isinstance(value, ast.FormattedValue): |
|
|
parts.append("{}") |
|
|
return "".join(parts) |
|
|
return None |
|
|
|
|
|
def visit_JoinedStr(self, node: ast.JoinedStr): |
|
|
"""Detect f-strings with SQL keywords (potential SQL injection).""" |
|
|
|
|
|
string_content = self.extract_string_content(node) |
|
|
|
|
|
if string_content and self.contains_sql_keywords(string_content): |
|
|
|
|
|
has_variables = any(isinstance(v, ast.FormattedValue) for v in node.values) |
|
|
|
|
|
if has_variables: |
|
|
line_number = node.lineno |
|
|
code_snippet = self.get_line_content(line_number) |
|
|
|
|
|
self.vulnerabilities.append({ |
|
|
"id": "sql-injection-fstring", |
|
|
"severity": "CRITICAL", |
|
|
"title": "SQL Injection: SQL query built with f-string", |
|
|
"description": "Variables are directly interpolated into SQL query using f-string.", |
|
|
"line_number": line_number, |
|
|
"code_snippet": code_snippet, |
|
|
"vulnerable_pattern": "f-string interpolation", |
|
|
"recommendation": "Use parameterized queries: cursor.execute('SELECT * FROM users WHERE id=%s', (user_id,))", |
|
|
"scanner": "sql_injection", |
|
|
}) |
|
|
|
|
|
self.generic_visit(node) |
|
|
|
|
|
def visit_BinOp(self, node: ast.BinOp): |
|
|
"""Detect string concatenation with SQL keywords.""" |
|
|
|
|
|
if isinstance(node.op, ast.Add): |
|
|
left_str = self.extract_string_content(node.left) |
|
|
right_str = self.extract_string_content(node.right) |
|
|
|
|
|
|
|
|
combined = (left_str or "") + (right_str or "") |
|
|
if self.contains_sql_keywords(combined): |
|
|
line_number = node.lineno |
|
|
code_snippet = self.get_line_content(line_number) |
|
|
|
|
|
self.vulnerabilities.append({ |
|
|
"id": "sql-injection-concat", |
|
|
"severity": "CRITICAL", |
|
|
"title": "SQL Injection: SQL query built with string concatenation", |
|
|
"description": "SQL query is dynamically constructed using the + operator.", |
|
|
"line_number": line_number, |
|
|
"code_snippet": code_snippet, |
|
|
"vulnerable_pattern": "string concatenation", |
|
|
"recommendation": "Use parameterized queries", |
|
|
"scanner": "sql_injection", |
|
|
}) |
|
|
|
|
|
|
|
|
elif isinstance(node.op, ast.Mod): |
|
|
left_str = self.extract_string_content(node.left) |
|
|
if left_str and self.contains_sql_keywords(left_str): |
|
|
line_number = node.lineno |
|
|
code_snippet = self.get_line_content(line_number) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if "'%s'" in left_str or '"%s"' in left_str: |
|
|
self.vulnerabilities.append({ |
|
|
"id": "sql-injection-percent", |
|
|
"severity": "CRITICAL", |
|
|
"title": "SQL Injection: SQL query built with % formatting", |
|
|
"description": "Variables are directly inserted into SQL query using the % operator.", |
|
|
"line_number": line_number, |
|
|
"code_snippet": code_snippet, |
|
|
"vulnerable_pattern": "percent formatting", |
|
|
"recommendation": "Use parameterized queries", |
|
|
"scanner": "sql_injection", |
|
|
}) |
|
|
|
|
|
self.generic_visit(node) |
|
|
|
|
|
def visit_Call(self, node: ast.Call): |
|
|
"""Detect execute() calls and .format() on SQL strings.""" |
|
|
|
|
|
if isinstance(node.func, ast.Attribute) and node.func.attr == "format": |
|
|
if isinstance(node.func.value, (ast.Constant, ast.Str)): |
|
|
string_content = self.extract_string_content(node.func.value) |
|
|
if string_content and self.contains_sql_keywords(string_content): |
|
|
line_number = node.lineno |
|
|
code_snippet = self.get_line_content(line_number) |
|
|
|
|
|
self.vulnerabilities.append({ |
|
|
"id": "sql-injection-format", |
|
|
"severity": "CRITICAL", |
|
|
"title": "SQL Injection: SQL query built with .format()", |
|
|
"description": "SQL query is dynamically constructed using the .format() method.", |
|
|
"line_number": line_number, |
|
|
"code_snippet": code_snippet, |
|
|
"vulnerable_pattern": "string.format()", |
|
|
"recommendation": "Use parameterized queries", |
|
|
"scanner": "sql_injection", |
|
|
}) |
|
|
|
|
|
|
|
|
func_name = None |
|
|
if isinstance(node.func, ast.Name): |
|
|
func_name = node.func.id |
|
|
elif isinstance(node.func, ast.Attribute): |
|
|
func_name = node.func.attr |
|
|
|
|
|
if func_name in EXECUTE_METHODS and node.args: |
|
|
first_arg = node.args[0] |
|
|
|
|
|
|
|
|
is_dynamic = False |
|
|
if isinstance(first_arg, (ast.JoinedStr, ast.BinOp, ast.Call)): |
|
|
is_dynamic = True |
|
|
|
|
|
|
|
|
if isinstance(first_arg, (ast.Constant, ast.Str)): |
|
|
content = self.extract_string_content(first_arg) |
|
|
|
|
|
if content and ("{}" in content or "{0" in content): |
|
|
is_dynamic = True |
|
|
|
|
|
if is_dynamic: |
|
|
line_number = node.lineno |
|
|
code_snippet = self.get_line_content(line_number) |
|
|
|
|
|
self.vulnerabilities.append({ |
|
|
"id": f"sql-injection-{func_name}", |
|
|
"severity": "CRITICAL", |
|
|
"title": f"SQL Injection: Dynamic SQL query in {func_name}()", |
|
|
"description": f"A dynamically constructed SQL query is passed to the {func_name}() method.", |
|
|
"line_number": line_number, |
|
|
"code_snippet": code_snippet, |
|
|
"vulnerable_pattern": f"dynamic SQL in {func_name}()", |
|
|
"recommendation": "Use parameterized queries", |
|
|
"scanner": "sql_injection", |
|
|
}) |
|
|
|
|
|
self.generic_visit(node) |
|
|
|
|
|
|
|
|
def scan_sql_injection(file_path: str, code: str) -> List[Dict[str, Any]]: |
|
|
""" |
|
|
Scan Python code for SQL injection vulnerabilities. |
|
|
|
|
|
Args: |
|
|
file_path: Path to the file being scanned |
|
|
code: Source code to analyze |
|
|
|
|
|
Returns: |
|
|
List of vulnerability dictionaries |
|
|
""" |
|
|
vulnerabilities = [] |
|
|
|
|
|
try: |
|
|
|
|
|
tree = ast.parse(code) |
|
|
|
|
|
|
|
|
code_lines = code.split("\n") |
|
|
visitor = SQLInjectionVisitor(code_lines) |
|
|
visitor.visit(tree) |
|
|
|
|
|
|
|
|
for vuln in visitor.vulnerabilities: |
|
|
vuln["file_path"] = file_path |
|
|
|
|
|
vulnerabilities = visitor.vulnerabilities |
|
|
|
|
|
except SyntaxError: |
|
|
|
|
|
|
|
|
pass |
|
|
except Exception as e: |
|
|
|
|
|
import sys |
|
|
print(f"Warning: SQL injection scan error: {e}", file=sys.stderr) |
|
|
|
|
|
return vulnerabilities |
|
|
|
|
|
|
|
|
def check_sql_pattern_regex(code: str) -> List[Dict[str, Any]]: |
|
|
""" |
|
|
Fallback regex-based SQL injection detection. |
|
|
|
|
|
Args: |
|
|
code: Source code to scan |
|
|
|
|
|
Returns: |
|
|
List of vulnerability dictionaries |
|
|
""" |
|
|
vulnerabilities = [] |
|
|
lines = code.split("\n") |
|
|
|
|
|
|
|
|
fstring_pattern = r'f["\'].*(?:SELECT|INSERT|UPDATE|DELETE|FROM|WHERE).*\{.*\}.*["\']' |
|
|
|
|
|
for line_num, line in enumerate(lines, start=1): |
|
|
if re.search(fstring_pattern, line, re.IGNORECASE): |
|
|
vulnerabilities.append({ |
|
|
"id": "sql-injection-regex", |
|
|
"severity": "HIGH", |
|
|
"title": "Potential SQL Injection: f-string usage detected", |
|
|
"description": "Suspected f-string variable interpolation in SQL query.", |
|
|
"line_number": line_num, |
|
|
"code_snippet": line.strip(), |
|
|
"vulnerable_pattern": "f-string with SQL keywords", |
|
|
"recommendation": "Use parameterized queries instead", |
|
|
"scanner": "sql_injection", |
|
|
}) |
|
|
|
|
|
return vulnerabilities |
|
|
|