Spaces:
Sleeping
Sleeping
File size: 2,775 Bytes
790e0e9 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 | """
SQL Safety Validator - Prevents dangerous database operations
"""
import re
from typing import Tuple
from config import ALLOWED_SQL_OPERATIONS, DANGEROUS_SQL_KEYWORDS
class SafetyValidator:
"""Validates SQL queries to prevent dangerous operations"""
@staticmethod
def validate_query(query: str) -> Tuple[bool, str]:
"""
Validate if a SQL query is safe to execute
Args:
query: SQL query string to validate
Returns:
Tuple of (is_valid, error_message)
- is_valid: True if query is safe, False otherwise
- error_message: Empty string if valid, error description if invalid
"""
if not query or not query.strip():
return False, "Empty query provided"
# Normalize query for checking
normalized_query = query.strip().upper()
# Check for dangerous keywords
for keyword in DANGEROUS_SQL_KEYWORDS:
# Use word boundaries to avoid false positives
pattern = r'\b' + keyword + r'\b'
if re.search(pattern, normalized_query):
return False, (
f"π« BLOCKED: Query contains dangerous operation '{keyword}'. "
f"Only SELECT queries are allowed for safety reasons."
)
# Ensure query starts with SELECT
if not normalized_query.startswith('SELECT'):
return False, (
"π« BLOCKED: Only SELECT queries are allowed. "
"This application is read-only to prevent accidental data modification."
)
# Additional checks for SQL injection patterns
suspicious_patterns = [
r';.*?(DELETE|DROP|UPDATE|INSERT)', # Multiple statements
r'--', # SQL comments (potential injection)
r'/\*.*?\*/', # Block comments
]
for pattern in suspicious_patterns:
if re.search(pattern, normalized_query, re.IGNORECASE | re.DOTALL):
return False, (
"π« BLOCKED: Query contains suspicious patterns that may indicate "
"SQL injection or multiple statements. Please use simple SELECT queries."
)
return True, ""
@staticmethod
def get_safety_message() -> str:
"""Get a message explaining safety restrictions"""
return (
"π‘οΈ **Safety Features Active**\n\n"
f"β
Allowed operations: {', '.join(ALLOWED_SQL_OPERATIONS)}\n"
f"β Blocked operations: {', '.join(DANGEROUS_SQL_KEYWORDS)}\n\n"
"This ensures your data remains safe from accidental modifications."
)
|