Spaces:
Sleeping
Sleeping
# app_fixed.py
# Phishing detector - improved version with heuristics, SPF/DKIM checks and optional OpenAI integration.
# Requires: gradio, tldextract, dnspython, dkimpy, requests
import html
import json
import os
import re
import traceback
from email import policy
from email.parser import BytesParser
from typing import List, Dict, Any

import dkim
import dns.resolver
import gradio as gr
import requests
import tldextract
# ----------------------------
# Config
# ----------------------------
# Domains treated as URL shorteners; is_shortener() matches link hosts against this set.
SHORTENER_DOMAINS = {
    "bit.ly", "t.co", "tinyurl.com", "goo.gl", "ow.ly", "is.gd", "buff.ly",
    "shorturl.at", "rb.gy", "tiny.one", "clk.im"
}
# Model fallback list: call_openai() tries these in order until one is accepted.
OPENAI_MODEL_FALLBACK = [
    "gpt-4o-mini",
    "gpt-4o",
    "gpt-5-mini",
]
# HTTP endpoint that call_openai() POSTs to (the "responses" route of the OpenAI API).
OPENAI_API_URL = "https://api.openai.com/v1/responses"
# ----------------------------
# Utilities
# ----------------------------
def extract_links(text: str) -> List[str]:
    """Return the distinct http(s) URLs found in ``text``, sorted ascending.

    Matching is case-insensitive on the scheme. A URL ends at whitespace or at
    one of ``< > ' ) ]``; trailing punctuation (``. , : ; " )``) is stripped
    from each match. ``None`` or empty input yields an empty list.
    """
    pattern = re.compile(r"(?i)\bhttps?://[^\s<>'\)\]]+")
    # A set comprehension de-duplicates URLs that appear more than once.
    unique = {hit.group(0).rstrip('.,:;\")') for hit in pattern.finditer(text or "")}
    return sorted(unique)
def url_hostname(url: str) -> str:
    """Return a short host identifier for ``url``.

    Uses ``tldextract.extract``: when it reports a ``domain``, the result is
    ``"<domain>.<suffix>"`` (or just ``domain`` if there is no suffix).
    Otherwise the host is taken from the ``http(s)://`` prefix with a regex and
    lower-cased. If nothing matches, or any step raises, ``url`` is returned
    unchanged.
    """
    try:
        pieces = tldextract.extract(url)
        if not pieces.domain:
            # tldextract found no domain: fall back to a plain regex on the URL.
            fallback = re.match(r"https?://([^/]+)", url)
            if fallback:
                return fallback.group(1).lower()
            return url
        if not pieces.suffix:
            return pieces.domain
        return f"{pieces.domain}.{pieces.suffix}"
    except Exception:
        return url
def is_shortener(url: str) -> bool:
    """Return True when the host of ``url`` is a known URL shortener.

    The host (as returned by ``url_hostname``) must equal an entry of
    ``SHORTENER_DOMAINS`` or be a subdomain of one. A bare suffix test is not
    enough: it would treat look-alikes such as ``habit.ly`` as ``bit.ly``.
    """
    host = (url_hostname(url) or "").lower()
    return any(
        host == domain or host.endswith("." + domain)
        for domain in SHORTENER_DOMAINS
    )
def contains_ip(url: str) -> bool:
    """Return True when an http(s) URL in ``url`` uses a dotted IPv4 host.

    The scheme is matched case-insensitively (consistent with
    ``extract_links``), optional ``user@`` credentials are skipped, and the
    four-octet group must be the whole host: it has to be followed by a port,
    path, query, fragment or the end of the string. This keeps hosts such as
    ``1.2.3.4.example.com`` from being reported as raw IPs. ``None`` or empty
    input returns False.
    """
    pattern = (
        r"(?i)\bhttps?://"
        r"(?:[^/@\s]*@)?"                 # optional userinfo
        r"(\d{1,3}(?:\.\d{1,3}){3})"      # dotted-quad host
        r"(?=[:/?#]|$)"                   # host must end here
    )
    return re.search(pattern, url or "") is not None
def contains_urgent_language(text: str) -> bool:
    """Return True when ``text`` contains a Spanish urgency/pressure keyword.

    Keywords are matched as whole words, case-insensitively. ``None`` or empty
    input returns False.
    """
    pattern = r"\b(urgente|inmediatamente|verifique|actualice|pago|riesgo|suspendido|caduca|vencimiento|bloqueado|atenci[oó]n|urgencia)\b"
    hit = re.search(pattern, text or "", flags=re.I)
    return hit is not None
