deploymate/app/services/script_validator.py
shakauthossain's picture
V2.0.0
2df0cf9 verified
"""
Script and Nginx configuration validator
Tests generated bash scripts and nginx configurations
"""
import json
import os
import re
import shutil
import subprocess
import tempfile
from pathlib import Path
from typing import Dict, List, Optional
class ScriptValidator:
    """Heuristic validator for generated bash scripts and nginx configurations.

    Both public validators return a dict with the keys ``valid`` (bool, true
    when no errors were collected), ``errors``, ``warnings``, ``suggestions``
    (lists of human-readable strings) and ``score`` (int clamped to 0-100).

    The external tools ``shellcheck`` and ``nginx`` are used only when they
    are found on ``PATH``; if a tool is missing or fails to run, its check is
    skipped and the static checks alone decide the result.
    """

    # (compiled pattern, message) pairs; each matching line yields one warning.
    _DANGEROUS_PATTERNS = (
        (re.compile(r'\brm\s+-rf\s+/'),
         "Using 'rm -rf /' is extremely dangerous"),
        (re.compile(r'\bchmod\s+777'),
         "chmod 777 is a security risk - use specific permissions"),
        (re.compile(r'\bsudo\s+su\s+-'),
         "Using 'sudo su -' unnecessarily escalates privileges"),
    )

    # `set -e`, combined short flags such as `set -euo pipefail`, or the
    # long form `set -o errexit`.
    _SET_E_RE = re.compile(r'\bset\s+(?:-[A-Za-z]*e[A-Za-z]*|-o\s+errexit)\b')

    # Rough detector for a variable expansion not directly next to a quote.
    _UNQUOTED_VAR_RE = re.compile(
        r'[^"\']\$\{?[A-Za-z_][A-Za-z0-9_]*\}?[^"\']'
    )

    _APT_INSTALL_RE = re.compile(r'\bapt-get\s+install\b')

    # A standalone "assume yes" option: -y (possibly combined, e.g. -qy),
    # --yes, --assume-yes, or -qq (which apt-get documents as implying -y).
    # The look-arounds require a whole whitespace-delimited token so that a
    # package name such as "python-yaml" is not mistaken for the flag.
    _APT_YES_FLAG_RE = re.compile(
        r'(?<!\S)(?:-[A-Za-z]*y[A-Za-z]*|--yes|--assume-yes|-qq)(?!\S)'
    )

    _IF_KEYWORD_RE = re.compile(r'\bif\b')

    # Presence of one of these blocks means the text is a main nginx.conf
    # rather than a snippet meant to live inside `http {}`.
    _NGINX_MAIN_BLOCK_RE = re.compile(r'^\s*(?:http|events)\s*\{', re.MULTILINE)

    def __init__(self, logger):
        """Store the logger used to report external-tool failures.

        Args:
            logger: Object exposing a ``warning(message)`` method.
        """
        self.logger = logger

    def validate_bash_script(self, script_content: str) -> Dict:
        """Validate a bash script for syntax errors and best practices.

        Args:
            script_content: Full text of the script.

        Returns:
            {
                "valid": bool,
                "errors": List[str],
                "warnings": List[str],
                "suggestions": List[str],
                "score": int
            }
        """
        errors: List[str] = []
        warnings: List[str] = []
        suggestions: List[str] = []

        lines = script_content.split('\n')
        lowered = script_content.lower()
        # Lines that are not whole-line comments. The shebang is excluded as
        # well, which is fine: it is checked separately just below.
        code_lines = [ln for ln in lines if not ln.lstrip().startswith('#')]

        # Shebang. str.split always yields at least one element, so an empty
        # script is reported as "missing shebang" instead of raising.
        first_line = lines[0]
        if not first_line.startswith('#!'):
            errors.append("Missing shebang - script should start with '#!/bin/bash'")
        elif '#!/bin/bash' not in first_line and '#!/usr/bin/env bash' not in first_line:
            warnings.append("Shebang should use bash: #!/bin/bash or #!/usr/bin/env bash")

        # Exit-on-error: a commented-out `set -e` must not count.
        if not any(self._SET_E_RE.search(ln) for ln in code_lines):
            suggestions.append("Consider adding 'set -e' to exit on errors")

        # Dangerous commands; line numbers refer to the original script.
        for line_num, line in enumerate(lines, 1):
            if line.lstrip().startswith('#'):
                continue
            for pattern, message in self._DANGEROUS_PATTERNS:
                if pattern.search(line):
                    warnings.append(f"Line {line_num}: {message}")

        # Unquoted variables (heuristic).
        if self._UNQUOTED_VAR_RE.search(script_content):
            suggestions.append("Consider quoting variables to prevent word splitting")

        # Package manager best practices.
        if 'apt-get update' in script_content and 'apt-get upgrade' not in script_content:
            suggestions.append("Consider running 'apt-get upgrade' after update for security patches")

        # The -y flag is checked per command. A script-wide substring test
        # would be satisfied by any "-y" anywhere, e.g. inside "python-yaml".
        # Backslash continuations are joined first so a flag placed on the
        # following physical line is still seen.
        logical_lines = script_content.replace('\\\n', ' ').split('\n')
        if any(
            self._APT_INSTALL_RE.search(ln) and not self._APT_YES_FLAG_RE.search(ln)
            for ln in logical_lines
            if not ln.lstrip().startswith('#')
        ):
            warnings.append("apt-get install should use -y flag for non-interactive installation")

        # Error handling around systemctl: `if` must be a whole word so that
        # unit names such as "notifier" do not count as handling.
        systemctl_lines = [ln for ln in code_lines if 'systemctl' in ln]
        if systemctl_lines and not any(
            '||' in ln or self._IF_KEYWORD_RE.search(ln) for ln in systemctl_lines
        ):
            suggestions.append("Consider adding error handling for systemctl commands")

        # Database mentioned without any firewall configuration.
        mentions_db = any(db in lowered for db in ('mysql', 'postgresql', 'mongodb'))
        if mentions_db and 'ufw' not in lowered and 'firewall' not in lowered:
            warnings.append("Database installed without firewall configuration")

        # Run shellcheck if available.
        shellcheck_results = self._run_shellcheck(script_content)
        if shellcheck_results:
            errors.extend(shellcheck_results.get('errors', []))
            warnings.extend(shellcheck_results.get('warnings', []))

        return self._build_result(errors, warnings, suggestions)

    def _run_shellcheck(self, script_content: str) -> Optional[Dict]:
        """Run shellcheck on the script if the tool is available.

        Returns:
            ``{"errors": [...], "warnings": [...]}`` built from shellcheck's
            JSON output (only the ``error`` and ``warning`` levels are kept),
            or ``None`` when shellcheck is not installed, could not be run,
            or produced output that could not be parsed.
        """
        if not shutil.which('shellcheck'):
            return None

        temp_path = None
        try:
            with tempfile.NamedTemporaryFile(
                mode='w', suffix='.sh', delete=False, encoding='utf-8'
            ) as f:
                # Record the name before writing so the file is removed even
                # if the write itself fails.
                temp_path = f.name
                f.write(script_content)
            result = subprocess.run(
                ['shellcheck', '-f', 'json', temp_path],
                capture_output=True,
                text=True,
                timeout=5,
            )
        except Exception as e:
            # Best-effort check: any failure to run the tool only disables it.
            self.logger.warning(f"Shellcheck validation failed: {str(e)}")
            return None
        finally:
            # Always clean up, including on timeout or other failures above.
            if temp_path is not None:
                try:
                    os.unlink(temp_path)
                except OSError as e:
                    self.logger.warning(
                        f"Could not remove temporary script {temp_path}: {e}"
                    )

        if result.returncode == 0:
            return {"errors": [], "warnings": []}

        # Parse shellcheck JSON output.
        try:
            issues = json.loads(result.stdout)
        except ValueError as e:
            self.logger.warning(f"Shellcheck produced unparseable output: {e}")
            return None
        if not isinstance(issues, list):
            return None

        errors: List[str] = []
        warnings: List[str] = []
        for issue in issues:
            if not isinstance(issue, dict):
                continue
            level = issue.get('level', 'info')
            message = f"Line {issue.get('line', '?')}: {issue.get('message', 'Unknown issue')}"
            if level == 'error':
                errors.append(message)
            elif level == 'warning':
                warnings.append(message)
        return {"errors": errors, "warnings": warnings}

    def validate_nginx_config(self, nginx_content: str) -> Dict:
        """Validate an nginx configuration.

        Whole-line comments are ignored by every check, so a commented-out
        directive does not count as present.

        Args:
            nginx_content: Text of the configuration (typically a server block).

        Returns:
            Dict with the same keys as ``validate_bash_script``.
        """
        errors: List[str] = []
        warnings: List[str] = []
        suggestions: List[str] = []

        active = '\n'.join(
            ln for ln in nginx_content.split('\n')
            if not ln.strip().startswith('#')
        )

        # Server block; accept `server{` and `server  {` as well.
        if not re.search(r'\bserver\s*\{', active):
            errors.append("Missing 'server' block - nginx config must define a server")

        # Listen directive.
        if not re.search(r'\blisten\b', active):
            errors.append("Missing 'listen' directive - must specify port")

        # server_name.
        if not re.search(r'\bserver_name\b', active):
            warnings.append("Missing 'server_name' directive - consider specifying domain")

        # root or proxy_pass.
        has_root = re.search(r'\broot\b', active)
        has_proxy = re.search(r'\bproxy_pass\b', active)
        if not has_root and not has_proxy:
            warnings.append("No 'root' or 'proxy_pass' directive - where should nginx serve from?")

        # SSL/TLS when a listen directive uses port 443. The certificate
        # pattern must not be satisfied by `ssl_certificate_key`; "_" is a
        # word character, so \b after "ssl_certificate" rules that out.
        if re.search(r'\blisten\b[^;]*\b443\b', active):
            if not re.search(r'\bssl_certificate\b', active):
                warnings.append("Port 443 configured but no SSL certificate specified")
            if not re.search(r'\bssl_certificate_key\b', active):
                warnings.append("Port 443 configured but no SSL key specified")

        # Security headers.
        security_headers = (
            'X-Frame-Options',
            'X-Content-Type-Options',
            'X-XSS-Protection',
        )
        missing_headers = [h for h in security_headers if h not in active]
        if missing_headers:
            suggestions.append(f"Consider adding security headers: {', '.join(missing_headers)}")

        # Optional tuning and logging directives.
        if 'gzip' not in active:
            suggestions.append("Consider enabling gzip compression for better performance")
        if 'client_max_body_size' not in active:
            suggestions.append("Consider setting client_max_body_size to prevent upload issues")
        if 'access_log' not in active:
            suggestions.append("Consider specifying access_log location")
        if 'error_log' not in active:
            suggestions.append("Consider specifying error_log location")

        # Try nginx -t if available (syntax check).
        nginx_test = self._test_nginx_syntax(nginx_content)
        if nginx_test and not nginx_test['valid']:
            errors.extend(nginx_test.get('errors', []))

        return self._build_result(errors, warnings, suggestions)

    @staticmethod
    def _standalone_nginx_conf(nginx_content: str, workdir: str) -> str:
        """Return text that ``nginx -t -c`` can load as a main config file.

        Content that already has a top-level ``http`` or ``events`` block is
        returned unchanged. Anything else is treated as a snippet and wrapped
        in a minimal main config, because ``server`` is only valid inside
        ``http {}``. The wrapper prefix shares the first line with the
        snippet so line numbers in nginx's messages still match the input.
        """
        if ScriptValidator._NGINX_MAIN_BLOCK_RE.search(nginx_content):
            return nginx_content
        # Keep the pid file inside the scratch directory so the test does not
        # depend on write access to the system-wide pid location.
        pid_path = (Path(workdir) / 'nginx.pid').as_posix()
        prefix = 'pid "' + pid_path + '"; events {} http { '
        return prefix + nginx_content + '\n}\n'

    def _test_nginx_syntax(self, nginx_content: str) -> Optional[Dict]:
        """Test nginx syntax with ``nginx -t`` if nginx is available.

        Returns:
            ``{"valid": True, "errors": []}`` when nginx accepts the config,
            ``{"valid": False, "errors": [message]}`` when it rejects it, or
            ``None`` when nginx is not installed or could not be run.
        """
        if not shutil.which('nginx'):
            return None

        try:
            with tempfile.TemporaryDirectory() as tmpdir:
                conf_path = Path(tmpdir) / 'nginx.conf'
                conf_path.write_text(
                    self._standalone_nginx_conf(nginx_content, tmpdir),
                    encoding='utf-8',
                )
                result = subprocess.run(
                    ['nginx', '-t', '-c', str(conf_path)],
                    capture_output=True,
                    text=True,
                    timeout=5,
                )
        except Exception as e:
            # Best-effort check: any failure to run the tool only disables it.
            self.logger.warning(f"Nginx syntax test failed: {str(e)}")
            return None

        if result.returncode == 0:
            return {"valid": True, "errors": []}
        error_msg = result.stderr.strip()
        return {"valid": False, "errors": [f"Nginx syntax error: {error_msg}"]}

    def _build_result(self, errors: List, warnings: List, suggestions: List) -> Dict:
        """Assemble the result dict shared by both public validators."""
        return {
            "valid": not errors,
            "errors": errors,
            "warnings": warnings,
            "suggestions": suggestions,
            "score": self._calculate_score(errors, warnings, suggestions),
        }

    def _calculate_score(self, errors: List, warnings: List, suggestions: List) -> int:
        """Calculate a quality score in the range 0-100.

        Starts at 100 and subtracts 20 per error, 5 per warning and 2 per
        suggestion; the result is clamped to [0, 100].
        """
        score = 100
        score -= len(errors) * 20      # Critical issues
        score -= len(warnings) * 5     # Minor issues
        score -= len(suggestions) * 2  # Improvements
        return max(0, min(100, score))