File size: 10,394 Bytes
c44dbf3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from openenv.core.env_server import Environment
from models import RedTeamAction, RedTeamObservation, RedTeamState

TASKS = [
    {
        "name": "Easy Web Vuln",
        "ip": "192.168.1.10",
        "difficulty": "easy",
        "required_steps": ["scan", "enumerate", "exploit"],
        "scenario": "A misconfigured Apache web server with a public-facing login panel.",
        "hidden_flag": "FLAG{w3b_sh3ll_0wn3d_192.168.1.10}",
        "flag_hint": "Admin panel is exposed. Directory listing is enabled.",
    },
    {
        "name": "Medium SQLi + RCE",
        "ip": "10.0.0.5",
        "difficulty": "medium",
        "required_steps": ["scan", "enumerate", "exploit", "escalate"],
        "scenario": "A production web app with unsanitized SQL inputs and a weak sudo policy.",
        "hidden_flag": "FLAG{r00t_v14_sql1_10.0.0.5}",
        "flag_hint": "The login form leaks DB errors. Sudo -l reveals interesting binaries.",
    },
    {
        "name": "Hard Multi-Stage APT",
        "ip": "172.16.0.0/24",
        "difficulty": "hard",
        "required_steps": ["scan", "enumerate", "exploit", "escalate", "c2", "cleanup"],
        "scenario": "A corporate network segment. Full APT simulation: initial access, lateral movement, C2, stealth cleanup.",
        "hidden_flag": "FLAG{apt_s1mul4t10n_c0mpl3t3_172.16.0.0}",
        "flag_hint": "Log4Shell on the DMZ host. Pivot via pass-the-hash to reach internal DC.",
    },
]

ACTION_OUTPUTS = {
    "scan": [
        "Nmap 7.94 scan complete.\nPORT   STATE SERVICE VERSION\n22/tcp open  ssh     OpenSSH 7.9\n80/tcp open  http    Apache httpd 2.4.29\n8080/tcp open http-alt Tomcat 9.0.30\nOS: Ubuntu 18.04 LTS\nWarning: 3 outdated services detected.",
        "Masscan rapid scan complete. 14 live hosts on 10.0.0.0/24.\nNotable: 3306/mysql exposed on 10.0.0.5 - external access ENABLED.\nSMB signing disabled on 10.0.0.12. EternalBlue likely viable.",
        "Nmap stealth SYN scan complete.\nDiscovered: 443/https (expired cert), 9200/elasticsearch (unauthenticated!).\nCritical: Elasticsearch 6.8 with no auth - full data exposure.",
    ],
    "enumerate": [
        "Gobuster dir scan:\n/admin [403] /login [200] /backup.zip [200] /config.php.bak [200]\nNikto: Apache 2.4.29 vulnerable to CVE-2021-41773 (path traversal).",
        "SQLmap v1.7:\n[*] Parameter 'username' injectable (UNION-based)\n[*] Backend: MySQL 5.7.38\n[*] 847 user records extractable\nPassword hashes: MD5 unsalted - crackable.",
        "enum4linux + LDAP sweep:\n[+] 12 domain accounts found\n[+] Kerberoastable SPN: svc_backup/dc01.corp.local\n[+] Password policy: min 6 chars, no lockout - BRUTEFORCEABLE.",
    ],
    "exploit": [
        "CVE-2021-41773 path traversal RCE:\n[+] Shell opened as www-data on 192.168.1.10\nmeterpreter > getuid => www-data\n[+] Foothold established.",
        "SQLi authentication bypass:\nPayload: admin OR 1=1\n[+] Login as Administrator\n[+] Webshell uploaded: /uploads/cmd.php\nuid=33(www-data) - RCE confirmed.",
        "Log4Shell (CVE-2021-44228):\nPayload delivered via JNDI injection\n[+] Reverse shell - bash-4.4$ id => uid=1001(tomcat)\n[+] Initial access on 172.16.0.15 confirmed.",
    ],
    "escalate": [
        "LinPEAS:\n[!] Sudo rule: www-data ALL=(root) NOPASSWD: /usr/bin/python3.8\n$ sudo python3.8 -c import os; os.setuid(0); os.system('/bin/bash')\nroot@target:~# id => uid=0(root)\n[+] FULL ROOT OBTAINED.",
        "Juicy Potato - SeImpersonatePrivilege ENABLED:\n[+] SYSTEM shell obtained on 10.0.0.5\nC: whoami => nt authority\\system",
        "Dirty Pipe CVE-2022-0847:\n[*] Kernel 5.8.0-43 - VULNERABLE\n[+] Root shell active. uid=0(root).",
    ],
    "c2": [
        "Cobalt Strike beacon deployed:\n[+] C2 channel: HTTPS/443 (jquery malleable profile)\n[+] Persistence: HKCU Run key\n[+] Lateral movement to 172.16.0.20, .21 via pass-the-hash\n[+] 3 beacons active.",
        "PowerShell Empire:\n[+] Pivoted to DC01 via SMB\n[+] Mimikatz: 8 plaintext creds from LSASS\n[+] Domain Admin hash obtained.",
        "DNS-tunneled C2:\n[+] Implant in explorer.exe (process hollowing)\n[+] Exfil: 2.3MB via DNS TXT queries\n[+] Fully covert. EDR blind.",
    ],
    "cleanup": [
        "Cleanup complete:\n[*] Webshell removed, logs truncated\n[*] history -c\n[+] Footprint: ZERO",
        "Windows cleanup:\n[*] Registry Run key deleted\n[*] Event logs cleared (Security/System/Application)\n[+] No forensic artifacts remain.",
        "APT cleanup:\n[*] Implants removed from 4 hosts\n[*] Timestomping applied to modified files\n[*] DNS tunnel decommissioned\n[+] Attribution: IMPOSSIBLE.",
    ],
}

