simulator / app.py
doctorlinux's picture
Upload app.py
69cf4db verified
# app.py
# Simulador Red Team (alto nivel, defensivo) - Gradio app + generación de reporte TXT
import os
import json
import re
import time
import requests
import gradio as gr
from typing import Tuple, List, Optional
OPENAI_API_URL_RESPONSES = "https://api.openai.com/v1/responses"
OPENAI_API_URL_CHAT = "https://api.openai.com/v1/chat/completions"
OPENAI_MODEL_FALLBACK = ["gpt-4o-mini", "gpt-4o"]
PROMPT_TEMPLATE = """Eres un asistente de ciberseguridad (red-team simulator) que debe simular a alto nivel cómo un atacante podría aprovechar el contenido que se le entrega (email, URL o fragmento). NORMAS (obligatorias):
1. Proporciona sólo un resumen del objetivo del atacante y del vector. NO entregues instrucciones técnicas, comandos, código de exploit, payloads, o pasos paso-a-paso para cometer un ataque.
2. Devuelve 3 secciones en JSON:
- "simulation": breve párrafo (1-3 frases) explicando la estrategia del atacante (alto nivel).
- "iocs": lista de indicadores accionables para detección (dominios, patrones de URL, encabezados sospechosos, extensiones).
- "mitigations": lista de contramedidas operativas (bloqueos, políticas, educación, verificación técnica).
3. Si el material es insuficiente, indica qué faltaría.
4. Limita la respuesta a lenguaje defensivo y educacional. NO ofrezcas código ni tácticas para explotar vulnerabilidades.
5. Devuelve SOLO JSON válido (objetivo: {"simulation":..., "iocs":[...], "mitigations":[...]})
Contenido a analizar:
{input}
"""
# --------- Filtro seguro (ligero y robusto) ----------
FORBIDDEN_KEYWORDS = [
"exploit", "payload", "meterpreter", "msfconsole",
"reverse shell", "sqlmap", "chmod", "chown", "exec "
]
FORBIDDEN_CMDLIKE = ["curl ", "wget ", "sudo ", "bash -i", "nc ", "ncat ", "rm -rf"]
FORBIDDEN_SUBSTR = ["$(", "${", "\\x", "base64 -d"] # doble-escape para dejar \x literal
FORBIDDEN_HEX_RE = re.compile(r"0x[0-9a-fA-F]{2,}")
def contains_forbidden(text: str) -> bool:
if not text:
return False
t = text.lower()
if any(k in t for k in FORBIDDEN_KEYWORDS):
return True
if any(k in t for k in FORBIDDEN_CMDLIKE):
return True
if any(s in text for s in FORBIDDEN_SUBSTR):
return True
if FORBIDDEN_HEX_RE.search(text):
return True
return False
# --------- Utilidades API ----------
ENV_CANDIDATES = ["OPENAI_API_KEY", "OPENAI_API_KEY_ATAQUE", "OPENAI_APIKEY", "OPENAI_KEY", "HF_OPENAI_API_KEY"]
def get_api_key() -> Optional[str]:
for name in ENV_CANDIDATES:
v = os.environ.get(name)
if v:
return v
return None
def call_openai_any(prompt: str, api_key: str, models=None, timeout: int = 25) -> Tuple[bool, str]:
if models is None:
models = OPENAI_MODEL_FALLBACK
headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
last_error = None
for model in models:
# 1) Intento con /v1/responses
try:
r = requests.post(OPENAI_API_URL_RESPONSES, headers=headers, json={"model": model, "input": prompt}, timeout=timeout)
if r.status_code == 200:
j = r.json()
out = ""
if "output" in j:
if isinstance(j["output"], list):
parts: List[str] = []
for item in j["output"]:
if isinstance(item, dict):
c = item.get("content") or item.get("text") or item.get("output_text")
if isinstance(c, str):
parts.append(c)
elif isinstance(c, list):
for el in c:
if isinstance(el, dict):
txt = el.get("text") or el.get("output_text") or el.get("content")
if txt:
parts.append(str(txt))
else:
parts.append(str(el))
out = "\n".join(parts).strip()
elif isinstance(j["output"], str):
out = j["output"].strip()
if not out and "choices" in j:
# algunos despliegues devuelven estilo choices
ch = j["choices"][0]
out = ch.get("text") or ch.get("message", {}).get("content", {}).get("text") or ""
if not out:
out = json.dumps(j, ensure_ascii=False)[:4000]
return True, out
else:
try:
jr = r.json()
msg = jr.get("error", {}).get("message") or jr.get("message") or r.text
except Exception:
msg = r.text
last_error = f"Responses API (HTTP {r.status_code}): {msg}"
# si es 404/400/422 probamos chat inmediatamente
except Exception as e:
last_error = f"Responses API error: {e}"
# 2) Fallback a /v1/chat/completions
try:
payload = {"model": model, "messages": [{"role": "user", "content": prompt}], "temperature": 0.2}
rc = requests.post(OPENAI_API_URL_CHAT, headers=headers, json=payload, timeout=timeout)
if rc.status_code == 200:
jc = rc.json()
out = ""
if "choices" in jc and jc["choices"]:
msg = jc["choices"][0].get("message", {}).get("content")
if isinstance(msg, str):
out = msg.strip()
if not out:
out = json.dumps(jc, ensure_ascii=False)[:4000]
return True, out
else:
try:
jc = rc.json()
msg = jc.get("error", {}).get("message") or jc.get("message") or rc.text
except Exception:
msg = rc.text
last_error = f"Chat API (HTTP {rc.status_code}): {msg}"
except Exception as e:
last_error = f"Chat API error: {e}"
return False, (last_error or "No fue posible obtener respuesta del modelo.")
def safe_parse_json_from_model(text: str):
try:
return json.loads(text)
except Exception:
s = text.find('{')
e = text.rfind('}')
if s != -1 and e != -1 and e > s:
try:
return json.loads(text[s:e+1])
except Exception:
return {"raw": text}
return {"raw": text}
# --------- Handlers ----------
def generate_simulation(user_input: str, include_iocs: bool, include_mitigation: bool):
try:
api_key = get_api_key()
if not api_key:
return ("<p style='color:crimson'><b>Error:</b> Falta la API key. "
f"Añade una de estas variables en Settings → Variables and secrets: {', '.join(ENV_CANDIDATES)}.</p>", "")
prompt = PROMPT_TEMPLATE.format(input=user_input)
ok, out = call_openai_any(prompt, api_key)
if not ok:
return f"<p style='color:crimson'><b>Error IA:</b> {out}</p>", ""
if contains_forbidden(out):
safe_msg = ("La respuesta original fue bloqueada por contener contenido sensible que podría ser instructivo para ataques. "
"He realizado un bloqueo por seguridad. Intenta proporcionar más contexto defensivo o limpia el contenido y vuelve a intentarlo.")
return f"<p style='color:crimson'><b>Contenido bloqueado por seguridad:</b></p><p>{safe_msg}</p>", ""
parsed = safe_parse_json_from_model(out)
html = []
html.append("<h3>Simulación Red Team (alto nivel)</h3>")
if isinstance(parsed, dict) and parsed.get("simulation"):
html.append(f"<p><b>Simulación:</b> {parsed['simulation']}</p>")
else:
sim = parsed.get("simulation") if isinstance(parsed, dict) else None
html.append(f"<p><b>Simulación:</b> {json.dumps(sim, ensure_ascii=False)}</p>")
if include_iocs:
html.append("<h4>Indicadores (IoCs) sugeridos</h4>")
iocs = parsed.get("iocs") if isinstance(parsed, dict) else None
if isinstance(iocs, list) and iocs:
html.append("<ul>")
for i in iocs:
html.append(f"<li>{i}</li>")
html.append("</ul>")
else:
html.append(f"<p>{json.dumps(iocs, ensure_ascii=False)}</p>")
if include_mitigation:
html.append("<h4>Contramedidas y mitigación</h4>")
mit = parsed.get("mitigations") if isinstance(parsed, dict) else None
if isinstance(mit, list) and mit:
html.append("<ul>")
for m in mit:
html.append(f"<li>{m}</li>")
html.append("</ul>")
else:
html.append(f"<p>{json.dumps(mit, ensure_ascii=False)}</p>")
html.append("<p style='font-size:0.9em;color:#bbb'>Nota: esta simulación es de alto nivel y educativa. No proporciona instrucciones de ataque. Use para mejorar defensas y detección.</p>")
return "\n".join(html), json.dumps(parsed, ensure_ascii=False, indent=2)
except Exception as e:
return f"<p style='color:crimson'><b>Error inesperado:</b> {e}</p>", ""
def generate_report(json_str: str, title: str = "Reporte Red Team") -> Tuple[str, str]:
try:
if not json_str:
return "", ""
try:
parsed = json.loads(json_str) if isinstance(json_str, str) else json_str
except Exception:
parsed = {"raw": str(json_str)}
timestamp = time.strftime("%Y%m%d_%H%M%S")
filename = f"/mnt/data/redteam_report_{timestamp}.txt"
with open(filename, 'w', encoding='utf-8') as f:
f.write(f"{title}\nGenerated: {time.ctime()}\n\n")
f.write("SIMULATION:\n")
sim = parsed.get("simulation") if isinstance(parsed, dict) else None
f.write((sim or "(no simulation)") + "\n\n")
f.write("IOCS:\n")
for i in (parsed.get("iocs") if isinstance(parsed, dict) and parsed.get("iocs") else []):
f.write(f"- {i}\n")
f.write("\nMITIGATIONS:\n")
for m in (parsed.get("mitigations") if isinstance(parsed, dict) and parsed.get("mitigations") else []):
f.write(f"- {m}\n")
f.write("\nRAW:\n")
f.write(json.dumps(parsed, ensure_ascii=False, indent=2))
return filename, filename
except Exception as e:
return "", ""
# --------- UI ----------
with gr.Blocks(analytics_enabled=False) as demo:
gr.Markdown("## 🧯 Simulación Red Team (alto nivel) — Defender con IA")
with gr.Row():
with gr.Column(scale=7):
inp = gr.Textbox(label="Pega aquí el correo RAW, URL o fragmento a analizar", lines=20, placeholder="Pega cabeceras, cuerpo o URL completa")
cb_iocs = gr.Checkbox(label="Incluir IoCs (indicadores) en la salida", value=True)
cb_mit = gr.Checkbox(label="Incluir mitigaciones", value=True)
btn = gr.Button("Simular ataque (alto nivel)")
download_btn = gr.Button("Generar reporte (.txt)")
with gr.Column(scale=5):
out_html = gr.HTML("Resultado aparecerá aquí")
last_json = gr.Textbox(visible=False)
file_out = gr.File(label="Descargar reporte (.txt)", visible=False)
btn.click(generate_simulation, inputs=[inp, cb_iocs, cb_mit], outputs=[out_html, last_json])
download_btn.click(generate_report, inputs=[last_json, gr.Textbox(value="Reporte Red Team", visible=False)], outputs=[file_out, file_out])
if __name__ == '__main__':
demo.launch(server_name='0.0.0.0', server_port=7860)