yuki-sui's picture
Upload 169 files
ed71b0e verified
"""
SQL injection vulnerability detector.
Detects common SQL injection patterns in tool arguments and context.
"""
from __future__ import annotations
import re
from typing import Any, Dict, Optional
from ..base import ScannerPlugin, ScanResult, PluginMetadata
# SQL injection patterns
SQL_DANGEROUS_PATTERNS = [
r"\bdrop\s+table\b",
r"\bdrop\s+database\b",
r"\bdelete\s+from\b",
r"\btruncate\b",
r"\bunion\s+select\b",
r";\s*--",
r";\s*/\*",
r"\bor\s+1\s*=\s*1\b",
r"\bor\s+'1'\s*=\s*'1'\b",
r"\bwhere\s+1\s*=\s*1\b",
r"admin\s*'--",
r"admin\s*'\s*or",
r"\binsert\s+into\b",
r"\bupdate\b.*\bset\b",
r"\bselect\b.*\bfrom\b",
]
class SQLInjectionDetector(ScannerPlugin):
"""Detects SQL injection attacks."""
def __init__(self):
super().__init__(
metadata=PluginMetadata(
name="SQLInjectionDetector",
version="1.0.0",
description="Detects SQL injection patterns in tool arguments and context",
author="SecurityGateway",
)
)
def scan(
self,
user_id: Optional[str],
server_key: str,
tool: str,
arguments: Dict[str, Any],
llm_context: Optional[str] = None,
) -> ScanResult:
"""
Scan for SQL injection attempts.
Checks for:
- DROP/TRUNCATE/DELETE statements
- UNION-based injection
- Comment-based injection (-- or /*)
- Logical operator abuse (OR 1=1)
- Admin bypass patterns
Args:
user_id: User identifier
server_key: Server key
tool: Tool name
arguments: Tool arguments
llm_context: Optional context
Returns:
ScanResult with detection status
"""
context = (llm_context or "") + " " + self._flatten_json(arguments)
detected = self._contains_pattern(context, SQL_DANGEROUS_PATTERNS)
reasons = []
flags = {}
if detected:
reasons.append("Potential SQL injection pattern detected.")
flags["sql_injection"] = True
return ScanResult(
plugin_name=self.get_metadata().name,
detected=detected,
risk_score=0.35 if detected else 0.0,
reasons=reasons if reasons else ["No SQL injection detected."],
flags=flags,
)
def _flatten_json(self, value: Any) -> str:
"""Flatten nested structures to string for pattern matching."""
if isinstance(value, dict):
return " ".join(self._flatten_json(v) for v in value.values())
if isinstance(value, list):
return " ".join(self._flatten_json(v) for v in value)
return str(value)
def _contains_pattern(self, text: str, patterns: list) -> bool:
"""Check if text matches any pattern."""
for pat in patterns:
if re.search(pat, text, flags=re.IGNORECASE | re.DOTALL):
return True
return False
# Export as module-level plugin for auto-loading
plugin = SQLInjectionDetector()