doctorlinux commited on
Commit
69cf4db
·
verified ·
1 Parent(s): 54676f7

Upload app.py

Browse files
Files changed (1) hide show
  1. app.py +133 -106
app.py CHANGED
@@ -6,10 +6,11 @@ import re
6
  import time
7
  import requests
8
  import gradio as gr
9
- from typing import Tuple, List
10
 
11
- OPENAI_API_URL = "https://api.openai.com/v1/responses"
12
- OPENAI_MODEL_FALLBACK = ["gpt-4o-mini", "gpt-4o", "gpt-5-mini"]
 
13
 
14
  PROMPT_TEMPLATE = """Eres un asistente de ciberseguridad (red-team simulator) que debe simular a alto nivel cómo un atacante podría aprovechar el contenido que se le entrega (email, URL o fragmento). NORMAS (obligatorias):
15
  1. Proporciona sólo un resumen del objetivo del atacante y del vector. NO entregues instrucciones técnicas, comandos, código de exploit, payloads, o pasos paso-a-paso para cometer un ataque.
@@ -25,15 +26,14 @@ Contenido a analizar:
25
  {input}
26
  """
27
 
28
- # --------- Filtro seguro (sin regex unificado para evitar errores) ----------
29
  FORBIDDEN_KEYWORDS = [
30
  "exploit", "payload", "meterpreter", "msfconsole",
31
  "reverse shell", "sqlmap", "chmod", "chown", "exec "
32
  ]
33
  FORBIDDEN_CMDLIKE = ["curl ", "wget ", "sudo ", "bash -i", "nc ", "ncat ", "rm -rf"]
34
- # OJO: usar \\x aquí para que el archivo final contenga \x y no sea interpretado como escape por Python
35
- FORBIDDEN_SUBSTR = ["$(", "${", "\\x", "base64 -d"]
36
- FORBIDDEN_HEX_RE = re.compile(r"0x[0-9a-fA-F]{2,}") # compilar por separado, simple y seguro
37
 
38
  def contains_forbidden(text: str) -> bool:
39
  if not text:
@@ -43,35 +43,33 @@ def contains_forbidden(text: str) -> bool:
43
  return True
44
  if any(k in t for k in FORBIDDEN_CMDLIKE):
45
  return True
46
- if any(s in text for s in FORBIDDEN_SUBSTR): # mantener mayúsculas para secuencias
47
  return True
48
  if FORBIDDEN_HEX_RE.search(text):
49
  return True
50
  return False
51
 
52
- # ---------------------------------------------------------------------------
53
- # Lee la API key de varias variables aceptadas (para que puedas usar OPENAI_API_KEY_ATAQUE)
54
  ENV_CANDIDATES = ["OPENAI_API_KEY", "OPENAI_API_KEY_ATAQUE", "OPENAI_APIKEY", "OPENAI_KEY", "HF_OPENAI_API_KEY"]
55
 
56
- def get_api_key() -> str | None:
57
  for name in ENV_CANDIDATES:
58
  v = os.environ.get(name)
59
  if v:
60
  return v
61
  return None
62
 
63
- def call_openai_responses(prompt: str, api_key: str, models=None, timeout: int = 20) -> Tuple[bool, str]:
64
  if models is None:
65
  models = OPENAI_MODEL_FALLBACK
66
  headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
 
 
67
  for model in models:
68
- payload = {"model": model, "input": prompt}
69
  try:
70
- r = requests.post(OPENAI_API_URL, headers=headers, json=payload, timeout=timeout)
71
- except Exception as e:
72
- return False, f"Error de conexión al llamar a la API: {e}"
73
- if r.status_code == 200:
74
- try:
75
  j = r.json()
76
  out = ""
77
  if "output" in j:
