humanizetech commited on
Commit
e238f5c
·
1 Parent(s): 6440b1f

feat: Introduce `vapt_tools.py` for API security testing, covering injection, authentication, and rate limiting.

Browse files
Files changed (1) hide show
  1. vapt_tools.py +596 -0
vapt_tools.py ADDED
@@ -0,0 +1,596 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ VAPT Tools - Custom security testing tools for API vulnerability assessment.
3
+
4
+ This module contains the custom VAPT tool implementation for performing
5
+ comprehensive security tests on API endpoints.
6
+ """
7
+
8
+ import json
9
+ from typing import Dict, List, Any, Optional
10
+ from datetime import datetime
11
+ from pydantic import BaseModel, Field
12
+
13
+ from claude_agent_sdk import create_sdk_mcp_server, tool
14
+
15
+
16
+ # ============================================================================
17
+ # Data Models
18
+ # ============================================================================
19
+
20
+
21
+ class VAPTTestInput(BaseModel):
22
+ """Input schema for VAPT security testing tool."""
23
+
24
+ endpoint: str = Field(
25
+ description="The API endpoint URL to test (e.g., https://api.example.com/v1/users)"
26
+ )
27
+ method: str = Field(
28
+ default="GET", description="HTTP method (GET, POST, PUT, DELETE, PATCH)"
29
+ )
30
+ test_types: List[str] = Field(
31
+ default=["injection", "auth", "rate_limit", "cors"],
32
+ description="Types of security tests to perform: injection, auth, rate_limit, cors, headers, ssl",
33
+ )
34
+ headers: Optional[Dict[str, str]] = Field(
35
+ default=None, description="Optional headers to include in requests"
36
+ )
37
+ body: Optional[str] = Field(
38
+ default=None, description="Optional request body for POST/PUT/PATCH requests"
39
+ )
40
+
41
+
42
+ class VAPTTestResult(BaseModel):
43
+ """Result schema for VAPT security testing."""
44
+
45
+ endpoint: str
46
+ test_type: str
47
+ severity: str # critical, high, medium, low, info
48
+ status: str # vulnerable, secure, warning
49
+ description: str
50
+ recommendation: str
51
+ evidence: Optional[Dict[str, Any]] = None
52
+
53
+
54
+ # ============================================================================
55
+ # Security Testing Functions
56
+ # ============================================================================
57
+
58
+
59
+ async def test_sql_injection(
60
+ session, endpoint: str, method: str, headers: Dict[str, str], body: str
61
+ ) -> List[VAPTTestResult]:
62
+ """Test for SQL injection vulnerabilities."""
63
+ results = []
64
+
65
+ injection_payloads = [
66
+ "' OR '1'='1",
67
+ "' OR '1'='1' --",
68
+ "' UNION SELECT NULL--",
69
+ "admin'--",
70
+ "1' AND '1'='1",
71
+ ]
72
+
73
+ for payload in injection_payloads:
74
+ test_url = f"{endpoint}?q={payload}" if method == "GET" else endpoint
75
+ test_body = body or json.dumps({"input": payload})
76
+
77
+ try:
78
+ import aiohttp
79
+
80
+ async with session.request(
81
+ method,
82
+ test_url,
83
+ headers=headers,
84
+ data=test_body if method in ["POST", "PUT", "PATCH"] else None,
85
+ timeout=aiohttp.ClientTimeout(total=10),
86
+ ssl=False,
87
+ ) as response:
88
+ response_text = await response.text()
89
+
90
+ # Check for SQL error messages
91
+ error_indicators = [
92
+ "sql",
93
+ "mysql",
94
+ "sqlite",
95
+ "postgresql",
96
+ "oracle",
97
+ "syntax error",
98
+ "unclosed quotation",
99
+ "database error",
100
+ ]
101
+
102
+ if any(
103
+ indicator in response_text.lower() for indicator in error_indicators
104
+ ):
105
+ results.append(
106
+ VAPTTestResult(
107
+ endpoint=endpoint,
108
+ test_type="SQL Injection",
109
+ severity="critical",
110
+ status="vulnerable",
111
+ description=f"Endpoint vulnerable to SQL injection with payload: {payload}",
112
+ recommendation="Use parameterized queries/prepared statements. Implement input validation and sanitization.",
113
+ evidence={
114
+ "payload": payload,
115
+ "status_code": response.status,
116
+ },
117
+ )
118
+ )
119
+ break # Found vulnerability, no need to test more payloads
120
+
121
+ except Exception as e:
122
+ results.append(
123
+ VAPTTestResult(
124
+ endpoint=endpoint,
125
+ test_type="SQL Injection Test",
126
+ severity="info",
127
+ status="warning",
128
+ description=f"Could not complete injection test: {str(e)}",
129
+ recommendation="Ensure endpoint is accessible for testing",
130
+ evidence={"error": str(e)},
131
+ )
132
+ )
133
+ break
134
+
135
+ return results
136
+
137
+
138
+ async def test_xss(
139
+ session, endpoint: str, method: str, headers: Dict[str, str], body: str
140
+ ) -> List[VAPTTestResult]:
141
+ """Test for Cross-Site Scripting (XSS) vulnerabilities."""
142
+ results = []
143
+
144
+ xss_payloads = [
145
+ "<script>alert('XSS')</script>",
146
+ "<img src=x onerror=alert('XSS')>",
147
+ "javascript:alert('XSS')",
148
+ ]
149
+
150
+ for payload in xss_payloads:
151
+ test_url = f"{endpoint}?input={payload}" if method == "GET" else endpoint
152
+ test_body = body or json.dumps({"input": payload})
153
+
154
+ try:
155
+ import aiohttp
156
+
157
+ async with session.request(
158
+ method,
159
+ test_url,
160
+ headers=headers,
161
+ data=test_body if method in ["POST", "PUT", "PATCH"] else None,
162
+ timeout=aiohttp.ClientTimeout(total=10),
163
+ ssl=False,
164
+ ) as response:
165
+ response_text = await response.text()
166
+
167
+ # Check if payload is reflected without sanitization
168
+ if payload in response_text:
169
+ results.append(
170
+ VAPTTestResult(
171
+ endpoint=endpoint,
172
+ test_type="XSS (Cross-Site Scripting)",
173
+ severity="high",
174
+ status="vulnerable",
175
+ description=f"Endpoint reflects unsanitized input: {payload}",
176
+ recommendation="Implement proper output encoding and Content-Security-Policy headers.",
177
+ evidence={
178
+ "payload": payload,
179
+ "status_code": response.status,
180
+ },
181
+ )
182
+ )
183
+ break
184
+
185
+ except Exception as e:
186
+ results.append(
187
+ VAPTTestResult(
188
+ endpoint=endpoint,
189
+ test_type="XSS Test",
190
+ severity="info",
191
+ status="warning",
192
+ description=f"Could not complete XSS test: {str(e)}",
193
+ recommendation="Ensure endpoint is accessible for testing",
194
+ evidence={"error": str(e)},
195
+ )
196
+ )
197
+ break
198
+
199
+ return results
200
+
201
+
202
+ async def test_authentication(
203
+ session, endpoint: str, method: str, headers: Dict[str, str]
204
+ ) -> List[VAPTTestResult]:
205
+ """Test authentication and authorization controls."""
206
+ results = []
207
+
208
+ # Test without authentication headers
209
+ try:
210
+ import aiohttp
211
+
212
+ test_headers = headers.copy() if headers else {}
213
+ test_headers.pop("Authorization", None)
214
+ test_headers.pop("X-API-Key", None)
215
+
216
+ async with session.request(
217
+ method,
218
+ endpoint,
219
+ headers=test_headers,
220
+ timeout=aiohttp.ClientTimeout(total=10),
221
+ ssl=False,
222
+ ) as response:
223
+ if response.status == 200:
224
+ results.append(
225
+ VAPTTestResult(
226
+ endpoint=endpoint,
227
+ test_type="Authentication",
228
+ severity="high",
229
+ status="vulnerable",
230
+ description="Endpoint accessible without authentication",
231
+ recommendation="Implement proper authentication (OAuth2, JWT, API Keys). Return 401/403 for unauthenticated requests.",
232
+ evidence={"status_code": response.status},
233
+ )
234
+ )
235
+ elif response.status in [401, 403]:
236
+ results.append(
237
+ VAPTTestResult(
238
+ endpoint=endpoint,
239
+ test_type="Authentication",
240
+ severity="info",
241
+ status="secure",
242
+ description="Endpoint properly requires authentication",
243
+ recommendation="Continue monitoring authentication implementation",
244
+ evidence={"status_code": response.status},
245
+ )
246
+ )
247
+ except Exception as e:
248
+ results.append(
249
+ VAPTTestResult(
250
+ endpoint=endpoint,
251
+ test_type="Authentication Test",
252
+ severity="info",
253
+ status="warning",
254
+ description=f"Could not complete auth test: {str(e)}",
255
+ recommendation="Verify endpoint accessibility",
256
+ evidence={"error": str(e)},
257
+ )
258
+ )
259
+
260
+ return results
261
+
262
+
263
+ async def test_rate_limiting(
264
+ session, endpoint: str, method: str, headers: Dict[str, str]
265
+ ) -> List[VAPTTestResult]:
266
+ """Test rate limiting implementation."""
267
+ results = []
268
+ rate_limit_requests = 50
269
+ rate_limit_exceeded = False
270
+
271
+ try:
272
+ import aiohttp
273
+
274
+ for i in range(rate_limit_requests):
275
+ async with session.request(
276
+ method,
277
+ endpoint,
278
+ headers=headers,
279
+ timeout=aiohttp.ClientTimeout(total=5),
280
+ ssl=False,
281
+ ) as response:
282
+ if response.status == 429:
283
+ rate_limit_exceeded = True
284
+ results.append(
285
+ VAPTTestResult(
286
+ endpoint=endpoint,
287
+ test_type="Rate Limiting",
288
+ severity="info",
289
+ status="secure",
290
+ description=f"Rate limiting detected after {i+1} requests",
291
+ recommendation="Rate limiting is properly configured",
292
+ evidence={"requests_before_limit": i + 1},
293
+ )
294
+ )
295
+ break
296
+
297
+ if not rate_limit_exceeded:
298
+ results.append(
299
+ VAPTTestResult(
300
+ endpoint=endpoint,
301
+ test_type="Rate Limiting",
302
+ severity="medium",
303
+ status="vulnerable",
304
+ description=f"No rate limiting detected after {rate_limit_requests} requests",
305
+ recommendation="Implement rate limiting to prevent abuse and DoS attacks",
306
+ evidence={"requests_sent": rate_limit_requests},
307
+ )
308
+ )
309
+ except Exception as e:
310
+ results.append(
311
+ VAPTTestResult(
312
+ endpoint=endpoint,
313
+ test_type="Rate Limiting Test",
314
+ severity="info",
315
+ status="warning",
316
+ description=f"Could not complete rate limit test: {str(e)}",
317
+ recommendation="Verify endpoint stability",
318
+ evidence={"error": str(e)},
319
+ )
320
+ )
321
+
322
+ return results
323
+
324
+
325
+ async def test_cors_policy(session, endpoint: str, method: str) -> List[VAPTTestResult]:
326
+ """Test CORS policy configuration."""
327
+ results = []
328
+
329
+ cors_headers = {
330
+ "Origin": "https://evil.com",
331
+ "Access-Control-Request-Method": method,
332
+ "Access-Control-Request-Headers": "X-Custom-Header",
333
+ }
334
+
335
+ try:
336
+ import aiohttp
337
+
338
+ async with session.options(
339
+ endpoint,
340
+ headers=cors_headers,
341
+ timeout=aiohttp.ClientTimeout(total=10),
342
+ ssl=False,
343
+ ) as response:
344
+ cors_allow_origin = response.headers.get("Access-Control-Allow-Origin", "")
345
+
346
+ if cors_allow_origin == "*":
347
+ results.append(
348
+ VAPTTestResult(
349
+ endpoint=endpoint,
350
+ test_type="CORS Policy",
351
+ severity="medium",
352
+ status="vulnerable",
353
+ description="CORS allows requests from any origin (*)",
354
+ recommendation="Restrict CORS to specific trusted domains. Avoid using wildcard (*) in production.",
355
+ evidence={"access_control_allow_origin": cors_allow_origin},
356
+ )
357
+ )
358
+ elif cors_allow_origin:
359
+ results.append(
360
+ VAPTTestResult(
361
+ endpoint=endpoint,
362
+ test_type="CORS Policy",
363
+ severity="info",
364
+ status="secure",
365
+ description=f"CORS properly configured for origin: {cors_allow_origin}",
366
+ recommendation="Review allowed origins periodically",
367
+ evidence={"access_control_allow_origin": cors_allow_origin},
368
+ )
369
+ )
370
+ except Exception as e:
371
+ results.append(
372
+ VAPTTestResult(
373
+ endpoint=endpoint,
374
+ test_type="CORS Policy Test",
375
+ severity="info",
376
+ status="warning",
377
+ description=f"Could not complete CORS test: {str(e)}",
378
+ recommendation="Verify CORS configuration manually",
379
+ evidence={"error": str(e)},
380
+ )
381
+ )
382
+
383
+ return results
384
+
385
+
386
+ async def test_security_headers(
387
+ session, endpoint: str, method: str, headers: Dict[str, str]
388
+ ) -> List[VAPTTestResult]:
389
+ """Test security headers implementation."""
390
+ results = []
391
+
392
+ try:
393
+ import aiohttp
394
+
395
+ async with session.request(
396
+ method,
397
+ endpoint,
398
+ headers=headers,
399
+ timeout=aiohttp.ClientTimeout(total=10),
400
+ ssl=False,
401
+ ) as response:
402
+ security_headers = {
403
+ "Strict-Transport-Security": "HSTS missing",
404
+ "X-Content-Type-Options": "X-Content-Type-Options missing",
405
+ "X-Frame-Options": "X-Frame-Options missing",
406
+ "Content-Security-Policy": "CSP missing",
407
+ "X-XSS-Protection": "X-XSS-Protection missing",
408
+ }
409
+
410
+ missing_headers = []
411
+ for header, message in security_headers.items():
412
+ if header not in response.headers:
413
+ missing_headers.append(header)
414
+
415
+ if missing_headers:
416
+ results.append(
417
+ VAPTTestResult(
418
+ endpoint=endpoint,
419
+ test_type="Security Headers",
420
+ severity="medium",
421
+ status="vulnerable",
422
+ description=f"Missing security headers: {', '.join(missing_headers)}",
423
+ recommendation="Implement all recommended security headers to protect against common attacks",
424
+ evidence={"missing_headers": missing_headers},
425
+ )
426
+ )
427
+ else:
428
+ results.append(
429
+ VAPTTestResult(
430
+ endpoint=endpoint,
431
+ test_type="Security Headers",
432
+ severity="info",
433
+ status="secure",
434
+ description="All recommended security headers present",
435
+ recommendation="Continue monitoring header configuration",
436
+ evidence={"status": "all_headers_present"},
437
+ )
438
+ )
439
+ except Exception as e:
440
+ results.append(
441
+ VAPTTestResult(
442
+ endpoint=endpoint,
443
+ test_type="Security Headers Test",
444
+ severity="info",
445
+ status="warning",
446
+ description=f"Could not complete headers test: {str(e)}",
447
+ recommendation="Verify endpoint response headers",
448
+ evidence={"error": str(e)},
449
+ )
450
+ )
451
+
452
+ return results
453
+
454
+
455
+ # ============================================================================
456
+ # Main VAPT Tool
457
+ # ============================================================================
458
+
459
+
460
+ @tool(
461
+ name="vapt_security_test",
462
+ description="Performs comprehensive API security testing including SQL injection, XSS, authentication, rate limiting, CORS, and security headers validation. Returns a detailed JSON report with vulnerabilities and remediation recommendations.",
463
+ input_schema={
464
+ "type": "object",
465
+ "properties": {
466
+ "endpoint": {
467
+ "type": "string",
468
+ "description": "The API endpoint URL to test (e.g., https://api.example.com/v1/users)",
469
+ },
470
+ "method": {
471
+ "type": "string",
472
+ "description": "HTTP method (GET, POST, PUT, DELETE, PATCH)",
473
+ "default": "GET",
474
+ },
475
+ "test_types": {
476
+ "type": "array",
477
+ "items": {"type": "string"},
478
+ "description": "Types of security tests to perform: injection, auth, rate_limit, cors, headers",
479
+ "default": ["injection", "auth", "rate_limit", "cors", "headers"],
480
+ },
481
+ "headers": {
482
+ "type": "object",
483
+ "description": "Optional HTTP headers as key-value pairs",
484
+ "default": None,
485
+ },
486
+ "body": {
487
+ "type": "string",
488
+ "description": "Optional request body for POST/PUT/PATCH requests",
489
+ "default": None,
490
+ },
491
+ },
492
+ "required": ["endpoint"],
493
+ },
494
+ )
495
+ async def vapt_security_test(args: Dict[str, Any]) -> Dict[str, Any]:
496
+ """
497
+ Custom VAPT tool for API security testing.
498
+
499
+ Performs various security tests on API endpoints including:
500
+ - SQL Injection tests
501
+ - XSS (Cross-Site Scripting) tests
502
+ - Authentication/Authorization bypasses
503
+ - Rate limiting checks
504
+ - CORS policy validation
505
+ - Security headers assessment
506
+
507
+ Args:
508
+ args: Dictionary containing endpoint, method, test_types, headers, and body
509
+
510
+ Returns:
511
+ Dictionary with content containing the test results JSON report
512
+ """
513
+ import aiohttp
514
+
515
+ # Extract parameters from args dict
516
+ endpoint = args["endpoint"]
517
+ method = args.get("method", "GET")
518
+ test_types = args.get("test_types")
519
+ headers = args.get("headers")
520
+ body = args.get("body")
521
+
522
+ if test_types is None:
523
+ test_types = ["injection", "auth", "rate_limit", "cors", "headers"]
524
+
525
+ if headers is None:
526
+ headers = {}
527
+
528
+ all_results: List[VAPTTestResult] = []
529
+
530
+ async with aiohttp.ClientSession() as session:
531
+
532
+ # Run tests based on specified test types
533
+ if "injection" in test_types:
534
+ sql_results = await test_sql_injection(
535
+ session, endpoint, method, headers, body
536
+ )
537
+ all_results.extend(sql_results)
538
+
539
+ xss_results = await test_xss(session, endpoint, method, headers, body)
540
+ all_results.extend(xss_results)
541
+
542
+ if "auth" in test_types:
543
+ auth_results = await test_authentication(session, endpoint, method, headers)
544
+ all_results.extend(auth_results)
545
+
546
+ if "rate_limit" in test_types:
547
+ rate_results = await test_rate_limiting(session, endpoint, method, headers)
548
+ all_results.extend(rate_results)
549
+
550
+ if "cors" in test_types:
551
+ cors_results = await test_cors_policy(session, endpoint, method)
552
+ all_results.extend(cors_results)
553
+
554
+ if "headers" in test_types:
555
+ header_results = await test_security_headers(
556
+ session, endpoint, method, headers
557
+ )
558
+ all_results.extend(header_results)
559
+
560
+ # Format results as JSON report
561
+ report = {
562
+ "endpoint": endpoint,
563
+ "method": method,
564
+ "timestamp": datetime.now().isoformat(),
565
+ "tests_performed": test_types,
566
+ "total_vulnerabilities": len(
567
+ [r for r in all_results if r.status == "vulnerable"]
568
+ ),
569
+ "results": [r.dict() for r in all_results],
570
+ "summary": {
571
+ "critical": len([r for r in all_results if r.severity == "critical"]),
572
+ "high": len([r for r in all_results if r.severity == "high"]),
573
+ "medium": len([r for r in all_results if r.severity == "medium"]),
574
+ "low": len([r for r in all_results if r.severity == "low"]),
575
+ "info": len([r for r in all_results if r.severity == "info"]),
576
+ },
577
+ }
578
+
579
+ report_json = json.dumps(report, indent=2)
580
+
581
+ # Return in MCP tool format
582
+ return {"content": [{"type": "text", "text": report_json}]}
583
+
584
+
585
+ # ============================================================================
586
+ # MCP Server Creation
587
+ # ============================================================================
588
+
589
+
590
+ def create_vapt_mcp_server():
591
+ """Create and return the VAPT MCP server instance."""
592
+ return create_sdk_mcp_server(
593
+ name="VAPTToolServer",
594
+ version="1.0.0",
595
+ tools=[vapt_security_test],
596
+ )