STEP_REWARDS = {
    # Keep each completed task's cumulative reward strictly below 1.0.
    "easy":   {"base": 0.16, "completion_bonus": 0.08},
    "medium": {"base": 0.12, "completion_bonus": 0.07},
    "hard":   {"base": 0.09, "completion_bonus": 0.06},
}
CHAIN_BONUS = 0.02
PENALTY_WRONG_ORDER = -0.08


def safe_reward(r: float) -> float:
    """Ensure reward is STRICTLY between 0 and 1 (never 0.0, never 1.0).

    This is critical for Phase 2 evaluation which validates every /step response.
    Clamp to the open interval (0, 1) using minimal safe margins.
    """
    clamped = max(1e-6, min(1 - 1e-6, r))
    return round(clamped, 6)


class RedTeamPentestEnvironment(Environment[RedTeamAction, RedTeamObservation, RedTeamState]):
    def __init__(self):
        self.task_index = 0
        self.completed_steps = []
        self.total_reward = 0.0
        self.episode = 0
        self.mistakes = 0
        self.current_task = TASKS[0]

    def reset(self, seed=None, episode_id=None, **kwargs) -> RedTeamObservation:
        task = TASKS[self.task_index % len(TASKS)]
        self.current_task = task
        self.completed_steps = []
        self.total_reward = 0.0
        self.episode += 1
        self.mistakes = 0
        return RedTeamObservation(
            target_ip=task["ip"],
            current_state="RECON_START",
            output=(
                f"=== MISSION BRIEFING ===\n"
                f"Target: {task['ip']}\n"
                f"Scenario: {task['scenario']}\n"
                f"Difficulty: {task['difficulty'].upper()}\n"
                f"Hint: {task['flag_hint']}\n"
                f"Required phases: {' -> '.join(task['required_steps'])}"
            ),
            difficulty=task["difficulty"],
            reward=safe_reward(0.01),
            done=False,
        )

    def step(self, action: RedTeamAction, timeout_s=None, **kwargs) -> RedTeamObservation:
        act = action.action.lower()
        task = self.current_task
        required = task["required_steps"]
        reward = 0.0
        done = False

        if act not in required:
            self.mistakes += 1
            obs = RedTeamObservation(
                target_ip=task["ip"],
                current_state="INVALID",
                output=f"Action '{act}' not required for this task. Required: {required}",
                difficulty=task["difficulty"],
                reward=safe_reward(-0.03),
                done=False,
            )
            return obs

        idx = required.index(act)
        if idx > 0 and required[idx - 1] not in self.completed_steps:
            self.mistakes += 1
            obs = RedTeamObservation(
                target_ip=task["ip"],
                current_state="ORDER_VIOLATION",
                output=(
                    f"OPSEC VIOLATION: Cannot '{act}' yet.\n"
                    f"Complete '{required[idx-1]}' first.\n"
                    f"Progress: {self.completed_steps}"
                ),
                difficulty=task["difficulty"],
                reward=safe_reward(PENALTY_WRONG_ORDER),
                done=False,
            )
            self.total_reward += PENALTY_WRONG_ORDER
            return obs

        if act in self.completed_steps:
            obs = RedTeamObservation(
                target_ip=task["ip"],
                current_state="REPEAT",
                output=f"Phase '{act}' already done. Advance to next phase.",
                difficulty=task["difficulty"],
                reward=safe_reward(0.01),
                done=False,
            )
            return obs

        self.completed_steps.append(act)
        reward = STEP_REWARDS[task["difficulty"]]["base"]
        if self.mistakes == 0:
            reward += CHAIN_BONUS
        self.total_reward += reward

        output_variants = ACTION_OUTPUTS.get(act, ["Action executed."])
        output_index = self.task_index % len(output_variants)
        output = output_variants[output_index]
        remaining = [s for s in required if s not in self.completed_steps]
        progress = len(self.completed_steps) / len(required)

        if not remaining:
            bonus = STEP_REWARDS[task["difficulty"]]["completion_bonus"]
            reward += bonus
            self.total_reward += bonus
            done = True
            output += (
                f"\n\n========================================\n"
                f"[+] ALL PHASES COMPLETE!\n"
                f"[+] CTF FLAG CAPTURED: {task['hidden_flag']}\n"
                f"[+] Total reward: {self.total_reward:.2f}\n"
                f"[+] Clean chain bonus: {'YES' if self.mistakes == 0 else 'NO'}\n"
                f"========================================"
            )
            state = "MISSION_COMPLETE"
        else:
            state = act.upper() + "_DONE"
            output += f"\n\n[*] Progress: {len(self.completed_steps)}/{len(required)} ({progress*100:.0f}%)\n[*] Next: {remaining[0]}"

        obs = RedTeamObservation(
            target_ip=task["ip"],
            current_state=state,
            output=output,
            difficulty=task["difficulty"],
            reward=safe_reward(reward),
            done=done,
        )
        return obs

    @property
    def state(self) -> RedTeamState:
        task = self.current_task
        required = task["required_steps"]
        progress = len(self.completed_steps) / len(required) if required else 0.0
        return RedTeamState(
            episode=self.episode,
            task=task["name"],
            progress=round(progress, 2),
        )

    def close(self) -> None:
        # No external resources to release for this environment.
        return None