SRVCP's picture
Deploy market history and blog uploader updates
7a0d219
"""
Auth Service — JWT + Supabase Auth
Works in demo mode (no Supabase) with a simple in-memory user store
"""
import logging
import os
from datetime import datetime, timedelta, timezone
from typing import Optional
import jwt
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Key used to sign and verify tokens (HS256) in make_jwt() / verify_jwt().
# NOTE(review): the fallback is a fixed development value; presumably
# JWT_SECRET must be set explicitly in production — confirm deployment config.
JWT_SECRET = os.getenv("JWT_SECRET", "qs-dev-secret-change-in-prod-32c")
# Token lifetime in minutes, read from JWT_EXPIRES_MINUTES (default "60").
JWT_EXPIRES_MINS = int(os.getenv("JWT_EXPIRES_MINUTES", "60"))
# Supabase client; remains None until init_supabase() creates one. The other
# functions in this module treat a falsy value as "demo mode".
_supabase = None
# Demo user store (only used if Supabase not configured).
# Keyed by email; each value holds "id", "password" (hex digest), "full_name"
# and "role".
_demo_users: dict = {}
def init_supabase():
    """Create the module-level Supabase client when credentials are configured.

    Reads ``SUPABASE_URL`` and ``SUPABASE_SERVICE_KEY`` from the environment.
    When both are set, a client is created and stored in ``_supabase``. When
    either is missing, or client creation fails for any reason (including the
    ``supabase`` package not being importable), ``_supabase`` is left as it
    was and the outcome is logged; the rest of this module treats a falsy
    ``_supabase`` as demo mode.
    """
    global _supabase
    url = os.getenv("SUPABASE_URL")
    key = os.getenv("SUPABASE_SERVICE_KEY")
    if not (url and key):
        logger.info("Auth service: No Supabase config — demo mode")
        return
    try:
        # Imported lazily so this module still loads without the package.
        from supabase import create_client

        _supabase = create_client(url, key)
        logger.info("Auth service: Supabase connected")
    except Exception as exc:
        # Deliberate best-effort boundary: any failure falls back to demo mode
        # instead of preventing start-up.
        logger.warning("Auth Supabase init failed: %s — using demo mode", exc)
def make_jwt(user_id: str, email: str, role: str = "reader") -> str:
    """Issue a signed HS256 token for a user.

    Args:
        user_id: Stored in the ``sub`` claim.
        email: Stored in the ``email`` claim.
        role: Stored in the ``role`` claim; defaults to ``"reader"``.

    Returns:
        The token produced by ``jwt.encode`` using ``JWT_SECRET``. It carries
        ``iat`` (now, UTC) and ``exp`` (now + ``JWT_EXPIRES_MINS`` minutes).
    """
    # Read the clock once so that exp - iat is exactly the configured
    # lifetime rather than depending on two separate now() calls.
    issued_at = datetime.now(timezone.utc)
    payload = {
        "sub": user_id,
        "email": email,
        "role": role,
        "iat": issued_at,
        "exp": issued_at + timedelta(minutes=JWT_EXPIRES_MINS),
    }
    return jwt.encode(payload, JWT_SECRET, algorithm="HS256")
def verify_jwt(token: str) -> Optional[dict]:
    """Decode *token* with ``JWT_SECRET`` (HS256 only) and return its claims.

    Returns:
        The decoded payload. Every failure path raises, so ``None`` is never
        actually returned despite the ``Optional`` annotation.

    Raises:
        ValueError: ``"Token expired"`` when PyJWT reports an expired
            signature, ``"Invalid token"`` for any other invalid-token error.
    """
    try:
        claims = jwt.decode(token, JWT_SECRET, algorithms=["HS256"])
    except jwt.ExpiredSignatureError:
        # Must be handled before InvalidTokenError so the more specific
        # message wins.
        raise ValueError("Token expired")
    except jwt.InvalidTokenError:
        raise ValueError("Invalid token")
    else:
        return claims
def register(email: str, password: str, full_name: str) -> dict:
    """Register a user — Supabase if configured, else the in-memory demo store.

    Args:
        email: Address to register; also the key of the demo store.
        password: Plain-text password supplied by the user.
        full_name: Display name, stored as user metadata (Supabase) or in the
            demo record.

    Returns:
        A dict with ``access_token``, ``expires_in`` (seconds) and ``user``
        (``id``, ``email``, ``full_name``, ``role``).

    Raises:
        ValueError: Supabase rejected the sign-up (message taken from the
            ``AuthApiError``), sign-up returned no user, or — in demo mode —
            the email is already registered.
    """
    if _supabase:
        from gotrue.errors import AuthApiError

        try:
            r = _supabase.auth.sign_up({
                "email": email,
                "password": password,
                "options": {"data": {"full_name": full_name}},
            })
        except AuthApiError as exc:
            raise ValueError(str(exc.message)) from exc
        if not r.user:
            raise ValueError("Registration failed")
        user = r.user
        role = get_user_role(user.id)
        token = make_jwt(user.id, user.email, role)
        # Ensure user_profiles row exists (FR-04). Best-effort: a failure here
        # must not fail registration, and login() re-creates a missing row.
        try:
            _supabase.table("user_profiles").insert(
                {"id": user.id, "role": "reader"}
            ).execute()
        except Exception as exc:
            logger.warning(
                "user_profiles insert failed for user %s: %s", user.id, exc
            )
        return {
            "access_token": token,
            "expires_in": JWT_EXPIRES_MINS * 60,
            "user": {
                "id": user.id,
                "email": user.email,
                "full_name": full_name,
                "role": role,
            },
        }

    # Demo mode
    if email in _demo_users:
        raise ValueError("Email already registered")
    import hashlib
    import uuid

    uid = str(uuid.uuid4())
    # NOTE(review): unsalted SHA-256 is not a suitable password hash outside
    # this in-memory demo store. login() recomputes the same digest, so the
    # two functions must be changed together if this is ever hardened.
    _demo_users[email] = {
        "id": uid,
        "password": hashlib.sha256(password.encode()).hexdigest(),
        "full_name": full_name,
        "role": "writer",
    }
    token = make_jwt(uid, email, "writer")
    return {
        "access_token": token,
        "expires_in": JWT_EXPIRES_MINS * 60,
        "user": {
            "id": uid,
            "email": email,
            "full_name": full_name,
            "role": "writer",
        },
    }
