Spaces:
Sleeping
Sleeping
File size: 9,548 Bytes
f228014 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 |
# redteam_simulator.py
# Simulador Red Team (alto nivel, defensivo) - Gradio app
# Requisitos: gradio, requests
# Instrucciones: sube este archivo a tu Space y configura OPENAI_API_KEY en Settings -> Variables and secrets
import os
import json
import re
import requests
import gradio as gr
from typing import Tuple
# ------------------ Config ------------------
OPENAI_API_URL = "https://api.openai.com/v1/responses"
OPENAI_MODEL_FALLBACK = ["gpt-4o-mini", "gpt-4o", "gpt-5-mini"]
# Prompt template (defensivo, no instrucciones de explotación)
PROMPT_TEMPLATE = """Eres un asistente de ciberseguridad (red-team simulator) que debe simular a alto nivel cómo un atacante podría aprovechar el contenido que se le entrega (email, URL o fragmento). NORMAS (obligatorias):
1. Proporciona sólo un resumen del objetivo del atacante y del vector. NO entregues instrucciones técnicas, comandos, código de exploit, payloads, o pasos paso-a-paso para cometer un ataque.
2. Devuelve 3 secciones en JSON:
- \"simulation\": breve párrafo (1-3 frases) explicando la estrategia del atacante (alto nivel).
- \"iocs\": lista de indicadores accionables para detección (dominios, patrones de URL, encabezados sospechosos, extensiones).
- \"mitigations\": lista de contramedidas operativas (bloqueos, políticas, educación, verificación técnica).
3. Si el material es insuficiente, indica qué faltaría.
4. Limita la respuesta a lenguaje defensivo y educacional. NO ofrezcas código ni tácticas para explotar vulnerabilidades.
5. Devuelve SOLO JSON válido (objetivo: {\"simulation\":..., \"iocs\":[...], \"mitigations\":[...]})
Contenido a analizar:
{input}
"""
# Palabras/prototipos prohibidos en la salida (si aparecen, se bloqueará la respuesta por seguridad)
FORBIDDEN_PATTERNS = [
r"\bexploit\b", r"\bpayload\b", r"\bmeterpreter\b", r"\bmsfconsole\b",
r"curl\b", r"wget\b", r"sudo\b", r"rm\s+-rf\b", r"reverse shell\b",
r"exec\b", r"bash -i\b", r"nc\b", r"ncat\b", r"chmod\b", r"chown\b",
r"\bsqlmap\b", r"\\x", r"0x[0-9a-fA-F]{2,}", r"base64 -d", r"\\b\\$\\(", r"\\$\\{"
]
FORBIDDEN_REGEX = re.compile("|".join(FORBIDDEN_PATTERNS), re.I)
# ------------------ Helpers ------------------
def call_openai_responses(prompt: str, api_key: str, models=None, timeout: int = 20) -> Tuple[bool, str]:
if models is None:
models = OPENAI_MODEL_FALLBACK
headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
for model in models:
payload = {"model": model, "input": prompt}
try:
r = requests.post(OPENAI_API_URL, headers=headers, json=payload, timeout=timeout)
except Exception as e:
return False, f"Error de conexión al llamar a la API: {e}"
if r.status_code == 200:
try:
j = r.json()
# Extraer texto de la Responses API
out = ""
if "output" in j:
if isinstance(j["output"], list):
parts = []
for item in j["output"]:
if isinstance(item, dict):
c = item.get("content") or item.get("text") or item.get("output_text")
if isinstance(c, str):
parts.append(c)
elif isinstance(c, list):
for el in c:
if isinstance(el, dict):
txt = el.get("text") or el.get("output_text") or el.get("content")
if txt:
parts.append(str(txt))
else:
parts.append(str(el))
out = "\\n".join(parts).strip()
elif isinstance(j["output"], str):
out = j["output"].strip()
# Fallback a choices
if not out and "choices" in j and isinstance(j.get("choices"), list) and j["choices"]:
ch = j["choices"][0]
out = ch.get("text") or ch.get("message", {}).get("content", {}).get("text") or ""
if not out:
out = json.dumps(j, ensure_ascii=False)[:4000]
return True, out
except Exception as e:
return False, f"Error parseando respuesta de la API: {e}"
else:
# error http: intentar siguiente modelo o devolver mensaje claro
try:
ej = r.json()
msg = ej.get("error", {}).get("message") or ej.get("message") or r.text
except Exception:
msg = r.text
if r.status_code == 401:
return False, "AuthenticationError (401): OPENAI_API_KEY inválida o revocada."
if r.status_code == 429:
return False, "RateLimitError (429): límite superado en OpenAI."
# si el mensaje menciona el modelo, intentar siguiente
if isinstance(msg, str) and "model" in msg.lower():
continue
return False, f"HTTP {r.status_code}: {msg}"
return False, "Ningún modelo disponible o permitido en la cuenta de OpenAI."
def contains_forbidden(text: str) -> bool:
if not text:
return False
return bool(FORBIDDEN_REGEX.search(text))
def safe_parse_json_from_model(text: str):
# Intenta parsear JSON; si falla, extrae el primer bloque JSON encontrado
try:
return json.loads(text)
except Exception:
s = text.find('{')
e = text.rfind('}')
if s != -1 and e != -1 and e > s:
try:
return json.loads(text[s:e+1])
except Exception:
return {"raw": text}
return {"raw": text}
# ------------------ Generador ------------------
def generate_simulation(user_input: str, include_iocs: bool, include_mitigation: bool):
api_key = os.environ.get("OPENAI_API_KEY")
if not api_key:
return "<p style='color:crimson'><b>Error:</b> OPENAI_API_KEY no configurada en Settings → Variables and secrets.</p>"
# construir prompt
prompt = PROMPT_TEMPLATE.format(input=user_input)
ok, out = call_openai_responses(prompt, api_key)
if not ok:
return f"<p style='color:crimson'><b>Error IA:</b> {out}</p>"
# filtro de seguridad: si la respuesta contiene patrones peligrosos, no se muestra
if contains_forbidden(out):
# devolver aviso y una versión segura: pedir al modelo reescribir de forma defensiva
safe_msg = ("La respuesta original fue bloqueada por contener contenido sensible que podría ser instructivo para ataques. "
"He realizado un bloqueo por seguridad. Intenta proporcionar más contexto defensivo o limpia el contenido y vuelve a intentarlo.")
return f"<p style='color:crimson'><b>Contenido bloqueado por seguridad:</b></p><p>{safe_msg}</p>"
parsed = safe_parse_json_from_model(out)
# Construir HTML de salida
html = []
html.append("<h3>Simulación Red Team (alto nivel)</h3>")
if isinstance(parsed, dict) and parsed.get("simulation"):
html.append(f"<p><b>Simulación:</b> {parsed['simulation']}</p>")
else:
sim = parsed.get("simulation") if isinstance(parsed, dict) else None
html.append(f"<p><b>Simulación:</b> {json.dumps(sim, ensure_ascii=False)}</p>")
if include_iocs:
html.append("<h4>Indicadores (IoCs) sugeridos</h4>")
iocs = parsed.get("iocs") if isinstance(parsed, dict) else None
if isinstance(iocs, list) and iocs:
html.append("<ul>")
for i in iocs:
html.append(f"<li>{i}</li>")
html.append("</ul>")
else:
html.append(f"<p>{json.dumps(iocs, ensure_ascii=False)}</p>")
if include_mitigation:
html.append("<h4>Contramedidas y mitigación</h4>")
mit = parsed.get("mitigations") if isinstance(parsed, dict) else None
if isinstance(mit, list) and mit:
html.append("<ul>")
for m in mit:
html.append(f"<li>{m}</li>")
html.append("</ul>")
else:
html.append(f"<p>{json.dumps(mit, ensure_ascii=False)}</p>")
html.append("<p style='font-size:0.9em;color:#bbb'>Nota: esta simulación es de alto nivel y educativa. No proporciona instrucciones de ataque. Use para mejorar defensas y detección.</p>")
return "\\n".join(html)
# ------------------ UI ------------------
with gr.Blocks(analytics_enabled=False) as demo:
gr.Markdown(\"## 🧯 Simulador Red Team (alto nivel) — Defender con IA\")
with gr.Row():
with gr.Column(scale=7):
inp = gr.Textbox(label=\"Pega aquí el correo RAW, URL o fragmento a analizar\", lines=20, placeholder=\"Pega cabeceras, cuerpo o URL completa\")
cb_iocs = gr.Checkbox(label=\"Incluir IoCs (indicadores) en la salida\", value=True)
cb_mit = gr.Checkbox(label=\"Incluir mitigaciones\", value=True)
btn = gr.Button(\"Simular ataque (alto nivel)\")
with gr.Column(scale=5):
out_html = gr.HTML(\"<i>Resultado aparecerá aquí</i>\")
btn.click(generate_simulation, inputs=[inp, cb_iocs, cb_mit], outputs=[out_html])
if __name__ == '__main__':
demo.launch(server_name='0.0.0.0', server_port=7860)
|