File size: 11,139 Bytes
f8e78b2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
73f6601
 
f8e78b2
 
 
73f6601
f8e78b2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
73f6601
 
f8e78b2
 
 
73f6601
f8e78b2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
73f6601
 
f8e78b2
 
 
73f6601
f8e78b2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
73f6601
 
f8e78b2
 
 
73f6601
f8e78b2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
73f6601
 
f8e78b2
 
 
73f6601
f8e78b2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
73f6601
 
f8e78b2
 
 
73f6601
f8e78b2
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
"""
SQL Injection vulnerability detector using AST analysis.

Detects unsafe SQL query construction patterns in Python code.
"""

import ast
import re
from typing import Dict, List, Any, Optional

# Keywords (and two-word phrases) whose presence in a string is taken as a
# sign that the string is a SQL statement. The lookup is case-insensitive;
# see SQLInjectionVisitor.contains_sql_keywords().
SQL_KEYWORDS = [
    "SELECT", "INSERT", "UPDATE", "DELETE", "DROP", "CREATE", "ALTER",
    "FROM", "WHERE", "JOIN", "UNION", "ORDER BY", "GROUP BY",
    "EXEC", "EXECUTE", "TRUNCATE", "GRANT", "REVOKE"
]

# Names of callables treated as SQL execution sinks. SQLInjectionVisitor.visit_Call
# compares these against the called name only (a bare function name, or the
# attribute name of a method call); the receiver object is not inspected.
EXECUTE_METHODS = ["execute", "executemany", "raw", "extra"]


