| """ |
| captcha.py — Slider CAPTCHA using Bouncypotato.gif as background. |
| |
| Flow: |
| GET /api/captcha/challenge → returns {bg, piece, piece_y, piece_w, piece_h, bg_w, bg_h, token} |
| POST /api/captcha/verify → {token, answer} → sets signed session cookie on success |
| """ |
|
|
| import base64 |
| import hashlib |
| import hmac as _hmac |
| import io |
| import json |
| import os |
| import random |
| import threading |
| import time |
|
|
# Runtime state is kept in a "data" directory that sits next to this module.
_DATA_DIR = os.path.join(
    os.path.dirname(os.path.abspath(__file__)),
    "data",
)
# Persisted HMAC signing key (see _load_or_create_secret).
_SECRET_FILE = os.path.join(_DATA_DIR, "captcha_secret.bin")
# JSON file holding the table of IPs that have passed the captcha.
_IP_FILE = os.path.join(_DATA_DIR, "captcha_ips.json")
|
|
|
|
def _load_or_create_secret() -> bytes:
    """Return the persistent HMAC signing key, creating it on first use.

    The key is read from ``_SECRET_FILE``. If the file does not exist, or
    holds fewer than 32 bytes, a fresh 32-byte random key is generated,
    written to that file and returned, so signatures issued before a restart
    keep verifying afterwards.

    Returns:
        The signing key (at least 32 bytes).

    Raises:
        OSError: if the data directory or key file cannot be created, or the
            existing key file cannot be read for a reason other than not
            existing.
    """
    os.makedirs(_DATA_DIR, exist_ok=True)
    try:
        with open(_SECRET_FILE, "rb") as f:
            secret = f.read()
        if len(secret) >= 32:
            return secret
    except FileNotFoundError:
        pass

    secret = os.urandom(32)
    # Create the key file readable/writable by the owner only: anyone able to
    # read it could forge challenge tokens and session cookies. O_BINARY only
    # exists on Windows, where it stops newline translation of the raw bytes.
    flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC | getattr(os, "O_BINARY", 0)
    fd = os.open(_SECRET_FILE, flags, 0o600)
    with os.fdopen(fd, "wb") as f:
        f.write(secret)
    try:
        # The mode passed to os.open() is ignored when the file already
        # existed (e.g. a truncated key being replaced), so tighten it here.
        os.chmod(_SECRET_FILE, 0o600)
    except OSError:
        pass
    return secret
|
|
|
|
# HMAC key used to sign challenge tokens and session cookies.
CAPTCHA_SECRET = _load_or_create_secret()
# NOTE(review): presumably the name of the session cookie set by the web
# layer; it is not referenced elsewhere in this module.
CAPTCHA_COOKIE = "ajaj_v"
# Maximum accepted age of a session cookie, in seconds (7 days).
CAPTCHA_TTL = 7 * 86400
# Maximum accepted age of a challenge token, in seconds (5 minutes).
CHALLENGE_TTL = 5 * 60
# Largest accepted distance between the submitted x and the real hole x.
TOLERANCE = 20
|
|
| |
# In-memory table of IPs that passed the captcha, mapped to the Unix time of
# approval. Entries older than _IP_TTL seconds (30 days) count as expired.
_IP_TTL = 30 * 86400
_ip_lock = threading.Lock()  # held while the table is mutated and saved
_approved_ips: dict[str, float] = {}

# Best-effort restore of the table from disk: a missing, unreadable or
# malformed file is ignored and the table simply starts out empty.
try:
    with open(_IP_FILE) as f:
        raw = json.load(f)
    if isinstance(raw, dict):
        # Mapping of ip -> approval timestamp, used as-is.
        _approved_ips = raw
    elif isinstance(raw, list):
        # Bare list of IPs without timestamps: stamp every entry with "now".
        _approved_ips = {ip: time.time() for ip in raw}
except Exception:
    pass
