""" ╔══════════════════════════════════════════════════════════╗ ║ SkyGuard CAPTCHA — Hugging Face Space ║ ║ نظام تحقق ذكي لمنع البوتات وهجمات DDoS ║ ╚══════════════════════════════════════════════════════════╝ الميزات: ✅ تحديات CAPTCHA (رياضيات + أسئلة عربية + صور ASCII) ✅ حظر IP لمدة 5 دقائق عند الفشل المتكرر ✅ كشف البوتات عبر User-Agent ✅ كشف DDoS (طلبات زائدة في دقيقة واحدة) ✅ توليد Token موقّع للتحقق من النجاح ✅ API للتحقق من صحة Token ✅ دعم تضمين iframe في المواقع الأخرى """ import gradio as gr import random import time import hashlib import hmac import os import json import re from collections import defaultdict from fastapi.responses import JSONResponse from fastapi import Request as FastAPIRequest # ════════════════════════════════════════════ # الإعدادات # ════════════════════════════════════════════ SECRET_KEY = os.environ.get("SECRET_KEY", "skyguard-secret-key-change-me-2024") BLOCK_DURATION = 300 # ثواني = 5 دقائق MAX_FAILURES = 3 # محاولات خاطئة قبل الحظر DDOS_THRESHOLD = 25 # طلب/دقيقة قبل اعتبارها DDoS TOKEN_TTL = 600 # صلاحية التوكن 10 دقائق # ════════════════════════════════════════════ # التخزين في الذاكرة # ════════════════════════════════════════════ ip_store = defaultdict(lambda: { "failures": 0, "blocked_until": 0.0, "req_times": [], # لكشف DDoS "total_reqs": 0, "flagged_bot": False, }) token_store: dict[str, float] = {} # token → expiry_unix # ════════════════════════════════════════════ # أنماط User-Agent للبوتات # ════════════════════════════════════════════ BOT_PATTERNS = [ r"bot\b", r"crawler", r"spider", r"scraper", r"\bwget\b", r"\bcurl\b", r"python-?requests", r"java\/", r"go-http-client", r"scrapy", r"phantomjs", r"headlesschrome", r"selenium", r"puppeteer", r"playwright", r"mechanize", r"libwww", r"httpclient", r"okhttp", r"axios\/", r"node-fetch", r"got\/", ] BOT_REGEX = re.compile("|".join(BOT_PATTERNS), re.IGNORECASE) # ════════════════════════════════════════════ # الدوال المساعدة # ════════════════════════════════════════════ def get_ip(request: gr.Request) -> str: """استخراج IP الحقيقي (يدعم CDN مثل Cloudflare)""" if not request: return "0.0.0.0" for h in ["cf-connecting-ip", "x-forwarded-for", "x-real-ip", "true-client-ip"]: val = request.headers.get(h, "") if val: return val.split(",")[0].strip() if hasattr(request, "client") and request.client: return request.client.host return "0.0.0.0" def get_ua(request: gr.Request) -> str: if not request: return "" return request.headers.get("user-agent", "") def is_bot_ua(ua: str) -> bool: """كشف بوت عبر User-Agent""" if not ua or len(ua) < 10: return True # بدون UA = مشبوه return bool(BOT_REGEX.search(ua)) def track_and_check_ddos(ip: str) -> bool: """ يضيف الطلب الحالي ويرجع True إذا تجاوز الحد (DDoS) """ now = time.time() store = ip_store[ip] store["req_times"] = [t for t in store["req_times"] if now - t < 60] store["req_times"].append(now) store["total_reqs"] += 1 if len(store["req_times"]) > DDOS_THRESHOLD: store["blocked_until"] = now + BLOCK_DURATION return True return False def blocked_status(ip: str) -> tuple[bool, int]: """(is_blocked, seconds_remaining)""" remaining = ip_store[ip]["blocked_until"] - time.time() return remaining > 0, max(0, int(remaining)) def fmt_remaining(secs: int) -> str: m, s = divmod(secs, 60) return f"{m}:{s:02d}" def sign_token(ip: str) -> str: """توليد توكن HMAC موقّع""" ts = str(int(time.time())) msg = f"{ip}|{ts}" sig = hmac.new(SECRET_KEY.encode(), msg.encode(), hashlib.sha256).hexdigest()[:24] token = f"{sig}.{ts}" token_store[token] = time.time() + TOKEN_TTL # تنظيف التوكنات المنتهية expired = [k for k, v in list(token_store.items()) if v < time.time()] for k in expired: del token_store[k] return token def verify_token_logic(token: str) -> dict: """التحقق من صحة التوكن""" now = time.time() if token in token_store and token_store[token] > now: return {"valid": True, "expires_in": int(token_store[token] - now)} return {"valid": False, "reason": "token_invalid_or_expired"} # ════════════════════════════════════════════ # توليد التحديات # ════════════════════════════════════════════ def make_math() -> tuple[str, str]: op = random.choice(["+", "-", "×"]) if op == "+": a, b = random.randint(8, 30), random.randint(8, 30) return f"{a} + {b} = ؟", str(a + b) elif op == "-": a = random.randint(20, 50) b = random.randint(1, a - 1) return f"{a} - {b} = ؟", str(a - b) else: a, b = random.randint(2, 12), random.randint(2, 12) return f"{a} × {b} = ؟", str(a * b) WORD_QA = [ ("كم عدد أيام الأسبوع؟", "7"), ("كم عدد شهور السنة؟", "12"), ("كم عدد أصابع اليدين معاً؟", "10"), ("كم ساعة في اليوم الواحد؟", "24"), ("كم دقيقة في الساعة الواحدة؟", "60"), ("كم ثانية في الدقيقة الواحدة؟", "60"), ("ما عاصمة المملكة العربية السعودية؟", "الرياض"), ("ما عاصمة الإمارات العربية المتحدة؟", "أبوظبي"), ("ما لون النبات عادةً؟", "أخضر"), ("ما لون السماء الصافية نهاراً؟", "أزرق"), ("كم عدد أيام شهر يناير؟", "31"), ("كم عدد أيام شهر فبراير في السنة العادية؟", "28"), ] SEQUENCE_QA = [ ("أكمل: 2، 4، 6، 8، ؟", "10"), ("أكمل: 5، 10، 15، 20، ؟", "25"), ("أكمل: 1، 3، 5، 7، ؟", "9"), ("أكمل: 10، 20، 30، ؟", "40"), ("أكمل: 3، 6، 9، 12، ؟", "15"), ("أكمل: 100، 90، 80، 70، ؟", "60"), ] def make_challenge() -> tuple[str, str, str]: """→ (icon, question, answer)""" r = random.random() if r < 0.45: q, a = make_math() return "🧮", q, a elif r < 0.75: q, a = random.choice(WORD_QA) return "❓", q, a else: q, a = random.choice(SEQUENCE_QA) return "🔢", q, a # ════════════════════════════════════════════ # واجهة المستخدم — Gradio # ════════════════════════════════════════════ CSS = """ @import url('https://fonts.googleapis.com/css2?family=Tajawal:wght@400;500;700;800&display=swap'); * { box-sizing: border-box; margin: 0; padding: 0; } body, .gradio-container { font-family: 'Tajawal', sans-serif !important; background: #0a0e1a !important; direction: rtl; } .captcha-wrap { max-width: 480px; margin: 40px auto; padding: 0 16px; } .captcha-card { background: linear-gradient(145deg, #111827, #1a2235); border: 1px solid rgba(99,179,237,0.2); border-radius: 20px; padding: 36px 32px 28px; box-shadow: 0 20px 60px rgba(0,0,0,0.6), 0 0 0 1px rgba(255,255,255,0.03); position: relative; overflow: hidden; } .captcha-card::before { content: ''; position: absolute; top: -60px; right: -60px; width: 200px; height: 200px; background: radial-gradient(circle, rgba(99,179,237,0.08) 0%, transparent 70%); pointer-events: none; } .captcha-header { display: flex; align-items: center; gap: 12px; margin-bottom: 28px; padding-bottom: 20px; border-bottom: 1px solid rgba(255,255,255,0.06); } .captcha-shield { font-size: 32px; filter: drop-shadow(0 0 12px rgba(99,179,237,0.5)); } .captcha-title { color: #e2e8f0; font-size: 20px; font-weight: 800; letter-spacing: -0.3px; } .captcha-subtitle { color: #64748b; font-size: 13px; font-weight: 400; margin-top: 2px; } .challenge-box { background: rgba(99,179,237,0.05); border: 1px solid rgba(99,179,237,0.15); border-radius: 14px; padding: 24px 20px; margin-bottom: 20px; text-align: center; } .challenge-icon { font-size: 28px; margin-bottom: 10px; display: block; } .challenge-text { color: #93c5fd; font-size: 22px; font-weight: 700; letter-spacing: 0.5px; } .answer-field input { background: rgba(255,255,255,0.05) !important; border: 1.5px solid rgba(99,179,237,0.25) !important; border-radius: 10px !important; color: #e2e8f0 !important; font-family: 'Tajawal', sans-serif !important; font-size: 16px !important; padding: 12px 16px !important; text-align: center !important; transition: border-color 0.2s, box-shadow 0.2s; } .answer-field input:focus { border-color: #63b3ed !important; box-shadow: 0 0 0 3px rgba(99,179,237,0.15) !important; outline: none !important; } .answer-field label { color: #94a3b8 !important; font-family: 'Tajawal', sans-serif !important; font-size: 14px !important; } .verify-btn button { width: 100% !important; background: linear-gradient(135deg, #3b82f6, #2563eb) !important; border: none !important; border-radius: 10px !important; color: white !important; font-family: 'Tajawal', sans-serif !important; font-size: 16px !important; font-weight: 700 !important; padding: 12px !important; cursor: pointer !important; transition: transform 0.15s, box-shadow 0.15s !important; box-shadow: 0 4px 15px rgba(59,130,246,0.4) !important; margin-top: 16px !important; } .verify-btn button:hover { transform: translateY(-1px) !important; box-shadow: 0 6px 20px rgba(59,130,246,0.5) !important; } .reload-btn button { width: 100% !important; background: transparent !important; border: 1px solid rgba(255,255,255,0.1) !important; border-radius: 10px !important; color: #64748b !important; font-family: 'Tajawal', sans-serif !important; font-size: 14px !important; padding: 9px !important; cursor: pointer !important; margin-top: 10px !important; transition: all 0.2s !important; } .reload-btn button:hover { border-color: rgba(99,179,237,0.3) !important; color: #93c5fd !important; } .status-area textarea, .status-area input { background: transparent !important; border: none !important; border-radius: 0 !important; color: transparent !important; height: 0 !important; padding: 0 !important; display: none !important; } .result-area { margin-top: 16px; min-height: 20px; } .result-area textarea { background: transparent !important; border: none !important; color: #64748b !important; font-family: 'Tajawal', sans-serif !important; font-size: 14px !important; text-align: center !important; resize: none !important; min-height: 40px !important; } .footer-info { display: flex; justify-content: space-between; align-items: center; margin-top: 20px; padding-top: 16px; border-top: 1px solid rgba(255,255,255,0.05); } .footer-brand { color: #334155; font-size: 12px; } .footer-secure { color: #334155; font-size: 12px; display: flex; align-items: center; gap: 5px; } /* Status colors via JS classes */ .status-success { color: #4ade80 !important; } .status-error { color: #f87171 !important; } .status-warn { color: #fbbf24 !important; } .status-info { color: #93c5fd !important; } /* Hide Gradio default footer/header */ footer, .svelte-1rjryqp { display: none !important; } #component-0 { padding: 0 !important; } """ JS_POST_MESSAGE = """ """ # ════════════════════════════════════════════ # منطق الحالة # ════════════════════════════════════════════ def on_load(request: gr.Request): """ يُستدعى عند تحميل الصفحة — يولّد التحدي أو يعرض رسالة الحظر Returns: (challenge_html, answer_input, submit_btn, reload_btn, hidden_answer, hidden_ip, result_text) """ ip = get_ip(request) ua = get_ua(request) # كشف DDoS if track_and_check_ddos(ip): return _blocked_response(ip, "🚨 هجوم DDoS محتمل — تم حظرك تلقائياً") # كشف بوت if is_bot_ua(ua): ip_store[ip]["blocked_until"] = time.time() + BLOCK_DURATION ip_store[ip]["flagged_bot"] = True return _blocked_response(ip, "🤖 تم اكتشافك كبوت تلقائي") # فحص الحظر blocked, remaining = blocked_status(ip) if blocked: return _blocked_response(ip, f"🔒 محظور — انتظر {fmt_remaining(remaining)} دقيقة") return _challenge_response(ip) def on_submit(user_ans: str, hidden_ans: str, hidden_ip: str, request: gr.Request): """التحقق من الإجابة""" ip = get_ip(request) blocked, remaining = blocked_status(ip) if blocked: msg = f"🔒 محظور لمدة {fmt_remaining(remaining)} دقيقة أخرى" return ( gr.update(), gr.update(value=""), gr.update(value=f"BLOCKED:{msg}"), hidden_ip ) correct = hidden_ans.strip().lower() given = (user_ans or "").strip().lower() if not given: return gr.update(), gr.update(), gr.update(value="⚠️ الرجاء إدخال الإجابة"), hidden_ip if given == correct: # نجاح ip_store[ip]["failures"] = 0 token = sign_token(ip) html = _success_html(token) return ( gr.update(value=html), gr.update(value="", visible=False), gr.update(value=f"TOKEN:{token}"), hidden_ip ) else: # فشل ip_store[ip]["failures"] += 1 fails = ip_store[ip]["failures"] if fails >= MAX_FAILURES: ip_store[ip]["blocked_until"] = time.time() + BLOCK_DURATION ip_store[ip]["failures"] = 0 msg = f"🔒 تم حظرك لمدة 5 دقائق بعد {MAX_FAILURES} محاولات فاشلة" return ( gr.update(value=_blocked_html(msg)), gr.update(value=""), gr.update(value=f"BLOCKED:{msg}"), hidden_ip ) left = MAX_FAILURES - fails return ( gr.update(), gr.update(value=""), gr.update(value=f"❌ إجابة خاطئة — تبقى {left} محاولة/محاولات"), hidden_ip ) def on_reload(request: gr.Request): ip = get_ip(request) blocked, remaining = blocked_status(ip) if blocked: return _blocked_response(ip, f"🔒 لا يزال محظوراً — {fmt_remaining(remaining)} دقيقة متبقية") return _challenge_response(ip) # ════════════════════════════════════════════ # مساعدو HTML # ════════════════════════════════════════════ def _challenge_html(icon: str, question: str) -> str: return f"""