# ----------------------------
# Email parsing & checks
# ----------------------------
def _part_text(part):
    """Return the content of a message part as ``str``, or None if it has no text.

    Tries ``get_content()`` first and falls back to the decoded payload. Bytes
    are decoded as UTF-8 (undecodable bytes dropped) so callers never receive
    bytes, which would break the str-based regex heuristics.
    """
    try:
        content = part.get_content()
    except Exception:
        content = part.get_payload(decode=True)
    if isinstance(content, bytes):
        return content.decode('utf-8', errors='ignore')
    if isinstance(content, str):
        return content
    return None


def parse_email_raw(raw_text: str) -> Dict[str, Any]:
    """Parse a raw email into its main headers and a text body.

    Args:
        raw_text: The pasted message; ideally the full RAW source with headers.
            Bytes are accepted as well and used as-is.

    Returns:
        A dict with keys ``from``, ``reply_to``, ``subject`` (header values as
        stripped strings, or None when no headers were parsed), ``body`` (text
        body, preferring non-attachment ``text/plain`` parts; defaults to
        ``raw_text``) and ``raw_bytes`` (the bytes handed to the parser).

    Parsing is best-effort: any failure is printed with a traceback and the
    fields collected so far are returned.
    """
    out = {"from": None, "reply_to": None, "subject": None, "body": raw_text, "raw_bytes": None}
    try:
        # BytesParser needs bytes.
        if isinstance(raw_text, str):
            raw_bytes = raw_text.encode('utf-8', errors='ignore')
        else:
            raw_bytes = raw_text
        out['raw_bytes'] = raw_bytes

        try:
            msg = BytesParser(policy=policy.default).parsebytes(raw_bytes)
        except Exception:
            msg = None
        # A message with no headers at all is treated as "not an email":
        # the whole input stays as the body.
        if msg is None or len(msg) == 0:
            return out

        out['from'] = str(msg.get('From') or "").strip()
        out['reply_to'] = str(msg.get('Reply-To') or "").strip()
        out['subject'] = str(msg.get('Subject') or "").strip()

        if msg.is_multipart():
            # Prefer every inline text/plain part, joined by blank lines.
            plain_parts = []
            for part in msg.walk():
                disposition = str(part.get_content_disposition() or "")
                if part.get_content_type() == 'text/plain' and disposition != 'attachment':
                    text = _part_text(part)
                    if text:
                        plain_parts.append(text)
            out['body'] = "\n\n".join(plain_parts)
            if not out['body']:
                # Fall back to the first text/* part that yields any text.
                for part in msg.walk():
                    if part.get_content_type().startswith('text/'):
                        text = _part_text(part)
                        if text:
                            out['body'] = text
                            break
        else:
            text = _part_text(msg)
            out['body'] = text if text is not None else raw_text
    except Exception as e:
        print("PARSE RAW ERROR:", repr(e))
        traceback.print_exc()
    return out
def spf_check(ip: str, domain: str) -> Dict[str, Any]:
    """Check whether ``domain`` publishes an SPF record.

    This is a presence check only: the TXT records of ``domain`` are fetched
    (5 second lifetime) and scanned for one starting with ``v=spf1``. ``ip`` is
    accepted for interface compatibility but is not evaluated.

    Returns:
        ``{"ok": bool, "records": [all TXT records]}`` on a successful lookup,
        or ``{"ok": False, "error": message}`` if the lookup raises.
    """
    try:
        txt_records = []
        for rdata in dns.resolver.resolve(domain, 'TXT', lifetime=5):
            # A TXT record may be split into several byte chunks; rejoin them.
            txt_records.append(b"".join(rdata.strings).decode('utf-8', errors='ignore'))
        has_spf = any(txt.lower().startswith('v=spf1') for txt in txt_records)
        return {"ok": has_spf, "records": txt_records}
    except Exception as exc:
        return {"ok": False, "error": str(exc)}
