simulator / redteam_simulator.py
doctorlinux's picture
Upload 8 files
f228014 verified
# redteam_simulator.py
# Simulador Red Team (alto nivel, defensivo) - Gradio app
# Requisitos: gradio, requests
# Instrucciones: sube este archivo a tu Space y configura OPENAI_API_KEY en Settings -> Variables and secrets
import os
import json
import re
import requests
import gradio as gr
from typing import Tuple
# ------------------ Config ------------------
OPENAI_API_URL = "https://api.openai.com/v1/responses"
OPENAI_MODEL_FALLBACK = ["gpt-4o-mini", "gpt-4o", "gpt-5-mini"]
# Prompt template (defensivo, no instrucciones de explotación)
PROMPT_TEMPLATE = """Eres un asistente de ciberseguridad (red-team simulator) que debe simular a alto nivel cómo un atacante podría aprovechar el contenido que se le entrega (email, URL o fragmento). NORMAS (obligatorias):
1. Proporciona sólo un resumen del objetivo del atacante y del vector. NO entregues instrucciones técnicas, comandos, código de exploit, payloads, o pasos paso-a-paso para cometer un ataque.
2. Devuelve 3 secciones en JSON:
- \"simulation\": breve párrafo (1-3 frases) explicando la estrategia del atacante (alto nivel).
- \"iocs\": lista de indicadores accionables para detección (dominios, patrones de URL, encabezados sospechosos, extensiones).
- \"mitigations\": lista de contramedidas operativas (bloqueos, políticas, educación, verificación técnica).
3. Si el material es insuficiente, indica qué faltaría.
4. Limita la respuesta a lenguaje defensivo y educacional. NO ofrezcas código ni tácticas para explotar vulnerabilidades.
5. Devuelve SOLO JSON válido (objetivo: {\"simulation\":..., \"iocs\":[...], \"mitigations\":[...]})
Contenido a analizar:
{input}
"""
# Palabras/prototipos prohibidos en la salida (si aparecen, se bloqueará la respuesta por seguridad)
FORBIDDEN_PATTERNS = [
r"\bexploit\b", r"\bpayload\b", r"\bmeterpreter\b", r"\bmsfconsole\b",
r"curl\b", r"wget\b", r"sudo\b", r"rm\s+-rf\b", r"reverse shell\b",
r"exec\b", r"bash -i\b", r"nc\b", r"ncat\b", r"chmod\b", r"chown\b",
r"\bsqlmap\b", r"\\x", r"0x[0-9a-fA-F]{2,}", r"base64 -d", r"\\b\\$\\(", r"\\$\\{"
]
FORBIDDEN_REGEX = re.compile("|".join(FORBIDDEN_PATTERNS), re.I)
# ------------------ Helpers ------------------
def call_openai_responses(prompt: str, api_key: str, models=None, timeout: int = 20) -> Tuple[bool, str]:
if models is None:
models = OPENAI_MODEL_FALLBACK
headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
for model in models:
payload = {"model": model, "input": prompt}
try:
r = requests.post(OPENAI_API_URL, headers=headers, json=payload, timeout=timeout)
except Exception as e:
return False, f"Error de conexión al llamar a la API: {e}"
if r.status_code == 200:
try:
j = r.json()
# Extraer texto de la Responses API
out = ""
if "output" in j:
if isinstance(j["output"], list):
parts = []
for item in j["output"]:
if isinstance(item, dict):
c = item.get("content") or item.get("text") or item.get("output_text")
if isinstance(c, str):
parts.append(c)
elif isinstance(c, list):
for el in c:
if isinstance(el, dict):
txt = el.get("text") or el.get("output_text") or el.get("content")
if txt:
parts.append(str(txt))
else:
parts.append(str(el))
out = "\\n".join(parts).strip()
elif isinstance(j["output"], str):
out = j["output"].strip()
# Fallback a choices
if not out and "choices" in j and isinstance(j.get("choices"), list) and j["choices"]:
ch = j["choices"][0]
out = ch.get("text") or ch.get("message", {}).get("content", {}).get("text") or ""
if not out:
out = json.dumps(j, ensure_ascii=False)[:4000]
return True, out
except Exception as e:
return False, f"Error parseando respuesta de la API: {e}"
else:
# error http: intentar siguiente modelo o devolver mensaje claro
try:
ej = r.json()
msg = ej.get("error", {}).get("message") or ej.get("message") or r.text
except Exception:
msg = r.text
if r.status_code == 401:
return False, "AuthenticationError (401): OPENAI_API_KEY inválida o revocada."
if r.status_code == 429:
return False, "RateLimitError (429): límite superado en OpenAI."
# si el mensaje menciona el modelo, intentar siguiente
if isinstance(msg, str) and "model" in msg.lower():
continue
return False, f"HTTP {r.status_code}: {msg}"
return False, "Ningún modelo disponible o permitido en la cuenta de OpenAI."
def contains_forbidden(text: str) -> bool:
if not text:
return False
return bool(FORBIDDEN_REGEX.search(text))
def safe_parse_json_from_model(text: str):
# Intenta parsear JSON; si falla, extrae el primer bloque JSON encontrado
try:
return json.loads(text)
except Exception:
s = text.find('{')
e = text.rfind('}')
if s != -1 and e != -1 and e > s:
try:
return json.loads(text[s:e+1])
except Exception:
return {"raw": text}
return {"raw": text}
# ------------------ Generador ------------------
def generate_simulation(user_input: str, include_iocs: bool, include_mitigation: bool):
api_key = os.environ.get("OPENAI_API_KEY")
if not api_key:
return "<p style='color:crimson'><b>Error:</b> OPENAI_API_KEY no configurada en Settings → Variables and secrets.</p>"
# construir prompt
prompt = PROMPT_TEMPLATE.format(input=user_input)
ok, out = call_openai_responses(prompt, api_key)
if not ok:
return f"<p style='color:crimson'><b>Error IA:</b> {out}</p>"
# filtro de seguridad: si la respuesta contiene patrones peligrosos, no se muestra
if contains_forbidden(out):
# devolver aviso y una versión segura: pedir al modelo reescribir de forma defensiva
safe_msg = ("La respuesta original fue bloqueada por contener contenido sensible que podría ser instructivo para ataques. "
"He realizado un bloqueo por seguridad. Intenta proporcionar más contexto defensivo o limpia el contenido y vuelve a intentarlo.")
return f"<p style='color:crimson'><b>Contenido bloqueado por seguridad:</b></p><p>{safe_msg}</p>"
parsed = safe_parse_json_from_model(out)
# Construir HTML de salida
html = []
html.append("<h3>Simulación Red Team (alto nivel)</h3>")
if isinstance(parsed, dict) and parsed.get("simulation"):
html.append(f"<p><b>Simulación:</b> {parsed['simulation']}</p>")
else:
sim = parsed.get("simulation") if isinstance(parsed, dict) else None
html.append(f"<p><b>Simulación:</b> {json.dumps(sim, ensure_ascii=False)}</p>")
if include_iocs:
html.append("<h4>Indicadores (IoCs) sugeridos</h4>")
iocs = parsed.get("iocs") if isinstance(parsed, dict) else None
if isinstance(iocs, list) and iocs:
html.append("<ul>")
for i in iocs:
html.append(f"<li>{i}</li>")
html.append("</ul>")
else:
html.append(f"<p>{json.dumps(iocs, ensure_ascii=False)}</p>")
if include_mitigation:
html.append("<h4>Contramedidas y mitigación</h4>")
mit = parsed.get("mitigations") if isinstance(parsed, dict) else None
if isinstance(mit, list) and mit:
html.append("<ul>")
for m in mit:
html.append(f"<li>{m}</li>")
html.append("</ul>")
else:
html.append(f"<p>{json.dumps(mit, ensure_ascii=False)}</p>")
html.append("<p style='font-size:0.9em;color:#bbb'>Nota: esta simulación es de alto nivel y educativa. No proporciona instrucciones de ataque. Use para mejorar defensas y detección.</p>")
return "\\n".join(html)
# ------------------ UI ------------------
with gr.Blocks(analytics_enabled=False) as demo:
gr.Markdown(\"## 🧯 Simulador Red Team (alto nivel) — Defender con IA\")
with gr.Row():
with gr.Column(scale=7):
inp = gr.Textbox(label=\"Pega aquí el correo RAW, URL o fragmento a analizar\", lines=20, placeholder=\"Pega cabeceras, cuerpo o URL completa\")
cb_iocs = gr.Checkbox(label=\"Incluir IoCs (indicadores) en la salida\", value=True)
cb_mit = gr.Checkbox(label=\"Incluir mitigaciones\", value=True)
btn = gr.Button(\"Simular ataque (alto nivel)\")
with gr.Column(scale=5):
out_html = gr.HTML(\"<i>Resultado aparecerá aquí</i>\")
btn.click(generate_simulation, inputs=[inp, cb_iocs, cb_mit], outputs=[out_html])
if __name__ == '__main__':
demo.launch(server_name='0.0.0.0', server_port=7860)