class SQLInjectionVisitor(ast.NodeVisitor):
    """AST visitor that records likely SQL injection sites.

    Walk a parsed module with ``visit(tree)``; every finding is appended to
    ``self.vulnerabilities`` as a dict with the keys ``id``, ``severity``,
    ``title``, ``description``, ``line_number``, ``code_snippet``,
    ``vulnerable_pattern``, ``recommendation`` and ``scanner``.

    Patterns reported:
        * f-strings that contain SQL keywords and interpolate a value
        * ``+`` concatenation involving a SQL string and a non-literal operand
        * ``%`` formatting applied to a SQL string
        * ``.format()`` called on a SQL string literal
        * dynamically built first arguments to the EXECUTE_METHODS sinks
    """

    # (keywords-tuple, compiled regex) pair built lazily from SQL_KEYWORDS.
    # Keyed on the keyword tuple so edits to SQL_KEYWORDS are picked up.
    _keyword_cache = None

    def __init__(self, code_lines: List[str]):
        """
        Initialize the visitor.

        Args:
            code_lines: Source code split by lines
        """
        self.vulnerabilities: List[Dict[str, Any]] = []
        self.code_lines = code_lines

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------

    def get_line_content(self, line_number: int) -> str:
        """Return the stripped source line (1-indexed), or "" if out of range."""
        if 0 < line_number <= len(self.code_lines):
            return self.code_lines[line_number - 1].strip()
        return ""

    @classmethod
    def _keyword_regex(cls):
        """Return a compiled whole-word regex for SQL_KEYWORDS (None if empty)."""
        keywords = tuple(SQL_KEYWORDS)
        cached = cls._keyword_cache
        if cached is None or cached[0] != keywords:
            regex = None
            if keywords:
                # Multi-word keywords ("ORDER BY") tolerate any run of
                # whitespace between their words.
                alternatives = "|".join(
                    r"\s+".join(re.escape(word) for word in keyword.split())
                    for keyword in keywords
                )
                # Lookarounds enforce whole-word matches so that "created",
                # "updated" or "platform" are not mistaken for SQL.
                regex = re.compile(
                    r"(?<!\w)(?:" + alternatives + r")(?!\w)", re.IGNORECASE
                )
            cached = cls._keyword_cache = (keywords, regex)
        return cached[1]

    def contains_sql_keywords(self, text: str) -> bool:
        """Return True if *text* contains a SQL keyword as a whole word.

        Matching is case-insensitive.
        """
        regex = self._keyword_regex()
        return regex is not None and regex.search(text) is not None

    def _literal_text(self, node: ast.AST) -> Optional[str]:
        """Return the value of a plain (non-f-string) string literal, else None."""
        if isinstance(node, ast.Constant):
            return node.value if isinstance(node.value, str) else None
        # Python < 3.8 parses string literals as ast.Str. That class is
        # deprecated and removed in newer Pythons, so it is recognised by
        # name instead of by attribute access on the ast module.
        if type(node).__name__ == "Str":
            return node.s
        return None

    def _is_static_text(self, node: ast.AST) -> bool:
        """Return True if *node* is a string literal or a ``+`` chain of them."""
        if self._literal_text(node) is not None:
            return True
        return (
            isinstance(node, ast.BinOp)
            and isinstance(node.op, ast.Add)
            and self._is_static_text(node.left)
            and self._is_static_text(node.right)
        )

    def extract_string_content(self, node: ast.AST) -> Optional[str]:
        """Extract string content from a string literal or f-string node.

        For f-strings the literal parts are joined and every interpolated
        expression is replaced with ``{}``. Returns None for any other node.
        """
        literal = self._literal_text(node)
        if literal is not None:
            return literal
        if isinstance(node, ast.JoinedStr):
            parts = []
            for value in node.values:
                if isinstance(value, ast.FormattedValue):
                    parts.append("{}")
                else:
                    text = self._literal_text(value)
                    if text is not None:
                        parts.append(text)
            return "".join(parts)
        return None

    def _report(self, node: ast.AST, vuln_id: str, title: str,
                description: str, pattern: str,
                recommendation: str = "Use parameterized queries") -> None:
        """Append one CRITICAL finding located at *node*'s line."""
        line_number = node.lineno
        self.vulnerabilities.append({
            "id": vuln_id,
            "severity": "CRITICAL",
            "title": title,
            "description": description,
            "line_number": line_number,
            "code_snippet": self.get_line_content(line_number),
            "vulnerable_pattern": pattern,
            "recommendation": recommendation,
            "scanner": "sql_injection",
        })

    # ------------------------------------------------------------------
    # Node visitors
    # ------------------------------------------------------------------

    def visit_JoinedStr(self, node: ast.JoinedStr):
        """Detect f-strings with SQL keywords (potential SQL injection)."""
        string_content = self.extract_string_content(node)

        if string_content and self.contains_sql_keywords(string_content):
            # Only an f-string that actually interpolates something is risky.
            if any(isinstance(v, ast.FormattedValue) for v in node.values):
                self._report(
                    node,
                    "sql-injection-fstring",
                    "SQL Injection: SQL query built with f-string",
                    "Variables are directly interpolated into SQL query using f-string.",
                    "f-string interpolation",
                    "Use parameterized queries: cursor.execute('SELECT * FROM users WHERE id=%s', (user_id,))",
                )

        self.generic_visit(node)

    def visit_BinOp(self, node: ast.BinOp):
        """Detect SQL built with ``+`` concatenation or ``%`` formatting."""
        if isinstance(node.op, ast.Add):
            # Concatenating only literals (e.g. to wrap a long query across
            # lines) cannot inject anything, so it is not reported.
            if not self._is_static_text(node):
                left_str = self.extract_string_content(node.left)
                right_str = self.extract_string_content(node.right)

                combined = (left_str or "") + (right_str or "")
                if self.contains_sql_keywords(combined):
                    self._report(
                        node,
                        "sql-injection-concat",
                        "SQL Injection: SQL query built with string concatenation",
                        "SQL query is dynamically constructed using the + operator.",
                        "string concatenation",
                    )

        elif isinstance(node.op, ast.Mod):
            template = self.extract_string_content(node.left)
            # The % operator substitutes values into the SQL text itself,
            # whether or not the placeholder is quoted; only passing the
            # parameters separately to the driver is safe. "%%" is an
            # escaped percent sign, not a placeholder.
            if (
                template
                and "%" in template.replace("%%", "")
                and self.contains_sql_keywords(template)
            ):
                self._report(
                    node,
                    "sql-injection-percent",
                    "SQL Injection: SQL query built with % formatting",
                    "Variables are directly inserted into SQL query using the % operator.",
                    "percent formatting",
                )

        self.generic_visit(node)

    def visit_Call(self, node: ast.Call):
        """Detect execute() calls and .format() on SQL strings."""
        func = node.func

        # "<sql literal>".format(...)
        if isinstance(func, ast.Attribute) and func.attr == "format":
            template = self._literal_text(func.value)
            if template and self.contains_sql_keywords(template):
                self._report(
                    node,
                    "sql-injection-format",
                    "SQL Injection: SQL query built with .format()",
                    "SQL query is dynamically constructed using the .format() method.",
                    "string.format()",
                )

        # Execution sinks are matched by name only: either a bare function
        # name or the attribute name of a method call.
        func_name = None
        if isinstance(func, ast.Name):
            func_name = func.id
        elif isinstance(func, ast.Attribute):
            func_name = func.attr

        if func_name in EXECUTE_METHODS and node.args:
            first_arg = node.args[0]

            # f-strings and call results are treated as dynamic; so is any
            # operator expression that is not purely a chain of literals.
            is_dynamic = isinstance(first_arg, (ast.JoinedStr, ast.Call)) or (
                isinstance(first_arg, ast.BinOp)
                and not self._is_static_text(first_arg)
            )

            if not is_dynamic:
                # A literal that still carries str.format placeholders is
                # treated as an unfinished template.
                content = self._literal_text(first_arg)
                if content and ("{}" in content or "{0" in content):
                    is_dynamic = True

            if is_dynamic:
                self._report(
                    node,
                    f"sql-injection-{func_name}",
                    f"SQL Injection: Dynamic SQL query in {func_name}()",
                    f"A dynamically constructed SQL query is passed to the {func_name}() method.",
                    f"dynamic SQL in {func_name}()",
                )

        self.generic_visit(node)


