File size: 12,014 Bytes
ac107b3
f228014
 
 
 
 
 
 
69cf4db
f228014
69cf4db
 
 
f228014
ac107b3
f228014
 
ac107b3
 
 
f228014
 
ac107b3
f228014
 
 
ac107b3
f228014
69cf4db
db5b15b
 
 
f228014
db5b15b
69cf4db
 
db5b15b
 
 
 
 
 
 
 
 
69cf4db
db5b15b
 
 
 
 
69cf4db
54676f7
 
69cf4db
54676f7
 
 
 
 
f228014
69cf4db
f228014
 
ac107b3
69cf4db
 
f228014
69cf4db
f228014
69cf4db
 
f228014
ac107b3
 
 
db5b15b
ac107b3
f228014
ac107b3
f228014
 
 
 
 
ac107b3
f228014
 
 
 
ac107b3
 
 
69cf4db
 
ac107b3
 
f228014
 
 
69cf4db
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f228014
 
 
 
 
 
 
 
 
 
 
ac107b3
 
f228014
69cf4db
f228014
69cf4db
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f228014
69cf4db
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f228014
ac107b3
f228014
69cf4db
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f228014
69cf4db
f228014
db5b15b
f228014
 
ac107b3
 
 
 
 
f228014
69cf4db
f228014
ac107b3
f228014
 
ac107b3
f228014
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
# app.py
# Simulador Red Team (alto nivel, defensivo) - Gradio app + generación de reporte TXT
import os
import json
import re
import time
import requests
import gradio as gr
from typing import Tuple, List, Optional

OPENAI_API_URL_RESPONSES = "https://api.openai.com/v1/responses"
OPENAI_API_URL_CHAT = "https://api.openai.com/v1/chat/completions"
OPENAI_MODEL_FALLBACK = ["gpt-4o-mini", "gpt-4o"]

PROMPT_TEMPLATE = """Eres un asistente de ciberseguridad (red-team simulator) que debe simular a alto nivel cómo un atacante podría aprovechar el contenido que se le entrega (email, URL o fragmento). NORMAS (obligatorias):
1. Proporciona sólo un resumen del objetivo del atacante y del vector. NO entregues instrucciones técnicas, comandos, código de exploit, payloads, o pasos paso-a-paso para cometer un ataque.
2. Devuelve 3 secciones en JSON:
   - "simulation": breve párrafo (1-3 frases) explicando la estrategia del atacante (alto nivel).
   - "iocs": lista de indicadores accionables para detección (dominios, patrones de URL, encabezados sospechosos, extensiones).
   - "mitigations": lista de contramedidas operativas (bloqueos, políticas, educación, verificación técnica).
3. Si el material es insuficiente, indica qué faltaría.
4. Limita la respuesta a lenguaje defensivo y educacional. NO ofrezcas código ni tácticas para explotar vulnerabilidades.
5. Devuelve SOLO JSON válido (objetivo: {"simulation":..., "iocs":[...], "mitigations":[...]})

Contenido a analizar:
{input}
"""

# --------- Filtro seguro (ligero y robusto) ----------
FORBIDDEN_KEYWORDS = [
    "exploit", "payload", "meterpreter", "msfconsole",
    "reverse shell", "sqlmap", "chmod", "chown", "exec "
]
FORBIDDEN_CMDLIKE = ["curl ", "wget ", "sudo ", "bash -i", "nc ", "ncat ", "rm -rf"]
FORBIDDEN_SUBSTR = ["$(", "${", "\\x", "base64 -d"]  # doble-escape para dejar \x literal
FORBIDDEN_HEX_RE = re.compile(r"0x[0-9a-fA-F]{2,}")

def contains_forbidden(text: str) -> bool:
    if not text:
        return False
    t = text.lower()
    if any(k in t for k in FORBIDDEN_KEYWORDS):
        return True
    if any(k in t for k in FORBIDDEN_CMDLIKE):
        return True
    if any(s in text for s in FORBIDDEN_SUBSTR):
        return True
    if FORBIDDEN_HEX_RE.search(text):
        return True
    return False

# --------- Utilidades API ----------
ENV_CANDIDATES = ["OPENAI_API_KEY", "OPENAI_API_KEY_ATAQUE", "OPENAI_APIKEY", "OPENAI_KEY", "HF_OPENAI_API_KEY"]

