Spaces:
Sleeping
Sleeping
File size: 7,014 Bytes
a8e67fc d128756 a8e67fc | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 | """DB-backed user lookup and channel resolution for RBAC."""
from __future__ import annotations
import logging
from functools import lru_cache
from typing import Optional
logger = logging.getLogger(__name__)
@lru_cache(maxsize=1)
def _client():
    """Build the shared Supabase client on first use and reuse it afterwards.

    The Supabase and settings imports are deferred to call time so that
    importing this module does not crash when Supabase is not configured.

    Raises:
        RuntimeError: if the Supabase URL or key setting is empty.
    """
    from supabase import create_client
    from src.config import settings

    # Both settings must be truthy; the key is only inspected when the URL is set.
    configured = bool(settings.supabase_url) and bool(settings.supabase_key)
    if configured:
        return create_client(settings.supabase_url, settings.supabase_key)
    raise RuntimeError("Supabase not configured (SUPABASE_URL / SUPABASE_KEY missing)")
def get_user_by_email(email: str) -> Optional[dict]:
    """Fetch the active ``users`` row whose email matches ``email``.

    The address is lower-cased before it is compared. Any failure (client
    not configured, query error, bad input) is logged and reported as
    ``None``, so callers cannot distinguish "not found" from "lookup failed".

    Returns:
        The first matching row as a dict, or ``None``.
    """
    try:
        query = _client().table("users")
        query = query.select(
            "id, workspace_id, email, name, password_hash, role, is_new_hire, is_active"
        )
        query = query.eq("email", email.lower()).eq("is_active", True).limit(1)
        rows = query.execute().data
        if not rows:
            return None
        return rows[0]
    except Exception:
        logger.exception("auth_db: failed to look up user %s", email)
        return None
def get_allowed_channel_ids(user_id: str, role: str) -> list[str]:
    """Resolve the channel IDs a user is allowed to access.

    The result is ``(role grants | per-user grants) - per-user revokes``:

    * role grants: ``channel_role_grants`` rows whose ``role`` equals ``role``;
    * per-user grants: ``user_channel_permissions`` rows for ``user_id``
      with a truthy ``can_read``;
    * per-user revokes: ``user_channel_permissions`` rows for ``user_id``
      with a falsy ``can_read`` -- an explicit revoke beats any grant.

    Returns:
        The allowed channel IDs in no particular order. If anything goes
        wrong the error is logged and an empty list is returned.
    """
    try:
        db = _client()

        # Channels the role is granted.
        grants = db.table("channel_role_grants").select("channel_id").eq("role", role).execute()
        via_role: set[str] = set()
        for grant in grants.data:
            via_role.add(grant["channel_id"])

        # Per-user overrides, split into grants and revokes in one pass.
        overrides = (
            db.table("user_channel_permissions")
            .select("channel_id, can_read")
            .eq("user_id", user_id)
            .execute()
        )
        granted: set[str] = set()
        revoked: set[str] = set()
        for override in overrides.data:
            bucket = granted if override["can_read"] else revoked
            bucket.add(override["channel_id"])

        return list((via_role | granted) - revoked)
    except Exception:
        logger.exception("auth_db: failed to resolve channels for user %s", user_id)
        return []
def get_user_team_id(user_id: str) -> Optional[str]:
    """Look up one ``team_id`` from the user's ``user_teams`` memberships.

    At most one row is requested and no explicit ordering is applied, so
    which membership counts as "first" is whatever the database returns.

    Returns:
        The ``team_id`` of that row, or ``None`` when the user has no
        membership or the lookup fails (the failure is logged).
    """
    try:
        lookup = _client().table("user_teams").select("team_id")
        response = lookup.eq("user_id", user_id).limit(1).execute()
        rows = response.data
        if rows:
            return rows[0]["team_id"]
        return None
    except Exception:
        logger.exception("auth_db: failed to get team for user %s", user_id)
        return None
# Workspace that newly created OAuth users are inserted into.
_DEFAULT_WORKSPACE_ID = "00000000-0000-0000-0000-000000000001"

