voiceforge / backend /tests /security /security_tests.py
lordofgaming
Initial VoiceForge deployment (clean)
673435a
"""
VoiceForge Security Test Suite
Automated penetration testing scripts for OWASP Top 10 vulnerabilities.
Usage:
python security_tests.py --base-url http://localhost:8000
IMPORTANT: Only run against test/dev environments you own!
"""
import argparse
import requests
import json
import re
from typing import Dict, List, Any
class SecurityTester:
"""Automated security testing for VoiceForge API."""
def __init__(self, base_url: str):
self.base_url = base_url.rstrip('/')
self.results: List[Dict[str, Any]] = []
self.session = requests.Session()
def log_result(self, test_name: str, passed: bool, details: str):
"""Log test result."""
status = "✅ PASS" if passed else "❌ FAIL"
print(f"{status}: {test_name}")
if not passed:
print(f" Details: {details}")
self.results.append({
"test": test_name,
"passed": passed,
"details": details
})
# =========================================================================
# INJECTION TESTS (OWASP A03:2021)
# =========================================================================
def test_sql_injection(self):
"""Test for SQL injection vulnerabilities."""
print("\n[1] SQL Injection Tests")
print("-" * 40)
payloads = [
"' OR '1'='1",
"'; DROP TABLE users;--",
"1' UNION SELECT * FROM users--",
"admin'--",
"1; SELECT * FROM users WHERE '1'='1",
]
# Test login endpoint
for payload in payloads:
try:
response = self.session.post(
f"{self.base_url}/api/v1/auth/login",
json={"email": payload, "password": payload},
timeout=5
)
# Check for SQL error messages (bad sign if exposed)
suspicious_patterns = [
"sql", "syntax", "query", "sqlite", "mysql", "postgres",
"ORA-", "ODBC", "exception"
]
response_text = response.text.lower()
leaked = any(p in response_text for p in suspicious_patterns)
if leaked:
self.log_result(
f"SQL Injection ({payload[:20]}...)",
False,
"Database error message leaked in response"
)
return
except requests.exceptions.RequestException:
pass
self.log_result("SQL Injection", True, "No SQL errors leaked")
def test_xss_injection(self):
"""Test for Cross-Site Scripting vulnerabilities."""
print("\n[2] XSS Injection Tests")
print("-" * 40)
payloads = [
"<script>alert('XSS')</script>",
"<img src=x onerror=alert('XSS')>",
"javascript:alert('XSS')",
"<svg onload=alert('XSS')>",
"{{7*7}}", # Template injection
]
for payload in payloads:
try:
# Test text input (TTS endpoint)
response = self.session.post(
f"{self.base_url}/api/v1/tts/synthesize",
json={"text": payload, "voice": "en-US-JennyNeural"},
timeout=10
)
# Check if payload is reflected without encoding
if payload in response.text and "<script>" in response.text:
self.log_result(
f"XSS ({payload[:20]}...)",
False,
"Script tag reflected in response"
)
return
except requests.exceptions.RequestException:
pass
self.log_result("XSS Injection", True, "No XSS vulnerabilities found")
def test_command_injection(self):
"""Test for OS command injection."""
print("\n[3] Command Injection Tests")
print("-" * 40)
payloads = [
"; ls -la",
"| cat /etc/passwd",
"`id`",
"$(whoami)",
"&& dir",
]
# Test file upload endpoint with malicious filename
for payload in payloads:
try:
files = {'file': (f'test{payload}.wav', b'fake audio', 'audio/wav')}
response = self.session.post(
f"{self.base_url}/api/v1/stt/upload",
files=files,
timeout=10
)
# Check for command output in response
suspicious = ["root:", "uid=", "gid=", "Directory of"]
if any(s in response.text for s in suspicious):
self.log_result(
f"Command Injection ({payload})",
False,
"Command output detected in response"
)
return
except requests.exceptions.RequestException:
pass
self.log_result("Command Injection", True, "No command injection found")
# =========================================================================
# AUTHENTICATION TESTS (OWASP A07:2021)
# =========================================================================
def test_broken_authentication(self):
"""Test for authentication bypass and weak implementations."""
print("\n[4] Authentication Tests")
print("-" * 40)
# Test 1: Access protected endpoint without token
try:
response = self.session.get(
f"{self.base_url}/api/v1/transcripts",
timeout=5
)
if response.status_code == 200:
self.log_result(
"Auth Bypass (No Token)",
False,
"Protected endpoint accessible without authentication"
)
else:
self.log_result("Auth Bypass (No Token)", True, "Endpoint protected")
except requests.exceptions.RequestException:
self.log_result("Auth Bypass (No Token)", True, "Request failed (expected)")
# Test 2: JWT manipulation
fake_tokens = [
"eyJhbGciOiJub25lIiwidHlwIjoiSldUIn0.eyJzdWIiOiJhZG1pbiJ9.", # alg:none
"invalid.token.here",
"eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJhZG1pbiJ9.fake",
]
for token in fake_tokens:
try:
response = self.session.get(
f"{self.base_url}/api/v1/transcripts",
headers={"Authorization": f"Bearer {token}"},
timeout=5
)
if response.status_code == 200:
self.log_result(
"JWT Manipulation",
False,
f"Fake token accepted: {token[:30]}..."
)
return
except requests.exceptions.RequestException:
pass
self.log_result("JWT Manipulation", True, "Fake tokens rejected")
def test_rate_limiting(self):
"""Test rate limiting implementation."""
print("\n[5] Rate Limiting Tests")
print("-" * 40)
# Hit login endpoint rapidly
blocked = False
for i in range(20):
try:
response = self.session.post(
f"{self.base_url}/api/v1/auth/login",
json={"email": f"test{i}@test.com", "password": "wrong"},
timeout=2
)
if response.status_code == 429:
blocked = True
break
except requests.exceptions.RequestException:
break
self.log_result(
"Rate Limiting",
blocked,
"429 returned" if blocked else "No rate limiting detected after 20 requests"
)
# =========================================================================
# SECURITY MISCONFIGURATION (OWASP A05:2021)
# =========================================================================
def test_security_headers(self):
"""Test for security headers in responses."""
print("\n[6] Security Headers Tests")
print("-" * 40)
try:
response = self.session.get(f"{self.base_url}/health", timeout=5)
headers = response.headers
required_headers = {
'X-Content-Type-Options': 'nosniff',
'X-Frame-Options': ['DENY', 'SAMEORIGIN'],
'Strict-Transport-Security': None, # Just check presence
'Content-Security-Policy': None,
}
all_present = True
for header, expected in required_headers.items():
value = headers.get(header)
if not value:
self.log_result(f"Header: {header}", False, "Missing")
all_present = False
elif expected and value not in (expected if isinstance(expected, list) else [expected]):
self.log_result(f"Header: {header}", False, f"Unexpected value: {value}")
all_present = False
else:
self.log_result(f"Header: {header}", True, value)
except requests.exceptions.RequestException as e:
self.log_result("Security Headers", False, str(e))
def test_information_disclosure(self):
"""Test for sensitive information disclosure."""
print("\n[7] Information Disclosure Tests")
print("-" * 40)
# Check for verbose error messages
try:
response = self.session.get(
f"{self.base_url}/api/v1/nonexistent_endpoint_12345",
timeout=5
)
sensitive_patterns = [
"traceback", "stack trace", "exception", "error in",
"line \\d+", "file \"", "python", "uvicorn"
]
response_lower = response.text.lower()
disclosed = any(re.search(p, response_lower) for p in sensitive_patterns)
self.log_result(
"Error Message Disclosure",
not disclosed,
"Stack trace exposed" if disclosed else "Generic error returned"
)
except requests.exceptions.RequestException:
pass
# Check for sensitive files
sensitive_paths = [
"/.env", "/.git/config", "/config.py", "/docker-compose.yml",
"/requirements.txt", "/.aws/credentials"
]
for path in sensitive_paths:
try:
response = self.session.get(f"{self.base_url}{path}", timeout=3)
if response.status_code == 200 and len(response.text) > 10:
self.log_result(
f"Sensitive File: {path}",
False,
"File accessible"
)
else:
pass # Not accessible (good)
except requests.exceptions.RequestException:
pass
self.log_result("Sensitive Files", True, "No sensitive files exposed")
# =========================================================================
# MAIN RUNNER
# =========================================================================
def run_all_tests(self):
"""Run all security tests."""
print("=" * 60)
print("VoiceForge Security Test Suite")
print(f"Target: {self.base_url}")
print("=" * 60)
self.test_sql_injection()
self.test_xss_injection()
self.test_command_injection()
self.test_broken_authentication()
self.test_rate_limiting()
self.test_security_headers()
self.test_information_disclosure()
# Summary
print("\n" + "=" * 60)
print("SUMMARY")
print("=" * 60)
passed = sum(1 for r in self.results if r['passed'])
total = len(self.results)
print(f"Passed: {passed}/{total}")
print(f"Failed: {total - passed}/{total}")
if total - passed > 0:
print("\n❌ FAILED TESTS:")
for r in self.results:
if not r['passed']:
print(f" - {r['test']}: {r['details']}")
return passed == total
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="VoiceForge Security Test Suite")
parser.add_argument("--base-url", default="http://localhost:8000", help="API base URL")
args = parser.parse_args()
tester = SecurityTester(args.base_url)
success = tester.run_all_tests()
exit(0 if success else 1)