doctorlinux commited on
Commit
ac107b3
·
verified ·
1 Parent(s): 7095f1e

Upload app.py

Browse files
Files changed (1) hide show
  1. app.py +87 -96
app.py CHANGED
@@ -1,7 +1,5 @@
1
- # redteam_simulator_with_download.py
2
  # Simulador Red Team (alto nivel, defensivo) - Gradio app + generación de reporte TXT
3
- # Requisitos: gradio, requests
4
- # Instrucciones: sube este archivo a tu Space y configura OPENAI_API_KEY en Settings -> Variables and secrets
5
  import os
6
  import json
7
  import re
@@ -10,87 +8,86 @@ import requests
10
  import gradio as gr
11
  from typing import Tuple
12
 
13
- # ------------------ Config ------------------
14
  OPENAI_API_URL = "https://api.openai.com/v1/responses"
15
  OPENAI_MODEL_FALLBACK = ["gpt-4o-mini", "gpt-4o", "gpt-5-mini"]
16
 
17
- PROMPT_TEMPLATE = \"\"\"Eres un asistente de ciberseguridad (red-team simulator) que debe simular a alto nivel cómo un atacante podría aprovechar el contenido que se le entrega (email, URL o fragmento). NORMAS (obligatorias):
18
  1. Proporciona sólo un resumen del objetivo del atacante y del vector. NO entregues instrucciones técnicas, comandos, código de exploit, payloads, o pasos paso-a-paso para cometer un ataque.
19
  2. Devuelve 3 secciones en JSON:
20
- - \"simulation\": breve párrafo (1-3 frases) explicando la estrategia del atacante (alto nivel).
21
- - \"iocs\": lista de indicadores accionables para detección (dominios, patrones de URL, encabezados sospechosos, extensiones).
22
- - \"mitigations\": lista de contramedidas operativas (bloqueos, políticas, educación, verificación técnica).
23
  3. Si el material es insuficiente, indica qué faltaría.
24
  4. Limita la respuesta a lenguaje defensivo y educacional. NO ofrezcas código ni tácticas para explotar vulnerabilidades.