|
|
|
|
def _save_ips():
    """Persist the approved-IP table to ``_IP_FILE`` (best effort).

    The JSON is written to a sibling temporary file which then replaces the
    real file in a single ``os.replace`` step, so an interrupted write cannot
    leave a truncated or half-written store behind.

    Never raises: persistence is an optimisation, so a failure is logged and
    the in-memory table stays authoritative. Callers in this module invoke it
    while holding ``_ip_lock``.
    """
    tmp_path = f"{_IP_FILE}.tmp"
    try:
        with open(tmp_path, "w") as f:
            json.dump(_approved_ips, f)
        os.replace(tmp_path, _IP_FILE)
    except Exception:
        # Keep the original never-raise contract, but leave a trace instead
        # of failing silently.
        import logging

        logging.getLogger(__name__).warning(
            "could not persist approved captcha IPs to %s", _IP_FILE, exc_info=True
        )
|
|
|
|
def approve_ip(ip: str) -> None:
    """Mark *ip* as having passed the captcha at the current time.

    The approval table is updated and written to disk while ``_ip_lock`` is
    held.
    """
    with _ip_lock:
        approved_at = time.time()
        _approved_ips[ip] = approved_at
        _save_ips()
|
|
|
|
def is_ip_approved(ip: str) -> bool:
    """Return True if *ip* passed the captcha within the last ``_IP_TTL`` seconds.

    An expired entry is removed from the table and the table is saved.

    The common paths (unknown IP, still-valid IP) stay lock-free. The removal
    path re-reads the entry under ``_ip_lock`` first: another thread may have
    re-approved the IP between the unlocked read and the removal, and popping
    blindly would throw that fresh approval away.
    """
    ts = _approved_ips.get(ip)
    if ts is None:
        return False
    if time.time() - ts <= _IP_TTL:
        return True

    with _ip_lock:
        ts = _approved_ips.get(ip)
        if ts is not None and time.time() - ts <= _IP_TTL:
            # Re-approved concurrently; keep the fresh entry.
            return True
        # Only rewrite the file when an entry was actually removed.
        if _approved_ips.pop(ip, None) is not None:
            _save_ips()
    return False
|
|
# Size of the sliding puzzle piece and of the background image, in pixels.
PIECE_W, PIECE_H = 85, 57
BG_W, BG_H = 340, 170

# Remote image downloaded for the background, and the local logo used as the
# shape of the puzzle piece.
SOURCE_URL = "https://images.uncyclomedia.co/necyklopedie/cs/4/4a/Bouncypotato.gif"
LOGO_PATH = os.path.join(
    os.path.dirname(os.path.abspath(__file__)),
    "static",
    "logo.png",
)

# Per-process caches filled on first use by _load_base_image() and
# _load_logo_piece().
_img_cache = None
_logo_cache = None
|
|
|
|
def _load_base_image():
    """Return a fresh copy of the BG_W x BG_H RGBA background image.

    On first use the image is downloaded from ``SOURCE_URL`` (first frame,
    converted to RGBA); if anything in that step fails, the locally drawn
    fallback from ``_make_fallback_image`` is used instead. The resized
    result is cached for the lifetime of the process, and every call hands
    out a copy so callers may draw on it freely.
    """
    global _img_cache
    if _img_cache is None:
        from PIL import Image

        try:
            import requests as _req

            response = _req.get(SOURCE_URL, timeout=10)
            response.raise_for_status()
            frame = Image.open(io.BytesIO(response.content))
            try:
                # Select the first frame of the (possibly animated) image.
                frame.seek(0)
            except EOFError:
                pass
            frame = frame.convert("RGBA")
        except Exception:
            # NOTE: the fallback is cached as well, so a failed download is
            # not retried within this process.
            frame = _make_fallback_image()

        _img_cache = frame.resize((BG_W, BG_H), Image.LANCZOS)
    return _img_cache.copy()
|
|
|
|
def _make_fallback_image():
    """Draw a BG_W x BG_H RGBA substitute background.

    The picture is a dark canvas with diagonal stripes and 40 randomly placed
    dots; it is used when the real background cannot be loaded.
    """
    from PIL import Image, ImageDraw

    canvas = Image.new("RGBA", (BG_W, BG_H), (28, 28, 48))
    pen = ImageDraw.Draw(canvas)

    # Diagonal stripes running from the top edge down to the bottom edge.
    stripe_colour = (50, 80, 110, 180)
    for start_x in range(-BG_H, BG_W + BG_H, 22):
        pen.line([(start_x, 0), (start_x + BG_H, BG_H)], fill=stripe_colour, width=10)

    # Scattered dots of random position and radius.
    dot_colour = (127, 212, 160, 200)
    for _ in range(40):
        cx = random.randint(0, BG_W)
        cy = random.randint(0, BG_H)
        radius = random.randint(3, 9)
        pen.ellipse([cx - radius, cy - radius, cx + radius, cy + radius], fill=dot_colour)
    return canvas