def get_api_key() -> Optional[str]:
    for name in ENV_CANDIDATES:
        v = os.environ.get(name)
        if v:
            return v
    return None

def call_openai_any(prompt: str, api_key: str, models=None, timeout: int = 25) -> Tuple[bool, str]:
    if models is None:
        models = OPENAI_MODEL_FALLBACK
    headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
    last_error = None

    for model in models:
        # 1) Intento con /v1/responses
        try:
            r = requests.post(OPENAI_API_URL_RESPONSES, headers=headers, json={"model": model, "input": prompt}, timeout=timeout)
            if r.status_code == 200:
                j = r.json()
                out = ""
                if "output" in j:
                    if isinstance(j["output"], list):
                        parts: List[str] = []
                        for item in j["output"]:
                            if isinstance(item, dict):
                                c = item.get("content") or item.get("text") or item.get("output_text")
                                if isinstance(c, str):
                                    parts.append(c)
                                elif isinstance(c, list):
                                    for el in c:
                                        if isinstance(el, dict):
                                            txt = el.get("text") or el.get("output_text") or el.get("content")
                                            if txt:
                                                parts.append(str(txt))
                                        else:
                                            parts.append(str(el))
                        out = "\n".join(parts).strip()
                    elif isinstance(j["output"], str):
                        out = j["output"].strip()
                if not out and "choices" in j:
                    # algunos despliegues devuelven estilo choices
                    ch = j["choices"][0]
                    out = ch.get("text") or ch.get("message", {}).get("content", {}).get("text") or ""
                if not out:
                    out = json.dumps(j, ensure_ascii=False)[:4000]
                return True, out
            else:
                try:
                    jr = r.json()
                    msg = jr.get("error", {}).get("message") or jr.get("message") or r.text
                except Exception:
                    msg = r.text
                last_error = f"Responses API (HTTP {r.status_code}): {msg}"
                # si es 404/400/422 probamos chat inmediatamente
        except Exception as e:
            last_error = f"Responses API error: {e}"

        # 2) Fallback a /v1/chat/completions
        try:
            payload = {"model": model, "messages": [{"role": "user", "content": prompt}], "temperature": 0.2}
            rc = requests.post(OPENAI_API_URL_CHAT, headers=headers, json=payload, timeout=timeout)
            if rc.status_code == 200:
                jc = rc.json()
                out = ""
                if "choices" in jc and jc["choices"]:
                    msg = jc["choices"][0].get("message", {}).get("content")
                    if isinstance(msg, str):
                        out = msg.strip()
                if not out:
                    out = json.dumps(jc, ensure_ascii=False)[:4000]
                return True, out
            else:
                try:
                    jc = rc.json()
                    msg = jc.get("error", {}).get("message") or jc.get("message") or rc.text
                except Exception:
                    msg = rc.text
                last_error = f"Chat API (HTTP {rc.status_code}): {msg}"
        except Exception as e:
            last_error = f"Chat API error: {e}"

    return False, (last_error or "No fue posible obtener respuesta del modelo.")

def safe_parse_json_from_model(text: str):
    try:
        return json.loads(text)
    except Exception:
        s = text.find('{')
        e = text.rfind('}')
        if s != -1 and e != -1 and e > s:
            try:
                return json.loads(text[s:e+1])
            except Exception:
                return {"raw": text}
        return {"raw": text}