25
- 5. Devuelve SOLO JSON válido (objetivo: {\"simulation\":..., \"iocs\":[...], \"mitigations\":[...]})
26
 
27
  Contenido a analizar:
28
  {input}
29
- \"\"\"
30
 
31
  FORBIDDEN_PATTERNS = [
32
- r\"\\bexploit\\b\", r\"\\bpayload\\b\", r\"\\bmeterpreter\\b\", r\"\\bmsfconsole\\b\",
33
- r\"curl\\b\", r\"wget\\b\", r\"sudo\\b\", r\"rm\\s+-rf\\b\", r\"reverse shell\\b\",
34
- r\"exec\\b\", r\"bash -i\\b\", r\"nc\\b\", r\"ncat\\b\", r\"chmod\\b\", r\"chown\\b\",
35
- r\"\\bsqlmap\\b\", r\"\\\\x\", r\"0x[0-9a-fA-F]{2,}\", r\"base64 -d\", r\"\\\\b\\\\$\\\\(\", r\"\\\\$\\\\{\"
36
  ]
37
- FORBIDDEN_REGEX = re.compile(\"|\".join(FORBIDDEN_PATTERNS), re.I)
38
 
39
  def call_openai_responses(prompt: str, api_key: str, models=None, timeout: int = 20) -> Tuple[bool, str]:
40
  if models is None:
41
  models = OPENAI_MODEL_FALLBACK
42
- headers = {\"Authorization\": f\"Bearer {api_key}\", \"Content-Type\": \"application/json\"}
43
  for model in models:
44
- payload = {\"model\": model, \"input\": prompt}
45
  try:
46
  r = requests.post(OPENAI_API_URL, headers=headers, json=payload, timeout=timeout)
47
  except Exception as e:
48
- return False, f\"Error de conexión al llamar a la API: {e}\"
49
  if r.status_code == 200:
50
  try:
51
  j = r.json()
52
- out = \"\"
53
- if \"output\" in j:
54
- if isinstance(j[\"output\"], list):
55
  parts = []
56
- for item in j[\"output\"]:
57
  if isinstance(item, dict):
58
- c = item.get(\"content\") or item.get(\"text\") or item.get(\"output_text\")
59
  if isinstance(c, str):
60
  parts.append(c)
61
  elif isinstance(c, list):
62
  for el in c:
63
  if isinstance(el, dict):
64
- txt = el.get(\"text\") or el.get(\"output_text\") or el.get(\"content\")
65
  if txt:
66
  parts.append(str(txt))
67
  else:
68
  parts.append(str(el))
69
- out = \"\\n\".join(parts).strip()
70
- elif isinstance(j[\"output\"], str):
71
- out = j[\"output\"].strip()
72
- if not out and \"choices\" in j and isinstance(j.get(\"choices\"), list) and j[\"choices\"]:
73
- ch = j[\"choices\"][0]
74
- out = ch.get(\"text\") or ch.get(\"message\", {}).get(\"content\", {}).get(\"text\") or \"\"
75
  if not out:
76
  out = json.dumps(j, ensure_ascii=False)[:4000]
77
  return True, out
78
  except Exception as e:
79
- return False, f\"Error parseando respuesta de la API: {e}\"
80
  else:
81
  try:
82
  ej = r.json()
83
- msg = ej.get(\"error\", {}).get(\"message\") or ej.get(\"message\") or r.text
84
  except Exception:
85
  msg = r.text
86
  if r.status_code == 401:
87
- return False, \"AuthenticationError (401): OPENAI_API_KEY inválida o revocada.\"
88
  if r.status_code == 429:
89
- return False, \"RateLimitError (429): límite superado en OpenAI.\"
90
- if isinstance(msg, str) and \"model\" in msg.lower():
91
  continue
92
- return False, f\"HTTP {r.status_code}: {msg}\"
93
- return False, \"Ningún modelo disponible o permitido en la cuenta de OpenAI.\"
94
 
95
  def contains_forbidden(text: str) -> bool:
96
  if not text:
@@ -107,106 +104,100 @@ def safe_parse_json_from_model(text: str):
107
  try:
108
  return json.loads(text[s:e+1])
109
  except Exception:
110
- return {\"raw\": text}
111
- return {\"raw\": text}
112
 
113
  def generate_simulation(user_input: str, include_iocs: bool, include_mitigation: bool):
114
- api_key = os.environ.get(\"OPENAI_API_KEY\")
115
  if not api_key:
116
- return \"<p style='color:crimson'><b>Error:</b> OPENAI_API_KEY no configurada en Settings → Variables and secrets.</p>\", \"\"
117
 
118
  prompt = PROMPT_TEMPLATE.format(input=user_input)
119
  ok, out = call_openai_responses(prompt, api_key)
120
  if not ok:
121
- return f\"<p style='color:crimson'><b>Error IA:</b> {out}</p>\", \"\"
122
 
123
  if contains_forbidden(out):
124
- safe_msg = (\"La respuesta original fue bloqueada por contener contenido sensible que podría ser instructivo para ataques. "
125
- "He realizado un bloqueo por seguridad. Intenta proporcionar más contexto defensivo o limpia el contenido y vuelve a intentarlo.\")
126
- return f\"<p style='color:crimson'><b>Contenido bloqueado por seguridad:</b></p><p>{safe_msg}</p>\", \"\"
127
 
128
  parsed = safe_parse_json_from_model(out)
129
 
130
  html = []
131
- html.append(\"<h3>Simulación Red Team (alto nivel)</h3>\")
132
- if isinstance(parsed, dict) and parsed.get(\"simulation\"):
133
- html.append(f\"<p><b>Simulación:</b> {parsed['simulation']}</p>\")
134
  else:
135
- sim = parsed.get(\"simulation\") if isinstance(parsed, dict) else None
136
- html.append(f\"<p><b>Simulación:</b> {json.dumps(sim, ensure_ascii=False)}</p>\")
137
 
138
  if include_iocs:
139
- html.append(\"<h4>Indicadores (IoCs) sugeridos</h4>\")
140
- iocs = parsed.get(\"iocs\") if isinstance(parsed, dict) else None
141
  if isinstance(iocs, list) and iocs:
142
- html.append(\"<ul>\")
143
  for i in iocs:
144
- html.append(f\"<li>{i}</li>\")
145
- html.append(\"</ul>\")
146
  else:
147
- html.append(f\"<p>{json.dumps(iocs, ensure_ascii=False)}</p>\")
148
 
149
  if include_mitigation:
150
- html.append(\"<h4>Contramedidas y mitigación</h4>\")
151
- mit = parsed.get(\"mitigations\") if isinstance(parsed, dict) else None
152
  if isinstance(mit, list) and mit:
153
- html.append(\"<ul>\")
154
  for m in mit:
155
- html.append(f\"<li>{m}</li>\")
156
- html.append(\"</ul>\")
157
  else:
158
- html.append(f\"<p>{json.dumps(mit, ensure_ascii=False)}</p>\")
159
 
160
- html.append(\"<p style='font-size:0.9em;color:#bbb'>Nota: esta simulación es de alto nivel y educativa. No proporciona instrucciones de ataque. Use para mejorar defensas y detección.</p>\")
161
- # devolvemos tambien el JSON parseado como string para uso en reporte
162
- return \"\\n\".join(html), json.dumps(parsed, ensure_ascii=False, indent=2)
163
 
164
- def generate_report(json_str: str, title: str = \"Reporte Red Team\") -> Tuple[str, str]:
165
- \"\"\"Crea un archivo TXT con la simulación y mitigaciones y devuelve la ruta lista para descargar.\"\"\"
166
  if not json_str:
167
- return \"\", \"\"
168
  try:
169
  parsed = json.loads(json_str) if isinstance(json_str, str) else json_str
170
  except Exception:
171
- parsed = {\"raw\": str(json_str)}
172
 
173
- timestamp = time.strftime(\"%Y%m%d_%H%M%S\")
174
- filename = f\"/mnt/data/redteam_report_{timestamp}.txt\"
175
  with open(filename, 'w', encoding='utf-8') as f:
176
- f.write(f\"{title}\\nGenerated: {time.ctime()}\\n\\n\")
177
- f.write(\"SIMULATION:\\n\")
178
- sim = parsed.get(\"simulation\") if isinstance(parsed, dict) else None
179
- f.write((sim or \"(no simulation)\") + \"\\n\\n\")
180
- f.write(\"IOCS:\\n\")
181
- for i in (parsed.get(\"iocs\") if isinstance(parsed, dict) and parsed.get(\"iocs\") else []):
182
- f.write(f\"- {i}\\n\")
183
- f.write(\"\\nMITIGATIONS:\\n\")
184
- for m in (parsed.get(\"mitigations\") if isinstance(parsed, dict) and parsed.get(\"mitigations\") else []):
185
- f.write(f\"- {m}\\n\")
186
- f.write(\"\\nRAW:\\n\")
187
  f.write(json.dumps(parsed, ensure_ascii=False, indent=2))
188
- return filename, filename # return as two values (path, path) for compatibility
189
 
190
- # ------------------ UI ------------------
191
  with gr.Blocks(analytics_enabled=False) as demo:
192
- gr.Markdown(\"## 🧯 Simulador Red Team (alto nivel) — Defender con IA\")
193
  with gr.Row():
194
  with gr.Column(scale=7):
195
- inp = gr.Textbox(label=\"Pega aquí el correo RAW, URL o fragmento a analizar\", lines=20, placeholder=\"Pega cabeceras, cuerpo o URL completa\")
196
- cb_iocs = gr.Checkbox(label=\"Incluir IoCs (indicadores) en la salida\", value=True)
197
- cb_mit = gr.Checkbox(label=\"Incluir mitigaciones\", value=True)
198
- btn = gr.Button(\"Simular ataque (alto nivel)\")
199
- download_btn = gr.Button(\"Generar reporte (.txt)\")
200
  with gr.Column(scale=5):
201
- out_html = gr.HTML(\"<i>Resultado aparecerá aquí</i>\")
202
- # componente invisible para guardar el JSON parseado
203
  last_json = gr.Textbox(visible=False)
204
- file_out = gr.File(label=\"Descargar reporte (.txt)\", visible=False)
205
 
206
- # Al hacer click en Simular -> actualiza out_html y last_json (json string)
207
  btn.click(generate_simulation, inputs=[inp, cb_iocs, cb_mit], outputs=[out_html, last_json])
208
- # Al hacer click en Generar reporte -> crea archivo y lo muestra en file_out
209
- download_btn.click(generate_report, inputs=[last_json, gr.Textbox(value=\"Reporte Red Team\", visible=False)], outputs=[file_out, file_out])
210
 
211
  if __name__ == '__main__':
212
  demo.launch(server_name='0.0.0.0', server_port=7860)
 
1
+ # app.py
2
  # Simulador Red Team (alto nivel, defensivo) - Gradio app + generación de reporte TXT
 
 
3
  import os
4
  import json
5
  import re
 
8
  import gradio as gr
9
  from typing import Tuple
10
 
 
11
  OPENAI_API_URL = "https://api.openai.com/v1/responses"
12
  OPENAI_MODEL_FALLBACK = ["gpt-4o-mini", "gpt-4o", "gpt-5-mini"]
13
 
14
+ PROMPT_TEMPLATE = """Eres un asistente de ciberseguridad (red-team simulator) que debe simular a alto nivel cómo un atacante podría aprovechar el contenido que se le entrega (email, URL o fragmento). NORMAS (obligatorias):
15
  1. Proporciona sólo un resumen del objetivo del atacante y del vector. NO entregues instrucciones técnicas, comandos, código de exploit, payloads, o pasos paso-a-paso para cometer un ataque.
16
  2. Devuelve 3 secciones en JSON:
17
+ - "simulation": breve párrafo (1-3 frases) explicando la estrategia del atacante (alto nivel).
18
+ - "iocs": lista de indicadores accionables para detección (dominios, patrones de URL, encabezados sospechosos, extensiones).
19
+ - "mitigations": lista de contramedidas operativas (bloqueos, políticas, educación, verificación técnica).
20
  3. Si el material es insuficiente, indica qué faltaría.
21
  4. Limita la respuesta a lenguaje defensivo y educacional. NO ofrezcas código ni tácticas para explotar vulnerabilidades.
22
+ 5. Devuelve SOLO JSON válido (objetivo: {"simulation":..., "iocs":[...], "mitigations":[...]})
23
 
24
  Contenido a analizar:
25
  {input}
26
+ """
27
 
28
  FORBIDDEN_PATTERNS = [
29
+ r"\bexploit\b", r"\bpayload\b", r"\bmeterpreter\b", r"\bmsfconsole\b",
30
+ r"curl\b", r"wget\b", r"sudo\b", r"rm\s+-rf\b", r"reverse shell\b",
31
+ r"exec\b", r"bash -i\b", r"nc\b", r"ncat\b", r"chmod\b", r"chown\b",
32
+ r"\bsqlmap\b", r"\\x", r"0x[0-9a-fA-F]{2,}", r"base64 -d", r"\\b\\$\\(", r"\\$\\{"
33
  ]
34
+ FORBIDDEN_REGEX = re.compile("|".join(FORBIDDEN_PATTERNS), re.I)
35
 
36
  def call_openai_responses(prompt: str, api_key: str, models=None, timeout: int = 20) -> Tuple[bool, str]:
37
  if models is None:
38
  models = OPENAI_MODEL_FALLBACK
39
+ headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
40
  for model in models:
41
+ payload = {"model": model, "input": prompt}
42
  try:
43
  r = requests.post(OPENAI_API_URL, headers=headers, json=payload, timeout=timeout)
44
  except Exception as e:
45
+ return False, f"Error de conexión al llamar a la API: {e}"
46
  if r.status_code == 200:
47
  try:
48
  j = r.json()
49
+ out = ""
50
+ if "output" in j:
51
+ if isinstance(j["output"], list):
52
  parts = []
53
+ for item in j["output"]:
54
  if isinstance(item, dict):
55
+ c = item.get("content") or item.get("text") or item.get("output_text")
56
  if isinstance(c, str):
57
  parts.append(c)
58
  elif isinstance(c, list):
59
  for el in c:
60
  if isinstance(el, dict):
61
+ txt = el.get("text") or el.get("output_text") or el.get("content")
62
  if txt:
63
  parts.append(str(txt))
64
  else:
65
  parts.append(str(el))
66
+ out = "\n".join(parts).strip()
67
+ elif isinstance(j["output"], str):
68
+ out = j["output"].strip()
69
+ if not out and "choices" in j and isinstance(j.get("choices"), list) and j["choices"]:
70
+ ch = j["choices"][0]
71
+ out = ch.get("text") or ch.get("message", {}).get("content", {}).get("text") or ""
72
  if not out:
73
  out = json.dumps(j, ensure_ascii=False)[:4000]
74
  return True, out
75
  except Exception as e:
76
+ return False, f"Error parseando respuesta de la API: {e}"
77
  else:
78
  try:
79
  ej = r.json()
80
+ msg = ej.get("error", {}).get("message") or ej.get("message") or r.text
81
  except Exception:
82
  msg = r.text
83
  if r.status_code == 401:
84
+ return False, "AuthenticationError (401): OPENAI_API_KEY inválida o revocada."
85
  if r.status_code == 429:
86
+ return False, "RateLimitError (429): límite superado en OpenAI."
87
+ if isinstance(msg, str) and "model" in msg.lower():
88
  continue
89
+ return False, f"HTTP {r.status_code}: {msg}"
90
+ return False, "Ningún modelo disponible o permitido en la cuenta de OpenAI."
91
 
92
  def contains_forbidden(text: str) -> bool:
93
  if not text:
 
104
  try:
105
  return json.loads(text[s:e+1])
106
  except Exception:
107
+ return {"raw": text}
108
+ return {"raw": text}
109
 
110
  def generate_simulation(user_input: str, include_iocs: bool, include_mitigation: bool):
111
+ api_key = os.environ.get("OPENAI_API_KEY")
112
  if not api_key:
113
+ return "<p style='color:crimson'><b>Error:</b> OPENAI_API_KEY no configurada en Settings → Variables and secrets.</p>", ""
114
 
115
  prompt = PROMPT_TEMPLATE.format(input=user_input)
116
  ok, out = call_openai_responses(prompt, api_key)
117
  if not ok:
118
+ return f"<p style='color:crimson'><b>Error IA:</b> {out}</p>", ""
119
 
120
  if contains_forbidden(out):
121
+ safe_msg = ("La respuesta original fue bloqueada por contener contenido sensible que podría ser instructivo para ataques. "
122
+ "He realizado un bloqueo por seguridad. Intenta proporcionar más contexto defensivo o limpia el contenido y vuelve a intentarlo.")
123
+ return f"<p style='color:crimson'><b>Contenido bloqueado por seguridad:</b></p><p>{safe_msg}</p>", ""
124
 
125
  parsed = safe_parse_json_from_model(out)
126
 
127
  html = []
128
+ html.append("<h3>Simulación Red Team (alto nivel)</h3>")
129
+ if isinstance(parsed, dict) and parsed.get("simulation"):
130
+ html.append(f"<p><b>Simulación:</b> {parsed['simulation']}</p>")
131
  else:
132
+ sim = parsed.get("simulation") if isinstance(parsed, dict) else None
133
+ html.append(f"<p><b>Simulación:</b> {json.dumps(sim, ensure_ascii=False)}</p>")
134
 
135
  if include_iocs:
136
+ html.append("<h4>Indicadores (IoCs) sugeridos</h4>")
137
+ iocs = parsed.get("iocs") if isinstance(parsed, dict) else None
138
  if isinstance(iocs, list) and iocs:
139
+ html.append("<ul>")
140
  for i in iocs:
141
+ html.append(f"<li>{i}</li>")
142
+ html.append("</ul>")
143
  else:
144
+ html.append(f"<p>{json.dumps(iocs, ensure_ascii=False)}</p>")
145
 
146
  if include_mitigation:
147
+ html.append("<h4>Contramedidas y mitigación</h4>")
148
+ mit = parsed.get("mitigations") if isinstance(parsed, dict) else None
149
  if isinstance(mit, list) and mit:
150
+ html.append("<ul>")
151
  for m in mit:
152
+ html.append(f"<li>{m}</li>")
153
+ html.append("</ul>")
154
  else:
155
+ html.append(f"<p>{json.dumps(mit, ensure_ascii=False)}</p>")
156
 
157
+ html.append("<p style='font-size:0.9em;color:#bbb'>Nota: esta simulación es de alto nivel y educativa. No proporciona instrucciones de ataque. Use para mejorar defensas y detección.</p>")
158
+ return "\n".join(html), json.dumps(parsed, ensure_ascii=False, indent=2)
 
159
 
160
+ def generate_report(json_str: str, title: str = "Reporte Red Team") -> Tuple[str, str]:
 
161
  if not json_str:
162
+ return "", ""
163
  try:
164
  parsed = json.loads(json_str) if isinstance(json_str, str) else json_str
165
  except Exception:
166
+ parsed = {"raw": str(json_str)}
167
 
168
+ timestamp = time.strftime("%Y%m%d_%H%M%S")
169
+ filename = f"/mnt/data/redteam_report_{timestamp}.txt"
170
  with open(filename, 'w', encoding='utf-8') as f:
171
+ f.write(f"{title}\nGenerated: {time.ctime()}\n\n")
172
+ f.write("SIMULATION:\n")
173
+ sim = parsed.get("simulation") if isinstance(parsed, dict) else None
174
+ f.write((sim or "(no simulation)") + "\n\n")
175
+ f.write("IOCS:\n")
176
+ for i in (parsed.get("iocs") if isinstance(parsed, dict) and parsed.get("iocs") else []):
177
+ f.write(f"- {i}\n")
178
+ f.write("\nMITIGATIONS:\n")
179
+ for m in (parsed.get("mitigations") if isinstance(parsed, dict) and parsed.get("mitigations") else []):
180
+ f.write(f"- {m}\n")
181
+ f.write("\nRAW:\n")
182
  f.write(json.dumps(parsed, ensure_ascii=False, indent=2))
183
+ return filename, filename
184
 
 
185
  with gr.Blocks(analytics_enabled=False) as demo:
186
+ gr.Markdown("## 🧯 Simulador Red Team (alto nivel) — Defender con IA")
187
  with gr.Row():
188
  with gr.Column(scale=7):
189
+ inp = gr.Textbox(label="Pega aquí el correo RAW, URL o fragmento a analizar", lines=20, placeholder="Pega cabeceras, cuerpo o URL completa")
190
+ cb_iocs = gr.Checkbox(label="Incluir IoCs (indicadores) en la salida", value=True)
191
+ cb_mit = gr.Checkbox(label="Incluir mitigaciones", value=True)
192
+ btn = gr.Button("Simular ataque (alto nivel)")
193
+ download_btn = gr.Button("Generar reporte (.txt)")
194
  with gr.Column(scale=5):
195
+ out_html = gr.HTML("<i>Resultado aparecerá aquí</i>")
 
196
  last_json = gr.Textbox(visible=False)
197
+ file_out = gr.File(label="Descargar reporte (.txt)", visible=False)
198
 
 
199
  btn.click(generate_simulation, inputs=[inp, cb_iocs, cb_mit], outputs=[out_html, last_json])
200
+ download_btn.click(generate_report, inputs=[last_json, gr.Textbox(value="Reporte Red Team", visible=False)], outputs=[file_out, file_out])
 
201
 
202
  if __name__ == '__main__':
203
  demo.launch(server_name='0.0.0.0', server_port=7860)