@@ -93,28 +91,49 @@ def call_openai_responses(prompt: str, api_key: str, models=None, timeout: int =
93
  out = "\n".join(parts).strip()
94
  elif isinstance(j["output"], str):
95
  out = j["output"].strip()
96
- if not out and "choices" in j and isinstance(j.get("choices"), list) and j["choices"]:
 
97
  ch = j["choices"][0]
98
  out = ch.get("text") or ch.get("message", {}).get("content", {}).get("text") or ""
99
  if not out:
100
  out = json.dumps(j, ensure_ascii=False)[:4000]
101
  return True, out
102
- except Exception as e:
103
- return False, f"Error parseando respuesta de la API: {e}"
104
- else:
105
- try:
106
- ej = r.json()
107
- msg = ej.get("error", {}).get("message") or ej.get("message") or r.text
108
- except Exception:
109
- msg = r.text
110
- if r.status_code == 401:
111
- return False, "AuthenticationError (401): OPENAI_API_KEY inválida o revocada."
112
- if r.status_code == 429:
113
- return False, "RateLimitError (429): límite superado en OpenAI."
114
- if isinstance(msg, str) and "model" in msg.lower():
115
- continue
116
- return False, f"HTTP {r.status_code}: {msg}"
117
- return False, "Ningún modelo disponible o permitido en la cuenta de OpenAI."
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
118
 
119
  def safe_parse_json_from_model(text: str):
120
  try:
@@ -129,82 +148,90 @@ def safe_parse_json_from_model(text: str):
129
  return {"raw": text}
130
  return {"raw": text}
131
 
 
132
  def generate_simulation(user_input: str, include_iocs: bool, include_mitigation: bool):
133
- api_key = get_api_key()
134
- if not api_key:
135
- return ("<p style='color:crimson'><b>Error:</b> Falta la API key. "
136
- f"Añade una de estas variables en Settings → Variables and secrets: {', '.join(ENV_CANDIDATES)}.</p>", "")
137
-
138
- prompt = PROMPT_TEMPLATE.format(input=user_input)
139
- ok, out = call_openai_responses(prompt, api_key)
140
- if not ok:
141
- return f"<p style='color:crimson'><b>Error IA:</b> {out}</p>", ""
142
-
143
- if contains_forbidden(out):
144
- safe_msg = ("La respuesta original fue bloqueada por contener contenido sensible que podría ser instructivo para ataques. "
145
- "He realizado un bloqueo por seguridad. Intenta proporcionar más contexto defensivo o limpia el contenido y vuelve a intentarlo.")
146
- return f"<p style='color:crimson'><b>Contenido bloqueado por seguridad:</b></p><p>{safe_msg}</p>", ""
147
-
148
- parsed = safe_parse_json_from_model(out)
149
-
150
- html = []
151
- html.append("<h3>Simulación Red Team (alto nivel)</h3>")
152
- if isinstance(parsed, dict) and parsed.get("simulation"):
153
- html.append(f"<p><b>Simulación:</b> {parsed['simulation']}</p>")
154
- else:
155
- sim = parsed.get("simulation") if isinstance(parsed, dict) else None
156
- html.append(f"<p><b>Simulación:</b> {json.dumps(sim, ensure_ascii=False)}</p>")
157
-
158
- if include_iocs:
159
- html.append("<h4>Indicadores (IoCs) sugeridos</h4>")
160
- iocs = parsed.get("iocs") if isinstance(parsed, dict) else None
161
- if isinstance(iocs, list) and iocs:
162
- html.append("<ul>")
163
- for i in iocs:
164
- html.append(f"<li>{i}</li>")
165
- html.append("</ul>")
166
- else:
167
- html.append(f"<p>{json.dumps(iocs, ensure_ascii=False)}</p>")
168
-
169
- if include_mitigation:
170
- html.append("<h4>Contramedidas y mitigación</h4>")
171
- mit = parsed.get("mitigations") if isinstance(parsed, dict) else None
172
- if isinstance(mit, list) and mit:
173
- html.append("<ul>")
174
- for m in mit:
175
- html.append(f"<li>{m}</li>")
176
- html.append("</ul>")
177
  else:
178
- html.append(f"<p>{json.dumps(mit, ensure_ascii=False)}</p>")
179
-
180
- html.append("<p style='font-size:0.9em;color:#bbb'>Nota: esta simulación es de alto nivel y educativa. No proporciona instrucciones de ataque. Use para mejorar defensas y detección.</p>")
181
- return "\n".join(html), json.dumps(parsed, ensure_ascii=False, indent=2)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
182
 
183
  def generate_report(json_str: str, title: str = "Reporte Red Team") -> Tuple[str, str]:
184
- if not json_str:
185
- return "", ""
186
  try:
187
- parsed = json.loads(json_str) if isinstance(json_str, str) else json_str
188
- except Exception:
189
- parsed = {"raw": str(json_str)}
190
-
191
- timestamp = time.strftime("%Y%m%d_%H%M%S")
192
- filename = f"/mnt/data/redteam_report_{timestamp}.txt"
193
- with open(filename, 'w', encoding='utf-8') as f:
194
- f.write(f"{title}\nGenerated: {time.ctime()}\n\n")
195
- f.write("SIMULATION:\n")
196
- sim = parsed.get("simulation") if isinstance(parsed, dict) else None
197
- f.write((sim or "(no simulation)") + "\n\n")
198
- f.write("IOCS:\n")
199
- for i in (parsed.get("iocs") if isinstance(parsed, dict) and parsed.get("iocs") else []):
200
- f.write(f"- {i}\n")
201
- f.write("\nMITIGATIONS:\n")
202
- for m in (parsed.get("mitigations") if isinstance(parsed, dict) and parsed.get("mitigations") else []):
203
- f.write(f"- {m}\n")
204
- f.write("\nRAW:\n")
205
- f.write(json.dumps(parsed, ensure_ascii=False, indent=2))
206
- return filename, filename
 
 
 
 
 
207
 
 
208
  with gr.Blocks(analytics_enabled=False) as demo:
209
  gr.Markdown("## 🧯 Simulación Red Team (alto nivel) — Defender con IA")
210
  with gr.Row():
@@ -215,7 +242,7 @@ with gr.Blocks(analytics_enabled=False) as demo:
215
  btn = gr.Button("Simular ataque (alto nivel)")
216
  download_btn = gr.Button("Generar reporte (.txt)")
217
  with gr.Column(scale=5):
218
- out_html = gr.HTML("<i>Resultado aparecerá aquí</i>")
219
  last_json = gr.Textbox(visible=False)
220
  file_out = gr.File(label="Descargar reporte (.txt)", visible=False)
221
 
 
6
  import time
7
  import requests
8
  import gradio as gr
9
+ from typing import Tuple, List, Optional
10
 
11
+ OPENAI_API_URL_RESPONSES = "https://api.openai.com/v1/responses"
12
+ OPENAI_API_URL_CHAT = "https://api.openai.com/v1/chat/completions"
13
+ OPENAI_MODEL_FALLBACK = ["gpt-4o-mini", "gpt-4o"]
14
 
15
  PROMPT_TEMPLATE = """Eres un asistente de ciberseguridad (red-team simulator) que debe simular a alto nivel cómo un atacante podría aprovechar el contenido que se le entrega (email, URL o fragmento). NORMAS (obligatorias):
16
  1. Proporciona sólo un resumen del objetivo del atacante y del vector. NO entregues instrucciones técnicas, comandos, código de exploit, payloads, o pasos paso-a-paso para cometer un ataque.
 
26
  {input}
27
  """
28
 
29
+ # --------- Filtro seguro (ligero y robusto) ----------
30
  FORBIDDEN_KEYWORDS = [
31
  "exploit", "payload", "meterpreter", "msfconsole",
32
  "reverse shell", "sqlmap", "chmod", "chown", "exec "
33
  ]
34
  FORBIDDEN_CMDLIKE = ["curl ", "wget ", "sudo ", "bash -i", "nc ", "ncat ", "rm -rf"]
35
+ FORBIDDEN_SUBSTR = ["$(", "${", "\\x", "base64 -d"] # doble-escape para dejar \x literal
36
+ FORBIDDEN_HEX_RE = re.compile(r"0x[0-9a-fA-F]{2,}")
 
37
 
38
  def contains_forbidden(text: str) -> bool:
39
  if not text:
 
43
  return True
44
  if any(k in t for k in FORBIDDEN_CMDLIKE):
45
  return True
46
+ if any(s in text for s in FORBIDDEN_SUBSTR):
47
  return True
48
  if FORBIDDEN_HEX_RE.search(text):
49
  return True
50
  return False
51
 
52
+ # --------- Utilidades API ----------
 
53
  ENV_CANDIDATES = ["OPENAI_API_KEY", "OPENAI_API_KEY_ATAQUE", "OPENAI_APIKEY", "OPENAI_KEY", "HF_OPENAI_API_KEY"]
54
 
55
+ def get_api_key() -> Optional[str]:
56
  for name in ENV_CANDIDATES:
57
  v = os.environ.get(name)
58
  if v:
59
  return v
60
  return None
61
 
62
+ def call_openai_any(prompt: str, api_key: str, models=None, timeout: int = 25) -> Tuple[bool, str]:
63
  if models is None:
64
  models = OPENAI_MODEL_FALLBACK
65
  headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
66
+ last_error = None
67
+
68
  for model in models:
69
+ # 1) Intento con /v1/responses
70
  try:
71
+ r = requests.post(OPENAI_API_URL_RESPONSES, headers=headers, json={"model": model, "input": prompt}, timeout=timeout)
72
+ if r.status_code == 200:
 
 
 
73
  j = r.json()
74
  out = ""
75
  if "output" in j:
 
91
  out = "\n".join(parts).strip()
92
  elif isinstance(j["output"], str):
93
  out = j["output"].strip()
94
+ if not out and "choices" in j:
95
+ # algunos despliegues devuelven estilo choices
96
  ch = j["choices"][0]
97
  out = ch.get("text") or ch.get("message", {}).get("content", {}).get("text") or ""
98
  if not out:
99
  out = json.dumps(j, ensure_ascii=False)[:4000]
100
  return True, out
101
+ else:
102
+ try:
103
+ jr = r.json()
104
+ msg = jr.get("error", {}).get("message") or jr.get("message") or r.text
105
+ except Exception:
106
+ msg = r.text
107
+ last_error = f"Responses API (HTTP {r.status_code}): {msg}"
108
+ # si es 404/400/422 probamos chat inmediatamente
109
+ except Exception as e:
110
+ last_error = f"Responses API error: {e}"
111
+
112
+ # 2) Fallback a /v1/chat/completions
113
+ try:
114
+ payload = {"model": model, "messages": [{"role": "user", "content": prompt}], "temperature": 0.2}
115
+ rc = requests.post(OPENAI_API_URL_CHAT, headers=headers, json=payload, timeout=timeout)
116
+ if rc.status_code == 200:
117
+ jc = rc.json()
118
+ out = ""
119
+ if "choices" in jc and jc["choices"]:
120
+ msg = jc["choices"][0].get("message", {}).get("content")
121
+ if isinstance(msg, str):
122
+ out = msg.strip()
123
+ if not out:
124
+ out = json.dumps(jc, ensure_ascii=False)[:4000]
125
+ return True, out
126
+ else:
127
+ try:
128
+ jc = rc.json()
129
+ msg = jc.get("error", {}).get("message") or jc.get("message") or rc.text
130
+ except Exception:
131
+ msg = rc.text
132
+ last_error = f"Chat API (HTTP {rc.status_code}): {msg}"
133
+ except Exception as e:
134
+ last_error = f"Chat API error: {e}"
135
+
136
+ return False, (last_error or "No fue posible obtener respuesta del modelo.")
137
 
138
  def safe_parse_json_from_model(text: str):
139
  try:
 
148
  return {"raw": text}
149
  return {"raw": text}
150
 
151
+ # --------- Handlers ----------
152
  def generate_simulation(user_input: str, include_iocs: bool, include_mitigation: bool):
153
+ try:
154
+ api_key = get_api_key()
155
+ if not api_key:
156
+ return ("<p style='color:crimson'><b>Error:</b> Falta la API key. "
157
+ f"Añade una de estas variables en Settings → Variables and secrets: {', '.join(ENV_CANDIDATES)}.</p>", "")
158
+
159
+ prompt = PROMPT_TEMPLATE.format(input=user_input)
160
+ ok, out = call_openai_any(prompt, api_key)
161
+ if not ok:
162
+ return f"<p style='color:crimson'><b>Error IA:</b> {out}</p>", ""
163
+
164
+ if contains_forbidden(out):
165
+ safe_msg = ("La respuesta original fue bloqueada por contener contenido sensible que podría ser instructivo para ataques. "
166
+ "He realizado un bloqueo por seguridad. Intenta proporcionar más contexto defensivo o limpia el contenido y vuelve a intentarlo.")
167
+ return f"<p style='color:crimson'><b>Contenido bloqueado por seguridad:</b></p><p>{safe_msg}</p>", ""
168
+
169
+ parsed = safe_parse_json_from_model(out)
170
+
171
+ html = []
172
+ html.append("<h3>Simulación Red Team (alto nivel)</h3>")
173
+ if isinstance(parsed, dict) and parsed.get("simulation"):
174
+ html.append(f"<p><b>Simulación:</b> {parsed['simulation']}</p>")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
175
  else:
176
+ sim = parsed.get("simulation") if isinstance(parsed, dict) else None
177
+ html.append(f"<p><b>Simulación:</b> {json.dumps(sim, ensure_ascii=False)}</p>")
178
+
179
+ if include_iocs:
180
+ html.append("<h4>Indicadores (IoCs) sugeridos</h4>")
181
+ iocs = parsed.get("iocs") if isinstance(parsed, dict) else None
182
+ if isinstance(iocs, list) and iocs:
183
+ html.append("<ul>")
184
+ for i in iocs:
185
+ html.append(f"<li>{i}</li>")
186
+ html.append("</ul>")
187
+ else:
188
+ html.append(f"<p>{json.dumps(iocs, ensure_ascii=False)}</p>")
189
+
190
+ if include_mitigation:
191
+ html.append("<h4>Contramedidas y mitigación</h4>")
192
+ mit = parsed.get("mitigations") if isinstance(parsed, dict) else None
193
+ if isinstance(mit, list) and mit:
194
+ html.append("<ul>")
195
+ for m in mit:
196
+ html.append(f"<li>{m}</li>")
197
+ html.append("</ul>")
198
+ else:
199
+ html.append(f"<p>{json.dumps(mit, ensure_ascii=False)}</p>")
200
+
201
+ html.append("<p style='font-size:0.9em;color:#bbb'>Nota: esta simulación es de alto nivel y educativa. No proporciona instrucciones de ataque. Use para mejorar defensas y detección.</p>")
202
+ return "\n".join(html), json.dumps(parsed, ensure_ascii=False, indent=2)
203
+ except Exception as e:
204
+ return f"<p style='color:crimson'><b>Error inesperado:</b> {e}</p>", ""
205
 
206
  def generate_report(json_str: str, title: str = "Reporte Red Team") -> Tuple[str, str]:
 
 
207
  try:
208
+ if not json_str:
209
+ return "", ""
210
+ try:
211
+ parsed = json.loads(json_str) if isinstance(json_str, str) else json_str
212
+ except Exception:
213
+ parsed = {"raw": str(json_str)}
214
+
215
+ timestamp = time.strftime("%Y%m%d_%H%M%S")
216
+ filename = f"/mnt/data/redteam_report_{timestamp}.txt"
217
+ with open(filename, 'w', encoding='utf-8') as f:
218
+ f.write(f"{title}\nGenerated: {time.ctime()}\n\n")
219
+ f.write("SIMULATION:\n")
220
+ sim = parsed.get("simulation") if isinstance(parsed, dict) else None
221
+ f.write((sim or "(no simulation)") + "\n\n")
222
+ f.write("IOCS:\n")
223
+ for i in (parsed.get("iocs") if isinstance(parsed, dict) and parsed.get("iocs") else []):
224
+ f.write(f"- {i}\n")
225
+ f.write("\nMITIGATIONS:\n")
226
+ for m in (parsed.get("mitigations") if isinstance(parsed, dict) and parsed.get("mitigations") else []):
227
+ f.write(f"- {m}\n")
228
+ f.write("\nRAW:\n")
229
+ f.write(json.dumps(parsed, ensure_ascii=False, indent=2))
230
+ return filename, filename
231
+ except Exception as e:
232
+ return "", ""
233
 
234
+ # --------- UI ----------
235
  with gr.Blocks(analytics_enabled=False) as demo:
236
  gr.Markdown("## 🧯 Simulación Red Team (alto nivel) — Defender con IA")
237
  with gr.Row():
 
242
  btn = gr.Button("Simular ataque (alto nivel)")
243
  download_btn = gr.Button("Generar reporte (.txt)")
244
  with gr.Column(scale=5):
245
+ out_html = gr.HTML("Resultado aparecerá aquí")
246
  last_json = gr.Textbox(visible=False)
247
  file_out = gr.File(label="Descargar reporte (.txt)", visible=False)
248