File size: 13,382 Bytes
673435a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
"""
VoiceForge Security Test Suite
Automated penetration testing scripts for OWASP Top 10 vulnerabilities.

Usage:
    python security_tests.py --base-url http://localhost:8000

IMPORTANT: Only run against test/dev environments you own!
"""

import argparse
import requests
import json
import re
from typing import Dict, List, Any


class SecurityTester:
    """Automated security testing for VoiceForge API."""
    
    def __init__(self, base_url: str):
        self.base_url = base_url.rstrip('/')
        self.results: List[Dict[str, Any]] = []
        self.session = requests.Session()
    
    def log_result(self, test_name: str, passed: bool, details: str):
        """Log test result."""
        status = "✅ PASS" if passed else "❌ FAIL"
        print(f"{status}: {test_name}")
        if not passed:
            print(f"   Details: {details}")
        self.results.append({
            "test": test_name,
            "passed": passed,
            "details": details
        })
    
    # =========================================================================
    # INJECTION TESTS (OWASP A03:2021)
    # =========================================================================
    
    def test_sql_injection(self):
        """Test for SQL injection vulnerabilities."""
        print("\n[1] SQL Injection Tests")
        print("-" * 40)
        
        payloads = [
            "' OR '1'='1",
            "'; DROP TABLE users;--",
            "1' UNION SELECT * FROM users--",
            "admin'--",
            "1; SELECT * FROM users WHERE '1'='1",
        ]
        
        # Test login endpoint
        for payload in payloads:
            try:
                response = self.session.post(
                    f"{self.base_url}/api/v1/auth/login",
                    json={"email": payload, "password": payload},
                    timeout=5
                )
                
                # Check for SQL error messages (bad sign if exposed)
                suspicious_patterns = [
                    "sql", "syntax", "query", "sqlite", "mysql", "postgres",
                    "ORA-", "ODBC", "exception"
                ]
                
                response_text = response.text.lower()
                leaked = any(p in response_text for p in suspicious_patterns)
                
                if leaked:
                    self.log_result(
                        f"SQL Injection ({payload[:20]}...)",
                        False,
                        "Database error message leaked in response"
                    )
                    return
            except requests.exceptions.RequestException:
                pass
        
        self.log_result("SQL Injection", True, "No SQL errors leaked")
    
    def test_xss_injection(self):
        """Test for Cross-Site Scripting vulnerabilities."""
        print("\n[2] XSS Injection Tests")
        print("-" * 40)
        
        payloads = [
            "<script>alert('XSS')</script>",
            "<img src=x onerror=alert('XSS')>",
            "javascript:alert('XSS')",
            "<svg onload=alert('XSS')>",
            "{{7*7}}",  # Template injection
        ]
        
        for payload in payloads:
            try:
                # Test text input (TTS endpoint)
                response = self.session.post(
                    f"{self.base_url}/api/v1/tts/synthesize",
                    json={"text": payload, "voice": "en-US-JennyNeural"},
                    timeout=10
                )
                
                # Check if payload is reflected without encoding
                if payload in response.text and "<script>" in response.text:
                    self.log_result(
                        f"XSS ({payload[:20]}...)",
                        False,
                        "Script tag reflected in response"
                    )
                    return
            except requests.exceptions.RequestException:
                pass
        
        self.log_result("XSS Injection", True, "No XSS vulnerabilities found")
    
    def test_command_injection(self):
        """Test for OS command injection."""
        print("\n[3] Command Injection Tests")
        print("-" * 40)
        
        payloads = [
            "; ls -la",
            "| cat /etc/passwd",
            "`id`",
            "$(whoami)",
            "&& dir",
        ]
        
        # Test file upload endpoint with malicious filename
        for payload in payloads:
            try:
                files = {'file': (f'test{payload}.wav', b'fake audio', 'audio/wav')}
                response = self.session.post(
                    f"{self.base_url}/api/v1/stt/upload",
                    files=files,
                    timeout=10
                )
                
                # Check for command output in response
                suspicious = ["root:", "uid=", "gid=", "Directory of"]
                if any(s in response.text for s in suspicious):
                    self.log_result(
                        f"Command Injection ({payload})",
                        False,
                        "Command output detected in response"
                    )
                    return
            except requests.exceptions.RequestException:
                pass
        
        self.log_result("Command Injection", True, "No command injection found")
    
    # =========================================================================
    # AUTHENTICATION TESTS (OWASP A07:2021)
    # =========================================================================
    
    def test_broken_authentication(self):
        """Test for authentication bypass and weak implementations."""
        print("\n[4] Authentication Tests")
        print("-" * 40)
        
        # Test 1: Access protected endpoint without token
        try:
            response = self.session.get(
                f"{self.base_url}/api/v1/transcripts",
                timeout=5
            )
            if response.status_code == 200:
                self.log_result(
                    "Auth Bypass (No Token)",
                    False,
                    "Protected endpoint accessible without authentication"
                )
            else:
                self.log_result("Auth Bypass (No Token)", True, "Endpoint protected")
        except requests.exceptions.RequestException:
            self.log_result("Auth Bypass (No Token)", True, "Request failed (expected)")
        
        # Test 2: JWT manipulation
        fake_tokens = [
            "eyJhbGciOiJub25lIiwidHlwIjoiSldUIn0.eyJzdWIiOiJhZG1pbiJ9.",  # alg:none
            "invalid.token.here",
            "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJhZG1pbiJ9.fake",
        ]
        
        for token in fake_tokens:
            try:
                response = self.session.get(
                    f"{self.base_url}/api/v1/transcripts",
                    headers={"Authorization": f"Bearer {token}"},
                    timeout=5
                )
                if response.status_code == 200:
                    self.log_result(
                        "JWT Manipulation",
                        False,
                        f"Fake token accepted: {token[:30]}..."
                    )
                    return
            except requests.exceptions.RequestException:
                pass
        
        self.log_result("JWT Manipulation", True, "Fake tokens rejected")
    
    def test_rate_limiting(self):
        """Test rate limiting implementation."""
        print("\n[5] Rate Limiting Tests")
        print("-" * 40)
        
        # Hit login endpoint rapidly
        blocked = False
        for i in range(20):
            try:
                response = self.session.post(
                    f"{self.base_url}/api/v1/auth/login",
                    json={"email": f"test{i}@test.com", "password": "wrong"},
                    timeout=2
                )
                if response.status_code == 429:
                    blocked = True
                    break
            except requests.exceptions.RequestException:
                break
        
        self.log_result(
            "Rate Limiting",
            blocked,
            "429 returned" if blocked else "No rate limiting detected after 20 requests"
        )
    
    # =========================================================================
    # SECURITY MISCONFIGURATION (OWASP A05:2021)
    # =========================================================================
    
    def test_security_headers(self):
        """Test for security headers in responses."""
        print("\n[6] Security Headers Tests")
        print("-" * 40)
        
        try:
            response = self.session.get(f"{self.base_url}/health", timeout=5)
            headers = response.headers
            
            required_headers = {
                'X-Content-Type-Options': 'nosniff',
                'X-Frame-Options': ['DENY', 'SAMEORIGIN'],
                'Strict-Transport-Security': None,  # Just check presence
                'Content-Security-Policy': None,
            }
            
            all_present = True
            for header, expected in required_headers.items():
                value = headers.get(header)
                if not value:
                    self.log_result(f"Header: {header}", False, "Missing")
                    all_present = False
                elif expected and value not in (expected if isinstance(expected, list) else [expected]):
                    self.log_result(f"Header: {header}", False, f"Unexpected value: {value}")
                    all_present = False
                else:
                    self.log_result(f"Header: {header}", True, value)
            
        except requests.exceptions.RequestException as e:
            self.log_result("Security Headers", False, str(e))
    
    def test_information_disclosure(self):
        """Test for sensitive information disclosure."""
        print("\n[7] Information Disclosure Tests")
        print("-" * 40)
        
        # Check for verbose error messages
        try:
            response = self.session.get(
                f"{self.base_url}/api/v1/nonexistent_endpoint_12345",
                timeout=5
            )
            
            sensitive_patterns = [
                "traceback", "stack trace", "exception", "error in",
                "line \\d+", "file \"", "python", "uvicorn"
            ]
            
            response_lower = response.text.lower()
            disclosed = any(re.search(p, response_lower) for p in sensitive_patterns)
            
            self.log_result(
                "Error Message Disclosure",
                not disclosed,
                "Stack trace exposed" if disclosed else "Generic error returned"
            )
        except requests.exceptions.RequestException:
            pass
        
        # Check for sensitive files
        sensitive_paths = [
            "/.env", "/.git/config", "/config.py", "/docker-compose.yml",
            "/requirements.txt", "/.aws/credentials"
        ]
        
        for path in sensitive_paths:
            try:
                response = self.session.get(f"{self.base_url}{path}", timeout=3)
                if response.status_code == 200 and len(response.text) > 10:
                    self.log_result(
                        f"Sensitive File: {path}",
                        False,
                        "File accessible"
                    )
                else:
                    pass  # Not accessible (good)
            except requests.exceptions.RequestException:
                pass
        
        self.log_result("Sensitive Files", True, "No sensitive files exposed")
    
    # =========================================================================
    # MAIN RUNNER
    # =========================================================================
    
    def run_all_tests(self):
        """Run all security tests."""
        print("=" * 60)
        print("VoiceForge Security Test Suite")
        print(f"Target: {self.base_url}")
        print("=" * 60)
        
        self.test_sql_injection()
        self.test_xss_injection()
        self.test_command_injection()
        self.test_broken_authentication()
        self.test_rate_limiting()
        self.test_security_headers()
        self.test_information_disclosure()
        
        # Summary
        print("\n" + "=" * 60)
        print("SUMMARY")
        print("=" * 60)
        
        passed = sum(1 for r in self.results if r['passed'])
        total = len(self.results)
        
        print(f"Passed: {passed}/{total}")
        print(f"Failed: {total - passed}/{total}")
        
        if total - passed > 0:
            print("\n❌ FAILED TESTS:")
            for r in self.results:
                if not r['passed']:
                    print(f"  - {r['test']}: {r['details']}")
        
        return passed == total


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="VoiceForge Security Test Suite")
    parser.add_argument("--base-url", default="http://localhost:8000", help="API base URL")
    args = parser.parse_args()
    
    tester = SecurityTester(args.base_url)
    success = tester.run_all_tests()
    exit(0 if success else 1)