def dkim_check(raw_bytes: bytes) -> Dict[str, Any]:
    """Verify the DKIM signature of a full raw message with dkimpy.

    Returns:
        ``{"ok": bool}`` with the truthiness of ``dkim.verify``'s result, or
        ``{"ok": False, "error": message}`` if verification raises.
    """
    try:
        # dkim.verify is given the complete message bytes (headers + body).
        verified = bool(dkim.verify(raw_bytes))
    except Exception as exc:
        return {"ok": False, "error": str(exc)}
    return {"ok": verified}
# ----------------------------
# Heuristics
# ----------------------------
def analyze_heuristics(raw_text: str, from_header: str = "") -> Dict[str, Any]:
    """Score a message with simple phishing heuristics.

    Args:
        raw_text: Text to inspect (body, optionally with headers). ``None`` is
            treated as empty.
        from_header: Sender header value used to derive the sender domain.

    Returns:
        A dict with ``score`` (0-100), ``reasons`` (list of triggered rule
        descriptions), ``links`` (URLs found) and ``from_domain`` (lower-cased
        domain taken from ``from_header``, or "").

    Rules and weights: link domain differs from sender domain (+20), link uses
    a raw IP (+20), link uses a shortener (+15), urgency wording (+15),
    dangerous file extension mentioned (+15), Reply-To differs from From (+10).
    """
    raw_text = raw_text or ""
    links = extract_links(raw_text)
    reasons = []
    score = 0

    # Sender domain, as written after the "@" in the From header.
    from_dom = ""
    if from_header:
        m = re.search(r"@([\w\.-]+)", from_header)
        from_dom = m.group(1).lower() if m else ""

    # Domain mismatch. Both sides are reduced with url_hostname() and compared
    # on domain boundaries; a substring test would flag mail.bank.com vs
    # bank.com and would miss bank.com vs evilbank.com.
    if from_dom:
        sender_base = (url_hostname(from_dom) or from_dom).lower()
        for u in links:
            host = (url_hostname(u) or "").lower()
            if not host:
                continue
            related = (
                host == sender_base
                or host.endswith("." + sender_base)
                or sender_base.endswith("." + host)
            )
            if not related:
                reasons.append("Dominio de enlaces distinto al dominio del remitente")
                score += 20
                break

    if any(contains_ip(u) for u in links):
        reasons.append("Enlaces con IP en vez de dominio")
        score += 20
    if any(is_shortener(u) for u in links):
        reasons.append("Enlace acortado sospechoso")
        score += 15
    if contains_urgent_language(raw_text):
        reasons.append("Lenguaje de urgencia / presión")
        score += 15
    if re.search(r'\.(exe|scr|bat|cmd|msi|zip)\b', raw_text, re.I):
        reasons.append("Adjunto ejecutable o extensión peligrosa detectada")
        score += 15

    # Reply-To different from From. Anchored to line starts so that text such
    # as "X-Original-From:" or "from:" inside a sentence is not mistaken for
    # the header, and limited to the header's own line.
    m_reply = re.search(r"^Reply-To:[ \t]*(.+)", raw_text, re.I | re.M)
    m_from = re.search(r"^From:[ \t]*(.+)", raw_text, re.I | re.M)
    if m_reply and m_from:
        reply = m_reply.group(1).strip()
        frm = m_from.group(1).strip()
        if reply and frm and (reply.lower() not in frm.lower()):
            reasons.append("Reply-To diferente al From")
            score += 10

    # Clamp to the 0-100 range shown in the UI.
    score = max(0, min(100, score))
    return {"score": score, "reasons": reasons, "links": links, "from_domain": from_dom}
# ----------------------------
# OpenAI helper with fallbacks & robust error messages
# ----------------------------
def _extract_response_text(data) -> str:
    """Pull the generated text out of a decoded API response, or return ""."""
    if not isinstance(data, dict):
        return ""

    text = ""
    output = data.get("output")
    if isinstance(output, str):
        text = output.strip()
    elif isinstance(output, list):
        # "output" is a list of items; each item carries its text either
        # directly or inside a nested "content" list.
        parts = []
        for item in output:
            if not isinstance(item, dict):
                continue
            content = item.get("content") or item.get("text") or item.get("output_text")
            if isinstance(content, str):
                parts.append(content)
            elif isinstance(content, list):
                for el in content:
                    if isinstance(el, dict):
                        txt = el.get("text") or el.get("output_text") or el.get("content")
                        if txt:
                            parts.append(str(txt))
                    else:
                        parts.append(str(el))
        text = "\n\n".join(parts).strip()

    if not text:
        # Chat/completions-style fallback. "content" is normally a plain
        # string there, so it must not be treated as a dict.
        choices = data.get("choices")
        if isinstance(choices, list) and choices and isinstance(choices[0], dict):
            first = choices[0]
            text = first.get("text") or ""
            if not text:
                message = first.get("message")
                content = message.get("content") if isinstance(message, dict) else None
                if isinstance(content, str):
                    text = content
                elif isinstance(content, dict):
                    text = content.get("text") or ""
    return text if isinstance(text, str) else str(text)


