| | """
|
| | SQL injection vulnerability detector.
|
| |
|
| | Detects common SQL injection patterns in tool arguments and context.
|
| | """
|
| |
|
| | from __future__ import annotations
|
| |
|
| | import re
|
| | from typing import Any, Dict, Optional
|
| |
|
| | from ..base import ScannerPlugin, ScanResult, PluginMetadata
|
| |
|
| |
|
| |
|
| | SQL_DANGEROUS_PATTERNS = [
|
| | r"\bdrop\s+table\b",
|
| | r"\bdrop\s+database\b",
|
| | r"\bdelete\s+from\b",
|
| | r"\btruncate\b",
|
| | r"\bunion\s+select\b",
|
| | r";\s*--",
|
| | r";\s*/\*",
|
| | r"\bor\s+1\s*=\s*1\b",
|
| | r"\bor\s+'1'\s*=\s*'1'\b",
|
| | r"\bwhere\s+1\s*=\s*1\b",
|
| | r"admin\s*'--",
|
| | r"admin\s*'\s*or",
|
| | r"\binsert\s+into\b",
|
| | r"\bupdate\b.*\bset\b",
|
| | r"\bselect\b.*\bfrom\b",
|
| | ]
|
| |
|
| |
|
| | class SQLInjectionDetector(ScannerPlugin):
|
| | """Detects SQL injection attacks."""
|
| |
|
| | def __init__(self):
|
| | super().__init__(
|
| | metadata=PluginMetadata(
|
| | name="SQLInjectionDetector",
|
| | version="1.0.0",
|
| | description="Detects SQL injection patterns in tool arguments and context",
|
| | author="SecurityGateway",
|
| | )
|
| | )
|
| |
|
| | def scan(
|
| | self,
|
| | user_id: Optional[str],
|
| | server_key: str,
|
| | tool: str,
|
| | arguments: Dict[str, Any],
|
| | llm_context: Optional[str] = None,
|
| | ) -> ScanResult:
|
| | """
|
| | Scan for SQL injection attempts.
|
| |
|
| | Checks for:
|
| | - DROP/TRUNCATE/DELETE statements
|
| | - UNION-based injection
|
| | - Comment-based injection (-- or /*)
|
| | - Logical operator abuse (OR 1=1)
|
| | - Admin bypass patterns
|
| |
|
| | Args:
|
| | user_id: User identifier
|
| | server_key: Server key
|
| | tool: Tool name
|
| | arguments: Tool arguments
|
| | llm_context: Optional context
|
| |
|
| | Returns:
|
| | ScanResult with detection status
|
| | """
|
| | context = (llm_context or "") + " " + self._flatten_json(arguments)
|
| |
|
| | detected = self._contains_pattern(context, SQL_DANGEROUS_PATTERNS)
|
| |
|
| | reasons = []
|
| | flags = {}
|
| |
|
| | if detected:
|
| | reasons.append("Potential SQL injection pattern detected.")
|
| | flags["sql_injection"] = True
|
| |
|
| | return ScanResult(
|
| | plugin_name=self.get_metadata().name,
|
| | detected=detected,
|
| | risk_score=0.35 if detected else 0.0,
|
| | reasons=reasons if reasons else ["No SQL injection detected."],
|
| | flags=flags,
|
| | )
|
| |
|
| | def _flatten_json(self, value: Any) -> str:
|
| | """Flatten nested structures to string for pattern matching."""
|
| | if isinstance(value, dict):
|
| | return " ".join(self._flatten_json(v) for v in value.values())
|
| | if isinstance(value, list):
|
| | return " ".join(self._flatten_json(v) for v in value)
|
| | return str(value)
|
| |
|
| | def _contains_pattern(self, text: str, patterns: list) -> bool:
|
| | """Check if text matches any pattern."""
|
| | for pat in patterns:
|
| | if re.search(pat, text, flags=re.IGNORECASE | re.DOTALL):
|
| | return True
|
| | return False
|
| |
|
| |
|
| |
|
| | plugin = SQLInjectionDetector()
|
| |
|