Spaces:
Running
Running
| """ | |
| Script and Nginx configuration validator | |
| Tests generated bash scripts and nginx configurations | |
| """ | |
| import re | |
| import subprocess | |
| import tempfile | |
| import os | |
| from pathlib import Path | |
| from typing import Dict, List, Optional | |
| import shutil | |
| class ScriptValidator: | |
| """Validates bash scripts and nginx configurations""" | |
| def __init__(self, logger): | |
| self.logger = logger | |
| def validate_bash_script(self, script_content: str) -> Dict: | |
| """ | |
| Validate bash script for syntax errors and best practices | |
| Returns: | |
| { | |
| "valid": bool, | |
| "errors": List[str], | |
| "warnings": List[str], | |
| "suggestions": List[str], | |
| "score": int | |
| } | |
| """ | |
| errors = [] | |
| warnings = [] | |
| suggestions = [] | |
| lines = script_content.split('\n') | |
| # Check for shebang | |
| if not lines[0].startswith('#!'): | |
| errors.append("Missing shebang - script should start with '#!/bin/bash'") | |
| elif '#!/bin/bash' not in lines[0] and '#!/usr/bin/env bash' not in lines[0]: | |
| warnings.append("Shebang should use bash: #!/bin/bash or #!/usr/bin/env bash") | |
| # Check for set -e (exit on error) | |
| has_set_e = any('set -e' in line or 'set -euo pipefail' in line for line in lines) | |
| if not has_set_e: | |
| suggestions.append("Consider adding 'set -e' to exit on errors") | |
| # Check for dangerous commands without safeguards | |
| dangerous_patterns = [ | |
| (r'\brm\s+-rf\s+/', "Using 'rm -rf /' is extremely dangerous"), | |
| (r'\bchmod\s+777', "chmod 777 is a security risk - use specific permissions"), | |
| (r'\bsudo\s+su\s+-', "Using 'sudo su -' unnecessarily escalates privileges"), | |
| ] | |
| for line_num, line in enumerate(lines, 1): | |
| for pattern, message in dangerous_patterns: | |
| if re.search(pattern, line): | |
| warnings.append(f"Line {line_num}: {message}") | |
| # Check for unquoted variables | |
| unquoted_vars = re.findall(r'[^"\']\$\{?[A-Za-z_][A-Za-z0-9_]*\}?[^"\']', script_content) | |
| if unquoted_vars: | |
| suggestions.append("Consider quoting variables to prevent word splitting") | |
| # Check for package manager best practices | |
| if 'apt-get update' in script_content and 'apt-get upgrade' not in script_content: | |
| suggestions.append("Consider running 'apt-get upgrade' after update for security patches") | |
| if 'apt-get install' in script_content and '-y' not in script_content: | |
| warnings.append("apt-get install should use -y flag for non-interactive installation") | |
| # Check for error handling | |
| if 'systemctl' in script_content and not any('||' in line or 'if' in line for line in lines if 'systemctl' in line): | |
| suggestions.append("Consider adding error handling for systemctl commands") | |
| # Check for firewall configuration | |
| if any(db in script_content.lower() for db in ['mysql', 'postgresql', 'mongodb']) and 'ufw' not in script_content.lower() and 'firewall' not in script_content.lower(): | |
| warnings.append("Database installed without firewall configuration") | |
| # Run shellcheck if available | |
| shellcheck_results = self._run_shellcheck(script_content) | |
| if shellcheck_results: | |
| errors.extend(shellcheck_results.get('errors', [])) | |
| warnings.extend(shellcheck_results.get('warnings', [])) | |
| is_valid = len(errors) == 0 | |
| return { | |
| "valid": is_valid, | |
| "errors": errors, | |
| "warnings": warnings, | |
| "suggestions": suggestions, | |
| "score": self._calculate_score(errors, warnings, suggestions) | |
| } | |
| def _run_shellcheck(self, script_content: str) -> Optional[Dict]: | |
| """Run shellcheck if available""" | |
| if not shutil.which('shellcheck'): | |
| return None | |
| try: | |
| with tempfile.NamedTemporaryFile(mode='w', suffix='.sh', delete=False) as f: | |
| f.write(script_content) | |
| temp_path = f.name | |
| result = subprocess.run( | |
| ['shellcheck', '-f', 'json', temp_path], | |
| capture_output=True, | |
| text=True, | |
| timeout=5 | |
| ) | |
| os.unlink(temp_path) | |
| if result.returncode == 0: | |
| return {"errors": [], "warnings": []} | |
| # Parse shellcheck JSON output | |
| import json | |
| try: | |
| issues = json.loads(result.stdout) | |
| errors = [] | |
| warnings = [] | |
| for issue in issues: | |
| level = issue.get('level', 'info') | |
| message = f"Line {issue.get('line', '?')}: {issue.get('message', 'Unknown issue')}" | |
| if level == 'error': | |
| errors.append(message) | |
| elif level == 'warning': | |
| warnings.append(message) | |
| return {"errors": errors, "warnings": warnings} | |
| except: | |
| return None | |
| except Exception as e: | |
| self.logger.warning(f"Shellcheck validation failed: {str(e)}") | |
| return None | |
| def validate_nginx_config(self, nginx_content: str) -> Dict: | |
| """Validate nginx configuration""" | |
| errors = [] | |
| warnings = [] | |
| suggestions = [] | |
| lines = nginx_content.split('\n') | |
| # Check for server block | |
| if 'server {' not in nginx_content: | |
| errors.append("Missing 'server' block - nginx config must define a server") | |
| # Check for listen directive | |
| if not any('listen' in line for line in lines): | |
| errors.append("Missing 'listen' directive - must specify port") | |
| # Check for server_name | |
| if not any('server_name' in line for line in lines): | |
| warnings.append("Missing 'server_name' directive - consider specifying domain") | |
| # Check for root or proxy_pass | |
| has_root = any('root' in line for line in lines if not line.strip().startswith('#')) | |
| has_proxy = any('proxy_pass' in line for line in lines if not line.strip().startswith('#')) | |
| if not has_root and not has_proxy: | |
| warnings.append("No 'root' or 'proxy_pass' directive - where should nginx serve from?") | |
| # Check for SSL/TLS if port 443 | |
| if '443' in nginx_content: | |
| if 'ssl_certificate' not in nginx_content: | |
| warnings.append("Port 443 configured but no SSL certificate specified") | |
| if 'ssl_certificate_key' not in nginx_content: | |
| warnings.append("Port 443 configured but no SSL key specified") | |
| # Check for security headers | |
| security_headers = [ | |
| 'X-Frame-Options', | |
| 'X-Content-Type-Options', | |
| 'X-XSS-Protection', | |
| ] | |
| missing_headers = [h for h in security_headers if h not in nginx_content] | |
| if missing_headers: | |
| suggestions.append(f"Consider adding security headers: {', '.join(missing_headers)}") | |
| # Check for gzip compression | |
| if 'gzip' not in nginx_content: | |
| suggestions.append("Consider enabling gzip compression for better performance") | |
| # Check for client_max_body_size | |
| if 'client_max_body_size' not in nginx_content: | |
| suggestions.append("Consider setting client_max_body_size to prevent upload issues") | |
| # Check for access_log and error_log | |
| if 'access_log' not in nginx_content: | |
| suggestions.append("Consider specifying access_log location") | |
| if 'error_log' not in nginx_content: | |
| suggestions.append("Consider specifying error_log location") | |
| # Try nginx -t if available (syntax check) | |
| nginx_test = self._test_nginx_syntax(nginx_content) | |
| if nginx_test and not nginx_test['valid']: | |
| errors.extend(nginx_test.get('errors', [])) | |
| is_valid = len(errors) == 0 | |
| return { | |
| "valid": is_valid, | |
| "errors": errors, | |
| "warnings": warnings, | |
| "suggestions": suggestions, | |
| "score": self._calculate_score(errors, warnings, suggestions) | |
| } | |
| def _test_nginx_syntax(self, nginx_content: str) -> Optional[Dict]: | |
| """Test nginx syntax if nginx is available""" | |
| if not shutil.which('nginx'): | |
| return None | |
| try: | |
| with tempfile.TemporaryDirectory() as tmpdir: | |
| conf_path = Path(tmpdir) / 'nginx.conf' | |
| conf_path.write_text(nginx_content) | |
| result = subprocess.run( | |
| ['nginx', '-t', '-c', str(conf_path)], | |
| capture_output=True, | |
| text=True, | |
| timeout=5 | |
| ) | |
| if result.returncode == 0: | |
| return {"valid": True, "errors": []} | |
| else: | |
| error_msg = result.stderr.strip() | |
| return {"valid": False, "errors": [f"Nginx syntax error: {error_msg}"]} | |
| except Exception as e: | |
| self.logger.warning(f"Nginx syntax test failed: {str(e)}") | |
| return None | |
| def _calculate_score(self, errors: List, warnings: List, suggestions: List) -> int: | |
| """Calculate quality score 0-100""" | |
| score = 100 | |
| score -= len(errors) * 20 # Critical issues | |
| score -= len(warnings) * 5 # Minor issues | |
| score -= len(suggestions) * 2 # Improvements | |
| return max(0, min(100, score)) | |