def call_openai(prompt_text: str, api_key: str, models=None, timeout=20):
    """Send ``prompt_text`` to the OpenAI API, trying fallback models in order.

    Args:
        prompt_text: Prompt sent as the request ``input``.
        api_key: Bearer token for the ``Authorization`` header.
        models: Model names to try in order; defaults to ``OPENAI_MODEL_FALLBACK``.
        timeout: Per-request timeout in seconds.

    Returns:
        ``(True, text)`` on success, where ``text`` is the extracted output (or
        the first 4000 characters of the raw JSON if no text could be found);
        ``(False, message)`` on connection errors, HTTP errors, unparsable
        responses, or when every model was rejected.

    Only an error whose message mentions "model" moves on to the next model;
    401, 429 and all other failures return immediately.
    """
    if models is None:
        models = OPENAI_MODEL_FALLBACK
    headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}

    for model in models:
        payload = {"model": model, "input": prompt_text}
        try:
            resp = requests.post(OPENAI_API_URL, headers=headers, json=payload, timeout=timeout)
        except Exception as e:
            print("AI CALL ERROR (connection):", repr(e))
            traceback.print_exc()
            return False, f"Error de conexión: {e}"

        if resp.status_code == 200:
            try:
                j = resp.json()
                out = _extract_response_text(j)
                if not out:
                    # Nothing recognisable: hand back the (truncated) raw JSON.
                    out = json.dumps(j, ensure_ascii=False)[:4000]
                return True, out
            except Exception as e:
                print("AI CALL ERROR (parse):", repr(e))
                traceback.print_exc()
                return False, f"Error al parsear respuesta de OpenAI: {e}"

        try:
            err_json = resp.json()
        except Exception:
            err_json = {"status_code": resp.status_code, "text": resp.text}
        print(f"AI CALL HTTP ERROR model={model}: status={resp.status_code} body={str(err_json)[:1000]}")
        if resp.status_code == 401:
            return False, "AuthenticationError (401): clave inválida o revocada. Revoca y crea una nueva en platform.openai.com"
        if resp.status_code == 429:
            return False, "RateLimitError (429): cuota superada o límite de velocidad en OpenAI."

        # "error" may be a dict, a plain string or missing; never assume a dict.
        msg = ""
        if isinstance(err_json, dict):
            err = err_json.get("error")
            if isinstance(err, dict):
                msg = err.get("message") or ""
            msg = msg or err_json.get("message") or str(err_json)
        if msg and "model" in str(msg).lower():
            # Model not found / not allowed: try the next one.
            continue
        return False, f"Error HTTP {resp.status_code} al llamar a OpenAI: {msg or resp.text}"

    return False, "Ningún modelo disponible o permitido en la cuenta de OpenAI."
# ----------------------------
# Main analyze function
# ----------------------------
def _first_address(header_value) -> str:
    """Return the first e-mail address in a header value, lower-cased, or ""."""
    m = re.search(r"[\w.+-]+@[\w.-]+", header_value or "")
    return m.group(0).lower() if m else ""


def _parse_ai_json(out) -> Any:
    """Decode the model reply as JSON, tolerating text around the JSON object.

    Falls back to ``{"raw": out}`` when no valid JSON can be recovered.
    """
    if not isinstance(out, str):
        return {"raw": out}
    try:
        return json.loads(out)
    except ValueError:
        pass
    # Try the outermost {...} span in case the model wrapped the JSON in prose.
    start = out.find('{')
    end = out.rfind('}')
    if start != -1 and end > start:
        try:
            return json.loads(out[start:end + 1])
        except ValueError:
            pass
    return {"raw": out}


