BudgetBuddy / core /auth.py
KrishnaGarg's picture
Deploy BudgetBuddy update
13dea3d verified
Raw
History Blame Contribute Delete
4.76 kB
"""Username + PIN accounts (Phase 4).
Light identity so each person sees only their own spending — a real improvement
over the old "type any ID" model. NOT bank-grade: the dataset is private and
writes go through the Space's server-side token, but PINs are short. We store a
salted PBKDF2-HMAC-SHA256 hash (never the PIN) and enforce unique usernames.
Registry: users.jsonl in the HF Dataset, rows of
{username, pin_hash, salt, created_at}.
"""
from __future__ import annotations
import hashlib
import hmac
import os
import re
from datetime import datetime, timezone
from core import hubio
# File name of the user registry, passed to hubio.read_jsonl / hubio.append_jsonl.
USERS_FILE = "users.jsonl"
# Secret key used to HMAC-sign session tokens. A custom (stateless) frontend
# holds a token, not a raw username, so a client can't read another user's data
# by guessing a name. The key comes from the BB_SESSION_SECRET environment
# variable when it is set and non-empty; otherwise a random per-process value is
# generated, and in that case restarting the process invalidates every
# previously issued token (users re-login).
_SESSION_SECRET = os.environ.get("BB_SESSION_SECRET") or os.urandom(32).hex()
# Accepted (already normalised) usernames: one lowercase letter or digit
# followed by 2–29 lowercase letters, digits, '_' or '-' (3–30 chars total).
_USERNAME_RE = re.compile(r"^[a-z0-9][a-z0-9_-]{2,29}$")
# Iteration count handed to hashlib.pbkdf2_hmac when hashing PINs.
_PBKDF2_ROUNDS = 100_000
# --------------------------------------------------------------------------- #
# Validation + hashing
# --------------------------------------------------------------------------- #
def normalize_username(username: str) -> str:
    """Return *username* with surrounding whitespace removed and lower-cased.

    Falsy input (``None`` or an empty string) yields ``""``.
    """
    if not username:
        return ""
    trimmed = username.strip()
    return trimmed.lower()
def validate_username(username: str) -> str | None:
    """Check *username* against the allowed username pattern.

    The value is normalised with ``normalize_username`` first, so surrounding
    whitespace and letter case never cause a rejection.

    Returns:
        ``None`` when the username is acceptable, otherwise a human-readable
        error message.
    """
    candidate = normalize_username(username)
    if _USERNAME_RE.match(candidate) is not None:
        return None
    return (
        "Username must be 3–30 chars: lowercase letters, numbers, '-' or '_', "
        "starting with a letter or number."
    )
def validate_pin(pin: str) -> str | None:
    """Check that *pin* is 4–8 ASCII digits, ignoring surrounding whitespace.

    Args:
        pin: The PIN as typed by the user; ``None``/empty is treated as "".

    Returns:
        ``None`` when the PIN is acceptable, otherwise a human-readable error
        message.
    """
    pin = (pin or "").strip()
    # str.isdigit() on its own also accepts non-ASCII digit characters
    # (superscripts such as "²", Arabic-Indic digits, ...). Those cannot be
    # typed on a numeric PIN pad, so restrict the PIN to plain 0-9.
    if not (pin.isascii() and pin.isdigit() and 4 <= len(pin) <= 8):
        return "PIN must be 4–8 digits."
    return None
def _hash_pin(pin: str, salt: str) -> str:
    """Derive the PBKDF2-HMAC-SHA256 digest of *pin* under *salt*.

    Both arguments are UTF-8 encoded before hashing; the iteration count is
    the module-level ``_PBKDF2_ROUNDS``.

    Returns:
        The derived key as a lowercase hex string.
    """
    pin_bytes = pin.encode("utf-8")
    salt_bytes = salt.encode("utf-8")
    derived = hashlib.pbkdf2_hmac("sha256", pin_bytes, salt_bytes, _PBKDF2_ROUNDS)
    return derived.hex()
def _gen_salt() -> str:
    """Return a fresh random salt: 16 bytes from ``os.urandom`` as 32 hex chars."""
    raw = os.urandom(16)
    return raw.hex()
# --------------------------------------------------------------------------- #
# Registry
# --------------------------------------------------------------------------- #
def _find_user(username: str, force: bool = False) -> dict | None:
    """Look up the registry row whose username matches *username*.

    Both the requested name and each stored name are passed through
    ``normalize_username`` before comparison, so the match ignores case and
    surrounding whitespace.

    Args:
        username: Name to search for.
        force: Forwarded unchanged to ``hubio.read_jsonl``.
            NOTE(review): presumably bypasses a read cache — confirm in hubio.

    Returns:
        The first matching row, or ``None`` when no row matches.
    """
    wanted = normalize_username(username)
    rows = hubio.read_jsonl(USERS_FILE, force=force)
    matches = (
        record
        for record in rows
        if normalize_username(record.get("username", "")) == wanted
    )
    return next(matches, None)
