"""
SQL Injection vulnerability detector using AST analysis.

Detects unsafe SQL query construction patterns in Python code.
"""

import ast
import re
import sys
from typing import Any, Dict, List, Optional

# SQL keywords that indicate a query
SQL_KEYWORDS = [
    "SELECT", "INSERT", "UPDATE", "DELETE", "DROP", "CREATE", "ALTER",
    "FROM", "WHERE", "JOIN", "UNION", "ORDER BY", "GROUP BY",
    "EXEC", "EXECUTE", "TRUNCATE", "GRANT", "REVOKE",
]

# Database methods that execute SQL
EXECUTE_METHODS = ["execute", "executemany", "raw", "extra"]

# Whole-word, case-insensitive matcher built from SQL_KEYWORDS. Word
# boundaries keep ordinary words such as "created" or "updated" from being
# mistaken for CREATE / UPDATE; multi-word keywords tolerate any whitespace.
_SQL_KEYWORD_RE = re.compile(
    r"\b(?:"
    + "|".join(
        r"\s+".join(re.escape(part) for part in keyword.split())
        for keyword in SQL_KEYWORDS
    )
    + r")\b",
    re.IGNORECASE,
)

# Line-based heuristic used by check_sql_pattern_regex: an f-string prefix
# (f, fr, rf), then a SQL keyword, then a {...} replacement field.
_FSTRING_SQL_RE = re.compile(
    r'\b(?:f|fr|rf)["\'].*\b(?:SELECT|INSERT|UPDATE|DELETE|FROM|WHERE)\b'
    r'.*\{.*\}.*["\']',
    re.IGNORECASE,
)

# ast.Str was folded into ast.Constant in Python 3.8, warns on access since
# 3.12 and no longer exists in 3.14, so it is only looked up on interpreters
# that still produce it.
_LEGACY_STR_NODE = ast.Str if sys.version_info < (3, 8) else None

_DEFAULT_RECOMMENDATION = "Use parameterized queries"


def _literal_string(node: ast.AST) -> Optional[str]:
    """Return the text of a plain string literal node, or None otherwise.

    f-strings are deliberately not treated as plain literals here.
    """
    if isinstance(node, ast.Constant):
        return node.value if isinstance(node.value, str) else None
    if _LEGACY_STR_NODE is not None and isinstance(node, _LEGACY_STR_NODE):
        return node.s
    return None