def scan_sql_injection(file_path: str, code: str) -> List[Dict[str, Any]]:
    """
    Run the AST-based SQL injection scan over a piece of Python source.

    The scan is best-effort: source that cannot be parsed yields an empty
    result, and any other failure is reported on stderr and likewise yields
    an empty result instead of propagating.

    Args:
        file_path: Path recorded on each finding under the "file_path" key
        code: Python source text to analyze

    Returns:
        The visitor's findings, each tagged with ``file_path``; an empty
        list when the scan could not be completed
    """
    try:
        module_ast = ast.parse(code)

        scanner = SQLInjectionVisitor(code.split("\n"))
        scanner.visit(module_ast)

        # Tag every finding with the file it came from.
        for finding in scanner.vulnerabilities:
            finding["file_path"] = file_path

        return scanner.vulnerabilities
    except SyntaxError:
        # Unparseable source cannot be analyzed; report nothing.
        return []
    except Exception as e:
        # Unexpected failure: warn, but never break the caller.
        import sys
        print(f"Warning: SQL injection scan error: {e}", file=sys.stderr)
        return []


def check_sql_pattern_regex(code: str) -> List[Dict[str, Any]]:
    """
    Line-by-line regex heuristic for SQL built with f-strings.

    A line is reported when it contains an ``f`` immediately followed by a
    quote, then one of a few SQL keywords, then a ``{...}`` group and a
    closing quote (case-insensitive). No parsing is involved.

    Args:
        code: Source code to scan

    Returns:
        One HIGH-severity finding dict per matching line, in line order
    """
    suspicious_fstring = re.compile(
        r'f["\'].*(?:SELECT|INSERT|UPDATE|DELETE|FROM|WHERE).*\{.*\}.*["\']',
        re.IGNORECASE,
    )

    return [
        {
            "id": "sql-injection-regex",
            "severity": "HIGH",
            "title": "Potential SQL Injection: f-string usage detected",
            "description": "Suspected f-string variable interpolation in SQL query.",
            "line_number": number,
            "code_snippet": text.strip(),
            "vulnerable_pattern": "f-string with SQL keywords",
            "recommendation": "Use parameterized queries instead",
            "scanner": "sql_injection",
        }
        for number, text in enumerate(code.split("\n"), start=1)
        if suspicious_fstring.search(text)
    ]