# app.py # Simulador Red Team (alto nivel, defensivo) - Gradio app + generación de reporte TXT import os import json import re import time import requests import gradio as gr from typing import Tuple, List, Optional OPENAI_API_URL_RESPONSES = "https://api.openai.com/v1/responses" OPENAI_API_URL_CHAT = "https://api.openai.com/v1/chat/completions" OPENAI_MODEL_FALLBACK = ["gpt-4o-mini", "gpt-4o"] PROMPT_TEMPLATE = """Eres un asistente de ciberseguridad (red-team simulator) que debe simular a alto nivel cómo un atacante podría aprovechar el contenido que se le entrega (email, URL o fragmento). NORMAS (obligatorias): 1. Proporciona sólo un resumen del objetivo del atacante y del vector. NO entregues instrucciones técnicas, comandos, código de exploit, payloads, o pasos paso-a-paso para cometer un ataque. 2. Devuelve 3 secciones en JSON: - "simulation": breve párrafo (1-3 frases) explicando la estrategia del atacante (alto nivel). - "iocs": lista de indicadores accionables para detección (dominios, patrones de URL, encabezados sospechosos, extensiones). - "mitigations": lista de contramedidas operativas (bloqueos, políticas, educación, verificación técnica). 3. Si el material es insuficiente, indica qué faltaría. 4. Limita la respuesta a lenguaje defensivo y educacional. NO ofrezcas código ni tácticas para explotar vulnerabilidades. 5. Devuelve SOLO JSON válido (objetivo: {"simulation":..., "iocs":[...], "mitigations":[...]}) Contenido a analizar: {input} """ # --------- Filtro seguro (ligero y robusto) ---------- FORBIDDEN_KEYWORDS = [ "exploit", "payload", "meterpreter", "msfconsole", "reverse shell", "sqlmap", "chmod", "chown", "exec " ] FORBIDDEN_CMDLIKE = ["curl ", "wget ", "sudo ", "bash -i", "nc ", "ncat ", "rm -rf"] FORBIDDEN_SUBSTR = ["$(", "${", "\\x", "base64 -d"] # doble-escape para dejar \x literal FORBIDDEN_HEX_RE = re.compile(r"0x[0-9a-fA-F]{2,}") def contains_forbidden(text: str) -> bool: if not text: return False t = text.lower() if any(k in t for k in FORBIDDEN_KEYWORDS): return True if any(k in t for k in FORBIDDEN_CMDLIKE): return True if any(s in text for s in FORBIDDEN_SUBSTR): return True if FORBIDDEN_HEX_RE.search(text): return True return False # --------- Utilidades API ---------- ENV_CANDIDATES = ["OPENAI_API_KEY", "OPENAI_API_KEY_ATAQUE", "OPENAI_APIKEY", "OPENAI_KEY", "HF_OPENAI_API_KEY"] def get_api_key() -> Optional[str]: for name in ENV_CANDIDATES: v = os.environ.get(name) if v: return v return None def call_openai_any(prompt: str, api_key: str, models=None, timeout: int = 25) -> Tuple[bool, str]: if models is None: models = OPENAI_MODEL_FALLBACK headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"} last_error = None for model in models: # 1) Intento con /v1/responses try: r = requests.post(OPENAI_API_URL_RESPONSES, headers=headers, json={"model": model, "input": prompt}, timeout=timeout) if r.status_code == 200: j = r.json() out = "" if "output" in j: if isinstance(j["output"], list): parts: List[str] = [] for item in j["output"]: if isinstance(item, dict): c = item.get("content") or item.get("text") or item.get("output_text") if isinstance(c, str): parts.append(c) elif isinstance(c, list): for el in c: if isinstance(el, dict): txt = el.get("text") or el.get("output_text") or el.get("content") if txt: parts.append(str(txt)) else: parts.append(str(el)) out = "\n".join(parts).strip() elif isinstance(j["output"], str): out = j["output"].strip() if not out and "choices" in j: # algunos despliegues devuelven estilo choices ch = j["choices"][0] out = ch.get("text") or ch.get("message", {}).get("content", {}).get("text") or "" if not out: out = json.dumps(j, ensure_ascii=False)[:4000] return True, out else: try: jr = r.json() msg = jr.get("error", {}).get("message") or jr.get("message") or r.text except Exception: msg = r.text last_error = f"Responses API (HTTP {r.status_code}): {msg}" # si es 404/400/422 probamos chat inmediatamente except Exception as e: last_error = f"Responses API error: {e}" # 2) Fallback a /v1/chat/completions try: payload = {"model": model, "messages": [{"role": "user", "content": prompt}], "temperature": 0.2} rc = requests.post(OPENAI_API_URL_CHAT, headers=headers, json=payload, timeout=timeout) if rc.status_code == 200: jc = rc.json() out = "" if "choices" in jc and jc["choices"]: msg = jc["choices"][0].get("message", {}).get("content") if isinstance(msg, str): out = msg.strip() if not out: out = json.dumps(jc, ensure_ascii=False)[:4000] return True, out else: try: jc = rc.json() msg = jc.get("error", {}).get("message") or jc.get("message") or rc.text except Exception: msg = rc.text last_error = f"Chat API (HTTP {rc.status_code}): {msg}" except Exception as e: last_error = f"Chat API error: {e}" return False, (last_error or "No fue posible obtener respuesta del modelo.") def safe_parse_json_from_model(text: str): try: return json.loads(text) except Exception: s = text.find('{') e = text.rfind('}') if s != -1 and e != -1 and e > s: try: return json.loads(text[s:e+1]) except Exception: return {"raw": text} return {"raw": text} # --------- Handlers ---------- def generate_simulation(user_input: str, include_iocs: bool, include_mitigation: bool): try: api_key = get_api_key() if not api_key: return ("
Error: Falta la API key. " f"Añade una de estas variables en Settings → Variables and secrets: {', '.join(ENV_CANDIDATES)}.
", "") prompt = PROMPT_TEMPLATE.format(input=user_input) ok, out = call_openai_any(prompt, api_key) if not ok: return f"Error IA: {out}
", "" if contains_forbidden(out): safe_msg = ("La respuesta original fue bloqueada por contener contenido sensible que podría ser instructivo para ataques. " "He realizado un bloqueo por seguridad. Intenta proporcionar más contexto defensivo o limpia el contenido y vuelve a intentarlo.") return f"Contenido bloqueado por seguridad:
{safe_msg}
", "" parsed = safe_parse_json_from_model(out) html = [] html.append("Simulación: {parsed['simulation']}
") else: sim = parsed.get("simulation") if isinstance(parsed, dict) else None html.append(f"Simulación: {json.dumps(sim, ensure_ascii=False)}
") if include_iocs: html.append("{json.dumps(iocs, ensure_ascii=False)}
") if include_mitigation: html.append("{json.dumps(mit, ensure_ascii=False)}
") html.append("Nota: esta simulación es de alto nivel y educativa. No proporciona instrucciones de ataque. Use para mejorar defensas y detección.
") return "\n".join(html), json.dumps(parsed, ensure_ascii=False, indent=2) except Exception as e: return f"Error inesperado: {e}
", "" def generate_report(json_str: str, title: str = "Reporte Red Team") -> Tuple[str, str]: try: if not json_str: return "", "" try: parsed = json.loads(json_str) if isinstance(json_str, str) else json_str except Exception: parsed = {"raw": str(json_str)} timestamp = time.strftime("%Y%m%d_%H%M%S") filename = f"/mnt/data/redteam_report_{timestamp}.txt" with open(filename, 'w', encoding='utf-8') as f: f.write(f"{title}\nGenerated: {time.ctime()}\n\n") f.write("SIMULATION:\n") sim = parsed.get("simulation") if isinstance(parsed, dict) else None f.write((sim or "(no simulation)") + "\n\n") f.write("IOCS:\n") for i in (parsed.get("iocs") if isinstance(parsed, dict) and parsed.get("iocs") else []): f.write(f"- {i}\n") f.write("\nMITIGATIONS:\n") for m in (parsed.get("mitigations") if isinstance(parsed, dict) and parsed.get("mitigations") else []): f.write(f"- {m}\n") f.write("\nRAW:\n") f.write(json.dumps(parsed, ensure_ascii=False, indent=2)) return filename, filename except Exception as e: return "", "" # --------- UI ---------- with gr.Blocks(analytics_enabled=False) as demo: gr.Markdown("## 🧯 Simulación Red Team (alto nivel) — Defender con IA") with gr.Row(): with gr.Column(scale=7): inp = gr.Textbox(label="Pega aquí el correo RAW, URL o fragmento a analizar", lines=20, placeholder="Pega cabeceras, cuerpo o URL completa") cb_iocs = gr.Checkbox(label="Incluir IoCs (indicadores) en la salida", value=True) cb_mit = gr.Checkbox(label="Incluir mitigaciones", value=True) btn = gr.Button("Simular ataque (alto nivel)") download_btn = gr.Button("Generar reporte (.txt)") with gr.Column(scale=5): out_html = gr.HTML("Resultado aparecerá aquí") last_json = gr.Textbox(visible=False) file_out = gr.File(label="Descargar reporte (.txt)", visible=False) btn.click(generate_simulation, inputs=[inp, cb_iocs, cb_mit], outputs=[out_html, last_json]) download_btn.click(generate_report, inputs=[last_json, gr.Textbox(value="Reporte Red Team", visible=False)], outputs=[file_out, file_out]) if __name__ == '__main__': demo.launch(server_name='0.0.0.0', server_port=7860)