def login(email: str, password: str) -> dict:
    """Log a user in — Supabase if configured, else the in-memory demo store.

    Args:
        email: Address the user registered with.
        password: Plain-text password supplied by the user.

    Returns:
        A dict with ``access_token``, ``expires_in`` (seconds) and ``user``
        (``id``, ``email``, ``full_name``, ``role``).

    Raises:
        ValueError: ``"Invalid email or password"`` when Supabase raises an
            ``AuthApiError`` or the demo-store check fails;
            ``"Invalid credentials"`` when Supabase returns no user.
    """
    if _supabase:
        from gotrue.errors import AuthApiError

        try:
            r = _supabase.auth.sign_in_with_password(
                {"email": email, "password": password}
            )
        except AuthApiError as exc:
            # Same generic message for every auth failure; the cause is kept
            # on the exception chain for server-side debugging only.
            raise ValueError("Invalid email or password") from exc
        if not r.user:
            raise ValueError("Invalid credentials")
        user = r.user
        role = get_user_role(user.id)
        meta = user.user_metadata or {}
        token = make_jwt(user.id, user.email, role)
        # Lazy profile creation for users registered before F04 (FR-04).
        # Best-effort: a failure here must not block a successful login.
        try:
            existing = (
                _supabase.table("user_profiles")
                .select("id")
                .eq("id", user.id)
                .limit(1)
                .execute()
            )
            if not existing.data:
                _supabase.table("user_profiles").insert(
                    {"id": user.id, "role": "reader"}
                ).execute()
        except Exception as exc:
            logger.warning(
                "user_profiles lazy creation failed for user %s: %s",
                user.id,
                exc,
            )
        return {
            "access_token": token,
            "expires_in": JWT_EXPIRES_MINS * 60,
            "user": {
                "id": user.id,
                "email": user.email,
                "full_name": meta.get("full_name", ""),
                "role": role,
            },
        }

    # Demo mode
    import hashlib
    import hmac

    u = _demo_users.get(email)
    # Hash before checking whether the user exists, and compare in constant
    # time, so the check does not leak information through timing.
    digest = hashlib.sha256(password.encode()).hexdigest()
    if not u or not hmac.compare_digest(u["password"], digest):
        raise ValueError("Invalid email or password")
    token = make_jwt(u["id"], email, u["role"])
    return {
        "access_token": token,
        "expires_in": JWT_EXPIRES_MINS * 60,
        "user": {
            "id": u["id"],
            "email": email,
            "full_name": u["full_name"],
            "role": u["role"],
        },
    }
def send_reset_email(email: str) -> None:
    """Ask Supabase to send a password-reset email; do nothing in demo mode.

    Never raises: a failure of the Supabase call is logged and swallowed, so
    the caller gets the same result whether or not the request succeeded.

    Args:
        email: Address to send the reset email to.
    """
    if not _supabase:
        return
    try:
        _supabase.auth.reset_password_email(email)
    except Exception as exc:
        # Log without the address so failures are visible to operators
        # without writing user emails to the log.
        logger.warning("Password reset email request failed: %s", exc)
def get_user_role(user_id: str) -> str:
    """Return the role for *user_id*, defaulting to the least-privileged one.

    In demo mode (no Supabase client) every user is a ``"writer"``. Otherwise
    the role is read from the ``user_profiles`` table; a missing row, a null
    or empty ``role`` value, or any lookup error yields ``"reader"``.

    Args:
        user_id: Value matched against the ``id`` column of ``user_profiles``.

    Returns:
        The role string; never ``None``.
    """
    if not _supabase:
        return "writer"  # demo mode: trust all logged-in users
    try:
        r = (
            _supabase.table("user_profiles")
            .select("role")
            .eq("id", user_id)
            .single()
            .execute()
        )
        # `or "reader"` also covers a row whose role column is null, which
        # .get("role", "reader") would have passed through as None.
        return (r.data or {}).get("role") or "reader"
    except Exception as exc:
        # register() calls this before the profile row is inserted, so a
        # failed lookup is routine there; keep it at debug level.
        logger.debug("Role lookup failed for user %s: %s", user_id, exc)
        return "reader"