garibong's picture
Translate SQL injection messages to English
73f6601
"""
SQL Injection vulnerability detector using AST analysis.
Detects unsafe SQL query construction patterns in Python code.
"""
import ast
import re
from typing import Dict, List, Any, Optional
# SQL keywords that indicate a query
SQL_KEYWORDS = [
    "SELECT", "INSERT", "UPDATE", "DELETE", "DROP", "CREATE", "ALTER",
    "FROM", "WHERE", "JOIN", "UNION", "ORDER BY", "GROUP BY",
    "EXEC", "EXECUTE", "TRUNCATE", "GRANT", "REVOKE"
]

# Matches any entry of SQL_KEYWORDS as a whole word, case-insensitively.
# Word boundaries keep ordinary words such as "Updated" or "selected" from
# being mistaken for SQL; multi-word keywords tolerate any run of whitespace.
# NOTE: compiled once at import time, so later mutation of SQL_KEYWORDS is
# not reflected here.
_SQL_KEYWORD_RE = re.compile(
    r"\b(?:"
    + "|".join(
        r"\s+".join(re.escape(word) for word in keyword.split())
        for keyword in SQL_KEYWORDS
    )
    + r")\b",
    re.IGNORECASE,
)

# Database methods that execute SQL
EXECUTE_METHODS = ["execute", "executemany", "raw", "extra"]

# Class names of the literal nodes produced by parsers older than Python 3.8.
# They are compared by name because the ast.Str/ast.Num/... attributes were
# removed from the ast module in Python 3.14 and touching them there raises
# AttributeError.
_LEGACY_LITERAL_NODES = frozenset({"Str", "Num", "Bytes", "NameConstant"})


