File size: 3,263 Bytes
ed71b0e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
"""

SQL injection vulnerability detector.



Detects common SQL injection patterns in tool arguments and context.

"""

from __future__ import annotations

import re
from typing import Any, Dict, Optional

from ..base import ScannerPlugin, ScanResult, PluginMetadata


# Regular expressions that mark text as a potential SQL injection attempt.
# SQLInjectionDetector._contains_pattern applies them with
# re.IGNORECASE | re.DOTALL, so matching is case-insensitive and ``.*`` also
# spans newlines.
SQL_DANGEROUS_PATTERNS = [
    # Destructive statements.
    r"\bdrop\s+table\b",
    r"\bdrop\s+database\b",
    r"\bdelete\s+from\b",
    r"\btruncate\b",
    # UNION-based injection.
    r"\bunion\s+select\b",
    # Statement terminator followed by a comment opener (-- or /*).
    r";\s*--",
    r";\s*/\*",
    # Tautologies used to bypass WHERE clauses.
    r"\bor\s+1\s*=\s*1\b",
    # No trailing ``'\b`` here: a word boundary after a quote only exists when
    # a word character follows, so ``' OR '1'='1'`` and the unterminated
    # ``' OR '1'='1`` form were both missed by the previous pattern.
    r"\bor\s+'1'\s*=\s*'1",
    r"\bwhere\s+1\s*=\s*1\b",
    # Admin login bypass payloads.
    r"admin\s*'--",
    r"admin\s*'\s*or",
    # Generic data-modifying / data-reading statements.
    r"\binsert\s+into\b",
    r"\bupdate\b.*\bset\b",
    r"\bselect\b.*\bfrom\b",
]


class SQLInjectionDetector(ScannerPlugin):
    """Scanner plugin that flags text resembling SQL injection payloads."""

    def __init__(self):
        """Pass this plugin's descriptive metadata to the base class."""
        plugin_info = PluginMetadata(
            name="SQLInjectionDetector",
            version="1.0.0",
            description="Detects SQL injection patterns in tool arguments and context",
            author="SecurityGateway",
        )
        super().__init__(metadata=plugin_info)

    def scan(
        self,
        user_id: Optional[str],
        server_key: str,
        tool: str,
        arguments: Dict[str, Any],
        llm_context: Optional[str] = None,
    ) -> ScanResult:
        """Scan the LLM context and tool arguments for SQL injection patterns.

        The optional context and the flattened argument values are combined
        into one string and searched with ``SQL_DANGEROUS_PATTERNS``, which
        covers:

        - DROP/TRUNCATE/DELETE statements
        - UNION-based injection
        - Comment-based injection (-- or /*)
        - Logical operator abuse (OR 1=1)
        - Admin bypass patterns

        Args:
            user_id: User identifier (not inspected by this scanner).
            server_key: Server key (not inspected by this scanner).
            tool: Tool name (not inspected by this scanner).
            arguments: Tool arguments; only values are scanned, not dict keys.
            llm_context: Optional context text; ``None`` is treated as empty.

        Returns:
            ScanResult whose ``detected`` is True and ``risk_score`` is 0.35
            when any pattern matches; otherwise ``detected`` is False and
            ``risk_score`` is 0.0.
        """
        haystack = " ".join([llm_context or "", self._flatten_json(arguments)])
        detected = self._contains_pattern(haystack, SQL_DANGEROUS_PATTERNS)

        if detected:
            reasons = ["Potential SQL injection pattern detected."]
            flags = {"sql_injection": True}
            score = 0.35
        else:
            reasons = ["No SQL injection detected."]
            flags = {}
            score = 0.0

        return ScanResult(
            plugin_name=self.get_metadata().name,
            detected=detected,
            risk_score=score,
            reasons=reasons,
            flags=flags,
        )

    def _flatten_json(self, value: Any) -> str:
        """Render a nested dict/list structure as one space-separated string.

        Dict values and list items are flattened recursively; dict keys are
        dropped. Any other value is converted with ``str()``.
        """
        if isinstance(value, dict):
            children = value.values()
        elif isinstance(value, list):
            children = value
        else:
            return str(value)
        return " ".join(self._flatten_json(child) for child in children)

    def _contains_pattern(self, text: str, patterns: list) -> bool:
        """Return True if any regex in ``patterns`` is found in ``text``.

        Matching is case-insensitive and ``.`` also matches newlines.
        """
        search_flags = re.IGNORECASE | re.DOTALL
        return any(re.search(regex, text, flags=search_flags) for regex in patterns)


# Export as module-level plugin for auto-loading.
# The detector instance is created as a side effect of importing this module.
# NOTE(review): presumably the plugin loader looks up the `plugin` name —
# confirm against the loader before renaming.
plugin = SQLInjectionDetector()