Spaces:
Sleeping
Sleeping
| """ | |
| Auth Service β JWT + Supabase Auth | |
| Works in demo mode (no Supabase) with a simple in-memory user store | |
| """ | |
| import logging | |
| import os | |
| from datetime import datetime, timedelta, timezone | |
| from typing import Optional | |
| import jwt | |
| logger = logging.getLogger(__name__) | |
| JWT_SECRET = os.getenv("JWT_SECRET", "qs-dev-secret-change-in-prod-32c") | |
| JWT_EXPIRES_MINS = int(os.getenv("JWT_EXPIRES_MINUTES", "60")) | |
| _supabase = None | |
| # Demo user store (only used if Supabase not configured) | |
| _demo_users: dict = {} | |
| def init_supabase(): | |
| global _supabase | |
| url = os.getenv("SUPABASE_URL") | |
| key = os.getenv("SUPABASE_SERVICE_KEY") | |
| if url and key: | |
| try: | |
| from supabase import create_client | |
| _supabase = create_client(url, key) | |
| logger.info("Auth service: Supabase connected") | |
| except Exception as e: | |
| logger.warning(f"Auth Supabase init failed: {e} β using demo mode") | |
| else: | |
| logger.info("Auth service: No Supabase config β demo mode") | |
| def make_jwt(user_id: str, email: str, role: str = "reader") -> str: | |
| payload = { | |
| "sub": user_id, | |
| "email": email, | |
| "role": role, | |
| "iat": datetime.now(timezone.utc), | |
| "exp": datetime.now(timezone.utc) + timedelta(minutes=JWT_EXPIRES_MINS), | |
| } | |
| return jwt.encode(payload, JWT_SECRET, algorithm="HS256") | |
| def verify_jwt(token: str) -> Optional[dict]: | |
| try: | |
| return jwt.decode(token, JWT_SECRET, algorithms=["HS256"]) | |
| except jwt.ExpiredSignatureError: | |
| raise ValueError("Token expired") | |
| except jwt.InvalidTokenError: | |
| raise ValueError("Invalid token") | |
| def register(email: str, password: str, full_name: str) -> dict: | |
| """Register user β Supabase if configured, else demo store""" | |
| if _supabase: | |
| from gotrue.errors import AuthApiError | |
| try: | |
| r = _supabase.auth.sign_up({ | |
| "email": email, "password": password, | |
| "options": {"data": {"full_name": full_name}}, | |
| }) | |
| if not r.user: | |
| raise ValueError("Registration failed") | |
| user = r.user | |
| role = get_user_role(user.id) | |
| token = make_jwt(user.id, user.email, role) | |
| # Ensure user_profiles row exists (FR-04) | |
| try: | |
| _supabase.table("user_profiles").insert({"id": user.id, "role": "reader"}).execute() | |
| except Exception: | |
| pass | |
| return {"access_token": token, "expires_in": JWT_EXPIRES_MINS * 60, | |
| "user": {"id": user.id, "email": user.email, "full_name": full_name, "role": role}} | |
| except AuthApiError as e: | |
| raise ValueError(str(e.message)) | |
| # Demo mode | |
| if email in _demo_users: | |
| raise ValueError("Email already registered") | |
| import uuid, hashlib | |
| uid = str(uuid.uuid4()) | |
| _demo_users[email] = {"id": uid, "password": hashlib.sha256(password.encode()).hexdigest(), | |
| "full_name": full_name, "role": "writer"} | |
| token = make_jwt(uid, email, "writer") | |
| return {"access_token": token, "expires_in": JWT_EXPIRES_MINS * 60, | |
| "user": {"id": uid, "email": email, "full_name": full_name, "role": "writer"}} | |
| def login(email: str, password: str) -> dict: | |
| """Login β Supabase if configured, else demo store""" | |
| if _supabase: | |
| from gotrue.errors import AuthApiError | |
| try: | |
| r = _supabase.auth.sign_in_with_password({"email": email, "password": password}) | |
| if not r.user: | |
| raise ValueError("Invalid credentials") | |
| user = r.user | |
| role = get_user_role(user.id) | |
| meta = user.user_metadata or {} | |
| token = make_jwt(user.id, user.email, role) | |
| # Lazy profile creation for users registered before F04 (FR-04) | |
| try: | |
| existing = _supabase.table("user_profiles").select("id").eq("id", user.id).limit(1).execute() | |
| if not existing.data: | |
| _supabase.table("user_profiles").insert({"id": user.id, "role": "reader"}).execute() | |
| except Exception: | |
| pass | |
| return {"access_token": token, "expires_in": JWT_EXPIRES_MINS * 60, | |
| "user": {"id": user.id, "email": user.email, | |
| "full_name": meta.get("full_name", ""), "role": role}} | |
| except AuthApiError: | |
| raise ValueError("Invalid email or password") | |
| # Demo mode | |
| import hashlib | |
| u = _demo_users.get(email) | |
| if not u or u["password"] != hashlib.sha256(password.encode()).hexdigest(): | |
| raise ValueError("Invalid email or password") | |
| token = make_jwt(u["id"], email, u["role"]) | |
| return {"access_token": token, "expires_in": JWT_EXPIRES_MINS * 60, | |
| "user": {"id": u["id"], "email": email, "full_name": u["full_name"], "role": u["role"]}} | |
| def send_reset_email(email: str): | |
| if _supabase: | |
| try: | |
| _supabase.auth.reset_password_email(email) | |
| except Exception: | |
| pass | |
| def get_user_role(user_id: str) -> str: | |
| if not _supabase: | |
| return "writer" # demo mode: trust all logged-in users | |
| try: | |
| r = _supabase.table("user_profiles").select("role").eq("id", user_id).single().execute() | |
| return (r.data or {}).get("role", "reader") | |
| except Exception: | |
| return "reader" | |