class SQLInjectionVisitor(ast.NodeVisitor):
    """AST visitor to detect SQL injection vulnerabilities.

    Findings are appended to ``self.vulnerabilities`` as dictionaries with
    the keys ``id``, ``severity``, ``title``, ``description``,
    ``line_number``, ``code_snippet``, ``vulnerable_pattern``,
    ``recommendation`` and ``scanner``.
    """

    def __init__(self, code_lines: List[str]):
        """
        Initialize the visitor.

        Args:
            code_lines: Source code split by lines
        """
        self.vulnerabilities: List[Dict[str, Any]] = []
        self.code_lines = code_lines

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------

    def get_line_content(self, line_number: int) -> str:
        """Return the stripped text of a 1-based line, or "" if out of range."""
        if 0 < line_number <= len(self.code_lines):
            return self.code_lines[line_number - 1].strip()
        return ""

    def contains_sql_keywords(self, text: str) -> bool:
        """Return True if text contains a SQL keyword as a whole word."""
        return _SQL_KEYWORD_RE.search(text) is not None

    def extract_string_content(self, node: ast.AST) -> Optional[str]:
        """Extract string content from various node types.

        Returns the literal text of a string constant, or the literal parts
        of an f-string with every interpolation replaced by ``{}``. Returns
        None for any other node.
        """
        if isinstance(node, ast.Constant):
            return node.value if isinstance(node.value, str) else None
        if type(node).__name__ == "Str":  # node produced by Python < 3.8
            return node.s
        if isinstance(node, ast.JoinedStr):
            # f-string - combine literal parts
            parts = []
            for value in node.values:
                if isinstance(value, ast.Constant):
                    parts.append(str(value.value))
                elif isinstance(value, ast.FormattedValue):
                    parts.append("{}")
            return "".join(parts)
        return None

    @staticmethod
    def _is_string_literal(node: ast.AST) -> bool:
        """Return True if node is a plain (non f-string) string literal."""
        if isinstance(node, ast.Constant):
            return isinstance(node.value, str)
        return type(node).__name__ == "Str"

    @classmethod
    def _is_static_value(cls, node: ast.AST) -> bool:
        """Return True if node is a literal, or a tuple/dict of literals."""
        if isinstance(node, ast.Constant):
            return True
        if type(node).__name__ in _LEGACY_LITERAL_NODES:
            return True
        if isinstance(node, ast.Tuple):
            return all(cls._is_static_value(item) for item in node.elts)
        if isinstance(node, ast.Dict):
            return all(cls._is_static_value(item) for item in node.values)
        return False

    @staticmethod
    def _concat_operands(node: ast.AST) -> List[ast.AST]:
        """Flatten a chain of ``+`` operations into its operands, left to right.

        A node that is not a ``+`` operation is returned as a one-item list.
        """
        operands = []
        pending = [node]
        while pending:
            current = pending.pop()
            if isinstance(current, ast.BinOp) and isinstance(current.op, ast.Add):
                # Push right first so the left operand is popped first.
                pending.append(current.right)
                pending.append(current.left)
            else:
                operands.append(current)
        return operands

    def _literal_text(self, node: ast.AST) -> str:
        """Join the literal string parts found in a (possibly ``+``-chained) node."""
        return "".join(
            self.extract_string_content(operand) or ""
            for operand in self._concat_operands(node)
        )

    def _is_dynamic_query(self, arg: ast.AST) -> bool:
        """Return True if an execute-style argument is built at run time."""
        if isinstance(arg, (ast.JoinedStr, ast.Call)):
            return True
        if isinstance(arg, ast.BinOp):
            # A "+" chain made only of string literals is still a constant
            # query; anything else (variables, % formatting, ...) is dynamic.
            return not all(
                self._is_string_literal(operand)
                for operand in self._concat_operands(arg)
            )
        if self._is_string_literal(arg):
            content = self.extract_string_content(arg)
            # Check for string formatting placeholders
            return bool(content) and ("{}" in content or "{0" in content)
        return False

    def _report(
        self,
        node: ast.AST,
        vuln_id: str,
        title: str,
        description: str,
        pattern: str,
        recommendation: str = "Use parameterized queries",
    ) -> None:
        """Record one CRITICAL finding located at node's line."""
        line_number = node.lineno
        self.vulnerabilities.append({
            "id": vuln_id,
            "severity": "CRITICAL",
            "title": title,
            "description": description,
            "line_number": line_number,
            "code_snippet": self.get_line_content(line_number),
            "vulnerable_pattern": pattern,
            "recommendation": recommendation,
            "scanner": "sql_injection",
        })

    # ------------------------------------------------------------------
    # Visitors
    # ------------------------------------------------------------------

    def visit_JoinedStr(self, node: ast.JoinedStr):
        """Detect f-strings with SQL keywords (potential SQL injection)."""
        string_content = self.extract_string_content(node)
        # Only f-strings that actually interpolate a value are reported.
        has_variables = any(isinstance(v, ast.FormattedValue) for v in node.values)
        if has_variables and string_content and self.contains_sql_keywords(string_content):
            self._report(
                node,
                "sql-injection-fstring",
                "SQL Injection: SQL query built with f-string",
                "Variables are directly interpolated into SQL query using f-string.",
                "f-string interpolation",
                "Use parameterized queries: cursor.execute('SELECT * FROM users WHERE id=%s', (user_id,))",
            )
        self.generic_visit(node)

    def visit_BinOp(self, node: ast.BinOp):
        """Detect SQL built with ``+`` concatenation or ``%`` formatting."""
        if isinstance(node.op, ast.Add):
            # Treat the whole a + b + c chain as one expression so it is
            # reported once, and only when some operand is not a literal.
            operands = self._concat_operands(node)
            literal_text = "".join(
                self.extract_string_content(operand) or "" for operand in operands
            )
            has_dynamic_part = not all(
                self._is_string_literal(operand) for operand in operands
            )
            if has_dynamic_part and self.contains_sql_keywords(literal_text):
                self._report(
                    node,
                    "sql-injection-concat",
                    "SQL Injection: SQL query built with string concatenation",
                    "SQL query is dynamically constructed using the + operator.",
                    "string concatenation",
                )
            # Descend into the operands directly; the inner "+" nodes are
            # already covered by the check above.
            for operand in operands:
                self.visit(operand)
            return

        if isinstance(node.op, ast.Mod):
            # "..." % value interpolates value into the text before any
            # database driver sees it, whether or not the placeholder is
            # quoted. The safe form passes the values as a separate argument:
            #   cursor.execute("SELECT * FROM users WHERE id=%s", (user_id,))
            template = self._literal_text(node.left)
            if (
                template
                and self.contains_sql_keywords(template)
                and not self._is_static_value(node.right)
            ):
                self._report(
                    node,
                    "sql-injection-percent",
                    "SQL Injection: SQL query built with % formatting",
                    "Variables are directly inserted into SQL query using the % operator.",
                    "percent formatting",
                )
        self.generic_visit(node)

    def visit_Call(self, node: ast.Call):
        """Detect execute() calls and .format() on SQL strings."""
        func = node.func

        # .format() called on a SQL string literal with non-constant arguments
        if isinstance(func, ast.Attribute) and func.attr == "format":
            template = self._literal_text(func.value)
            has_dynamic_args = any(
                not self._is_static_value(arg) for arg in node.args
            ) or any(
                not self._is_static_value(keyword.value) for keyword in node.keywords
            )
            if template and has_dynamic_args and self.contains_sql_keywords(template):
                self._report(
                    node,
                    "sql-injection-format",
                    "SQL Injection: SQL query built with .format()",
                    "SQL query is dynamically constructed using the .format() method.",
                    "string.format()",
                )

        # execute()-style calls whose first argument is built dynamically
        func_name = None
        if isinstance(func, ast.Name):
            func_name = func.id
        elif isinstance(func, ast.Attribute):
            func_name = func.attr

        if func_name in EXECUTE_METHODS and node.args:
            if self._is_dynamic_query(node.args[0]):
                self._report(
                    node,
                    f"sql-injection-{func_name}",
                    f"SQL Injection: Dynamic SQL query in {func_name}()",
                    f"A dynamically constructed SQL query is passed to the {func_name}() method.",
                    f"dynamic SQL in {func_name}()",
                )
        self.generic_visit(node)