def analyze_email(raw_text: str, use_ai: bool = False, do_spf: bool = False, do_dkim: bool = False) -> Dict[str, Any]:
    """Run the full analysis pipeline on a pasted email.

    Args:
        raw_text: The email as pasted (ideally RAW with headers).
        use_ai: Also ask OpenAI for a verdict (needs ``OPENAI_API_KEY``).
        do_spf: Run the SPF presence check (needs a sender domain and an IP in
            a ``Received`` header).
        do_dkim: Run DKIM verification on the raw bytes.

    Returns:
        ``{"heuristic", "spf", "dkim", "ai"}``; checks that were not requested
        stay None. On an unexpected failure returns
        ``{"error": True, "message": str}`` instead.
    """
    result = {"heuristic": None, "spf": None, "dkim": None, "ai": None}
    try:
        raw_text = raw_text or ""
        parsed = parse_email_raw(raw_text)

        body = parsed.get('body', raw_text)
        if isinstance(body, bytes):
            # The regex heuristics operate on str only.
            body = body.decode('utf-8', errors='ignore')
        sender = parsed.get('from') or parsed.get('reply_to') or ""
        heur = analyze_heuristics(body, sender)

        # The heuristics only see the parsed body, so their own Reply-To/From
        # comparison cannot fire once headers were split off. Compare the
        # parsed header addresses here instead.
        reply_addr = _first_address(parsed.get('reply_to'))
        from_addr = _first_address(parsed.get('from'))
        reply_reason = "Reply-To diferente al From"
        if reply_addr and from_addr and reply_addr != from_addr:
            reasons = heur.setdefault('reasons', [])
            if reply_reason not in reasons:
                reasons.append(reply_reason)
                heur['score'] = max(0, min(100, heur.get('score', 0) + 10))
        result['heuristic'] = heur

        # SPF: take the first IPv4 address found in a Received header.
        if do_spf:
            # Unfold continuation lines so an IP on a folded line is still seen.
            unfolded = re.sub(r"\r?\n[ \t]+", " ", raw_text)
            # Lazy match plus word boundaries: a greedy ".*" would swallow the
            # leading digits of the address (192.168.1.1 -> 2.168.1.1).
            m = re.search(r"^Received:.*?\b(\d{1,3}(?:\.\d{1,3}){3})\b", unfolded, re.I | re.M)
            ip = m.group(1) if m else None

            domain = heur.get('from_domain')
            if not domain:
                m_dom = re.search(r"@([\w\.-]+)", parsed.get('from') or "")
                domain = m_dom.group(1) if m_dom else None

            if domain and ip:
                result['spf'] = spf_check(ip, domain)
            else:
                result['spf'] = {"ok": False, "error": "No se pudo extraer IP o dominio para SPF"}

        if do_dkim:
            raw_bytes = parsed.get('raw_bytes')
            if raw_bytes:
                result['dkim'] = dkim_check(raw_bytes)
            else:
                result['dkim'] = {"ok": False, "error": "No raw bytes disponibles para DKIM"}

        # AI verdict (optional).
        if use_ai:
            key = os.environ.get('OPENAI_API_KEY')
            if not key:
                result['ai'] = {"error": "OPENAI_API_KEY no configurada en Settings → Variables and secrets."}
            else:
                prompt = (
                    "Eres un detector de phishing. Recibiste este correo (incluye cabeceras y cuerpo):\n\n" +
                    raw_text +
                    "\n\nResponde con JSON válido con campos: verdict ('phishing'|'suspicious'|'legitimate'), score (float 0-1), reasons (lista de strings). SOLO devuelve JSON puro."
                )
                ok, out = call_openai(prompt, key)
                result['ai'] = _parse_ai_json(out) if ok else {"error": out}
        return result
    except Exception as e:
        print("ANALYZE ERROR:", repr(e))
        traceback.print_exc()
        return {"error": True, "message": str(e)}
