# redteam_simulator_with_download.py # Simulador Red Team (alto nivel, defensivo) - Gradio app + generación de reporte TXT # Requisitos: gradio, requests # Instrucciones: sube este archivo a tu Space y configura OPENAI_API_KEY en Settings -> Variables and secrets import os import json import re import time import requests import gradio as gr from typing import Tuple # ------------------ Config ------------------ OPENAI_API_URL = "https://api.openai.com/v1/responses" OPENAI_MODEL_FALLBACK = ["gpt-4o-mini", "gpt-4o", "gpt-5-mini"] PROMPT_TEMPLATE = \"\"\"Eres un asistente de ciberseguridad (red-team simulator) que debe simular a alto nivel cómo un atacante podría aprovechar el contenido que se le entrega (email, URL o fragmento). NORMAS (obligatorias): 1. Proporciona sólo un resumen del objetivo del atacante y del vector. NO entregues instrucciones técnicas, comandos, código de exploit, payloads, o pasos paso-a-paso para cometer un ataque. 2. Devuelve 3 secciones en JSON: - \"simulation\": breve párrafo (1-3 frases) explicando la estrategia del atacante (alto nivel). - \"iocs\": lista de indicadores accionables para detección (dominios, patrones de URL, encabezados sospechosos, extensiones). - \"mitigations\": lista de contramedidas operativas (bloqueos, políticas, educación, verificación técnica). 3. Si el material es insuficiente, indica qué faltaría. 4. Limita la respuesta a lenguaje defensivo y educacional. NO ofrezcas código ni tácticas para explotar vulnerabilidades. 5. Devuelve SOLO JSON válido (objetivo: {\"simulation\":..., \"iocs\":[...], \"mitigations\":[...]}) Contenido a analizar: {input} \"\"\" FORBIDDEN_PATTERNS = [ r\"\\bexploit\\b\", r\"\\bpayload\\b\", r\"\\bmeterpreter\\b\", r\"\\bmsfconsole\\b\", r\"curl\\b\", r\"wget\\b\", r\"sudo\\b\", r\"rm\\s+-rf\\b\", r\"reverse shell\\b\", r\"exec\\b\", r\"bash -i\\b\", r\"nc\\b\", r\"ncat\\b\", r\"chmod\\b\", r\"chown\\b\", r\"\\bsqlmap\\b\", r\"\\\\x\", r\"0x[0-9a-fA-F]{2,}\", r\"base64 -d\", r\"\\\\b\\\\$\\\\(\", r\"\\\\$\\\\{\" ] FORBIDDEN_REGEX = re.compile(\"|\".join(FORBIDDEN_PATTERNS), re.I) def call_openai_responses(prompt: str, api_key: str, models=None, timeout: int = 20) -> Tuple[bool, str]: if models is None: models = OPENAI_MODEL_FALLBACK headers = {\"Authorization\": f\"Bearer {api_key}\", \"Content-Type\": \"application/json\"} for model in models: payload = {\"model\": model, \"input\": prompt} try: r = requests.post(OPENAI_API_URL, headers=headers, json=payload, timeout=timeout) except Exception as e: return False, f\"Error de conexión al llamar a la API: {e}\" if r.status_code == 200: try: j = r.json() out = \"\" if \"output\" in j: if isinstance(j[\"output\"], list): parts = [] for item in j[\"output\"]: if isinstance(item, dict): c = item.get(\"content\") or item.get(\"text\") or item.get(\"output_text\") if isinstance(c, str): parts.append(c) elif isinstance(c, list): for el in c: if isinstance(el, dict): txt = el.get(\"text\") or el.get(\"output_text\") or el.get(\"content\") if txt: parts.append(str(txt)) else: parts.append(str(el)) out = \"\\n\".join(parts).strip() elif isinstance(j[\"output\"], str): out = j[\"output\"].strip() if not out and \"choices\" in j and isinstance(j.get(\"choices\"), list) and j[\"choices\"]: ch = j[\"choices\"][0] out = ch.get(\"text\") or ch.get(\"message\", {}).get(\"content\", {}).get(\"text\") or \"\" if not out: out = json.dumps(j, ensure_ascii=False)[:4000] return True, out except Exception as e: return False, f\"Error parseando respuesta de la API: {e}\" else: try: ej = r.json() msg = ej.get(\"error\", {}).get(\"message\") or ej.get(\"message\") or r.text except Exception: msg = r.text if r.status_code == 401: return False, \"AuthenticationError (401): OPENAI_API_KEY inválida o revocada.\" if r.status_code == 429: return False, \"RateLimitError (429): límite superado en OpenAI.\" if isinstance(msg, str) and \"model\" in msg.lower(): continue return False, f\"HTTP {r.status_code}: {msg}\" return False, \"Ningún modelo disponible o permitido en la cuenta de OpenAI.\" def contains_forbidden(text: str) -> bool: if not text: return False return bool(FORBIDDEN_REGEX.search(text)) def safe_parse_json_from_model(text: str): try: return json.loads(text) except Exception: s = text.find('{') e = text.rfind('}') if s != -1 and e != -1 and e > s: try: return json.loads(text[s:e+1]) except Exception: return {\"raw\": text} return {\"raw\": text} def generate_simulation(user_input: str, include_iocs: bool, include_mitigation: bool): api_key = os.environ.get(\"OPENAI_API_KEY\") if not api_key: return \"
Error: OPENAI_API_KEY no configurada en Settings → Variables and secrets.
\", \"\" prompt = PROMPT_TEMPLATE.format(input=user_input) ok, out = call_openai_responses(prompt, api_key) if not ok: return f\"Error IA: {out}
\", \"\" if contains_forbidden(out): safe_msg = (\"La respuesta original fue bloqueada por contener contenido sensible que podría ser instructivo para ataques. " "He realizado un bloqueo por seguridad. Intenta proporcionar más contexto defensivo o limpia el contenido y vuelve a intentarlo.\") return f\"Contenido bloqueado por seguridad:
{safe_msg}
\", \"\" parsed = safe_parse_json_from_model(out) html = [] html.append(\"Simulación: {parsed['simulation']}
\") else: sim = parsed.get(\"simulation\") if isinstance(parsed, dict) else None html.append(f\"Simulación: {json.dumps(sim, ensure_ascii=False)}
\") if include_iocs: html.append(\"{json.dumps(iocs, ensure_ascii=False)}
\") if include_mitigation: html.append(\"{json.dumps(mit, ensure_ascii=False)}
\") html.append(\"Nota: esta simulación es de alto nivel y educativa. No proporciona instrucciones de ataque. Use para mejorar defensas y detección.
\") # devolvemos tambien el JSON parseado como string para uso en reporte return \"\\n\".join(html), json.dumps(parsed, ensure_ascii=False, indent=2) def generate_report(json_str: str, title: str = \"Reporte Red Team\") -> Tuple[str, str]: \"\"\"Crea un archivo TXT con la simulación y mitigaciones y devuelve la ruta lista para descargar.\"\"\" if not json_str: return \"\", \"\" try: parsed = json.loads(json_str) if isinstance(json_str, str) else json_str except Exception: parsed = {\"raw\": str(json_str)} timestamp = time.strftime(\"%Y%m%d_%H%M%S\") filename = f\"/mnt/data/redteam_report_{timestamp}.txt\" with open(filename, 'w', encoding='utf-8') as f: f.write(f\"{title}\\nGenerated: {time.ctime()}\\n\\n\") f.write(\"SIMULATION:\\n\") sim = parsed.get(\"simulation\") if isinstance(parsed, dict) else None f.write((sim or \"(no simulation)\") + \"\\n\\n\") f.write(\"IOCS:\\n\") for i in (parsed.get(\"iocs\") if isinstance(parsed, dict) and parsed.get(\"iocs\") else []): f.write(f\"- {i}\\n\") f.write(\"\\nMITIGATIONS:\\n\") for m in (parsed.get(\"mitigations\") if isinstance(parsed, dict) and parsed.get(\"mitigations\") else []): f.write(f\"- {m}\\n\") f.write(\"\\nRAW:\\n\") f.write(json.dumps(parsed, ensure_ascii=False, indent=2)) return filename, filename # return as two values (path, path) for compatibility # ------------------ UI ------------------ with gr.Blocks(analytics_enabled=False) as demo: gr.Markdown(\"## 🧯 Simulador Red Team (alto nivel) — Defender con IA\") with gr.Row(): with gr.Column(scale=7): inp = gr.Textbox(label=\"Pega aquí el correo RAW, URL o fragmento a analizar\", lines=20, placeholder=\"Pega cabeceras, cuerpo o URL completa\") cb_iocs = gr.Checkbox(label=\"Incluir IoCs (indicadores) en la salida\", value=True) cb_mit = gr.Checkbox(label=\"Incluir mitigaciones\", value=True) btn = gr.Button(\"Simular ataque (alto nivel)\") download_btn = gr.Button(\"Generar reporte (.txt)\") with gr.Column(scale=5): out_html = gr.HTML(\"Resultado aparecerá aquí\") # componente invisible para guardar el JSON parseado last_json = gr.Textbox(visible=False) file_out = gr.File(label=\"Descargar reporte (.txt)\", visible=False) # Al hacer click en Simular -> actualiza out_html y last_json (json string) btn.click(generate_simulation, inputs=[inp, cb_iocs, cb_mit], outputs=[out_html, last_json]) # Al hacer click en Generar reporte -> crea archivo y lo muestra en file_out download_btn.click(generate_report, inputs=[last_json, gr.Textbox(value=\"Reporte Red Team\", visible=False)], outputs=[file_out, file_out]) if __name__ == '__main__': demo.launch(server_name='0.0.0.0', server_port=7860)