def scan_sql_injection(file_path: str, code: str) -> List[Dict[str, Any]]:
    """
    Run the AST-based SQL injection scan over one Python source text.

    Args:
        file_path: Path recorded under the "file_path" key of every finding
        code: Python source code to analyze

    Returns:
        The findings collected by SQLInjectionVisitor, each tagged with
        file_path. An empty list is returned when the code cannot be parsed
        or when the scan raises an unexpected error.
    """
    try:
        syntax_tree = ast.parse(code)
        scanner = SQLInjectionVisitor(code.split("\n"))
        scanner.visit(syntax_tree)
        findings = scanner.vulnerabilities
        # Tag each finding with the file it came from.
        for finding in findings:
            finding["file_path"] = file_path
        return findings
    except SyntaxError:
        # Unparseable code cannot be analyzed; report nothing instead of failing.
        return []
    except Exception as e:
        # Best-effort scan: warn on stderr and report nothing.
        import sys
        print(f"Warning: SQL injection scan error: {e}", file=sys.stderr)
        return []
def check_sql_pattern_regex(code: str) -> List[Dict[str, Any]]:
    """
    Fallback regex-based SQL injection detection.

    Works line by line and needs no parseable syntax tree: a line is
    reported when it contains an f-string (prefix f, fr or rf) holding one
    of the keywords SELECT, INSERT, UPDATE, DELETE, FROM or WHERE as a whole
    word, followed by a ``{...}`` interpolation.

    Args:
        code: Source code to scan

    Returns:
        List of vulnerability dictionaries (severity "HIGH"), one per
        matching line, in line order
    """
    # \b around the keyword group keeps words such as "Updated" or "Selected"
    # from matching; \b before the prefix anchors it to the start of a token.
    fstring_pattern = re.compile(
        r'\b(?:f|fr|rf)["\'].*\b(?:SELECT|INSERT|UPDATE|DELETE|FROM|WHERE)\b.*\{.*\}.*["\']',
        re.IGNORECASE,
    )
    return [
        {
            "id": "sql-injection-regex",
            "severity": "HIGH",
            "title": "Potential SQL Injection: f-string usage detected",
            "description": "Suspected f-string variable interpolation in SQL query.",
            "line_number": line_num,
            "code_snippet": line.strip(),
            "vulnerable_pattern": "f-string with SQL keywords",
            "recommendation": "Use parameterized queries instead",
            "scanner": "sql_injection",
        }
        for line_num, line in enumerate(code.split("\n"), start=1)
        if fstring_pattern.search(line)
    ]