""" SQL injection vulnerability detector. Detects common SQL injection patterns in tool arguments and context. """ from __future__ import annotations import re from typing import Any, Dict, Optional from ..base import ScannerPlugin, ScanResult, PluginMetadata # SQL injection patterns SQL_DANGEROUS_PATTERNS = [ r"\bdrop\s+table\b", r"\bdrop\s+database\b", r"\bdelete\s+from\b", r"\btruncate\b", r"\bunion\s+select\b", r";\s*--", r";\s*/\*", r"\bor\s+1\s*=\s*1\b", r"\bor\s+'1'\s*=\s*'1'\b", r"\bwhere\s+1\s*=\s*1\b", r"admin\s*'--", r"admin\s*'\s*or", r"\binsert\s+into\b", r"\bupdate\b.*\bset\b", r"\bselect\b.*\bfrom\b", ] class SQLInjectionDetector(ScannerPlugin): """Detects SQL injection attacks.""" def __init__(self): super().__init__( metadata=PluginMetadata( name="SQLInjectionDetector", version="1.0.0", description="Detects SQL injection patterns in tool arguments and context", author="SecurityGateway", ) ) def scan( self, user_id: Optional[str], server_key: str, tool: str, arguments: Dict[str, Any], llm_context: Optional[str] = None, ) -> ScanResult: """ Scan for SQL injection attempts. Checks for: - DROP/TRUNCATE/DELETE statements - UNION-based injection - Comment-based injection (-- or /*) - Logical operator abuse (OR 1=1) - Admin bypass patterns Args: user_id: User identifier server_key: Server key tool: Tool name arguments: Tool arguments llm_context: Optional context Returns: ScanResult with detection status """ context = (llm_context or "") + " " + self._flatten_json(arguments) detected = self._contains_pattern(context, SQL_DANGEROUS_PATTERNS) reasons = [] flags = {} if detected: reasons.append("Potential SQL injection pattern detected.") flags["sql_injection"] = True return ScanResult( plugin_name=self.get_metadata().name, detected=detected, risk_score=0.35 if detected else 0.0, reasons=reasons if reasons else ["No SQL injection detected."], flags=flags, ) def _flatten_json(self, value: Any) -> str: """Flatten nested structures to string for pattern matching.""" if isinstance(value, dict): return " ".join(self._flatten_json(v) for v in value.values()) if isinstance(value, list): return " ".join(self._flatten_json(v) for v in value) return str(value) def _contains_pattern(self, text: str, patterns: list) -> bool: """Check if text matches any pattern.""" for pat in patterns: if re.search(pat, text, flags=re.IGNORECASE | re.DOTALL): return True return False # Export as module-level plugin for auto-loading plugin = SQLInjectionDetector()