|
|
|
|
def _load_logo_piece():
    """Return a fresh copy of the puzzle piece: a PIECE_W×PIECE_H RGBA image.

    The logo at ``LOGO_PATH`` is shrunk to fit and centred on a transparent
    canvas. If that fails for any reason, a plain filled ellipse is used as
    the piece instead. The result is cached for the lifetime of the process.
    """
    global _logo_cache
    if _logo_cache is None:
        from PIL import Image

        blank = (0, 0, 0, 0)
        try:
            artwork = Image.open(LOGO_PATH).convert("RGBA")
            artwork.thumbnail((PIECE_W, PIECE_H), Image.LANCZOS)

            piece = Image.new("RGBA", (PIECE_W, PIECE_H), blank)
            offset = (
                (PIECE_W - artwork.width) // 2,
                (PIECE_H - artwork.height) // 2,
            )
            # The logo doubles as its own mask so its transparency is kept.
            piece.paste(artwork, offset, artwork)
        except Exception:
            from PIL import ImageDraw

            piece = Image.new("RGBA", (PIECE_W, PIECE_H), blank)
            ImageDraw.Draw(piece).ellipse(
                [4, 4, PIECE_W - 4, PIECE_H - 4],
                fill=(200, 160, 48, 230),
            )
        _logo_cache = piece
    return _logo_cache.copy()
|
|
|
|
def _challenge_signature(hole_x: int, hole_y: int, ts: int, nonce: str) -> str:
    """Return the hex HMAC-SHA256 that commits a challenge to its answer."""
    msg = f"challenge:{nonce}:{hole_x}:{hole_y}:{ts}"
    return _hmac.new(CAPTCHA_SECRET, msg.encode(), hashlib.sha256).hexdigest()


def _issue_challenge_token(hole_x: int, hole_y: int) -> str:
    """Build the opaque ``nonce:hole_y:ts:sig`` token for a new challenge."""
    ts = int(time.time())
    # The nonce makes every token unique even for identical (x, y, ts).
    nonce = os.urandom(8).hex()
    sig = _challenge_signature(hole_x, hole_y, ts, nonce)
    return f"{nonce}:{hole_y}:{ts}:{sig}"


def generate_challenge() -> dict:
    """Create a new slider challenge.

    A hole shaped like the logo is cut into the background at a random
    position and outlined, and the matching piece (logo plus drop shadow) is
    rendered separately.

    Returns:
        A dict with base64-encoded PNGs ``bg`` and ``piece``, the vertical
        position ``piece_y`` of the hole, the piece size ``piece_w`` /
        ``piece_h``, the background size ``bg_w`` / ``bg_h``, and a signed
        ``token`` to be sent back to ``verify_challenge``.

    The horizontal hole position is the secret answer. It is NOT placed in
    the token; the token only carries an HMAC commitment to it, so a client
    cannot read the solution out of the token.
    """
    import numpy as np
    from PIL import Image, ImageFilter

    bg_img = _load_base_image()
    logo = _load_logo_piece()
    w, h = bg_img.size
    logo_alpha = logo.split()[3]

    # The position is the secret of the challenge, so draw it from the OS
    # CSPRNG rather than the predictable default generator.
    rng = random.SystemRandom()
    hole_x = rng.randint(PIECE_W + 20, w - PIECE_W - 20)
    hole_y = rng.randint(8, h - PIECE_H - 8)

    # Puzzle piece: the logo on a slightly larger transparent canvas, with a
    # translucent shadow offset by 3 px underneath it.
    pw, ph = PIECE_W + 4, PIECE_H + 4
    piece_final = Image.new("RGBA", (pw, ph), (0, 0, 0, 0))
    shadow_layer = Image.new("RGBA", (PIECE_W, PIECE_H), (0, 0, 0, 140))
    piece_final.paste(shadow_layer, (3, 3), logo_alpha)
    piece_final.paste(logo, (0, 0), logo)

    # Background: darken the logo-shaped hole at the chosen position.
    bg = bg_img.copy()
    hole_fill = Image.new("RGBA", (PIECE_W, PIECE_H), (15, 10, 5, 200))
    bg.paste(hole_fill, (hole_x, hole_y), logo_alpha)

    # Outline: dilate the logo's alpha mask and subtract the original mask,
    # which leaves only a thin ring around the shape; paint that ring gold.
    border_mask = logo_alpha.filter(ImageFilter.MaxFilter(5))
    border_arr = np.array(border_mask).astype(int) - np.array(logo_alpha).astype(int)
    border_arr = np.clip(border_arr, 0, 255).astype(np.uint8)
    border_alpha = Image.fromarray(border_arr, mode="L")
    gold_border = Image.new("RGBA", (PIECE_W, PIECE_H), (200, 160, 48, 200))
    bg.paste(gold_border, (hole_x, hole_y), border_alpha)

    def to_b64(im: Image.Image) -> str:
        """Encode an image as a base64 PNG string."""
        buf = io.BytesIO()
        im.save(buf, format="PNG")
        return base64.b64encode(buf.getvalue()).decode()

    return {
        "bg": to_b64(bg),
        "piece": to_b64(piece_final),
        "piece_y": hole_y,
        "piece_w": pw,
        "piece_h": ph,
        "bg_w": w,
        "bg_h": h,
        "token": _issue_challenge_token(hole_x, hole_y),
    }


