Spaces:
Sleeping
Sleeping
| # redteam_simulator.py | |
| # Simulador Red Team (alto nivel, defensivo) - Gradio app | |
| # Requisitos: gradio, requests | |
| # Instrucciones: sube este archivo a tu Space y configura OPENAI_API_KEY en Settings -> Variables and secrets | |
| import os | |
| import json | |
| import re | |
| import requests | |
| import gradio as gr | |
| from typing import Tuple | |
| # ------------------ Config ------------------ | |
| OPENAI_API_URL = "https://api.openai.com/v1/responses" | |
| OPENAI_MODEL_FALLBACK = ["gpt-4o-mini", "gpt-4o", "gpt-5-mini"] | |
| # Prompt template (defensivo, no instrucciones de explotación) | |
| PROMPT_TEMPLATE = """Eres un asistente de ciberseguridad (red-team simulator) que debe simular a alto nivel cómo un atacante podría aprovechar el contenido que se le entrega (email, URL o fragmento). NORMAS (obligatorias): | |
| 1. Proporciona sólo un resumen del objetivo del atacante y del vector. NO entregues instrucciones técnicas, comandos, código de exploit, payloads, o pasos paso-a-paso para cometer un ataque. | |
| 2. Devuelve 3 secciones en JSON: | |
| - \"simulation\": breve párrafo (1-3 frases) explicando la estrategia del atacante (alto nivel). | |
| - \"iocs\": lista de indicadores accionables para detección (dominios, patrones de URL, encabezados sospechosos, extensiones). | |
| - \"mitigations\": lista de contramedidas operativas (bloqueos, políticas, educación, verificación técnica). | |
| 3. Si el material es insuficiente, indica qué faltaría. | |
| 4. Limita la respuesta a lenguaje defensivo y educacional. NO ofrezcas código ni tácticas para explotar vulnerabilidades. | |
| 5. Devuelve SOLO JSON válido (objetivo: {\"simulation\":..., \"iocs\":[...], \"mitigations\":[...]}) | |
| Contenido a analizar: | |
| {input} | |
| """ | |
| # Palabras/prototipos prohibidos en la salida (si aparecen, se bloqueará la respuesta por seguridad) | |
| FORBIDDEN_PATTERNS = [ | |
| r"\bexploit\b", r"\bpayload\b", r"\bmeterpreter\b", r"\bmsfconsole\b", | |
| r"curl\b", r"wget\b", r"sudo\b", r"rm\s+-rf\b", r"reverse shell\b", | |
| r"exec\b", r"bash -i\b", r"nc\b", r"ncat\b", r"chmod\b", r"chown\b", | |
| r"\bsqlmap\b", r"\\x", r"0x[0-9a-fA-F]{2,}", r"base64 -d", r"\\b\\$\\(", r"\\$\\{" | |
| ] | |
| FORBIDDEN_REGEX = re.compile("|".join(FORBIDDEN_PATTERNS), re.I) | |
| # ------------------ Helpers ------------------ | |
| def call_openai_responses(prompt: str, api_key: str, models=None, timeout: int = 20) -> Tuple[bool, str]: | |
| if models is None: | |
| models = OPENAI_MODEL_FALLBACK | |
| headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"} | |
| for model in models: | |
| payload = {"model": model, "input": prompt} | |
| try: | |
| r = requests.post(OPENAI_API_URL, headers=headers, json=payload, timeout=timeout) | |
| except Exception as e: | |
| return False, f"Error de conexión al llamar a la API: {e}" | |
| if r.status_code == 200: | |
| try: | |
| j = r.json() | |
| # Extraer texto de la Responses API | |
| out = "" | |
| if "output" in j: | |
| if isinstance(j["output"], list): | |
| parts = [] | |
| for item in j["output"]: | |
| if isinstance(item, dict): | |
| c = item.get("content") or item.get("text") or item.get("output_text") | |
| if isinstance(c, str): | |
| parts.append(c) | |
| elif isinstance(c, list): | |
| for el in c: | |
| if isinstance(el, dict): | |
| txt = el.get("text") or el.get("output_text") or el.get("content") | |
| if txt: | |
| parts.append(str(txt)) | |
| else: | |
| parts.append(str(el)) | |
| out = "\\n".join(parts).strip() | |
| elif isinstance(j["output"], str): | |
| out = j["output"].strip() | |
| # Fallback a choices | |
| if not out and "choices" in j and isinstance(j.get("choices"), list) and j["choices"]: | |
| ch = j["choices"][0] | |
| out = ch.get("text") or ch.get("message", {}).get("content", {}).get("text") or "" | |
| if not out: | |
| out = json.dumps(j, ensure_ascii=False)[:4000] | |
| return True, out | |
| except Exception as e: | |
| return False, f"Error parseando respuesta de la API: {e}" | |
| else: | |
| # error http: intentar siguiente modelo o devolver mensaje claro | |
| try: | |
| ej = r.json() | |
| msg = ej.get("error", {}).get("message") or ej.get("message") or r.text | |
| except Exception: | |
| msg = r.text | |
| if r.status_code == 401: | |
| return False, "AuthenticationError (401): OPENAI_API_KEY inválida o revocada." | |
| if r.status_code == 429: | |
| return False, "RateLimitError (429): límite superado en OpenAI." | |
| # si el mensaje menciona el modelo, intentar siguiente | |
| if isinstance(msg, str) and "model" in msg.lower(): | |
| continue | |
| return False, f"HTTP {r.status_code}: {msg}" | |
| return False, "Ningún modelo disponible o permitido en la cuenta de OpenAI." | |
| def contains_forbidden(text: str) -> bool: | |
| if not text: | |
| return False | |
| return bool(FORBIDDEN_REGEX.search(text)) | |
| def safe_parse_json_from_model(text: str): | |
| # Intenta parsear JSON; si falla, extrae el primer bloque JSON encontrado | |
| try: | |
| return json.loads(text) | |
| except Exception: | |
| s = text.find('{') | |
| e = text.rfind('}') | |
| if s != -1 and e != -1 and e > s: | |
| try: | |
| return json.loads(text[s:e+1]) | |
| except Exception: | |
| return {"raw": text} | |
| return {"raw": text} | |
| # ------------------ Generador ------------------ | |
| def generate_simulation(user_input: str, include_iocs: bool, include_mitigation: bool): | |
| api_key = os.environ.get("OPENAI_API_KEY") | |
| if not api_key: | |
| return "<p style='color:crimson'><b>Error:</b> OPENAI_API_KEY no configurada en Settings → Variables and secrets.</p>" | |
| # construir prompt | |
| prompt = PROMPT_TEMPLATE.format(input=user_input) | |
| ok, out = call_openai_responses(prompt, api_key) | |
| if not ok: | |
| return f"<p style='color:crimson'><b>Error IA:</b> {out}</p>" | |
| # filtro de seguridad: si la respuesta contiene patrones peligrosos, no se muestra | |
| if contains_forbidden(out): | |
| # devolver aviso y una versión segura: pedir al modelo reescribir de forma defensiva | |
| safe_msg = ("La respuesta original fue bloqueada por contener contenido sensible que podría ser instructivo para ataques. " | |
| "He realizado un bloqueo por seguridad. Intenta proporcionar más contexto defensivo o limpia el contenido y vuelve a intentarlo.") | |
| return f"<p style='color:crimson'><b>Contenido bloqueado por seguridad:</b></p><p>{safe_msg}</p>" | |
| parsed = safe_parse_json_from_model(out) | |
| # Construir HTML de salida | |
| html = [] | |
| html.append("<h3>Simulación Red Team (alto nivel)</h3>") | |
| if isinstance(parsed, dict) and parsed.get("simulation"): | |
| html.append(f"<p><b>Simulación:</b> {parsed['simulation']}</p>") | |
| else: | |
| sim = parsed.get("simulation") if isinstance(parsed, dict) else None | |
| html.append(f"<p><b>Simulación:</b> {json.dumps(sim, ensure_ascii=False)}</p>") | |
| if include_iocs: | |
| html.append("<h4>Indicadores (IoCs) sugeridos</h4>") | |
| iocs = parsed.get("iocs") if isinstance(parsed, dict) else None | |
| if isinstance(iocs, list) and iocs: | |
| html.append("<ul>") | |
| for i in iocs: | |
| html.append(f"<li>{i}</li>") | |
| html.append("</ul>") | |
| else: | |
| html.append(f"<p>{json.dumps(iocs, ensure_ascii=False)}</p>") | |
| if include_mitigation: | |
| html.append("<h4>Contramedidas y mitigación</h4>") | |
| mit = parsed.get("mitigations") if isinstance(parsed, dict) else None | |
| if isinstance(mit, list) and mit: | |
| html.append("<ul>") | |
| for m in mit: | |
| html.append(f"<li>{m}</li>") | |
| html.append("</ul>") | |
| else: | |
| html.append(f"<p>{json.dumps(mit, ensure_ascii=False)}</p>") | |
| html.append("<p style='font-size:0.9em;color:#bbb'>Nota: esta simulación es de alto nivel y educativa. No proporciona instrucciones de ataque. Use para mejorar defensas y detección.</p>") | |
| return "\\n".join(html) | |
| # ------------------ UI ------------------ | |
| with gr.Blocks(analytics_enabled=False) as demo: | |
| gr.Markdown(\"## 🧯 Simulador Red Team (alto nivel) — Defender con IA\") | |
| with gr.Row(): | |
| with gr.Column(scale=7): | |
| inp = gr.Textbox(label=\"Pega aquí el correo RAW, URL o fragmento a analizar\", lines=20, placeholder=\"Pega cabeceras, cuerpo o URL completa\") | |
| cb_iocs = gr.Checkbox(label=\"Incluir IoCs (indicadores) en la salida\", value=True) | |
| cb_mit = gr.Checkbox(label=\"Incluir mitigaciones\", value=True) | |
| btn = gr.Button(\"Simular ataque (alto nivel)\") | |
| with gr.Column(scale=5): | |
| out_html = gr.HTML(\"<i>Resultado aparecerá aquí</i>\") | |
| btn.click(generate_simulation, inputs=[inp, cb_iocs, cb_mit], outputs=[out_html]) | |
| if __name__ == '__main__': | |
| demo.launch(server_name='0.0.0.0', server_port=7860) | |