""" Simulated Security Audit Tools. Each tool reads from the scenario's ground truth data and returns realistic-looking text output. No real network scanning occurs. All outputs are deterministic — same inputs always produce same results. Output detail varies by scenario difficulty: - Easy: labeled findings with CWE, CVSS, remediation (training mode) - Medium: raw evidence with hints — agent must classify and assess - Hard: raw tool output only — agent must infer everything from evidence """ from typing import Any, Dict, List, Optional, Set, Tuple TOOL_DEFINITIONS = [ { "name": "network_scan", "description": "Discover hosts and open ports on a target network or host. Similar to nmap.", "parameters": {"target": "IP address or CIDR range (e.g., '10.0.1.0/24' or '10.0.1.10')"}, }, { "name": "service_fingerprint", "description": "Get detailed service version information for a specific host and port.", "parameters": {"host": "Target IP address", "port": "(optional) Specific port number"}, }, { "name": "web_crawl", "description": "Discover web endpoints and pages on a host's HTTP/HTTPS service.", "parameters": {"host": "Target IP address"}, }, { "name": "vulnerability_scan", "description": "Check a host's services against known CVE databases for vulnerable versions.", "parameters": {"host": "Target IP address"}, }, { "name": "test_injection", "description": "Test a web endpoint for injection vulnerabilities (SQL injection, command injection, SSRF, SSTI).", "parameters": {"host": "Target IP address", "endpoint": "URL path to test (e.g., '/api/login')"}, }, { "name": "test_xss", "description": "Test a web endpoint for Cross-Site Scripting (XSS) vulnerabilities.", "parameters": {"host": "Target IP address", "endpoint": "URL path to test"}, }, { "name": "test_auth", "description": "Test authentication and access controls — default credentials, IDOR, brute force, session management.", "parameters": {"host": "Target IP address", "endpoint": "(optional) Specific endpoint to test"}, }, { "name": "test_config", "description": "Check for security misconfigurations — exposed admin panels, directory listing, debug mode, open services.", "parameters": {"host": "Target IP address"}, }, { "name": "test_crypto", "description": "Analyze TLS/SSL configuration and cryptographic implementations.", "parameters": {"host": "Target IP address"}, }, { "name": "check_secrets", "description": "Scan for exposed secrets, API keys, credentials in accessible files and responses.", "parameters": {"host": "Target IP address", "endpoint": "(optional) Specific endpoint to check"}, }, ] def _filter_by_chain(vulns: List[Dict], discovered_vulns: Optional[Set[str]]) -> List[Dict]: """Filter vulnerabilities by chain prerequisites.""" if discovered_vulns is None: discovered_vulns = set() return [ v for v in vulns if not v.get("requires_found") or all(r in discovered_vulns for r in v["requires_found"]) ] _PORT_WARNINGS = { 3306: "WARNING: MySQL accessible externally — verify firewall rules", 5432: "WARNING: PostgreSQL accessible externally — restrict to app IPs", 21: "WARNING: FTP (plaintext protocol) detected — consider SFTP", 445: "NOTE: SMB file sharing detected — verify access controls", 3389: "NOTE: RDP exposed — ensure strong credentials and NLA", } # --------------------------------------------------------------------------- # Raw output templates for medium/hard (keyed by vuln ID) # Medium: shows evidence + partial context, no CWE/CVSS/remediation # Hard: raw tool output — HTTP responses, timing, errors — agent infers # --------------------------------------------------------------------------- _RAW_OUTPUTS = { # ---- MEDIUM: E-commerce Platform ---- "VULN-M001": { "medium": ( " [!] Anomalous response from server\n" " Parameter: image_url\n" " Payload: image_url=http://10.0.2.30:8080\n" " Response: HTTP 200 — body contains HTML from internal Jenkins server\n" " Payload: image_url=http://169.254.169.254/latest/meta-data/\n" " Response: HTTP 200 — body contains AWS metadata (ami-id, instance-type)\n" " The server fetched and returned content from internal/cloud URLs via the image_url parameter.\n" " Assess the vulnerability type, severity, and impact." ), "hard": ( " Parameter: image_url | Payload: http://10.0.2.30:8080\n" " Response: HTTP 200\n" ' Body (first 200 chars): Dashboard [Jenkins]...Welcome to Jenkins!...2 jobs configured...\n' "\n" " GET /script → HTTP 200\n" ' Body (excerpt): \n' " Note: No redirect to /login, no 401/403 status\n" "\n" " GET /manage → HTTP 200\n" ' Body (excerpt): ...Manage Jenkins...Configure System...Manage Plugins...\n' "\n" " GET /credentials → HTTP 200\n" ' Body (excerpt): ...Stored credentials: 3 items...\n' "\n" " Cookie: no JSESSIONID set (no session management active)" ), }, "VULN-M005": { "medium": ( " [!] Database accepts weak credentials\n" " PostgreSQL (port 5432):\n" " dbuser:dbpass123 → authentication successful\n" " Connected — 23 tables visible, no connection rate limiting detected\n" " Assess the vulnerability type, severity, and impact." ), "hard": ( " PostgreSQL (port 5432):\n" " postgres:postgres → FATAL: password authentication failed\n" " admin:admin → FATAL: password authentication failed\n" " dbuser:dbuser → FATAL: password authentication failed\n" " dbuser:dbpass123 → connection established\n" "\n" ' SELECT current_user; → "dbuser"\n' " \\dt → 23 tables (users, orders, payments, sessions, ...)\n" " SELECT count(*) FROM users; → 12,847\n" "\n" " Brute force timing: 50 attempts in 3.1s, no lockout or delay increase" ), }, "VULN-M006": { "medium": ( " [!] Outdated TLS configuration detected\n" " Server accepts TLSv1.0 and TLSv1.1 (deprecated protocols)\n" " Weak cipher suite accepted: TLS_RSA_WITH_3DES_EDE_CBC_SHA\n" " HSTS header not present in responses.\n" " Assess the vulnerability type, severity, and impact." ), "hard": ( " TLS handshake analysis:\n" " Protocol: TLSv1.0 → ServerHello (accepted)\n" " Protocol: TLSv1.1 → ServerHello (accepted)\n" " Protocol: TLSv1.2 → ServerHello (accepted)\n" " Protocol: TLSv1.3 → handshake failure\n" "\n" " Cipher suites (port 443):\n" " TLS_RSA_WITH_3DES_EDE_CBC_SHA → accepted\n" " TLS_RSA_WITH_AES_128_CBC_SHA → accepted\n" " TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384 → accepted\n" "\n" " Certificate: CN=shopfast.com, expires 2025-12-01, issuer=Let's Encrypt\n" " HSTS: not present\n" " OCSP stapling: not enabled" ), }, # ---- HARD: Enterprise SOC2 Pre-Audit ---- "VULN-H001": { "hard": ( " Parameter: message\n" " Payload: \n" " POST /support/ticket → HTTP 200\n" "\n" " Verification: GET /support/tickets?latest=true\n" ' Response body excerpt:\n' '
\n' " Note: Payload persisted in database and rendered without encoding\n" " Note: Response headers — no Content-Security-Policy, X-XSS-Protection: 0" ), }, "VULN-H002": { "hard": ( " Authenticated as user_id=1001 (Bearer token: eyJhbG...)\n" ' GET /api/v2/accounts/1001 → HTTP 200 {"account_id":1001,"name":"Test User","balance":"$12,450.00"}\n' ' GET /api/v2/accounts/1002 → HTTP 200 {"account_id":1002,"name":"Jane Smith","balance":"$87,230.50","ssn_last4":"4829"}\n' ' GET /api/v2/accounts/1003 → HTTP 200 {"account_id":1003,"name":"Bob Jones","balance":"$3,100.00","ssn_last4":"7712"}\n' " GET /api/v2/accounts/9999 → HTTP 404\n" "\n" " Note: Sequential integer IDs. Same Bearer token used for all requests.\n" " Note: PII (SSN, balance) exposed in cross-account responses." ), }, "VULN-H003": { "hard": ( " Concurrency testing on /api/v2/transfer:\n" " Initial balance (account 1001): $1,000.00\n" "\n" " Sent 5 concurrent POST requests:\n" ' {"from_account":"1001","to_account":"1002","amount":800}\n' "\n" ' Request 1: HTTP 200 {"status":"success","new_balance":"$200.00"}\n' ' Request 2: HTTP 200 {"status":"success","new_balance":"$200.00"}\n' ' Request 3: HTTP 200 {"status":"success","new_balance":"$200.00"}\n' ' Request 4: HTTP 402 {"error":"insufficient_funds"}\n' ' Request 5: HTTP 402 {"error":"insufficient_funds"}\n' "\n" " Final balance (account 1001): -$1,400.00\n" " Total transferred: $2,400.00 from initial $1,000.00\n" " Note: Balance check and deduction are not atomic operations" ), }, "VULN-H004": { "hard": ( " Parameter: template\n" "\n" " Payload: {{7*7}}\n" ' Response body (excerpt): ...Report generated: 49...\n' "\n" " Payload: {{config}}\n" " Response body (excerpt): ...Report generated: ...\n" "\n" ' Payload: {{"test".__class__.__mro__}}\n' " Response body (excerpt): ...(, )...\n" "\n" " Server headers: X-Powered-By: Flask/Jinja2" ), }, "VULN-H005": { "hard": ( " File upload testing:\n" ' test.txt (text/plain, 12B) → HTTP 200 {"path":"/uploads/test.txt"}\n' ' test.php (application/x-php, 45B) → HTTP 200 {"path":"/uploads/test.php"}\n' ' test.jsp (text/plain, 38B) → HTTP 200 {"path":"/uploads/test.jsp"}\n' ' test.exe (application/octet-stream, 1KB) → HTTP 200 {"path":"/uploads/test.exe"}\n' "\n" " Verification: GET /uploads/test.php\n" " Response: HTTP 200, Content-Type: text/html\n" " Body: PHP output rendered (file was executed server-side, not served as static)" ), }, "VULN-H006": { "hard": ( " FTP (port 21):\n" " USER anonymous → 331 Please specify the password\n" " PASS anonymous → 230 Login successful\n" " LIST → drwxr-xr-x /reports/\n" " drwxr-xr-x /employee_data/\n" " drwxr-xr-x /financial/\n" " SIZE /financial/Q4_2024_revenue.xlsx → 2,371,584 bytes\n" "\n" " SMB (port 445):\n" " Guest session → \\\\10.0.3.40\\Documents accessible (read/write)\n" " Files: employee_roster.csv, salary_data.xlsx, board_minutes_2024.pdf\n" " Share permissions: Everyone:FullControl" ), }, "VULN-H007": { "hard": ( " FTP (port 21):\n" " AUTH TLS → 502 Command not implemented\n" " FEAT → no TLS/SSL features advertised\n" "\n" " Captured transfer: RETR /financial/Q4_2024_revenue.xlsx\n" " Transfer protocol: plaintext FTP (no encryption layer)\n" " Wireshark excerpt: PASS command visible in cleartext in packet capture\n" "\n" " SFTP (port 22): connection refused\n" " FTPS: not available" ), }, "VULN-H008": { "hard": ( " SMTP (port 25, Postfix 3.4.13):\n" " EHLO test.local → 250 mail.finsecure.com\n" " MAIL FROM: → 250 2.1.0 Ok\n" " RCPT TO: → 250 2.1.5 Ok\n" " DATA → 354 End data with .\n" " (message accepted from unauthenticated external sender)\n" "\n" " DNS records for finsecure.com:\n" " SPF: no record found\n" " DKIM: no record found\n" " DMARC: no record found\n" " MX: mail.finsecure.com → 10.0.3.50" ), }, "VULN-H009": { "hard": ( " Port 8080: Apache Tomcat/9.0.31\n" "\n" " NVD database results:\n" " CVE-2020-1938 — Base Score: 9.8 — AJP connector file read/inclusion\n" " CVE-2020-9484 — Base Score: 7.0 — session persistence deserialization\n" " CVE-2020-11996 — Base Score: 7.5 — HTTP/2 request handling DoS\n" "\n" " Port 8009/tcp: open (AJP13 protocol response received)\n" " Note: AJP connector externally accessible from scan origin" ), }, "VULN-H010": { "hard": ( " POST /login — rate limit testing:\n" " Batch 1: 100 requests with invalid creds in 2.3s\n" " Result: 100x HTTP 401 (all processed, none blocked)\n" " Avg response time: 23ms (consistent, no throttling)\n" "\n" " Batch 2: 1000 requests in 18.7s\n" " Result: 1000x HTTP 401\n" " No CAPTCHA challenge\n" " No account lockout triggered\n" " No IP-based blocking detected\n" "\n" " Response headers: no X-RateLimit-* headers present" ), }, } def _labeled_output(v: Dict) -> str: """Easy mode: full labeled finding with CWE, CVSS, remediation.""" tag = { "Critical": "CRITICAL", "High": "ALERT", "Medium": "WARNING", "Low": "INFO", }.get(v["severity"], "FINDING") return ( f" [{tag}] {v['type']} DETECTED\n" f" CWE: {v['cwe']}\n" f" OWASP: {v['owasp']}\n" f" Evidence: {v['evidence']}\n" f" Suggested CVSS: {v['cvss']} ({v['severity']})\n" f" Remediation: {v['remediation']}" ) def _format_detection(v: Dict, scenario: Dict, header: str) -> str: """Pick output format based on scenario difficulty.""" difficulty = scenario.get("id", "easy") if difficulty == "easy": return _labeled_output(v) raw = _RAW_OUTPUTS.get(v["id"], {}).get(difficulty) if raw: return raw # Fallback for medium vulns without explicit raw output if difficulty == "medium": return ( f" [!] Potential issue detected\n" f" Observation: {v['evidence']}\n" f" Assess the vulnerability type, severity, and impact." ) # Fallback for hard vulns without explicit raw output return ( f" Observation: {v['evidence']}\n" f" No further classification available — manual analysis required." ) def execute_tool( tool_name: str, arguments: Dict[str, Any], scenario: Dict[str, Any], discovered_hosts: List[str], discovered_ports: Dict[str, List[int]], discovered_vulns: Optional[Set[str]] = None, ) -> Tuple[str, List[str], Dict[str, List[int]], float]: """Execute a simulated tool and return (output, new_hosts, new_ports, reward).""" handler = TOOL_HANDLERS.get(tool_name) if not handler: return (f"Error: Unknown tool '{tool_name}'. Use list_tools to see available tools.", [], {}, -0.05) return handler(arguments, scenario, discovered_hosts, discovered_ports, discovered_vulns) def _network_scan(args, scenario, discovered_hosts, discovered_ports, discovered_vulns): target = args.get("target", scenario.get("target_network", "")) all_hosts = scenario.get("hosts", {}) ports = scenario.get("ports", {}) dv = discovered_vulns or set() # Progressive discovery: filter out hidden hosts whose prereqs aren't met hosts = {} hidden_count = 0 for ip, info in all_hosts.items(): required = info.get("hidden_until", []) if not required or all(r in dv for r in required): hosts[ip] = info else: hidden_count += 1 total_ports = 0 lines = [ f"Starting Nmap 7.94 scan on {target}", f"Nmap scan report — {len(hosts)} host(s) responding", "", ] new_hosts = [] new_ports: Dict[str, List[int]] = {} reward = 0.0 for ip, host_info in hosts.items(): host_ports = ports.get(ip, []) lines.append(f"Nmap scan report for {ip} ({host_info['hostname']})") lines.append(f" OS Detection: {host_info['os']}") lines.append(f" Device Role: {host_info['role']}") lines.append(f" PORT STATE SERVICE VERSION") port_nums = [] for p in host_ports: lines.append(f" {p['port']}/tcp {p['state']:6s} {p['service']:15s} {p['version']}") port_nums.append(p["port"]) total_ports += 1 warning = _PORT_WARNINGS.get(p["port"]) if warning: lines.append(f" |_ {warning}") if "Jenkins" in p["version"]: lines.append(f" |_ NOTE: Jenkins CI/CD server detected — check authentication") lines.append("") if ip not in discovered_hosts: new_hosts.append(ip) reward += 0.05 current_known = set(discovered_ports.get(ip, [])) new_port_nums = [p for p in port_nums if p not in current_known] if new_port_nums: new_ports[ip] = new_port_nums reward += len(new_port_nums) * 0.02 lines.append(f"Nmap done: {len(hosts)} IP addresses ({len(hosts)} hosts up) scanned") lines.append(f" {total_ports} open ports found across {len(hosts)} hosts") if hidden_count > 0: lines.append(f" Note: {hidden_count} host(s) may exist on internal segments not directly reachable. Pivot through compromised hosts to discover them.") return "\n".join(lines), new_hosts, new_ports, reward def _service_fingerprint(args, scenario, discovered_hosts, discovered_ports, discovered_vulns): host = args.get("host", "") target_port = args.get("port") dv = discovered_vulns or set() host_info_raw = scenario.get("hosts", {}).get(host) if not host_info_raw: return (f"Error: Host {host} not reachable. Run network_scan first.", [], {}, -0.02) required = host_info_raw.get("hidden_until", []) if required and not all(r in dv for r in required): return (f"Error: Host {host} not reachable. It may be on an internal network segment.", [], {}, -0.02) ports = scenario.get("ports", {}).get(host, []) host_info = scenario["hosts"][host] lines = [f"Service fingerprint for {host} ({host_info['hostname']})", ""] for p in ports: if target_port and p["port"] != int(target_port): continue lines.append(f"Port {p['port']}/tcp:") lines.append(f" Service: {p['service']}") lines.append(f" Version: {p['version']}") lines.append(f" State: {p['state']}") lines.append(f" Protocol: TCP") lines.append("") return "\n".join(lines), [], {}, 0.01 def _web_crawl(args, scenario, discovered_hosts, discovered_ports, discovered_vulns): host = args.get("host", "") dv = discovered_vulns or set() host_info_raw = scenario.get("hosts", {}).get(host) if host_info_raw: required = host_info_raw.get("hidden_until", []) if required and not all(r in dv for r in required): return (f"Error: Host {host} not reachable. It may be on an internal network segment.", [], {}, -0.02) endpoints = scenario.get("web_endpoints", {}).get(host, []) if not endpoints: return (f"No web endpoints found on {host}. Host may not run a web server.", [], {}, 0.0) lines = [f"Web crawl results for {host}:", f" Discovered {len(endpoints)} endpoint(s):", ""] for ep in endpoints: params = "" if ep.get("params"): params = f" (params: {', '.join(ep['params'])})" lines.append(f" {ep['method']:6s} {ep['path']:30s} — {ep['description']}{params}") lines.append(f"\n{len(endpoints)} endpoint(s) discovered. Test each for vulnerabilities.") return "\n".join(lines), [], {}, 0.03 def _vulnerability_scan(args, scenario, discovered_hosts, discovered_ports, discovered_vulns): host = args.get("host", "") ports = scenario.get("ports", {}).get(host, []) if not ports: return (f"Error: No services known for {host}. Run network_scan first.", [], {}, -0.02) vulns = [v for v in scenario.get("vulnerabilities", []) if v["host"] == host and "vulnerability_scan" in v.get("discoverable_by", [])] vulns = _filter_by_chain(vulns, discovered_vulns) difficulty = scenario.get("id", "easy") lines = [f"Vulnerability scan for {host}:", ""] reported = set() for p in ports: lines.append(f" {p['service']} {p['version']} on port {p['port']}:") found_any = False for v in vulns: if v["id"] not in reported: if difficulty == "easy": lines.append(f" [!] VULNERABLE: {v['type']} (CVSS {v['cvss']}) — {v['cwe']}") lines.append(f" Confidence: High | Source: NVD/OWASP") lines.append(f" Remediation: {v['remediation']}") else: raw = _RAW_OUTPUTS.get(v["id"], {}).get(difficulty) if raw: lines.append(raw) else: # Generic CVE-style output without explicit classification lines.append(f" NVD match for {p['version']}") lines.append(f" See output details for assessment.") reported.add(v["id"]) found_any = True if not found_any: lines.append(f" No known CVEs for {p['version']} (database current)") lines.append("") reward = 0.05 if vulns else 0.01 return "\n".join(lines), [], {}, reward def _test_injection(args, scenario, discovered_hosts, discovered_ports, discovered_vulns): host = args.get("host", "") endpoint = args.get("endpoint", "") vulns = [ v for v in scenario.get("vulnerabilities", []) if v["host"] == host and v.get("endpoint") == endpoint and "test_injection" in v.get("discoverable_by", []) ] vulns = _filter_by_chain(vulns, discovered_vulns) if vulns: lines = [f"Injection testing on {host}{endpoint}:", ""] for v in vulns: lines.append(_format_detection(v, scenario, f"Injection testing on {host}{endpoint}")) lines.append("") return "\n".join(lines), [], {}, 0.08 else: return (f"Injection testing on {host}{endpoint}:\n" f" Tested: SQLi (error-based, blind, time-based), command injection, SSTI, SSRF\n" f" Payloads: 47 injection patterns tested\n" f" Result: No injection vulnerabilities detected on this endpoint."), [], {}, 0.01 def _test_xss(args, scenario, discovered_hosts, discovered_ports, discovered_vulns): host = args.get("host", "") endpoint = args.get("endpoint", "") vulns = [ v for v in scenario.get("vulnerabilities", []) if v["host"] == host and v.get("endpoint") == endpoint and "test_xss" in v.get("discoverable_by", []) ] vulns = _filter_by_chain(vulns, discovered_vulns) if vulns: lines = [f"XSS testing on {host}{endpoint}:", ""] for v in vulns: lines.append(_format_detection(v, scenario, f"XSS testing on {host}{endpoint}")) lines.append("") return "\n".join(lines), [], {}, 0.08 else: return (f"XSS testing on {host}{endpoint}:\n" f" Tested: reflected XSS, stored XSS, DOM-based XSS\n" f" Payloads: 23 XSS vectors tested across all parameters\n" f" Result: No XSS vulnerabilities detected."), [], {}, 0.01 def _test_auth(args, scenario, discovered_hosts, discovered_ports, discovered_vulns): host = args.get("host", "") endpoint = args.get("endpoint") vulns = [ v for v in scenario.get("vulnerabilities", []) if v["host"] == host and "test_auth" in v.get("discoverable_by", []) and (endpoint is None or v.get("endpoint") is None or v.get("endpoint") == endpoint) ] vulns = _filter_by_chain(vulns, discovered_vulns) if vulns: lines = [f"Auth & access control testing on {host}" + (endpoint or "") + ":", ""] for v in vulns: lines.append(_format_detection(v, scenario, f"Auth testing on {host}")) lines.append("") return "\n".join(lines), [], {}, 0.08 else: target = f"{host}{endpoint}" if endpoint else host return (f"Auth testing on {target}:\n" f" Default credentials: 15 common sets tested — none accepted\n" f" Session management: tokens properly rotated\n" f" Access controls: authorization checks present\n" f" Brute force: rate limiting detected after 5 attempts\n" f" Result: PASS — no authentication weaknesses found."), [], {}, 0.01 def _test_config(args, scenario, discovered_hosts, discovered_ports, discovered_vulns): host = args.get("host", "") vulns = [ v for v in scenario.get("vulnerabilities", []) if v["host"] == host and "test_config" in v.get("discoverable_by", []) ] vulns = _filter_by_chain(vulns, discovered_vulns) if vulns: lines = [f"Configuration audit for {host}:", ""] for v in vulns: lines.append(_format_detection(v, scenario, f"Configuration audit for {host}")) lines.append("") return "\n".join(lines), [], {}, 0.08 else: return (f"Configuration audit for {host}:\n" f" Directory listing: disabled\n" f" Debug mode: off\n" f" Server headers: version info suppressed\n" f" Admin panels: not exposed publicly\n" f" Result: PASS — no significant misconfigurations."), [], {}, 0.01 def _test_crypto(args, scenario, discovered_hosts, discovered_ports, discovered_vulns): host = args.get("host", "") vulns = [ v for v in scenario.get("vulnerabilities", []) if v["host"] == host and "test_crypto" in v.get("discoverable_by", []) ] vulns = _filter_by_chain(vulns, discovered_vulns) if vulns: lines = [f"Cryptographic analysis for {host}:", ""] for v in vulns: lines.append(_format_detection(v, scenario, f"Cryptographic analysis for {host}")) lines.append("") return "\n".join(lines), [], {}, 0.06 else: return (f"Cryptographic analysis for {host}:\n" f" TLS version: 1.2+ only (1.0/1.1 disabled)\n" f" Cipher suites: strong (AES-256-GCM preferred)\n" f" Certificate: valid, not expired\n" f" HSTS: enabled\n" f" Result: PASS — TLS/SSL configuration is secure."), [], {}, 0.01 def _check_secrets(args, scenario, discovered_hosts, discovered_ports, discovered_vulns): host = args.get("host", "") endpoint = args.get("endpoint") vulns = [ v for v in scenario.get("vulnerabilities", []) if v["host"] == host and "check_secrets" in v.get("discoverable_by", []) and (endpoint is None or v.get("endpoint") is None or v.get("endpoint") == endpoint) ] vulns = _filter_by_chain(vulns, discovered_vulns) if vulns: lines = [f"Secret scanning on {host}" + (endpoint or "") + ":", ""] for v in vulns: lines.append(_format_detection(v, scenario, f"Secret scanning on {host}")) lines.append("") return "\n".join(lines), [], {}, 0.08 else: target = f"{host}{endpoint}" if endpoint else host return (f"Secret scanning on {target}:\n" f" Scanned: source files, config files, environment variables, HTTP responses\n" f" Patterns: 34 secret patterns checked (AWS, Stripe, JWT, private keys, etc.)\n" f" Entropy analysis: no high-entropy strings detected\n" f" Result: PASS — no exposed secrets found."), [], {}, 0.01 TOOL_HANDLERS = { "network_scan": _network_scan, "service_fingerprint": _service_fingerprint, "web_crawl": _web_crawl, "vulnerability_scan": _vulnerability_scan, "test_injection": _test_injection, "test_xss": _test_xss, "test_auth": _test_auth, "test_config": _test_config, "test_crypto": _test_crypto, "check_secrets": _check_secrets, }