# --------- Handlers ----------
def generate_simulation(user_input: str, include_iocs: bool, include_mitigation: bool):
    try:
        api_key = get_api_key()
        if not api_key:
            return ("<p style='color:crimson'><b>Error:</b> Falta la API key. "
                    f"Añade una de estas variables en Settings → Variables and secrets: {', '.join(ENV_CANDIDATES)}.</p>", "")

        prompt = PROMPT_TEMPLATE.format(input=user_input)
        ok, out = call_openai_any(prompt, api_key)
        if not ok:
            return f"<p style='color:crimson'><b>Error IA:</b> {out}</p>", ""

        if contains_forbidden(out):
            safe_msg = ("La respuesta original fue bloqueada por contener contenido sensible que podría ser instructivo para ataques. "
                        "He realizado un bloqueo por seguridad. Intenta proporcionar más contexto defensivo o limpia el contenido y vuelve a intentarlo.")
            return f"<p style='color:crimson'><b>Contenido bloqueado por seguridad:</b></p><p>{safe_msg}</p>", ""

        parsed = safe_parse_json_from_model(out)

        html = []
        html.append("<h3>Simulación Red Team (alto nivel)</h3>")
        if isinstance(parsed, dict) and parsed.get("simulation"):
            html.append(f"<p><b>Simulación:</b> {parsed['simulation']}</p>")
        else:
            sim = parsed.get("simulation") if isinstance(parsed, dict) else None
            html.append(f"<p><b>Simulación:</b> {json.dumps(sim, ensure_ascii=False)}</p>")

        if include_iocs:
            html.append("<h4>Indicadores (IoCs) sugeridos</h4>")
            iocs = parsed.get("iocs") if isinstance(parsed, dict) else None
            if isinstance(iocs, list) and iocs:
                html.append("<ul>")
                for i in iocs:
                    html.append(f"<li>{i}</li>")
                html.append("</ul>")
            else:
                html.append(f"<p>{json.dumps(iocs, ensure_ascii=False)}</p>")

        if include_mitigation:
            html.append("<h4>Contramedidas y mitigación</h4>")
            mit = parsed.get("mitigations") if isinstance(parsed, dict) else None
            if isinstance(mit, list) and mit:
                html.append("<ul>")
                for m in mit:
                    html.append(f"<li>{m}</li>")
                html.append("</ul>")
            else:
                html.append(f"<p>{json.dumps(mit, ensure_ascii=False)}</p>")

        html.append("<p style='font-size:0.9em;color:#bbb'>Nota: esta simulación es de alto nivel y educativa. No proporciona instrucciones de ataque. Use para mejorar defensas y detección.</p>")
        return "\n".join(html), json.dumps(parsed, ensure_ascii=False, indent=2)
    except Exception as e:
        return f"<p style='color:crimson'><b>Error inesperado:</b> {e}</p>", ""

def generate_report(json_str: str, title: str = "Reporte Red Team") -> Tuple[str, str]:
    try:
        if not json_str:
            return "", ""
        try:
            parsed = json.loads(json_str) if isinstance(json_str, str) else json_str
        except Exception:
            parsed = {"raw": str(json_str)}

        timestamp = time.strftime("%Y%m%d_%H%M%S")
        filename = f"/mnt/data/redteam_report_{timestamp}.txt"
        with open(filename, 'w', encoding='utf-8') as f:
            f.write(f"{title}\nGenerated: {time.ctime()}\n\n")
            f.write("SIMULATION:\n")
            sim = parsed.get("simulation") if isinstance(parsed, dict) else None
            f.write((sim or "(no simulation)") + "\n\n")
            f.write("IOCS:\n")
            for i in (parsed.get("iocs") if isinstance(parsed, dict) and parsed.get("iocs") else []):
                f.write(f"- {i}\n")
            f.write("\nMITIGATIONS:\n")
            for m in (parsed.get("mitigations") if isinstance(parsed, dict) and parsed.get("mitigations") else []):
                f.write(f"- {m}\n")
            f.write("\nRAW:\n")
            f.write(json.dumps(parsed, ensure_ascii=False, indent=2))
        return filename, filename
    except Exception as e:
        return "", ""

# --------- UI ----------
with gr.Blocks(analytics_enabled=False) as demo:
    gr.Markdown("## 🧯 Simulación Red Team (alto nivel) — Defender con IA")
    with gr.Row():
        with gr.Column(scale=7):
            inp = gr.Textbox(label="Pega aquí el correo RAW, URL o fragmento a analizar", lines=20, placeholder="Pega cabeceras, cuerpo o URL completa")
            cb_iocs = gr.Checkbox(label="Incluir IoCs (indicadores) en la salida", value=True)
            cb_mit = gr.Checkbox(label="Incluir mitigaciones", value=True)
            btn = gr.Button("Simular ataque (alto nivel)")
            download_btn = gr.Button("Generar reporte (.txt)")
        with gr.Column(scale=5):
            out_html = gr.HTML("Resultado aparecerá aquí")
            last_json = gr.Textbox(visible=False)
            file_out = gr.File(label="Descargar reporte (.txt)", visible=False)

    btn.click(generate_simulation, inputs=[inp, cb_iocs, cb_mit], outputs=[out_html, last_json])
    download_btn.click(generate_report, inputs=[last_json, gr.Textbox(value="Reporte Red Team", visible=False)], outputs=[file_out, file_out])

if __name__ == '__main__':
    demo.launch(server_name='0.0.0.0', server_port=7860)