def signup(username: str, pin: str) -> tuple[bool, str]:
    """Create a new account.

    Args:
        username: Desired username; normalised (trimmed, lower-cased) before
            it is checked and stored.
        pin: 4–8 digit PIN; surrounding whitespace is ignored.

    Returns:
        ``(ok, message)``. ``ok`` is ``False`` with an explanatory message when
        validation fails, the username already exists, or the registry write
        raises; otherwise ``True`` with a welcome message.
    """
    err = validate_username(username) or validate_pin(pin)
    if err:
        return False, err
    u = normalize_username(username)
    # validate_pin() judged the PIN with surrounding whitespace removed, so
    # hash that same trimmed value. Hashing the raw input would make " 1234"
    # and "1234" two different credentials even though both validate.
    clean_pin = pin.strip()
    # Re-read latest before the uniqueness check to minimise the signup race.
    if _find_user(u, force=True) is not None:
        return False, f"Username “{u}” is taken — pick another."
    salt = _gen_salt()
    try:
        hubio.append_jsonl(
            USERS_FILE,
            {
                "username": u,
                "pin_hash": _hash_pin(clean_pin, salt),
                "salt": salt,
                "created_at": datetime.now(timezone.utc).isoformat(timespec="seconds"),
            },
            message=f"Add user {u}",
        )
    except Exception as e:  # pragma: no cover - network/permission edge cases
        # Deliberate best-effort boundary: report the failure to the caller
        # instead of propagating it.
        return False, f"Could not create account: {e}"
    return True, f"Welcome, {u}! Account created."


def login(username: str, pin: str) -> tuple[bool, str]:
    """Verify credentials.

    Args:
        username: Account name; normalised before lookup.
        pin: PIN as typed; surrounding whitespace is ignored.

    Returns:
        ``(ok, message)``. ``ok`` is ``False`` when the inputs are malformed,
        the account does not exist, or the PIN does not match.
    """
    if validate_username(username) or validate_pin(pin):
        return False, "Invalid username or PIN."
    row = _find_user(username, force=True)
    if row is None:
        return False, "No such account — sign up first."
    # Tolerate rows with a missing/None salt or hash: they simply never match.
    salt = row.get("salt") or ""
    stored = str(row.get("pin_hash") or "").encode("utf-8")
    # Try the trimmed PIN first (what signup stores). Fall back to the raw
    # input so accounts whose hash was computed from an untrimmed PIN can still
    # sign in. dict.fromkeys de-duplicates while keeping that order, so the
    # usual case hashes only once.
    candidates = dict.fromkeys((pin.strip(), pin))
    # Compare as bytes with compare_digest: constant-time, and never raises on
    # unexpected characters in the stored value.
    matched = any(
        hmac.compare_digest(_hash_pin(candidate, salt).encode("utf-8"), stored)
        for candidate in candidates
    )
    if not matched:
        return False, "Incorrect PIN."
    return True, f"Signed in as {normalize_username(username)}."
# --------------------------------------------------------------------------- #
# Session tokens (for the stateless custom frontend)
# --------------------------------------------------------------------------- #
def issue_token(username: str) -> str:
    """Build a signed session token for *username*.

    The token has the form ``"<normalised username>.<hex signature>"``, where
    the signature is the HMAC-SHA256 of the normalised username keyed with
    ``_SESSION_SECRET``.
    """
    name = normalize_username(username)
    mac = hmac.new(_SESSION_SECRET.encode(), name.encode(), hashlib.sha256)
    return name + "." + mac.hexdigest()
def verify_token(token: str) -> str | None:
    """Return the username carried by a valid session token, else ``None``.

    A token is ``"<username>.<hex signature>"``; it is valid when the signature
    equals the HMAC-SHA256 of the username keyed with ``_SESSION_SECRET``.

    Args:
        token: Client-supplied token. Empty, malformed, or tampered values are
            rejected by returning ``None`` rather than raising.
    """
    if not token:
        return None
    u, sep, sig = str(token).partition(".")
    if not sep:
        return None
    try:
        user_bytes = u.encode()
        sig_bytes = sig.encode()
    except UnicodeEncodeError:
        # Lone surrogates cannot be encoded; such a token can never be genuine.
        return None
    expected = hmac.new(_SESSION_SECRET.encode(), user_bytes, hashlib.sha256).hexdigest()
    # The token is untrusted input. compare_digest() on str raises TypeError
    # for non-ASCII characters, so compare the encoded bytes instead.
    if hmac.compare_digest(sig_bytes, expected.encode()):
        return u
    return None