import logging import secrets import time from typing import Optional from utils import hash_password, verify_password from exceptions import AuthError logger = logging.getLogger(__name__) _SESSION_TTL = 3600 _MAX_ATTEMPTS = 5 _LOCKOUT = 300 _sessions: dict[str, dict] = {} _failures: dict[str, list[float]] = {} def _prune() -> None: now = time.time() dead = [t for t, s in _sessions.items() if now - s["ts"] > _SESSION_TTL] for t in dead: del _sessions[t] def _locked(username: str) -> bool: cutoff = time.time() - _LOCKOUT recent = [t for t in _failures.get(username, []) if t > cutoff] _failures[username] = recent return len(recent) >= _MAX_ATTEMPTS def login(username: str, password: str, stored_digest: str, stored_salt: str) -> str: if _locked(username): raise AuthError("Account temporarily locked") if not verify_password(password, stored_digest, stored_salt): _failures.setdefault(username, []).append(time.time()) raise AuthError("Invalid credentials") _prune() token = secrets.token_hex(32) _sessions[token] = {"username": username, "ts": time.time()} logger.info("Login: %s", username) return token def logout(token: str) -> None: s = _sessions.pop(token, None) if s: logger.info("Logout: %s", s["username"]) def whoami(token: str) -> Optional[str]: s = _sessions.get(token) if not s: return None if time.time() - s["ts"] > _SESSION_TTL: del _sessions[token] return None return s["username"] def require_auth(token: str) -> str: user = whoami(token) if user is None: raise AuthError() return user def refresh(token: str) -> str: user = require_auth(token) logout(token) new = secrets.token_hex(32) _sessions[new] = {"username": user, "ts": time.time()} return new def session_count() -> int: _prune() return len(_sessions)