tst / app.py
Skydata001's picture
Upload 3 files
5340f1e verified
"""
โ•”โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•—
โ•‘ SkyGuard CAPTCHA โ€” Hugging Face Space โ•‘
โ•‘ ู†ุธุงู… ุชุญู‚ู‚ ุฐูƒูŠ ู„ู…ู†ุน ุงู„ุจูˆุชุงุช ูˆู‡ุฌู…ุงุช DDoS โ•‘
โ•šโ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
ุงู„ู…ูŠุฒุงุช:
โœ… ุชุญุฏูŠุงุช CAPTCHA (ุฑูŠุงุถูŠุงุช + ุฃุณุฆู„ุฉ ุนุฑุจูŠุฉ + ุตูˆุฑ ASCII)
โœ… ุญุธุฑ IP ู„ู…ุฏุฉ 5 ุฏู‚ุงุฆู‚ ุนู†ุฏ ุงู„ูุดู„ ุงู„ู…ุชูƒุฑุฑ
โœ… ูƒุดู ุงู„ุจูˆุชุงุช ุนุจุฑ User-Agent
โœ… ูƒุดู DDoS (ุทู„ุจุงุช ุฒุงุฆุฏุฉ ููŠ ุฏู‚ูŠู‚ุฉ ูˆุงุญุฏุฉ)
โœ… ุชูˆู„ูŠุฏ Token ู…ูˆู‚ู‘ุน ู„ู„ุชุญู‚ู‚ ู…ู† ุงู„ู†ุฌุงุญ
โœ… API ู„ู„ุชุญู‚ู‚ ู…ู† ุตุญุฉ Token
โœ… ุฏุนู… ุชุถู…ูŠู† iframe ููŠ ุงู„ู…ูˆุงู‚ุน ุงู„ุฃุฎุฑู‰
"""
import gradio as gr
import random
import time
import hashlib
import hmac
import os
import json
import re
from collections import defaultdict
from fastapi.responses import JSONResponse
from fastapi import Request as FastAPIRequest
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
# ุงู„ุฅุนุฏุงุฏุงุช
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
SECRET_KEY = os.environ.get("SECRET_KEY", "skyguard-secret-key-change-me-2024")
BLOCK_DURATION = 300 # ุซูˆุงู†ูŠ = 5 ุฏู‚ุงุฆู‚
MAX_FAILURES = 3 # ู…ุญุงูˆู„ุงุช ุฎุงุทุฆุฉ ู‚ุจู„ ุงู„ุญุธุฑ
DDOS_THRESHOLD = 25 # ุทู„ุจ/ุฏู‚ูŠู‚ุฉ ู‚ุจู„ ุงุนุชุจุงุฑู‡ุง DDoS
TOKEN_TTL = 600 # ุตู„ุงุญูŠุฉ ุงู„ุชูˆูƒู† 10 ุฏู‚ุงุฆู‚
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
# ุงู„ุชุฎุฒูŠู† ููŠ ุงู„ุฐุงูƒุฑุฉ
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
ip_store = defaultdict(lambda: {
"failures": 0,
"blocked_until": 0.0,
"req_times": [], # ู„ูƒุดู DDoS
"total_reqs": 0,
"flagged_bot": False,
})
token_store: dict[str, float] = {} # token โ†’ expiry_unix
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
# ุฃู†ู…ุงุท User-Agent ู„ู„ุจูˆุชุงุช
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
BOT_PATTERNS = [
r"bot\b", r"crawler", r"spider", r"scraper",
r"\bwget\b", r"\bcurl\b", r"python-?requests",
r"java\/", r"go-http-client", r"scrapy",
r"phantomjs", r"headlesschrome", r"selenium",
r"puppeteer", r"playwright", r"mechanize",
r"libwww", r"httpclient", r"okhttp",
r"axios\/", r"node-fetch", r"got\/",
]
BOT_REGEX = re.compile("|".join(BOT_PATTERNS), re.IGNORECASE)
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
# ุงู„ุฏูˆุงู„ ุงู„ู…ุณุงุนุฏุฉ
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
def get_ip(request: gr.Request) -> str:
"""ุงุณุชุฎุฑุงุฌ IP ุงู„ุญู‚ูŠู‚ูŠ (ูŠุฏุนู… CDN ู…ุซู„ Cloudflare)"""
if not request:
return "0.0.0.0"
for h in ["cf-connecting-ip", "x-forwarded-for", "x-real-ip", "true-client-ip"]:
val = request.headers.get(h, "")
if val:
return val.split(",")[0].strip()
if hasattr(request, "client") and request.client:
return request.client.host
return "0.0.0.0"
def get_ua(request: gr.Request) -> str:
if not request:
return ""
return request.headers.get("user-agent", "")
def is_bot_ua(ua: str) -> bool:
"""ูƒุดู ุจูˆุช ุนุจุฑ User-Agent"""
if not ua or len(ua) < 10:
return True # ุจุฏูˆู† UA = ู…ุดุจูˆู‡
return bool(BOT_REGEX.search(ua))
def track_and_check_ddos(ip: str) -> bool:
"""
ูŠุถูŠู ุงู„ุทู„ุจ ุงู„ุญุงู„ูŠ ูˆูŠุฑุฌุน True ุฅุฐุง ุชุฌุงูˆุฒ ุงู„ุญุฏ (DDoS)
"""
now = time.time()
store = ip_store[ip]
store["req_times"] = [t for t in store["req_times"] if now - t < 60]
store["req_times"].append(now)
store["total_reqs"] += 1
if len(store["req_times"]) > DDOS_THRESHOLD:
store["blocked_until"] = now + BLOCK_DURATION
return True
return False
def blocked_status(ip: str) -> tuple[bool, int]:
"""(is_blocked, seconds_remaining)"""
remaining = ip_store[ip]["blocked_until"] - time.time()
return remaining > 0, max(0, int(remaining))
def fmt_remaining(secs: int) -> str:
m, s = divmod(secs, 60)
return f"{m}:{s:02d}"
def sign_token(ip: str) -> str:
"""ุชูˆู„ูŠุฏ ุชูˆูƒู† HMAC ู…ูˆู‚ู‘ุน"""
ts = str(int(time.time()))
msg = f"{ip}|{ts}"
sig = hmac.new(SECRET_KEY.encode(), msg.encode(), hashlib.sha256).hexdigest()[:24]
token = f"{sig}.{ts}"
token_store[token] = time.time() + TOKEN_TTL
# ุชู†ุธูŠู ุงู„ุชูˆูƒู†ุงุช ุงู„ู…ู†ุชู‡ูŠุฉ
expired = [k for k, v in list(token_store.items()) if v < time.time()]
for k in expired:
del token_store[k]
return token
def verify_token_logic(token: str) -> dict:
"""ุงู„ุชุญู‚ู‚ ู…ู† ุตุญุฉ ุงู„ุชูˆูƒู†"""
now = time.time()
if token in token_store and token_store[token] > now:
return {"valid": True, "expires_in": int(token_store[token] - now)}
return {"valid": False, "reason": "token_invalid_or_expired"}
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
# ุชูˆู„ูŠุฏ ุงู„ุชุญุฏูŠุงุช
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
def make_math() -> tuple[str, str]:
op = random.choice(["+", "-", "ร—"])
if op == "+":
a, b = random.randint(8, 30), random.randint(8, 30)
return f"{a} + {b} = ุŸ", str(a + b)
elif op == "-":
a = random.randint(20, 50)
b = random.randint(1, a - 1)
return f"{a} - {b} = ุŸ", str(a - b)
else:
a, b = random.randint(2, 12), random.randint(2, 12)
return f"{a} ร— {b} = ุŸ", str(a * b)
WORD_QA = [
("ูƒู… ุนุฏุฏ ุฃูŠุงู… ุงู„ุฃุณุจูˆุนุŸ", "7"),
("ูƒู… ุนุฏุฏ ุดู‡ูˆุฑ ุงู„ุณู†ุฉุŸ", "12"),
("ูƒู… ุนุฏุฏ ุฃุตุงุจุน ุงู„ูŠุฏูŠู† ู…ุนุงู‹ุŸ", "10"),
("ูƒู… ุณุงุนุฉ ููŠ ุงู„ูŠูˆู… ุงู„ูˆุงุญุฏุŸ", "24"),
("ูƒู… ุฏู‚ูŠู‚ุฉ ููŠ ุงู„ุณุงุนุฉ ุงู„ูˆุงุญุฏุฉุŸ", "60"),
("ูƒู… ุซุงู†ูŠุฉ ููŠ ุงู„ุฏู‚ูŠู‚ุฉ ุงู„ูˆุงุญุฏุฉุŸ", "60"),
("ู…ุง ุนุงุตู…ุฉ ุงู„ู…ู…ู„ูƒุฉ ุงู„ุนุฑุจูŠุฉ ุงู„ุณุนูˆุฏูŠุฉุŸ", "ุงู„ุฑูŠุงุถ"),
("ู…ุง ุนุงุตู…ุฉ ุงู„ุฅู…ุงุฑุงุช ุงู„ุนุฑุจูŠุฉ ุงู„ู…ุชุญุฏุฉุŸ", "ุฃุจูˆุธุจูŠ"),
("ู…ุง ู„ูˆู† ุงู„ู†ุจุงุช ุนุงุฏุฉู‹ุŸ", "ุฃุฎุถุฑ"),
("ู…ุง ู„ูˆู† ุงู„ุณู…ุงุก ุงู„ุตุงููŠุฉ ู†ู‡ุงุฑุงู‹ุŸ", "ุฃุฒุฑู‚"),
("ูƒู… ุนุฏุฏ ุฃูŠุงู… ุดู‡ุฑ ูŠู†ุงูŠุฑุŸ", "31"),
("ูƒู… ุนุฏุฏ ุฃูŠุงู… ุดู‡ุฑ ูุจุฑุงูŠุฑ ููŠ ุงู„ุณู†ุฉ ุงู„ุนุงุฏูŠุฉุŸ", "28"),
]
SEQUENCE_QA = [
("ุฃูƒู…ู„: 2ุŒ 4ุŒ 6ุŒ 8ุŒ ุŸ", "10"),
("ุฃูƒู…ู„: 5ุŒ 10ุŒ 15ุŒ 20ุŒ ุŸ", "25"),
("ุฃูƒู…ู„: 1ุŒ 3ุŒ 5ุŒ 7ุŒ ุŸ", "9"),
("ุฃูƒู…ู„: 10ุŒ 20ุŒ 30ุŒ ุŸ", "40"),
("ุฃูƒู…ู„: 3ุŒ 6ุŒ 9ุŒ 12ุŒ ุŸ", "15"),
("ุฃูƒู…ู„: 100ุŒ 90ุŒ 80ุŒ 70ุŒ ุŸ", "60"),
]
def make_challenge() -> tuple[str, str, str]:
"""โ†’ (icon, question, answer)"""
r = random.random()
if r < 0.45:
q, a = make_math()
return "๐Ÿงฎ", q, a
elif r < 0.75:
q, a = random.choice(WORD_QA)
return "โ“", q, a
else:
q, a = random.choice(SEQUENCE_QA)
return "๐Ÿ”ข", q, a
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
# ูˆุงุฌู‡ุฉ ุงู„ู…ุณุชุฎุฏู… โ€” Gradio
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
CSS = """
@import url('https://fonts.googleapis.com/css2?family=Tajawal:wght@400;500;700;800&display=swap');
* { box-sizing: border-box; margin: 0; padding: 0; }
body, .gradio-container {
font-family: 'Tajawal', sans-serif !important;
background: #0a0e1a !important;
direction: rtl;
}
.captcha-wrap {
max-width: 480px;
margin: 40px auto;
padding: 0 16px;
}
.captcha-card {
background: linear-gradient(145deg, #111827, #1a2235);
border: 1px solid rgba(99,179,237,0.2);
border-radius: 20px;
padding: 36px 32px 28px;
box-shadow: 0 20px 60px rgba(0,0,0,0.6), 0 0 0 1px rgba(255,255,255,0.03);
position: relative;
overflow: hidden;
}
.captcha-card::before {
content: '';
position: absolute;
top: -60px; right: -60px;
width: 200px; height: 200px;
background: radial-gradient(circle, rgba(99,179,237,0.08) 0%, transparent 70%);
pointer-events: none;
}
.captcha-header {
display: flex;
align-items: center;
gap: 12px;
margin-bottom: 28px;
padding-bottom: 20px;
border-bottom: 1px solid rgba(255,255,255,0.06);
}
.captcha-shield {
font-size: 32px;
filter: drop-shadow(0 0 12px rgba(99,179,237,0.5));
}
.captcha-title {
color: #e2e8f0;
font-size: 20px;
font-weight: 800;
letter-spacing: -0.3px;
}
.captcha-subtitle {
color: #64748b;
font-size: 13px;
font-weight: 400;
margin-top: 2px;
}
.challenge-box {
background: rgba(99,179,237,0.05);
border: 1px solid rgba(99,179,237,0.15);
border-radius: 14px;
padding: 24px 20px;
margin-bottom: 20px;
text-align: center;
}
.challenge-icon { font-size: 28px; margin-bottom: 10px; display: block; }
.challenge-text {
color: #93c5fd;
font-size: 22px;
font-weight: 700;
letter-spacing: 0.5px;
}
.answer-field input {
background: rgba(255,255,255,0.05) !important;
border: 1.5px solid rgba(99,179,237,0.25) !important;
border-radius: 10px !important;
color: #e2e8f0 !important;
font-family: 'Tajawal', sans-serif !important;
font-size: 16px !important;
padding: 12px 16px !important;
text-align: center !important;
transition: border-color 0.2s, box-shadow 0.2s;
}
.answer-field input:focus {
border-color: #63b3ed !important;
box-shadow: 0 0 0 3px rgba(99,179,237,0.15) !important;
outline: none !important;
}
.answer-field label {
color: #94a3b8 !important;
font-family: 'Tajawal', sans-serif !important;
font-size: 14px !important;
}
.verify-btn button {
width: 100% !important;
background: linear-gradient(135deg, #3b82f6, #2563eb) !important;
border: none !important;
border-radius: 10px !important;
color: white !important;
font-family: 'Tajawal', sans-serif !important;
font-size: 16px !important;
font-weight: 700 !important;
padding: 12px !important;
cursor: pointer !important;
transition: transform 0.15s, box-shadow 0.15s !important;
box-shadow: 0 4px 15px rgba(59,130,246,0.4) !important;
margin-top: 16px !important;
}
.verify-btn button:hover {
transform: translateY(-1px) !important;
box-shadow: 0 6px 20px rgba(59,130,246,0.5) !important;
}
.reload-btn button {
width: 100% !important;
background: transparent !important;
border: 1px solid rgba(255,255,255,0.1) !important;
border-radius: 10px !important;
color: #64748b !important;
font-family: 'Tajawal', sans-serif !important;
font-size: 14px !important;
padding: 9px !important;
cursor: pointer !important;
margin-top: 10px !important;
transition: all 0.2s !important;
}
.reload-btn button:hover {
border-color: rgba(99,179,237,0.3) !important;
color: #93c5fd !important;
}
.status-area textarea, .status-area input {
background: transparent !important;
border: none !important;
border-radius: 0 !important;
color: transparent !important;
height: 0 !important;
padding: 0 !important;
display: none !important;
}
.result-area {
margin-top: 16px;
min-height: 20px;
}
.result-area textarea {
background: transparent !important;
border: none !important;
color: #64748b !important;
font-family: 'Tajawal', sans-serif !important;
font-size: 14px !important;
text-align: center !important;
resize: none !important;
min-height: 40px !important;
}
.footer-info {
display: flex;
justify-content: space-between;
align-items: center;
margin-top: 20px;
padding-top: 16px;
border-top: 1px solid rgba(255,255,255,0.05);
}
.footer-brand {
color: #334155;
font-size: 12px;
}
.footer-secure {
color: #334155;
font-size: 12px;
display: flex;
align-items: center;
gap: 5px;
}
/* Status colors via JS classes */
.status-success { color: #4ade80 !important; }
.status-error { color: #f87171 !important; }
.status-warn { color: #fbbf24 !important; }
.status-info { color: #93c5fd !important; }
/* Hide Gradio default footer/header */
footer, .svelte-1rjryqp { display: none !important; }
#component-0 { padding: 0 !important; }
"""
JS_POST_MESSAGE = """
<script>
// ุฅุฑุณุงู„ ุงู„ุชูˆูƒู† ุฅู„ู‰ ุงู„ู…ูˆู‚ุน ุงู„ุฃุจ ุนู†ุฏ ู†ุฌุงุญ CAPTCHA
window._skyguardSuccess = function(token) {
const msg = { type: 'skyguard-captcha', status: 'success', token: token };
if (window.parent && window.parent !== window) {
window.parent.postMessage(msg, '*');
}
// ุฅุถุงูุฉ timestamp ู„ู„ุนุฑุถ
console.log('[SkyGuard] CAPTCHA passed, token:', token);
};
window._skyguardFail = function(reason) {
const msg = { type: 'skyguard-captcha', status: 'blocked', reason: reason };
if (window.parent && window.parent !== window) {
window.parent.postMessage(msg, '*');
}
};
// ุงู„ุงุณุชู…ุงุน ุฅู„ู‰ ุชุบูŠูŠุฑุงุช ุงู„ู†ุชูŠุฌุฉ
const observer = new MutationObserver(() => {
const resultEl = document.querySelector('.result-area textarea');
if (!resultEl) return;
const val = resultEl.value || '';
if (val.startsWith('TOKEN:')) {
const token = val.replace('TOKEN:', '').trim();
window._skyguardSuccess(token);
} else if (val.startsWith('BLOCKED:')) {
window._skyguardFail(val.replace('BLOCKED:', '').trim());
}
});
document.addEventListener('DOMContentLoaded', () => {
setTimeout(() => {
const el = document.querySelector('.result-area');
if (el) observer.observe(el, { subtree: true, characterData: true, childList: true });
}, 1500);
});
</script>
"""
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
# ู…ู†ุทู‚ ุงู„ุญุงู„ุฉ
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
def on_load(request: gr.Request):
"""
ูŠูุณุชุฏุนู‰ ุนู†ุฏ ุชุญู…ูŠู„ ุงู„ุตูุญุฉ โ€” ูŠูˆู„ู‘ุฏ ุงู„ุชุญุฏูŠ ุฃูˆ ูŠุนุฑุถ ุฑุณุงู„ุฉ ุงู„ุญุธุฑ
Returns: (challenge_html, answer_input, submit_btn, reload_btn, hidden_answer, hidden_ip, result_text)
"""
ip = get_ip(request)
ua = get_ua(request)
# ูƒุดู DDoS
if track_and_check_ddos(ip):
return _blocked_response(ip, "๐Ÿšจ ู‡ุฌูˆู… DDoS ู…ุญุชู…ู„ โ€” ุชู… ุญุธุฑูƒ ุชู„ู‚ุงุฆูŠุงู‹")
# ูƒุดู ุจูˆุช
if is_bot_ua(ua):
ip_store[ip]["blocked_until"] = time.time() + BLOCK_DURATION
ip_store[ip]["flagged_bot"] = True
return _blocked_response(ip, "๐Ÿค– ุชู… ุงูƒุชุดุงููƒ ูƒุจูˆุช ุชู„ู‚ุงุฆูŠ")
# ูุญุต ุงู„ุญุธุฑ
blocked, remaining = blocked_status(ip)
if blocked:
return _blocked_response(ip, f"๐Ÿ”’ ู…ุญุธูˆุฑ โ€” ุงู†ุชุธุฑ {fmt_remaining(remaining)} ุฏู‚ูŠู‚ุฉ")
return _challenge_response(ip)
def on_submit(user_ans: str, hidden_ans: str, hidden_ip: str, request: gr.Request):
"""ุงู„ุชุญู‚ู‚ ู…ู† ุงู„ุฅุฌุงุจุฉ"""
ip = get_ip(request)
blocked, remaining = blocked_status(ip)
if blocked:
msg = f"๐Ÿ”’ ู…ุญุธูˆุฑ ู„ู…ุฏุฉ {fmt_remaining(remaining)} ุฏู‚ูŠู‚ุฉ ุฃุฎุฑู‰"
return (
gr.update(),
gr.update(value=""),
gr.update(value=f"BLOCKED:{msg}"),
hidden_ip
)
correct = hidden_ans.strip().lower()
given = (user_ans or "").strip().lower()
if not given:
return gr.update(), gr.update(), gr.update(value="โš ๏ธ ุงู„ุฑุฌุงุก ุฅุฏุฎุงู„ ุงู„ุฅุฌุงุจุฉ"), hidden_ip
if given == correct:
# ู†ุฌุงุญ
ip_store[ip]["failures"] = 0
token = sign_token(ip)
html = _success_html(token)
return (
gr.update(value=html),
gr.update(value="", visible=False),
gr.update(value=f"TOKEN:{token}"),
hidden_ip
)
else:
# ูุดู„
ip_store[ip]["failures"] += 1
fails = ip_store[ip]["failures"]
if fails >= MAX_FAILURES:
ip_store[ip]["blocked_until"] = time.time() + BLOCK_DURATION
ip_store[ip]["failures"] = 0
msg = f"๐Ÿ”’ ุชู… ุญุธุฑูƒ ู„ู…ุฏุฉ 5 ุฏู‚ุงุฆู‚ ุจุนุฏ {MAX_FAILURES} ู…ุญุงูˆู„ุงุช ูุงุดู„ุฉ"
return (
gr.update(value=_blocked_html(msg)),
gr.update(value=""),
gr.update(value=f"BLOCKED:{msg}"),
hidden_ip
)
left = MAX_FAILURES - fails
return (
gr.update(),
gr.update(value=""),
gr.update(value=f"โŒ ุฅุฌุงุจุฉ ุฎุงุทุฆุฉ โ€” ุชุจู‚ู‰ {left} ู…ุญุงูˆู„ุฉ/ู…ุญุงูˆู„ุงุช"),
hidden_ip
)
def on_reload(request: gr.Request):
ip = get_ip(request)
blocked, remaining = blocked_status(ip)
if blocked:
return _blocked_response(ip, f"๐Ÿ”’ ู„ุง ูŠุฒุงู„ ู…ุญุธูˆุฑุงู‹ โ€” {fmt_remaining(remaining)} ุฏู‚ูŠู‚ุฉ ู…ุชุจู‚ูŠุฉ")
return _challenge_response(ip)
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
# ู…ุณุงุนุฏูˆ HTML
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
def _challenge_html(icon: str, question: str) -> str:
return f"""
<div class="challenge-box">
<span class="challenge-icon">{icon}</span>
<div class="challenge-text">{question}</div>
</div>
"""
def _blocked_html(msg: str) -> str:
return f"""
<div style="background:rgba(239,68,68,0.08);border:1px solid rgba(239,68,68,0.3);
border-radius:14px;padding:24px;text-align:center;">
<div style="font-size:36px;margin-bottom:12px">๐Ÿ›ก๏ธ</div>
<div style="color:#f87171;font-size:17px;font-weight:700">{msg}</div>
<div style="color:#64748b;font-size:13px;margin-top:8px">
ูŠุชู… ุชุณุฌูŠู„ ู‡ุฐู‡ ุงู„ู…ุญุงูˆู„ุฉ
</div>
</div>
"""
def _success_html(token: str) -> str:
short = token[:12] + "โ€ฆ"
return f"""
<div style="background:rgba(74,222,128,0.08);border:1px solid rgba(74,222,128,0.3);
border-radius:14px;padding:24px;text-align:center;">
<div style="font-size:40px;margin-bottom:10px">โœ…</div>
<div style="color:#4ade80;font-size:18px;font-weight:800">ุชู… ุงู„ุชุญู‚ู‚ ุจู†ุฌุงุญ!</div>
<div style="color:#64748b;font-size:12px;margin-top:8px;
font-family:monospace;word-break:break-all;">
Token: {short}
</div>
<div style="color:#334155;font-size:11px;margin-top:6px">
ุฌุงุฑูŠ ุงู„ุฅุฑุณุงู„ ู„ู„ู…ูˆู‚ุนโ€ฆ
</div>
</div>
"""
def _challenge_response(ip: str):
icon, question, answer = make_challenge()
ch_html = _challenge_html(icon, question)
return (
gr.update(value=ch_html, visible=True),
gr.update(visible=True, value=""),
gr.update(visible=True),
gr.update(visible=True),
gr.update(value=answer),
gr.update(value=ip),
gr.update(value=""),
)
def _blocked_response(ip: str, msg: str):
return (
gr.update(value=_blocked_html(msg), visible=True),
gr.update(visible=False, value=""),
gr.update(visible=False),
gr.update(visible=False),
gr.update(value=""),
gr.update(value=ip),
gr.update(value=f"BLOCKED:{msg}"),
)
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
# ุจู†ุงุก ุงู„ูˆุงุฌู‡ุฉ
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
with gr.Blocks(title="SkyGuard CAPTCHA") as demo:
gr.HTML(JS_POST_MESSAGE)
with gr.Column(elem_classes="captcha-wrap"):
gr.HTML("""
<div class="captcha-card">
<div class="captcha-header">
<span class="captcha-shield">๐Ÿ›ก๏ธ</span>
<div>
<div class="captcha-title">SkyGuard CAPTCHA</div>
<div class="captcha-subtitle">ุฃุซุจุช ุฃู†ูƒ ู„ุณุช ุฑูˆุจูˆุชุงู‹</div>
</div>
</div>
""")
challenge_html = gr.HTML(value="<div style='height:80px'></div>")
answer_input = gr.Textbox(
placeholder="ุงูƒุชุจ ุฅุฌุงุจุชูƒ ู‡ู†ุงโ€ฆ",
label="ุงู„ุฅุฌุงุจุฉ",
elem_classes="answer-field",
visible=True
)
submit_btn = gr.Button("โœ… ุชุญู‚ู‚", elem_classes="verify-btn", visible=True)
reload_btn = gr.Button("๐Ÿ”„ ุณุคุงู„ ุฌุฏูŠุฏ", elem_classes="reload-btn", visible=True)
# ุญู‚ูˆู„ ู…ุฎููŠุฉ
hidden_answer = gr.Textbox(visible=False)
hidden_ip = gr.Textbox(visible=False)
# ุงู„ู†ุชูŠุฌุฉ
result_text = gr.Textbox(
label="",
interactive=False,
elem_classes="result-area",
show_label=False
)
gr.HTML("""
<div class="footer-info">
<span class="footer-brand">SkyGuard v1.0</span>
<span class="footer-secure">๐Ÿ”’ ู…ุญู…ูŠ ุจู€ HMAC-SHA256</span>
</div>
</div>
""")
# โ”€โ”€ ุงู„ุฃุญุฏุงุซ โ”€โ”€
OUTPUTS = [challenge_html, answer_input, submit_btn, reload_btn,
hidden_answer, hidden_ip, result_text]
demo.load(on_load, outputs=OUTPUTS)
submit_btn.click(
on_submit,
inputs=[answer_input, hidden_answer, hidden_ip],
outputs=[challenge_html, answer_input, result_text, hidden_ip]
)
reload_btn.click(on_reload, outputs=OUTPUTS)
answer_input.submit(
on_submit,
inputs=[answer_input, hidden_answer, hidden_ip],
outputs=[challenge_html, answer_input, result_text, hidden_ip]
)
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
# API Endpoints (FastAPI ุชุญุช Gradio)
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
@demo.app.get("/api/verify")
async def api_verify(token: str, request: FastAPIRequest):
"""
ู†ู‚ุทุฉ ู†ู‡ุงูŠุฉ ู„ู„ุชุญู‚ู‚ ู…ู† ุงู„ุชูˆูƒู† โ€” ูŠุณุชุฎุฏู…ู‡ุง ู…ูˆู‚ุนูƒ ุงู„ุฎู„ููŠ
GET /api/verify?token=xxxx
"""
result = verify_token_logic(token)
# ุฅุฒุงู„ุฉ ุงู„ุชูˆูƒู† ุจุนุฏ ุงู„ุชุญู‚ู‚ (one-time use)
if result["valid"] and token in token_store:
del token_store[token]
return JSONResponse(result)
@demo.app.get("/api/status")
async def api_status(request: FastAPIRequest):
"""ุญุงู„ุฉ ุงู„ู†ุธุงู…"""
ip = request.headers.get("x-forwarded-for", request.client.host if request.client else "?")
ip = ip.split(",")[0].strip()
blocked, remaining = blocked_status(ip)
return JSONResponse({
"ip": ip,
"blocked": blocked,
"remaining_seconds": remaining if blocked else 0,
"active_tokens": len(token_store),
"tracked_ips": len(ip_store),
})
@demo.app.get("/api/health")
async def api_health():
return JSONResponse({"status": "ok", "service": "SkyGuard CAPTCHA"})
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
# ุงู„ุชุดุบูŠู„
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
# HF Space ูŠุดุบู‘ู„ ุงู„ู…ู„ู ู…ุจุงุดุฑุฉ (ู„ูŠุณ ุนุจุฑ __main__)
demo.launch(
server_name="0.0.0.0",
server_port=7860,
css=CSS,
)