class SQLInjectionVisitor(ast.NodeVisitor):
    """AST visitor to detect SQL injection vulnerabilities.

    Findings are appended to ``self.vulnerabilities`` as dictionaries while
    the tree is walked with ``visit()``.
    """

    def __init__(self, code_lines: List[str]):
        """
        Initialize the visitor.

        Args:
            code_lines: Source code split by lines
        """
        self.vulnerabilities: List[Dict[str, Any]] = []
        self.code_lines = code_lines

    def get_line_content(self, line_number: int) -> str:
        """Return the stripped text of a 1-based line, or "" if out of range."""
        if 0 < line_number <= len(self.code_lines):
            return self.code_lines[line_number - 1].strip()
        return ""

    def contains_sql_keywords(self, text: str) -> bool:
        """Check whether text contains a SQL keyword as a whole word."""
        return _SQL_KEYWORD_RE.search(text) is not None

    def extract_string_content(self, node: ast.AST) -> Optional[str]:
        """Extract string content from a string literal or f-string node.

        For f-strings the literal parts are joined and every interpolated
        expression is replaced by "{}". Returns None for any other node.
        """
        literal = _literal_string(node)
        if literal is not None:
            return literal
        if isinstance(node, ast.JoinedStr):
            parts = []
            for value in node.values:
                if isinstance(value, ast.FormattedValue):
                    parts.append("{}")
                else:
                    piece = _literal_string(value)
                    if piece is not None:
                        parts.append(piece)
            return "".join(parts)
        return None

    def _report(
        self,
        node: ast.AST,
        vuln_id: str,
        title: str,
        description: str,
        pattern: str,
        recommendation: str = _DEFAULT_RECOMMENDATION,
    ) -> None:
        """Record one CRITICAL finding located at the line of ``node``."""
        line_number = node.lineno
        self.vulnerabilities.append({
            "id": vuln_id,
            "severity": "CRITICAL",
            "title": title,
            "description": description,
            "line_number": line_number,
            "code_snippet": self.get_line_content(line_number),
            "vulnerable_pattern": pattern,
            "recommendation": recommendation,
            "scanner": "sql_injection",
        })

    def visit_JoinedStr(self, node: ast.JoinedStr):
        """Detect f-strings with SQL keywords (potential SQL injection)."""
        string_content = self.extract_string_content(node)
        if string_content and self.contains_sql_keywords(string_content):
            # Only an f-string that actually interpolates something is risky.
            if any(isinstance(v, ast.FormattedValue) for v in node.values):
                self._report(
                    node,
                    "sql-injection-fstring",
                    "SQL Injection: SQL query built with f-string",
                    "Variables are directly interpolated into SQL query using f-string.",
                    "f-string interpolation",
                    "Use parameterized queries: cursor.execute('SELECT * FROM users WHERE id=%s', (user_id,))",
                )
        self.generic_visit(node)

    def visit_BinOp(self, node: ast.BinOp):
        """Detect SQL strings built with ``+`` concatenation or ``%`` formatting."""
        if isinstance(node.op, ast.Add):
            operand_texts = (
                self.extract_string_content(node.left),
                self.extract_string_content(node.right),
            )
            mentions_sql = any(
                text and self.contains_sql_keywords(text)
                for text in operand_texts
            )
            # Joining two plain literals is fully static and cannot inject.
            both_static = (
                _literal_string(node.left) is not None
                and _literal_string(node.right) is not None
            )
            if mentions_sql and not both_static:
                self._report(
                    node,
                    "sql-injection-concat",
                    "SQL Injection: SQL query built with string concatenation",
                    "SQL query is dynamically constructed using the + operator.",
                    "string concatenation",
                )
        elif isinstance(node.op, ast.Mod):
            left_str = self.extract_string_content(node.left)
            # The % operator always interpolates in Python before the query
            # reaches the driver, whether or not the placeholder is quoted.
            # Only passing the values as a separate argument to execute()
            # is a parameterized query.
            if left_str and self.contains_sql_keywords(left_str):
                self._report(
                    node,
                    "sql-injection-percent",
                    "SQL Injection: SQL query built with % formatting",
                    "Variables are directly inserted into SQL query using the % operator.",
                    "percent formatting",
                )
        self.generic_visit(node)

    def visit_Call(self, node: ast.Call):
        """Detect execute() calls and .format() on SQL strings."""
        func = node.func

        # .format() called directly on a string literal that looks like SQL.
        if isinstance(func, ast.Attribute) and func.attr == "format":
            template = _literal_string(func.value)
            if template and self.contains_sql_keywords(template):
                self._report(
                    node,
                    "sql-injection-format",
                    "SQL Injection: SQL query built with .format()",
                    "SQL query is dynamically constructed using the .format() method.",
                    "string.format()",
                )

        # execute()-style calls whose first argument is built dynamically.
        func_name = None
        if isinstance(func, ast.Name):
            func_name = func.id
        elif isinstance(func, ast.Attribute):
            func_name = func.attr

        if func_name in EXECUTE_METHODS and node.args:
            first_arg = node.args[0]
            is_dynamic = isinstance(
                first_arg, (ast.JoinedStr, ast.BinOp, ast.Call)
            )
            if not is_dynamic:
                # A literal that still carries str.format placeholders.
                content = _literal_string(first_arg)
                if content and ("{}" in content or "{0" in content):
                    is_dynamic = True
            if is_dynamic:
                self._report(
                    node,
                    f"sql-injection-{func_name}",
                    f"SQL Injection: Dynamic SQL query in {func_name}()",
                    f"A dynamically constructed SQL query is passed to the {func_name}() method.",
                    f"dynamic SQL in {func_name}()",
                )

        self.generic_visit(node)


def scan_sql_injection(file_path: str, code: str) -> List[Dict[str, Any]]:
    """
    Scan Python code for SQL injection vulnerabilities.

    Args:
        file_path: Path to the file being scanned
        code: Source code to analyze

    Returns:
        List of vulnerability dictionaries, each tagged with ``file_path``.
        Empty when the code cannot be parsed or the scan fails.
    """
    vulnerabilities: List[Dict[str, Any]] = []

    try:
        tree = ast.parse(code)

        visitor = SQLInjectionVisitor(code.split("\n"))
        visitor.visit(tree)

        for vuln in visitor.vulnerabilities:
            vuln["file_path"] = file_path

        vulnerabilities = visitor.vulnerabilities
    except SyntaxError:
        # Code with syntax errors cannot be analyzed; report nothing rather
        # than failing the whole scan.
        pass
    except Exception as e:
        # Best-effort scanner: log unexpected errors but don't fail.
        print(f"Warning: SQL injection scan error: {e}", file=sys.stderr)

    return vulnerabilities


def check_sql_pattern_regex(code: str) -> List[Dict[str, Any]]:
    """
    Fallback regex-based SQL injection detection.

    Flags every line that contains an f-string with a SQL keyword followed
    by a ``{...}`` replacement field. Works line by line, so it also runs on
    code that does not parse.

    Args:
        code: Source code to scan

    Returns:
        List of vulnerability dictionaries
    """
    return [
        {
            "id": "sql-injection-regex",
            "severity": "HIGH",
            "title": "Potential SQL Injection: f-string usage detected",
            "description": "Suspected f-string variable interpolation in SQL query.",
            "line_number": line_num,
            "code_snippet": line.strip(),
            "vulnerable_pattern": "f-string with SQL keywords",
            "recommendation": "Use parameterized queries instead",
            "scanner": "sql_injection",
        }
        for line_num, line in enumerate(code.split("\n"), start=1)
        if _FSTRING_SQL_RE.search(line)
    ]