""" Auth Service — JWT + Supabase Auth Works in demo mode (no Supabase) with a simple in-memory user store """ import logging import os from datetime import datetime, timedelta, timezone from typing import Optional import jwt logger = logging.getLogger(__name__) JWT_SECRET = os.getenv("JWT_SECRET", "qs-dev-secret-change-in-prod-32c") JWT_EXPIRES_MINS = int(os.getenv("JWT_EXPIRES_MINUTES", "60")) _supabase = None # Demo user store (only used if Supabase not configured) _demo_users: dict = {} def init_supabase(): global _supabase url = os.getenv("SUPABASE_URL") key = os.getenv("SUPABASE_SERVICE_KEY") if url and key: try: from supabase import create_client _supabase = create_client(url, key) logger.info("Auth service: Supabase connected") except Exception as e: logger.warning(f"Auth Supabase init failed: {e} — using demo mode") else: logger.info("Auth service: No Supabase config — demo mode") def make_jwt(user_id: str, email: str, role: str = "reader") -> str: payload = { "sub": user_id, "email": email, "role": role, "iat": datetime.now(timezone.utc), "exp": datetime.now(timezone.utc) + timedelta(minutes=JWT_EXPIRES_MINS), } return jwt.encode(payload, JWT_SECRET, algorithm="HS256") def verify_jwt(token: str) -> Optional[dict]: try: return jwt.decode(token, JWT_SECRET, algorithms=["HS256"]) except jwt.ExpiredSignatureError: raise ValueError("Token expired") except jwt.InvalidTokenError: raise ValueError("Invalid token") def register(email: str, password: str, full_name: str) -> dict: """Register user — Supabase if configured, else demo store""" if _supabase: from gotrue.errors import AuthApiError try: r = _supabase.auth.sign_up({ "email": email, "password": password, "options": {"data": {"full_name": full_name}}, }) if not r.user: raise ValueError("Registration failed") user = r.user role = get_user_role(user.id) token = make_jwt(user.id, user.email, role) # Ensure user_profiles row exists (FR-04) try: _supabase.table("user_profiles").insert({"id": user.id, "role": "reader"}).execute() except Exception: pass return {"access_token": token, "expires_in": JWT_EXPIRES_MINS * 60, "user": {"id": user.id, "email": user.email, "full_name": full_name, "role": role}} except AuthApiError as e: raise ValueError(str(e.message)) # Demo mode if email in _demo_users: raise ValueError("Email already registered") import uuid, hashlib uid = str(uuid.uuid4()) _demo_users[email] = {"id": uid, "password": hashlib.sha256(password.encode()).hexdigest(), "full_name": full_name, "role": "writer"} token = make_jwt(uid, email, "writer") return {"access_token": token, "expires_in": JWT_EXPIRES_MINS * 60, "user": {"id": uid, "email": email, "full_name": full_name, "role": "writer"}} def login(email: str, password: str) -> dict: """Login — Supabase if configured, else demo store""" if _supabase: from gotrue.errors import AuthApiError try: r = _supabase.auth.sign_in_with_password({"email": email, "password": password}) if not r.user: raise ValueError("Invalid credentials") user = r.user role = get_user_role(user.id) meta = user.user_metadata or {} token = make_jwt(user.id, user.email, role) # Lazy profile creation for users registered before F04 (FR-04) try: existing = _supabase.table("user_profiles").select("id").eq("id", user.id).limit(1).execute() if not existing.data: _supabase.table("user_profiles").insert({"id": user.id, "role": "reader"}).execute() except Exception: pass return {"access_token": token, "expires_in": JWT_EXPIRES_MINS * 60, "user": {"id": user.id, "email": user.email, "full_name": meta.get("full_name", ""), "role": role}} except AuthApiError: raise ValueError("Invalid email or password") # Demo mode import hashlib u = _demo_users.get(email) if not u or u["password"] != hashlib.sha256(password.encode()).hexdigest(): raise ValueError("Invalid email or password") token = make_jwt(u["id"], email, u["role"]) return {"access_token": token, "expires_in": JWT_EXPIRES_MINS * 60, "user": {"id": u["id"], "email": email, "full_name": u["full_name"], "role": u["role"]}} def send_reset_email(email: str): if _supabase: try: _supabase.auth.reset_password_email(email) except Exception: pass def get_user_role(user_id: str) -> str: if not _supabase: return "writer" # demo mode: trust all logged-in users try: r = _supabase.table("user_profiles").select("role").eq("id", user_id).single().execute() return (r.data or {}).get("role", "reader") except Exception: return "reader"