Spaces:
Running on Zero
Running on Zero
| """Username + PIN accounts (Phase 4). | |
| Light identity so each person sees only their own spending — a real improvement | |
| over the old "type any ID" model. NOT bank-grade: the dataset is private and | |
| writes go through the Space's server-side token, but PINs are short. We store a | |
| salted PBKDF2-HMAC-SHA256 hash (never the PIN) and enforce unique usernames. | |
| Registry: users.jsonl in the HF Dataset, rows of | |
| {username, pin_hash, salt, created_at}. | |
| """ | |
| from __future__ import annotations | |
| import hashlib | |
| import hmac | |
| import os | |
| import re | |
| from datetime import datetime, timezone | |
| from core import hubio | |
| USERS_FILE = "users.jsonl" | |
| # Per-process secret for signing session tokens. A custom (stateless) frontend | |
| # holds a token, not a raw username, so a client can't read another user's data | |
| # by guessing a name. Restarting the Space invalidates tokens (users re-login). | |
| _SESSION_SECRET = os.environ.get("BB_SESSION_SECRET") or os.urandom(32).hex() | |
| _USERNAME_RE = re.compile(r"^[a-z0-9][a-z0-9_-]{2,29}$") | |
| _PBKDF2_ROUNDS = 100_000 | |
| # --------------------------------------------------------------------------- # | |
| # Validation + hashing | |
| # --------------------------------------------------------------------------- # | |
| def normalize_username(username: str) -> str: | |
| return (username or "").strip().lower() | |
| def validate_username(username: str) -> str | None: | |
| """Return an error message if invalid, else None.""" | |
| u = normalize_username(username) | |
| if not _USERNAME_RE.match(u): | |
| return ( | |
| "Username must be 3–30 chars: lowercase letters, numbers, '-' or '_', " | |
| "starting with a letter or number." | |
| ) | |
| return None | |
| def validate_pin(pin: str) -> str | None: | |
| pin = (pin or "").strip() | |
| if not (pin.isdigit() and 4 <= len(pin) <= 8): | |
| return "PIN must be 4–8 digits." | |
| return None | |
| def _hash_pin(pin: str, salt: str) -> str: | |
| return hashlib.pbkdf2_hmac( | |
| "sha256", pin.encode("utf-8"), salt.encode("utf-8"), _PBKDF2_ROUNDS | |
| ).hex() | |
| def _gen_salt() -> str: | |
| return os.urandom(16).hex() | |
| # --------------------------------------------------------------------------- # | |
| # Registry | |
| # --------------------------------------------------------------------------- # | |
| def _find_user(username: str, force: bool = False) -> dict | None: | |
| u = normalize_username(username) | |
| for row in hubio.read_jsonl(USERS_FILE, force=force): | |
| if normalize_username(row.get("username", "")) == u: | |
| return row | |
| return None | |
| def signup(username: str, pin: str) -> tuple[bool, str]: | |
| """Create a new account. Returns (ok, message).""" | |
| err = validate_username(username) or validate_pin(pin) | |
| if err: | |
| return False, err | |
| u = normalize_username(username) | |
| # Re-read latest before the uniqueness check to minimise the signup race. | |
| if _find_user(u, force=True) is not None: | |
| return False, f"Username “{u}” is taken — pick another." | |
| salt = _gen_salt() | |
| try: | |
| hubio.append_jsonl( | |
| USERS_FILE, | |
| { | |
| "username": u, | |
| "pin_hash": _hash_pin(pin, salt), | |
| "salt": salt, | |
| "created_at": datetime.now(timezone.utc).isoformat(timespec="seconds"), | |
| }, | |
| message=f"Add user {u}", | |
| ) | |
| except Exception as e: # pragma: no cover - network/permission edge cases | |
| return False, f"Could not create account: {e}" | |
| return True, f"Welcome, {u}! Account created." | |
| def login(username: str, pin: str) -> tuple[bool, str]: | |
| """Verify credentials. Returns (ok, message).""" | |
| if validate_username(username) or validate_pin(pin): | |
| return False, "Invalid username or PIN." | |
| row = _find_user(username, force=True) | |
| if row is None: | |
| return False, "No such account — sign up first." | |
| if _hash_pin(pin, row.get("salt", "")) != row.get("pin_hash"): | |
| return False, "Incorrect PIN." | |
| return True, f"Signed in as {normalize_username(username)}." | |
| # --------------------------------------------------------------------------- # | |
| # Session tokens (for the stateless custom frontend) | |
| # --------------------------------------------------------------------------- # | |
| def issue_token(username: str) -> str: | |
| u = normalize_username(username) | |
| sig = hmac.new(_SESSION_SECRET.encode(), u.encode(), hashlib.sha256).hexdigest() | |
| return f"{u}.{sig}" | |
| def verify_token(token: str) -> str | None: | |
| """Return the username for a valid token, else None.""" | |
| if not token or "." not in str(token): | |
| return None | |
| u, _, sig = str(token).partition(".") | |
| expected = hmac.new(_SESSION_SECRET.encode(), u.encode(), hashlib.sha256).hexdigest() | |
| return u if hmac.compare_digest(sig, expected) else None | |