# Column list passed to select() by the OAuth lookups, as a comma-separated string.
_OAUTH_USER_FIELDS = ", ".join(
    (
        "id",
        "workspace_id",
        "email",
        "name",
        "password_hash",
        "role",
        "is_new_hire",
        "is_active",
        "oauth_provider",
        "oauth_sub",
    )
)
def get_or_create_oauth_user(email: str, name: str, oauth_sub: str) -> Optional[dict]:
    """Find or create the user row for a Google OAuth login.

    Resolution order:
      1. Active row matched by (oauth_provider='google', oauth_sub):
         a returning SSO user.
      2. Active row matched by lower-cased email: the Google sub is linked
         to that existing account and the row is returned.
      3. No active match: a new SSO-only user (password_hash=None) is
         created, unless an inactive row already holds this email or
         Google sub. In that case nothing is created and ``None`` is
         returned, so a deactivated account cannot be resurrected by
         signing in through OAuth.

    Args:
        email: Email address reported by the identity provider.
        name: Display name stored on a newly created user.
        oauth_sub: The provider's stable subject identifier for the account.

    Returns:
        The user row as a dict, or ``None`` when the login is refused or any
        step fails (failures are logged, never raised).
    """
    try:
        sb = _client()
        normalized_email = email.lower()

        # 1. Match by oauth_sub (fastest path for returning SSO users).
        by_sub = (
            sb.table("users")
            .select(_OAUTH_USER_FIELDS)
            .eq("oauth_provider", "google")
            .eq("oauth_sub", oauth_sub)
            .eq("is_active", True)
            .limit(1)
            .execute()
        )
        if by_sub.data:
            return by_sub.data[0]

        # 2. Match by email: link the Google sub to an existing account.
        # NOTE(review): linking trusts that the caller only passes an email the
        # provider has verified -- confirm at the call site.
        by_email = (
            sb.table("users")
            .select(_OAUTH_USER_FIELDS)
            .eq("email", normalized_email)
            .eq("is_active", True)
            .limit(1)
            .execute()
        )
        if by_email.data:
            user = by_email.data[0]
            sb.table("users").update({
                "oauth_provider": "google",
                "oauth_sub": oauth_sub,
            }).eq("id", user["id"]).execute()
            # Mirror the update on the returned row so the caller sees the link.
            user["oauth_provider"] = "google"
            user["oauth_sub"] = oauth_sub
            logger.info("auth_db: linked google oauth to existing user %s", email)
            return user

        # 3a. Both lookups above only see active rows. Any row still matching
        # here is therefore not active; creating a fresh account next to it
        # would bypass the deactivation.
        inactive_by_sub = (
            sb.table("users")
            .select("id")
            .eq("oauth_provider", "google")
            .eq("oauth_sub", oauth_sub)
            .limit(1)
            .execute()
        )
        inactive_by_email = (
            sb.table("users")
            .select("id")
            .eq("email", normalized_email)
            .limit(1)
            .execute()
        )
        if inactive_by_sub.data or inactive_by_email.data:
            logger.warning(
                "auth_db: refusing to create oauth user %s; an inactive account already matches",
                email,
            )
            return None

        # 3b. Create new SSO-only user (no password).
        insert_result = (
            sb.table("users")
            .insert({
                "workspace_id": _DEFAULT_WORKSPACE_ID,
                "email": normalized_email,
                "name": name,
                "password_hash": None,
                "role": "engineer",
                "is_new_hire": False,
                "oauth_provider": "google",
                "oauth_sub": oauth_sub,
            })
            .execute()
        )
        logger.info("auth_db: created new oauth user %s", email)
        return insert_result.data[0] if insert_result.data else None
    except Exception:
        logger.exception("auth_db: get_or_create_oauth_user failed for %s", email)
        return None
def record_audit(
    actor_id: Optional[str],
    action: str,
    target_type: str,
    target_id: str,
    metadata: dict | None = None,
) -> None:
    """Append a row to ``rbac_audit_log``. Fire-and-forget: never raises.

    Args:
        actor_id: ID of the user performing the action, or ``None``.
        action: Name of the action being recorded.
        target_type: Kind of object the action was applied to.
        target_id: Identifier of that object.
        metadata: Optional extra details; stored as ``{}`` when omitted or empty.
    """
    try:
        _client().table("rbac_audit_log").insert({
            "actor_id": actor_id,
            "action": action,
            "target_type": target_type,
            "target_id": target_id,
            "metadata": metadata or {},
        }).execute()
    except Exception:
        # Still best-effort, but keep the traceback so a failing audit trail
        # can be diagnosed instead of disappearing silently.
        logger.warning(
            "audit_log_failed: action=%s target=%s/%s",
            action,
            target_type,
            target_id,
            exc_info=True,
        )
# ---------------------------------------------------------------------------
# Fallback channel IDs per role, for use when the DB is unavailable (dev only).
# The IDs are expected to match the rows seeded by rbac_migration.sql.
# ---------------------------------------------------------------------------
DEFAULT_CHANNEL_ID = "00000000-0000-0000-0000-000000000002"

# Each role gets its own list object so mutating one entry cannot leak into another.
ROLE_DEFAULT_CHANNELS: dict[str, list[str]] = {
    role: [DEFAULT_CHANNEL_ID]
    for role in ("engineer", "manager", "admin", "org_admin")
}
|