Spaces:
Sleeping
Sleeping
| """ | |
| VoiceForge Security Test Suite | |
| Automated penetration testing scripts for OWASP Top 10 vulnerabilities. | |
| Usage: | |
| python security_tests.py --base-url http://localhost:8000 | |
| IMPORTANT: Only run against test/dev environments you own! | |
| """ | |
| import argparse | |
| import requests | |
| import json | |
| import re | |
| from typing import Dict, List, Any | |
class SecurityTester:
    """Automated security testing for VoiceForge API.

    Each ``test_*`` method sends a handful of HTTP requests through a shared
    ``requests.Session`` and records one or more outcomes via
    :meth:`log_result`.  ``passed=True`` means "no problem detected", not
    "proven secure".
    """

    def __init__(self, base_url: str):
        """Create a tester for the API rooted at *base_url*.

        Args:
            base_url: Scheme + host (+ optional port/path) of the target.
                Trailing slashes are removed so endpoint paths can be
                appended directly.
        """
        self.base_url = base_url.rstrip('/')
        # One dict per logged outcome: {"test", "passed", "details"}.
        self.results: List[Dict[str, Any]] = []
        self.session = requests.Session()

    def log_result(self, test_name: str, passed: bool, details: str) -> None:
        """Print a single test outcome and append it to ``self.results``.

        Args:
            test_name: Human-readable label for the check.
            passed: ``True`` when no issue was detected.
            details: Extra context; printed only for failures but always
                stored.
        """
        status = "✅ PASS" if passed else "❌ FAIL"
        print(f"{status}: {test_name}")
        if not passed:
            print(f" Details: {details}")
        self.results.append({
            "test": test_name,
            "passed": passed,
            "details": details
        })

    # =========================================================================
    # INJECTION TESTS (OWASP A03:2021)
    # =========================================================================

    def test_sql_injection(self) -> None:
        """Send SQL injection payloads to the login endpoint.

        Fails on the first response whose body contains a database-error
        marker; otherwise logs a single pass.  Requests that raise a
        ``requests`` error are skipped.
        """
        print("\n[1] SQL Injection Tests")
        print("-" * 40)

        payloads = [
            "' OR '1'='1",
            "'; DROP TABLE users;--",
            "1' UNION SELECT * FROM users--",
            "admin'--",
            "1; SELECT * FROM users WHERE '1'='1",
        ]
        # The response body is lower-cased before matching, so every marker
        # must itself be lower-case (upper-case "ORA-"/"ODBC" could never hit).
        suspicious_patterns = (
            "sql", "syntax", "query", "sqlite", "mysql", "postgres",
            "ora-", "odbc", "exception",
        )

        # Test login endpoint
        for payload in payloads:
            try:
                response = self.session.post(
                    f"{self.base_url}/api/v1/auth/login",
                    json={"email": payload, "password": payload},
                    timeout=5
                )
            except requests.exceptions.RequestException:
                # Best effort: a request that never completed leaked nothing.
                continue

            # Check for SQL error messages (bad sign if exposed)
            response_text = response.text.lower()
            if any(p in response_text for p in suspicious_patterns):
                self.log_result(
                    f"SQL Injection ({payload[:20]}...)",
                    False,
                    "Database error message leaked in response"
                )
                return

        self.log_result("SQL Injection", True, "No SQL errors leaked")

    def test_xss_injection(self) -> None:
        """Send XSS / template payloads to the TTS endpoint.

        Fails on the first payload that comes back verbatim (i.e. without
        any encoding) in the response body; otherwise logs a single pass.
        """
        print("\n[2] XSS Injection Tests")
        print("-" * 40)

        payloads = [
            "<script>alert('XSS')</script>",
            "<img src=x onerror=alert('XSS')>",
            "javascript:alert('XSS')",
            "<svg onload=alert('XSS')>",
            "{{7*7}}",  # Template injection
        ]

        for payload in payloads:
            try:
                # Test text input (TTS endpoint)
                response = self.session.post(
                    f"{self.base_url}/api/v1/tts/synthesize",
                    json={"text": payload, "voice": "en-US-JennyNeural"},
                    timeout=10
                )
            except requests.exceptions.RequestException:
                continue

            # Check if payload is reflected without encoding.  Requiring a
            # literal "<script>" as well would hide every non-script payload.
            if payload in response.text:
                self.log_result(
                    f"XSS ({payload[:20]}...)",
                    False,
                    "Payload reflected unescaped in response"
                )
                return

        self.log_result("XSS Injection", True, "No XSS vulnerabilities found")

    def test_command_injection(self) -> None:
        """Upload files whose names carry shell metacharacters.

        Fails on the first response containing typical command output
        (``/etc/passwd`` entries, ``id`` output, a Windows ``dir`` header);
        otherwise logs a single pass.
        """
        print("\n[3] Command Injection Tests")
        print("-" * 40)

        payloads = [
            "; ls -la",
            "| cat /etc/passwd",
            "`id`",
            "$(whoami)",
            "&& dir",
        ]
        suspicious = ("root:", "uid=", "gid=", "Directory of")

        # Test file upload endpoint with malicious filename
        for payload in payloads:
            try:
                files = {'file': (f'test{payload}.wav', b'fake audio', 'audio/wav')}
                response = self.session.post(
                    f"{self.base_url}/api/v1/stt/upload",
                    files=files,
                    timeout=10
                )
            except requests.exceptions.RequestException:
                continue

            # Check for command output in response
            if any(s in response.text for s in suspicious):
                self.log_result(
                    f"Command Injection ({payload})",
                    False,
                    "Command output detected in response"
                )
                return

        self.log_result("Command Injection", True, "No command injection found")

    # =========================================================================
    # AUTHENTICATION TESTS (OWASP A07:2021)
    # =========================================================================

    def test_broken_authentication(self) -> None:
        """Check that the transcripts endpoint rejects missing/forged tokens.

        Logs one result for the unauthenticated request and one for the
        forged-JWT probes (failing on the first token answered with 200).
        """
        print("\n[4] Authentication Tests")
        print("-" * 40)

        # Test 1: Access protected endpoint without token
        try:
            response = self.session.get(
                f"{self.base_url}/api/v1/transcripts",
                timeout=5
            )
        except requests.exceptions.RequestException:
            self.log_result("Auth Bypass (No Token)", True, "Request failed (expected)")
        else:
            if response.status_code == 200:
                self.log_result(
                    "Auth Bypass (No Token)",
                    False,
                    "Protected endpoint accessible without authentication"
                )
            else:
                self.log_result("Auth Bypass (No Token)", True, "Endpoint protected")

        # Test 2: JWT manipulation
        fake_tokens = [
            "eyJhbGciOiJub25lIiwidHlwIjoiSldUIn0.eyJzdWIiOiJhZG1pbiJ9.",  # alg:none
            "invalid.token.here",
            "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJhZG1pbiJ9.fake",
        ]

        for token in fake_tokens:
            try:
                response = self.session.get(
                    f"{self.base_url}/api/v1/transcripts",
                    headers={"Authorization": f"Bearer {token}"},
                    timeout=5
                )
            except requests.exceptions.RequestException:
                continue

            if response.status_code == 200:
                self.log_result(
                    "JWT Manipulation",
                    False,
                    f"Fake token accepted: {token[:30]}..."
                )
                return

        self.log_result("JWT Manipulation", True, "Fake tokens rejected")

    def test_rate_limiting(self, attempts: int = 20) -> None:
        """Hammer the login endpoint and expect an HTTP 429.

        Args:
            attempts: Maximum number of login requests to send before
                concluding that no rate limiting is in place.
        """
        print("\n[5] Rate Limiting Tests")
        print("-" * 40)

        # Hit login endpoint rapidly
        blocked = False
        aborted = False
        sent = 0
        for i in range(attempts):
            try:
                response = self.session.post(
                    f"{self.base_url}/api/v1/auth/login",
                    json={"email": f"test{i}@test.com", "password": "wrong"},
                    timeout=2
                )
            except requests.exceptions.RequestException:
                aborted = True
                break
            sent += 1
            if response.status_code == 429:
                blocked = True
                break

        if blocked:
            details = "429 returned"
        elif aborted:
            # Do not claim the full run completed when the loop stopped early.
            details = f"Request failed after {sent} requests; no rate limiting observed"
        else:
            details = f"No rate limiting detected after {sent} requests"
        self.log_result("Rate Limiting", blocked, details)

    # =========================================================================
    # SECURITY MISCONFIGURATION (OWASP A05:2021)
    # =========================================================================

    def test_security_headers(self) -> None:
        """Check the ``/health`` response for common security headers.

        Logs one result per header: missing, unexpected value, or present.
        Expected values are compared case-insensitively because these header
        tokens are not case-sensitive.
        """
        print("\n[6] Security Headers Tests")
        print("-" * 40)

        try:
            response = self.session.get(f"{self.base_url}/health", timeout=5)
        except requests.exceptions.RequestException as e:
            self.log_result("Security Headers", False, str(e))
            return

        headers = response.headers
        required_headers = {
            'X-Content-Type-Options': 'nosniff',
            'X-Frame-Options': ['DENY', 'SAMEORIGIN'],
            'Strict-Transport-Security': None,  # Just check presence
            'Content-Security-Policy': None,
        }

        for header, expected in required_headers.items():
            value = headers.get(header)
            if not value:
                self.log_result(f"Header: {header}", False, "Missing")
                continue

            if expected:
                allowed = expected if isinstance(expected, list) else [expected]
                normalized = value.strip().lower()
                if normalized not in {option.lower() for option in allowed}:
                    self.log_result(f"Header: {header}", False, f"Unexpected value: {value}")
                    continue

            self.log_result(f"Header: {header}", True, value)

    def test_information_disclosure(self) -> None:
        """Probe for verbose error pages and publicly served sensitive files.

        Logs "Error Message Disclosure" for the 404 probe (skipped if the
        request itself fails), one failure per reachable sensitive path, and
        a "Sensitive Files" pass only when none of the paths was reachable.
        """
        print("\n[7] Information Disclosure Tests")
        print("-" * 40)

        # Check for verbose error messages
        sensitive_patterns = [
            "traceback", "stack trace", "exception", "error in",
            r"line \d+", "file \"", "python", "uvicorn"
        ]
        try:
            response = self.session.get(
                f"{self.base_url}/api/v1/nonexistent_endpoint_12345",
                timeout=5
            )
        except requests.exceptions.RequestException:
            pass
        else:
            response_lower = response.text.lower()
            disclosed = any(re.search(p, response_lower) for p in sensitive_patterns)
            self.log_result(
                "Error Message Disclosure",
                not disclosed,
                "Stack trace exposed" if disclosed else "Generic error returned"
            )

        # Check for sensitive files
        sensitive_paths = [
            "/.env", "/.git/config", "/config.py", "/docker-compose.yml",
            "/requirements.txt", "/.aws/credentials"
        ]

        exposed = False
        for path in sensitive_paths:
            try:
                response = self.session.get(f"{self.base_url}{path}", timeout=3)
            except requests.exceptions.RequestException:
                continue

            if response.status_code == 200 and len(response.text) > 10:
                exposed = True
                self.log_result(
                    f"Sensitive File: {path}",
                    False,
                    "File accessible"
                )

        # Only report the aggregate pass when nothing was actually exposed;
        # otherwise it would contradict the failures logged just above.
        if not exposed:
            self.log_result("Sensitive Files", True, "No sensitive files exposed")

    # =========================================================================
    # MAIN RUNNER
    # =========================================================================

    def run_all_tests(self) -> bool:
        """Run all security tests and print a summary.

        Returns:
            ``True`` when every logged result passed, ``False`` otherwise.
        """
        print("=" * 60)
        print("VoiceForge Security Test Suite")
        print(f"Target: {self.base_url}")
        print("=" * 60)

        self.test_sql_injection()
        self.test_xss_injection()
        self.test_command_injection()
        self.test_broken_authentication()
        self.test_rate_limiting()
        self.test_security_headers()
        self.test_information_disclosure()

        # Summary
        print("\n" + "=" * 60)
        print("SUMMARY")
        print("=" * 60)

        passed = sum(1 for r in self.results if r['passed'])
        total = len(self.results)
        print(f"Passed: {passed}/{total}")
        print(f"Failed: {total - passed}/{total}")

        if total - passed > 0:
            print("\n❌ FAILED TESTS:")
            for r in self.results:
                if not r['passed']:
                    print(f" - {r['test']}: {r['details']}")

        return passed == total
if __name__ == "__main__":
    # Command-line entry point: parse the target URL, run every check, and
    # report the overall outcome through the process exit status.
    parser = argparse.ArgumentParser(description="VoiceForge Security Test Suite")
    parser.add_argument("--base-url", default="http://localhost:8000", help="API base URL")
    args = parser.parse_args()

    tester = SecurityTester(args.base_url)
    success = tester.run_all_tests()

    # The bare `exit()` helper is injected by the `site` module for interactive
    # use and is missing under `python -S` or in frozen builds; raising
    # SystemExit works everywhere and yields the same exit status
    # (0 = all checks passed, 1 = at least one failure).
    raise SystemExit(0 if success else 1)