def verify_challenge(token: str, answer_x: int) -> bool:
    """Check a submitted slider position against a challenge token.

    Args:
        token: the ``token`` value returned by ``generate_challenge``.
        answer_x: horizontal position submitted by the client (int or float).

    Returns:
        True if the token is well-formed, at most ``CHALLENGE_TTL`` seconds
        old, and ``answer_x`` lies within ``TOLERANCE`` of the hole position
        the token commits to. Any malformed input yields False; this function
        never raises.

    Because the token holds only a keyed commitment to the answer, each
    integer position within the tolerance window is tried against it.

    NOTE(review): tokens are stateless, so a solved token can be replayed
    until it expires; preventing that needs server-side state.
    """
    import math

    try:
        parts = token.split(":")
        if len(parts) != 4:
            return False
        nonce, y_str, ts_str, sig = parts
        ts = int(ts_str)
        if time.time() - ts > CHALLENGE_TTL:
            return False
        hole_y = int(y_str)

        # Integer positions x with abs(x - answer_x) <= TOLERANCE.
        first = math.ceil(answer_x - TOLERANCE)
        last = math.floor(answer_x + TOLERANCE)

        matched = False
        # No early exit: always test every candidate so that timing does not
        # reveal where inside the window the real position lies.
        for candidate in range(first, last + 1):
            expected = _challenge_signature(candidate, hole_y, ts, nonce)
            matched |= _hmac.compare_digest(expected, sig)
        return matched
    except Exception:
        return False
|
|
|
|
def make_session_cookie() -> str:
    """Return a signed session cookie value of the form ``ok:<ts>:<sig>``.

    ``<ts>`` is the current Unix time in whole seconds and ``<sig>`` is the
    first 20 hex digits of the HMAC-SHA256 of ``ok:<ts>`` under
    ``CAPTCHA_SECRET``.
    """
    payload = f"ok:{int(time.time())}"
    mac = _hmac.new(CAPTCHA_SECRET, payload.encode(), hashlib.sha256)
    return f"{payload}:{mac.hexdigest()[:20]}"
|
|
|
|
def verify_session_cookie(cookie: str) -> bool:
    """Return True if *cookie* is a valid, unexpired session cookie.

    The value must have the form ``ok:<ts>:<sig>``, be at most
    ``CAPTCHA_TTL`` seconds old, and carry the signature that
    ``make_session_cookie`` would produce for that timestamp. Any malformed
    input yields False; this function never raises.
    """
    try:
        fields = cookie.split(":")
        if len(fields) != 3:
            return False
        marker, issued_at, signature = fields
        if marker != "ok":
            return False
        age = time.time() - int(issued_at)
        if age > CAPTCHA_TTL:
            return False
        payload = f"ok:{issued_at}"
        digest = _hmac.new(CAPTCHA_SECRET, payload.encode(), hashlib.sha256).hexdigest()
        # Constant-time comparison against the truncated digest.
        return _hmac.compare_digest(digest[:20], signature)
    except Exception:
        return False
|
|