"""Username + PIN accounts (Phase 4). Light identity so each person sees only their own spending — a real improvement over the old "type any ID" model. NOT bank-grade: the dataset is private and writes go through the Space's server-side token, but PINs are short. We store a salted PBKDF2-HMAC-SHA256 hash (never the PIN) and enforce unique usernames. Registry: users.jsonl in the HF Dataset, rows of {username, pin_hash, salt, created_at}. """ from __future__ import annotations import hashlib import hmac import os import re from datetime import datetime, timezone from core import hubio USERS_FILE = "users.jsonl" # Per-process secret for signing session tokens. A custom (stateless) frontend # holds a token, not a raw username, so a client can't read another user's data # by guessing a name. Restarting the Space invalidates tokens (users re-login). _SESSION_SECRET = os.environ.get("BB_SESSION_SECRET") or os.urandom(32).hex() _USERNAME_RE = re.compile(r"^[a-z0-9][a-z0-9_-]{2,29}$") _PBKDF2_ROUNDS = 100_000 # --------------------------------------------------------------------------- # # Validation + hashing # --------------------------------------------------------------------------- # def normalize_username(username: str) -> str: return (username or "").strip().lower() def validate_username(username: str) -> str | None: """Return an error message if invalid, else None.""" u = normalize_username(username) if not _USERNAME_RE.match(u): return ( "Username must be 3–30 chars: lowercase letters, numbers, '-' or '_', " "starting with a letter or number." ) return None def validate_pin(pin: str) -> str | None: pin = (pin or "").strip() if not (pin.isdigit() and 4 <= len(pin) <= 8): return "PIN must be 4–8 digits." return None def _hash_pin(pin: str, salt: str) -> str: return hashlib.pbkdf2_hmac( "sha256", pin.encode("utf-8"), salt.encode("utf-8"), _PBKDF2_ROUNDS ).hex() def _gen_salt() -> str: return os.urandom(16).hex() # --------------------------------------------------------------------------- # # Registry # --------------------------------------------------------------------------- # def _find_user(username: str, force: bool = False) -> dict | None: u = normalize_username(username) for row in hubio.read_jsonl(USERS_FILE, force=force): if normalize_username(row.get("username", "")) == u: return row return None def signup(username: str, pin: str) -> tuple[bool, str]: """Create a new account. Returns (ok, message).""" err = validate_username(username) or validate_pin(pin) if err: return False, err u = normalize_username(username) # Re-read latest before the uniqueness check to minimise the signup race. if _find_user(u, force=True) is not None: return False, f"Username “{u}” is taken — pick another." salt = _gen_salt() try: hubio.append_jsonl( USERS_FILE, { "username": u, "pin_hash": _hash_pin(pin, salt), "salt": salt, "created_at": datetime.now(timezone.utc).isoformat(timespec="seconds"), }, message=f"Add user {u}", ) except Exception as e: # pragma: no cover - network/permission edge cases return False, f"Could not create account: {e}" return True, f"Welcome, {u}! Account created." def login(username: str, pin: str) -> tuple[bool, str]: """Verify credentials. Returns (ok, message).""" if validate_username(username) or validate_pin(pin): return False, "Invalid username or PIN." row = _find_user(username, force=True) if row is None: return False, "No such account — sign up first." if _hash_pin(pin, row.get("salt", "")) != row.get("pin_hash"): return False, "Incorrect PIN." return True, f"Signed in as {normalize_username(username)}." # --------------------------------------------------------------------------- # # Session tokens (for the stateless custom frontend) # --------------------------------------------------------------------------- # def issue_token(username: str) -> str: u = normalize_username(username) sig = hmac.new(_SESSION_SECRET.encode(), u.encode(), hashlib.sha256).hexdigest() return f"{u}.{sig}" def verify_token(token: str) -> str | None: """Return the username for a valid token, else None.""" if not token or "." not in str(token): return None u, _, sig = str(token).partition(".") expected = hmac.new(_SESSION_SECRET.encode(), u.encode(), hashlib.sha256).hexdigest() return u if hmac.compare_digest(sig, expected) else None