# ----------------------------
# UI
# ----------------------------
def format_result_html(res: Dict[str, Any]) -> str:
    """Render an ``analyze_email`` result as an HTML fragment for the UI.

    Args:
        res: Either ``{"error": True, "message": ...}`` or a dict with the keys
            ``heuristic``, ``spf``, ``dkim`` and ``ai``.

    Returns:
        HTML fragments joined with newlines.

    Every value that originates from the analysed email, DNS, or the model
    (links, reasons, error messages, AI output) is HTML-escaped before being
    interpolated, so crafted content cannot inject markup or attributes.
    """
    esc = html.escape

    if res.get('error'):
        return f"<b>Error:</b> {esc(str(res.get('message')))}"

    parts = []
    heur = res.get('heuristic') or {}
    parts.append("<h3>Resultado del análisis</h3>")
    parts.append(f"<b>Riesgo heurístico:</b> {esc(str(heur.get('score', 0)))}%")

    parts.append("<h4>Heurísticas</h4>")
    reasons = heur.get('reasons')
    if reasons:
        parts.append("<ul>")
        parts.extend(f"<li>{esc(str(r))}</li>" for r in reasons)
        parts.append("</ul>")
    else:
        parts.append("<p>No se detectaron heurísticas sospechosas.</p>")

    parts.append("<h4>Enlaces detectados</h4>")
    links = heur.get('links') or []
    if links:
        parts.append("<ul>")
        for u in links:
            # quote=True also escapes quotes, which matters inside href="...".
            safe = esc(str(u), quote=True)
            parts.append(f"<li><a href=\"{safe}\" target=\"_blank\" rel=\"noopener noreferrer\">{safe}</a></li>")
        parts.append("</ul>")
    else:
        parts.append("<p>-</p>")

    parts.append("<h4>Comprobaciones técnicas</h4>")
    if res.get('spf') is not None:
        spf = res['spf']
        if spf.get('ok'):
            parts.append(f"<p>SPF: <b>Encontrado</b> (registros: {len(spf.get('records', []))})</p>")
        else:
            parts.append(f"<p>SPF: <b>No verificado</b> - {esc(str(spf.get('error') or ''))}</p>")
    if res.get('dkim') is not None:
        d = res['dkim']
        if d.get('ok'):
            parts.append("<p>DKIM: <b>Firma válida</b></p>")
        else:
            parts.append(f"<p>DKIM: <b>No válido</b> - {esc(str(d.get('error') or ''))}</p>")

    parts.append("<h4>Veredicto IA</h4>")
    ai = res.get('ai')
    if ai is None:
        parts.append("<p>IA no activada.</p>")
    elif isinstance(ai, dict) and ai.get('error'):
        parts.append(f"<p style='color:crimson;'><b>Error IA:</b> {esc(str(ai.get('error')))}</p>")
    else:
        parts.append("<pre style='white-space:pre-wrap;background:#111;padding:10px;border-radius:6px;color:#d6d6d6;'>")
        parts.append(esc(json.dumps(ai, ensure_ascii=False, indent=2), quote=False))
        parts.append("</pre>")

    # A real newline; the two-character sequence backslash + "n" would show up
    # as visible text between elements.
    return "\n".join(parts)
# Gradio layout: input column (textbox, option checkboxes, button) on the left,
# HTML result panel on the right.
with gr.Blocks(css=".gradio-container .output_html { color: #ddd; }", analytics_enabled=False) as demo:
    gr.Markdown("## 🔎 Detector de Phishing — Mejorado (heurísticas + SPF/DKIM + OpenAI opcional)")
    with gr.Row():
        with gr.Column(scale=7):
            inp = gr.Textbox(label="Correo (RAW o contenido)", lines=20, placeholder="Pega aquí el correo (ideal: RAW con cabeceras)")
            use_ai = gr.Checkbox(label="Usar IA (OpenAI)", value=False)
            do_spf = gr.Checkbox(label="Comprobar SPF (intentará extraer IP desde Received)", value=False)
            do_dkim = gr.Checkbox(label="Comprobar DKIM (si pegas el RAW completo)", value=False)
            btn = gr.Button("Analizar")
        with gr.Column(scale=5):
            out_html = gr.HTML("<i>Resultado aparecerá aquí</i>")

    def run(raw, use_ai_flag, spf_flag, dkim_flag):
        """Button callback: analyse the pasted email and return the HTML report."""
        # Checkbox values are coerced to bool and a missing textbox value to "".
        res = analyze_email(raw or "", use_ai=bool(use_ai_flag), do_spf=bool(spf_flag), do_dkim=bool(dkim_flag))
        return format_result_html(res)

    # Wire the button: the four inputs feed run(), whose return value fills out_html.
    btn.click(run, inputs=[inp, use_ai, do_spf, do_dkim], outputs=[out_html])

if __name__ == '__main__':
    # Listen on all interfaces, port 7860.
    demo.launch(server_name='0.0.0.0', server_port=7860)