File size: 1,977 Bytes
e197abb | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 | import logging
import secrets
import time
from typing import Optional
from utils import hash_password, verify_password
from exceptions import AuthError
logger = logging.getLogger(__name__)
_SESSION_TTL = 3600
_MAX_ATTEMPTS = 5
_LOCKOUT = 300
_sessions: dict[str, dict] = {}
_failures: dict[str, list[float]] = {}
def _prune() -> None:
now = time.time()
dead = [t for t, s in _sessions.items()
if now - s["ts"] > _SESSION_TTL]
for t in dead:
del _sessions[t]
def _locked(username: str) -> bool:
cutoff = time.time() - _LOCKOUT
recent = [t for t in _failures.get(username, []) if t > cutoff]
_failures[username] = recent
return len(recent) >= _MAX_ATTEMPTS
def login(username: str, password: str, stored_digest: str,
stored_salt: str) -> str:
if _locked(username):
raise AuthError("Account temporarily locked")
if not verify_password(password, stored_digest, stored_salt):
_failures.setdefault(username, []).append(time.time())
raise AuthError("Invalid credentials")
_prune()
token = secrets.token_hex(32)
_sessions[token] = {"username": username, "ts": time.time()}
logger.info("Login: %s", username)
return token
def logout(token: str) -> None:
s = _sessions.pop(token, None)
if s:
logger.info("Logout: %s", s["username"])
def whoami(token: str) -> Optional[str]:
s = _sessions.get(token)
if not s:
return None
if time.time() - s["ts"] > _SESSION_TTL:
del _sessions[token]
return None
return s["username"]
def require_auth(token: str) -> str:
user = whoami(token)
if user is None:
raise AuthError()
return user
def refresh(token: str) -> str:
user = require_auth(token)
logout(token)
new = secrets.token_hex(32)
_sessions[new] = {"username": user, "ts": time.time()}
return new
def session_count() -> int:
_prune()
return len(_sessions)
|