"""
SQL injection vulnerability detector.
Detects common SQL injection patterns in tool arguments and context.
"""
from __future__ import annotations
import re
from typing import Any, Dict, Optional
from ..base import ScannerPlugin, ScanResult, PluginMetadata
# SQL injection patterns.
#
# Each entry is a regular-expression *source string* (deliberately not a
# compiled pattern): the detector hands them to ``re.search`` together with
# ``re.IGNORECASE | re.DOTALL``, and ``re`` rejects a flags argument when it is
# given an already-compiled pattern.
SQL_DANGEROUS_PATTERNS = [
    # Destructive statements.
    r"\bdrop\s+table\b",
    r"\bdrop\s+database\b",
    r"\bdelete\s+from\b",
    r"\btruncate\b",
    # UNION-based injection.
    r"\bunion\s+select\b",
    # Statement terminator followed by a comment opener (``--`` or ``/*``).
    r";\s*--",
    r";\s*/\*",
    # Tautologies used to bypass WHERE clauses.
    r"\bor\s+1\s*=\s*1\b",
    # No trailing ``\b`` here: a word boundary directly after the closing quote
    # would only match when a word character follows it, so the usual payload
    # (``' OR '1'='1'`` followed by a space, ``--`` or end of input) was missed.
    r"\bor\s+'1'\s*=\s*'1'",
    r"\bwhere\s+1\s*=\s*1\b",
    # Admin login bypass attempts.
    r"admin\s*'--",
    r"admin\s*'\s*or",
    # Generic data-manipulation / query statements.
    r"\binsert\s+into\b",
    r"\bupdate\b.*\bset\b",
    r"\bselect\b.*\bfrom\b",
]
class SQLInjectionDetector(ScannerPlugin):
    """Scanner plugin that flags SQL-injection-like text in a tool call."""

    def __init__(self):
        """Register the plugin's static metadata with the base class."""
        plugin_info = PluginMetadata(
            name="SQLInjectionDetector",
            version="1.0.0",
            description="Detects SQL injection patterns in tool arguments and context",
            author="SecurityGateway",
        )
        super().__init__(metadata=plugin_info)

    def scan(
        self,
        user_id: Optional[str],
        server_key: str,
        tool: str,
        arguments: Dict[str, Any],
        llm_context: Optional[str] = None,
    ) -> ScanResult:
        """
        Look for SQL injection patterns in the context and tool arguments.

        The optional LLM context and the flattened argument values are joined
        into one string, which is then matched against
        ``SQL_DANGEROUS_PATTERNS``. Those patterns cover:
        - DROP/TRUNCATE/DELETE statements
        - UNION-based injection
        - Comment-based injection (-- or /*)
        - Logical operator abuse (OR 1=1)
        - Admin bypass patterns
        - INSERT/UPDATE/SELECT statements

        Args:
            user_id: User identifier (not consulted by this scanner).
            server_key: Server key (not consulted by this scanner).
            tool: Tool name (not consulted by this scanner).
            arguments: Tool arguments; nested dict values and list items
                are included in the scanned text.
            llm_context: Optional free-text context scanned alongside the
                arguments.

        Returns:
            ScanResult with ``detected`` set when any pattern matched, a
            risk score of 0.35 on a match (0.0 otherwise), a single
            human-readable reason, and the ``sql_injection`` flag on a match.
        """
        haystack = " ".join((llm_context or "", self._flatten_json(arguments)))
        hit = self._contains_pattern(haystack, SQL_DANGEROUS_PATTERNS)

        if hit:
            score = 0.35
            messages = ["Potential SQL injection pattern detected."]
            markers = {"sql_injection": True}
        else:
            score = 0.0
            messages = ["No SQL injection detected."]
            markers = {}

        return ScanResult(
            plugin_name=self.get_metadata().name,
            detected=hit,
            risk_score=score,
            reasons=messages,
            flags=markers,
        )

    def _flatten_json(self, value: Any) -> str:
        """Collapse a nested value into one space-separated string.

        Dict values (keys are not included) and list items are flattened
        recursively; anything else is converted with ``str``.
        """
        if isinstance(value, dict):
            children = value.values()
        elif isinstance(value, list):
            children = value
        else:
            return str(value)
        return " ".join(map(self._flatten_json, children))

    def _contains_pattern(self, text: str, patterns: list) -> bool:
        """Return True when at least one regex in ``patterns`` matches ``text``.

        Matching is case-insensitive and ``.`` also matches newlines.
        """
        search_flags = re.IGNORECASE | re.DOTALL
        return any(
            re.search(expr, text, flags=search_flags) is not None
            for expr in patterns
        )
# Export as module-level plugin for auto-loading.
# A single detector instance is created when this module is imported and bound
# to the name ``plugin``.
# NOTE(review): presumably the plugin loader looks this name up — confirm
# against the loader before renaming it.
plugin = SQLInjectionDetector()
|